diff --git a/package.json b/package.json index 6eeb14a..479642f 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,16 @@ "types": "./dist/event-target-websocket-client.d.cts", "default": "./dist/event-target-websocket-client.cjs" } + }, + "./event-target-websocket-server": { + "import": { + "types": "./dist/event-target-websocket-server.d.ts", + "default": "./dist/event-target-websocket-server.js" + }, + "require": { + "types": "./dist/event-target-websocket-server.d.cts", + "default": "./dist/event-target-websocket-server.cjs" + } } }, "publishConfig": { diff --git a/src/event-target-websocket-server.ts b/src/event-target-websocket-server.ts new file mode 100644 index 0000000..bcda0d3 --- /dev/null +++ b/src/event-target-websocket-server.ts @@ -0,0 +1,282 @@ +import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; + +export interface WebSocketLike { + send(data: string): void; + close(code?: number, reason?: string): void; + bufferedAmount: number; + onmessage: ((ev: { data: string }) => void) | null; + onclose: ((ev: { code: number; reason?: string }) => void) | null; +} + +export interface SpokeEventTarget extends TypedEventTarget { + readonly ws: WebSocketLike; +} + +export interface CreateWebSocketServerEventTargetArgs { + onConnection?: (spoke: SpokeEventTarget, ws: WebSocketLike) => void; + onDisconnection?: (spoke: SpokeEventTarget, ws: WebSocketLike) => void; + maxBufferedAmount?: number; + onBackpressure?: (ws: WebSocketLike, bufferedAmount: number) => void; +} + +export interface WebSocketServerEventTarget extends TypedEventTarget { + addConnection(ws: WebSocketLike): void; + removeConnection(ws: WebSocketLike): void; +} + +export function createWebSocketServerEventTarget( + args?: CreateWebSocketServerEventTargetArgs, +): WebSocketServerEventTarget { + const maxBufferedAmount = args?.maxBufferedAmount ?? 1_048_576; + const onConnection = args?.onConnection; + const onDisconnection = args?.onDisconnection; + const onBackpressure = args?.onBackpressure; + + const subscriptions = new Map>(); + const connectionSubscriptions = new Map>(); + + const connectionListeners = new Map>>(); + + const localListeners = new Map>(); + + const spokeTargets = new Map>(); + + const originalOnmessage = new Map void) | null>(); + const originalOnclose = new Map void) | null>(); + + function createSpokeTarget(ws: WebSocketLike): SpokeEventTarget { + const listeners = new Map>(); + connectionListeners.set(ws, listeners); + + const target: SpokeEventTarget = { + get ws() { + return ws; + }, + addEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions == null) return; + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + let set = listeners.get(type); + if (set === undefined) { + set = new Set(); + listeners.set(type, set); + } + set.add(callback as EventListener); + }, + removeEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions == null) return; + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + const set = listeners.get(type); + if (set === undefined) return; + set.delete(callback as EventListener); + if (set.size === 0) { + listeners.delete(type); + } + }, + dispatchEvent(event: TEvent): boolean { + const message = JSON.stringify(event.detail); + try { + if (ws.bufferedAmount > maxBufferedAmount) { + onBackpressure?.(ws, ws.bufferedAmount); + ws.close(1013, "Try Again Later"); + removeConnection(ws); + return true; + } + ws.send(message); + } catch { + removeConnection(ws); + } + return true; + }, + }; + + spokeTargets.set(ws, target); + return target; + } + + function removeConnection(ws: WebSocketLike) { + const topics = connectionSubscriptions.get(ws); + if (topics !== undefined) { + for (const topic of topics) { + const subscribers = subscriptions.get(topic); + if (subscribers !== undefined) { + subscribers.delete(ws); + if (subscribers.size === 0) { + subscriptions.delete(topic); + } + } + } + connectionSubscriptions.delete(ws); + } + + connectionListeners.delete(ws); + + const spoke = spokeTargets.get(ws); + spokeTargets.delete(ws); + + const prevOnmessage = originalOnmessage.get(ws) ?? null; + const prevOnclose = originalOnclose.get(ws) ?? null; + ws.onmessage = prevOnmessage; + ws.onclose = prevOnclose; + originalOnmessage.delete(ws); + originalOnclose.delete(ws); + + if (spoke !== undefined) { + onDisconnection?.(spoke, ws); + } + } + + function addConnection(ws: WebSocketLike) { + if (spokeTargets.has(ws)) return; + + originalOnmessage.set(ws, ws.onmessage); + originalOnclose.set(ws, ws.onclose); + + const spoke = createSpokeTarget(ws); + + connectionSubscriptions.set(ws, new Set()); + + const prevOnclose = originalOnclose.get(ws)!; + + ws.onmessage = (ev: { data: string }) => { + let envelope: EventEnvelope; + try { + envelope = JSON.parse(ev.data) as EventEnvelope; + } catch { + console.warn(`Failed to parse WebSocket message: ${ev.data}`); + return; + } + + if (typeof envelope.type !== "string") return; + + if (envelope.type === "__subscribe") { + const topic = (envelope.payload as Record)?.topic; + if (typeof topic === "string" && topic.length > 0) { + let subscribers = subscriptions.get(topic); + if (subscribers === undefined) { + subscribers = new Set(); + subscriptions.set(topic, subscribers); + } + subscribers.add(ws); + const topics = connectionSubscriptions.get(ws); + if (topics !== undefined) { + topics.add(topic); + } + } + return; + } + + if (envelope.type === "__unsubscribe") { + const topic = (envelope.payload as Record)?.topic; + if (typeof topic === "string" && topic.length > 0) { + const subscribers = subscriptions.get(topic); + if (subscribers !== undefined) { + subscribers.delete(ws); + if (subscribers.size === 0) { + subscriptions.delete(topic); + } + } + const topics = connectionSubscriptions.get(ws); + if (topics !== undefined) { + topics.delete(topic); + } + } + return; + } + + const topic = `${envelope.type}:${envelope.id}`; + const customEvent = new CustomEvent(topic, { detail: envelope }) as TEvent; + + const spokeListeners = connectionListeners.get(ws); + if (spokeListeners !== undefined) { + const cbs = spokeListeners.get(topic); + if (cbs !== undefined) { + for (const cb of cbs) { + cb(customEvent); + } + } + } + + const localCbs = localListeners.get(topic); + if (localCbs !== undefined) { + for (const cb of localCbs) { + cb(customEvent); + } + } + }; + + ws.onclose = (ev: { code: number; reason?: string }) => { + removeConnection(ws); + if (prevOnclose !== null) { + prevOnclose(ev); + } + }; + + onConnection?.(spoke, ws); + } + + function sendToConnection(ws: WebSocketLike, message: string) { + try { + if (ws.bufferedAmount > maxBufferedAmount) { + onBackpressure?.(ws, ws.bufferedAmount); + ws.close(1013, "Try Again Later"); + removeConnection(ws); + return; + } + ws.send(message); + } catch { + removeConnection(ws); + } + } + + const serverTarget: WebSocketServerEventTarget = { + addConnection(ws: WebSocketLike) { + addConnection(ws); + }, + removeConnection(ws: WebSocketLike) { + removeConnection(ws); + }, + addEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions == null) return; + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + let set = localListeners.get(type); + if (set === undefined) { + set = new Set(); + localListeners.set(type, set); + } + set.add(callback as EventListener); + }, + removeEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions == null) return; + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + const set = localListeners.get(type); + if (set === undefined) return; + set.delete(callback as EventListener); + if (set.size === 0) { + localListeners.delete(type); + } + }, + dispatchEvent(event: TEvent): boolean { + const message = JSON.stringify(event.detail); + const subscribers = subscriptions.get(event.type); + if (subscribers !== undefined) { + for (const ws of subscribers) { + sendToConnection(ws, message); + } + } + + const localCbs = localListeners.get(event.type); + if (localCbs !== undefined) { + for (const cb of localCbs) { + cb(event); + } + } + return true; + }, + }; + + return serverTarget; +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 19abaf9..41da041 100644 --- a/src/index.ts +++ b/src/index.ts @@ -3,4 +3,5 @@ export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedE export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; -export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js"; \ No newline at end of file +export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js"; +export { createWebSocketServerEventTarget, type WebSocketLike, type SpokeEventTarget, type CreateWebSocketServerEventTargetArgs, type WebSocketServerEventTarget } from "./event-target-websocket-server.js"; \ No newline at end of file diff --git a/test/event-target-websocket-server.test.ts b/test/event-target-websocket-server.test.ts new file mode 100644 index 0000000..59ce9da --- /dev/null +++ b/test/event-target-websocket-server.test.ts @@ -0,0 +1,822 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { createWebSocketServerEventTarget } from "../src/event-target-websocket-server.js"; +import type { WebSocketLike, SpokeEventTarget } from "../src/event-target-websocket-server.js"; +import type { EventEnvelope, TypedEvent } from "../src/types.js"; + +type TestEvent = TypedEvent; + +function createMockWebSocket(): WebSocketLike & { + sent: string[]; + simulateMessage: (data: string) => void; + simulateClose: (code?: number, reason?: string) => void; +} { + let onmessageHandler: ((ev: { data: string }) => void) | null = null; + let oncloseHandler: ((ev: { code: number; reason?: string }) => void) | null = null; + + const ws = { + bufferedAmount: 0, + sent: [] as string[], + send: vi.fn((data: string) => { + ws.sent.push(data); + }) as any, + close: vi.fn() as any, + get onmessage() { + return onmessageHandler; + }, + set onmessage(handler: ((ev: { data: string }) => void) | null) { + onmessageHandler = handler; + }, + get onclose() { + return oncloseHandler; + }, + set onclose(handler: ((ev: { code: number; reason?: string }) => void) | null) { + oncloseHandler = handler; + }, + simulateMessage(data: string) { + if (onmessageHandler) { + onmessageHandler({ data }); + } + }, + simulateClose(code: number = 1000, reason?: string) { + if (oncloseHandler) { + oncloseHandler({ code, reason }); + } + }, + }; + + Object.defineProperty(ws, "onmessage", { + get() { + return onmessageHandler; + }, + set(handler: ((ev: { data: string }) => void) | null) { + onmessageHandler = handler; + }, + }); + + Object.defineProperty(ws, "onclose", { + get() { + return oncloseHandler; + }, + set(handler: ((ev: { code: number; reason?: string }) => void) | null) { + oncloseHandler = handler; + }, + }); + + return ws; +} + +describe("createWebSocketServerEventTarget", () => { + describe("addConnection", () => { + it("sets up onmessage and onclose handlers on the WebSocket", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + expect(ws.onmessage).toBeNull(); + expect(ws.onclose).toBeNull(); + + server.addConnection(ws as any); + + expect(ws.onmessage).not.toBeNull(); + expect(ws.onclose).not.toBeNull(); + }); + + it("calls onConnection callback when a new connection is added", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + expect(onConnection).toHaveBeenCalledTimes(1); + expect(onConnection).toHaveBeenCalledWith(expect.any(Object), ws); + const [spoke] = onConnection.mock.calls[0]; + expect(spoke).toHaveProperty("addEventListener"); + expect(spoke).toHaveProperty("removeEventListener"); + expect(spoke).toHaveProperty("dispatchEvent"); + expect(spoke).toHaveProperty("ws"); + }); + + it("does not add the same connection twice", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + server.addConnection(ws as any); + + expect(onConnection).toHaveBeenCalledTimes(1); + }); + + it("preserves original onclose handler", () => { + const originalOnclose = vi.fn(); + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + ws.onclose = originalOnclose; + + server.addConnection(ws as any); + ws.simulateClose(1000); + + expect(originalOnclose).toHaveBeenCalledTimes(1); + }); + }); + + describe("removeConnection", () => { + it("cleans up subscription maps when a connection is removed", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + server.removeConnection(ws as any); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("calls onDisconnection callback when a connection is removed", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + server.removeConnection(ws as any); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + expect(onDisconnection).toHaveBeenCalledWith(expect.any(Object), ws); + }); + + it("does not close the WebSocket", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + server.removeConnection(ws as any); + + expect(ws.close).not.toHaveBeenCalled(); + }); + + it("restores original onmessage and onclose handlers", () => { + const originalOnmessage = vi.fn(); + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + ws.onmessage = originalOnmessage; + + server.addConnection(ws as any); + expect(ws.onmessage).not.toBe(originalOnmessage); + + server.removeConnection(ws as any); + expect(ws.onmessage).toBe(originalOnmessage); + }); + }); + + describe("automatic cleanup on close", () => { + it("calls removeConnection automatically when WebSocket closes", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateClose(1000); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + }); + }); + + describe("subscription tracking (__subscribe / __unsubscribe)", () => { + it("adds connection to topic subscriber set on __subscribe", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("is idempotent for duplicate __subscribe", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).toHaveBeenCalledTimes(1); + }); + + it("removes connection from topic subscriber set on __unsubscribe", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.simulateMessage(JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "chat:room1" } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalled(); + }); + + it("silently ignores invalid topic in __subscribe", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + server.addConnection(ws as any); + + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "" } })); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: {} })); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: 123 } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalled(); + warnSpy.mockRestore(); + }); + + it("does not dispatch __subscribe to local listeners", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("__subscribe" as any, listener); + + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("does not dispatch __unsubscribe to local listeners", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("__unsubscribe" as any, listener); + + ws.simulateMessage(JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "chat:room1" } })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("cleans up all subscriptions when a connection is removed", () => { + const server = createWebSocketServerEventTarget(); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + ws1.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + ws2.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + server.removeConnection(ws1 as any); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws1.send).not.toHaveBeenCalledWith(JSON.stringify(envelope)); + expect(ws2.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + }); + + describe("topic-based fan-out", () => { + it("sends events only to connections subscribed to that topic", () => { + const server = createWebSocketServerEventTarget(); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + ws1.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws1.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + expect(ws2.send).not.toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("sends to multiple subscribed connections", () => { + const server = createWebSocketServerEventTarget(); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + ws1.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + ws2.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws1.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + expect(ws2.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("routes events to different topics independently", () => { + const server = createWebSocketServerEventTarget(); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + ws1.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + ws2.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room2" } })); + + const envelope1: EventEnvelope = { type: "chat", id: "room1", payload: "hello1" }; + const envelope2: EventEnvelope = { type: "chat", id: "room2", payload: "hello2" }; + + const event1 = new CustomEvent("chat:room1", { detail: envelope1 }) as TestEvent; + const event2 = new CustomEvent("chat:room2", { detail: envelope2 }) as TestEvent; + + server.dispatchEvent(event1); + + expect(ws1.send).toHaveBeenCalledWith(JSON.stringify(envelope1)); + expect(ws2.send).not.toHaveBeenCalledWith(JSON.stringify(envelope1)); + + server.dispatchEvent(event2); + + expect(ws2.send).toHaveBeenCalledWith(JSON.stringify(envelope2)); + expect(ws1.send).not.toHaveBeenCalledWith(JSON.stringify(envelope2)); + }); + + it("does not send to unsubscribed connections", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalled(); + }); + }); + + describe("dispatchEvent on server target", () => { + it("always returns true", () => { + const server = createWebSocketServerEventTarget(); + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + + expect(server.dispatchEvent(event)).toBe(true); + }); + + it("returns true even when there are no subscribers", () => { + const server = createWebSocketServerEventTarget(); + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + + expect(server.dispatchEvent(event)).toBe(true); + }); + + it("delivers to local listeners", () => { + const server = createWebSocketServerEventTarget(); + const listener = vi.fn(); + server.addEventListener("chat:room1", listener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(listener).toHaveBeenCalledTimes(1); + expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + }); + + it("delivers to local listeners even without WebSocket subscribers", () => { + const server = createWebSocketServerEventTarget(); + const listener = vi.fn(); + server.addEventListener("chat:room1", listener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(listener).toHaveBeenCalledTimes(1); + }); + }); + + describe("incoming messages from spokes", () => { + it("dispatches regular events to local listeners", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("chat:room1", listener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).toHaveBeenCalledTimes(1); + expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + }); + + it("delivers events from a spoke to its per-connection spoke target", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + const spoke = onConnection.mock.calls[0][0] as SpokeEventTarget; + const spokeListener = vi.fn(); + spoke.addEventListener("chat:room1", spokeListener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(spokeListener).toHaveBeenCalledTimes(1); + expect((spokeListener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + }); + + it("does not deliver spoke events to other spokes' listeners", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget; + const spoke2 = onConnection.mock.calls[1][0] as SpokeEventTarget; + + const spoke1Listener = vi.fn(); + const spoke2Listener = vi.fn(); + spoke1.addEventListener("chat:room1", spoke1Listener); + spoke2.addEventListener("chat:room1", spoke2Listener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "from-ws1" }; + ws1.simulateMessage(JSON.stringify(envelope)); + + expect(spoke1Listener).toHaveBeenCalledTimes(1); + expect(spoke2Listener).not.toHaveBeenCalled(); + }); + }); + + describe("spoke target dispatchEvent", () => { + it("sends events to the specific spoke connection", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + const spoke = onConnection.mock.calls[0][0] as SpokeEventTarget; + const envelope: EventEnvelope = { type: "direct", id: "spoke1", payload: "hello" }; + const event = new CustomEvent("direct:spoke1", { detail: envelope }) as TestEvent; + + spoke.dispatchEvent(event); + + expect(ws.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + expect(spoke.dispatchEvent(event)).toBe(true); + }); + }); + + describe("malformed JSON handling", () => { + it("silently ignores malformed JSON and logs a warning", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("topic:a", listener); + + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + ws.simulateMessage("not valid json{{"); + + expect(listener).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledWith(expect.stringContaining("Failed to parse WebSocket message")); + + warnSpy.mockRestore(); + }); + + it("continues delivering valid messages after a parse error", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("topic:a", listener); + + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + ws.simulateMessage("broken{{{"); + + const envelope: EventEnvelope = { type: "topic", id: "a", payload: "good" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).toHaveBeenCalledTimes(1); + warnSpy.mockRestore(); + }); + }); + + describe("backpressure handling", () => { + it("closes connection when bufferedAmount exceeds threshold", () => { + const server = createWebSocketServerEventTarget({ maxBufferedAmount: 1024 }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 2048; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.close).toHaveBeenCalledWith(1013, "Try Again Later"); + }); + + it("calls onBackpressure callback before disconnecting", () => { + const onBackpressure = vi.fn(); + const server = createWebSocketServerEventTarget({ + maxBufferedAmount: 1024, + onBackpressure, + }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 2048; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(onBackpressure).toHaveBeenCalledTimes(1); + expect(onBackpressure).toHaveBeenCalledWith(ws, 2048); + }); + + it("removes connection after backpressure disconnect", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ + maxBufferedAmount: 1024, + onDisconnection, + }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 2048; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + }); + + it("does not send the current event when backpressure threshold is exceeded", () => { + const server = createWebSocketServerEventTarget({ maxBufferedAmount: 1024 }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 2048; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + + (ws.send as ReturnType).mockClear(); + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalled(); + }); + + it("default maxBufferedAmount is 1MB (1_048_576)", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 1_048_577; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + + (ws.send as ReturnType).mockClear(); + server.dispatchEvent(event); + + expect(ws.send).not.toHaveBeenCalled(); + expect(ws.close).toHaveBeenCalledWith(1013, "Try Again Later"); + }); + + it("allows sending when bufferedAmount is below threshold", () => { + const server = createWebSocketServerEventTarget({ maxBufferedAmount: 1024 }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + ws.bufferedAmount = 512; + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(ws.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + }); + + describe("send failure handling", () => { + it("removes connection and fires onDisconnection when ws.send throws", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + (ws.send as ReturnType).mockImplementation(() => { + throw new Error("Connection closed"); + }); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + }); + + it("dispatchEvent still returns true when ws.send throws", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + ws.simulateMessage(JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "chat:room1" } })); + + (ws.send as ReturnType).mockImplementation(() => { + throw new Error("Connection closed"); + }); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + + expect(server.dispatchEvent(event)).toBe(true); + }); + }); + + describe("addEventListener / removeEventListener on server target", () => { + it("adds and removes local listeners", () => { + const server = createWebSocketServerEventTarget(); + const listener = vi.fn(); + + server.addEventListener("chat:room1", listener); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(listener).toHaveBeenCalledTimes(1); + + server.removeEventListener("chat:room1", listener); + server.dispatchEvent(event); + + expect(listener).toHaveBeenCalledTimes(1); + }); + + it("supports EventListenerObject with handleEvent", () => { + const server = createWebSocketServerEventTarget(); + const handleEvent = vi.fn(); + const listenerObject = { handleEvent }; + + server.addEventListener("chat:room1", listenerObject); + + const envelope: EventEnvelope = { type: "chat", id: "room1", payload: "hello" }; + const event = new CustomEvent("chat:room1", { detail: envelope }) as TestEvent; + server.dispatchEvent(event); + + expect(handleEvent).toHaveBeenCalledTimes(1); + + server.removeEventListener("chat:room1", listenerObject); + server.dispatchEvent(event); + + expect(handleEvent).toHaveBeenCalledTimes(1); + }); + }); + + describe("per-connection spoke target", () => { + it("exposes ws property on spoke target", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + const spoke = onConnection.mock.calls[0][0] as SpokeEventTarget; + expect(spoke.ws).toBe(ws); + }); + + it("spoke target dispatchEvent sends to specific connection", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws1 = createMockWebSocket(); + const ws2 = createMockWebSocket(); + + server.addConnection(ws1 as any); + server.addConnection(ws2 as any); + + const spoke1 = onConnection.mock.calls[0][0] as SpokeEventTarget; + const envelope: EventEnvelope = { type: "direct", id: "spoke1", payload: "secret" }; + const event = new CustomEvent("direct:spoke1", { detail: envelope }) as TestEvent; + + spoke1.dispatchEvent(event); + + expect(ws1.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + expect(ws2.send).not.toHaveBeenCalled(); + }); + + it("spoke target dispatchEvent returns true", () => { + const onConnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onConnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + + const spoke = onConnection.mock.calls[0][0] as SpokeEventTarget; + const envelope: EventEnvelope = { type: "direct", id: "spoke1", payload: "hello" }; + const event = new CustomEvent("direct:spoke1", { detail: envelope }) as TestEvent; + + expect(spoke.dispatchEvent(event)).toBe(true); + }); + }); + + describe("invalid message handling", () => { + it("ignores messages without a string type", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("topic:a", listener); + + ws.simulateMessage(JSON.stringify({ id: "1", payload: null })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("handles non-string type gracefully", () => { + const server = createWebSocketServerEventTarget(); + const ws = createMockWebSocket(); + const listener = vi.fn(); + + server.addConnection(ws as any); + server.addEventListener("topic:a", listener); + + ws.simulateMessage(JSON.stringify({ type: 123, id: "1", payload: null })); + + expect(listener).not.toHaveBeenCalled(); + }); + }); + + describe("onDisconnection callback", () => { + it("receives the spoke event target and raw WebSocket", () => { + const onDisconnection = vi.fn(); + const server = createWebSocketServerEventTarget({ onDisconnection }); + const ws = createMockWebSocket(); + + server.addConnection(ws as any); + server.removeConnection(ws as any); + + expect(onDisconnection).toHaveBeenCalledTimes(1); + const [spoke, rawWs] = onDisconnection.mock.calls[0]; + expect(spoke).toHaveProperty("addEventListener"); + expect(spoke).toHaveProperty("removeEventListener"); + expect(spoke).toHaveProperty("dispatchEvent"); + expect(spoke).toHaveProperty("ws"); + expect(rawWs).toBe(ws); + }); + }); +}); \ No newline at end of file diff --git a/tsup.config.ts b/tsup.config.ts index e68b35e..12942f7 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -5,6 +5,7 @@ export default defineConfig({ 'src/index.ts', 'src/event-target-redis.ts', 'src/event-target-websocket-client.ts', + 'src/event-target-websocket-server.ts', ], format: ['esm', 'cjs'], dts: true,