import { describe, it, expect, vi } from "vitest";
import { createPubSub } from "../src/create_pubsub.js";
import type { EventEnvelope, TypedEventTarget } from "../src/types.js";

/**
 * Event names and payload shapes used by the scenarios in this suite.
 *
 * NOTE(review): nothing in this file references the alias — confirm whether
 * `createPubSub` is meant to be parameterised with it.
 */
type TestEventMap = {
  "message.sent": string;
  "user.joined": { name: string };
  "session.status": { status: string; code: number };
};

/**
 * Pulls items from `source` and appends them to `sink`, stopping as soon as
 * `sink` holds at least `limit` entries.
 *
 * Being an async function, the body runs synchronously up to the first
 * suspension, so the first `next()` request is issued before this call
 * returns. Every test therefore starts the collector *before* publishing.
 * Whether the listener is attached by `subscribe()` itself or by that first
 * `next()` is not visible from this file.
 *
 * @param source - Async iterable to consume.
 * @param sink - Array that receives the items, in arrival order.
 * @param limit - Size of `sink` at which consumption stops.
 * @returns A promise that settles once the loop has been left.
 */
async function collectInto<T>(
  source: AsyncIterable<T>,
  sink: T[],
  limit: number,
): Promise<void> {
  for await (const item of source) {
    sink.push(item);
    if (sink.length >= limit) {
      // Leaving the loop early closes the iterator via its `return()` hook.
      break;
    }
  }
}

