---
status: draft
last_updated: 2026-05-08
---

# WebSocket Server Event Target

**Import**: `@alkdev/pubsub/event-target-websocket-server`

**Peer dep**: none (WebSocket is a web standard)

**Status**: Not yet implemented.

Manages multiple WebSocket connections for the server (hub) side. Handles topic-based fan-out: `dispatchEvent` sends to connections subscribed to that topic; `addEventListener` aggregates subscriptions across all connections.

## `createWebSocketServerEventTarget`

```ts
interface WebSocketServerEventTarget extends TypedEventTarget {
  addConnection(ws: WebSocket): void;
  removeConnection(ws: WebSocket): void;
}

function createWebSocketServerEventTarget(
  options: CreateWebSocketServerEventTargetArgs,
): WebSocketServerEventTarget;
```

### `CreateWebSocketServerEventTargetArgs`

| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `onConnection` | `(spoke: TypedEventTarget, ws: WebSocket) => void` | No | Called when a new spoke connects. Receives the spoke's per-connection event target and the raw `WebSocket`. |
| `onDisconnection` | `(spoke: TypedEventTarget, ws: WebSocket) => void` | No | Called when a spoke disconnects. Receives the spoke's event target and `WebSocket` for cleanup. |
| `maxBufferedAmount` | `number` | No | Per-connection backpressure threshold in bytes. Default: `1_048_576` (1 MB). When a connection's `bufferedAmount` exceeds this, the connection is closed with code `1013` (Try Again Later). |
| `onBackpressure` | `(ws: WebSocket, bufferedAmount: number) => void` | No | Called for observability when a connection exceeds `maxBufferedAmount`, before the connection is closed. Cannot prevent the disconnect. Useful for logging and metrics. |

## How It Works

The server adapter manages a `Set` of active connections and a `Map<string, Set<WebSocket>>` (the `subscriptions` map) tracking which connections are subscribed to which topic strings.
- `dispatchEvent` → looks up connections subscribed to the event type and sends a JSON envelope to each
- `addEventListener` → registers local listeners. The server listens for events from any spoke
- `removeEventListener` → removes local listeners

### Connection Lifecycle

The caller handles the HTTP upgrade (framework-specific) and passes connected `WebSocket` instances to the adapter:

```ts
import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";

const serverTarget = createWebSocketServerEventTarget({});

// Hono-style example: the adapter itself knows nothing about Hono.
// `app` and the upgrade call are illustrative; use your framework's own upgrade API.
app.get("/ws", (c) => {
  return c.upgrade(async (ws) => {
    serverTarget.addConnection(ws);
    // removeConnection is called automatically by the adapter's onclose handler.
    // Only call removeConnection manually for forced disconnections (backpressure, auth failures).
  });
});
```

The adapter only handles raw `WebSocket` instances. It does not depend on any server framework (Hono, Express, Bun, Deno, etc.). The caller is responsible for:

- HTTP upgrade
- Passing connected `WebSocket`s to `addConnection`
- Calling `removeConnection` for forced disconnections only (normal closes are cleaned up by the adapter's own `onclose` handler)

### `addConnection` / `removeConnection`

```ts
addConnection(ws: WebSocket): void
removeConnection(ws: WebSocket): void
```

The server adapter exposes these methods on the returned `WebSocketServerEventTarget` for the caller to register and unregister connections. When a connection is added, the adapter sets up `onmessage` and `onclose` handlers. When removed, it cleans up all topic subscriptions for that connection.

`removeConnection` cleans up internal state (subscription maps, event handlers) but does **not** close the `WebSocket`. A caller that invokes it manually is responsible for closing the connection if needed. In the normal case it runs from the adapter's `onclose` handler, where the connection is already closing.

**Note:** The adapter's `onclose` handler (set up by `addConnection`) calls `removeConnection` internally.
This means the caller does not need to call `removeConnection` from their own `close` handler; the adapter handles cleanup automatically when a connection closes. The `removeConnection` method is exposed for cases where the caller needs to forcibly detach a connection (e.g., for backpressure or authentication failures) and then close it themselves.

### Error Handling

- **Malformed JSON from a spoke** → the message is dropped. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed: a single malformed message should not disconnect a client.
- **Duplicate `__subscribe`** → idempotent. Adding a connection to a topic set it's already in is a no-op. The `Set` data structure handles this naturally.
- **Invalid topic format in control events** → the control event is discarded. An empty or malformed topic string is logged and has no effect on the subscription map.
- **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`.
- **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`.
- **`dispatchEvent` return value** → always `true`, regardless of subscriber count or send failures. Errors are handled via side effects (connection removal, `onDisconnection` callback), not via the return value. This matches the `EventTarget` contract, where a `false` return means a listener called `preventDefault()` on a cancelable event, not "send failed."

### Concurrency Model

This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently.

### Subscription Tracking

The server adapter maintains a `subscriptions` map (`Map<string, Set<WebSocket>>`) from topic string to subscribed connections.
Spokes subscribe to topics they're interested in, and `dispatchEvent` only sends to connections that have subscribed to that topic.

**Why subscription tracking?** Without it, `dispatchEvent` would send every event to all connected spokes, regardless of whether they care about that topic. This wastes bandwidth and, worse, leaks data to clients that shouldn't receive it (e.g., a chat room message sent to clients not in that room).

**Topic scoping alignment:** The `type:id` topic pattern already used by `createPubSub` (e.g., `"message.sent:conv-123"`) is the routing key. A spoke subscribes to `"message.sent:conv-123"`, and the server only sends events for that topic to that spoke. This is the same pattern Redis uses for channel subscriptions; it's just implemented at the adapter level instead of delegated to an external broker.

**Direct messaging:** A spoke subscribing to `"direct:${spokeId}"` effectively creates a "room of one." The server can target that specific spoke by dispatching an event with that topic type. No special API is needed; topic scoping handles it.

### Control Protocol

Spokes communicate subscription changes to the hub using control events in the `EventEnvelope` format with reserved `__`-prefixed types:

```json
{ "type": "__subscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
{ "type": "__unsubscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
```

Convention: event types starting with `__` are reserved control messages. They are not dispatched to local listeners; they are handled internally by the adapter to update the subscription map.

When a spoke's `addEventListener` is called, the client adapter sends a `__subscribe` control event to the server. When `removeEventListener` is called and no listeners remain for that topic, the client adapter sends an `__unsubscribe` control event. See [WebSocket Client Event Target](websocket-client.md) for the client-side behavior.
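
The control protocol above can be sketched as a small handler that maintains the subscription map. This is only an illustration under stated assumptions, not the adapter's implementation: `SocketLike`, `ControlEnvelope`, and `handleControlEnvelope` are hypothetical names, and the socket type is reduced to the one method the sketch needs.

```typescript
// Hypothetical minimal socket shape; the real adapter would receive a WebSocket.
interface SocketLike {
  send(data: string): void;
}

// Assumed envelope shape, based on the JSON examples in this section.
interface ControlEnvelope {
  type: string;
  id: string;
  payload?: { topic?: unknown };
}

// Topic string -> connections subscribed to it.
const subscriptions = new Map<string, Set<SocketLike>>();

/** Returns true when the envelope was a reserved control message and was consumed. */
function handleControlEnvelope(ws: SocketLike, envelope: ControlEnvelope): boolean {
  if (!envelope.type.startsWith("__")) return false; // regular event, not handled here

  const topic = envelope.payload?.topic;
  if (typeof topic !== "string" || topic.length === 0) {
    console.warn("Discarding control event with invalid topic:", envelope.type);
    return true; // still consumed: reserved types never reach local listeners
  }

  if (envelope.type === "__subscribe") {
    let subscribers = subscriptions.get(topic);
    if (!subscribers) {
      subscribers = new Set<SocketLike>();
      subscriptions.set(topic, subscribers);
    }
    subscribers.add(ws); // Set.add is idempotent, so duplicate subscribes are no-ops
  } else if (envelope.type === "__unsubscribe") {
    const subscribers = subscriptions.get(topic);
    if (subscribers) {
      subscribers.delete(ws);
      if (subscribers.size === 0) subscriptions.delete(topic); // drop empty topic sets
    }
  }
  return true;
}
```

Deleting a topic's entry once its set is empty keeps the map from growing with topics nobody listens to; whether the real adapter does this is a design choice the document leaves open.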
### Incoming Messages

Each connected spoke sends JSON envelopes. The server listens on `ws.onmessage` for each connection:

```ts
ws.onmessage = (msg) => {
  let envelope;
  try {
    envelope = JSON.parse(msg.data);
  } catch {
    // Malformed JSON: log, drop the message, keep the connection open
    console.warn("Dropping malformed message from spoke");
    return;
  }

  // Control protocol
  if (envelope.type === "__subscribe") {
    addConnectionToTopic(ws, envelope.payload.topic);
    return;
  }
  if (envelope.type === "__unsubscribe") {
    removeConnectionFromTopic(ws, envelope.payload.topic);
    return;
  }

  // Regular event: dispatch to local listeners
  const topic = `${envelope.type}:${envelope.id}`;
  const event = new CustomEvent(topic, { detail: envelope });
  this.dispatchEvent(event);
};
```

### Outgoing Messages (Topic-Based Fan-out)

```ts
dispatchEvent(event) {
  // event.type is the full topic string, e.g. "message.sent:conv-123"
  // This matches the topics that spokes subscribe to via __subscribe
  const message = JSON.stringify(event.detail);

  // Send only to connections subscribed to this topic
  const subscribers = this.subscriptions.get(event.type);
  if (subscribers) {
    for (const ws of subscribers) {
      sendWithBackpressure(ws, message);
    }
  }

  // Deliver to local listeners registered on the server itself
  EventTarget.prototype.dispatchEvent.call(this, event);

  return true;
}
```

The routing key for fan-out is the full `CustomEvent.type` string (e.g., `"message.sent:conv-123"`), which matches the topic strings that spokes subscribe to via `__subscribe`. This is the same `type:id` pattern used by `createPubSub`.

**Local listeners:** `dispatchEvent` also delivers to local listeners registered via `addEventListener` on the server itself, via the standard `EventTarget.prototype.dispatchEvent` mechanism. Local listeners use the same `type:id` topic strings.

### Backpressure

WebSocket `send()` never blocks; it silently buffers until memory is exhausted. Unbounded buffering is the primary cause of OOM in production WebSocket servers. This adapter handles backpressure with a configurable threshold policy:

**Default policy: disconnect slow consumers**

1. Before each `ws.send()`, check `ws.bufferedAmount`
2. If `bufferedAmount > maxBufferedAmount` (default 1 MB), call the `onBackpressure` callback (if provided) for observability (logging, metrics). The callback cannot prevent the disconnect.
3. Close the connection with code `1013` (Try Again Later). The current event is **not** sent.

**Why disconnect, not drop silently:** A slow consumer that's still subscribed will continue receiving events. Silently dropping doesn't solve the underlying problem; it just delays it. Disconnecting is honest and gives the client a chance to reconnect.

**Why 1 MB default:** Enough headroom for brief network hiccups (a few hundred messages), but low enough to prevent runaway memory growth. This is in line with the backpressure limits used by production servers such as uWebSockets.js and is far below Redis's 8 MB soft limit for pub/sub clients.

**`bufferedAmount` caveats:** In Node.js `ws`, `bufferedAmount` is updated asynchronously and may not reflect the exact current state. This is acceptable for threshold-based backpressure: the check is conservative, not precise.

## Per-Connection Spoke Targets

The server adapter does **not** create `WebSocketClientEventTarget` instances for each connection. The per-connection `TypedEventTarget` available in the `onConnection` callback is a minimal facade that:

- Provides `addEventListener`/`removeEventListener` that listens only for events **received from that specific spoke**, not events from other spokes or from the server's own `dispatchEvent`
- Dispatches events from that spoke to the server's local listeners

Direct messaging to a specific spoke is achieved through topic scoping: `"direct:${spokeId}"`. The spoke subscribes to that topic; the hub dispatches to it.
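
Returning to the disconnect-slow-consumers policy from the Backpressure section, a send helper might look roughly like the sketch below. It is an assumption-laden illustration: `BufferedSocket`, `BackpressureOptions`, and `sendWithBackpressure` are hypothetical names, and the sketch only reports failure through its return value, leaving subscription cleanup and `onDisconnection` to the caller.

```typescript
// Hypothetical subset of the WebSocket interface that this sketch relies on.
interface BufferedSocket {
  readonly bufferedAmount: number;
  send(data: string): void;
  close(code?: number, reason?: string): void;
}

// Mirrors the two backpressure fields from the options table.
interface BackpressureOptions {
  maxBufferedAmount?: number;
  onBackpressure?: (ws: BufferedSocket, bufferedAmount: number) => void;
}

const DEFAULT_MAX_BUFFERED_AMOUNT = 1_048_576; // 1 MB, the documented default

/** Returns true if the message was handed to the socket, false if it was not sent. */
function sendWithBackpressure(
  ws: BufferedSocket,
  message: string,
  options: BackpressureOptions = {},
): boolean {
  const limit = options.maxBufferedAmount ?? DEFAULT_MAX_BUFFERED_AMOUNT;

  if (ws.bufferedAmount > limit) {
    // The observability hook runs first; it cannot veto the disconnect.
    options.onBackpressure?.(ws, ws.bufferedAmount);
    ws.close(1013, "Try Again Later");
    return false; // the current event is not sent
  }

  try {
    ws.send(message);
    return true;
  } catch {
    // The connection died between the bufferedAmount check and the send.
    return false;
  }
}
```

A caller could treat a `false` return as the signal to remove the connection from the subscription maps and fire `onDisconnection`, matching the send-failure behavior described under Error Handling.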
## Key Properties

- **Topic-based fan-out**: `dispatchEvent` sends only to connections subscribed to the event type, not all connections
- **Aggregate subscription**: `addEventListener` listens for events from any spoke
- **Connection lifecycle**: `addConnection`/`removeConnection` for the caller to register/unregister WebSocket instances
- **Backpressure protection**: configurable threshold with disconnect policy
- **No native deps**: works with any WebSocket server (Node ws, Bun, Deno, Hono)
- **Framework-agnostic**: takes raw `WebSocket` instances, doesn't handle HTTP upgrade

## Design Decisions

### ADR: Framework-Agnostic Raw WebSocket Interface

**Decision:** Accept raw `WebSocket` instances via `addConnection`/`removeConnection`, not a `WebSocketServer`.

**Rationale:** The adapter should work with any server framework. Hono, Bun, Deno, Node `ws`, and Cloudflare Workers all produce `WebSocket`-like objects but have incompatible server APIs. By accepting raw `WebSocket` instances, the caller handles the framework-specific HTTP upgrade and passes connected sockets to the adapter.

### ADR: Topic-Based Fan-out with Subscription Tracking

**Decision:** `dispatchEvent` sends only to connections subscribed to the event type, not all connections. Spokes declare subscriptions via `__subscribe`/`__unsubscribe` control events.

**Rationale:**

- Broadcast-all wastes bandwidth and leaks data to uninterested clients
- The `type:id` topic pattern already provides the routing abstraction: topics like `"message.sent:conv-123"` are natural fan-out keys
- This is how Redis works internally (channel subscriptions); we're just implementing the same pattern at the adapter level
- Direct messaging falls out naturally: a spoke subscribing to `"direct:${spokeId}"` creates a "room of one"

### ADR: Disconnect Slow Consumers

**Decision:** When a connection's `bufferedAmount` exceeds the threshold, close the connection.

**Rationale:**

- Unbounded buffering causes OOM, the most common production failure mode for WebSocket servers
- Silently dropping messages doesn't solve the problem; the slow client keeps receiving new events
- Disconnecting is honest: the client can reconnect and re-subscribe
- This matches the production-proven behavior of uWebSockets.js and Redis's `client-output-buffer-limit`

## Test Plan

1. **Topic-based fan-out**: `dispatchEvent` sends only to connections subscribed to that event type
2. **Subscription protocol**: `__subscribe`/`__unsubscribe` control events correctly update the subscription map
3. **Incoming aggregation**: messages from any spoke dispatch to local listeners
4. **Connection add/remove**: new connections are tracked, disconnections clean up all subscriptions
5. **Backpressure disconnect**: slow consumers exceeding threshold are disconnected
6. **Backpressure callback**: `onBackpressure` is called before disconnecting
7. **Direct messaging**: events dispatched to `"direct:${spokeId}"` reach only the target spoke
8. **Mixed topology**: server adapter and client adapters can communicate bidirectionally
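
Since the adapter is not yet implemented, tests for items 1 and 7 could start from a test double rather than real sockets. The sketch below is one possible shape, not the project's test suite: `FakeSocket` and `fanOut` are hypothetical stand-ins, and `fanOut` only imitates the documented routing rule (send to the subscribers of the exact topic string).

```typescript
// Hypothetical test double: records what the hub sends instead of using a network.
class FakeSocket {
  readonly sent: string[] = [];
  send(data: string): void {
    this.sent.push(data);
  }
}

// Stand-in for the adapter's fan-out step; returns how many sockets were targeted.
function fanOut(
  subscriptions: Map<string, Set<FakeSocket>>,
  topic: string,
  envelope: unknown,
): number {
  const subscribers = subscriptions.get(topic);
  if (!subscribers) return 0;
  const message = JSON.stringify(envelope);
  subscribers.forEach((ws) => ws.send(message));
  return subscribers.size;
}

const alice = new FakeSocket();
const bob = new FakeSocket();

// Both spokes are in the conversation; only alice has a "room of one".
const subs = new Map<string, Set<FakeSocket>>([
  ["message.sent:conv-123", new Set([alice, bob])],
  ["direct:alice", new Set([alice])],
]);

const roomDeliveries = fanOut(subs, "message.sent:conv-123", {
  type: "message.sent",
  id: "conv-123",
  payload: { text: "hi" },
});
const directDeliveries = fanOut(subs, "direct:alice", {
  type: "direct",
  id: "alice",
  payload: { text: "psst" },
});
const unknownDeliveries = fanOut(subs, "message.sent:conv-999", {});
```

With this setup a test can assert on `alice.sent` and `bob.sent` directly: the room message should appear in both, the direct message only in `alice.sent`, and a topic with no subscribers should reach nobody.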