# Async Utility

This is an older async utility that could potentially be used or adapted so that we can drop the repeater dependency.

```typescript
/**
 * AsyncUtils Library
 * A utility library for asynchronous data processing, offering functions for
 * mapping, filtering, reducing, batching, deduplication, and more.
 */

/** Returns true when `value` is non-nullish and has a function stored under `key`. */
function isCallableAt(value: unknown, key: symbol): boolean {
  // A property lookup (unlike the `in` operator) is safe on primitives such as strings.
  return value != null && typeof (value as Record<symbol, unknown>)[key] === "function";
}

/** Throws a RangeError unless `value` is an integer greater than or equal to 1. */
function assertPositiveInteger(value: number, name: string): void {
  if (!Number.isInteger(value) || value < 1) {
    throw new RangeError(`${name} must be a positive integer, got ${value}.`);
  }
}

/**
 * Converts a source into an AsyncIterable for consistent processing.
 *
 * Async-iterable sources are returned as-is; synchronous iterables (arrays,
 * sets, strings, generators, ...) are wrapped in an async generator.
 *
 * @param source - The source data, which can be an `AsyncIterable`, `Iterable`, `Array`, or `Set`.
 * @returns An `AsyncIterable` representation of the source data.
 * @throws Error if `source` is neither async-iterable nor iterable (including `null`/`undefined`).
 */
export function toAsyncIterable<T>(
  source: AsyncIterable<T> | Iterable<T> | T[] | Set<T>
): AsyncIterable<T> {
  if (isCallableAt(source, Symbol.asyncIterator)) {
    return source as AsyncIterable<T>;
  }
  if (isCallableAt(source, Symbol.iterator)) {
    const iterable = source as Iterable<T>;
    return {
      async *[Symbol.asyncIterator]() {
        for (const item of iterable) {
          yield item;
        }
      },
    };
  }
  throw new Error("Invalid source: must be AsyncIterable, Iterable, Array, or Set.");
}

/**
 * Maps items in an AsyncIterable using a transformation function.
 * Items are transformed one at a time, in source order.
 *
 * @param source - The source AsyncIterable.
 * @param fn - The transformation function; may be synchronous or return a promise.
 * @returns An AsyncIterable of transformed items.
 */
export async function* map<T, U>(
  source: AsyncIterable<T>,
  fn: (value: T) => Promise<U> | U
): AsyncIterable<U> {
  for await (const value of source) {
    yield await fn(value);
  }
}

/**
 * Filters items in an AsyncIterable based on a predicate function.
 *
 * @param source - The source AsyncIterable.
 * @param predicate - The predicate function; may be synchronous or return a promise.
 * @returns An AsyncIterable of items that satisfy the predicate.
 */
export async function* filter<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => Promise<boolean> | boolean
): AsyncIterable<T> {
  for await (const value of source) {
    if (await predicate(value)) yield value;
  }
}

/**
 * Reduces the items in an AsyncIterable to a single value.
 *
 * @param source - The source AsyncIterable.
 * @param reducer - The reducer function; may be synchronous or return a promise.
 * @param initialValue - The initial value for the reduction; returned unchanged for an empty source.
 * @returns The final reduced value.
 */
export async function reduce<T, U>(
  source: AsyncIterable<T>,
  reducer: (accumulator: U, value: T) => Promise<U> | U,
  initialValue: U
): Promise<U> {
  let accumulator = initialValue;
  for await (const value of source) {
    accumulator = await reducer(accumulator, value);
  }
  return accumulator;
}

/**
 * Converts an AsyncIterable to an array.
 *
 * @param source - The source AsyncIterable.
 * @returns A promise resolving to an array of all items, in source order.
 */
export async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const value of source) {
    result.push(value);
  }
  return result;
}

/**
 * Converts an AsyncIterable to a set of unique items.
 *
 * @param source - The source AsyncIterable.
 * @returns A promise resolving to a Set of unique items.
 */
export async function toSet<T>(source: AsyncIterable<T>): Promise<Set<T>> {
  const result = new Set<T>();
  for await (const value of source) {
    result.add(value);
  }
  return result;
}

/**
 * Yields only the first `count` items from an AsyncIterable.
 *
 * The source is never asked for more items than are yielded: iteration stops
 * immediately after the last requested item, and a `count` of zero or less
 * does not touch the source at all.
 *
 * @param source - The source AsyncIterable.
 * @param count - The maximum number of items to yield.
 * @returns An AsyncIterable of the first `count` items.
 */
export async function* take<T>(
  source: AsyncIterable<T>,
  count: number
): AsyncIterable<T> {
  if (count <= 0) return;
  let taken = 0;
  for await (const value of source) {
    yield value;
    taken += 1;
    // Stop before requesting the next item so the source is not over-consumed.
    if (taken >= count) return;
  }
}

/**
 * Chains multiple sources into a single AsyncIterable.
 * Each source is fully drained before the next one is started.
 *
 * @param sources - The sources to combine.
 * @returns An AsyncIterable combining all the input sources.
 */
export async function* chain<T>(
  ...sources: (AsyncIterable<T> | Iterable<T> | T[] | Set<T>)[]
): AsyncIterable<T> {
  for (const source of sources) {
    for await (const value of toAsyncIterable(source)) {
      yield value;
    }
  }
}

/**
 * Groups items based on a key function and returns a Map.
 *
 * @param source - The source AsyncIterable.
 * @param keyFn - A function to compute the grouping key for each item.
 * @returns A promise resolving to a Map from key to the items that produced it, in source order.
 */
export async function groupBy<T, K>(
  source: AsyncIterable<T>,
  keyFn: (value: T) => K
): Promise<Map<K, T[]>> {
  const groups = new Map<K, T[]>();
  for await (const value of source) {
    const key = keyFn(value);
    const group = groups.get(key);
    if (group) {
      group.push(value);
    } else {
      groups.set(key, [value]);
    }
  }
  return groups;
}

/**
 * Processes the items in batches of a specified size.
 * The final batch may be smaller than `size`; empty batches are never yielded.
 *
 * @param source - The source AsyncIterable.
 * @param size - The size of each batch; must be a positive integer.
 * @returns An AsyncIterable of batches (arrays) of items.
 * @throws RangeError (on first iteration) if `size` is not a positive integer.
 */
export async function* batch<T>(
  source: AsyncIterable<T>,
  size: number
): AsyncIterable<T[]> {
  assertPositiveInteger(size, "size");
  let current: T[] = [];
  for await (const value of source) {
    current.push(value);
    if (current.length === size) {
      yield current;
      current = [];
    }
  }
  if (current.length > 0) yield current;
}

/**
 * Flattens an AsyncIterable of arrays into individual elements (one level deep).
 *
 * @param source - The source AsyncIterable of arrays.
 * @returns An AsyncIterable of flattened elements.
 */
export async function* flat<T>(
  source: AsyncIterable<T[]>
): AsyncIterable<T> {
  for await (const array of source) {
    for (const value of array) {
      yield value;
    }
  }
}

/**
 * Deduplicates items in an AsyncIterable, keeping the first occurrence of each.
 *
 * Items are compared with `Set` semantics, so objects are distinct unless they
 * are the same reference. Every distinct item is remembered until iteration ends.
 *
 * @param source - The source AsyncIterable.
 * @returns An AsyncIterable of unique items.
 */
export async function* dedupe<T>(
  source: AsyncIterable<T>
): AsyncIterable<T> {
  const seen = new Set<T>();
  for await (const value of source) {
    if (!seen.has(value)) {
      seen.add(value);
      yield value;
    }
  }
}

/**
 * Slides a fixed-size window over the stream, yielding overlapping or non-overlapping windows.
 *
 * Consecutive windows start `step` items apart. When `step` is larger than
 * `size`, the items between two windows are skipped. Only full windows are
 * yielded; a trailing partial window is dropped.
 *
 * @param source - The source AsyncIterable.
 * @param size - The size of the window; must be a positive integer.
 * @param step - The step size to slide the window (default is 1); must be a positive integer.
 * @returns An AsyncIterable of windows.
 * @throws RangeError (on first iteration) if `size` or `step` is not a positive integer.
 */
export async function* window<T>(
  source: AsyncIterable<T>,
  size: number,
  step: number = 1
): AsyncIterable<T[]> {
  assertPositiveInteger(size, "size");
  assertPositiveInteger(step, "step");
  const buffer: T[] = [];
  let toSkip = 0;
  for await (const value of source) {
    if (toSkip > 0) {
      toSkip -= 1;
      continue;
    }
    buffer.push(value);
    if (buffer.length === size) {
      yield [...buffer];
      // splice can drop at most `size` buffered items; the rest of the step
      // must be skipped from the upcoming source items.
      toSkip = Math.max(0, step - size);
      buffer.splice(0, step);
    }
  }
}

/**
 * Performs an inner join between two sources based on a key function.
 *
 * `source2` is consumed completely and indexed in memory before `source1` is
 * read; `source1` is then streamed. For every item of `source1`, one tuple is
 * yielded per item of `source2` with an equal key, in `source2` order.
 *
 * @param source1 - The first source.
 * @param source2 - The second source.
 * @param keyFn1 - Key function for the first source.
 * @param keyFn2 - Key function for the second source.
 * @returns An AsyncIterable of tuples with matching items.
 */
export async function* join<T, U, K>(
  source1: AsyncIterable<T>,
  source2: AsyncIterable<U>,
  keyFn1: (value: T) => K,
  keyFn2: (value: U) => K
): AsyncIterable<[T, U]> {
  // Keep every right-hand item per key so duplicate keys are not lost.
  const index = new Map<K, U[]>();
  for await (const right of source2) {
    const key = keyFn2(right);
    const bucket = index.get(key);
    if (bucket) {
      bucket.push(right);
    } else {
      index.set(key, [right]);
    }
  }
  for await (const left of source1) {
    const matches = index.get(keyFn1(left));
    if (!matches) continue;
    for (const right of matches) {
      yield [left, right];
    }
  }
}
```