From dd720a9e0b0e2dfda5d04aa6399ff45607dc3a3a Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Fri, 8 May 2026 06:06:36 +0000 Subject: [PATCH] feat(core-pubsub-tests): add comprehensive tests for createPubSub, EventEnvelope, and in-process event target --- test/create_pubsub.test.ts | 272 +++++++++++++++++++++++++++++++++++++ 1 file changed, 272 insertions(+) create mode 100644 test/create_pubsub.test.ts diff --git a/test/create_pubsub.test.ts b/test/create_pubsub.test.ts new file mode 100644 index 0000000..0c3f882 --- /dev/null +++ b/test/create_pubsub.test.ts @@ -0,0 +1,272 @@ +import { describe, it, expect, vi } from "vitest"; +import { createPubSub } from "../src/create_pubsub.js"; +import type { EventEnvelope, TypedEventTarget } from "../src/types.js"; + +type TestEventMap = { + "message.sent": string; + "user.joined": { name: string }; + "session.status": { status: string; code: number }; +}; + +describe("createPubSub", () => { + describe("publish", () => { + it("dispatches event with correct type:id topic", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope<"message.sent", string>[] = []; + + const iterator = pubsub.subscribe("message.sent", "abc123"); + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "abc123", "hello"); + await consume; + + expect(received).toHaveLength(1); + expect(received[0].type).toBe("message.sent"); + expect(received[0].id).toBe("abc123"); + }); + + it("throws on __-prefixed event types", () => { + const pubsub = createPubSub(); + + expect(() => pubsub.publish("__subscribe" as any, "", {})).toThrow( + 'Event types starting with "__" are reserved for adapter control messages. Received: "__subscribe"', + ); + }); + + it("dispatches EventEnvelope with type, id, and payload as CustomEvent detail", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope<"user.joined", { name: string }>[] = []; + + const iterator = pubsub.subscribe("user.joined", "user1"); + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("user.joined", "user1", { name: "Alice" }); + await consume; + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + type: "user.joined", + id: "user1", + payload: { name: "Alice" }, + }); + }); + }); + + describe("subscribe", () => { + it("returns async iterable that yields EventEnvelope objects", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope[] = []; + + const iterator = pubsub.subscribe("message.sent", "msg1"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length === 1) break; + } + })(); + + pubsub.publish("message.sent", "msg1", "hello"); + + await consume; + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + type: "message.sent", + id: "msg1", + payload: "hello", + }); + }); + + it("yields envelope with correct type, id, and payload fields", async () => { + const pubsub = createPubSub(); + let result: EventEnvelope<"session.status", { status: string; code: number }> | undefined; + + const iterator = pubsub.subscribe("session.status", "sess1"); + + const consume = (async () => { + for await (const envelope of iterator) { + result = envelope; + break; + } + })(); + + pubsub.publish("session.status", "sess1", { status: "active", code: 200 }); + + await consume; + + expect(result).toBeDefined(); + expect(result!.type).toBe("session.status"); + expect(result!.id).toBe("sess1"); + expect(result!.payload).toEqual({ status: "active", code: 200 }); + }); + + it("receives events only for the subscribed topic", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope[] = []; + + const iterator = pubsub.subscribe("message.sent", "msg1"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 2) break; + } + })(); + + pubsub.publish("message.sent", "msg1", "first"); + pubsub.publish("message.sent", "msg_different", "wrong topic"); + pubsub.publish("message.sent", "msg1", "second"); + + await consume; + + expect(received).toHaveLength(2); + expect(received[0].payload).toBe("first"); + expect(received[1].payload).toBe("second"); + }); + + it("multiple subscribers on the same topic all receive events", async () => { + const pubsub = createPubSub(); + const received1: EventEnvelope[] = []; + const received2: EventEnvelope[] = []; + + const iterator1 = pubsub.subscribe("message.sent", "msg1"); + const iterator2 = pubsub.subscribe("message.sent", "msg1"); + + const consume1 = (async () => { + for await (const envelope of iterator1) { + received1.push(envelope); + if (received1.length >= 1) break; + } + })(); + + const consume2 = (async () => { + for await (const envelope of iterator2) { + received2.push(envelope); + if (received2.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "msg1", "broadcast"); + + await Promise.all([consume1, consume2]); + + expect(received1).toHaveLength(1); + expect(received2).toHaveLength(1); + expect(received1[0].payload).toBe("broadcast"); + expect(received2[0].payload).toBe("broadcast"); + }); + + it("cleanup: breaking out of for await loop removes the listener", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope[] = []; + + const iterator = pubsub.subscribe("message.sent", "cleanup-test"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + break; + } + })(); + + pubsub.publish("message.sent", "cleanup-test", "first"); + await consume; + + expect(received).toHaveLength(1); + expect(received[0].payload).toBe("first"); + + pubsub.publish("message.sent", "cleanup-test", "after-break"); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(received).toHaveLength(1); + + const secondIterator = pubsub.subscribe("message.sent", "cleanup-test"); + const secondReceived: EventEnvelope[] = []; + const consume2 = (async () => { + for await (const envelope of secondIterator) { + secondReceived.push(envelope); + if (secondReceived.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "cleanup-test", "second-listener"); + await consume2; + + expect(secondReceived).toHaveLength(1); + expect(secondReceived[0].payload).toBe("second-listener"); + expect(received).toHaveLength(1); + }); + }); + + describe("createPubSub config", () => { + it("with custom eventTarget dispatches to that target", () => { + const customTarget = new EventTarget() as TypedEventTarget; + const dispatchSpy = vi.spyOn(customTarget, "dispatchEvent"); + + const pubsub = createPubSub({ eventTarget: customTarget }); + + pubsub.publish("message.sent", "custom1", "hello custom"); + + expect(dispatchSpy).toHaveBeenCalledTimes(1); + expect(dispatchSpy.mock.calls[0][0].type).toBe("message.sent:custom1"); + expect((dispatchSpy.mock.calls[0][0] as CustomEvent).detail).toEqual({ + type: "message.sent", + id: "custom1", + payload: "hello custom", + }); + + dispatchSpy.mockRestore(); + }); + + it("without eventTarget uses new EventTarget() (in-process)", async () => { + const pubsub = createPubSub(); + const received: EventEnvelope[] = []; + + const iterator = pubsub.subscribe("message.sent", "inproc1"); + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "inproc1", "in-process works"); + await consume; + + expect(received).toHaveLength(1); + expect(received[0].payload).toBe("in-process works"); + }); + + it("custom eventTarget receives events via subscribe", async () => { + const customTarget = new EventTarget() as TypedEventTarget; + const pubsub = createPubSub({ eventTarget: customTarget }); + const received: EventEnvelope[] = []; + + const iterator = pubsub.subscribe("message.sent", "custom-sub"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "custom-sub", "via custom"); + + await consume; + + expect(received).toHaveLength(1); + expect(received[0].payload).toBe("via custom"); + }); + }); +}); \ No newline at end of file