Compare commits
1 Commits
wave1/core
...
wave1/core
| Author | SHA1 | Date | |
|---|---|---|---|
| dd720a9e0b |
272
test/create_pubsub.test.ts
Normal file
272
test/create_pubsub.test.ts
Normal file
@@ -0,0 +1,272 @@
|
||||
import { describe, it, expect, vi } from "vitest";
|
||||
import { createPubSub } from "../src/create_pubsub.js";
|
||||
import type { EventEnvelope, TypedEventTarget } from "../src/types.js";
|
||||
|
||||
type TestEventMap = {
|
||||
"message.sent": string;
|
||||
"user.joined": { name: string };
|
||||
"session.status": { status: string; code: number };
|
||||
};
|
||||
|
||||
describe("createPubSub", () => {
|
||||
describe("publish", () => {
|
||||
it("dispatches event with correct type:id topic", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<"message.sent", string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "abc123");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "abc123", "hello");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].type).toBe("message.sent");
|
||||
expect(received[0].id).toBe("abc123");
|
||||
});
|
||||
|
||||
it("throws on __-prefixed event types", () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
|
||||
expect(() => pubsub.publish("__subscribe" as any, "", {})).toThrow(
|
||||
'Event types starting with "__" are reserved for adapter control messages. Received: "__subscribe"',
|
||||
);
|
||||
});
|
||||
|
||||
it("dispatches EventEnvelope with type, id, and payload as CustomEvent detail", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<"user.joined", { name: string }>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("user.joined", "user1");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("user.joined", "user1", { name: "Alice" });
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({
|
||||
type: "user.joined",
|
||||
id: "user1",
|
||||
payload: { name: "Alice" },
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe("subscribe", () => {
|
||||
it("returns async iterable that yields EventEnvelope objects", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length === 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "hello");
|
||||
|
||||
await consume;
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0]).toEqual({
|
||||
type: "message.sent",
|
||||
id: "msg1",
|
||||
payload: "hello",
|
||||
});
|
||||
});
|
||||
|
||||
it("yields envelope with correct type, id, and payload fields", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
let result: EventEnvelope<"session.status", { status: string; code: number }> | undefined;
|
||||
|
||||
const iterator = pubsub.subscribe("session.status", "sess1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
result = envelope;
|
||||
break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("session.status", "sess1", { status: "active", code: 200 });
|
||||
|
||||
await consume;
|
||||
|
||||
expect(result).toBeDefined();
|
||||
expect(result!.type).toBe("session.status");
|
||||
expect(result!.id).toBe("sess1");
|
||||
expect(result!.payload).toEqual({ status: "active", code: 200 });
|
||||
});
|
||||
|
||||
it("receives events only for the subscribed topic", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 2) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "first");
|
||||
pubsub.publish("message.sent", "msg_different", "wrong topic");
|
||||
pubsub.publish("message.sent", "msg1", "second");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(2);
|
||||
expect(received[0].payload).toBe("first");
|
||||
expect(received[1].payload).toBe("second");
|
||||
});
|
||||
|
||||
it("multiple subscribers on the same topic all receive events", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received1: EventEnvelope<string>[] = [];
|
||||
const received2: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator1 = pubsub.subscribe("message.sent", "msg1");
|
||||
const iterator2 = pubsub.subscribe("message.sent", "msg1");
|
||||
|
||||
const consume1 = (async () => {
|
||||
for await (const envelope of iterator1) {
|
||||
received1.push(envelope);
|
||||
if (received1.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
const consume2 = (async () => {
|
||||
for await (const envelope of iterator2) {
|
||||
received2.push(envelope);
|
||||
if (received2.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "msg1", "broadcast");
|
||||
|
||||
await Promise.all([consume1, consume2]);
|
||||
|
||||
expect(received1).toHaveLength(1);
|
||||
expect(received2).toHaveLength(1);
|
||||
expect(received1[0].payload).toBe("broadcast");
|
||||
expect(received2[0].payload).toBe("broadcast");
|
||||
});
|
||||
|
||||
it("cleanup: breaking out of for await loop removes the listener", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "first");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("first");
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "after-break");
|
||||
await new Promise((resolve) => setTimeout(resolve, 20));
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
|
||||
const secondIterator = pubsub.subscribe("message.sent", "cleanup-test");
|
||||
const secondReceived: EventEnvelope<string>[] = [];
|
||||
const consume2 = (async () => {
|
||||
for await (const envelope of secondIterator) {
|
||||
secondReceived.push(envelope);
|
||||
if (secondReceived.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "cleanup-test", "second-listener");
|
||||
await consume2;
|
||||
|
||||
expect(secondReceived).toHaveLength(1);
|
||||
expect(secondReceived[0].payload).toBe("second-listener");
|
||||
expect(received).toHaveLength(1);
|
||||
});
|
||||
});
|
||||
|
||||
describe("createPubSub config", () => {
|
||||
it("with custom eventTarget dispatches to that target", () => {
|
||||
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||
const dispatchSpy = vi.spyOn(customTarget, "dispatchEvent");
|
||||
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||
|
||||
pubsub.publish("message.sent", "custom1", "hello custom");
|
||||
|
||||
expect(dispatchSpy).toHaveBeenCalledTimes(1);
|
||||
expect(dispatchSpy.mock.calls[0][0].type).toBe("message.sent:custom1");
|
||||
expect((dispatchSpy.mock.calls[0][0] as CustomEvent).detail).toEqual({
|
||||
type: "message.sent",
|
||||
id: "custom1",
|
||||
payload: "hello custom",
|
||||
});
|
||||
|
||||
dispatchSpy.mockRestore();
|
||||
});
|
||||
|
||||
it("without eventTarget uses new EventTarget() (in-process)", async () => {
|
||||
const pubsub = createPubSub<TestEventMap>();
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "inproc1");
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "inproc1", "in-process works");
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("in-process works");
|
||||
});
|
||||
|
||||
it("custom eventTarget receives events via subscribe", async () => {
|
||||
const customTarget = new EventTarget() as TypedEventTarget<any>;
|
||||
const pubsub = createPubSub<TestEventMap>({ eventTarget: customTarget });
|
||||
const received: EventEnvelope<string>[] = [];
|
||||
|
||||
const iterator = pubsub.subscribe("message.sent", "custom-sub");
|
||||
|
||||
const consume = (async () => {
|
||||
for await (const envelope of iterator) {
|
||||
received.push(envelope);
|
||||
if (received.length >= 1) break;
|
||||
}
|
||||
})();
|
||||
|
||||
pubsub.publish("message.sent", "custom-sub", "via custom");
|
||||
|
||||
await consume;
|
||||
|
||||
expect(received).toHaveLength(1);
|
||||
expect(received[0].payload).toBe("via custom");
|
||||
});
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user