diff --git a/package.json b/package.json index 8700c50..6eeb14a 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,16 @@ "types": "./dist/event-target-redis.d.cts", "default": "./dist/event-target-redis.cjs" } + }, + "./event-target-websocket-client": { + "import": { + "types": "./dist/event-target-websocket-client.d.ts", + "default": "./dist/event-target-websocket-client.js" + }, + "require": { + "types": "./dist/event-target-websocket-client.d.cts", + "default": "./dist/event-target-websocket-client.cjs" + } } }, "publishConfig": { diff --git a/src/event-target-websocket-client.ts b/src/event-target-websocket-client.ts new file mode 100644 index 0000000..8fcd750 --- /dev/null +++ b/src/event-target-websocket-client.ts @@ -0,0 +1,100 @@ +import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js"; + +export function createWebSocketClientEventTarget( + ws: WebSocket, +): TypedEventTarget { + const callbacksForTopic = new Map>(); + + ws.onmessage = (event: MessageEvent) => { + let envelope: EventEnvelope; + try { + envelope = JSON.parse(event.data as string) as EventEnvelope; + } catch { + console.warn( + `Failed to parse WebSocket message: ${event.data}`, + ); + return; + } + + if (typeof envelope.type !== "string" || envelope.type.startsWith("__")) { + return; + } + + const topic = `${envelope.type}:${envelope.id}`; + const callbacks = callbacksForTopic.get(topic); + if (callbacks === undefined) { + return; + } + + const customEvent = new CustomEvent(topic, { + detail: envelope, + }) as TEvent; + + for (const callback of callbacks) { + callback(customEvent); + } + }; + + function addCallback(topic: string, callback: EventListener) { + let callbacks = callbacksForTopic.get(topic); + const isFirst = callbacks === undefined; + if (isFirst) { + callbacks = new Set(); + callbacksForTopic.set(topic, callbacks); + } + callbacks!.add(callback); + + if (isFirst) { + ws.send( + JSON.stringify({ + type: "__subscribe", + id: "", + payload: { topic }, + }), + ); + } + } + + function removeCallback(topic: string, callback: EventListener) { + const callbacks = callbacksForTopic.get(topic); + if (callbacks === undefined) { + return; + } + const existed = callbacks.delete(callback); + if (!existed) { + return; + } + if (callbacks.size > 0) { + return; + } + callbacksForTopic.delete(topic); + ws.send( + JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic }, + }), + ); + } + + return { + addEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions != null) { + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + addCallback(topic, callback); + } + }, + dispatchEvent(event: TEvent) { + ws.send(JSON.stringify(event.detail)); + return true; + }, + removeEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) { + if (callbackOrOptions != null) { + const callback = + "handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions; + removeCallback(topic, callback); + } + }, + }; +} \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index d4fa1b7..19abaf9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -2,4 +2,5 @@ export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type Pu export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; -export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; \ No newline at end of file +export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; +export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js"; \ No newline at end of file diff --git a/test/event-target-websocket-client.test.ts b/test/event-target-websocket-client.test.ts new file mode 100644 index 0000000..8a196f4 --- /dev/null +++ b/test/event-target-websocket-client.test.ts @@ -0,0 +1,497 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; +import { createWebSocketClientEventTarget } from "../src/event-target-websocket-client.js"; +import type { EventEnvelope, TypedEvent } from "../src/types.js"; + +type TestEvent = TypedEvent; + +function createMockWebSocket() { + const sent: string[] = []; + let onmessageHandler: ((event: { data: string }) => void) | null = null; + + const ws = { + send: vi.fn((data: string) => { + sent.push(data); + }), + get onmessage() { + return onmessageHandler; + }, + set onmessage(handler: ((event: { data: string }) => void) | null) { + onmessageHandler = handler; + }, + sent, + simulateMessage(data: string) { + if (onmessageHandler) { + onmessageHandler({ data }); + } + }, + }; + + Object.defineProperty(ws, "onmessage", { + get() { + return onmessageHandler; + }, + set(handler: ((event: { data: string }) => void) | null) { + onmessageHandler = handler; + }, + }); + + return ws; +} + +describe("createWebSocketClientEventTarget", () => { + describe("dispatchEvent (send path)", () => { + it("serializes envelope detail and calls ws.send", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const envelope: EventEnvelope<"call.responded", { status: string }> = { + type: "call.responded", + id: "uuid-123", + payload: { status: "ok" }, + }; + + const event = new CustomEvent("call.responded:uuid-123", { + detail: envelope, + }) as TestEvent; + + eventTarget.dispatchEvent(event); + + expect(ws.send).toHaveBeenCalledTimes(1); + expect(ws.send).toHaveBeenCalledWith(JSON.stringify(envelope)); + }); + + it("returns true from dispatchEvent", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const event = new CustomEvent("test:event", { + detail: { type: "test", id: "event", payload: null }, + }) as TestEvent; + + const result = eventTarget.dispatchEvent(event); + expect(result).toBe(true); + }); + + it("propagates ws.send() errors to caller", () => { + const ws = createMockWebSocket(); + ws.send.mockImplementation(() => { + throw new Error("WebSocket is not open"); + }); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const event = new CustomEvent("test:event", { + detail: { type: "test", id: "event", payload: null }, + }) as TestEvent; + + expect(() => eventTarget.dispatchEvent(event)).toThrow("WebSocket is not open"); + }); + }); + + describe("addEventListener (subscribe path)", () => { + it("sends __subscribe control event on first listener for a topic", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("call.responded:uuid-123", listener); + + expect(ws.send).toHaveBeenCalledTimes(1); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__subscribe", + id: "", + payload: { topic: "call.responded:uuid-123" }, + }), + ); + }); + + it("sends __subscribe only once when multiple listeners are added for the same topic", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + const listener2 = vi.fn(); + eventTarget.addEventListener("message.sent:msg1", listener1); + eventTarget.addEventListener("message.sent:msg1", listener2); + + expect(ws.send).toHaveBeenCalledTimes(1); + }); + + it("does not send __subscribe when callback is null", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + eventTarget.addEventListener("topic:a", null as any); + + expect(ws.send).not.toHaveBeenCalled(); + }); + + it("supports EventListenerObject with handleEvent", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const handleEvent = vi.fn(); + const listenerObject = { handleEvent }; + + eventTarget.addEventListener("obj:test", listenerObject); + + expect(ws.send).toHaveBeenCalledTimes(1); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__subscribe", + id: "", + payload: { topic: "obj:test" }, + }), + ); + }); + }); + + describe("removeEventListener (unsubscribe path)", () => { + it("sends __unsubscribe when the last listener for a topic is removed", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + expect(ws.send).toHaveBeenCalledTimes(1); + + eventTarget.removeEventListener("topic:a", listener); + + expect(ws.send).toHaveBeenCalledTimes(2); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic: "topic:a" }, + }), + ); + }); + + it("does not send __unsubscribe while other listeners remain for the same topic", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + const listener2 = vi.fn(); + eventTarget.addEventListener("event:type1", listener1); + eventTarget.addEventListener("event:type1", listener2); + + expect(ws.send).toHaveBeenCalledTimes(1); + + eventTarget.removeEventListener("event:type1", listener1); + + expect(ws.send).toHaveBeenCalledTimes(1); + + eventTarget.removeEventListener("event:type1", listener2); + + expect(ws.send).toHaveBeenCalledTimes(2); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic: "event:type1" }, + }), + ); + }); + + it("does not send __unsubscribe when removing a callback that was never registered", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + eventTarget.addEventListener("topic:a", listener1); + + const unregisteredListener = vi.fn(); + eventTarget.removeEventListener("topic:a", unregisteredListener); + + expect(ws.send).toHaveBeenCalledTimes(1); + }); + + it("does not send __unsubscribe when callback is null", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + eventTarget.addEventListener("topic:a", null as any); + + eventTarget.removeEventListener("topic:a", null as any); + + expect(ws.send).not.toHaveBeenCalled(); + }); + + it("supports EventListenerObject with handleEvent for removal", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const handleEvent = vi.fn(); + const listenerObject = { handleEvent }; + + eventTarget.addEventListener("obj:test", listenerObject); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__subscribe", + id: "", + payload: { topic: "obj:test" }, + }), + ); + + eventTarget.removeEventListener("obj:test", listenerObject); + expect(ws.send).toHaveBeenCalledWith( + JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic: "obj:test" }, + }), + ); + }); + }); + + describe("receive path (ws.onmessage)", () => { + it("parses envelope, creates CustomEvent with type:id topic, and dispatches to listeners", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("message.sent:msg1", listener); + + const envelope: EventEnvelope = { + type: "message.sent", + id: "msg1", + payload: "hello world", + }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).toHaveBeenCalledTimes(1); + const receivedEvent = listener.mock.calls[0][0] as TestEvent; + expect(receivedEvent.type).toBe("message.sent:msg1"); + expect(receivedEvent.detail).toEqual(envelope); + }); + + it("delivers messages to all listeners on the same topic", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + const listener2 = vi.fn(); + eventTarget.addEventListener("topic:x", listener1); + eventTarget.addEventListener("topic:x", listener2); + + const envelope: EventEnvelope = { type: "topic", id: "x", payload: "data" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener1).toHaveBeenCalledTimes(1); + expect(listener2).toHaveBeenCalledTimes(1); + expect((listener1.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + expect((listener2.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + }); + + it("ignores messages for topics with no registered listeners", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + const envelope: EventEnvelope = { type: "other", id: "b", payload: null }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).not.toHaveBeenCalled(); + }); + }); + + describe("malformed JSON handling", () => { + it("silently ignores malformed JSON and logs a warning", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + ws.simulateMessage("not valid json{{"); + + expect(listener).not.toHaveBeenCalled(); + expect(warnSpy).toHaveBeenCalledTimes(1); + expect(warnSpy).toHaveBeenCalledWith( + expect.stringContaining("Failed to parse WebSocket message"), + ); + + warnSpy.mockRestore(); + }); + + it("continues delivering valid messages after a parse error", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {}); + + ws.simulateMessage("broken{{{"); + + expect(listener).not.toHaveBeenCalled(); + + const envelope: EventEnvelope = { type: "topic", id: "a", payload: "good" }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).toHaveBeenCalledTimes(1); + expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope); + + warnSpy.mockRestore(); + }); + }); + + describe("control events from server", () => { + it("silently ignores __subscribe control events received from server", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + ws.simulateMessage(JSON.stringify({ + type: "__subscribe", + id: "", + payload: { topic: "topic:a" }, + })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("silently ignores __unsubscribe control events received from server", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("topic:a", listener); + + ws.simulateMessage(JSON.stringify({ + type: "__unsubscribe", + id: "", + payload: { topic: "topic:a" }, + })); + + expect(listener).not.toHaveBeenCalled(); + }); + + it("ignores any event type starting with __", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("__custom:thing", listener); + + ws.simulateMessage(JSON.stringify({ + type: "__custom", + id: "thing", + payload: null, + })); + + expect(listener).not.toHaveBeenCalled(); + }); + }); + + describe("EventEnvelope round-trip", () => { + it("round-trips full { type, id, payload } envelope through send and receive", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("user.joined:user-99", listener); + + const originalEnvelope: EventEnvelope<"user.joined", { name: string; role: string }> = { + type: "user.joined", + id: "user-99", + payload: { name: "Bob", role: "admin" }, + }; + + const event = new CustomEvent("user.joined:user-99", { + detail: originalEnvelope, + }) as TestEvent; + + eventTarget.dispatchEvent(event); + + const dispatchedData = ws.sent[1]; + expect(dispatchedData).toBe(JSON.stringify(originalEnvelope)); + + const sentData = dispatchedData; + + ws.simulateMessage(sentData); + + expect(listener).toHaveBeenCalledTimes(1); + const receivedDetail = (listener.mock.calls[0][0] as TestEvent).detail as EventEnvelope; + expect(receivedDetail).toEqual(originalEnvelope); + expect(receivedDetail.type).toBe("user.joined"); + expect(receivedDetail.id).toBe("user-99"); + expect(receivedDetail.payload).toEqual({ name: "Bob", role: "admin" }); + }); + + it("round-trips envelope with null payload", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener = vi.fn(); + eventTarget.addEventListener("ping:1", listener); + + const envelope: EventEnvelope = { type: "ping", id: "1", payload: null }; + ws.simulateMessage(JSON.stringify(envelope)); + + expect(listener).toHaveBeenCalledTimes(1); + const receivedDetail = (listener.mock.calls[0][0] as TestEvent).detail as EventEnvelope; + expect(receivedDetail).toEqual(envelope); + }); + }); + + describe("subscription reference counting", () => { + it("re-sends __subscribe after all listeners removed and a new one added", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listener1 = vi.fn(); + eventTarget.addEventListener("topic:a", listener1); + expect(ws.send).toHaveBeenCalledTimes(1); + + eventTarget.removeEventListener("topic:a", listener1); + expect(ws.send).toHaveBeenCalledTimes(2); + + const listener2 = vi.fn(); + eventTarget.addEventListener("topic:a", listener2); + expect(ws.send).toHaveBeenCalledTimes(3); + + expect(ws.send).toHaveBeenNthCalledWith(1, + JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "topic:a" } }), + ); + expect(ws.send).toHaveBeenNthCalledWith(2, + JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "topic:a" } }), + ); + expect(ws.send).toHaveBeenNthCalledWith(3, + JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "topic:a" } }), + ); + }); + + it("tracks separate topics independently", () => { + const ws = createMockWebSocket(); + const eventTarget = createWebSocketClientEventTarget(ws as any); + + const listenerA = vi.fn(); + const listenerB = vi.fn(); + eventTarget.addEventListener("topic:a", listenerA); + eventTarget.addEventListener("topic:b", listenerB); + + expect(ws.send).toHaveBeenCalledTimes(2); + + eventTarget.removeEventListener("topic:a", listenerA); + expect(ws.send).toHaveBeenCalledTimes(3); + + expect(ws.send).toHaveBeenNthCalledWith(3, + JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "topic:a" } }), + ); + + const listenerB2 = vi.fn(); + eventTarget.addEventListener("topic:b", listenerB2); + expect(ws.send).toHaveBeenCalledTimes(3); + }); + }); +}); \ No newline at end of file diff --git a/tsup.config.ts b/tsup.config.ts index fd4ae46..e68b35e 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -4,6 +4,7 @@ export default defineConfig({ entry: [ 'src/index.ts', 'src/event-target-redis.ts', + 'src/event-target-websocket-client.ts', ], format: ['esm', 'cjs'], dts: true,