Files
pubsub/docs/research/async-utility.md
glm-5.1 9c332529df Replace @repeaterjs/repeater dependency with inlined Repeater class
Port the core Repeater class (push/stop executor, async iterator protocol,
overflow protection) from @repeaterjs/repeater into src/repeater.ts, stripping
buffers and combinators we don't use. This removes the 7-year-old dependency
while keeping the same API surface for all current and future adapters
(Redis, WebSocket, SSE, Iroh).

Also add the async utility research doc for reference.
2026-04-30 12:10:11 +00:00

7.1 KiB

Async Utility

This is an older async utility that could potentially be used/adapted so we can drop the repeater depency

/**
 * AsyncUtils Library
 * A utility library for asynchronous data processing, offering functions for
 * mapping, filtering, reducing, batching, deduplication, and more.
 */

/**
 * Converts a source into an AsyncIterable for consistent processing.
 * @param source - The source data, which can be an `AsyncIterable`, `Iterable`, `Array`, or `Set`.
 * @returns An `AsyncIterable` representation of the source data.
 */
export function toAsyncIterable<T>(
  source: AsyncIterable<T> | Iterable<T> | T[] | Set<T>
): AsyncIterable<T> {
  if (Symbol.asyncIterator in source) {
    return source as AsyncIterable<T>;
  } else if (Symbol.iterator in source || Array.isArray(source)) {
    return {
      async *[Symbol.asyncIterator]() {
        for (const item of source as Iterable<T>) {
          yield item;
        }
      },
    };
  }
  throw new Error("Invalid source: must be AsyncIterable, Iterable, Array, or Set.");
}

/**
 * Maps items in an AsyncIterable using a transformation function.
 * @param source - The source AsyncIterable.
 * @param fn - The transformation function.
 * @returns An AsyncIterable of transformed items.
 */
export async function* map<T, U>(
  source: AsyncIterable<T>,
  fn: (value: T) => Promise<U> | U
): AsyncIterable<U> {
  for await (const value of source) {
    yield await fn(value);
  }
}

/**
 * Filters items in an AsyncIterable based on a predicate function.
 * @param source - The source AsyncIterable.
 * @param predicate - The predicate function.
 * @returns An AsyncIterable of items that satisfy the predicate.
 */
export async function* filter<T>(
  source: AsyncIterable<T>,
  predicate: (value: T) => Promise<boolean> | boolean
): AsyncIterable<T> {
  for await (const value of source) {
    if (await predicate(value)) yield value;
  }
}

/**
 * Reduces the items in an AsyncIterable to a single value.
 * @param source - The source AsyncIterable.
 * @param reducer - The reducer function.
 * @param initialValue - The initial value for the reduction.
 * @returns The final reduced value.
 */
export async function reduce<T, U>(
  source: AsyncIterable<T>,
  reducer: (accumulator: U, value: T) => Promise<U> | U,
  initialValue: U
): Promise<U> {
  let accumulator = initialValue;
  for await (const value of source) {
    accumulator = await reducer(accumulator, value);
  }
  return accumulator;
}

/**
 * Converts an AsyncIterable to an array.
 * @param source - The source AsyncIterable.
 * @returns A promise resolving to an array of all items.
 */
export async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const value of source) {
    result.push(value);
  }
  return result;
}

/**
 * Converts an AsyncIterable to a set of unique items.
 * @param source - The source AsyncIterable.
 * @returns A promise resolving to a Set of unique items.
 */
export async function toSet<T>(source: AsyncIterable<T>): Promise<Set<T>> {
  const result = new Set<T>();
  for await (const value of source) {
    result.add(value);
  }
  return result;
}

/**
 * Yields only the first `count` items from an AsyncIterable.
 * @param source - The source AsyncIterable.
 * @param count - The maximum number of items to yield.
 * @returns An AsyncIterable of the first `count` items.
 */
