/**
 * Integration tests wiring the WebSocket client event target to the WebSocket
 * server event target through an in-memory, synchronous socket pair.
 *
 * No real sockets are opened: `createPipe` returns two mock socket objects
 * whose `send` spies invoke the peer's current `onmessage` handler directly.
 */
import { describe, it, expect, vi, beforeEach } from "vitest";
import { createWebSocketClientEventTarget } from "../src/event-target-websocket-client.js";
import { createWebSocketServerEventTarget } from "../src/event-target-websocket-server.js";
import type { SpokeEventTarget } from "../src/event-target-websocket-server.js";
import { createPubSub } from "../src/create_pubsub.js";
import type { EventEnvelope, TypedEvent } from "../src/types.js";

type TestEvent = TypedEvent;

/** Shape of the `onmessage` handler slot on the mock sockets. */
type MessageHandler = ((ev: { data: string }) => void) | null;

/** Shape of the `onclose` handler slot on the mock sockets. */
type CloseHandler = ((ev: { code: number; reason?: string }) => void) | null;

/** Server event target type, derived from the factory so it tracks the source. */
type ServerEventTarget = ReturnType<typeof createWebSocketServerEventTarget>;

/**
 * Builds a pair of mock sockets joined back to back.
 *
 * - `serverSideWs.send(data)` synchronously calls the client side's current
 *   `onmessage` handler (if one is set), and vice versa.
 * - `simulateMessage` / `simulateClose` invoke that same side's own
 *   `onmessage` / `onclose` handler; the peer is not notified.
 * - `close` (server side only) is a spy with no behaviour.
 * - `sent` is an empty array that nothing in this helper populates; it is kept
 *   only so the returned object shape is unchanged.
 *
 * @returns the two ends of the pipe as `{ serverSideWs, clientSideWs }`.
 */
function createPipe() {
  let serverOnmessage: MessageHandler = null;
  let clientOnmessage: MessageHandler = null;
  let serverOnclose: CloseHandler = null;
  let clientOnclose: CloseHandler = null;

  const serverSideWs = {
    bufferedAmount: 0,
    sent: [] as string[],
    // Delivering to the *client* handler is what makes this the server end.
    send: vi.fn((data: string) => {
      clientOnmessage?.({ data });
    }) as any,
    close: vi.fn() as any,
    get onmessage() {
      return serverOnmessage;
    },
    set onmessage(handler: MessageHandler) {
      serverOnmessage = handler;
    },
    get onclose() {
      return serverOnclose;
    },
    set onclose(handler: CloseHandler) {
      serverOnclose = handler;
    },
    simulateMessage(data: string) {
      serverOnmessage?.({ data });
    },
    simulateClose(code = 1000, reason = "") {
      serverOnclose?.({ code, reason });
    },
  };

  const clientSideWs = {
    sent: [] as string[],
    // Mirror image of the server end: deliver to the *server* handler.
    send: vi.fn((data: string) => {
      serverOnmessage?.({ data });
    }) as any,
    get onmessage() {
      return clientOnmessage;
    },
    set onmessage(handler: MessageHandler) {
      clientOnmessage = handler;
    },
    get onclose() {
      return clientOnclose;
    },
    set onclose(handler: CloseHandler) {
      clientOnclose = handler;
    },
    simulateMessage(data: string) {
      clientOnmessage?.({ data });
    },
    simulateClose(code = 1000, reason = "") {
      clientOnclose?.({ code, reason });
    },
  };

  return { serverSideWs, clientSideWs };
}

/**
 * Creates `count` pipes, registers every server end with `server`, and then
 * wraps every client end in a client event target.
 *
 * The three phases run strictly in that order (all pipes, then all
 * `addConnection` calls, then all clients) so that connection order, and
 * therefore the order of any `onConnection` callbacks, matches pipe order.
 *
 * @param server the server event target to attach connections to.
 * @param count  how many client connections to create.
 * @returns one `{ serverSideWs, clientSideWs, client }` entry per connection.
 */
