Compare commits
1 Commits
wave1/core
...
wave1/core
| Author | SHA1 | Date | |
|---|---|---|---|
| dd843132f9 |
@@ -1,272 +0,0 @@
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { createPubSub } from "../src/create_pubsub.js";
|
||||
import type { EventEnvelope, TypedEventTarget } from "../src/types.js";
|
||||
|
||||
type TestEventMap = {
|
||||
"message.sent": string;
|
||||
"user.joined": { name: string };
|
||||
"session.status": { status: string; code: number };
|
||||
};
|
||||
|
||||
describe("createPubSub", () => {
|
||||
describe("publish", () => {
|
||||
it("dispatches event with correct type:id topic", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "abc123");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "abc123", "hello");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].type).toBe("message.sent");
|
||||
expect(received[0].id).toBe("abc123");
|
||||
});
|
||||
|
||||
it("throws on __-prefixed event types", () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
|
||||
expect(() => pubsub.publish("__subscribe" as any, "", {})).toThrow(
|
||||
'Event types starting with "__" are reserved for adapter control messages. Received: "__subscribe"',
|
||||
);
|
||||
});
|
||||
|
||||
it("dispatches EventEnvelope with type, id, and payload as CustomEvent detail", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<"user.joined", { name: string }>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("user.joined", "user1");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("user.joined", "user1", { name: "Alice" });
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({
|
||||
type: "user.joined",
|
||||
id: "user1",
|
||||
payload: { name: "Alice" },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("returns async iterable that yields EventEnvelope objects", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length === 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "hello");
|
||||
|
||||
await consume;
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({
|
||||
type: "message.sent",
|
||||
id: "msg1",
|
||||
payload: "hello",
|
||||
});
|
||||
});
|
||||
|
||||
it("yields envelope with correct type, id, and payload fields", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
let result: EventEnvelope<"session.status", { status: string; code: number }> | undefined;
|
||||
|
||||
const iterator = pubsub.subscribe("session.status", "sess1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
result = envelope;
|
||||
break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("session.status", "sess1", { status: "active", code: 200 });
|
||||
|
||||
await consume;
|
||||
|
||||
expect(result).toBeDefined();
|
||||
expect(result!.type).toBe("session.status");
|
||||
expect(result!.id).toBe("sess1");
|
||||
expect(result!.payload).toEqual({ status: "active", code: 200 });
|
||||
});
|
||||
|
||||
it("receives events only for the subscribed topic", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 2) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "first");
|
||||
pubsub.publish("message.sent", "msg_different", "wrong topic");
|
||||
pubsub.publish("message.sent", "msg1", "second");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(2);
|
||||
expect(received[0].payload).toBe("first");
|
||||
expect(received[1].payload).toBe("second");
|
||||
});
|
||||
|
||||
it("multiple subscribers on the same topic all receive events", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received1: EventEnvelope<string>[] = [];
|
||||
const received2: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator1 = pubsub.subscribe("message.sent", "msg1");
|
||||
const iterator2 = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume1 = (async () => {
|
||||
for await (const envelope of iterator1) {
|
||||
received1.push(envelope);
|
||||
if (received1.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
const consume2 = (async () => {
|
||||
for await (const envelope of iterator2) {
|
||||
received2.push(envelope);
|
||||
if (received2.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "broadcast");
|
||||
|
||||
await Promise.all([consume1, consume2]);
|
||||
|
||||
expect(received1).toHaveLength(1);
|
||||
expect(received2).toHaveLength(1);
|
||||
expect(received1[0].payload).toBe("broadcast");
|
||||
expect(received2[0].payload).toBe("broadcast");
|
||||
});
|
||||
|
||||
it("cleanup: breaking out of for await loop removes the listener", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "first");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("first");
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "after-break");
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
|
||||
const secondIterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||
const secondReceived: EventEnvelope<string>[] = [];
|
||||
const consume2 = (async () => {
|
||||
for await (const envelope of secondIterator) {
|
||||
secondReceived.push(envelope);
|
||||
if (secondReceived.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "second-listener");
|
||||
await consume2;
|
||||
|
||||
expect(secondReceived).toHaveLength(1);
|
||||
expect(secondReceived[0].payload).toBe("second-listener");
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createPubSub config", () => {
|
||||
it("with custom eventTarget dispatches to that target", () => {
|
||||
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||
const dispatchSpy = vi.spyOn(customTarget, "dispatchEvent");
|
||||
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||
|
||||
pubsub.publish("message.sent", "custom1", "hello custom");
|
||||
|
||||
expect(dispatchSpy).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchSpy.mock.calls[0][0].type).toBe("message.sent:custom1");
|
||||
expect((dispatchSpy.mock.calls[0][0] as CustomEvent).detail).toEqual({
|
||||
type: "message.sent",
|
||||
id: "custom1",
|
||||
payload: "hello custom",
|
||||
});
|
||||
|
||||
dispatchSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("without eventTarget uses new EventTarget() (in-process)", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "inproc1");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "inproc1", "in-process works");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("in-process works");
|
||||
});
|
||||
|
||||
it("custom eventTarget receives events via subscribe", async () => {
|
||||
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "custom-sub");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "custom-sub", "via custom");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("via custom");
|
||||
});
|
||||
});
|
||||
});
|
||||
511
test/operators.test.ts
Normal file
511
test/operators.test.ts
Normal file
@@ -0,0 +1,511 @@
|
||||
import { describe, it, expect } from "vitest";
|
||||
import {
|
||||
filter,
|
||||
map,
|
||||
pipe,
|
||||
take,
|
||||
reduce,
|
||||
toArray,
|
||||
batch,
|
||||
dedupe,
|
||||
window,
|
||||
flat,
|
||||
groupBy,
|
||||
chain,
|
||||
join,
|
||||
} from "../src/operators.js";
|
||||
import { createPubSub } from "../src/create_pubsub.js";
|
||||
import { Repeater } from "../src/repeater.js";
|
||||
|
||||
async function* fromArray<T>(items: T[]): AsyncIterable<T> {
|
||||
for (const item of items) {
|
||||
yield item;
|
||||
}
|
||||
}
|
||||
|
||||
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
|
||||
const result: T[] = [];
|
||||
for await (const item of source) {
|
||||
result.push(item);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
describe("filter", () => {
|
||||
it("filters items by predicate", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([2, 4]);
|
||||
});
|
||||
|
||||
it("supports type-narrowing overload", async () => {
|
||||
type Fish = { type: "fish"; swim: boolean };
|
||||
type Bird = { type: "bird"; fly: boolean };
|
||||
type Animal = Fish | Bird;
|
||||
|
||||
const animals: Animal[] = [
|
||||
{ type: "fish", swim: true },
|
||||
{ type: "bird", fly: true },
|
||||
{ type: "fish", swim: false },
|
||||
];
|
||||
const source = fromArray(animals);
|
||||
const isFish = (a: Animal): a is Fish => a.type === "fish";
|
||||
const result = await collect(filter(isFish)(source));
|
||||
expect(result).toEqual([
|
||||
{ type: "fish", swim: true },
|
||||
{ type: "fish", swim: false },
|
||||
]);
|
||||
});
|
||||
|
||||
it("supports async predicate", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(
|
||||
filter(async (x: number) => {
|
||||
await Promise.resolve();
|
||||
return x > 3;
|
||||
})(source),
|
||||
);
|
||||
expect(result).toEqual([4, 5]);
|
||||
});
|
||||
|
||||
it("yields nothing when all items are filtered", async () => {
|
||||
const source = fromArray([1, 3, 5]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("yields all items when nothing is filtered", async () => {
|
||||
const source = fromArray([2, 4, 6]);
|
||||
const result = await collect(filter((x: number) => x % 2 === 0)(source));
|
||||
expect(result).toEqual([2, 4, 6]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("map", () => {
|
||||
it("transforms items", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(map((x: number) => x * 2)(source));
|
||||
expect(result).toEqual([2, 4, 6]);
|
||||
});
|
||||
|
||||
it("supports async mapper", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(
|
||||
map(async (x: number) => {
|
||||
await Promise.resolve();
|
||||
return x + 10;
|
||||
})(source),
|
||||
);
|
||||
expect(result).toEqual([11, 12, 13]);
|
||||
});
|
||||
|
||||
it("transforms types", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(map((x: number) => `num:${x}`)(source));
|
||||
expect(result).toEqual(["num:1", "num:2", "num:3"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("pipe", () => {
|
||||
it("returns input when no functions provided", () => {
|
||||
const result = pipe(42);
|
||||
expect(result).toBe(42);
|
||||
});
|
||||
|
||||
it("composes 1 function", () => {
|
||||
const result = pipe(5, (x: number) => x * 2);
|
||||
expect(result).toBe(10);
|
||||
});
|
||||
|
||||
it("composes 2 functions", () => {
|
||||
const result = pipe(5, (x: number) => x * 2, (x: number) => x + 1);
|
||||
expect(result).toBe(11);
|
||||
});
|
||||
|
||||
it("composes 3 functions", () => {
|
||||
const result = pipe(
|
||||
5,
|
||||
(x: number) => x * 2,
|
||||
(x: number) => x + 1,
|
||||
(x: number) => x * 3,
|
||||
);
|
||||
expect(result).toBe(33);
|
||||
});
|
||||
|
||||
it("composes 4 functions", () => {
|
||||
const result = pipe(
|
||||
5,
|
||||
(x: number) => x * 2,
|
||||
(x: number) => x + 1,
|
||||
(x: number) => x * 3,
|
||||
(x: number) => x - 5,
|
||||
);
|
||||
expect(result).toBe(28);
|
||||
});
|
||||
|
||||
it("composes filter and map with subscribe", async () => {
|
||||
type Events = {
|
||||
message: string;
|
||||
};
|
||||
const pubsub = createPubSub<Events>();
|
||||
const id = "test-id";
|
||||
|
||||
const subscription = pubsub.subscribe("message", id);
|
||||
const filtered = filter((e: { type: string; id: string; payload: string }) => e.payload.startsWith("hello"));
|
||||
const mapped = map((e: { type: string; id: string; payload: string }) => e.payload.toUpperCase());
|
||||
|
||||
const result = pipe(subscription, filtered, mapped);
|
||||
const iterator = result[Symbol.asyncIterator]();
|
||||
|
||||
const results: string[] = [];
|
||||
|
||||
const collectNext = () => iterator.next().then((r) => {
|
||||
if (!r.done) results.push(r.value);
|
||||
});
|
||||
|
||||
void collectNext();
|
||||
void collectNext();
|
||||
|
||||
pubsub.publish("message", id, "hello world");
|
||||
pubsub.publish("message", id, "goodbye");
|
||||
pubsub.publish("message", id, "hello again");
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
await iterator.return?.();
|
||||
|
||||
expect(results).toEqual(["HELLO WORLD", "HELLO AGAIN"]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("take", () => {
|
||||
it("yields first N items, then stops", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(take(source, 3));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("yields all items if count exceeds source length", async () => {
|
||||
const source = fromArray([1, 2]);
|
||||
const result = await collect(take(source, 5));
|
||||
expect(result).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("yields nothing when count is 0", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(take(source, 0));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("works with single item", async () => {
|
||||
const source = fromArray([42]);
|
||||
const result = await collect(take(source, 1));
|
||||
expect(result).toEqual([42]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("reduce", () => {
|
||||
it("reduces to single value", async () => {
|
||||
const source = fromArray([1, 2, 3, 4]);
|
||||
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
|
||||
expect(result).toBe(10);
|
||||
});
|
||||
|
||||
it("returns initial value for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await reduce(source, (acc: number, val: number) => acc + val, 0);
|
||||
expect(result).toBe(0);
|
||||
});
|
||||
|
||||
it("supports async reducer", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await reduce(
|
||||
source,
|
||||
async (acc: number, val: number) => {
|
||||
await Promise.resolve();
|
||||
return acc + val;
|
||||
},
|
||||
0,
|
||||
);
|
||||
expect(result).toBe(6);
|
||||
});
|
||||
|
||||
it("reduces with string concatenation", async () => {
|
||||
const source = fromArray(["a", "b", "c"]);
|
||||
const result = await reduce(source, (acc: string, val: string) => acc + val, "");
|
||||
expect(result).toBe("abc");
|
||||
});
|
||||
});
|
||||
|
||||
describe("toArray", () => {
|
||||
it("collects all items into array", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty array for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("preserves order", async () => {
|
||||
const source = fromArray([5, 3, 1, 4, 2]);
|
||||
const result = await toArray(source);
|
||||
expect(result).toEqual([5, 3, 1, 4, 2]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("batch", () => {
|
||||
it("groups into arrays of size", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await collect(batch(source, 2));
|
||||
expect(result).toEqual([[1, 2], [3, 4], [5, 6]]);
|
||||
});
|
||||
|
||||
it("yields remaining items if not a full batch", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(batch(source, 2));
|
||||
expect(result).toEqual([[1, 2], [3, 4], [5]]);
|
||||
});
|
||||
|
||||
it("yields single batch when source length equals size", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(batch(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3]]);
|
||||
});
|
||||
|
||||
it("yields each item individually when size is 1", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(batch(source, 1));
|
||||
expect(result).toEqual([[1], [2], [3]]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(batch(source, 3));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("dedupe", () => {
|
||||
it("yields only unique items", async () => {
|
||||
const source = fromArray([1, 2, 2, 3, 1, 4, 3]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([1, 2, 3, 4]);
|
||||
});
|
||||
|
||||
it("yields all items when no duplicates", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("handles all same values", async () => {
|
||||
const source = fromArray([5, 5, 5, 5]);
|
||||
const result = await collect(dedupe(source));
|
||||
expect(result).toEqual([5]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("window", () => {
|
||||
it("produces sliding window of size with default step 1", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3], [2, 3, 4], [3, 4, 5]]);
|
||||
});
|
||||
|
||||
it("advances by step greater than 1", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await collect(window(source, 3, 2));
|
||||
expect(result).toEqual([[1, 2, 3], [3, 4, 5]]);
|
||||
});
|
||||
|
||||
it("yields nothing when source is shorter than window size", async () => {
|
||||
const source = fromArray([1, 2]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("yields single window when source length equals size", async () => {
|
||||
const source = fromArray([1, 2, 3]);
|
||||
const result = await collect(window(source, 3));
|
||||
expect(result).toEqual([[1, 2, 3]]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await collect(window(source, 2));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("flat", () => {
|
||||
it("flattens AsyncIterable<T[]> into AsyncIterable<T>", async () => {
|
||||
async function* arrays() {
|
||||
yield [1, 2];
|
||||
yield [3, 4];
|
||||
yield [5];
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([1, 2, 3, 4, 5]);
|
||||
});
|
||||
|
||||
it("handles empty inner arrays", async () => {
|
||||
async function* arrays() {
|
||||
yield [1, 2];
|
||||
yield [] as number[];
|
||||
yield [3];
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("returns empty for empty source", async () => {
|
||||
async function* arrays() {
|
||||
// empty
|
||||
}
|
||||
const result = await collect(flat(arrays()));
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("groupBy", () => {
|
||||
it("groups items by key into Map", async () => {
|
||||
const source = fromArray([
|
||||
{ name: "alice", dept: "eng" },
|
||||
{ name: "bob", dept: "eng" },
|
||||
{ name: "carol", dept: "sales" },
|
||||
]);
|
||||
const result = await groupBy(source, (v: { name: string; dept: string }) => v.dept);
|
||||
expect(result.get("eng")).toEqual([
|
||||
{ name: "alice", dept: "eng" },
|
||||
{ name: "bob", dept: "eng" },
|
||||
]);
|
||||
expect(result.get("sales")).toEqual([{ name: "carol", dept: "sales" }]);
|
||||
});
|
||||
|
||||
it("returns empty Map for empty source", async () => {
|
||||
const source = fromArray<number>([]);
|
||||
const result = await groupBy(source, (v: number) => v);
|
||||
expect(result.size).toBe(0);
|
||||
});
|
||||
|
||||
it("groups with numeric keys", async () => {
|
||||
const source = fromArray([1, 2, 3, 4, 5, 6]);
|
||||
const result = await groupBy(source, (v: number) => v % 2);
|
||||
expect(result.get(0)).toEqual([2, 4, 6]);
|
||||
expect(result.get(1)).toEqual([1, 3, 5]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("chain", () => {
|
||||
it("concatenates multiple async iterables", async () => {
|
||||
const source = chain(fromArray([1, 2]), fromArray([3, 4]), fromArray([5]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2, 3, 4, 5]);
|
||||
});
|
||||
|
||||
it("handles single iterable", async () => {
|
||||
const source = chain(fromArray([1, 2, 3]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2, 3]);
|
||||
});
|
||||
|
||||
it("handles empty iterables", async () => {
|
||||
const source = chain(fromArray<number>([]), fromArray([1, 2]), fromArray<number>([]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([1, 2]);
|
||||
});
|
||||
|
||||
it("returns empty when all sources are empty", async () => {
|
||||
const source = chain(fromArray<number>([]), fromArray<number>([]));
|
||||
const result = await collect(source);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe("join", () => {
|
||||
it("joins two sources on matching keys", async () => {
|
||||
const source1 = fromArray([
|
||||
{ id: 1, name: "alice" },
|
||||
{ id: 2, name: "bob" },
|
||||
{ id: 3, name: "carol" },
|
||||
]);
|
||||
const source2 = fromArray([
|
||||
{ userId: 2, role: "admin" },
|
||||
{ userId: 3, role: "user" },
|
||||
{ userId: 1, role: "user" },
|
||||
]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toEqual([
|
||||
[
|
||||
{ id: 1, name: "alice" },
|
||||
{ userId: 1, role: "user" },
|
||||
],
|
||||
[
|
||||
{ id: 2, name: "bob" },
|
||||
{ userId: 2, role: "admin" },
|
||||
],
|
||||
[
|
||||
{ id: 3, name: "carol" },
|
||||
{ userId: 3, role: "user" },
|
||||
],
|
||||
]);
|
||||
});
|
||||
|
||||
it("yields nothing when no keys match", async () => {
|
||||
const source1 = fromArray([{ id: 1, name: "alice" }]);
|
||||
const source2 = fromArray([{ userId: 99, role: "admin" }]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("handles empty sources", async () => {
|
||||
const source1 = fromArray<{ id: number; name: string }>([]);
|
||||
const source2 = fromArray<{ userId: number; role: string }>([]);
|
||||
const result = await collect(
|
||||
join(source1, source2, (v) => v.id, (v) => v.userId),
|
||||
);
|
||||
expect(result).toEqual([]);
|
||||
});
|
||||
|
||||
it("joins with duplicate keys in source2 (last wins)", async () => {
|
||||
const source1 = fromArray([{ id: 1, name: "alice" }]);
|
||||
const source2 = fromArray([
|
||||
{ userId: 1, role: "admin" },
|
||||
{ userId: 1, role: "user" },
|
||||
]);
|
||||
const result = await collect(
|
||||
join(
|
||||
source1,
|
||||
source2,
|
||||
(v: { id: number; name: string }) => v.id,
|
||||
(v: { userId: number; role: string }) => v.userId,
|
||||
),
|
||||
);
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0][0]).toEqual({ id: 1, name: "alice" });
|
||||
expect(result[0][1].role).toBe("user");
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user