import { describe, it, expect, vi } from "vitest";
import { createPubSub } from "../src/create_pubsub.js";
import { createRedisEventTarget } from "../src/event-target-redis.js";
import type { EventEnvelope } from "../src/types.js";

/**
 * Event names used by this suite, mapped to the payload shape published for each.
 *
 * NOTE(review): nothing visible in this file references this type; presumably it was
 * meant to parameterize `createPubSub` — confirm against that function's signature.
 */
type TestEventMap = {
  "message.sent": string;
  "user.joined": { name: string };
  "call.responded": { status: string };
  "session.started": { sessionId: string; timestamp: number };
};

/** Signature of the callback registered through `subscribeClient.on("message", ...)`. */
type MessageListener = (channel: string, message: string) => void;

/**
 * Builds a pair of mock Redis clients that are wired to each other in memory.
 *
 * Every `publish` call is recorded and, when a `"message"` listener has been registered
 * on the subscribe client, forwarded to that listener on a later tick via `setImmediate`.
 * The mock forwards every published message regardless of which channels were passed to
 * `subscribe`, so any per-channel filtering observed by the tests is performed by the code
 * under test, not by this mock.
 *
 * @returns The two mock clients; each also exposes the arrays that record its calls.
 */
function createLinkedMockRedis() {
  const publications: { channel: string; message: string }[] = [];
  const subscriptions: { channel: string }[] = [];
  const unsubscriptions: { channel: string }[] = [];
  let messageListener: MessageListener | null = null;

  const publishClient = {
    publish: vi.fn((channel: string, message: string) => {
      publications.push({ channel, message });
      // Capture the listener in a const so the deferred callback needs no non-null assertion.
      const listener = messageListener;
      if (listener) {
        setImmediate(() => listener(channel, message));
      }
      return 1;
    }),
    publications,
  };

  const subscribeClient = {
    subscribe: vi.fn((channel: string) => {
      subscriptions.push({ channel });
    }),
    unsubscribe: vi.fn((channel: string) => {
      unsubscriptions.push({ channel });
    }),
    on: vi.fn((event: string, callback: MessageListener) => {
      // Only the "message" event is linked to the publish side; other events are ignored.
      if (event === "message") {
        messageListener = callback;
      }
      return {} as any;
    }),
    subscriptions,
    unsubscriptions,
  };

  return { publishClient, subscribeClient };
}

/**
 * Wires fresh linked mock clients into a Redis event target and a pubsub built on top of it.
 *
 * @param targetOptions Extra options spread into the `createRedisEventTarget` call. When
 *   omitted, no `prefix` key is passed at all.
 * @returns The mock clients (for assertions on recorded calls) and the pubsub under test.
 */
function createRedisBackedPubSub(targetOptions: { prefix?: string } = {}) {
  const { publishClient, subscribeClient } = createLinkedMockRedis();
  // The mocks are cast to `any`, presumably because they implement only part of the
  // real client interface — confirm against the event target's option types.
  const eventTarget = createRedisEventTarget({
    publishClient: publishClient as any,
    subscribeClient: subscribeClient as any,
    ...targetOptions,
  });
  const pubsub = createPubSub({ eventTarget: eventTarget as any });
  return { publishClient, subscribeClient, pubsub };
}

/**
 * Consumes `source` until `count` values have been received, then stops iterating.
 *
 * Call this before publishing and await the returned promise afterwards: the body runs
 * synchronously up to its first `await`, so the first value is already being requested
 * from the iterator by the time the caller publishes.
 *
 * @param source Async iterable to read from.
 * @param count Number of values after which iteration is broken off.
 * @returns The values received, in arrival order.
 */
async function collectEnvelopes<T>(source: AsyncIterable<T>, count: number): Promise<T[]> {
  const collected: T[] = [];
  for await (const envelope of source) {
    collected.push(envelope);
    if (collected.length >= count) break;
  }
  return collected;
}

