import { describe, it, expect } from "vitest";
import {
  filter,
  map,
  pipe,
  take,
  reduce,
  toArray,
  batch,
  dedupe,
  window,
  flat,
  groupBy,
  chain,
  join,
} from "../src/operators.js";
import { createPubSub } from "../src/create_pubsub.js";
import { Repeater } from "../src/repeater.js";

/**
 * Wraps an in-memory array in an async iterable that yields each element in
 * order. Used as the deterministic input source for the operator tests below.
 *
 * @param items - Elements to yield, in order.
 * @returns An async iterable over `items`.
 */
async function* fromArray<T>(items: T[]): AsyncIterable<T> {
  for (const item of items) {
    yield item;
  }
}

/**
 * Drains an async iterable into an array so results can be compared with
 * `toEqual`.
 *
 * @param source - The async iterable to consume until it completes.
 * @returns Every yielded item, in the order it was produced.
 */
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const result: T[] = [];
  for await (const item of source) {
    result.push(item);
  }
  return result;
}

// filter: curried operator — filter(predicate)(source).
describe("filter", () => {
  it("filters items by predicate", async () => {
    const source = fromArray([1, 2, 3, 4, 5]);
    const result = await collect(filter((x: number) => x % 2 === 0)(source));
    expect(result).toEqual([2, 4]);
  });

  it("supports type-narrowing overload", async () => {
    type Fish = { type: "fish"; swim: boolean };
    type Bird = { type: "bird"; fly: boolean };
    type Animal = Fish | Bird;
    const animals: Animal[] = [
      { type: "fish", swim: true },
      { type: "bird", fly: true },
      { type: "fish", swim: false },
    ];
    const source = fromArray(animals);
    const isFish = (a: Animal): a is Fish => a.type === "fish";
    const result = await collect(filter(isFish)(source));
    expect(result).toEqual([
      { type: "fish", swim: true },
      { type: "fish", swim: false },
    ]);
  });

  it("supports async predicate", async () => {
    const source = fromArray([1, 2, 3, 4, 5]);
    const result = await collect(
      filter(async (x: number) => {
        await Promise.resolve();
        return x > 3;
      })(source),
    );
    expect(result).toEqual([4, 5]);
  });

  it("yields nothing when all items are filtered", async () => {
    const source = fromArray([1, 3, 5]);
    const result = await collect(filter((x: number) => x % 2 === 0)(source));
    expect(result).toEqual([]);
  });

  it("yields all items when nothing is filtered", async () => {
    const source = fromArray([2, 4, 6]);
    const result = await collect(filter((x: number) => x % 2 === 0)(source));
    expect(result).toEqual([2, 4, 6]);
  });
});

// map: curried operator — map(mapper)(source).
describe("map", () => {
  it("transforms items", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(map((x: number) => x * 2)(source));
    expect(result).toEqual([2, 4, 6]);
  });

  it("supports async mapper", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(
      map(async (x: number) => {
        await Promise.resolve();
        return x + 10;
      })(source),
    );
    expect(result).toEqual([11, 12, 13]);
  });

  it("transforms types", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(map((x: number) => `num:${x}`)(source));
    expect(result).toEqual(["num:1", "num:2", "num:3"]);
  });
});

// pipe: threads an initial value through the given functions, left to right.
describe("pipe", () => {
  it("returns input when no functions provided", () => {
    const result = pipe(42);
    expect(result).toBe(42);
  });

  it("composes 1 function", () => {
    const result = pipe(5, (x: number) => x * 2);
    expect(result).toBe(10);
  });

  it("composes 2 functions", () => {
    const result = pipe(
      5,
      (x: number) => x * 2,
      (x: number) => x + 1,
    );
    expect(result).toBe(11);
  });

  it("composes 3 functions", () => {
    const result = pipe(
      5,
      (x: number) => x * 2,
      (x: number) => x + 1,
      (x: number) => x * 3,
    );
    expect(result).toBe(33);
  });

  it("composes 4 functions", () => {
    const result = pipe(
      5,
      (x: number) => x * 2,
      (x: number) => x + 1,
      (x: number) => x * 3,
      (x: number) => x - 5,
    );
    expect(result).toBe(28);
  });

  it("composes filter and map with subscribe", async () => {
    type Events = {
      message: string;
    };
    const pubsub = createPubSub<Events>();
    const id = "test-id";
    const subscription = pubsub.subscribe("message", id);
    const filtered = filter((e: { type: string; id: string; payload: string }) =>
      e.payload.startsWith("hello"),
    );
    const mapped = map((e: { type: string; id: string; payload: string }) =>
      e.payload.toUpperCase(),
    );
    const result = pipe(subscription, filtered, mapped);
    const iterator = result[Symbol.asyncIterator]();
    const results: string[] = [];
    const collectNext = () =>
      iterator.next().then((r) => {
        if (!r.done) results.push(r.value);
      });

    // Request two values up front, before anything has been published.
    void collectNext();
    void collectNext();

    // "goodbye" does not start with "hello", so only two payloads are expected.
    pubsub.publish("message", id, "hello world");
    pubsub.publish("message", id, "goodbye");
    pubsub.publish("message", id, "hello again");

    // Give the pending next() calls time to settle before asserting.
    await new Promise((r) => setTimeout(r, 50));
    await iterator.return?.();
    expect(results).toEqual(["HELLO WORLD", "HELLO AGAIN"]);
  });
});