describe("createPubSub", () => {
  describe("publish", () => {
    it("dispatches event with correct type:id topic", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope<"message.sent", string>[] = [];
      const stream = bus.subscribe("message.sent", "abc123");
      const done = collectInto(stream, seen, 1);

      bus.publish("message.sent", "abc123", "hello");
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0].type).toBe("message.sent");
      expect(seen[0].id).toBe("abc123");
    });

    it("throws on __-prefixed event types", () => {
      const bus = createPubSub();
      // The cast is deliberate: the reserved name is forced past the
      // compile-time signature so that the runtime guard is what gets tested.
      const publishReserved = () => bus.publish("__subscribe" as any, "", {});

      expect(publishReserved).toThrow(
        'Event types starting with "__" are reserved for adapter control messages. Received: "__subscribe"',
      );
    });

    it("dispatches EventEnvelope with type, id, and payload as CustomEvent detail", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope<"user.joined", { name: string }>[] = [];
      const stream = bus.subscribe("user.joined", "user1");
      const done = collectInto(stream, seen, 1);

      bus.publish("user.joined", "user1", { name: "Alice" });
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0]).toEqual({
        type: "user.joined",
        id: "user1",
        payload: { name: "Alice" },
      });
    });
  });

  describe("subscribe", () => {
    it("returns async iterable that yields EventEnvelope objects", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope[] = [];
      const stream = bus.subscribe("message.sent", "msg1");
      const done = collectInto(stream, seen, 1);

      bus.publish("message.sent", "msg1", "hello");
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0]).toEqual({
        type: "message.sent",
        id: "msg1",
        payload: "hello",
      });
    });

    it("yields envelope with correct type, id, and payload fields", async () => {
      const bus = createPubSub();
      const captured: EventEnvelope<
        "session.status",
        { status: string; code: number }
      >[] = [];
      const stream = bus.subscribe("session.status", "sess1");
      const done = collectInto(stream, captured, 1);

      bus.publish("session.status", "sess1", { status: "active", code: 200 });
      await done;

      const [result] = captured;
      expect(result).toBeDefined();
      expect(result?.type).toBe("session.status");
      expect(result?.id).toBe("sess1");
      expect(result?.payload).toEqual({ status: "active", code: 200 });
    });

    it("receives events only for the subscribed topic", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope[] = [];
      const stream = bus.subscribe("message.sent", "msg1");
      const done = collectInto(stream, seen, 2);

      // The middle publish targets a different id and must be skipped.
      bus.publish("message.sent", "msg1", "first");
      bus.publish("message.sent", "msg_different", "wrong topic");
      bus.publish("message.sent", "msg1", "second");
      await done;

      expect(seen).toHaveLength(2);
      expect(seen[0].payload).toBe("first");
      expect(seen[1].payload).toBe("second");
    });

    it("multiple subscribers on the same topic all receive events", async () => {
      const bus = createPubSub();
      const seenByFirst: EventEnvelope[] = [];
      const seenBySecond: EventEnvelope[] = [];
      const firstStream = bus.subscribe("message.sent", "msg1");
      const secondStream = bus.subscribe("message.sent", "msg1");
      const firstDone = collectInto(firstStream, seenByFirst, 1);
      const secondDone = collectInto(secondStream, seenBySecond, 1);

      bus.publish("message.sent", "msg1", "broadcast");
      await Promise.all([firstDone, secondDone]);

      expect(seenByFirst).toHaveLength(1);
      expect(seenBySecond).toHaveLength(1);
      expect(seenByFirst[0].payload).toBe("broadcast");
      expect(seenBySecond[0].payload).toBe("broadcast");
    });

    it("cleanup: breaking out of for await loop removes the listener", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope[] = [];
      const stream = bus.subscribe("message.sent", "cleanup-test");
      const done = collectInto(stream, seen, 1);

      bus.publish("message.sent", "cleanup-test", "first");
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0].payload).toBe("first");

      // Publish after the consumer has left its loop, then give any pending
      // delivery a short window before re-checking the collected items.
      bus.publish("message.sent", "cleanup-test", "after-break");
      await new Promise((resolve) => setTimeout(resolve, 20));
      expect(seen).toHaveLength(1);

      // A fresh subscription on the same topic must still work.
      const laterStream = bus.subscribe("message.sent", "cleanup-test");
      const laterSeen: EventEnvelope[] = [];
      const laterDone = collectInto(laterStream, laterSeen, 1);

      bus.publish("message.sent", "cleanup-test", "second-listener");
      await laterDone;

      expect(laterSeen).toHaveLength(1);
      expect(laterSeen[0].payload).toBe("second-listener");
      expect(seen).toHaveLength(1);
    });
  });

  describe("createPubSub config", () => {
    it("with custom eventTarget dispatches to that target", () => {
      const target = new EventTarget() as TypedEventTarget;
      const dispatchSpy = vi.spyOn(target, "dispatchEvent");
      const bus = createPubSub({ eventTarget: target });

      bus.publish("message.sent", "custom1", "hello custom");

      expect(dispatchSpy).toHaveBeenCalledTimes(1);
      const dispatched = dispatchSpy.mock.calls[0][0];
      expect(dispatched.type).toBe("message.sent:custom1");
      expect((dispatched as CustomEvent).detail).toEqual({
        type: "message.sent",
        id: "custom1",
        payload: "hello custom",
      });

      dispatchSpy.mockRestore();
    });

    it("without eventTarget uses new EventTarget() (in-process)", async () => {
      const bus = createPubSub();
      const seen: EventEnvelope[] = [];
      const stream = bus.subscribe("message.sent", "inproc1");
      const done = collectInto(stream, seen, 1);

      bus.publish("message.sent", "inproc1", "in-process works");
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0].payload).toBe("in-process works");
    });

    it("custom eventTarget receives events via subscribe", async () => {
      const target = new EventTarget() as TypedEventTarget;
      const bus = createPubSub({ eventTarget: target });
      const seen: EventEnvelope[] = [];
      const stream = bus.subscribe("message.sent", "custom-sub");
      const done = collectInto(stream, seen, 1);

      bus.publish("message.sent", "custom-sub", "via custom");
      await done;

      expect(seen).toHaveLength(1);
      expect(seen[0].payload).toBe("via custom");
    });
  });
});