Port the core Repeater class (push/stop executor, async iterator protocol, overflow protection) from @repeaterjs/repeater into src/repeater.ts, stripping buffers and combinators we don't use. This removes the 7-year-old dependency while keeping the same API surface for all current and future adapters (Redis, WebSocket, SSE, Iroh). Also add the async utility research doc for reference.
7.1 KiB
7.1 KiB
Async Utility
This is an older async utility that could potentially be used or adapted so that we can drop the Repeater dependency.
/**
* AsyncUtils Library
* A utility library for asynchronous data processing, offering functions for
* mapping, filtering, reducing, batching, deduplication, and more.
*/
/**
 * Normalizes a supported source into an `AsyncIterable` so downstream helpers
 * only have to deal with one protocol.
 *
 * - A source that already exposes `Symbol.asyncIterator` is returned unchanged
 *   (same object reference).
 * - A source that exposes `Symbol.iterator` (arrays, sets, strings, sync
 *   generators, ...) is wrapped in an object whose async iterator re-yields
 *   the items of the underlying sync iteration.
 *
 * @param source - The source data: an `AsyncIterable`, `Iterable`, `Array`, or `Set`.
 * @returns An `AsyncIterable` representation of the source data.
 * @throws Error when `source` is neither async-iterable nor iterable
 * (including `null`, `undefined` and non-iterable primitives).
 */
export function toAsyncIterable<T>(
  source: AsyncIterable<T> | Iterable<T> | T[] | Set<T>
): AsyncIterable<T> {
  // The `in` operator throws a TypeError when its right operand is a
  // primitive, yet a string is a legitimate Iterable<string> and untyped
  // callers may pass null/undefined. Object() boxes primitives and maps
  // null/undefined to an empty object, so the probes below never throw.
  const probe: object = Object(source);

  if (Symbol.asyncIterator in probe) {
    return source as AsyncIterable<T>;
  }

  if (Symbol.iterator in probe) {
    const iterable = source as Iterable<T>;
    return {
      async *[Symbol.asyncIterator]() {
        for (const item of iterable) {
          yield item;
        }
      },
    };
  }

  throw new Error("Invalid source: must be AsyncIterable, Iterable, Array, or Set.");
}
/**
 * Lazily transforms every item of an AsyncIterable.
 *
 * Items are handled one at a time in source order; the result of `fn`
 * (sync or promise) is awaited before it is yielded.
 *
 * @param source - The AsyncIterable to read from.
 * @param fn - Transformation applied to each item; may return a promise.
 * @returns An AsyncIterable of the transformed items.
 */
export async function* map<T, U>(
  source: AsyncIterable<T>,
  fn: (value: T) => Promise<U> | U
): AsyncIterable<U> {
  for await (const item of source) {
    const mapped = await fn(item);
    yield mapped;
  }
}
/**
 * Lazily keeps only the items for which `predicate` resolves to a truthy value.
 *
 * The predicate is awaited for each item, in source order, before deciding
 * whether the item is passed through.
 *
 * @param source - The AsyncIterable to read from.
 * @param predicate - Test applied to each item; may return a promise.
 * @returns An AsyncIterable of the items that passed the test.
 */
export async function* filter<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => Promise<boolean> | boolean
): AsyncIterable<T> {
  for await (const candidate of source) {
    const keep = await predicate(candidate);
    if (!keep) {
      continue;
    }
    yield candidate;
  }
}
/**
 * Folds an AsyncIterable into a single value.
 *
 * The reducer is invoked once per item, in source order, and its (possibly
 * asynchronous) result becomes the accumulator for the next item. When the
 * source is empty, `initialValue` is returned untouched.
 *
 * @param source - The AsyncIterable to consume.
 * @param reducer - Combines the running accumulator with the next item.
 * @param initialValue - Accumulator used for the first reducer call.
 * @returns A promise resolving to the final accumulator.
 */
export async function reduce<T, U>(
  source: AsyncIterable<T>,
  reducer: (accumulator: U, value: T) => Promise<U> | U,
  initialValue: U
): Promise<U> {
  let running = initialValue;
  for await (const item of source) {
    const next = await reducer(running, item);
    running = next;
  }
  return running;
}
/**
 * Drains an AsyncIterable and collects everything it produced.
 *
 * @param source - The AsyncIterable to consume to completion.
 * @returns A promise resolving to an array holding the items in arrival order.
 */