function connectClients(server: ServerEventTarget, count: number) {
  const pipes = Array.from({ length: count }, () => createPipe());
  for (const { serverSideWs } of pipes) {
    server.addConnection(serverSideWs as any);
  }
  return pipes.map((pipe) => ({
    ...pipe,
    client: createWebSocketClientEventTarget(pipe.clientSideWs as any),
  }));
}

/**
 * Reads items from `source` until `count` have been collected, then stops
 * iterating and resolves with them.
 *
 * Iteration starts synchronously when this function is called, so call it
 * *before* dispatching the events it is expected to observe.
 *
 * @param source async iterable to consume.
 * @param count  number of items to collect before breaking out of the loop.
 * @returns the collected items, in arrival order.
 */
async function take<T>(source: AsyncIterable<T>, count: number): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) {
    items.push(item);
    if (items.length >= count) break;
  }
  return items;
}

describe("WebSocket Client-Server Integration", () => {
  describe("bidirectional communication", () => {
    it("server dispatchEvent reaches its local listener and fans out to subscribed clients", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);

      const client1Listener = vi.fn();
      client1.addEventListener("chat:room1", client1Listener);
      const client2Listener = vi.fn();
      client2.addEventListener("chat:room1", client2Listener);
      const serverListener = vi.fn();
      server.addEventListener("chat:room1", serverListener);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(client1Listener).toHaveBeenCalledTimes(1);
      expect(client2Listener).toHaveBeenCalledTimes(1);
      expect(serverListener).toHaveBeenCalledTimes(1);
      expect((client1Listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
      expect((client2Listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
      expect((serverListener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
    });

    it("client dispatchEvent reaches the server's local listener", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);

      // NOTE(review): client2 subscribes to the topic but nothing is asserted
      // about it, so this test does not establish whether client-originated
      // events are fanned out to other clients.
      client2.addEventListener("chat:room1", vi.fn());
      const serverListener = vi.fn();
      server.addEventListener("chat:room1", serverListener);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "from client1" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      client1.dispatchEvent(event);

      expect(serverListener).toHaveBeenCalledTimes(1);
      expect((serverListener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
    });
  });

  describe("subscription control protocol end-to-end", () => {
    it("client addEventListener sends __subscribe, server tracks subscription, server dispatchEvent sends to subscribed client, client removeEventListener sends __unsubscribe, server removes from subscription map", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client }] = connectClients(server, 1);

      const listener = vi.fn();
      client.addEventListener("chat:room1", listener);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(listener).toHaveBeenCalledTimes(1);
      expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);

      client.removeEventListener("chat:room1", listener);

      const event2 = new CustomEvent("chat:room1", {
        detail: { type: "chat", id: "room1", payload: "world" },
      }) as TestEvent;
      server.dispatchEvent(event2);

      // Still 1: nothing was delivered after the listener was removed.
      expect(listener).toHaveBeenCalledTimes(1);
    });

    it("__subscribe is only sent once for multiple listeners on the same topic", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client }] = connectClients(server, 1);

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      client.addEventListener("chat:room1", listener1);
      client.addEventListener("chat:room1", listener2);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      // NOTE(review): this checks single delivery per listener only; the
      // frames passed to the socket's `send` spy are not inspected.
      expect(listener1).toHaveBeenCalledTimes(1);
      expect(listener2).toHaveBeenCalledTimes(1);
    });

    it("__unsubscribe is sent only when the last listener is removed", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client }] = connectClients(server, 1);

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      client.addEventListener("chat:room1", listener1);
      client.addEventListener("chat:room1", listener2);
      client.removeEventListener("chat:room1", listener1);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "still here" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      // One listener remains, so the topic must still be delivered.
      expect(listener2).toHaveBeenCalledTimes(1);

      client.removeEventListener("chat:room1", listener2);

      const event2 = new CustomEvent("chat:room1", {
        detail: { type: "chat", id: "room1", payload: "gone" },
      }) as TestEvent;
      server.dispatchEvent(event2);

      expect(listener2).toHaveBeenCalledTimes(1);
    });
  });

  describe("topic-based fan-out", () => {
    it("subscribed clients receive events on their topic, unsubscribed clients do NOT", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);

      const room1Listener = vi.fn();
      client1.addEventListener("chat:room1", room1Listener);
      const room2Listener = vi.fn();
      client2.addEventListener("chat:room2", room2Listener);

      const envelope1: EventEnvelope = { type: "chat", id: "room1", payload: "hello room1" };
      const event1 = new CustomEvent("chat:room1", { detail: envelope1 }) as TestEvent;
      server.dispatchEvent(event1);

      expect(room1Listener).toHaveBeenCalledTimes(1);
      expect(room2Listener).not.toHaveBeenCalled();

      const envelope2: EventEnvelope = { type: "chat", id: "room2", payload: "hello room2" };
      const event2 = new CustomEvent("chat:room2", { detail: envelope2 }) as TestEvent;
      server.dispatchEvent(event2);

      expect(room1Listener).toHaveBeenCalledTimes(1);
      expect(room2Listener).toHaveBeenCalledTimes(1);
    });

    it("multiple clients subscribed to the same topic all receive events", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }, { client: client3 }] = connectClients(
        server,
        3,
      );

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      const listener3 = vi.fn();
      client1.addEventListener("chat:room1", listener1);
      client2.addEventListener("chat:room1", listener2);
      client3.addEventListener("chat:room2", listener3);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "broadcast" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(listener1).toHaveBeenCalledTimes(1);
      expect(listener2).toHaveBeenCalledTimes(1);
      expect(listener3).not.toHaveBeenCalled();
    });

    it("client that unsubscribes stops receiving events", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      client1.addEventListener("chat:room1", listener1);
      client2.addEventListener("chat:room1", listener2);

      const envelope1: EventEnvelope = { type: "chat", id: "room1", payload: "first" };
      const event1 = new CustomEvent("chat:room1", { detail: envelope1 }) as TestEvent;
      server.dispatchEvent(event1);

      expect(listener1).toHaveBeenCalledTimes(1);
      expect(listener2).toHaveBeenCalledTimes(1);

      client2.removeEventListener("chat:room1", listener2);

      const envelope2: EventEnvelope = { type: "chat", id: "room1", payload: "second" };
      const event2 = new CustomEvent("chat:room1", { detail: envelope2 }) as TestEvent;
      server.dispatchEvent(event2);

      expect(listener1).toHaveBeenCalledTimes(2);
      expect(listener2).toHaveBeenCalledTimes(1);
    });
  });

  describe("direct messaging via topic scoping", () => {
    it("server dispatches to direct:${spokeId}, only that specific spoke receives it", () => {
      const onConnection = vi.fn();
      const server = createWebSocketServerEventTarget({ onConnection });
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);

      // The per-connection targets are read from the `onConnection` spy, in
      // connection order.
      const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget;
      const spoke2 = onConnection.mock.calls[1][0] as SpokeEventTarget;

      const client1DirectListener = vi.fn();
      const client2DirectListener = vi.fn();
      client1.addEventListener("direct:spoke1", client1DirectListener);
      client2.addEventListener("direct:spoke2", client2DirectListener);

      const directEnvelope: EventEnvelope = {
        type: "direct",
        id: "spoke1",
        payload: "secret for spoke1",
      };
      const directEvent = new CustomEvent("direct:spoke1", {
        detail: directEnvelope,
      }) as TestEvent;
      spoke1.dispatchEvent(directEvent);

      expect(client1DirectListener).toHaveBeenCalledTimes(1);
      expect((client1DirectListener.mock.calls[0][0] as TestEvent).detail).toEqual(directEnvelope);
      expect(client2DirectListener).not.toHaveBeenCalled();

      const directEnvelope2: EventEnvelope = {
        type: "direct",
        id: "spoke2",
        payload: "secret for spoke2",
      };
      const directEvent2 = new CustomEvent("direct:spoke2", {
        detail: directEnvelope2,
      }) as TestEvent;
      spoke2.dispatchEvent(directEvent2);

      expect(client2DirectListener).toHaveBeenCalledTimes(1);
      expect((client2DirectListener.mock.calls[0][0] as TestEvent).detail).toEqual(
        directEnvelope2,
      );
      expect(client1DirectListener).toHaveBeenCalledTimes(1);
    });
  });

  describe("createPubSub with WS client event target", () => {
    it("PubSub subscribe on a WS client receives events dispatched by the server", async () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: clientET }] = connectClients(server, 1);
      const pubsub = createPubSub({ eventTarget: clientET });

      // Start consuming before the server dispatches.
      const consume = take(pubsub.subscribe("chat", "room1"), 1);

      server.dispatchEvent(
        new CustomEvent("chat:room1", {
          detail: { type: "chat", id: "room1", payload: "hello from server" },
        }) as TestEvent,
      );

      const received = await consume;
      expect(received).toHaveLength(1);
      expect(received[0]).toEqual({
        type: "chat",
        id: "room1",
        payload: "hello from server",
      });
    });

    it("publish through PubSub on client dispatches event that reaches server local listener", () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: clientET }] = connectClients(server, 1);
      const pubsub = createPubSub({ eventTarget: clientET });

      const serverListener = vi.fn();
      server.addEventListener("chat:room1", serverListener);

      pubsub.publish("chat", "room1", "from client pubsub");

      expect(serverListener).toHaveBeenCalledTimes(1);
      expect((serverListener.mock.calls[0][0] as TestEvent).detail).toEqual({
        type: "chat",
        id: "room1",
        payload: "from client pubsub",
      });
    });

    it("multiple PubSub subscribers on same topic all receive events from server", async () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: clientET1 }, { client: clientET2 }] = connectClients(server, 2);
      const pubsub1 = createPubSub({ eventTarget: clientET1 });
      const pubsub2 = createPubSub({ eventTarget: clientET2 });

      const consume1 = take(pubsub1.subscribe("chat", "room1"), 1);
      const consume2 = take(pubsub2.subscribe("chat", "room1"), 1);

      server.dispatchEvent(
        new CustomEvent("chat:room1", {
          detail: { type: "chat", id: "room1", payload: "broadcast" },
        }) as TestEvent,
      );

      const [received1, received2] = await Promise.all([consume1, consume2]);
      expect(received1).toHaveLength(1);
      expect(received2).toHaveLength(1);
      expect(received1[0].payload).toBe("broadcast");
      expect(received2[0].payload).toBe("broadcast");
    });
  });

  describe("createPubSub with WS server event target", () => {
    it("subscribe and publish through PubSub wired to WS server", async () => {
      const server = createWebSocketServerEventTarget();
      const [{ client }] = connectClients(server, 1);
      const pubsub = createPubSub({ eventTarget: server });

      const clientListener = vi.fn();
      client.addEventListener("chat:room1", clientListener);

      const consume = take(pubsub.subscribe("chat", "room1"), 1);
      pubsub.publish("chat", "room1", "hello from server pubsub");
      const received = await consume;

      expect(received).toHaveLength(1);
      expect(received[0]).toEqual({
        type: "chat",
        id: "room1",
        payload: "hello from server pubsub",
      });
      expect(clientListener).toHaveBeenCalledTimes(1);
      expect((clientListener.mock.calls[0][0] as TestEvent).detail).toEqual({
        type: "chat",
        id: "room1",
        payload: "hello from server pubsub",
      });
    });

    it("server PubSub publish fans out to subscribed WS clients", async () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);
      const pubsub = createPubSub({ eventTarget: server });

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      client1.addEventListener("chat:room1", listener1);
      client2.addEventListener("chat:room1", listener2);

      const consume = take(pubsub.subscribe("chat", "room1"), 1);
      pubsub.publish("chat", "room1", "fan-out");
      const received = await consume;

      expect(received).toHaveLength(1);
      expect(received[0].payload).toBe("fan-out");
      expect(listener1).toHaveBeenCalledTimes(1);
      expect(listener2).toHaveBeenCalledTimes(1);
    });

    it("server PubSub publish does not reach unsubscribed clients", async () => {
      const server = createWebSocketServerEventTarget();
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);
      const pubsub = createPubSub({ eventTarget: server });

      const subscribedListener = vi.fn();
      const unsubscribedListener = vi.fn();
      client1.addEventListener("chat:room1", subscribedListener);
      client2.addEventListener("chat:room2", unsubscribedListener);

      // The local subscription is awaited only to know the publish has been
      // observed before asserting on the client listeners.
      const consume = take(pubsub.subscribe("chat", "room1"), 1);
      pubsub.publish("chat", "room1", "directed");
      await consume;

      expect(subscribedListener).toHaveBeenCalledTimes(1);
      expect(unsubscribedListener).not.toHaveBeenCalled();
    });
  });

  describe("end-to-end event envelope round-trip", () => {
    it("preserves full EventEnvelope through server dispatch → fan-out → client receive", () => {
      const server = createWebSocketServerEventTarget();
      // The first connection exists but is deliberately left unsubscribed.
      const [, { client: client2 }] = connectClients(server, 2);

      const listener2 = vi.fn();
      client2.addEventListener("chat:room1", listener2);

      const envelope: EventEnvelope<"chat", { msg: string; ts: number }> = {
        type: "chat",
        id: "room1",
        payload: { msg: "hello world", ts: 1234567890 },
      };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(listener2).toHaveBeenCalledTimes(1);
      const receivedEvent = listener2.mock.calls[0][0] as TestEvent;
      expect(receivedEvent.detail).toEqual(envelope);
      expect(receivedEvent.type).toBe("chat:room1");
    });

    it("preserves envelope when client publishes and server re-dispatches to subscribed clients", () => {
      const onConnection = vi.fn();
      const server = createWebSocketServerEventTarget({ onConnection });
      const [{ client: client1 }, { client: client2 }] = connectClients(server, 2);
      const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget;

      const listener2 = vi.fn();
      client2.addEventListener("chat:room1", listener2);

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "from client1" };

      // Relay: whatever arrives on client1's spoke is re-dispatched on the
      // server so that it fans out to the other subscribers.
      spoke1.addEventListener("chat:room1", (event: Event) => {
        server.dispatchEvent(event as TestEvent);
      });

      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      client1.dispatchEvent(event);

      expect(listener2).toHaveBeenCalledTimes(1);
      const received = (listener2.mock.calls[0][0] as TestEvent).detail as EventEnvelope;
      expect(received).toEqual(envelope);
    });
  });

  describe("connection lifecycle", () => {
    it("disconnecting a client removes it from server subscription map", () => {
      const server = createWebSocketServerEventTarget();
      const [{ serverSideWs, client }] = connectClients(server, 1);

      const listener = vi.fn();
      client.addEventListener("chat:room1", listener);

      // Fires the server-side `onclose` handler only.
      serverSideWs.simulateClose();

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "after close" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(listener).not.toHaveBeenCalled();
    });

    it("one client disconnecting does not affect other clients", () => {
      const server = createWebSocketServerEventTarget();
      const [{ serverSideWs: s1, client: client1 }, { client: client2 }] = connectClients(
        server,
        2,
      );

      const listener1 = vi.fn();
      const listener2 = vi.fn();
      client1.addEventListener("chat:room1", listener1);
      client2.addEventListener("chat:room1", listener2);

      s1.simulateClose();

      const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "after close" };
      const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent;
      server.dispatchEvent(event);

      expect(listener1).not.toHaveBeenCalled();
      expect(listener2).toHaveBeenCalledTimes(1);
      expect((listener2.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
    });
  });
});