test: add integration tests for createPubSub with Redis event target
This commit is contained in:
346
test/integration-pubsub-redis.test.ts
Normal file
346
test/integration-pubsub-redis.test.ts
Normal file
@@ -0,0 +1,346 @@
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { createPubSub } from "../src/create_pubsub.js";
|
||||
import { createRedisEventTarget } from "../src/event-target-redis.js";
|
||||
import type { EventEnvelope } from "../src/types.js";
|
||||
|
||||
type TestEventMap = {
|
||||
"message.sent": string;
|
||||
"user.joined": { name: string };
|
||||
"call.responded": { status: string };
|
||||
"session.started": { sessionId: string; timestamp: number };
|
||||
};
|
||||
|
||||
function createLinkedMockRedis() {
|
||||
const publications: { channel: string; message: string }[] = [];
|
||||
const subscriptions: { channel: string }[] = [];
|
||||
const unsubscriptions: { channel: string }[] = [];
|
||||
let messageListener: ((channel: string, message: string) => void) | null = null;
|
||||
|
||||
const publishClient = {
|
||||
publish: vi.fn((channel: string, message: string) => {
|
||||
publications.push({ channel, message });
|
||||
if (messageListener) {
|
||||
setImmediate(() => messageListener!(channel, message));
|
||||
}
|
||||
return 1;
|
||||
}),
|
||||
publications,
|
||||
};
|
||||
|
||||
const subscribeClient = {
|
||||
subscribe: vi.fn((channel: string) => {
|
||||
subscriptions.push({ channel });
|
||||
}),
|
||||
unsubscribe: vi.fn((channel: string) => {
|
||||
unsubscriptions.push({ channel });
|
||||
}),
|
||||
on: vi.fn((event: string, callback: (channel: string, message: string) => void) => {
|
||||
if (event === "message") {
|
||||
messageListener = callback;
|
||||
}
|
||||
return {} as any;
|
||||
}),
|
||||
subscriptions,
|
||||
unsubscriptions,
|
||||
};
|
||||
|
||||
return { publishClient, subscribeClient };
|
||||
}
|
||||
|
||||
describe("createPubSub with Redis event target", () => {
|
||||
describe("publish", () => {
|
||||
it("dispatches event through Redis and subscriber receives it", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "hello redis");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].type).toBe("message.sent");
|
||||
expect(received[0].id).toBe("msg1");
|
||||
expect(received[0].payload).toBe("hello redis");
|
||||
});
|
||||
|
||||
it("publishes to Redis with correct channel name matching type:id", () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
pubsub.publish("user.joined", "user-42", { name: "Alice" });
|
||||
|
||||
expect(publishClient.publish).toHaveBeenCalledTimes(1);
|
||||
expect(publishClient.publish).toHaveBeenCalledWith(
|
||||
"user.joined:user-42",
|
||||
JSON.stringify({ type: "user.joined", id: "user-42", payload: { name: "Alice" } }),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("subscribes to Redis channel on subscribe call", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "sub1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
if (envelope.payload === "done") break;
|
||||
}
|
||||
})();
|
||||
|
||||
expect(subscribeClient.subscribe).toHaveBeenCalledWith("message.sent:sub1");
|
||||
|
||||
pubsub.publish("message.sent", "sub1", "done");
|
||||
await consume;
|
||||
});
|
||||
|
||||
it("receives events only for the subscribed topic", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "filtered");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 2) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "filtered", "first");
|
||||
pubsub.publish("message.sent", "other", "wrong topic");
|
||||
pubsub.publish("message.sent", "filtered", "second");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(2);
|
||||
expect(received[0].payload).toBe("first");
|
||||
expect(received[1].payload).toBe("second");
|
||||
});
|
||||
|
||||
it("multiple subscribers on the same topic all receive events", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received1: EventEnvelope<"message.sent", string>[] = [];
|
||||
const received2: EventEnvelope<"message.sent", string>[] = [];
|
||||
|
||||
const iterator1 = pubsub.subscribe("message.sent", "broadcast1");
|
||||
const iterator2 = pubsub.subscribe("message.sent", "broadcast1");
|
||||
|
||||
const consume1 = (async () => {
|
||||
for await (const envelope of iterator1) {
|
||||
received1.push(envelope);
|
||||
if (received1.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
const consume2 = (async () => {
|
||||
for await (const envelope of iterator2) {
|
||||
received2.push(envelope);
|
||||
if (received2.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "broadcast1", "hello all");
|
||||
|
||||
await Promise.all([consume1, consume2]);
|
||||
|
||||
expect(received1).toHaveLength(1);
|
||||
expect(received2).toHaveLength(1);
|
||||
expect(received1[0].payload).toBe("hello all");
|
||||
expect(received2[0].payload).toBe("hello all");
|
||||
});
|
||||
});
|
||||
|
||||
describe("full envelope round-trip", () => {
|
||||
it("preserves type, id, and payload through serialization/deserialization", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"session.started", { sessionId: string; timestamp: number }>[] = [];
|
||||
const iterator = pubsub.subscribe("session.started", "sess-abc");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("session.started", "sess-abc", { sessionId: "sess-abc", timestamp: 1700000000 });
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({
|
||||
type: "session.started",
|
||||
id: "sess-abc",
|
||||
payload: { sessionId: "sess-abc", timestamp: 1700000000 },
|
||||
});
|
||||
});
|
||||
|
||||
it("round-trips envelope with simple string payload", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
const iterator = pubsub.subscribe("message.sent", "rt1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "rt1", "round-trip test");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received[0].type).toBe("message.sent");
|
||||
expect(received[0].id).toBe("rt1");
|
||||
expect(received[0].payload).toBe("round-trip test");
|
||||
});
|
||||
});
|
||||
|
||||
describe("topic scoping with type:id through Redis", () => {
|
||||
it("scoped topics like call.responded:uuid-123 work correctly", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"call.responded", { status: string }>[] = [];
|
||||
const iterator = pubsub.subscribe("call.responded", "uuid-123");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("call.responded", "uuid-123", { status: "ok" });
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].type).toBe("call.responded");
|
||||
expect(received[0].id).toBe("uuid-123");
|
||||
expect(received[0].payload).toEqual({ status: "ok" });
|
||||
|
||||
expect(publishClient.publish).toHaveBeenCalledWith(
|
||||
"call.responded:uuid-123",
|
||||
JSON.stringify({ type: "call.responded", id: "uuid-123", payload: { status: "ok" } }),
|
||||
);
|
||||
expect(subscribeClient.subscribe).toHaveBeenCalledWith("call.responded:uuid-123");
|
||||
});
|
||||
|
||||
it("events on one scoped topic do not leak to another scoped topic", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"call.responded", { status: string }>[] = [];
|
||||
const iterator = pubsub.subscribe("call.responded", "uuid-aaa");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("call.responded", "uuid-bbb", { status: "wrong" });
|
||||
pubsub.publish("call.responded", "uuid-aaa", { status: "correct" });
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].id).toBe("uuid-aaa");
|
||||
expect(received[0].payload).toEqual({ status: "correct" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("channel prefix", () => {
|
||||
it("applies prefix to both publish and subscribe channels", async () => {
|
||||
const { publishClient, subscribeClient } = createLinkedMockRedis();
|
||||
const eventTarget = createRedisEventTarget({
|
||||
publishClient: publishClient as any,
|
||||
subscribeClient: subscribeClient as any,
|
||||
prefix: "alk:events:",
|
||||
});
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: eventTarget as any });
|
||||
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
const iterator = pubsub.subscribe("message.sent", "prefixed-1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "prefixed-1", "with prefix");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(subscribeClient.subscribe).toHaveBeenCalledWith("alk:events:message.sent:prefixed-1");
|
||||
expect(publishClient.publish).toHaveBeenCalledWith(
|
||||
"alk:events:message.sent:prefixed-1",
|
||||
JSON.stringify({ type: "message.sent", id: "prefixed-1", payload: "with prefix" }),
|
||||
);
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("with prefix");
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user