diff --git a/test/integration-pubsub-redis.test.ts b/test/integration-pubsub-redis.test.ts new file mode 100644 index 0000000..dafc502 --- /dev/null +++ b/test/integration-pubsub-redis.test.ts @@ -0,0 +1,346 @@ +import { describe, it, expect, vi } from "vitest"; +import { createPubSub } from "../src/create_pubsub.js"; +import { createRedisEventTarget } from "../src/event-target-redis.js"; +import type { EventEnvelope } from "../src/types.js"; + +type TestEventMap = { + "message.sent": string; + "user.joined": { name: string }; + "call.responded": { status: string }; + "session.started": { sessionId: string; timestamp: number }; +}; + +function createLinkedMockRedis() { + const publications: { channel: string; message: string }[] = []; + const subscriptions: { channel: string }[] = []; + const unsubscriptions: { channel: string }[] = []; + let messageListener: ((channel: string, message: string) => void) | null = null; + + const publishClient = { + publish: vi.fn((channel: string, message: string) => { + publications.push({ channel, message }); + if (messageListener) { + setImmediate(() => messageListener!(channel, message)); + } + return 1; + }), + publications, + }; + + const subscribeClient = { + subscribe: vi.fn((channel: string) => { + subscriptions.push({ channel }); + }), + unsubscribe: vi.fn((channel: string) => { + unsubscriptions.push({ channel }); + }), + on: vi.fn((event: string, callback: (channel: string, message: string) => void) => { + if (event === "message") { + messageListener = callback; + } + return {} as any; + }), + subscriptions, + unsubscriptions, + }; + + return { publishClient, subscribeClient }; +} + +describe("createPubSub with Redis event target", () => { + describe("publish", () => { + it("dispatches event through Redis and subscriber receives it", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"message.sent", string>[] = []; + const iterator = pubsub.subscribe("message.sent", "msg1"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "msg1", "hello redis"); + + await consume; + + expect(received).toHaveLength(1); + expect(received[0].type).toBe("message.sent"); + expect(received[0].id).toBe("msg1"); + expect(received[0].payload).toBe("hello redis"); + }); + + it("publishes to Redis with correct channel name matching type:id", () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + pubsub.publish("user.joined", "user-42", { name: "Alice" }); + + expect(publishClient.publish).toHaveBeenCalledTimes(1); + expect(publishClient.publish).toHaveBeenCalledWith( + "user.joined:user-42", + JSON.stringify({ type: "user.joined", id: "user-42", payload: { name: "Alice" } }), + ); + }); + }); + + describe("subscribe", () => { + it("subscribes to Redis channel on subscribe call", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const iterator = pubsub.subscribe("message.sent", "sub1"); + + const consume = (async () => { + for await (const envelope of iterator) { + if (envelope.payload === "done") break; + } + })(); + + expect(subscribeClient.subscribe).toHaveBeenCalledWith("message.sent:sub1"); + + pubsub.publish("message.sent", "sub1", "done"); + await consume; + }); + + it("receives events only for the subscribed topic", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"message.sent", string>[] = []; + + const iterator = pubsub.subscribe("message.sent", "filtered"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 2) break; + } + })(); + + pubsub.publish("message.sent", "filtered", "first"); + pubsub.publish("message.sent", "other", "wrong topic"); + pubsub.publish("message.sent", "filtered", "second"); + + await consume; + + expect(received).toHaveLength(2); + expect(received[0].payload).toBe("first"); + expect(received[1].payload).toBe("second"); + }); + + it("multiple subscribers on the same topic all receive events", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received1: EventEnvelope<"message.sent", string>[] = []; + const received2: EventEnvelope<"message.sent", string>[] = []; + + const iterator1 = pubsub.subscribe("message.sent", "broadcast1"); + const iterator2 = pubsub.subscribe("message.sent", "broadcast1"); + + const consume1 = (async () => { + for await (const envelope of iterator1) { + received1.push(envelope); + if (received1.length >= 1) break; + } + })(); + + const consume2 = (async () => { + for await (const envelope of iterator2) { + received2.push(envelope); + if (received2.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "broadcast1", "hello all"); + + await Promise.all([consume1, consume2]); + + expect(received1).toHaveLength(1); + expect(received2).toHaveLength(1); + expect(received1[0].payload).toBe("hello all"); + expect(received2[0].payload).toBe("hello all"); + }); + }); + + describe("full envelope round-trip", () => { + it("preserves type, id, and payload through serialization/deserialization", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"session.started", { sessionId: string; timestamp: number }>[] = []; + const iterator = pubsub.subscribe("session.started", "sess-abc"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("session.started", "sess-abc", { sessionId: "sess-abc", timestamp: 1700000000 }); + + await consume; + + expect(received).toHaveLength(1); + expect(received[0]).toEqual({ + type: "session.started", + id: "sess-abc", + payload: { sessionId: "sess-abc", timestamp: 1700000000 }, + }); + }); + + it("round-trips envelope with simple string payload", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"message.sent", string>[] = []; + const iterator = pubsub.subscribe("message.sent", "rt1"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "rt1", "round-trip test"); + + await consume; + + expect(received[0].type).toBe("message.sent"); + expect(received[0].id).toBe("rt1"); + expect(received[0].payload).toBe("round-trip test"); + }); + }); + + describe("topic scoping with type:id through Redis", () => { + it("scoped topics like call.responded:uuid-123 work correctly", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"call.responded", { status: string }>[] = []; + const iterator = pubsub.subscribe("call.responded", "uuid-123"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("call.responded", "uuid-123", { status: "ok" }); + + await consume; + + expect(received).toHaveLength(1); + expect(received[0].type).toBe("call.responded"); + expect(received[0].id).toBe("uuid-123"); + expect(received[0].payload).toEqual({ status: "ok" }); + + expect(publishClient.publish).toHaveBeenCalledWith( + "call.responded:uuid-123", + JSON.stringify({ type: "call.responded", id: "uuid-123", payload: { status: "ok" } }), + ); + expect(subscribeClient.subscribe).toHaveBeenCalledWith("call.responded:uuid-123"); + }); + + it("events on one scoped topic do not leak to another scoped topic", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"call.responded", { status: string }>[] = []; + const iterator = pubsub.subscribe("call.responded", "uuid-aaa"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("call.responded", "uuid-bbb", { status: "wrong" }); + pubsub.publish("call.responded", "uuid-aaa", { status: "correct" }); + + await consume; + + expect(received).toHaveLength(1); + expect(received[0].id).toBe("uuid-aaa"); + expect(received[0].payload).toEqual({ status: "correct" }); + }); + }); + + describe("channel prefix", () => { + it("applies prefix to both publish and subscribe channels", async () => { + const { publishClient, subscribeClient } = createLinkedMockRedis(); + const eventTarget = createRedisEventTarget({ + publishClient: publishClient as any, + subscribeClient: subscribeClient as any, + prefix: "alk:events:", + }); + const pubsub = createPubSub({ eventTarget: eventTarget as any }); + + const received: EventEnvelope<"message.sent", string>[] = []; + const iterator = pubsub.subscribe("message.sent", "prefixed-1"); + + const consume = (async () => { + for await (const envelope of iterator) { + received.push(envelope); + if (received.length >= 1) break; + } + })(); + + pubsub.publish("message.sent", "prefixed-1", "with prefix"); + + await consume; + + expect(subscribeClient.subscribe).toHaveBeenCalledWith("alk:events:message.sent:prefixed-1"); + expect(publishClient.publish).toHaveBeenCalledWith( + "alk:events:message.sent:prefixed-1", + JSON.stringify({ type: "message.sent", id: "prefixed-1", payload: "with prefix" }), + ); + expect(received).toHaveLength(1); + expect(received[0].payload).toBe("with prefix"); + }); + }); +}); \ No newline at end of file