describe("take", () => {
  it("yields first N items, then stops", async () => {
    const source = fromArray([1, 2, 3, 4, 5]);
    const result = await collect(take(source, 3));
    expect(result).toEqual([1, 2, 3]);
  });

  it("yields all items if count exceeds source length", async () => {
    const source = fromArray([1, 2]);
    const result = await collect(take(source, 5));
    expect(result).toEqual([1, 2]);
  });

  it("yields nothing when count is 0", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(take(source, 0));
    expect(result).toEqual([]);
  });

  it("works with single item", async () => {
    const source = fromArray([42]);
    const result = await collect(take(source, 1));
    expect(result).toEqual([42]);
  });
});

describe("reduce", () => {
  it("reduces to single value", async () => {
    const source = fromArray([1, 2, 3, 4]);
    const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
    expect(result).toBe(10);
  });

  it("returns initial value for empty source", async () => {
    const source = fromArray([]);
    const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
    expect(result).toBe(0);
  });

  it("supports async reducer", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await reduce(
      source,
      async (acc: number, val: number) => {
        await Promise.resolve();
        return acc + val;
      },
      0,
    );
    expect(result).toBe(6);
  });

  it("reduces with string concatenation", async () => {
    const source = fromArray(["a", "b", "c"]);
    const result = await reduce(source, (acc: string, val: string) => acc + val, "");
    expect(result).toBe("abc");
  });
});

describe("toArray", () => {
  it("collects all items into array", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await toArray(source);
    expect(result).toEqual([1, 2, 3]);
  });

  it("returns empty array for empty source", async () => {
    const source = fromArray([]);
    const result = await toArray(source);
    expect(result).toEqual([]);
  });

  it("preserves order", async () => {
    const source = fromArray([5, 3, 1, 4, 2]);
    const result = await toArray(source);
    expect(result).toEqual([5, 3, 1, 4, 2]);
  });
});

describe("batch", () => {
  it("groups into arrays of size", async () => {
    const source = fromArray([1, 2, 3, 4, 5, 6]);
    const result = await collect(batch(source, 2));
    expect(result).toEqual([
      [1, 2],
      [3, 4],
      [5, 6],
    ]);
  });

  it("yields remaining items if not a full batch", async () => {
    const source = fromArray([1, 2, 3, 4, 5]);
    const result = await collect(batch(source, 2));
    expect(result).toEqual([[1, 2], [3, 4], [5]]);
  });

  it("yields single batch when source length equals size", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(batch(source, 3));
    expect(result).toEqual([[1, 2, 3]]);
  });

  it("yields each item individually when size is 1", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(batch(source, 1));
    expect(result).toEqual([[1], [2], [3]]);
  });

  it("returns empty for empty source", async () => {
    const source = fromArray([]);
    const result = await collect(batch(source, 3));
    expect(result).toEqual([]);
  });
});

describe("dedupe", () => {
  it("yields only unique items", async () => {
    const source = fromArray([1, 2, 2, 3, 1, 4, 3]);
    const result = await collect(dedupe(source));
    expect(result).toEqual([1, 2, 3, 4]);
  });

  it("yields all items when no duplicates", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(dedupe(source));
    expect(result).toEqual([1, 2, 3]);
  });

  it("returns empty for empty source", async () => {
    const source = fromArray([]);
    const result = await collect(dedupe(source));
    expect(result).toEqual([]);
  });

  it("handles all same values", async () => {
    const source = fromArray([5, 5, 5, 5]);
    const result = await collect(dedupe(source));
    expect(result).toEqual([5]);
  });
});

// window(source, size, step?): sliding windows over the source.
describe("window", () => {
  it("produces sliding window of size with default step 1", async () => {
    const source = fromArray([1, 2, 3, 4, 5]);
    const result = await collect(window(source, 3));
    expect(result).toEqual([
      [1, 2, 3],
      [2, 3, 4],
      [3, 4, 5],
    ]);
  });

  it("advances by step greater than 1", async () => {
    const source = fromArray([1, 2, 3, 4, 5, 6]);
    const result = await collect(window(source, 3, 2));
    // The trailing [5, 6] is not a full window of 3, so it is not expected.
    expect(result).toEqual([
      [1, 2, 3],
      [3, 4, 5],
    ]);
  });

  it("yields nothing when source is shorter than window size", async () => {
    const source = fromArray([1, 2]);
    const result = await collect(window(source, 3));
    expect(result).toEqual([]);
  });

  it("yields single window when source length equals size", async () => {
    const source = fromArray([1, 2, 3]);
    const result = await collect(window(source, 3));
    expect(result).toEqual([[1, 2, 3]]);
  });

  it("returns empty for empty source", async () => {
    const source = fromArray([]);
    const result = await collect(window(source, 2));
    expect(result).toEqual([]);
  });
});

