feat(core-pubsub-tests): add comprehensive tests for createPubSub, EventEnvelope, and in-process event target
This commit is contained in:
272
test/create_pubsub.test.ts
Normal file
272
test/create_pubsub.test.ts
Normal file
@@ -0,0 +1,272 @@
|
|||||||
|
import { describe, it, expect, vi } from "vitest";
|
||||||
|
import { createPubSub } from "../src/create_pubsub.js";
|
||||||
|
import type { EventEnvelope, TypedEventTarget } from "../src/types.js";
|
||||||
|
|
||||||
|
type TestEventMap = {
|
||||||
|
"message.sent": string;
|
||||||
|
"user.joined": { name: string };
|
||||||
|
"session.status": { status: string; code: number };
|
||||||
|
};
|
||||||
|
|
||||||
|
describe("createPubSub", () => {
|
||||||
|
describe("publish", () => {
|
||||||
|
it("dispatches event with correct type:id topic", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "abc123");
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "abc123", "hello");
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0].type).toBe("message.sent");
|
||||||
|
expect(received[0].id).toBe("abc123");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("throws on __-prefixed event types", () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
|
||||||
|
expect(() => pubsub.publish("__subscribe" as any, "", {})).toThrow(
|
||||||
|
'Event types starting with "__" are reserved for adapter control messages. Received: "__subscribe"',
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("dispatches EventEnvelope with type, id, and payload as CustomEvent detail", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<"user.joined", { name: string }>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("user.joined", "user1");
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("user.joined", "user1", { name: "Alice" });
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0]).toEqual({
|
||||||
|
type: "user.joined",
|
||||||
|
id: "user1",
|
||||||
|
payload: { name: "Alice" },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("subscribe", () => {
|
||||||
|
it("returns async iterable that yields EventEnvelope objects", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||||
|
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length === 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "msg1", "hello");
|
||||||
|
|
||||||
|
await consume;
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0]).toEqual({
|
||||||
|
type: "message.sent",
|
||||||
|
id: "msg1",
|
||||||
|
payload: "hello",
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("yields envelope with correct type, id, and payload fields", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
let result: EventEnvelope<"session.status", { status: string; code: number }> | undefined;
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("session.status", "sess1");
|
||||||
|
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
result = envelope;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("session.status", "sess1", { status: "active", code: 200 });
|
||||||
|
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(result).toBeDefined();
|
||||||
|
expect(result!.type).toBe("session.status");
|
||||||
|
expect(result!.id).toBe("sess1");
|
||||||
|
expect(result!.payload).toEqual({ status: "active", code: 200 });
|
||||||
|
});
|
||||||
|
|
||||||
|
it("receives events only for the subscribed topic", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||||
|
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length >= 2) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "msg1", "first");
|
||||||
|
pubsub.publish("message.sent", "msg_different", "wrong topic");
|
||||||
|
pubsub.publish("message.sent", "msg1", "second");
|
||||||
|
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(2);
|
||||||
|
expect(received[0].payload).toBe("first");
|
||||||
|
expect(received[1].payload).toBe("second");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("multiple subscribers on the same topic all receive events", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received1: EventEnvelope<string>[] = [];
|
||||||
|
const received2: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator1 = pubsub.subscribe("message.sent", "msg1");
|
||||||
|
const iterator2 = pubsub.subscribe("message.sent", "msg1");
|
||||||
|
|
||||||
|
const consume1 = (async () => {
|
||||||
|
for await (const envelope of iterator1) {
|
||||||
|
received1.push(envelope);
|
||||||
|
if (received1.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
const consume2 = (async () => {
|
||||||
|
for await (const envelope of iterator2) {
|
||||||
|
received2.push(envelope);
|
||||||
|
if (received2.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "msg1", "broadcast");
|
||||||
|
|
||||||
|
await Promise.all([consume1, consume2]);
|
||||||
|
|
||||||
|
expect(received1).toHaveLength(1);
|
||||||
|
expect(received2).toHaveLength(1);
|
||||||
|
expect(received1[0].payload).toBe("broadcast");
|
||||||
|
expect(received2[0].payload).toBe("broadcast");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("cleanup: breaking out of for await loop removes the listener", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||||
|
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "cleanup-test", "first");
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0].payload).toBe("first");
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "cleanup-test", "after-break");
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
|
||||||
|
const secondIterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||||
|
const secondReceived: EventEnvelope<string>[] = [];
|
||||||
|
const consume2 = (async () => {
|
||||||
|
for await (const envelope of secondIterator) {
|
||||||
|
secondReceived.push(envelope);
|
||||||
|
if (secondReceived.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "cleanup-test", "second-listener");
|
||||||
|
await consume2;
|
||||||
|
|
||||||
|
expect(secondReceived).toHaveLength(1);
|
||||||
|
expect(secondReceived[0].payload).toBe("second-listener");
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("createPubSub config", () => {
|
||||||
|
it("with custom eventTarget dispatches to that target", () => {
|
||||||
|
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||||
|
const dispatchSpy = vi.spyOn(customTarget, "dispatchEvent");
|
||||||
|
|
||||||
|
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "custom1", "hello custom");
|
||||||
|
|
||||||
|
expect(dispatchSpy).toHaveBeenCalledTimes(1);
|
||||||
|
expect(dispatchSpy.mock.calls[0][0].type).toBe("message.sent:custom1");
|
||||||
|
expect((dispatchSpy.mock.calls[0][0] as CustomEvent).detail).toEqual({
|
||||||
|
type: "message.sent",
|
||||||
|
id: "custom1",
|
||||||
|
payload: "hello custom",
|
||||||
|
});
|
||||||
|
|
||||||
|
dispatchSpy.mockRestore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it("without eventTarget uses new EventTarget() (in-process)", async () => {
|
||||||
|
const pubsub = createPubSub<TestEventMap>();
|
||||||
|
const received: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "inproc1");
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "inproc1", "in-process works");
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0].payload).toBe("in-process works");
|
||||||
|
});
|
||||||
|
|
||||||
|
it("custom eventTarget receives events via subscribe", async () => {
|
||||||
|
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||||
|
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||||
|
const received: EventEnvelope<string>[] = [];
|
||||||
|
|
||||||
|
const iterator = pubsub.subscribe("message.sent", "custom-sub");
|
||||||
|
|
||||||
|
const consume = (async () => {
|
||||||
|
for await (const envelope of iterator) {
|
||||||
|
received.push(envelope);
|
||||||
|
if (received.length >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
pubsub.publish("message.sent", "custom-sub", "via custom");
|
||||||
|
|
||||||
|
await consume;
|
||||||
|
|
||||||
|
expect(received).toHaveLength(1);
|
||||||
|
expect(received[0].payload).toBe("via custom");
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user