feat(ws-client): implement WebSocket client event target adapter

This commit is contained in:
2026-05-08 07:04:04 +00:00
parent 60a51948f1
commit b2b07b179e
5 changed files with 610 additions and 1 deletions

View File

@@ -26,6 +26,16 @@
"types": "./dist/event-target-redis.d.cts", "types": "./dist/event-target-redis.d.cts",
"default": "./dist/event-target-redis.cjs" "default": "./dist/event-target-redis.cjs"
} }
},
"./event-target-websocket-client": {
"import": {
"types": "./dist/event-target-websocket-client.d.ts",
"default": "./dist/event-target-websocket-client.js"
},
"require": {
"types": "./dist/event-target-websocket-client.d.cts",
"default": "./dist/event-target-websocket-client.cjs"
}
} }
}, },
"publishConfig": { "publishConfig": {

View File

@@ -0,0 +1,100 @@
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";
export function createWebSocketClientEventTarget<TEvent extends TypedEvent>(
ws: WebSocket,
): TypedEventTarget<TEvent> {
const callbacksForTopic = new Map<string, Set<EventListener>>();
ws.onmessage = (event: MessageEvent) => {
let envelope: EventEnvelope;
try {
envelope = JSON.parse(event.data as string) as EventEnvelope;
} catch {
console.warn(
`Failed to parse WebSocket message: ${event.data}`,
);
return;
}
if (typeof envelope.type !== "string" || envelope.type.startsWith("__")) {
return;
}
const topic = `${envelope.type}:${envelope.id}`;
const callbacks = callbacksForTopic.get(topic);
if (callbacks === undefined) {
return;
}
const customEvent = new CustomEvent(topic, {
detail: envelope,
}) as TEvent;
for (const callback of callbacks) {
callback(customEvent);
}
};
function addCallback(topic: string, callback: EventListener) {
let callbacks = callbacksForTopic.get(topic);
const isFirst = callbacks === undefined;
if (isFirst) {
callbacks = new Set();
callbacksForTopic.set(topic, callbacks);
}
callbacks!.add(callback);
if (isFirst) {
ws.send(
JSON.stringify({
type: "__subscribe",
id: "",
payload: { topic },
}),
);
}
}
function removeCallback(topic: string, callback: EventListener) {
const callbacks = callbacksForTopic.get(topic);
if (callbacks === undefined) {
return;
}
const existed = callbacks.delete(callback);
if (!existed) {
return;
}
if (callbacks.size > 0) {
return;
}
callbacksForTopic.delete(topic);
ws.send(
JSON.stringify({
type: "__unsubscribe",
id: "",
payload: { topic },
}),
);
}
return {
addEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) {
if (callbackOrOptions != null) {
const callback =
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
addCallback(topic, callback);
}
},
dispatchEvent(event: TEvent) {
ws.send(JSON.stringify(event.detail));
return true;
},
removeEventListener(topic, callbackOrOptions: EventListenerOrEventListenerObject) {
if (callbackOrOptions != null) {
const callback =
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
removeCallback(topic, callback);
}
},
};
}

View File

@@ -2,4 +2,5 @@ export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type Pu
export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js";
export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js";
export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js";
export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js";
export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js";

View File

@@ -0,0 +1,497 @@
import { describe, it, expect, vi, beforeEach } from "vitest";
import { createWebSocketClientEventTarget } from "../src/event-target-websocket-client.js";
import type { EventEnvelope, TypedEvent } from "../src/types.js";
type TestEvent = TypedEvent<string, EventEnvelope>;
function createMockWebSocket() {
const sent: string[] = [];
let onmessageHandler: ((event: { data: string }) => void) | null = null;
const ws = {
send: vi.fn((data: string) => {
sent.push(data);
}),
get onmessage() {
return onmessageHandler;
},
set onmessage(handler: ((event: { data: string }) => void) | null) {
onmessageHandler = handler;
},
sent,
simulateMessage(data: string) {
if (onmessageHandler) {
onmessageHandler({ data });
}
},
};
Object.defineProperty(ws, "onmessage", {
get() {
return onmessageHandler;
},
set(handler: ((event: { data: string }) => void) | null) {
onmessageHandler = handler;
},
});
return ws;
}
describe("createWebSocketClientEventTarget", () => {
describe("dispatchEvent (send path)", () => {
it("serializes envelope detail and calls ws.send", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const envelope: EventEnvelope<"call.responded", { status: string }> = {
type: "call.responded",
id: "uuid-123",
payload: { status: "ok" },
};
const event = new CustomEvent("call.responded:uuid-123", {
detail: envelope,
}) as TestEvent;
eventTarget.dispatchEvent(event);
expect(ws.send).toHaveBeenCalledTimes(1);
expect(ws.send).toHaveBeenCalledWith(JSON.stringify(envelope));
});
it("returns true from dispatchEvent", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const event = new CustomEvent("test:event", {
detail: { type: "test", id: "event", payload: null },
}) as TestEvent;
const result = eventTarget.dispatchEvent(event);
expect(result).toBe(true);
});
it("propagates ws.send() errors to caller", () => {
const ws = createMockWebSocket();
ws.send.mockImplementation(() => {
throw new Error("WebSocket is not open");
});
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const event = new CustomEvent("test:event", {
detail: { type: "test", id: "event", payload: null },
}) as TestEvent;
expect(() => eventTarget.dispatchEvent(event)).toThrow("WebSocket is not open");
});
});
describe("addEventListener (subscribe path)", () => {
it("sends __subscribe control event on first listener for a topic", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("call.responded:uuid-123", listener);
expect(ws.send).toHaveBeenCalledTimes(1);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__subscribe",
id: "",
payload: { topic: "call.responded:uuid-123" },
}),
);
});
it("sends __subscribe only once when multiple listeners are added for the same topic", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener1 = vi.fn();
const listener2 = vi.fn();
eventTarget.addEventListener("message.sent:msg1", listener1);
eventTarget.addEventListener("message.sent:msg1", listener2);
expect(ws.send).toHaveBeenCalledTimes(1);
});
it("does not send __subscribe when callback is null", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
eventTarget.addEventListener("topic:a", null as any);
expect(ws.send).not.toHaveBeenCalled();
});
it("supports EventListenerObject with handleEvent", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const handleEvent = vi.fn();
const listenerObject = { handleEvent };
eventTarget.addEventListener("obj:test", listenerObject);
expect(ws.send).toHaveBeenCalledTimes(1);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__subscribe",
id: "",
payload: { topic: "obj:test" },
}),
);
});
});
describe("removeEventListener (unsubscribe path)", () => {
it("sends __unsubscribe when the last listener for a topic is removed", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
expect(ws.send).toHaveBeenCalledTimes(1);
eventTarget.removeEventListener("topic:a", listener);
expect(ws.send).toHaveBeenCalledTimes(2);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__unsubscribe",
id: "",
payload: { topic: "topic:a" },
}),
);
});
it("does not send __unsubscribe while other listeners remain for the same topic", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener1 = vi.fn();
const listener2 = vi.fn();
eventTarget.addEventListener("event:type1", listener1);
eventTarget.addEventListener("event:type1", listener2);
expect(ws.send).toHaveBeenCalledTimes(1);
eventTarget.removeEventListener("event:type1", listener1);
expect(ws.send).toHaveBeenCalledTimes(1);
eventTarget.removeEventListener("event:type1", listener2);
expect(ws.send).toHaveBeenCalledTimes(2);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__unsubscribe",
id: "",
payload: { topic: "event:type1" },
}),
);
});
it("does not send __unsubscribe when removing a callback that was never registered", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener1 = vi.fn();
eventTarget.addEventListener("topic:a", listener1);
const unregisteredListener = vi.fn();
eventTarget.removeEventListener("topic:a", unregisteredListener);
expect(ws.send).toHaveBeenCalledTimes(1);
});
it("does not send __unsubscribe when callback is null", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
eventTarget.addEventListener("topic:a", null as any);
eventTarget.removeEventListener("topic:a", null as any);
expect(ws.send).not.toHaveBeenCalled();
});
it("supports EventListenerObject with handleEvent for removal", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const handleEvent = vi.fn();
const listenerObject = { handleEvent };
eventTarget.addEventListener("obj:test", listenerObject);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__subscribe",
id: "",
payload: { topic: "obj:test" },
}),
);
eventTarget.removeEventListener("obj:test", listenerObject);
expect(ws.send).toHaveBeenCalledWith(
JSON.stringify({
type: "__unsubscribe",
id: "",
payload: { topic: "obj:test" },
}),
);
});
});
describe("receive path (ws.onmessage)", () => {
it("parses envelope, creates CustomEvent with type:id topic, and dispatches to listeners", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("message.sent:msg1", listener);
const envelope: EventEnvelope = {
type: "message.sent",
id: "msg1",
payload: "hello world",
};
ws.simulateMessage(JSON.stringify(envelope));
expect(listener).toHaveBeenCalledTimes(1);
const receivedEvent = listener.mock.calls[0][0] as TestEvent;
expect(receivedEvent.type).toBe("message.sent:msg1");
expect(receivedEvent.detail).toEqual(envelope);
});
it("delivers messages to all listeners on the same topic", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener1 = vi.fn();
const listener2 = vi.fn();
eventTarget.addEventListener("topic:x", listener1);
eventTarget.addEventListener("topic:x", listener2);
const envelope: EventEnvelope = { type: "topic", id: "x", payload: "data" };
ws.simulateMessage(JSON.stringify(envelope));
expect(listener1).toHaveBeenCalledTimes(1);
expect(listener2).toHaveBeenCalledTimes(1);
expect((listener1.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
expect((listener2.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
});
it("ignores messages for topics with no registered listeners", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
const envelope: EventEnvelope = { type: "other", id: "b", payload: null };
ws.simulateMessage(JSON.stringify(envelope));
expect(listener).not.toHaveBeenCalled();
});
});
describe("malformed JSON handling", () => {
it("silently ignores malformed JSON and logs a warning", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
ws.simulateMessage("not valid json{{");
expect(listener).not.toHaveBeenCalled();
expect(warnSpy).toHaveBeenCalledTimes(1);
expect(warnSpy).toHaveBeenCalledWith(
expect.stringContaining("Failed to parse WebSocket message"),
);
warnSpy.mockRestore();
});
it("continues delivering valid messages after a parse error", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
const warnSpy = vi.spyOn(console, "warn").mockImplementation(() => {});
ws.simulateMessage("broken{{{");
expect(listener).not.toHaveBeenCalled();
const envelope: EventEnvelope = { type: "topic", id: "a", payload: "good" };
ws.simulateMessage(JSON.stringify(envelope));
expect(listener).toHaveBeenCalledTimes(1);
expect((listener.mock.calls[0][0] as TestEvent).detail).toEqual(envelope);
warnSpy.mockRestore();
});
});
describe("control events from server", () => {
it("silently ignores __subscribe control events received from server", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
ws.simulateMessage(JSON.stringify({
type: "__subscribe",
id: "",
payload: { topic: "topic:a" },
}));
expect(listener).not.toHaveBeenCalled();
});
it("silently ignores __unsubscribe control events received from server", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("topic:a", listener);
ws.simulateMessage(JSON.stringify({
type: "__unsubscribe",
id: "",
payload: { topic: "topic:a" },
}));
expect(listener).not.toHaveBeenCalled();
});
it("ignores any event type starting with __", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("__custom:thing", listener);
ws.simulateMessage(JSON.stringify({
type: "__custom",
id: "thing",
payload: null,
}));
expect(listener).not.toHaveBeenCalled();
});
});
describe("EventEnvelope round-trip", () => {
it("round-trips full { type, id, payload } envelope through send and receive", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("user.joined:user-99", listener);
const originalEnvelope: EventEnvelope<"user.joined", { name: string; role: string }> = {
type: "user.joined",
id: "user-99",
payload: { name: "Bob", role: "admin" },
};
const event = new CustomEvent("user.joined:user-99", {
detail: originalEnvelope,
}) as TestEvent;
eventTarget.dispatchEvent(event);
const dispatchedData = ws.sent[1];
expect(dispatchedData).toBe(JSON.stringify(originalEnvelope));
const sentData = dispatchedData;
ws.simulateMessage(sentData);
expect(listener).toHaveBeenCalledTimes(1);
const receivedDetail = (listener.mock.calls[0][0] as TestEvent).detail as EventEnvelope;
expect(receivedDetail).toEqual(originalEnvelope);
expect(receivedDetail.type).toBe("user.joined");
expect(receivedDetail.id).toBe("user-99");
expect(receivedDetail.payload).toEqual({ name: "Bob", role: "admin" });
});
it("round-trips envelope with null payload", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener = vi.fn();
eventTarget.addEventListener("ping:1", listener);
const envelope: EventEnvelope = { type: "ping", id: "1", payload: null };
ws.simulateMessage(JSON.stringify(envelope));
expect(listener).toHaveBeenCalledTimes(1);
const receivedDetail = (listener.mock.calls[0][0] as TestEvent).detail as EventEnvelope;
expect(receivedDetail).toEqual(envelope);
});
});
describe("subscription reference counting", () => {
it("re-sends __subscribe after all listeners removed and a new one added", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listener1 = vi.fn();
eventTarget.addEventListener("topic:a", listener1);
expect(ws.send).toHaveBeenCalledTimes(1);
eventTarget.removeEventListener("topic:a", listener1);
expect(ws.send).toHaveBeenCalledTimes(2);
const listener2 = vi.fn();
eventTarget.addEventListener("topic:a", listener2);
expect(ws.send).toHaveBeenCalledTimes(3);
expect(ws.send).toHaveBeenNthCalledWith(1,
JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "topic:a" } }),
);
expect(ws.send).toHaveBeenNthCalledWith(2,
JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "topic:a" } }),
);
expect(ws.send).toHaveBeenNthCalledWith(3,
JSON.stringify({ type: "__subscribe", id: "", payload: { topic: "topic:a" } }),
);
});
it("tracks separate topics independently", () => {
const ws = createMockWebSocket();
const eventTarget = createWebSocketClientEventTarget<TestEvent>(ws as any);
const listenerA = vi.fn();
const listenerB = vi.fn();
eventTarget.addEventListener("topic:a", listenerA);
eventTarget.addEventListener("topic:b", listenerB);
expect(ws.send).toHaveBeenCalledTimes(2);
eventTarget.removeEventListener("topic:a", listenerA);
expect(ws.send).toHaveBeenCalledTimes(3);
expect(ws.send).toHaveBeenNthCalledWith(3,
JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic: "topic:a" } }),
);
const listenerB2 = vi.fn();
eventTarget.addEventListener("topic:b", listenerB2);
expect(ws.send).toHaveBeenCalledTimes(3);
});
});
});

View File

@@ -4,6 +4,7 @@ export default defineConfig({
entry: [ entry: [
'src/index.ts', 'src/index.ts',
'src/event-target-redis.ts', 'src/event-target-redis.ts',
'src/event-target-websocket-client.ts',
], ],
format: ['esm', 'cjs'], format: ['esm', 'cjs'],
dts: true, dts: true,