export async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
  const collected: T[] = [];
  for await (const item of source) {
    collected[collected.length] = item;
  }
  return collected;
}
/**
 * Drains an AsyncIterable into a `Set`, so repeated items are kept only once.
 *
 * @param source - The AsyncIterable to consume to completion.
 * @returns A promise resolving to a Set containing each distinct item.
 */
export async function toSet<T>(source: AsyncIterable<T>): Promise<Set<T>> {
  const unique: Set<T> = new Set();
  for await (const item of source) {
    unique.add(item);
  }
  return unique;
}
/**
 * Yields at most the first `count` items from an AsyncIterable.
 *
 * The generator finishes immediately after the last wanted item has been
 * yielded: it never pulls an extra item from `source` just to discover that
 * the limit was reached. This matters for live streams, where pulling one
 * more item would both block until it arrives and silently discard it.
 * A `count` of zero or less yields nothing and does not touch `source`.
 *
 * @param source - The source AsyncIterable.
 * @param count - The maximum number of items to yield.
 * @returns An AsyncIterable of the first `count` items.
 */
export async function* take<T>(
  source: AsyncIterable<T>,
  count: number
): AsyncIterable<T> {
  // Nothing requested: return before the source is ever iterated.
  if (count <= 0) return;

  let remaining = count;
  for await (const value of source) {
    yield value;
    // Decide right after yielding, so the loop does not request item count+1.
    // Returning from inside `for await` also closes the source iterator.
    remaining -= 1;
    if (remaining <= 0) return;
  }
}
/**
 * Concatenates several sources into one AsyncIterable.
 *
 * Sources are consumed strictly one after another, in argument order: the
 * next source is only started once the previous one is exhausted. Each source
 * is first normalized with `toAsyncIterable`, so sync and async inputs can be
 * mixed freely.
 *
 * @param sources - The sources to concatenate.
 * @returns An AsyncIterable yielding every item of every source, in order.
 */
export async function* chain<T>(
  ...sources: (AsyncIterable<T> | Iterable<T> | T[] | Set<T>)[]
): AsyncIterable<T> {
  for (const part of sources) {
    const stream = toAsyncIterable(part);
    for await (const item of stream) {
      yield item;
    }
  }
}
/**
 * Buckets the items of an AsyncIterable by a computed key.
 *
 * The source is consumed to completion. Keys appear in the resulting Map in
 * the order they were first seen, and each bucket keeps its items in arrival
 * order.
 *
 * @param source - The AsyncIterable to consume.
 * @param keyFn - Computes the grouping key of an item.
 * @returns A promise resolving to a Map from key to the items sharing that key.
 */
export async function groupBy<T, K>(
  source: AsyncIterable<T>,
  keyFn: (value: T) => K
): Promise<Map<K, T[]>> {
  const buckets = new Map<K, T[]>();
  for await (const item of source) {
    const bucketKey = keyFn(item);
    const existing = buckets.get(bucketKey);
    if (existing === undefined) {
      // First item for this key: open a new bucket.
      buckets.set(bucketKey, [item]);
    } else {
      existing.push(item);
    }
  }
  return buckets;
}
/**
 * Groups the items of an AsyncIterable into arrays of `size` items.
 *
 * Every yielded batch holds exactly `size` items, except possibly the last
 * one, which holds whatever is left when the source ends (never empty).
 *
 * `size` must be a positive integer. Any other value could never match the
 * buffer length, which would silently buffer the entire stream in memory and
 * emit it as one batch at the end; that is rejected up front instead.
 * Because this is an async generator, the error surfaces on the first
 * `next()` call rather than at call time.
 *
 * @param source - The source AsyncIterable.
 * @param size - The number of items per batch (positive integer).
 * @returns An AsyncIterable of batches (arrays) of items.
 * @throws RangeError when `size` is not a positive integer.
 */
export async function* batch<T>(
  source: AsyncIterable<T>,
  size: number
): AsyncIterable<T[]> {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`batch size must be a positive integer, got ${size}`);
  }

  let current: T[] = [];
  for await (const value of source) {
    current.push(value);
    if (current.length === size) {
      yield current;
      // Start a fresh array so the batch already handed out is not mutated.
      current = [];
    }
  }
  // Flush the trailing partial batch, if any.
  if (current.length > 0) yield current;
}
/**
 * Flattens one level of nesting: every array produced by the source is
 * expanded into its individual elements, preserving order.
 *
 * @param source - An AsyncIterable whose items are arrays.
 * @returns An AsyncIterable of the elements of those arrays.
 */
