729 lines
27 KiB
TypeScript
729 lines
27 KiB
TypeScript
import { describe, it, expect, vi, beforeEach } from "vitest";
|
|
import { createWebSocketClientEventTarget } from "../src/event-target-websocket-client.js";
|
|
import { createWebSocketServerEventTarget } from "../src/event-target-websocket-server.js";
|
|
import type { SpokeEventTarget } from "../src/event-target-websocket-server.js";
|
|
import { createPubSub } from "../src/create_pubsub.js";
|
|
import type { EventEnvelope, TypedEvent } from "../src/types.js";
|
|
|
|
type TestEvent = TypedEvent<string, EventEnvelope>;
|
|
|
|
function createPipe() {
|
|
let serverOnmessage: ((ev: { data: string }) => void) | null = null;
|
|
let clientOnmessage: ((ev: { data: string }) => void) | null = null;
|
|
let serverOnclose: ((ev: { code: number; reason?: string }) => void) | null = null;
|
|
|
|
const serverSideWs = {
|
|
bufferedAmount: 0,
|
|
sent: [] as string[],
|
|
send: vi.fn((data: string) => {
|
|
clientOnmessage?.({ data });
|
|
}) as any,
|
|
close: vi.fn() as any,
|
|
get onmessage() {
|
|
return serverOnmessage;
|
|
},
|
|
set onmessage(handler: ((ev: { data: string }) => void) | null) {
|
|
serverOnmessage = handler;
|
|
},
|
|
get onclose() {
|
|
return serverOnclose;
|
|
},
|
|
set onclose(handler: ((ev: { code: number; reason?: string }) => void) | null) {
|
|
serverOnclose = handler;
|
|
},
|
|
simulateMessage(data: string) {
|
|
if (serverOnmessage) {
|
|
serverOnmessage({ data });
|
|
}
|
|
},
|
|
simulateClose(code = 1000, reason = "") {
|
|
if (serverOnclose) {
|
|
serverOnclose({ code, reason });
|
|
}
|
|
},
|
|
};
|
|
|
|
Object.defineProperty(serverSideWs, "onmessage", {
|
|
get() {
|
|
return serverOnmessage;
|
|
},
|
|
set(handler: ((ev: { data: string }) => void) | null) {
|
|
serverOnmessage = handler;
|
|
},
|
|
});
|
|
|
|
Object.defineProperty(serverSideWs, "onclose", {
|
|
get() {
|
|
return serverOnclose;
|
|
},
|
|
set(handler: ((ev: { code: number; reason?: string }) => void) | null) {
|
|
serverOnclose = handler;
|
|
},
|
|
});
|
|
|
|
let clientOnclose: ((ev: { code: number; reason?: string }) => void) | null = null;
|
|
|
|
const clientSideWs = {
|
|
sent: [] as string[],
|
|
send: vi.fn((data: string) => {
|
|
serverOnmessage?.({ data });
|
|
}) as any,
|
|
get onmessage() {
|
|
return clientOnmessage;
|
|
},
|
|
set onmessage(handler: ((ev: { data: string }) => void) | null) {
|
|
clientOnmessage = handler;
|
|
},
|
|
get onclose() {
|
|
return clientOnclose;
|
|
},
|
|
set onclose(handler: ((ev: { code: number; reason?: string }) => void) | null) {
|
|
clientOnclose = handler;
|
|
},
|
|
simulateMessage(data: string) {
|
|
if (clientOnmessage) {
|
|
clientOnmessage({ data });
|
|
}
|
|
},
|
|
simulateClose(code = 1000, reason = "") {
|
|
if (clientOnclose) {
|
|
clientOnclose({ code, reason });
|
|
}
|
|
},
|
|
};
|
|
|
|
Object.defineProperty(clientSideWs, "onmessage", {
|
|
get() {
|
|
return clientOnmessage;
|
|
},
|
|
set(handler: ((ev: { data: string }) => void) | null) {
|
|
clientOnmessage = handler;
|
|
},
|
|
});
|
|
|
|
Object.defineProperty(clientSideWs, "onclose", {
|
|
get() {
|
|
return clientOnclose;
|
|
},
|
|
set(handler: ((ev: { code: number; reason?: string }) => void) | null) {
|
|
clientOnclose = handler;
|
|
},
|
|
});
|
|
|
|
return { serverSideWs, clientSideWs };
|
|
}
|
|
|
|
describe("WebSocket Client-Server Integration", () => {
|
|
describe("bidirectional communication", () => {
|
|
it("client sends event to server via dispatchEvent, server fan-out delivers to subscribed clients", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const client1Listener = vi.fn();
|
|
client1.addEventListener("chat:room1", client1Listener);
|
|
|
|
const client2Listener = vi.fn();
|
|
client2.addEventListener("chat:room1", client2Listener);
|
|
|
|
const serverListener = vi.fn();
|
|
server.addEventListener("chat:room1", serverListener);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(client1Listener).toHaveBeenCalledTimes(1);
|
|
expect(client2Listener).toHaveBeenCalledTimes(1);
|
|
expect(serverListener).toHaveBeenCalledTimes(1);
|
|
|
|
expect((client1Listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
|
|
expect((client2Listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
|
|
expect(serverListener.mock.calls[0][0].detail).toEqual(envelope);
|
|
});
|
|
|
|
it("client dispatches event, server local listener receives it, and it fans out to other subscribed clients", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
client2.addEventListener("chat:room1", vi.fn());
|
|
|
|
const serverListener = vi.fn();
|
|
server.addEventListener("chat:room1", serverListener);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "from client1" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
client1.dispatchEvent(event);
|
|
|
|
expect(serverListener).toHaveBeenCalledTimes(1);
|
|
expect((serverListener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
|
|
});
|
|
});
|
|
|
|
describe("subscription control protocol end-to-end", () => {
|
|
it("client addEventListener sends __subscribe, server tracks subscription, server dispatchEvent sends to subscribed client, client removeEventListener sends __unsubscribe, server removes from subscription map", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const client = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
const listener = vi.fn();
|
|
client.addEventListener("chat:room1", listener);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener).toHaveBeenCalledTimes(1);
|
|
expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
|
|
|
|
client.removeEventListener("chat:room1", listener);
|
|
|
|
const event2 = new CustomEvent("chat:room1", {
|
|
detail: { type: "chat", id: "room1", payload: "world" },
|
|
}) as TestEvent;
|
|
server.dispatchEvent(event2);
|
|
|
|
expect(listener).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("__subscribe is only sent once for multiple listeners on the same topic", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const client = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
client.addEventListener("chat:room1", listener1);
|
|
client.addEventListener("chat:room1", listener2);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener1).toHaveBeenCalledTimes(1);
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("__unsubscribe is sent only when the last listener is removed", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const client = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
client.addEventListener("chat:room1", listener1);
|
|
client.addEventListener("chat:room1", listener2);
|
|
|
|
client.removeEventListener("chat:room1", listener1);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "still here" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
|
|
client.removeEventListener("chat:room1", listener2);
|
|
|
|
const event2 = new CustomEvent("chat:room1", {
|
|
detail: { type: "chat", id: "room1", payload: "gone" },
|
|
}) as TestEvent;
|
|
server.dispatchEvent(event2);
|
|
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
});
|
|
});
|
|
|
|
describe("topic-based fan-out", () => {
|
|
it("subscribed clients receive events on their topic, unsubscribed clients do NOT", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const room1Listener = vi.fn();
|
|
client1.addEventListener("chat:room1", room1Listener);
|
|
|
|
const room2Listener = vi.fn();
|
|
client2.addEventListener("chat:room2", room2Listener);
|
|
|
|
const envelope1: EventEnvelope = { type: "chat", id: "room1", payload: "hello room1" };
|
|
const event1 = new CustomEvent("chat:room1", { detail: envelope1 }) as TestEvent;
|
|
server.dispatchEvent(event1);
|
|
|
|
expect(room1Listener).toHaveBeenCalledTimes(1);
|
|
expect(room2Listener).not.toHaveBeenCalled();
|
|
|
|
const envelope2: EventEnvelope = { type: "chat", id: "room2", payload: "hello room2" };
|
|
const event2 = new CustomEvent("chat:room2", { detail: envelope2 }) as TestEvent;
|
|
server.dispatchEvent(event2);
|
|
|
|
expect(room1Listener).toHaveBeenCalledTimes(1);
|
|
expect(room2Listener).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("multiple clients subscribed to the same topic all receive events", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
const { serverSideWs: s3, clientSideWs: c3 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
server.addConnection(s3 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
const client3 = createWebSocketClientEventTarget<TestEvent>(c3 as any);
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
const listener3 = vi.fn();
|
|
|
|
client1.addEventListener("chat:room1", listener1);
|
|
client2.addEventListener("chat:room1", listener2);
|
|
client3.addEventListener("chat:room2", listener3);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "broadcast" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener1).toHaveBeenCalledTimes(1);
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
expect(listener3).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("client that unsubscribes stops receiving events", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
|
|
client1.addEventListener("chat:room1", listener1);
|
|
client2.addEventListener("chat:room1", listener2);
|
|
|
|
const envelope1: EventEnvelope = { type: "chat", id: "room1", payload: "first" };
|
|
const event1 = new CustomEvent("chat:room1", { detail: envelope1 }) as TestEvent;
|
|
server.dispatchEvent(event1);
|
|
|
|
expect(listener1).toHaveBeenCalledTimes(1);
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
|
|
client2.removeEventListener("chat:room1", listener2);
|
|
|
|
const envelope2: EventEnvelope = { type: "chat", id: "room1", payload: "second" };
|
|
const event2 = new CustomEvent("chat:room1", { detail: envelope2 }) as TestEvent;
|
|
server.dispatchEvent(event2);
|
|
|
|
expect(listener1).toHaveBeenCalledTimes(2);
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
});
|
|
});
|
|
|
|
describe("direct messaging via topic scoping", () => {
|
|
it("server dispatches to direct:${spokeId}, only that specific spoke receives it", () => {
|
|
const onConnection = vi.fn();
|
|
const server = createWebSocketServerEventTarget<TestEvent>({ onConnection });
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget<TestEvent>;
|
|
const spoke2 = onConnection.mock.calls[1][0] as SpokeEventTarget<TestEvent>;
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const client1DirectListener = vi.fn();
|
|
const client2DirectListener = vi.fn();
|
|
client1.addEventListener("direct:spoke1", client1DirectListener);
|
|
client2.addEventListener("direct:spoke2", client2DirectListener);
|
|
|
|
const directEnvelope: EventEnvelope = { type: "direct", id: "spoke1", payload: "secret for spoke1" };
|
|
const directEvent = new CustomEvent("direct:spoke1", { detail: directEnvelope }) as TestEvent;
|
|
spoke1.dispatchEvent(directEvent);
|
|
|
|
expect(client1DirectListener).toHaveBeenCalledTimes(1);
|
|
expect((client1DirectListener.mock.calls[0][0] as TestEvent).detail).toEqual(directEnvelope);
|
|
expect(client2DirectListener).not.toHaveBeenCalled();
|
|
|
|
const directEnvelope2: EventEnvelope = { type: "direct", id: "spoke2", payload: "secret for spoke2" };
|
|
const directEvent2 = new CustomEvent("direct:spoke2", { detail: directEnvelope2 }) as TestEvent;
|
|
spoke2.dispatchEvent(directEvent2);
|
|
|
|
expect(client2DirectListener).toHaveBeenCalledTimes(1);
|
|
expect((client2DirectListener.mock.calls[0][0] as TestEvent).detail).toEqual(directEnvelope2);
|
|
expect(client1DirectListener).toHaveBeenCalledTimes(1);
|
|
});
|
|
});
|
|
|
|
describe("createPubSub with WS client event target", () => {
|
|
it("subscribe and publish through PubSub wired to WS client", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const clientET = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub = createPubSub<EventMap>({ eventTarget: clientET });
|
|
|
|
const received: EventEnvelope<"chat", string>[] = [];
|
|
|
|
const iterator = pubsub.subscribe("chat", "room1");
|
|
const consume = (async () => {
|
|
for await (const envelope of iterator) {
|
|
received.push(envelope);
|
|
if (received.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
server.dispatchEvent(
|
|
new CustomEvent("chat:room1", {
|
|
detail: { type: "chat", id: "room1", payload: "hello from server" },
|
|
}) as TestEvent,
|
|
);
|
|
|
|
return consume.then(() => {
|
|
expect(received).toHaveLength(1);
|
|
expect(received[0]).toEqual({
|
|
type: "chat",
|
|
id: "room1",
|
|
payload: "hello from server",
|
|
});
|
|
});
|
|
});
|
|
|
|
it("publish through PubSub on client dispatches event that reaches server local listener", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const clientET = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub = createPubSub<EventMap>({ eventTarget: clientET });
|
|
|
|
const serverListener = vi.fn();
|
|
server.addEventListener("chat:room1", serverListener);
|
|
|
|
pubsub.publish("chat", "room1", "from client pubsub");
|
|
|
|
expect(serverListener).toHaveBeenCalledTimes(1);
|
|
expect((serverListener.mock.calls[0][0] as TestEvent).detail).toEqual({
|
|
type: "chat",
|
|
id: "room1",
|
|
payload: "from client pubsub",
|
|
});
|
|
});
|
|
|
|
it("multiple PubSub subscribers on same topic all receive events from server", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const clientET1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const clientET2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub1 = createPubSub<EventMap>({ eventTarget: clientET1 });
|
|
const pubsub2 = createPubSub<EventMap>({ eventTarget: clientET2 });
|
|
|
|
const received1: EventEnvelope<"chat", string>[] = [];
|
|
const received2: EventEnvelope<"chat", string>[] = [];
|
|
|
|
const iterator1 = pubsub1.subscribe("chat", "room1");
|
|
const consume1 = (async () => {
|
|
for await (const envelope of iterator1) {
|
|
received1.push(envelope);
|
|
if (received1.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
const iterator2 = pubsub2.subscribe("chat", "room1");
|
|
const consume2 = (async () => {
|
|
for await (const envelope of iterator2) {
|
|
received2.push(envelope);
|
|
if (received2.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
server.dispatchEvent(
|
|
new CustomEvent("chat:room1", {
|
|
detail: { type: "chat", id: "room1", payload: "broadcast" },
|
|
}) as TestEvent,
|
|
);
|
|
|
|
return Promise.all([consume1, consume2]).then(() => {
|
|
expect(received1).toHaveLength(1);
|
|
expect(received2).toHaveLength(1);
|
|
expect(received1[0].payload).toBe("broadcast");
|
|
expect(received2[0].payload).toBe("broadcast");
|
|
});
|
|
});
|
|
});
|
|
|
|
describe("createPubSub with WS server event target", () => {
|
|
it("subscribe and publish through PubSub wired to WS server", async () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const client = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub = createPubSub<EventMap>({ eventTarget: server });
|
|
|
|
const clientListener = vi.fn();
|
|
client.addEventListener("chat:room1", clientListener);
|
|
|
|
const received: EventEnvelope<"chat", string>[] = [];
|
|
const iterator = pubsub.subscribe("chat", "room1");
|
|
const consume = (async () => {
|
|
for await (const envelope of iterator) {
|
|
received.push(envelope);
|
|
if (received.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
pubsub.publish("chat", "room1", "hello from server pubsub");
|
|
|
|
await consume;
|
|
|
|
expect(received).toHaveLength(1);
|
|
expect(received[0]).toEqual({
|
|
type: "chat",
|
|
id: "room1",
|
|
payload: "hello from server pubsub",
|
|
});
|
|
|
|
expect(clientListener).toHaveBeenCalledTimes(1);
|
|
expect((clientListener.mock.calls[0][0] as TestEvent).detail).toEqual({
|
|
type: "chat",
|
|
id: "room1",
|
|
payload: "hello from server pubsub",
|
|
});
|
|
});
|
|
|
|
it("server PubSub publish fans out to subscribed WS clients", async () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub = createPubSub<EventMap>({ eventTarget: server });
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
client1.addEventListener("chat:room1", listener1);
|
|
client2.addEventListener("chat:room1", listener2);
|
|
|
|
const received: EventEnvelope<"chat", string>[] = [];
|
|
const iterator = pubsub.subscribe("chat", "room1");
|
|
const consume = (async () => {
|
|
for await (const envelope of iterator) {
|
|
received.push(envelope);
|
|
if (received.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
pubsub.publish("chat", "room1", "fan-out");
|
|
|
|
await consume;
|
|
|
|
expect(received).toHaveLength(1);
|
|
expect(received[0].payload).toBe("fan-out");
|
|
|
|
expect(listener1).toHaveBeenCalledTimes(1);
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
});
|
|
|
|
it("server PubSub publish does not reach unsubscribed clients", async () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
type EventMap = { "chat": string };
|
|
const pubsub = createPubSub<EventMap>({ eventTarget: server });
|
|
|
|
const subscribedListener = vi.fn();
|
|
const unsubscribedListener = vi.fn();
|
|
client1.addEventListener("chat:room1", subscribedListener);
|
|
client2.addEventListener("chat:room2", unsubscribedListener);
|
|
|
|
const received: EventEnvelope<"chat", string>[] = [];
|
|
const iterator = pubsub.subscribe("chat", "room1");
|
|
const consume = (async () => {
|
|
for await (const envelope of iterator) {
|
|
received.push(envelope);
|
|
if (received.length >= 1) break;
|
|
}
|
|
})();
|
|
|
|
pubsub.publish("chat", "room1", "directed");
|
|
|
|
await consume;
|
|
|
|
expect(subscribedListener).toHaveBeenCalledTimes(1);
|
|
expect(unsubscribedListener).not.toHaveBeenCalled();
|
|
});
|
|
});
|
|
|
|
describe("end-to-end event envelope round-trip", () => {
|
|
it("preserves full EventEnvelope through client dispatch → server fan-out → client receive", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const listener2 = vi.fn();
|
|
client2.addEventListener("chat:room1", listener2);
|
|
|
|
const envelope: EventEnvelope<"chat", { msg: string; ts: number }> = {
|
|
type: "chat",
|
|
id: "room1",
|
|
payload: { msg: "hello world", ts: 1234567890 },
|
|
};
|
|
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
const receivedEvent = listener2.mock.calls[0][0] as TestEvent;
|
|
expect(receivedEvent.detail).toEqual(envelope);
|
|
expect(receivedEvent.type).toBe("chat:room1");
|
|
});
|
|
|
|
it("preserves envelope when client publishes and server re-dispatches to subscribed clients", () => {
|
|
const onConnection = vi.fn();
|
|
const server = createWebSocketServerEventTarget<TestEvent>({ onConnection });
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget<TestEvent>;
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const listener2 = vi.fn();
|
|
client2.addEventListener("chat:room1", listener2);
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "from client1" };
|
|
|
|
spoke1.addEventListener("chat:room1", (event: Event) => {
|
|
server.dispatchEvent(event as TestEvent);
|
|
});
|
|
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
client1.dispatchEvent(event);
|
|
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
const received = (listener2.mock.calls[0][0] as TestEvent).detail as EventEnvelope;
|
|
expect(received).toEqual(envelope);
|
|
});
|
|
});
|
|
|
|
describe("connection lifecycle", () => {
|
|
it("disconnecting a client removes it from server subscription map", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs, clientSideWs } = createPipe();
|
|
|
|
server.addConnection(serverSideWs as any);
|
|
const client = createWebSocketClientEventTarget<TestEvent>(clientSideWs as any);
|
|
|
|
const listener = vi.fn();
|
|
client.addEventListener("chat:room1", listener);
|
|
|
|
serverSideWs.simulateClose();
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "after close" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener).not.toHaveBeenCalled();
|
|
});
|
|
|
|
it("one client disconnecting does not affect other clients", () => {
|
|
const server = createWebSocketServerEventTarget<TestEvent>();
|
|
const { serverSideWs: s1, clientSideWs: c1 } = createPipe();
|
|
const { serverSideWs: s2, clientSideWs: c2 } = createPipe();
|
|
|
|
server.addConnection(s1 as any);
|
|
server.addConnection(s2 as any);
|
|
|
|
const client1 = createWebSocketClientEventTarget<TestEvent>(c1 as any);
|
|
const client2 = createWebSocketClientEventTarget<TestEvent>(c2 as any);
|
|
|
|
const listener1 = vi.fn();
|
|
const listener2 = vi.fn();
|
|
client1.addEventListener("chat:room1", listener1);
|
|
client2.addEventListener("chat:room1", listener2);
|
|
|
|
s1.simulateClose();
|
|
|
|
const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "after close" };
|
|
const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
|
|
server.dispatchEvent(event);
|
|
|
|
expect(listener1).not.toHaveBeenCalled();
|
|
expect(listener2).toHaveBeenCalledTimes(1);
|
|
expect((listener2.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
|
|
});
|
|
});
|
|
}); |