feat(ws-server): implement WebSocket server event target adapter
This commit is contained in:
282
src/event-target-websocket-server.ts
Normal file
282
src/event-target-websocket-server.ts
Normal file
@@ -0,0 +1,282 @@
|
||||
import type { TypedEventTarget, TypedEvent, EventEnvelope } from "./types.js";
|
||||
|
||||
export interface WebSocketLike {
|
||||
send(data: string): void;
|
||||
close(code?: number, reason?: string): void;
|
||||
bufferedAmount: number;
|
||||
onmessage: ((ev: { data: string }) => void) | null;
|
||||
onclose: ((ev: { code: number; reason?: string }) => void) | null;
|
||||
}
|
||||
|
||||
export interface SpokeEventTarget<TEvent extends TypedEvent> extends TypedEventTarget<TEvent> {
|
||||
readonly ws: WebSocketLike;
|
||||
}
|
||||
|
||||
export interface CreateWebSocketServerEventTargetArgs<TEvent extends TypedEvent> {
|
||||
onConnection?: (spoke: SpokeEventTarget<TEvent>, ws: WebSocketLike) => void;
|
||||
onDisconnection?: (spoke: SpokeEventTarget<TEvent>, ws: WebSocketLike) => void;
|
||||
maxBufferedAmount?: number;
|
||||
onBackpressure?: (ws: WebSocketLike, bufferedAmount: number) => void;
|
||||
}
|
||||
|
||||
export interface WebSocketServerEventTarget<TEvent extends TypedEvent> extends TypedEventTarget<TEvent> {
|
||||
addConnection(ws: WebSocketLike): void;
|
||||
removeConnection(ws: WebSocketLike): void;
|
||||
}
|
||||
|
||||
export function createWebSocketServerEventTarget<TEvent extends TypedEvent>(
|
||||
args?: CreateWebSocketServerEventTargetArgs<TEvent>,
|
||||
): WebSocketServerEventTarget<TEvent> {
|
||||
const maxBufferedAmount = args?.maxBufferedAmount ?? 1_048_576;
|
||||
const onConnection = args?.onConnection;
|
||||
const onDisconnection = args?.onDisconnection;
|
||||
const onBackpressure = args?.onBackpressure;
|
||||
|
||||
const subscriptions = new Map<string, Set<WebSocketLike>>();
|
||||
const connectionSubscriptions = new Map<WebSocketLike, Set<string>>();
|
||||
|
||||
const connectionListeners = new Map<WebSocketLike, Map<string, Set<EventListener>>>();
|
||||
|
||||
const localListeners = new Map<string, Set<EventListener>>();
|
||||
|
||||
const spokeTargets = new Map<WebSocketLike, SpokeEventTarget<TEvent>>();
|
||||
|
||||
const originalOnmessage = new Map<WebSocketLike, ((ev: { data: string }) => void) | null>();
|
||||
const originalOnclose = new Map<WebSocketLike, ((ev: { code: number; reason?: string }) => void) | null>();
|
||||
|
||||
function createSpokeTarget(ws: WebSocketLike): SpokeEventTarget<TEvent> {
|
||||
const listeners = new Map<string, Set<EventListener>>();
|
||||
connectionListeners.set(ws, listeners);
|
||||
|
||||
const target: SpokeEventTarget<TEvent> = {
|
||||
get ws() {
|
||||
return ws;
|
||||
},
|
||||
addEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) {
|
||||
if (callbackOrOptions == null) return;
|
||||
const callback =
|
||||
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
|
||||
let set = listeners.get(type);
|
||||
if (set === undefined) {
|
||||
set = new Set();
|
||||
listeners.set(type, set);
|
||||
}
|
||||
set.add(callback as EventListener);
|
||||
},
|
||||
removeEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) {
|
||||
if (callbackOrOptions == null) return;
|
||||
const callback =
|
||||
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
|
||||
const set = listeners.get(type);
|
||||
if (set === undefined) return;
|
||||
set.delete(callback as EventListener);
|
||||
if (set.size === 0) {
|
||||
listeners.delete(type);
|
||||
}
|
||||
},
|
||||
dispatchEvent(event: TEvent): boolean {
|
||||
const message = JSON.stringify(event.detail);
|
||||
try {
|
||||
if (ws.bufferedAmount > maxBufferedAmount) {
|
||||
onBackpressure?.(ws, ws.bufferedAmount);
|
||||
ws.close(1013, "Try Again Later");
|
||||
removeConnection(ws);
|
||||
return true;
|
||||
}
|
||||
ws.send(message);
|
||||
} catch {
|
||||
removeConnection(ws);
|
||||
}
|
||||
return true;
|
||||
},
|
||||
};
|
||||
|
||||
spokeTargets.set(ws, target);
|
||||
return target;
|
||||
}
|
||||
|
||||
function removeConnection(ws: WebSocketLike) {
|
||||
const topics = connectionSubscriptions.get(ws);
|
||||
if (topics !== undefined) {
|
||||
for (const topic of topics) {
|
||||
const subscribers = subscriptions.get(topic);
|
||||
if (subscribers !== undefined) {
|
||||
subscribers.delete(ws);
|
||||
if (subscribers.size === 0) {
|
||||
subscriptions.delete(topic);
|
||||
}
|
||||
}
|
||||
}
|
||||
connectionSubscriptions.delete(ws);
|
||||
}
|
||||
|
||||
connectionListeners.delete(ws);
|
||||
|
||||
const spoke = spokeTargets.get(ws);
|
||||
spokeTargets.delete(ws);
|
||||
|
||||
const prevOnmessage = originalOnmessage.get(ws) ?? null;
|
||||
const prevOnclose = originalOnclose.get(ws) ?? null;
|
||||
ws.onmessage = prevOnmessage;
|
||||
ws.onclose = prevOnclose;
|
||||
originalOnmessage.delete(ws);
|
||||
originalOnclose.delete(ws);
|
||||
|
||||
if (spoke !== undefined) {
|
||||
onDisconnection?.(spoke, ws);
|
||||
}
|
||||
}
|
||||
|
||||
function addConnection(ws: WebSocketLike) {
|
||||
if (spokeTargets.has(ws)) return;
|
||||
|
||||
originalOnmessage.set(ws, ws.onmessage);
|
||||
originalOnclose.set(ws, ws.onclose);
|
||||
|
||||
const spoke = createSpokeTarget(ws);
|
||||
|
||||
connectionSubscriptions.set(ws, new Set());
|
||||
|
||||
const prevOnclose = originalOnclose.get(ws)!;
|
||||
|
||||
ws.onmessage = (ev: { data: string }) => {
|
||||
let envelope: EventEnvelope;
|
||||
try {
|
||||
envelope = JSON.parse(ev.data) as EventEnvelope;
|
||||
} catch {
|
||||
console.warn(`Failed to parse WebSocket message: ${ev.data}`);
|
||||
return;
|
||||
}
|
||||
|
||||
if (typeof envelope.type !== "string") return;
|
||||
|
||||
if (envelope.type === "__subscribe") {
|
||||
const topic = (envelope.payload as Record<string, unknown>)?.topic;
|
||||
if (typeof topic === "string" && topic.length > 0) {
|
||||
let subscribers = subscriptions.get(topic);
|
||||
if (subscribers === undefined) {
|
||||
subscribers = new Set();
|
||||
subscriptions.set(topic, subscribers);
|
||||
}
|
||||
subscribers.add(ws);
|
||||
const topics = connectionSubscriptions.get(ws);
|
||||
if (topics !== undefined) {
|
||||
topics.add(topic);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (envelope.type === "__unsubscribe") {
|
||||
const topic = (envelope.payload as Record<string, unknown>)?.topic;
|
||||
if (typeof topic === "string" && topic.length > 0) {
|
||||
const subscribers = subscriptions.get(topic);
|
||||
if (subscribers !== undefined) {
|
||||
subscribers.delete(ws);
|
||||
if (subscribers.size === 0) {
|
||||
subscriptions.delete(topic);
|
||||
}
|
||||
}
|
||||
const topics = connectionSubscriptions.get(ws);
|
||||
if (topics !== undefined) {
|
||||
topics.delete(topic);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
const topic = `${envelope.type}:${envelope.id}`;
|
||||
const customEvent = new CustomEvent(topic, { detail: envelope }) as TEvent;
|
||||
|
||||
const spokeListeners = connectionListeners.get(ws);
|
||||
if (spokeListeners !== undefined) {
|
||||
const cbs = spokeListeners.get(topic);
|
||||
if (cbs !== undefined) {
|
||||
for (const cb of cbs) {
|
||||
cb(customEvent);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const localCbs = localListeners.get(topic);
|
||||
if (localCbs !== undefined) {
|
||||
for (const cb of localCbs) {
|
||||
cb(customEvent);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = (ev: { code: number; reason?: string }) => {
|
||||
removeConnection(ws);
|
||||
if (prevOnclose !== null) {
|
||||
prevOnclose(ev);
|
||||
}
|
||||
};
|
||||
|
||||
onConnection?.(spoke, ws);
|
||||
}
|
||||
|
||||
function sendToConnection(ws: WebSocketLike, message: string) {
|
||||
try {
|
||||
if (ws.bufferedAmount > maxBufferedAmount) {
|
||||
onBackpressure?.(ws, ws.bufferedAmount);
|
||||
ws.close(1013, "Try Again Later");
|
||||
removeConnection(ws);
|
||||
return;
|
||||
}
|
||||
ws.send(message);
|
||||
} catch {
|
||||
removeConnection(ws);
|
||||
}
|
||||
}
|
||||
|
||||
const serverTarget: WebSocketServerEventTarget<TEvent> = {
|
||||
addConnection(ws: WebSocketLike) {
|
||||
addConnection(ws);
|
||||
},
|
||||
removeConnection(ws: WebSocketLike) {
|
||||
removeConnection(ws);
|
||||
},
|
||||
addEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) {
|
||||
if (callbackOrOptions == null) return;
|
||||
const callback =
|
||||
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
|
||||
let set = localListeners.get(type);
|
||||
if (set === undefined) {
|
||||
set = new Set();
|
||||
localListeners.set(type, set);
|
||||
}
|
||||
set.add(callback as EventListener);
|
||||
},
|
||||
removeEventListener(type, callbackOrOptions: EventListenerOrEventListenerObject) {
|
||||
if (callbackOrOptions == null) return;
|
||||
const callback =
|
||||
"handleEvent" in callbackOrOptions ? callbackOrOptions.handleEvent : callbackOrOptions;
|
||||
const set = localListeners.get(type);
|
||||
if (set === undefined) return;
|
||||
set.delete(callback as EventListener);
|
||||
if (set.size === 0) {
|
||||
localListeners.delete(type);
|
||||
}
|
||||
},
|
||||
dispatchEvent(event: TEvent): boolean {
|
||||
const message = JSON.stringify(event.detail);
|
||||
const subscribers = subscriptions.get(event.type);
|
||||
if (subscribers !== undefined) {
|
||||
for (const ws of subscribers) {
|
||||
sendToConnection(ws, message);
|
||||
}
|
||||
}
|
||||
|
||||
const localCbs = localListeners.get(event.type);
|
||||
if (localCbs !== undefined) {
|
||||
for (const cb of localCbs) {
|
||||
cb(event);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
},
|
||||
};
|
||||
|
||||
return serverTarget;
|
||||
}
|
||||
@@ -3,4 +3,5 @@ export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedE
|
||||
export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js";
|
||||
export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js";
|
||||
export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js";
|
||||
export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js";
|
||||
export { createWebSocketClientEventTarget } from "./event-target-websocket-client.js";
|
||||
export { createWebSocketServerEventTarget, type WebSocketLike, type SpokeEventTarget, type CreateWebSocketServerEventTargetArgs, type WebSocketServerEventTarget } from "./event-target-websocket-server.js";
|
||||
Reference in New Issue
Block a user