export async function* flat<T>(
  source: AsyncIterable<T[]>
): AsyncIterable<T> {
  for await (const chunk of source) {
    for (const element of chunk) {
      yield element;
    }
  }
}
/**
 * Lazily drops items that were already emitted earlier in the stream.
 *
 * The first occurrence of each item is passed through; later occurrences are
 * skipped. Equality is that of `Set` membership, and every distinct item seen
 * so far is retained in memory for the lifetime of the iteration.
 *
 * @param source - The AsyncIterable to read from.
 * @returns An AsyncIterable yielding each distinct item once, in first-seen order.
 */
export async function* dedupe<T>(
  source: AsyncIterable<T>
): AsyncIterable<T> {
  const emitted = new Set<T>();
  for await (const item of source) {
    if (emitted.has(item)) {
      continue;
    }
    emitted.add(item);
    yield item;
  }
}
/**
 * Slides a fixed-size window over the stream.
 *
 * A window is yielded (as a fresh array copy) each time `size` items have
 * been collected; the window start then advances by `step` items:
 * - `step < size`: consecutive windows overlap by `size - step` items.
 * - `step === size`: windows are adjacent and non-overlapping.
 * - `step > size`: the `step - size` items between two windows are skipped.
 *
 * A trailing group with fewer than `size` items is not yielded.
 *
 * Both `size` and `step` must be positive integers. Other values would
 * either never produce a window or stop producing windows after the first
 * one while buffering the rest of the stream, so they are rejected. Because
 * this is an async generator, the error surfaces on the first `next()` call.
 *
 * @param source - The source AsyncIterable.
 * @param size - The number of items per window (positive integer).
 * @param step - How far the window start advances each time (default is 1).
 * @returns An AsyncIterable of windows.
 * @throws RangeError when `size` or `step` is not a positive integer.
 */
export async function* window<T>(
  source: AsyncIterable<T>,
  size: number,
  step: number = 1
): AsyncIterable<T[]> {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError(`window size must be a positive integer, got ${size}`);
  }
  if (!Number.isInteger(step) || step < 1) {
    throw new RangeError(`window step must be a positive integer, got ${step}`);
  }

  const buffer: T[] = [];
  // Items still to be dropped before the next window starts (step > size only).
  let toSkip = 0;

  for await (const value of source) {
    if (toSkip > 0) {
      toSkip -= 1;
      continue;
    }
    buffer.push(value);
    if (buffer.length === size) {
      // Hand out a copy: the buffer keeps being mutated for later windows.
      yield [...buffer];
      if (step >= size) {
        // The whole window is consumed; anything beyond it must be skipped
        // from the upcoming items, since splice alone cannot drop them.
        buffer.length = 0;
        toSkip = step - size;
      } else {
        buffer.splice(0, step);
      }
    }
  }
}
/**
 * Inner-joins two sources on a computed key.
 *
 * `source2` is consumed completely first and indexed by key in memory;
 * `source1` is then streamed, and for each of its items one tuple is yielded
 * per `source2` item with the same key. Items of either source without a
 * partner are dropped.
 *
 * All `source2` items sharing a key are kept, so a `source1` item with
 * several partners yields several tuples, ordered by the partners' arrival
 * order in `source2`. Key equality is that of `Map` lookup.
 *
 * @param source1 - The first (streamed) source.
 * @param source2 - The second source; fully buffered before `source1` is read.
 * @param keyFn1 - Key function for the first source.
 * @param keyFn2 - Key function for the second source.
 * @returns An AsyncIterable of `[source1 item, source2 item]` tuples with matching keys.
 */
export async function* join<T, U, K>(
  source1: AsyncIterable<T>,
  source2: AsyncIterable<U>,
  keyFn1: (value: T) => K,
  keyFn2: (value: U) => K
): AsyncIterable<[T, U]> {
  // Index every source2 item under its key. A plain Map<K, U> would keep only
  // the last item per key and silently lose the other matches.
  const index = new Map<K, U[]>();
  for await (const right of source2) {
    const key = keyFn2(right);
    const bucket = index.get(key);
    if (bucket === undefined) {
      index.set(key, [right]);
    } else {
      bucket.push(right);
    }
  }

  for await (const left of source1) {
    const matches = index.get(keyFn1(left));
    if (matches === undefined) continue;
    for (const right of matches) {
      yield [left, right];
    }
  }
}