Files
pubsub/test/operators.test.ts
glm-5.1 dd843132f9 feat(core-operators-tests): add comprehensive tests for all 13 stream operators
53 tests covering filter, map, pipe, take, reduce, toArray, batch,
dedupe, window, flat, groupBy, chain, and join operators including
async predicates/mappers, type-narrowing overloads, edge cases, and
pipe+subscribe composition.
2026-05-08 06:07:36 +00:00

511 lines
15 KiB
TypeScript

import { describe, it, expect } from "vitest";
import {
filter,
map,
pipe,
take,
reduce,
toArray,
batch,
dedupe,
window,
flat,
groupBy,
chain,
join,
} from "../src/operators.js";
import { createPubSub } from "../src/create_pubsub.js";
import { Repeater } from "../src/repeater.js";
async function* fromArray<T>(items: T[]): AsyncIterable<T> {
for (const item of items) {
yield item;
}
}
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
const result: T[] = [];
for await (const item of source) {
result.push(item);
}
return result;
}
describe("filter", () => {
it("filters items by predicate", async () => {
const source = fromArray([1, 2, 3, 4, 5]);
const result = await collect(filter((x: number) => x % 2 === 0)(source));
expect(result).toEqual([2, 4]);
});
it("supports type-narrowing overload", async () => {
type Fish = { type: "fish"; swim: boolean };
type Bird = { type: "bird"; fly: boolean };
type Animal = Fish | Bird;
const animals: Animal[] = [
{ type: "fish", swim: true },
{ type: "bird", fly: true },
{ type: "fish", swim: false },
];
const source = fromArray(animals);
const isFish = (a: Animal): a is Fish => a.type === "fish";
const result = await collect(filter(isFish)(source));
expect(result).toEqual([
{ type: "fish", swim: true },
{ type: "fish", swim: false },
]);
});
it("supports async predicate", async () => {
const source = fromArray([1, 2, 3, 4, 5]);
const result = await collect(
filter(async (x: number) => {
await Promise.resolve();
return x > 3;
})(source),
);
expect(result).toEqual([4, 5]);
});
it("yields nothing when all items are filtered", async () => {
const source = fromArray([1, 3, 5]);
const result = await collect(filter((x: number) => x % 2 === 0)(source));
expect(result).toEqual([]);
});
it("yields all items when nothing is filtered", async () => {
const source = fromArray([2, 4, 6]);
const result = await collect(filter((x: number) => x % 2 === 0)(source));
expect(result).toEqual([2, 4, 6]);
});
});
describe("map", () => {
it("transforms items", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(map((x: number) => x * 2)(source));
expect(result).toEqual([2, 4, 6]);
});
it("supports async mapper", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(
map(async (x: number) => {
await Promise.resolve();
return x + 10;
})(source),
);
expect(result).toEqual([11, 12, 13]);
});
it("transforms types", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(map((x: number) => `num:${x}`)(source));
expect(result).toEqual(["num:1", "num:2", "num:3"]);
});
});
describe("pipe", () => {
it("returns input when no functions provided", () => {
const result = pipe(42);
expect(result).toBe(42);
});
it("composes 1 function", () => {
const result = pipe(5, (x: number) => x * 2);
expect(result).toBe(10);
});
it("composes 2 functions", () => {
const result = pipe(5, (x: number) => x * 2, (x: number) => x + 1);
expect(result).toBe(11);
});
it("composes 3 functions", () => {
const result = pipe(
5,
(x: number) => x * 2,
(x: number) => x + 1,
(x: number) => x * 3,
);
expect(result).toBe(33);
});
it("composes 4 functions", () => {
const result = pipe(
5,
(x: number) => x * 2,
(x: number) => x + 1,
(x: number) => x * 3,
(x: number) => x - 5,
);
expect(result).toBe(28);
});
it("composes filter and map with subscribe", async () => {
type Events = {
message: string;
};
const pubsub = createPubSub<Events>();
const id = "test-id";
const subscription = pubsub.subscribe("message", id);
const filtered = filter((e: { type: string; id: string; payload: string }) => e.payload.startsWith("hello"));
const mapped = map((e: { type: string; id: string; payload: string }) => e.payload.toUpperCase());
const result = pipe(subscription, filtered, mapped);
const iterator = result[Symbol.asyncIterator]();
const results: string[] = [];
const collectNext = () => iterator.next().then((r) => {
if (!r.done) results.push(r.value);
});
void collectNext();
void collectNext();
pubsub.publish("message", id, "hello world");
pubsub.publish("message", id, "goodbye");
pubsub.publish("message", id, "hello again");
await new Promise((r) => setTimeout(r, 50));
await iterator.return?.();
expect(results).toEqual(["HELLO WORLD", "HELLO AGAIN"]);
});
});
describe("take", () => {
it("yields first N items, then stops", async () => {
const source = fromArray([1, 2, 3, 4, 5]);
const result = await collect(take(source, 3));
expect(result).toEqual([1, 2, 3]);
});
it("yields all items if count exceeds source length", async () => {
const source = fromArray([1, 2]);
const result = await collect(take(source, 5));
expect(result).toEqual([1, 2]);
});
it("yields nothing when count is 0", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(take(source, 0));
expect(result).toEqual([]);
});
it("works with single item", async () => {
const source = fromArray([42]);
const result = await collect(take(source, 1));
expect(result).toEqual([42]);
});
});
describe("reduce", () => {
it("reduces to single value", async () => {
const source = fromArray([1, 2, 3, 4]);
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
expect(result).toBe(10);
});
it("returns initial value for empty source", async () => {
const source = fromArray<number>([]);
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
expect(result).toBe(0);
});
it("supports async reducer", async () => {
const source = fromArray([1, 2, 3]);
const result = await reduce(
source,
async (acc: number, val: number) => {
await Promise.resolve();
return acc + val;
},
0,
);
expect(result).toBe(6);
});
it("reduces with string concatenation", async () => {
const source = fromArray(["a", "b", "c"]);
const result = await reduce(source, (acc: string, val: string) => acc + val, "");
expect(result).toBe("abc");
});
});
describe("toArray", () => {
it("collects all items into array", async () => {
const source = fromArray([1, 2, 3]);
const result = await toArray(source);
expect(result).toEqual([1, 2, 3]);
});
it("returns empty array for empty source", async () => {
const source = fromArray<number>([]);
const result = await toArray(source);
expect(result).toEqual([]);
});
it("preserves order", async () => {
const source = fromArray([5, 3, 1, 4, 2]);
const result = await toArray(source);
expect(result).toEqual([5, 3, 1, 4, 2]);
});
});
describe("batch", () => {
it("groups into arrays of size", async () => {
const source = fromArray([1, 2, 3, 4, 5, 6]);
const result = await collect(batch(source, 2));
expect(result).toEqual([[1, 2], [3, 4], [5, 6]]);
});
it("yields remaining items if not a full batch", async () => {
const source = fromArray([1, 2, 3, 4, 5]);
const result = await collect(batch(source, 2));
expect(result).toEqual([[1, 2], [3, 4], [5]]);
});
it("yields single batch when source length equals size", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(batch(source, 3));
expect(result).toEqual([[1, 2, 3]]);
});
it("yields each item individually when size is 1", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(batch(source, 1));
expect(result).toEqual([[1], [2], [3]]);
});
it("returns empty for empty source", async () => {
const source = fromArray<number>([]);
const result = await collect(batch(source, 3));
expect(result).toEqual([]);
});
});
describe("dedupe", () => {
it("yields only unique items", async () => {
const source = fromArray([1, 2, 2, 3, 1, 4, 3]);
const result = await collect(dedupe(source));
expect(result).toEqual([1, 2, 3, 4]);
});
it("yields all items when no duplicates", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(dedupe(source));
expect(result).toEqual([1, 2, 3]);
});
it("returns empty for empty source", async () => {
const source = fromArray<number>([]);
const result = await collect(dedupe(source));
expect(result).toEqual([]);
});
it("handles all same values", async () => {
const source = fromArray([5, 5, 5, 5]);
const result = await collect(dedupe(source));
expect(result).toEqual([5]);
});
});
describe("window", () => {
it("produces sliding window of size with default step 1", async () => {
const source = fromArray([1, 2, 3, 4, 5]);
const result = await collect(window(source, 3));
expect(result).toEqual([[1, 2, 3], [2, 3, 4], [3, 4, 5]]);
});
it("advances by step greater than 1", async () => {
const source = fromArray([1, 2, 3, 4, 5, 6]);
const result = await collect(window(source, 3, 2));
expect(result).toEqual([[1, 2, 3], [3, 4, 5]]);
});
it("yields nothing when source is shorter than window size", async () => {
const source = fromArray([1, 2]);
const result = await collect(window(source, 3));
expect(result).toEqual([]);
});
it("yields single window when source length equals size", async () => {
const source = fromArray([1, 2, 3]);
const result = await collect(window(source, 3));
expect(result).toEqual([[1, 2, 3]]);
});
it("returns empty for empty source", async () => {
const source = fromArray<number>([]);
const result = await collect(window(source, 2));
expect(result).toEqual([]);
});
});
describe("flat", () => {
it("flattens AsyncIterable<T[]> into AsyncIterable<T>", async () => {
async function* arrays() {
yield [1, 2];
yield [3, 4];
yield [5];
}
const result = await collect(flat(arrays()));
expect(result).toEqual([1, 2, 3, 4, 5]);
});
it("handles empty inner arrays", async () => {
async function* arrays() {
yield [1, 2];
yield [] as number[];
yield [3];
}
const result = await collect(flat(arrays()));
expect(result).toEqual([1, 2, 3]);
});
it("returns empty for empty source", async () => {
async function* arrays() {
// empty
}
const result = await collect(flat(arrays()));
expect(result).toEqual([]);
});
});
describe("groupBy", () => {
it("groups items by key into Map", async () => {
const source = fromArray([
{ name: "alice", dept: "eng" },
{ name: "bob", dept: "eng" },
{ name: "carol", dept: "sales" },
]);
const result = await groupBy(source, (v: { name: string; dept: string }) => v.dept);
expect(result.get("eng")).toEqual([
{ name: "alice", dept: "eng" },
{ name: "bob", dept: "eng" },
]);
expect(result.get("sales")).toEqual([{ name: "carol", dept: "sales" }]);
});
it("returns empty Map for empty source", async () => {
const source = fromArray<number>([]);
const result = await groupBy(source, (v: number) => v);
expect(result.size).toBe(0);
});
it("groups with numeric keys", async () => {
const source = fromArray([1, 2, 3, 4, 5, 6]);
const result = await groupBy(source, (v: number) => v % 2);
expect(result.get(0)).toEqual([2, 4, 6]);
expect(result.get(1)).toEqual([1, 3, 5]);
});
});
describe("chain", () => {
it("concatenates multiple async iterables", async () => {
const source = chain(fromArray([1, 2]), fromArray([3, 4]), fromArray([5]));
const result = await collect(source);
expect(result).toEqual([1, 2, 3, 4, 5]);
});
it("handles single iterable", async () => {
const source = chain(fromArray([1, 2, 3]));
const result = await collect(source);
expect(result).toEqual([1, 2, 3]);
});
it("handles empty iterables", async () => {
const source = chain(fromArray<number>([]), fromArray([1, 2]), fromArray<number>([]));
const result = await collect(source);
expect(result).toEqual([1, 2]);
});
it("returns empty when all sources are empty", async () => {
const source = chain(fromArray<number>([]), fromArray<number>([]));
const result = await collect(source);
expect(result).toEqual([]);
});
});
describe("join", () => {
it("joins two sources on matching keys", async () => {
const source1 = fromArray([
{ id: 1, name: "alice" },
{ id: 2, name: "bob" },
{ id: 3, name: "carol" },
]);
const source2 = fromArray([
{ userId: 2, role: "admin" },
{ userId: 3, role: "user" },
{ userId: 1, role: "user" },
]);
const result = await collect(
join(
source1,
source2,
(v: { id: number; name: string }) => v.id,
(v: { userId: number; role: string }) => v.userId,
),
);
expect(result).toEqual([
[
{ id: 1, name: "alice" },
{ userId: 1, role: "user" },
],
[
{ id: 2, name: "bob" },
{ userId: 2, role: "admin" },
],
[
{ id: 3, name: "carol" },
{ userId: 3, role: "user" },
],
]);
});
it("yields nothing when no keys match", async () => {
const source1 = fromArray([{ id: 1, name: "alice" }]);
const source2 = fromArray([{ userId: 99, role: "admin" }]);
const result = await collect(
join(
source1,
source2,
(v: { id: number; name: string }) => v.id,
(v: { userId: number; role: string }) => v.userId,
),
);
expect(result).toEqual([]);
});
it("handles empty sources", async () => {
const source1 = fromArray<{ id: number; name: string }>([]);
const source2 = fromArray<{ userId: number; role: string }>([]);
const result = await collect(
join(source1, source2, (v) => v.id, (v) => v.userId),
);
expect(result).toEqual([]);
});
it("joins with duplicate keys in source2 (last wins)", async () => {
const source1 = fromArray([{ id: 1, name: "alice" }]);
const source2 = fromArray([
{ userId: 1, role: "admin" },
{ userId: 1, role: "user" },
]);
const result = await collect(
join(
source1,
source2,
(v: { id: number; name: string }) => v.id,
(v: { userId: number; role: string }) => v.userId,
),
);
expect(result).toHaveLength(1);
expect(result[0][0]).toEqual({ id: 1, name: "alice" });
expect(result[0][1].role).toBe("user");
});
});