| status | last_updated |
| ------ | ------------ |
| draft  | 2026-05-08   |

# WebSocket Server Event Target

- **Import:** `@alkdev/pubsub/event-target-websocket-server`
- **Peer dep:** none (WebSocket is a web standard)
- **Status:** not yet implemented

Manages multiple WebSocket connections for the server (hub) side. Handles topic-based fan-out: `dispatchEvent` sends to connections subscribed to that topic; `addEventListener` aggregates subscriptions across all connections.

## createWebSocketServerEventTarget

```ts
interface WebSocketServerEventTarget<TEvent extends TypedEvent> extends TypedEventTarget<TEvent> {
  addConnection(ws: WebSocket): void;
  removeConnection(ws: WebSocket): void;
}

function createWebSocketServerEventTarget<TEvent extends TypedEvent>(
  options: CreateWebSocketServerEventTargetArgs,
): WebSocketServerEventTarget<TEvent>;
```

### CreateWebSocketServerEventTargetArgs

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `onConnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a new spoke connects. Receives the spoke's per-connection event target and the raw WebSocket. |
| `onDisconnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a spoke disconnects. Receives the spoke's event target and WebSocket for cleanup. |
| `maxBufferedAmount` | `number` | No | Per-connection backpressure threshold in bytes. Default: `1_048_576` (1 MB). When a connection's `bufferedAmount` exceeds this, the connection is closed with code 1013 (Try Again Later). |
| `onBackpressure` | `(ws: WebSocket, bufferedAmount: number) => void` | No | Called for observability when a connection exceeds `maxBufferedAmount`, before the connection is closed. Cannot prevent the disconnect. Useful for logging and metrics. |

## How It Works

The server adapter manages a `Set<WebSocket>` of active connections and a `Map<string, Set<WebSocket>>` (the subscriptions map) tracking which connections are subscribed to which topic strings.

- `dispatchEvent` → looks up connections subscribed to the event type, sends a JSON envelope to each
- `addEventListener` → registers local listeners; the server listens for events from any spoke
- `removeEventListener` → removes local listeners
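
The subscription bookkeeping behind this can be sketched as follows. The helper names (`addConnectionToTopic`, `removeConnectionFromTopic`) come from the Incoming Messages pseudocode later in this doc; the `Conn` type is a minimal stand-in for `WebSocket`, and the field names are illustrative, not the actual implementation.

```typescript
// Sketch of the adapter's subscription bookkeeping. Conn is a minimal
// stand-in for WebSocket; names are illustrative.
type Conn = { send(data: string): void };

const subscriptions = new Map<string, Set<Conn>>();

function addConnectionToTopic(ws: Conn, topic: string): void {
  let subs = subscriptions.get(topic);
  if (!subs) {
    subs = new Set();
    subscriptions.set(topic, subs);
  }
  subs.add(ws); // Set membership makes a duplicate __subscribe idempotent
}

function removeConnectionFromTopic(ws: Conn, topic: string): void {
  const subs = subscriptions.get(topic);
  if (!subs) return;
  subs.delete(ws);
  if (subs.size === 0) subscriptions.delete(topic); // don't leak empty sets
}
```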

## Connection Lifecycle

The caller handles the HTTP upgrade (framework-specific) and passes connected WebSocket instances to the adapter:

```ts
import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";

const serverTarget = createWebSocketServerEventTarget({});

// Hono example — the adapter doesn't know about Hono
app.get("/ws", (c) => {
  return c.upgrade(async (ws) => {
    serverTarget.addConnection(ws);
    // removeConnection is called automatically by the adapter's onclose handler.
    // Only call removeConnection manually for forced disconnections (backpressure, auth failures).
  });
});
```

The adapter only handles raw WebSocket instances. It does not depend on any server framework (Hono, Express, Bun, Deno, etc.). The caller is responsible for:

- the HTTP upgrade
- passing connected WebSockets to `addConnection`
- forced disconnects via `removeConnection` (normal closes are cleaned up automatically by the adapter's `onclose` handler)

### addConnection / removeConnection

```ts
addConnection(ws: WebSocket): void;
removeConnection(ws: WebSocket): void;
```

The server adapter exposes these methods on the returned WebSocketServerEventTarget for the caller to register and unregister connections. When a connection is added, the adapter sets up onmessage and onclose handlers. When removed, it cleans up all topic subscriptions for that connection.

`removeConnection` cleans up internal state (subscription maps, event handlers) but does not close the WebSocket; the caller is responsible for closing the connection if needed.

**Note:** The adapter's `onclose` handler (set up by `addConnection`) calls `removeConnection` internally, so the caller does not need to call it from their own close handler — cleanup happens automatically when a connection closes. `removeConnection` is exposed for cases where the caller needs to manually disconnect a connection (e.g., for backpressure or authentication failures); in those cases the caller should also close the socket.

## Error Handling

- **Malformed JSON from a spoke** → the message is silently ignored. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed — a single malformed message should not disconnect a client.

- **Duplicate `__subscribe`** → idempotent. Adding a connection to a topic set it's already in is a no-op. The `Set<WebSocket>` data structure handles this naturally.

- **Invalid topic format in control events** → silently ignored. An empty topic string or malformed topic is logged and discarded.

- **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`.

- **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`.

- **`dispatchEvent` return value** — always returns `true`, regardless of subscriber count or send failures. Errors are handled via side effects (connection removal, `onDisconnection` callback), not via the return value. This matches the EventTarget contract, where a `false` return means `preventDefault` was called, not "send failed."

## Concurrency Model

This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently.

## Subscription Tracking

The server adapter maintains a subscriptions map (`Map<string, Set<WebSocket>>`) from topic string to subscribed connections. Spokes subscribe to topics they're interested in, and `dispatchEvent` only sends to connections that have subscribed to that topic.

**Why subscription tracking?** Without it, `dispatchEvent` would send every event to all connected spokes, regardless of whether they care about that topic. This wastes bandwidth and — worse — leaks data to clients that shouldn't receive it (e.g., a chat room message sent to clients not in that room).

**Topic scoping alignment:** The `type:id` topic pattern already used by `createPubSub` (e.g., `"message.sent:conv-123"`) is the routing key. A spoke subscribes to `"message.sent:conv-123"`, and the server only sends events for that topic to that spoke. This is the same pattern Redis uses for channel subscriptions — it's just implemented at the adapter level instead of delegated to an external broker.

**Direct messaging:** A spoke subscribing to `"direct:${spokeId}"` effectively creates a "room of one." The server can target that specific spoke by dispatching an event with that topic as its type. No special API needed — topic scoping handles it.

## Control Protocol

Spokes communicate subscription changes to the hub using control events in the `EventEnvelope` format with reserved `__`-prefixed types:

```json
{ "type": "__subscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
{ "type": "__unsubscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
```

**Convention:** event types starting with `__` are reserved control messages. They are not dispatched to local listeners — they are handled internally by the adapter to update the subscription map.

When a spoke's `addEventListener` is called, the client adapter sends a `__subscribe` control event to the server. When `removeEventListener` is called and no listeners remain for that topic, the client adapter sends an `__unsubscribe` control event. See WebSocket Client Event Target for the client-side behavior.
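
Constructing these envelopes is straightforward; a sketch follows. The helper names (`subscribeEnvelope`, `unsubscribeEnvelope`) are illustrative, not part of the public API:

```typescript
// Build the control envelopes in the wire format shown above.
function subscribeEnvelope(topic: string): string {
  return JSON.stringify({ type: "__subscribe", id: "", payload: { topic } });
}

function unsubscribeEnvelope(topic: string): string {
  return JSON.stringify({ type: "__unsubscribe", id: "", payload: { topic } });
}
```

The client adapter would send the `__subscribe` envelope over the socket on the first `addEventListener` for a topic, and the `__unsubscribe` envelope when the last listener for that topic is removed.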

## Incoming Messages

Each connected spoke sends JSON envelopes. The server listens on `ws.onmessage` for each connection:

```ts
// Set up by addConnection for each connection; `this` is the server event target.
ws.onmessage = (msg) => {
  const envelope = JSON.parse(msg.data);

  // Control protocol
  if (envelope.type === "__subscribe") {
    addConnectionToTopic(ws, envelope.payload.topic);
    return;
  }
  if (envelope.type === "__unsubscribe") {
    removeConnectionFromTopic(ws, envelope.payload.topic);
    return;
  }

  // Regular event — dispatch to local listeners
  const topic = `${envelope.type}:${envelope.id}`;
  const event = new CustomEvent(topic, { detail: envelope });
  this.dispatchEvent(event);
};
```

## Outgoing Messages (Topic-Based Fan-out)

```ts
dispatchEvent(event) {
  // event.type is the full topic string, e.g. "message.sent:conv-123"
  // This matches the topics that spokes subscribe to via __subscribe
  const message = JSON.stringify(event.detail);

  // Send only to connections subscribed to this topic
  const subscribers = this.subscriptions.get(event.type);
  if (subscribers) {
    for (const ws of subscribers) {
      sendWithBackpressure(ws, message);
    }
  }
  return true;
}
```

The routing key for fan-out is the full `CustomEvent.type` string (e.g., `"message.sent:conv-123"`), which matches the topic strings that spokes subscribe to via `__subscribe`. This is the same `type:id` pattern used by `createPubSub`.

**Local listeners:** `dispatchEvent` also delivers to local listeners registered via `addEventListener` on the server itself, via the standard `EventTarget.prototype.dispatchEvent` mechanism. Local listeners use the same `type:id` topic strings.

## Backpressure

WebSocket `send()` never blocks — it silently buffers until memory is exhausted. Unbounded buffering is the primary cause of OOM in production WebSocket servers. This adapter handles backpressure with a configurable threshold policy:

**Default policy: disconnect slow consumers**

1. Before each `ws.send()`, check `ws.bufferedAmount`
2. If `bufferedAmount > maxBufferedAmount` (default 1 MB), do not send the current event; the connection will be closed with code 1013 (Try Again Later)
3. Call the `onBackpressure` callback (if provided) before closing, for observability (logging, metrics). The connection is always closed after the callback runs; the callback cannot prevent the disconnect.

**Why disconnect, not drop silently:** A slow consumer that's still subscribed will continue receiving events. Silently dropping doesn't solve the underlying problem — it just delays it. Disconnecting is honest and gives the client a chance to reconnect.

**Why 1 MB default:** Enough headroom for brief network hiccups (a few hundred messages), but low enough to prevent runaway memory growth. This matches the production-tested defaults in uWebSockets.js and is far below Redis's 8 MB soft limit.

**`bufferedAmount` caveats:** In Node.js `ws`, `bufferedAmount` is updated asynchronously and may not reflect the exact current state. This is acceptable for threshold-based backpressure — the check is conservative, not precise.

## Per-Connection Spoke Targets

The server adapter does not create `WebSocketClientEventTarget` instances for each connection. The per-connection `TypedEventTarget` available in the `onConnection` callback is a minimal facade that:

- provides `addEventListener`/`removeEventListener` that listen only for events received from that specific spoke — not events from other spokes or from the server's own `dispatchEvent`
- dispatches events from that spoke to the server's local listeners

Direct messaging to a specific spoke is achieved through topic scoping: `"direct:${spokeId}"`. The spoke subscribes to that topic; the hub dispatches to it.

## Key Properties

- **Topic-based fan-out** — `dispatchEvent` sends only to connections subscribed to the event type, not all connections
- **Aggregate subscription** — `addEventListener` listens for events from any spoke
- **Connection lifecycle** — `addConnection`/`removeConnection` for the caller to register/unregister WebSocket instances
- **Backpressure protection** — configurable threshold with disconnect policy
- **No native deps** — works with any WebSocket server (Node `ws`, Bun, Deno, Hono)
- **Framework-agnostic** — takes raw WebSocket instances, doesn't handle HTTP upgrade

## Design Decisions

### ADR: Framework-Agnostic Raw WebSocket Interface

**Decision:** Accept raw `WebSocket` instances via `addConnection`/`removeConnection`, not a `WebSocketServer`.

**Rationale:** The adapter should work with any server framework. Hono, Bun, Deno, Node `ws`, and Cloudflare Workers all produce WebSocket-like objects but have incompatible server APIs. By accepting raw `WebSocket` instances, the caller handles the framework-specific HTTP upgrade and passes connected sockets to the adapter.

### ADR: Topic-Based Fan-out with Subscription Tracking

**Decision:** `dispatchEvent` sends only to connections subscribed to the event type, not all connections. Spokes declare subscriptions via `__subscribe`/`__unsubscribe` control events.

**Rationale:**

- Broadcast-all wastes bandwidth and leaks data to uninterested clients
- The `type:id` topic pattern already provides the routing abstraction — topics like `"message.sent:conv-123"` are natural fan-out keys
- This is how Redis works internally (channel subscriptions) — we're just implementing the same pattern at the adapter level
- Direct messaging falls out naturally: a spoke subscribing to `"direct:${spokeId}"` creates a "room of one"

### ADR: Disconnect Slow Consumers

**Decision:** When a connection's `bufferedAmount` exceeds the threshold, close the connection.

**Rationale:**

- Unbounded buffering causes OOM — the most common production failure mode for WebSocket servers
- Silently dropping messages doesn't solve the problem; the slow client keeps receiving new events
- Disconnecting is honest: the client can reconnect and re-subscribe
- This matches the production-proven behavior of uWebSockets.js and Redis's `client-output-buffer-limit`

## Test Plan

1. **Topic-based fan-out** — `dispatchEvent` sends only to connections subscribed to that event type
2. **Subscription protocol** — `__subscribe`/`__unsubscribe` control events correctly update the subscription map
3. **Incoming aggregation** — messages from any spoke dispatch to local listeners
4. **Connection add/remove** — new connections are tracked, disconnections clean up all subscriptions
5. **Backpressure disconnect** — slow consumers exceeding the threshold are disconnected
6. **Backpressure callback** — `onBackpressure` is called before disconnecting
7. **Direct messaging** — events dispatched to `"direct:${spokeId}"` reach only the target spoke
8. **Mixed topology** — server adapter and client adapters can communicate bidirectionally