From dd843132f986d501adffa31e7a682fcc88f6c68a Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Fri, 8 May 2026 06:07:36 +0000 Subject: [PATCH] feat(core-operators-tests): add comprehensive tests for all 13 stream operators 53 tests covering filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, and join operators including async predicates/mappers, type-narrowing overloads, edge cases, and pipe+subscribe composition. --- test/operators.test.ts | 511 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 511 insertions(+) create mode 100644 test/operators.test.ts diff --git a/test/operators.test.ts b/test/operators.test.ts new file mode 100644 index 0000000..63fe7f7 --- /dev/null +++ b/test/operators.test.ts @@ -0,0 +1,511 @@ +import { describe, it, expect } from "vitest"; +import { + filter, + map, + pipe, + take, + reduce, + toArray, + batch, + dedupe, + window, + flat, + groupBy, + chain, + join, +} from "../src/operators.js"; +import { createPubSub } from "../src/create_pubsub.js"; +import { Repeater } from "../src/repeater.js"; + +async function* fromArray(items: T[]): AsyncIterable { + for (const item of items) { + yield item; + } +} + +async function collect(source: AsyncIterable): Promise { + const result: T[] = []; + for await (const item of source) { + result.push(item); + } + return result; +} + +describe("filter", () => { + it("filters items by predicate", async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const result = await collect(filter((x: number) => x % 2 === 0)(source)); + expect(result).toEqual([2, 4]); + }); + + it("supports type-narrowing overload", async () => { + type Fish = { type: "fish"; swim: boolean }; + type Bird = { type: "bird"; fly: boolean }; + type Animal = Fish | Bird; + + const animals: Animal[] = [ + { type: "fish", swim: true }, + { type: "bird", fly: true }, + { type: "fish", swim: false }, + ]; + const source = fromArray(animals); + const isFish = (a: Animal): a is Fish => a.type === "fish"; + const result = await collect(filter(isFish)(source)); + expect(result).toEqual([ + { type: "fish", swim: true }, + { type: "fish", swim: false }, + ]); + }); + + it("supports async predicate", async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const result = await collect( + filter(async (x: number) => { + await Promise.resolve(); + return x > 3; + })(source), + ); + expect(result).toEqual([4, 5]); + }); + + it("yields nothing when all items are filtered", async () => { + const source = fromArray([1, 3, 5]); + const result = await collect(filter((x: number) => x % 2 === 0)(source)); + expect(result).toEqual([]); + }); + + it("yields all items when nothing is filtered", async () => { + const source = fromArray([2, 4, 6]); + const result = await collect(filter((x: number) => x % 2 === 0)(source)); + expect(result).toEqual([2, 4, 6]); + }); +}); + +describe("map", () => { + it("transforms items", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(map((x: number) => x * 2)(source)); + expect(result).toEqual([2, 4, 6]); + }); + + it("supports async mapper", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect( + map(async (x: number) => { + await Promise.resolve(); + return x + 10; + })(source), + ); + expect(result).toEqual([11, 12, 13]); + }); + + it("transforms types", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(map((x: number) => `num:${x}`)(source)); + expect(result).toEqual(["num:1", "num:2", "num:3"]); + }); +}); + +describe("pipe", () => { + it("returns input when no functions provided", () => { + const result = pipe(42); + expect(result).toBe(42); + }); + + it("composes 1 function", () => { + const result = pipe(5, (x: number) => x * 2); + expect(result).toBe(10); + }); + + it("composes 2 functions", () => { + const result = pipe(5, (x: number) => x * 2, (x: number) => x + 1); + expect(result).toBe(11); + }); + + it("composes 3 functions", () => { + const result = pipe( + 5, + (x: number) => x * 2, + (x: number) => x + 1, + (x: number) => x * 3, + ); + expect(result).toBe(33); + }); + + it("composes 4 functions", () => { + const result = pipe( + 5, + (x: number) => x * 2, + (x: number) => x + 1, + (x: number) => x * 3, + (x: number) => x - 5, + ); + expect(result).toBe(28); + }); + + it("composes filter and map with subscribe", async () => { + type Events = { + message: string; + }; + const pubsub = createPubSub(); + const id = "test-id"; + + const subscription = pubsub.subscribe("message", id); + const filtered = filter((e: { type: string; id: string; payload: string }) => e.payload.startsWith("hello")); + const mapped = map((e: { type: string; id: string; payload: string }) => e.payload.toUpperCase()); + + const result = pipe(subscription, filtered, mapped); + const iterator = result[Symbol.asyncIterator](); + + const results: string[] = []; + + const collectNext = () => iterator.next().then((r) => { + if (!r.done) results.push(r.value); + }); + + void collectNext(); + void collectNext(); + + pubsub.publish("message", id, "hello world"); + pubsub.publish("message", id, "goodbye"); + pubsub.publish("message", id, "hello again"); + + await new Promise((r) => setTimeout(r, 50)); + await iterator.return?.(); + + expect(results).toEqual(["HELLO WORLD", "HELLO AGAIN"]); + }); +}); + +describe("take", () => { + it("yields first N items, then stops", async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const result = await collect(take(source, 3)); + expect(result).toEqual([1, 2, 3]); + }); + + it("yields all items if count exceeds source length", async () => { + const source = fromArray([1, 2]); + const result = await collect(take(source, 5)); + expect(result).toEqual([1, 2]); + }); + + it("yields nothing when count is 0", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(take(source, 0)); + expect(result).toEqual([]); + }); + + it("works with single item", async () => { + const source = fromArray([42]); + const result = await collect(take(source, 1)); + expect(result).toEqual([42]); + }); +}); + +describe("reduce", () => { + it("reduces to single value", async () => { + const source = fromArray([1, 2, 3, 4]); + const result = await reduce(source, (acc: number, val: number) => acc + val, 0); + expect(result).toBe(10); + }); + + it("returns initial value for empty source", async () => { + const source = fromArray([]); + const result = await reduce(source, (acc: number, val: number) => acc + val, 0); + expect(result).toBe(0); + }); + + it("supports async reducer", async () => { + const source = fromArray([1, 2, 3]); + const result = await reduce( + source, + async (acc: number, val: number) => { + await Promise.resolve(); + return acc + val; + }, + 0, + ); + expect(result).toBe(6); + }); + + it("reduces with string concatenation", async () => { + const source = fromArray(["a", "b", "c"]); + const result = await reduce(source, (acc: string, val: string) => acc + val, ""); + expect(result).toBe("abc"); + }); +}); + +describe("toArray", () => { + it("collects all items into array", async () => { + const source = fromArray([1, 2, 3]); + const result = await toArray(source); + expect(result).toEqual([1, 2, 3]); + }); + + it("returns empty array for empty source", async () => { + const source = fromArray([]); + const result = await toArray(source); + expect(result).toEqual([]); + }); + + it("preserves order", async () => { + const source = fromArray([5, 3, 1, 4, 2]); + const result = await toArray(source); + expect(result).toEqual([5, 3, 1, 4, 2]); + }); +}); + +describe("batch", () => { + it("groups into arrays of size", async () => { + const source = fromArray([1, 2, 3, 4, 5, 6]); + const result = await collect(batch(source, 2)); + expect(result).toEqual([[1, 2], [3, 4], [5, 6]]); + }); + + it("yields remaining items if not a full batch", async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const result = await collect(batch(source, 2)); + expect(result).toEqual([[1, 2], [3, 4], [5]]); + }); + + it("yields single batch when source length equals size", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(batch(source, 3)); + expect(result).toEqual([[1, 2, 3]]); + }); + + it("yields each item individually when size is 1", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(batch(source, 1)); + expect(result).toEqual([[1], [2], [3]]); + }); + + it("returns empty for empty source", async () => { + const source = fromArray([]); + const result = await collect(batch(source, 3)); + expect(result).toEqual([]); + }); +}); + +describe("dedupe", () => { + it("yields only unique items", async () => { + const source = fromArray([1, 2, 2, 3, 1, 4, 3]); + const result = await collect(dedupe(source)); + expect(result).toEqual([1, 2, 3, 4]); + }); + + it("yields all items when no duplicates", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(dedupe(source)); + expect(result).toEqual([1, 2, 3]); + }); + + it("returns empty for empty source", async () => { + const source = fromArray([]); + const result = await collect(dedupe(source)); + expect(result).toEqual([]); + }); + + it("handles all same values", async () => { + const source = fromArray([5, 5, 5, 5]); + const result = await collect(dedupe(source)); + expect(result).toEqual([5]); + }); +}); + +describe("window", () => { + it("produces sliding window of size with default step 1", async () => { + const source = fromArray([1, 2, 3, 4, 5]); + const result = await collect(window(source, 3)); + expect(result).toEqual([[1, 2, 3], [2, 3, 4], [3, 4, 5]]); + }); + + it("advances by step greater than 1", async () => { + const source = fromArray([1, 2, 3, 4, 5, 6]); + const result = await collect(window(source, 3, 2)); + expect(result).toEqual([[1, 2, 3], [3, 4, 5]]); + }); + + it("yields nothing when source is shorter than window size", async () => { + const source = fromArray([1, 2]); + const result = await collect(window(source, 3)); + expect(result).toEqual([]); + }); + + it("yields single window when source length equals size", async () => { + const source = fromArray([1, 2, 3]); + const result = await collect(window(source, 3)); + expect(result).toEqual([[1, 2, 3]]); + }); + + it("returns empty for empty source", async () => { + const source = fromArray([]); + const result = await collect(window(source, 2)); + expect(result).toEqual([]); + }); +}); + +describe("flat", () => { + it("flattens AsyncIterable into AsyncIterable", async () => { + async function* arrays() { + yield [1, 2]; + yield [3, 4]; + yield [5]; + } + const result = await collect(flat(arrays())); + expect(result).toEqual([1, 2, 3, 4, 5]); + }); + + it("handles empty inner arrays", async () => { + async function* arrays() { + yield [1, 2]; + yield [] as number[]; + yield [3]; + } + const result = await collect(flat(arrays())); + expect(result).toEqual([1, 2, 3]); + }); + + it("returns empty for empty source", async () => { + async function* arrays() { + // empty + } + const result = await collect(flat(arrays())); + expect(result).toEqual([]); + }); +}); + +describe("groupBy", () => { + it("groups items by key into Map", async () => { + const source = fromArray([ + { name: "alice", dept: "eng" }, + { name: "bob", dept: "eng" }, + { name: "carol", dept: "sales" }, + ]); + const result = await groupBy(source, (v: { name: string; dept: string }) => v.dept); + expect(result.get("eng")).toEqual([ + { name: "alice", dept: "eng" }, + { name: "bob", dept: "eng" }, + ]); + expect(result.get("sales")).toEqual([{ name: "carol", dept: "sales" }]); + }); + + it("returns empty Map for empty source", async () => { + const source = fromArray([]); + const result = await groupBy(source, (v: number) => v); + expect(result.size).toBe(0); + }); + + it("groups with numeric keys", async () => { + const source = fromArray([1, 2, 3, 4, 5, 6]); + const result = await groupBy(source, (v: number) => v % 2); + expect(result.get(0)).toEqual([2, 4, 6]); + expect(result.get(1)).toEqual([1, 3, 5]); + }); +}); + +describe("chain", () => { + it("concatenates multiple async iterables", async () => { + const source = chain(fromArray([1, 2]), fromArray([3, 4]), fromArray([5])); + const result = await collect(source); + expect(result).toEqual([1, 2, 3, 4, 5]); + }); + + it("handles single iterable", async () => { + const source = chain(fromArray([1, 2, 3])); + const result = await collect(source); + expect(result).toEqual([1, 2, 3]); + }); + + it("handles empty iterables", async () => { + const source = chain(fromArray([]), fromArray([1, 2]), fromArray([])); + const result = await collect(source); + expect(result).toEqual([1, 2]); + }); + + it("returns empty when all sources are empty", async () => { + const source = chain(fromArray([]), fromArray([])); + const result = await collect(source); + expect(result).toEqual([]); + }); +}); + +describe("join", () => { + it("joins two sources on matching keys", async () => { + const source1 = fromArray([ + { id: 1, name: "alice" }, + { id: 2, name: "bob" }, + { id: 3, name: "carol" }, + ]); + const source2 = fromArray([ + { userId: 2, role: "admin" }, + { userId: 3, role: "user" }, + { userId: 1, role: "user" }, + ]); + const result = await collect( + join( + source1, + source2, + (v: { id: number; name: string }) => v.id, + (v: { userId: number; role: string }) => v.userId, + ), + ); + expect(result).toEqual([ + [ + { id: 1, name: "alice" }, + { userId: 1, role: "user" }, + ], + [ + { id: 2, name: "bob" }, + { userId: 2, role: "admin" }, + ], + [ + { id: 3, name: "carol" }, + { userId: 3, role: "user" }, + ], + ]); + }); + + it("yields nothing when no keys match", async () => { + const source1 = fromArray([{ id: 1, name: "alice" }]); + const source2 = fromArray([{ userId: 99, role: "admin" }]); + const result = await collect( + join( + source1, + source2, + (v: { id: number; name: string }) => v.id, + (v: { userId: number; role: string }) => v.userId, + ), + ); + expect(result).toEqual([]); + }); + + it("handles empty sources", async () => { + const source1 = fromArray<{ id: number; name: string }>([]); + const source2 = fromArray<{ userId: number; role: string }>([]); + const result = await collect( + join(source1, source2, (v) => v.id, (v) => v.userId), + ); + expect(result).toEqual([]); + }); + + it("joins with duplicate keys in source2 (last wins)", async () => { + const source1 = fromArray([{ id: 1, name: "alice" }]); + const source2 = fromArray([ + { userId: 1, role: "admin" }, + { userId: 1, role: "user" }, + ]); + const result = await collect( + join( + source1, + source2, + (v: { id: number; name: string }) => v.id, + (v: { userId: number; role: string }) => v.userId, + ), + ); + expect(result).toHaveLength(1); + expect(result[0][0]).toEqual({ id: 1, name: "alice" }); + expect(result[0][1].role).toBe("user"); + }); +}); \ No newline at end of file