describe("createPubSub with Redis event target", () => {
  describe("publish", () => {
    it("dispatches event through Redis and subscriber receives it", async () => {
      const { pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("message.sent", "msg1");
      const pending = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator, 1);

      pubsub.publish("message.sent", "msg1", "hello redis");
      const received = await pending;

      expect(received).toHaveLength(1);
      expect(received[0].type).toBe("message.sent");
      expect(received[0].id).toBe("msg1");
      expect(received[0].payload).toBe("hello redis");
    });

    it("publishes to Redis with correct channel name matching type:id", () => {
      const { publishClient, pubsub } = createRedisBackedPubSub();

      pubsub.publish("user.joined", "user-42", { name: "Alice" });

      expect(publishClient.publish).toHaveBeenCalledTimes(1);
      expect(publishClient.publish).toHaveBeenCalledWith(
        "user.joined:user-42",
        JSON.stringify({ type: "user.joined", id: "user-42", payload: { name: "Alice" } }),
      );
    });
  });

  describe("subscribe", () => {
    it("subscribes to Redis channel on subscribe call", async () => {
      const { subscribeClient, pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("message.sent", "sub1");
      // Start consuming before asserting so the iterator has been asked for its first value.
      const consume = (async () => {
        for await (const envelope of iterator) {
          if (envelope.payload === "done") break;
        }
      })();

      expect(subscribeClient.subscribe).toHaveBeenCalledWith("message.sent:sub1");

      // Publish a terminating event so the consumer loop exits and the test can finish.
      pubsub.publish("message.sent", "sub1", "done");
      await consume;
    });

    it("receives events only for the subscribed topic", async () => {
      const { pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("message.sent", "filtered");
      const pending = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator, 2);

      pubsub.publish("message.sent", "filtered", "first");
      pubsub.publish("message.sent", "other", "wrong topic");
      pubsub.publish("message.sent", "filtered", "second");
      const received = await pending;

      expect(received).toHaveLength(2);
      expect(received[0].payload).toBe("first");
      expect(received[1].payload).toBe("second");
    });

    it("multiple subscribers on the same topic all receive events", async () => {
      const { pubsub } = createRedisBackedPubSub();

      // Both subscriptions are created before either consumer starts reading.
      const iterator1 = pubsub.subscribe("message.sent", "broadcast1");
      const iterator2 = pubsub.subscribe("message.sent", "broadcast1");
      const pending1 = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator1, 1);
      const pending2 = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator2, 1);

      pubsub.publish("message.sent", "broadcast1", "hello all");
      const [received1, received2] = await Promise.all([pending1, pending2]);

      expect(received1).toHaveLength(1);
      expect(received2).toHaveLength(1);
      expect(received1[0].payload).toBe("hello all");
      expect(received2[0].payload).toBe("hello all");
    });
  });

  describe("full envelope round-trip", () => {
    it("preserves type, id, and payload through serialization/deserialization", async () => {
      const { pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("session.started", "sess-abc");
      const pending = collectEnvelopes<
        EventEnvelope<"session.started", { sessionId: string; timestamp: number }>
      >(iterator, 1);

      pubsub.publish("session.started", "sess-abc", {
        sessionId: "sess-abc",
        timestamp: 1700000000,
      });
      const received = await pending;

      expect(received).toHaveLength(1);
      expect(received[0]).toEqual({
        type: "session.started",
        id: "sess-abc",
        payload: { sessionId: "sess-abc", timestamp: 1700000000 },
      });
    });

    it("round-trips envelope with simple string payload", async () => {
      const { pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("message.sent", "rt1");
      const pending = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator, 1);

      pubsub.publish("message.sent", "rt1", "round-trip test");
      const received = await pending;

      expect(received[0].type).toBe("message.sent");
      expect(received[0].id).toBe("rt1");
      expect(received[0].payload).toBe("round-trip test");
    });
  });

  describe("topic scoping with type:id through Redis", () => {
    it("scoped topics like call.responded:uuid-123 work correctly", async () => {
      const { publishClient, subscribeClient, pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("call.responded", "uuid-123");
      const pending = collectEnvelopes<EventEnvelope<"call.responded", { status: string }>>(
        iterator,
        1,
      );

      pubsub.publish("call.responded", "uuid-123", { status: "ok" });
      const received = await pending;

      expect(received).toHaveLength(1);
      expect(received[0].type).toBe("call.responded");
      expect(received[0].id).toBe("uuid-123");
      expect(received[0].payload).toEqual({ status: "ok" });

      expect(publishClient.publish).toHaveBeenCalledWith(
        "call.responded:uuid-123",
        JSON.stringify({ type: "call.responded", id: "uuid-123", payload: { status: "ok" } }),
      );
      expect(subscribeClient.subscribe).toHaveBeenCalledWith("call.responded:uuid-123");
    });

    it("events on one scoped topic do not leak to another scoped topic", async () => {
      const { pubsub } = createRedisBackedPubSub();

      const iterator = pubsub.subscribe("call.responded", "uuid-aaa");
      const pending = collectEnvelopes<EventEnvelope<"call.responded", { status: string }>>(
        iterator,
        1,
      );

      // The event for the other id is published first; only the matching one should arrive.
      pubsub.publish("call.responded", "uuid-bbb", { status: "wrong" });
      pubsub.publish("call.responded", "uuid-aaa", { status: "correct" });
      const received = await pending;

      expect(received).toHaveLength(1);
      expect(received[0].id).toBe("uuid-aaa");
      expect(received[0].payload).toEqual({ status: "correct" });
    });
  });

  describe("channel prefix", () => {
    it("applies prefix to both publish and subscribe channels", async () => {
      const { publishClient, subscribeClient, pubsub } = createRedisBackedPubSub({
        prefix: "alk:events:",
      });

      const iterator = pubsub.subscribe("message.sent", "prefixed-1");
      const pending = collectEnvelopes<EventEnvelope<"message.sent", string>>(iterator, 1);

      pubsub.publish("message.sent", "prefixed-1", "with prefix");
      const received = await pending;

      expect(subscribeClient.subscribe).toHaveBeenCalledWith("alk:events:message.sent:prefixed-1");
      expect(publishClient.publish).toHaveBeenCalledWith(
        "alk:events:message.sent:prefixed-1",
        JSON.stringify({ type: "message.sent", id: "prefixed-1", payload: "with prefix" }),
      );
      expect(received).toHaveLength(1);
      expect(received[0].payload).toBe("with prefix");
    });
  });
});