describe("flat", () => {
  it("flattens AsyncIterable<T[]> into AsyncIterable<T>", async () => {
    async function* arrays() {
      yield [1, 2];
      yield [3, 4];
      yield [5];
    }
    const result = await collect(flat(arrays()));
    expect(result).toEqual([1, 2, 3, 4, 5]);
  });

  it("handles empty inner arrays", async () => {
    async function* arrays() {
      yield [1, 2];
      yield [] as number[];
      yield [3];
    }
    const result = await collect(flat(arrays()));
    expect(result).toEqual([1, 2, 3]);
  });

  it("returns empty for empty source", async () => {
    async function* arrays() {
      // empty
    }
    const result = await collect(flat(arrays()));
    expect(result).toEqual([]);
  });
});

describe("groupBy", () => {
  it("groups items by key into Map", async () => {
    const source = fromArray([
      { name: "alice", dept: "eng" },
      { name: "bob", dept: "eng" },
      { name: "carol", dept: "sales" },
    ]);
    const result = await groupBy(source, (v: { name: string; dept: string }) => v.dept);
    expect(result.get("eng")).toEqual([
      { name: "alice", dept: "eng" },
      { name: "bob", dept: "eng" },
    ]);
    expect(result.get("sales")).toEqual([{ name: "carol", dept: "sales" }]);
  });

  it("returns empty Map for empty source", async () => {
    const source = fromArray([]);
    const result = await groupBy(source, (v: number) => v);
    expect(result.size).toBe(0);
  });

  it("groups with numeric keys", async () => {
    const source = fromArray([1, 2, 3, 4, 5, 6]);
    const result = await groupBy(source, (v: number) => v % 2);
    expect(result.get(0)).toEqual([2, 4, 6]);
    expect(result.get(1)).toEqual([1, 3, 5]);
  });
});

describe("chain", () => {
  it("concatenates multiple async iterables", async () => {
    const source = chain(fromArray([1, 2]), fromArray([3, 4]), fromArray([5]));
    const result = await collect(source);
    expect(result).toEqual([1, 2, 3, 4, 5]);
  });

  it("handles single iterable", async () => {
    const source = chain(fromArray([1, 2, 3]));
    const result = await collect(source);
    expect(result).toEqual([1, 2, 3]);
  });

  it("handles empty iterables", async () => {
    const source = chain(fromArray([]), fromArray([1, 2]), fromArray([]));
    const result = await collect(source);
    expect(result).toEqual([1, 2]);
  });

  it("returns empty when all sources are empty", async () => {
    const source = chain(fromArray([]), fromArray([]));
    const result = await collect(source);
    expect(result).toEqual([]);
  });
});

// join(source1, source2, key1, key2): pairs items whose keys match.
describe("join", () => {
  it("joins two sources on matching keys", async () => {
    const source1 = fromArray([
      { id: 1, name: "alice" },
      { id: 2, name: "bob" },
      { id: 3, name: "carol" },
    ]);
    const source2 = fromArray([
      { userId: 2, role: "admin" },
      { userId: 3, role: "user" },
      { userId: 1, role: "user" },
    ]);
    const result = await collect(
      join(
        source1,
        source2,
        (v: { id: number; name: string }) => v.id,
        (v: { userId: number; role: string }) => v.userId,
      ),
    );
    // Expected pairs follow source1's order, not source2's.
    expect(result).toEqual([
      [
        { id: 1, name: "alice" },
        { userId: 1, role: "user" },
      ],
      [
        { id: 2, name: "bob" },
        { userId: 2, role: "admin" },
      ],
      [
        { id: 3, name: "carol" },
        { userId: 3, role: "user" },
      ],
    ]);
  });

  it("yields nothing when no keys match", async () => {
    const source1 = fromArray([{ id: 1, name: "alice" }]);
    const source2 = fromArray([{ userId: 99, role: "admin" }]);
    const result = await collect(
      join(
        source1,
        source2,
        (v: { id: number; name: string }) => v.id,
        (v: { userId: number; role: string }) => v.userId,
      ),
    );
    expect(result).toEqual([]);
  });

  it("handles empty sources", async () => {
    const source1 = fromArray<{ id: number; name: string }>([]);
    const source2 = fromArray<{ userId: number; role: string }>([]);
    const result = await collect(
      join(
        source1,
        source2,
        (v) => v.id,
        (v) => v.userId,
      ),
    );
    expect(result).toEqual([]);
  });

  it("joins with duplicate keys in source2 (last wins)", async () => {
    const source1 = fromArray([{ id: 1, name: "alice" }]);
    const source2 = fromArray([
      { userId: 1, role: "admin" },
      { userId: 1, role: "user" },
    ]);
    const result = await collect(
      join(
        source1,
        source2,
        (v: { id: number; name: string }) => v.id,
        (v: { userId: number; role: string }) => v.userId,
      ),
    );
    expect(result).toHaveLength(1);
    expect(result[0][0]).toEqual({ id: 1, name: "alice" });
    expect(result[0][1].role).toBe("user");
  });
});