pubsub/docs/architecture/event-targets/websocket-server.md
glm-5.1 be7fe67145 Fix critical publish() bug, address review findings
CRITICAL: createPubSub.publish() was dispatching CustomEvent with
just the event type (e.g. 'call.responded') instead of the composite
topic string ('call.responded:uuid-123'). This broke all adapters
that rely on topic-scoped dispatch — Redis subscribe/publish
channels didn't match, and WS server fan-out routing would fail.
Fixed to dispatch with the full type:id composite.

Other fixes:
- Add __ prefix runtime guard in publish() (reserved for control)
- Add Redis barrel re-export to src/index.ts (ADR-002 compliance)
- Clarify WS server: adapter's onclose calls removeConnection
  internally; user doesn't need to
- WS client: document null callback no-op, removeEventListener
  edge cases (unregistered callback, null callback)
- WS server: document dispatchEvent always returns true
- Redis spec: document in-flight message edge case after unsubscribe
- Worker adapter: rename createMainThreadEventTarget to
  createWorkerThreadEventTarget, createWorkerEventTarget to
  createWorkerHostEventTarget (fix inverted naming)
- api-surface.md: add PubSub.publish() section documenting the
  type:id composite and __ guard
2026-05-08 05:17:43 +00:00

---
status: draft
last_updated: 2026-05-08
---
# WebSocket Server Event Target
**Import**: `@alkdev/pubsub/event-target-websocket-server`
**Peer dep**: none (WebSocket is a web standard)
**Status**: Not yet implemented.
Manages multiple WebSocket connections for the server (hub) side. Handles topic-based fan-out: `dispatchEvent` sends to connections subscribed to that topic; `addEventListener` aggregates subscriptions across all connections.
## `createWebSocketServerEventTarget`
```ts
interface WebSocketServerEventTarget<TEvent extends TypedEvent> extends TypedEventTarget<TEvent> {
  addConnection(ws: WebSocket): void;
  removeConnection(ws: WebSocket): void;
}

function createWebSocketServerEventTarget<TEvent extends TypedEvent>(
  options: CreateWebSocketServerEventTargetArgs,
): WebSocketServerEventTarget<TEvent>;
```
### `CreateWebSocketServerEventTargetArgs`
| Field | Type | Required | Description |
|-------|------|----------|-------------|
| `onConnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a new spoke connects. Receives the spoke's per-connection event target and the raw `WebSocket`. |
| `onDisconnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a spoke disconnects. Receives the spoke's event target and `WebSocket` for cleanup. |
| `maxBufferedAmount` | `number` | No | Per-connection backpressure threshold in bytes. Default: `1_048_576` (1 MB). When a connection's `bufferedAmount` exceeds this, the connection is closed with code `1013` (Try Again Later). |
| `onBackpressure` | `(ws: WebSocket, bufferedAmount: number) => void` | No | Called for observability when a connection exceeds `maxBufferedAmount`, before the connection is closed. Cannot prevent the disconnect. Useful for logging and metrics. |
## How It Works
The server adapter manages a `Set<WebSocket>` of active connections and a `Map<string, Set<WebSocket>>` (the `subscriptions` map) tracking which connections are subscribed to which topic strings.
- `dispatchEvent` → looks up the connections subscribed to the event's full topic string (`event.type`, e.g. `"message.sent:conv-123"`) and sends the JSON envelope to each
- `addEventListener` → registers local listeners, which receive events from any spoke
- `removeEventListener` → removes local listeners
### Connection Lifecycle
The caller handles the HTTP upgrade (framework-specific) and passes connected `WebSocket` instances to the adapter:
```ts
import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";

const serverTarget = createWebSocketServerEventTarget({});

// Hono-style example (illustrative: the exact upgrade call depends on the framework and its version).
// The adapter itself doesn't know about Hono.
app.get("/ws", (c) => {
  return c.upgrade(async (ws) => {
    serverTarget.addConnection(ws);
    // removeConnection is called automatically by the adapter's onclose handler.
    // Only call removeConnection manually for forced disconnections (backpressure, auth failures).
  });
});
```
The adapter only handles raw `WebSocket` instances. It does not depend on any server framework (Hono, Express, Bun, Deno, etc.). The caller is responsible for:
- Performing the HTTP upgrade
- Passing connected `WebSocket`s to `addConnection`
- Calling `removeConnection` only for forced disconnections (e.g., backpressure or authentication failures); normal closes are cleaned up by the adapter's own `onclose` handler
### `addConnection` / `removeConnection`
```ts
addConnection(ws: WebSocket): void
removeConnection(ws: WebSocket): void
```
The server adapter exposes these methods on the returned `WebSocketServerEventTarget` for the caller to register and unregister connections. When a connection is added, the adapter sets up `onmessage` and `onclose` handlers. When removed, it cleans up all topic subscriptions for that connection.
`removeConnection` cleans up internal state (subscription maps, event handlers) but does **not** close the `WebSocket`. Most removals happen in response to a `close` event, where the connection is already closing. For a forced disconnection, the caller must also close the socket itself.
**Note:** The adapter's `onclose` handler (set up by `addConnection`) calls `removeConnection` internally. This means the caller does not need to call `removeConnection` from their own `close` handler — the adapter handles cleanup automatically when a connection closes. The `removeConnection` method is exposed for cases where the caller needs to manually disconnect a connection (e.g., for backpressure or authentication failures).
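The lifecycle described above can be sketched as follows. This is a minimal illustration, not the adapter's implementation (which does not exist yet): `SocketLike`, `ConnectionRegistry`, and `forceDisconnect` are hypothetical names, the `onConnection`/`onDisconnection` callbacks are left out, and the close code `1008` is only an example.

```typescript
// Hypothetical minimal socket shape; a real WebSocket has many more members.
interface SocketLike {
  onclose: (() => void) | null;
  onmessage: ((msg: { data: string }) => void) | null;
  close(code?: number, reason?: string): void;
}

// Hypothetical stand-in for the adapter's connection bookkeeping.
class ConnectionRegistry {
  readonly connections = new Set<SocketLike>();
  readonly subscriptions = new Map<string, Set<SocketLike>>();

  addConnection(ws: SocketLike): void {
    this.connections.add(ws);
    // The adapter owns the close handler, so normal closes need no caller code.
    ws.onclose = () => this.removeConnection(ws);
  }

  removeConnection(ws: SocketLike): void {
    if (!this.connections.delete(ws)) return; // unknown or already removed
    ws.onclose = null;
    ws.onmessage = null;
    // Drop the connection from every topic and prune topics left empty.
    this.subscriptions.forEach((subscribers, topic) => {
      subscribers.delete(ws);
      if (subscribers.size === 0) this.subscriptions.delete(topic);
    });
    // No ws.close() here: closing the socket stays the caller's decision.
  }
}

// Forced disconnection (assumed caller code): clean up first, then close.
function forceDisconnect(registry: ConnectionRegistry, ws: SocketLike): void {
  registry.removeConnection(ws);
  ws.close(1008, "Policy Violation");
}
```

Because `removeConnection` returns early for a socket it no longer tracks, calling it both manually and from the close handler is harmless in this sketch.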
### Error Handling
- **Malformed JSON from a spoke** → the message is discarded. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed, because a single malformed message should not disconnect a client.
- **Duplicate `__subscribe`** → idempotent. Adding a connection to a topic set it's already in is a no-op. The `Set<WebSocket>` data structure handles this naturally.
- **Invalid topic format in control events** → the control event is discarded. An empty or malformed topic string is logged and otherwise ignored.
- **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`.
- **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`.
- **`dispatchEvent` return value** → always returns `true`, regardless of subscriber count or send failures. Errors are handled via side effects (connection removal, `onDisconnection` callback), not via the return value. This matches the `EventTarget` contract, where a `false` return means `preventDefault` was called, not that a send failed.
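The first and third rules above could be implemented along these lines. This is a sketch under assumptions: `parseEnvelope` and `controlTopic` are hypothetical helpers, the `EventEnvelope` field types are inferred from the JSON examples in this document, and a topic counts as invalid here only when it is missing, not a string, or empty.

```typescript
// Assumed envelope shape, inferred from the control-event examples.
interface EventEnvelope {
  type: string;
  id: string;
  payload: unknown;
}

// Returns null (after warning) for anything that is not a well-formed envelope.
function parseEnvelope(
  raw: string,
  warn: (message: string) => void = console.warn,
): EventEnvelope | null {
  let value: unknown;
  try {
    value = JSON.parse(raw);
  } catch {
    warn("discarding malformed JSON from spoke");
    return null;
  }
  if (typeof value !== "object" || value === null) {
    warn("discarding non-object message from spoke");
    return null;
  }
  const candidate = value as Record<string, unknown>;
  if (typeof candidate.type !== "string" || typeof candidate.id !== "string") {
    warn("discarding envelope without string type/id");
    return null;
  }
  return { type: candidate.type, id: candidate.id, payload: candidate.payload };
}

// Extracts the topic of a control event; null when it is missing or empty.
function controlTopic(envelope: EventEnvelope): string | null {
  const payload = envelope.payload;
  if (typeof payload !== "object" || payload === null) return null;
  const topic = (payload as Record<string, unknown>).topic;
  return typeof topic === "string" && topic.length > 0 ? topic : null;
}
```

Returning `null` instead of throwing keeps the message handler linear: a bad message is dropped and the connection stays open.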
### Concurrency Model
This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently.
### Subscription Tracking
The server adapter maintains a `subscriptions` map (`Map<string, Set<WebSocket>>`) from topic string to subscribed connections. Spokes subscribe to topics they're interested in, and `dispatchEvent` only sends to connections that have subscribed to that topic.
**Why subscription tracking?** Without it, `dispatchEvent` would send every event to all connected spokes, regardless of whether they care about that topic. This wastes bandwidth and — worse — leaks data to clients that shouldn't receive it (e.g., a chat room message sent to clients not in that room).
**Topic scoping alignment:** The `type:id` topic pattern already used by `createPubSub` (e.g., `"message.sent:conv-123"`) is the routing key. A spoke subscribes to `"message.sent:conv-123"`, and the server only sends events for that topic to that spoke. This is the same pattern Redis uses for channel subscriptions — it's just implemented at the adapter level instead of delegated to an external broker.
**Direct messaging:** A spoke subscribing to `"direct:${spokeId}"` effectively creates a "room of one." The server can target that specific spoke by dispatching an event with that topic type. No special API needed — topic scoping handles it.
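To make the routing concrete, here is a small sketch of a topic-to-connections map with fan-out. `Sender` and `TopicRouter` are hypothetical names used only for illustration, and `publish` returning a count is an assumption of the sketch (the adapter's `dispatchEvent` always returns `true`).

```typescript
// Hypothetical minimal connection shape: only send() matters for routing.
interface Sender {
  send(data: string): void;
}

class TopicRouter {
  private readonly subscriptions = new Map<string, Set<Sender>>();

  subscribe(ws: Sender, topic: string): void {
    let subscribers = this.subscriptions.get(topic);
    if (!subscribers) {
      subscribers = new Set<Sender>();
      this.subscriptions.set(topic, subscribers);
    }
    subscribers.add(ws); // Set semantics make a duplicate subscribe a no-op
  }

  unsubscribe(ws: Sender, topic: string): void {
    const subscribers = this.subscriptions.get(topic);
    if (!subscribers) return;
    subscribers.delete(ws);
    if (subscribers.size === 0) this.subscriptions.delete(topic);
  }

  // Sends to subscribers of exactly this topic; returns how many were targeted.
  publish(topic: string, message: string): number {
    const subscribers = this.subscriptions.get(topic);
    if (!subscribers) return 0;
    subscribers.forEach((ws) => ws.send(message));
    return subscribers.size;
  }
}

// Usage: a shared room plus a "room of one" for direct messaging.
const router = new TopicRouter();
const inboxA: string[] = [];
const inboxB: string[] = [];
const spokeA: Sender = { send: (data) => { inboxA.push(data); } };
const spokeB: Sender = { send: (data) => { inboxB.push(data); } };
router.subscribe(spokeA, "message.sent:conv-123");
router.subscribe(spokeB, "message.sent:conv-123");
router.subscribe(spokeA, "direct:spoke-a");
router.publish("message.sent:conv-123", "to the room"); // both spokes
router.publish("direct:spoke-a", "just for A");         // spokeA only
```

Matching is by exact string equality on the topic, so no wildcard or prefix logic is implied.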
### Control Protocol
Spokes communicate subscription changes to the hub using control events in the `EventEnvelope` format with reserved `__`-prefixed types:
```json
{ "type": "__subscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
{ "type": "__unsubscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
```
Convention: event types starting with `__` are reserved control messages. They are not dispatched to local listeners — they are handled internally by the adapter to update the subscription map.
When a spoke's `addEventListener` is called, the client adapter sends a `__subscribe` control event to the server. When `removeEventListener` is called and no listeners remain for that topic, the client adapter sends an `__unsubscribe` control event. See [WebSocket Client Event Target](websocket-client.md) for the client-side behavior.
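As an illustration of the spoke side of this protocol, the sketch below counts listeners per topic and emits the control envelopes. It is an assumption-laden sketch, not the client adapter: `ControlEnvelope`, `controlEnvelope`, and `SubscriptionTracker` are hypothetical names, and it sends `__subscribe` on every added listener, relying on the hub treating duplicates as a no-op.

```typescript
type ControlType = "__subscribe" | "__unsubscribe";

interface ControlEnvelope {
  type: ControlType;
  id: string;
  payload: { topic: string };
}

// Builds a control event in the envelope format shown above (empty id).
function controlEnvelope(type: ControlType, topic: string): ControlEnvelope {
  return { type, id: "", payload: { topic } };
}

class SubscriptionTracker {
  private readonly counts = new Map<string, number>();
  private readonly sendControl: (envelope: ControlEnvelope) => void;

  constructor(sendControl: (envelope: ControlEnvelope) => void) {
    this.sendControl = sendControl;
  }

  // Call when addEventListener registers a listener for `topic`.
  listenerAdded(topic: string): void {
    this.counts.set(topic, (this.counts.get(topic) ?? 0) + 1);
    this.sendControl(controlEnvelope("__subscribe", topic));
  }

  // Call when removeEventListener removes a registered listener for `topic`.
  listenerRemoved(topic: string): void {
    const count = this.counts.get(topic) ?? 0;
    if (count === 0) return; // nothing registered: no control event
    if (count === 1) {
      this.counts.delete(topic);
      // Last listener gone: tell the hub to stop sending this topic.
      this.sendControl(controlEnvelope("__unsubscribe", topic));
    } else {
      this.counts.set(topic, count - 1);
    }
  }
}
```

In a real client the `sendControl` callback would serialize the envelope with `JSON.stringify` and pass it to `ws.send()`.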
### Incoming Messages
Each connected spoke sends JSON envelopes. The server listens on `ws.onmessage` for each connection:
```ts
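ws.onmessage = (msg) => {
  let envelope;
  try {
    envelope = JSON.parse(msg.data);
  } catch {
    // Malformed JSON: warn, drop the message, keep the connection open
    console.warn("Discarding malformed message from spoke");
    return;
  }

  // Control protocol: update the subscription map, never reach local listeners
  if (envelope.type === "__subscribe") {
    addConnectionToTopic(ws, envelope.payload.topic);
    return;
  }
  if (envelope.type === "__unsubscribe") {
    removeConnectionFromTopic(ws, envelope.payload.topic);
    return;
  }

  // Regular event: dispatch to local listeners under the type:id topic
  const topic = `${envelope.type}:${envelope.id}`;
  const event = new CustomEvent(topic, { detail: envelope });
  this.dispatchEvent(event);
};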
```
### Outgoing Messages (Topic-Based Fan-out)
```ts
dispatchEvent(event) {
  // event.type is the full topic string, e.g. "message.sent:conv-123"
  // This matches the topics that spokes subscribe to via __subscribe
  const message = JSON.stringify(event.detail);

  // Send only to connections subscribed to this topic
  const subscribers = this.subscriptions.get(event.type);
  if (subscribers) {
    for (const ws of subscribers) {
      sendWithBackpressure(ws, message);
    }
  }
  // Delivery to local listeners (standard EventTarget dispatch) is omitted from this sketch
  return true;
}
```
The routing key for fan-out is the full `CustomEvent.type` string (e.g., `"message.sent:conv-123"`), which matches the topic strings that spokes subscribe to via `__subscribe`. This is the same `type:id` pattern used by `createPubSub`.
**Local listeners:** `dispatchEvent` also delivers to local listeners registered via `addEventListener` on the server itself, via the standard `EventTarget.prototype.dispatchEvent` mechanism. Local listeners use the same `type:id` topic strings.
### Backpressure
WebSocket `send()` never blocks — it silently buffers until memory is exhausted. Unbounded buffering is the primary cause of OOM in production WebSocket servers. This adapter handles backpressure with a configurable threshold policy:
**Default policy: disconnect slow consumers**
1. Before each `ws.send()`, check `ws.bufferedAmount`
2. If `bufferedAmount > maxBufferedAmount` (default 1 MB), call the `onBackpressure` callback (if provided) for observability (logging, metrics). The callback cannot prevent the disconnect.
3. Close the connection with code `1013` (Try Again Later). The current event is **not** sent.
**Why disconnect, not drop silently:** A slow consumer that's still subscribed will continue receiving events. Silently dropping doesn't solve the underlying problem — it just delays it. Disconnecting is honest and gives the client a chance to reconnect.
**Why 1 MB default:** Enough headroom for brief network hiccups (a few hundred messages), but low enough to prevent runaway memory growth. This matches the production-tested defaults in uWebSockets.js and is far below Redis's 8 MB soft limit.
**`bufferedAmount` caveats:** In Node.js `ws`, `bufferedAmount` is updated asynchronously and may not reflect the exact current state. This is acceptable for threshold-based backpressure — the check is conservative, not precise.
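The default policy can be sketched as a small helper. This is an illustration under assumptions: `BufferedSocket` and `BackpressureOptions` are hypothetical types, the boolean return value is an addition of the sketch, and the catch-and-remove handling for a throwing `ws.send()` is left out.

```typescript
// Hypothetical minimal socket shape used by this sketch.
interface BufferedSocket {
  readonly bufferedAmount: number;
  send(data: string): void;
  close(code?: number, reason?: string): void;
}

interface BackpressureOptions {
  maxBufferedAmount?: number;
  onBackpressure?: (ws: BufferedSocket, bufferedAmount: number) => void;
}

const DEFAULT_MAX_BUFFERED_AMOUNT = 1_048_576; // 1 MB, the documented default

// Returns true if the message was passed to ws.send(), false if the
// connection was closed instead (return value is an assumption of this sketch).
function sendWithBackpressure(
  ws: BufferedSocket,
  message: string,
  options: BackpressureOptions = {},
): boolean {
  const limit = options.maxBufferedAmount ?? DEFAULT_MAX_BUFFERED_AMOUNT;
  const buffered = ws.bufferedAmount;
  if (buffered > limit) {
    // Observability hook runs first; it cannot veto the disconnect.
    options.onBackpressure?.(ws, buffered);
    ws.close(1013, "Try Again Later");
    return false; // the current event is not sent
  }
  ws.send(message);
  return true;
}
```

The comparison is strict, so a connection sitting exactly at the threshold still receives the message, matching the "exceeds" wording of the option description.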
## Per-Connection Spoke Targets
The server adapter does **not** create `WebSocketClientEventTarget` instances for each connection. The per-connection `TypedEventTarget` available in the `onConnection` callback is a minimal facade that:
- Provides `addEventListener`/`removeEventListener` that listens only for events **received from that specific spoke** — not events from other spokes or from the server's own `dispatchEvent`
- Dispatches events from that spoke to the server's local listeners
Direct messaging to a specific spoke is achieved through topic scoping: `"direct:${spokeId}"`. The spoke subscribes to that topic; the hub dispatches to it.
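The isolation property of the facade can be illustrated with a plain listener registry standing in for `TypedEventTarget`. Everything here is a hypothetical simplification: `ListenerRegistry` and `deliverIncoming` are illustrative names, and listeners receive the event detail directly rather than a `CustomEvent`.

```typescript
type Listener<T> = (detail: T) => void;

// Hypothetical stand-in for TypedEventTarget: a topic -> listeners registry.
class ListenerRegistry<T> {
  private readonly listeners = new Map<string, Set<Listener<T>>>();

  addEventListener(topic: string, listener: Listener<T>): void {
    let set = this.listeners.get(topic);
    if (!set) {
      set = new Set<Listener<T>>();
      this.listeners.set(topic, set);
    }
    set.add(listener);
  }

  removeEventListener(topic: string, listener: Listener<T>): void {
    this.listeners.get(topic)?.delete(listener);
  }

  emit(topic: string, detail: T): void {
    this.listeners.get(topic)?.forEach((listener) => listener(detail));
  }
}

// An event received from one spoke reaches that spoke's facade and the
// server's aggregate listeners, but never another spoke's facade.
function deliverIncoming<T>(
  spoke: ListenerRegistry<T>,
  server: ListenerRegistry<T>,
  topic: string,
  detail: T,
): void {
  spoke.emit(topic, detail);
  server.emit(topic, detail);
}
```

In this model, the server's own `dispatchEvent` would never call `emit` on a facade, which is what keeps outgoing events out of per-connection listeners.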
## Key Properties
- **Topic-based fan-out:** `dispatchEvent` sends only to connections subscribed to the event's topic, not to all connections
- **Aggregate subscription:** `addEventListener` listens for events from any spoke
- **Connection lifecycle:** `addConnection`/`removeConnection` for the caller to register/unregister WebSocket instances
- **Backpressure protection:** configurable threshold with disconnect policy
- **No native deps:** works with any WebSocket server (Node ws, Bun, Deno, Hono)
- **Framework-agnostic:** takes raw `WebSocket` instances, doesn't handle HTTP upgrade
## Design Decisions
### ADR: Framework-Agnostic Raw WebSocket Interface
**Decision:** Accept raw `WebSocket` instances via `addConnection`/`removeConnection`, not a `WebSocketServer`.
**Rationale:** The adapter should work with any server framework. Hono, Bun, Deno, Node `ws`, and Cloudflare Workers all produce `WebSocket`-like objects but have incompatible server APIs. By accepting raw `WebSocket` instances, the caller handles the framework-specific HTTP upgrade and passes connected sockets to the adapter.
### ADR: Topic-Based Fan-out with Subscription Tracking
**Decision:** `dispatchEvent` sends only to connections subscribed to the event's topic (the full `type:id` string), not to all connections. Spokes declare subscriptions via `__subscribe`/`__unsubscribe` control events.
**Rationale:**
- Broadcast-all wastes bandwidth and leaks data to uninterested clients
- The `type:id` topic pattern already provides the routing abstraction — topics like `"message.sent:conv-123"` are natural fan-out keys
- This is how Redis works internally (channel subscriptions) — we're just implementing the same pattern at the adapter level
- Direct messaging falls out naturally: a spoke subscribing to `"direct:${spokeId}"` creates a "room of one"
### ADR: Disconnect Slow Consumers
**Decision:** When a connection's `bufferedAmount` exceeds the threshold, close the connection.
**Rationale:**
- Unbounded buffering causes OOM — the most common production failure mode for WebSocket servers
- Silently dropping messages doesn't solve the problem; the slow client keeps receiving new events
- Disconnecting is honest: the client can reconnect and re-subscribe
- This matches the production-proven behavior of uWebSockets.js and Redis's `client-output-buffer-limit`
## Test Plan
1. **Topic-based fan-out:** `dispatchEvent` sends only to connections subscribed to that event's topic
2. **Subscription protocol:** `__subscribe`/`__unsubscribe` control events correctly update the subscription map
3. **Incoming aggregation:** messages from any spoke dispatch to local listeners
4. **Connection add/remove:** new connections are tracked, disconnections clean up all subscriptions
5. **Backpressure disconnect:** slow consumers exceeding threshold are disconnected
6. **Backpressure callback:** `onBackpressure` is called before disconnecting
7. **Direct messaging:** events dispatched to `"direct:${spokeId}"` reach only the target spoke
8. **Mixed topology:** server adapter and client adapters can communicate bidirectionally