diff --git a/docs/research/async-utility.md b/docs/research/async-utility.md new file mode 100644 index 0000000..4a1be16 --- /dev/null +++ b/docs/research/async-utility.md @@ -0,0 +1,263 @@ +# Async Utility +This is an older async utility that could potentially be used/adapted so we can drop the repeater depency + +```typescript +/** + * AsyncUtils Library + * A utility library for asynchronous data processing, offering functions for + * mapping, filtering, reducing, batching, deduplication, and more. + */ + +/** + * Converts a source into an AsyncIterable for consistent processing. + * @param source - The source data, which can be an `AsyncIterable`, `Iterable`, `Array`, or `Set`. + * @returns An `AsyncIterable` representation of the source data. + */ +export function toAsyncIterable( + source: AsyncIterable | Iterable | T[] | Set +): AsyncIterable { + if (Symbol.asyncIterator in source) { + return source as AsyncIterable; + } else if (Symbol.iterator in source || Array.isArray(source)) { + return { + async *[Symbol.asyncIterator]() { + for (const item of source as Iterable) { + yield item; + } + }, + }; + } + throw new Error("Invalid source: must be AsyncIterable, Iterable, Array, or Set."); +} + +/** + * Maps items in an AsyncIterable using a transformation function. + * @param source - The source AsyncIterable. + * @param fn - The transformation function. + * @returns An AsyncIterable of transformed items. + */ +export async function* map( + source: AsyncIterable, + fn: (value: T) => Promise | U +): AsyncIterable { + for await (const value of source) { + yield await fn(value); + } +} + +/** + * Filters items in an AsyncIterable based on a predicate function. + * @param source - The source AsyncIterable. + * @param predicate - The predicate function. + * @returns An AsyncIterable of items that satisfy the predicate. + */ +export async function* filter( + source: AsyncIterable, + predicate: (value: T) => Promise | boolean +): AsyncIterable { + for await (const value of source) { + if (await predicate(value)) yield value; + } +} + +/** + * Reduces the items in an AsyncIterable to a single value. + * @param source - The source AsyncIterable. + * @param reducer - The reducer function. + * @param initialValue - The initial value for the reduction. + * @returns The final reduced value. + */ +export async function reduce( + source: AsyncIterable, + reducer: (accumulator: U, value: T) => Promise | U, + initialValue: U +): Promise { + let accumulator = initialValue; + for await (const value of source) { + accumulator = await reducer(accumulator, value); + } + return accumulator; +} + +/** + * Converts an AsyncIterable to an array. + * @param source - The source AsyncIterable. + * @returns A promise resolving to an array of all items. + */ +export async function toArray(source: AsyncIterable): Promise { + const result: T[] = []; + for await (const value of source) { + result.push(value); + } + return result; +} + +/** + * Converts an AsyncIterable to a set of unique items. + * @param source - The source AsyncIterable. + * @returns A promise resolving to a Set of unique items. + */ +export async function toSet(source: AsyncIterable): Promise> { + const result = new Set(); + for await (const value of source) { + result.add(value); + } + return result; +} + +/** + * Yields only the first `count` items from an AsyncIterable. + * @param source - The source AsyncIterable. + * @param count - The maximum number of items to yield. + * @returns An AsyncIterable of the first `count` items. + */ +export async function* take( + source: AsyncIterable, + count: number +): AsyncIterable { + let i = 0; + for await (const value of source) { + if (i++ >= count) return; + yield value; + } +} + +/** + * Chains multiple sources into a single AsyncIterable. + * @param sources - The sources to combine. + * @returns An AsyncIterable combining all the input sources. + */ +export async function* chain( + ...sources: (AsyncIterable | Iterable | T[] | Set)[] +): AsyncIterable { + for (const source of sources) { + for await (const value of toAsyncIterable(source)) { + yield value; + } + } +} + +/** + * Groups items based on a key function and returns a Map. + * @param source - The source AsyncIterable. + * @param keyFn - A function to compute the grouping key for each item. + * @returns A promise resolving to a Map of grouped items. + */ +export async function groupBy( + source: AsyncIterable, + keyFn: (value: T) => K +): Promise> { + const groups = new Map(); + for await (const value of source) { + const key = keyFn(value); + if (!groups.has(key)) { + groups.set(key, []); + } + groups.get(key)!.push(value); + } + return groups; +} + +/** + * Processes the items in batches of a specified size. + * @param source - The source AsyncIterable. + * @param size - The size of each batch. + * @returns An AsyncIterable of batches (arrays) of items. + */ +export async function* batch( + source: AsyncIterable, + size: number +): AsyncIterable { + let batch: T[] = []; + for await (const value of source) { + batch.push(value); + if (batch.length === size) { + yield batch; + batch = []; + } + } + if (batch.length > 0) yield batch; +} + +/** + * Flattens an AsyncIterable of arrays into individual elements. + * @param source - The source AsyncIterable of arrays. + * @returns An AsyncIterable of flattened elements. + */ +export async function* flat( + source: AsyncIterable +): AsyncIterable { + for await (const array of source) { + for (const value of array) { + yield value; + } + } +} + +/** + * Deduplicates items in an AsyncIterable. + * @param source - The source AsyncIterable. + * @returns An AsyncIterable of unique items. + */ +export async function* dedupe( + source: AsyncIterable +): AsyncIterable { + const seen = new Set(); + for await (const value of source) { + if (!seen.has(value)) { + seen.add(value); + yield value; + } + } +} + +/** + * Slides a fixed-size window over the stream, yielding overlapping or non-overlapping windows. + * @param source - The source AsyncIterable. + * @param size - The size of the window. + * @param step - The step size to slide the window (default is 1). + * @returns An AsyncIterable of windows. + */ +export async function* window( + source: AsyncIterable, + size: number, + step: number = 1 +): AsyncIterable { + const buffer: T[] = []; + for await (const value of source) { + buffer.push(value); + if (buffer.length === size) { + yield [...buffer]; + buffer.splice(0, step); + } + } +} + +/** + * Performs a streaming join between two sources based on a key function. + * @param source1 - The first source. + * @param source2 - The second source. + * @param keyFn1 - Key function for the first source. + * @param keyFn2 - Key function for the second source. + * @returns An AsyncIterable of tuples with matching items. + */ +export async function* join( + source1: AsyncIterable, + source2: AsyncIterable, + keyFn1: (value: T) => K, + keyFn2: (value: U) => K +): AsyncIterable<[T, U]> { + const map2 = new Map(); + for await (const value of source2) { + const key = keyFn2(value); + map2.set(key, value); + } + for await (const value of source1) { + const key = keyFn1(value); + if (map2.has(key)) { + yield [value, map2.get(key)!]; + } + } +} +``` + diff --git a/package-lock.json b/package-lock.json index 7686580..cd80765 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,9 +8,6 @@ "name": "@alkdev/pubsub", "version": "0.1.0", "license": "MIT OR Apache-2.0", - "dependencies": { - "@repeaterjs/repeater": "^3.0.0" - }, "devDependencies": { "@types/node": "^22.0.0", "@vitest/coverage-v8": "^3.2.4", @@ -632,12 +629,6 @@ "node": ">=14" } }, - "node_modules/@repeaterjs/repeater": { - "version": "3.0.6", - "resolved": "https://registry.npmjs.org/@repeaterjs/repeater/-/repeater-3.0.6.tgz", - "integrity": "sha512-Javneu5lsuhwNCryN+pXH93VPQ8g0dBX7wItHFgYiwQmzE1sVdg5tWHiOgHywzL2W21XQopa7IwIEnNbmeUJYA==", - "license": "MIT" - }, "node_modules/@rollup/rollup-android-arm-eabi": { "version": "4.60.2", "resolved": "https://registry.npmjs.org/@rollup/rollup-android-arm-eabi/-/rollup-android-arm-eabi-4.60.2.tgz", diff --git a/package.json b/package.json index 6ef9157..8700c50 100644 --- a/package.json +++ b/package.json @@ -52,9 +52,7 @@ "quic" ], "license": "MIT OR Apache-2.0", - "dependencies": { - "@repeaterjs/repeater": "^3.0.0" - }, + "dependencies": {}, "peerDependencies": { "ioredis": "^5.0.0" }, diff --git a/src/create_pubsub.ts b/src/create_pubsub.ts index 2d917bf..70868a8 100644 --- a/src/create_pubsub.ts +++ b/src/create_pubsub.ts @@ -24,7 +24,7 @@ * SOFTWARE. */ -import { Repeater } from "@repeaterjs/repeater"; +import { Repeater } from "./repeater.js"; import type { TypedEventTarget, TypedEvent } from "./types.js"; export type PubSubPublishArgsByKey = { diff --git a/src/index.ts b/src/index.ts index 8a33783..9eddd8d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubPublishArgsByKey } from "./create_pubsub.js"; export { type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; export { filter, map, pipe } from "./operators.js"; -export { Repeater } from "@repeaterjs/repeater"; \ No newline at end of file +export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; \ No newline at end of file diff --git a/src/operators.ts b/src/operators.ts index 1b057f2..aa60676 100644 --- a/src/operators.ts +++ b/src/operators.ts @@ -4,7 +4,7 @@ * License: MIT */ -import { Repeater, type Stop, type Push } from "@repeaterjs/repeater"; +import { Repeater, type Stop, type Push } from "./repeater.js"; export function filter( filterFn: (input: T) => input is U, diff --git a/src/repeater.ts b/src/repeater.ts new file mode 100644 index 0000000..63c6a6d --- /dev/null +++ b/src/repeater.ts @@ -0,0 +1,413 @@ +/* + * Adapted from @repeaterjs/repeater + * Original source: https://github.com/repeaterjs/repeater + * License: MIT + * + * Copyright (c) 2022 Repeater Contributors + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +export class RepeaterOverflowError extends Error { + constructor(message: string) { + super(message); + Object.defineProperty(this, "name", { + value: "RepeaterOverflowError", + enumerable: false, + }); + if (typeof Object.setPrototypeOf === "function") { + Object.setPrototypeOf(this, this.constructor.prototype); + } else { + (this as any).__proto__ = this.constructor.prototype; + } + + if (typeof (Error as any).captureStackTrace === "function") { + (Error as any).captureStackTrace(this, this.constructor); + } + } +} + +export type Push = ( + value: PromiseLike | T, +) => Promise; + +export type Stop = ((err?: unknown) => undefined) & Promise; + +export type RepeaterExecutor = ( + push: Push, + stop: Stop, +) => PromiseLike | TReturn; + +interface PushOperation { + value: Promise; + resolve(next?: PromiseLike | TNext): unknown; +} + +interface NextOperation { + value: PromiseLike | TNext | undefined; + resolve(iteration: Promise>): unknown; +} + +const Initial = 0; +const Started = 1; +const Stopped = 2; +const Done = 3; +const Rejected = 4; + +export const MAX_QUEUE_LENGTH = 1024; + +const NOOP = () => {}; + +interface RepeaterRecord { + state: number; + executor: RepeaterExecutor; + buffer: RepeaterBuffer | undefined; + pushes: Array>; + nexts: Array>; + pending: Promise | undefined; + execution: Promise | undefined; + err: unknown; + onnext: (value?: PromiseLike | TNext) => unknown; + onstop: () => unknown; +} + +export interface RepeaterBuffer { + empty: boolean; + full: boolean; + add(value: TValue): unknown; + remove(): TValue; +} + +function swallow(value: any): void { + if (value != null && typeof value.then === "function") { + value.then(NOOP, NOOP); + } +} + +function consumeExecution( + r: RepeaterRecord, +): Promise { + const err = r.err; + const execution = Promise.resolve(r.execution).then((value) => { + if (err != null) { + throw err; + } + + return value; + }); + + r.err = undefined; + r.execution = execution.then( + () => undefined, + () => undefined, + ); + + return r.pending === undefined ? execution : r.pending.then(() => execution); +} + +function createIteration( + r: RepeaterRecord, + value: Promise | T | TReturn | undefined, +): Promise> { + const done = r.state >= Done; + return Promise.resolve(value).then((value: any) => { + if (!done && r.state >= Rejected) { + return consumeExecution(r).then((value: any) => ({ + value, + done: true, + })); + } + + return { value, done }; + }); +} + +function stop( + r: RepeaterRecord, + err?: unknown, +): void { + if (r.state >= Stopped) { + return; + } + + r.state = Stopped; + r.onnext(); + r.onstop(); + if (r.err == null) { + r.err = err; + } + + if ( + r.pushes.length === 0 && + (typeof r.buffer === "undefined" || r.buffer.empty) + ) { + finish(r); + } else { + for (const push of r.pushes) { + push.resolve(); + } + } +} + +function finish(r: RepeaterRecord): void { + if (r.state >= Done) { + return; + } + + if (r.state < Stopped) { + stop(r); + } + + r.state = Done; + r.buffer = undefined; + for (const next of r.nexts) { + const execution: Promise = + r.pending === undefined + ? consumeExecution(r) + : r.pending.then(() => consumeExecution(r)); + next.resolve(createIteration(r, execution)); + } + + r.pushes = []; + r.nexts = []; +} + +function reject(r: RepeaterRecord): void { + if (r.state >= Rejected) { + return; + } + + if (r.state < Done) { + finish(r); + } + + r.state = Rejected; +} + +function push( + r: RepeaterRecord, + value: PromiseLike | T, +): Promise { + swallow(value); + if (r.pushes.length >= MAX_QUEUE_LENGTH) { + throw new RepeaterOverflowError( + `No more than ${MAX_QUEUE_LENGTH} pending calls to push are allowed on a single repeater.`, + ); + } else if (r.state >= Stopped) { + return Promise.resolve(undefined); + } + + let valueP: Promise = + r.pending === undefined + ? Promise.resolve(value) + : r.pending.then(() => value); + + valueP = valueP.catch((err) => { + if (r.state < Stopped) { + r.err = err; + } + + reject(r); + return undefined; + }); + + let nextP: Promise; + if (r.nexts.length) { + const next = r.nexts.shift()!; + next.resolve(createIteration(r, valueP)); + if (r.nexts.length) { + nextP = Promise.resolve(r.nexts[0].value); + } else if (typeof r.buffer !== "undefined" && !r.buffer.full) { + nextP = Promise.resolve(undefined); + } else { + nextP = new Promise((resolve) => (r.onnext = resolve)); + } + } else if (typeof r.buffer !== "undefined" && !r.buffer.full) { + r.buffer.add(valueP); + nextP = Promise.resolve(undefined); + } else { + nextP = new Promise((resolve) => r.pushes.push({ resolve, value: valueP })); + } + + let floating = true; + let next = {} as Promise; + const unhandled = nextP.catch((err) => { + if (floating) { + throw err; + } + + return undefined; + }); + + next.then = (onfulfilled, onrejected): any => { + floating = false; + return Promise.prototype.then.call(nextP, onfulfilled, onrejected); + }; + + next.catch = (onrejected): any => { + floating = false; + return Promise.prototype.catch.call(nextP, onrejected); + }; + + next.finally = nextP.finally.bind(nextP); + r.pending = valueP + .then(() => unhandled) + .catch((err) => { + r.err = err; + reject(r); + }); + + return next; +} + +function createStop( + r: RepeaterRecord, +): Stop { + const stop1 = stop.bind(null, r as RepeaterRecord) as Stop; + const stopP = new Promise((resolve) => (r.onstop = resolve as () => unknown)); + stop1.then = stopP.then.bind(stopP); + stop1.catch = stopP.catch.bind(stopP); + stop1.finally = stopP.finally.bind(stopP); + return stop1; +} + +function execute( + r: RepeaterRecord, +): void { + if (r.state >= Started) { + return; + } + + r.state = Started; + const push1 = (push as any).bind(null, r) as Push; + const stop1 = createStop(r); + r.execution = new Promise((resolve) => resolve(r.executor(push1, stop1))); + r.execution.catch(() => stop(r)); +} + +type RecordMap = WeakMap< + Repeater, + RepeaterRecord +>; + +const records: RecordMap = new WeakMap(); + +export class Repeater { + constructor( + executor: RepeaterExecutor, + buffer?: RepeaterBuffer | undefined, + ) { + records.set(this, { + executor, + buffer, + err: undefined, + state: Initial, + pushes: [], + nexts: [], + pending: undefined, + execution: undefined, + onnext: NOOP, + onstop: NOOP, + }); + } + + next( + value?: PromiseLike | TNext, + ): Promise> { + swallow(value); + const r = records.get(this); + if (r === undefined) { + throw new Error("WeakMap error"); + } + + if (r.nexts.length >= MAX_QUEUE_LENGTH) { + throw new RepeaterOverflowError( + `No more than ${MAX_QUEUE_LENGTH} pending calls to next are allowed on a single repeater.`, + ); + } + + if (r.state <= Initial) { + execute(r); + } + + r.onnext(value); + if (typeof r.buffer !== "undefined" && !r.buffer.empty) { + const result = createIteration( + r, + r.buffer.remove() as Promise, + ); + if (r.pushes.length) { + const push = r.pushes.shift()!; + r.buffer.add(push.value); + r.onnext = push.resolve; + } + + return result; + } else if (r.pushes.length) { + const push = r.pushes.shift()!; + r.onnext = push.resolve; + return createIteration(r, push.value); + } else if (r.state >= Stopped) { + finish(r); + return createIteration(r, consumeExecution(r)); + } + + return new Promise((resolve) => r.nexts.push({ resolve, value })); + } + + return( + value?: PromiseLike | TReturn, + ): Promise> { + swallow(value); + const r = records.get(this); + if (r === undefined) { + throw new Error("WeakMap error"); + } + + finish(r); + r.execution = Promise.resolve(r.execution).then(() => value); + return createIteration(r, consumeExecution(r)); + } + + throw(err: unknown): Promise> { + const r = records.get(this); + if (r === undefined) { + throw new Error("WeakMap error"); + } + + if ( + r.state <= Initial || + r.state >= Stopped || + (typeof r.buffer !== "undefined" && !r.buffer.empty) + ) { + finish(r); + if (r.err == null) { + r.err = err; + } + + return createIteration(r, consumeExecution(r)); + } + + return this.next(Promise.reject(err)); + } + + [Symbol.asyncIterator](): this { + return this; + } +} \ No newline at end of file