export async function* take<T>(
  source: AsyncIterable<T>,
  count: number
): AsyncIterable<T> {
  let i = 0;
  for await (const value of source) {
    if (i++ >= count) return;
    yield value;
  }
}

/**
 * Chains multiple sources into a single AsyncIterable.
 * @param sources - The sources to combine.
 * @returns An AsyncIterable combining all the input sources.
 */
export async function* chain<T>(
  ...sources: (AsyncIterable<T> | Iterable<T> | T[] | Set<T>)[]
): AsyncIterable<T> {
  for (const source of sources) {
    for await (const value of toAsyncIterable(source)) {
      yield value;
    }
  }
}

/**
 * Groups items based on a key function and returns a Map.
 * @param source - The source AsyncIterable.
 * @param keyFn - A function to compute the grouping key for each item.
 * @returns A promise resolving to a Map of grouped items.
 */
export async function groupBy<T, K>(
  source: AsyncIterable<T>,
  keyFn: (value: T) => K
): Promise<Map<K, T[]>> {
  const groups = new Map<K, T[]>();
  for await (const value of source) {
    const key = keyFn(value);
    if (!groups.has(key)) {
      groups.set(key, []);
    }
    groups.get(key)!.push(value);
  }
  return groups;
}

/**
 * Processes the items in batches of a specified size.
 * @param source - The source AsyncIterable.
 * @param size - The size of each batch.
 * @returns An AsyncIterable of batches (arrays) of items.
 */
export async function* batch<T>(
  source: AsyncIterable<T>,
  size: number
): AsyncIterable<T[]> {
  let batch: T[] = [];
  for await (const value of source) {
    batch.push(value);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch;
}

/**
 * Flattens an AsyncIterable of arrays into individual elements.
 * @param source - The source AsyncIterable of arrays.
 * @returns An AsyncIterable of flattened elements.
 */
export async function* flat<T>(
  source: AsyncIterable<T[]>
): AsyncIterable<T> {
  for await (const array of source) {
    for (const value of array) {
      yield value;
    }
  }
}

/**
 * Deduplicates items in an AsyncIterable.
 * @param source - The source AsyncIterable.
 * @returns An AsyncIterable of unique items.
 */
export async function* dedupe<T>(
  source: AsyncIterable<T>
): AsyncIterable<T> {
  const seen = new Set<T>();
  for await (const value of source) {
    if (!seen.has(value)) {
      seen.add(value);
      yield value;
    }
  }
}

/**
 * Slides a fixed-size window over the stream, yielding overlapping or non-overlapping windows.
 * @param source - The source AsyncIterable.
 * @param size - The size of the window.
 * @param step - The step size to slide the window (default is 1).
 * @returns An AsyncIterable of windows.
 */
export async function* window<T>(
  source: AsyncIterable<T>,
  size: number,
  step: number = 1
): AsyncIterable<T[]> {
  const buffer: T[] = [];
  for await (const value of source) {
    buffer.push(value);
    if (buffer.length === size) {
      yield [...buffer];
      buffer.splice(0, step);
    }
  }
}

/**
 * Performs a streaming join between two sources based on a key function.
 * @param source1 - The first source.
 * @param source2 - The second source.
 * @param keyFn1 - Key function for the first source.
 * @param keyFn2 - Key function for the second source.
 * @returns An AsyncIterable of tuples with matching items.
 */
export async function* join<T, U, K>(
  source1: AsyncIterable<T>,
  source2: AsyncIterable<U>,
  keyFn1: (value: T) => K,
  keyFn2: (value: U) => K
): AsyncIterable<[T, U]> {
  const map2 = new Map<K, U>();
  for await (const value of source2) {
    const key = keyFn2(value);
    map2.set(key, value);
  }
  for await (const value of source1) {
    const key = keyFn1(value);
    if (map2.has(key)) {
      yield [value, map2.get(key)!];
    }
  }
}