Files
pubsub/docs/research/async-utility.md
glm-5.1 9c332529df Replace @repeaterjs/repeater dependency with inlined Repeater class
Port the core Repeater class (push/stop executor, async iterator protocol,
overflow protection) from @repeaterjs/repeater into src/repeater.ts, stripping
buffers and combinators we don't use. This removes the 7-year-old dependency
while keeping the same API surface for all current and future adapters
(Redis, WebSocket, SSE, Iroh).

Also add the async utility research doc for reference.
2026-04-30 12:10:11 +00:00

264 lines
7.1 KiB
Markdown

# Async Utility
This is an older async utility that could potentially be used/adapted so we can drop the repeater depency
```typescript
/**
* AsyncUtils Library
* A utility library for asynchronous data processing, offering functions for
* mapping, filtering, reducing, batching, deduplication, and more.
*/
/**
* Converts a source into an AsyncIterable for consistent processing.
* @param source - The source data, which can be an `AsyncIterable`, `Iterable`, `Array`, or `Set`.
* @returns An `AsyncIterable` representation of the source data.
*/
export function toAsyncIterable<T>(
source: AsyncIterable<T> | Iterable<T> | T[] | Set<T>
): AsyncIterable<T> {
if (Symbol.asyncIterator in source) {
return source as AsyncIterable<T>;
} else if (Symbol.iterator in source || Array.isArray(source)) {
return {
async *[Symbol.asyncIterator]() {
for (const item of source as Iterable<T>) {
yield item;
}
},
};
}
throw new Error("Invalid source: must be AsyncIterable, Iterable, Array, or Set.");
}
/**
* Maps items in an AsyncIterable using a transformation function.
* @param source - The source AsyncIterable.
* @param fn - The transformation function.
* @returns An AsyncIterable of transformed items.
*/
export async function* map<T, U>(
source: AsyncIterable<T>,
fn: (value: T) => Promise<U> | U
): AsyncIterable<U> {
for await (const value of source) {
yield await fn(value);
}
}
/**
* Filters items in an AsyncIterable based on a predicate function.
* @param source - The source AsyncIterable.
* @param predicate - The predicate function.
* @returns An AsyncIterable of items that satisfy the predicate.
*/
export async function* filter<T>(
source: AsyncIterable<T>,
predicate: (value: T) => Promise<boolean> | boolean
): AsyncIterable<T> {
for await (const value of source) {
if (await predicate(value)) yield value;
}
}
/**
* Reduces the items in an AsyncIterable to a single value.
* @param source - The source AsyncIterable.
* @param reducer - The reducer function.
* @param initialValue - The initial value for the reduction.
* @returns The final reduced value.
*/
export async function reduce<T, U>(
source: AsyncIterable<T>,
reducer: (accumulator: U, value: T) => Promise<U> | U,
initialValue: U
): Promise<U> {
let accumulator = initialValue;
for await (const value of source) {
accumulator = await reducer(accumulator, value);
}
return accumulator;
}
/**
* Converts an AsyncIterable to an array.
* @param source - The source AsyncIterable.
* @returns A promise resolving to an array of all items.
*/
export async function toArray<T>(source: AsyncIterable<T>): Promise<T[]> {
const result: T[] = [];
for await (const value of source) {
result.push(value);
}
return result;
}
/**
* Converts an AsyncIterable to a set of unique items.
* @param source - The source AsyncIterable.
* @returns A promise resolving to a Set of unique items.
*/
export async function toSet<T>(source: AsyncIterable<T>): Promise<Set<T>> {
const result = new Set<T>();
for await (const value of source) {
result.add(value);
}
return result;
}
/**
* Yields only the first `count` items from an AsyncIterable.
* @param source - The source AsyncIterable.
* @param count - The maximum number of items to yield.
* @returns An AsyncIterable of the first `count` items.
*/
export async function* take<T>(
source: AsyncIterable<T>,
count: number
): AsyncIterable<T> {
let i = 0;
for await (const value of source) {
if (i++ >= count) return;
yield value;
}
}
/**
* Chains multiple sources into a single AsyncIterable.
* @param sources - The sources to combine.
* @returns An AsyncIterable combining all the input sources.
*/
export async function* chain<T>(
...sources: (AsyncIterable<T> | Iterable<T> | T[] | Set<T>)[]
): AsyncIterable<T> {
for (const source of sources) {
for await (const value of toAsyncIterable(source)) {
yield value;
}
}
}
/**
* Groups items based on a key function and returns a Map.
* @param source - The source AsyncIterable.
* @param keyFn - A function to compute the grouping key for each item.
* @returns A promise resolving to a Map of grouped items.
*/
export async function groupBy<T, K>(
source: AsyncIterable<T>,
keyFn: (value: T) => K
): Promise<Map<K, T[]>> {
const groups = new Map<K, T[]>();
for await (const value of source) {
const key = keyFn(value);
if (!groups.has(key)) {
groups.set(key, []);
}
groups.get(key)!.push(value);
}
return groups;
}
/**
* Processes the items in batches of a specified size.
* @param source - The source AsyncIterable.
* @param size - The size of each batch.
* @returns An AsyncIterable of batches (arrays) of items.
*/
export async function* batch<T>(
source: AsyncIterable<T>,
size: number
): AsyncIterable<T[]> {
let batch: T[] = [];
for await (const value of source) {
batch.push(value);
if (batch.length === size) {
yield batch;
batch = [];
}
}
if (batch.length > 0) yield batch;
}
/**
* Flattens an AsyncIterable of arrays into individual elements.
* @param source - The source AsyncIterable of arrays.
* @returns An AsyncIterable of flattened elements.
*/
export async function* flat<T>(
source: AsyncIterable<T[]>
): AsyncIterable<T> {
for await (const array of source) {
for (const value of array) {
yield value;
}
}
}
/**
* Deduplicates items in an AsyncIterable.
* @param source - The source AsyncIterable.
* @returns An AsyncIterable of unique items.
*/
export async function* dedupe<T>(
source: AsyncIterable<T>
): AsyncIterable<T> {
const seen = new Set<T>();
for await (const value of source) {
if (!seen.has(value)) {
seen.add(value);
yield value;
}
}
}
/**
* Slides a fixed-size window over the stream, yielding overlapping or non-overlapping windows.
* @param source - The source AsyncIterable.
* @param size - The size of the window.
* @param step - The step size to slide the window (default is 1).
* @returns An AsyncIterable of windows.
*/
export async function* window<T>(
source: AsyncIterable<T>,
size: number,
step: number = 1
): AsyncIterable<T[]> {
const buffer: T[] = [];
for await (const value of source) {
buffer.push(value);
if (buffer.length === size) {
yield [...buffer];
buffer.splice(0, step);
}
}
}
/**
* Performs a streaming join between two sources based on a key function.
* @param source1 - The first source.
* @param source2 - The second source.
* @param keyFn1 - Key function for the first source.
* @param keyFn2 - Key function for the second source.
* @returns An AsyncIterable of tuples with matching items.
*/
export async function* join<T, U, K>(
source1: AsyncIterable<T>,
source2: AsyncIterable<U>,
keyFn1: (value: T) => K,
keyFn2: (value: U) => K
): AsyncIterable<[T, U]> {
const map2 = new Map<K, U>();
for await (const value of source2) {
const key = keyFn2(value);
map2.set(key, value);
}
for await (const value of source1) {
const key = keyFn1(value);
if (map2.has(key)) {
yield [value, map2.get(key)!];
}
}
}
```