feat(core-operators-tests): add comprehensive tests for all 13 stream operators
53 tests covering filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, and join operators including async predicates/mappers, type-narrowing overloads, edge cases, and pipe+subscribe composition.
This commit is contained in:
511
test/operators.test.ts
Normal file
511
test/operators.test.ts
Normal file
@@ -0,0 +1,511 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import {
|
||||
filter,
|
||||
map,
|
||||
pipe,
|
||||
take,
|
||||
reduce,
|
||||
toArray,
|
||||
batch,
|
||||
dedupe,
|
||||
window,
|
||||
flat,
|
||||
groupBy,
|
||||
chain,
|
||||
join,
|
||||
} from "../src/operators.js";
|
||||
import { createPubSub } from "../src/create_pubsub.js";
|
||||
import { Repeater } from "../src/repeater.js";
|
||||
|
||||
async function* fromArray<T>(items: T[]): AsyncIterable<T> {
|
||||
for (const item of items) {
|
||||
yield item;
|
||||
}
|
||||
}
|
||||
|
||||
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
|
||||
const result: T[] = [];
|
||||
for await (const item of source) {
|
||||
result.push(item);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
describe("filter", () => {
|
||||
it("filters items by predicate", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([2, 4]);
|
||||
});
|
||||
|
||||
it("supports type-narrowing overload", async () => {
|
||||
type Fish = { type: "fish"; swim: boolean };
|
||||
type Bird = { type: "bird"; fly: boolean };
|
||||
type Animal = Fish | Bird;
|
||||
|
||||
const animals: Animal[] = [
|
||||
{ type: "fish", swim: true },
|
||||
{ type: "bird", fly: true },
|
||||
{ type: "fish", swim: false },
|
||||
];
|
||||
const source = fromArray(animals);
|
||||
const isFish = (a: Animal): a is Fish => a.type === "fish";
|
||||
const result = await collect(filter(isFish)(source));
|
||||
expect(result).toEqual([
|
||||
{ type: "fish", swim: true },
|
||||
{ type: "fish", swim: false },
|
||||
]);
|
||||
});
|
||||
|
||||
it("supports async predicate", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(
|
||||
filter(async (x: number) => {
|
||||
await Promise.resolve();
|
||||
return x > 3;
|
||||
})(source),
|
||||
);
|
||||
expect(result).toEqual([4, 5]);
|
||||
});
|
||||
|
||||
it("yields nothing when all items are filtered", async () => {
|
||||
const source = fromArray([1, 3, 5]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("yields all items when nothing is filtered", async () => {
|
||||
const source = fromArray([2, 4, 6]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([2, 4, 6]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("map", () => {
|
||||
it("transforms items", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(map((x: number) => x * 2)(source));
|
||||
expect(result).toEqual([2, 4, 6]);
|
||||
});
|
||||
|
||||
it("supports async mapper", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(
|
||||
map(async (x: number) => {
|
||||
await Promise.resolve();
|
||||
return x + 10;
|
||||
})(source),
|
||||
);
|
||||
expect(result).toEqual([11, 12, 13]);
|
||||
});
|
||||
|
||||
it("transforms types", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(map((x: number) => `num:${x}`)(source));
|
||||
expect(result).toEqual(["num:1", "num:2", "num:3"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pipe", () => {
|
||||
it("returns input when no functions provided", () => {
|
||||
const result = pipe(42);
|
||||
expect(result).toBe(42);
|
||||
});
|
||||
|
||||
it("composes 1 function", () => {
|
||||
const result = pipe(5, (x: number) => x * 2);
|
||||
expect(result).toBe(10);
|
||||
});
|
||||
|
||||
it("composes 2 functions", () => {
|
||||
const result = pipe(5, (x: number) => x * 2, (x: number) => x + 1);
|
||||
expect(result).toBe(11);
|
||||
});
|
||||
|
||||
it("composes 3 functions", () => {
|
||||
const result = pipe(
|
||||
5,
|
||||
(x: number) => x * 2,
|
||||
(x: number) => x + 1,
|
||||
(x: number) => x * 3,
|
||||
);
|
||||
expect(result).toBe(33);
|
||||
});
|
||||
|
||||
it("composes 4 functions", () => {
|
||||
const result = pipe(
|
||||
5,
|
||||
(x: number) => x * 2,
|
||||
(x: number) => x + 1,
|
||||
(x: number) => x * 3,
|
||||
(x: number) => x - 5,
|
||||
);
|
||||
expect(result).toBe(28);
|
||||
});
|
||||
|
||||
it("composes filter and map with subscribe", async () => {
|
||||
type Events = {
|
||||
message: string;
|
||||
};
|
||||
const pubsub = createPubSub<Events>();
|
||||
const id = "test-id";
|
||||
|
||||
const subscription = pubsub.subscribe("message", id);
|
||||
const filtered = filter((e: { type: string; id: string; payload: string }) => e.payload.startsWith("hello"));
|
||||
const mapped = map((e: { type: string; id: string; payload: string }) => e.payload.toUpperCase());
|
||||
|
||||
const result = pipe(subscription, filtered, mapped);
|
||||
const iterator = result[Symbol.asyncIterator]();
|
||||
|
||||
const results: string[] = [];
|
||||
|
||||
const collectNext = () => iterator.next().then((r) => {
|
||||
if (!r.done) results.push(r.value);
|
||||
});
|
||||
|
||||
void collectNext();
|
||||
void collectNext();
|
||||
|
||||
pubsub.publish("message", id, "hello world");
|
||||
pubsub.publish("message", id, "goodbye");
|
||||
pubsub.publish("message", id, "hello again");
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
await iterator.return?.();
|
||||
|
||||
expect(results).toEqual(["HELLO WORLD", "HELLO AGAIN"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("take", () => {
|
||||
it("yields first N items, then stops", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(take(source, 3));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("yields all items if count exceeds source length", async () => {
|
||||
const source = fromArray([1, 2]);
|
||||
const result = await collect(take(source, 5));
|
||||
expect(result).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("yields nothing when count is 0", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(take(source, 0));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("works with single item", async () => {
|
||||
const source = fromArray([42]);
|
||||
const result = await collect(take(source, 1));
|
||||
expect(result).toEqual([42]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reduce", () => {
|
||||
it("reduces to single value", async () => {
|
||||
const source = fromArray([1, 2, 3, 4]);
|
||||
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
|
||||
expect(result).toBe(10);
|
||||
});
|
||||
|
||||
it("returns initial value for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
||||
it("supports async reducer", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await reduce(
|
||||
source,
|
||||
async (acc: number, val: number) => {
|
||||
await Promise.resolve();
|
||||
return acc + val;
|
||||
},
|
||||
0,
|
||||
);
|
||||
expect(result).toBe(6);
|
||||
});
|
||||
|
||||
it("reduces with string concatenation", async () => {
|
||||
const source = fromArray(["a", "b", "c"]);
|
||||
const result = await reduce(source, (acc: string, val: string) => acc + val, "");
|
||||
expect(result).toBe("abc");
|
||||
});
|
||||
});
|
||||
|
||||
describe("toArray", () => {
|
||||
it("collects all items into array", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty array for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("preserves order", async () => {
|
||||
const source = fromArray([5, 3, 1, 4, 2]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([5, 3, 1, 4, 2]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("batch", () => {
|
||||
it("groups into arrays of size", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await collect(batch(source, 2));
|
||||
expect(result).toEqual([[1, 2], [3, 4], [5, 6]]);
|
||||
});
|
||||
|
||||
it("yields remaining items if not a full batch", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(batch(source, 2));
|
||||
expect(result).toEqual([[1, 2], [3, 4], [5]]);
|
||||
});
|
||||
|
||||
it("yields single batch when source length equals size", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(batch(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3]]);
|
||||
});
|
||||
|
||||
it("yields each item individually when size is 1", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(batch(source, 1));
|
||||
expect(result).toEqual([[1], [2], [3]]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(batch(source, 3));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("dedupe", () => {
|
||||
it("yields only unique items", async () => {
|
||||
const source = fromArray([1, 2, 2, 3, 1, 4, 3]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([1, 2, 3, 4]);
|
||||
});
|
||||
|
||||
it("yields all items when no duplicates", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("handles all same values", async () => {
|
||||
const source = fromArray([5, 5, 5, 5]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([5]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("window", () => {
|
||||
it("produces sliding window of size with default step 1", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3], [2, 3, 4], [3, 4, 5]]);
|
||||
});
|
||||
|
||||
it("advances by step greater than 1", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await collect(window(source, 3, 2));
|
||||
expect(result).toEqual([[1, 2, 3], [3, 4, 5]]);
|
||||
});
|
||||
|
||||
it("yields nothing when source is shorter than window size", async () => {
|
||||
const source = fromArray([1, 2]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("yields single window when source length equals size", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3]]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(window(source, 2));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("flat", () => {
|
||||
it("flattens AsyncIterable<T[]> into AsyncIterable<T>", async () => {
|
||||
async function* arrays() {
|
||||
yield [1, 2];
|
||||
yield [3, 4];
|
||||
yield [5];
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([1, 2, 3, 4, 5]);
|
||||
});
|
||||
|
||||
it("handles empty inner arrays", async () => {
|
||||
async function* arrays() {
|
||||
yield [1, 2];
|
||||
yield [] as number[];
|
||||
yield [3];
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
async function* arrays() {
|
||||
// empty
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("groupBy", () => {
|
||||
it("groups items by key into Map", async () => {
|
||||
const source = fromArray([
|
||||
{ name: "alice", dept: "eng" },
|
||||
{ name: "bob", dept: "eng" },
|
||||
{ name: "carol", dept: "sales" },
|
||||
]);
|
||||
const result = await groupBy(source, (v: { name: string; dept: string }) => v.dept);
|
||||
expect(result.get("eng")).toEqual([
|
||||
{ name: "alice", dept: "eng" },
|
||||
{ name: "bob", dept: "eng" },
|
||||
]);
|
||||
expect(result.get("sales")).toEqual([{ name: "carol", dept: "sales" }]);
|
||||
});
|
||||
|
||||
it("returns empty Map for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await groupBy(source, (v: number) => v);
|
||||
expect(result.size).toBe(0);
|
||||
});
|
||||
|
||||
it("groups with numeric keys", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await groupBy(source, (v: number) => v % 2);
|
||||
expect(result.get(0)).toEqual([2, 4, 6]);
|
||||
expect(result.get(1)).toEqual([1, 3, 5]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("chain", () => {
|
||||
it("concatenates multiple async iterables", async () => {
|
||||
const source = chain(fromArray([1, 2]), fromArray([3, 4]), fromArray([5]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2, 3, 4, 5]);
|
||||
});
|
||||
|
||||
it("handles single iterable", async () => {
|
||||
const source = chain(fromArray([1, 2, 3]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("handles empty iterables", async () => {
|
||||
const source = chain(fromArray<number>([]), fromArray([1, 2]), fromArray<number>([]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("returns empty when all sources are empty", async () => {
|
||||
const source = chain(fromArray<number>([]), fromArray<number>([]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("join", () => {
|
||||
it("joins two sources on matching keys", async () => {
|
||||
const source1 = fromArray([
|
||||
{ id: 1, name: "alice" },
|
||||
{ id: 2, name: "bob" },
|
||||
{ id: 3, name: "carol" },
|
||||
]);
|
||||
const source2 = fromArray([
|
||||
{ userId: 2, role: "admin" },
|
||||
{ userId: 3, role: "user" },
|
||||
{ userId: 1, role: "user" },
|
||||
]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toEqual([
|
||||
[
|
||||
{ id: 1, name: "alice" },
|
||||
{ userId: 1, role: "user" },
|
||||
],
|
||||
[
|
||||
{ id: 2, name: "bob" },
|
||||
{ userId: 2, role: "admin" },
|
||||
],
|
||||
[
|
||||
{ id: 3, name: "carol" },
|
||||
{ userId: 3, role: "user" },
|
||||
],
|
||||
]);
|
||||
});
|
||||
|
||||
it("yields nothing when no keys match", async () => {
|
||||
const source1 = fromArray([{ id: 1, name: "alice" }]);
|
||||
const source2 = fromArray([{ userId: 99, role: "admin" }]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("handles empty sources", async () => {
|
||||
const source1 = fromArray<{ id: number; name: string }>([]);
|
||||
const source2 = fromArray<{ userId: number; role: string }>([]);
|
||||
const result = await collect(
|
||||
join(source1, source2, (v) => v.id, (v) => v.userId),
|
||||
);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("joins with duplicate keys in source2 (last wins)", async () => {
|
||||
const source1 = fromArray([{ id: 1, name: "alice" }]);
|
||||
const source2 = fromArray([
|
||||
{ userId: 1, role: "admin" },
|
||||
{ userId: 1, role: "user" },
|
||||
]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0][0]).toEqual({ id: 1, name: "alice" });
|
||||
expect(result[0][1].role).toBe("user");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user