Resolve WebSocket event target open questions, add subscription control protocol
- Resolve OQ1: WS server accepts raw WebSocket instances via addConnection/removeConnection (framework-agnostic, not coupled to Hono/Express/Bun/Deno) - Resolve OQ2: Backpressure handled by disconnecting slow consumers at configurable threshold (default 1MB), with onBackpressure callback for observability - Resolve OQ3: Topic-based fan-out with subscription tracking instead of broadcast-all; spokes send __subscribe/__unsubscribe control events; direct messaging via 'direct:' topic pattern Add ADR-003 for subscription control protocol decision. Update all fan-out adapters (WS server, Iroh hub) and spoke adapters (WS client, Iroh spoke) with subscription tracking/forwarding. Fix routing key ambiguity (full topic string, not event type alone). Add error handling, composition, and reserved type sections. Clarify Worker as symmetric-only.
This commit is contained in:
@@ -1,86 +1,232 @@
|
||||
---
|
||||
status: draft
|
||||
last_updated: 2026-05-07
|
||||
last_updated: 2026-05-08
|
||||
---
|
||||
|
||||
# WebSocket Server Event Target
|
||||
|
||||
**Import**: `@alkdev/pubsub/event-target-websocket-server`
|
||||
**Peer dep**: none (WebSocket is a web standard, but a server framework like Hono may be needed for upgrade handling)
|
||||
**Peer dep**: none (WebSocket is a web standard)
|
||||
**Status**: Not yet implemented.
|
||||
|
||||
Manages multiple WebSocket connections for the server (hub) side. Handles fan-out: `dispatchEvent` sends to all connected spokes; `addEventListener` aggregates subscriptions across all connections.
|
||||
Manages multiple WebSocket connections for the server (hub) side. Handles topic-based fan-out: `dispatchEvent` sends to connections subscribed to that topic; `addEventListener` aggregates subscriptions across all connections.
|
||||
|
||||
## `createWebSocketServerEventTarget`
|
||||
|
||||
```ts
|
||||
interface WebSocketServerEventTarget<TEvent extends TypedEvent> extends TypedEventTarget<TEvent> {
|
||||
addConnection(ws: WebSocket): void;
|
||||
removeConnection(ws: WebSocket): void;
|
||||
}
|
||||
|
||||
function createWebSocketServerEventTarget<TEvent extends TypedEvent>(
|
||||
options: CreateWebSocketServerEventTargetArgs,
|
||||
): TypedEventTarget<TEvent>;
|
||||
): WebSocketServerEventTarget<TEvent>;
|
||||
```
|
||||
|
||||
### `CreateWebSocketServerEventTargetArgs`
|
||||
|
||||
| Field | Type | Required | Description |
|
||||
|-------|------|----------|-------------|
|
||||
| `onConnection` | callback | No | Called when a new spoke connects. Receives the spoke's event target for per-connection customization. |
|
||||
| `onDisconnection` | callback | No | Called when a spoke disconnects. Receives the spoke's event target for cleanup. |
|
||||
| `onConnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a new spoke connects. Receives the spoke's per-connection event target and the raw `WebSocket`. |
|
||||
| `onDisconnection` | `(spoke: TypedEventTarget<TEvent>, ws: WebSocket) => void` | No | Called when a spoke disconnects. Receives the spoke's event target and `WebSocket` for cleanup. |
|
||||
| `maxBufferedAmount` | `number` | No | Per-connection backpressure threshold in bytes. Default: `1_048_576` (1 MB). When a connection's `bufferedAmount` exceeds this, the connection is closed with code `1013` (Try Again Later). |
|
||||
| `onBackpressure` | `(ws: WebSocket, bufferedAmount: number) => void` | No | Called for observability when a connection exceeds `maxBufferedAmount`, before the connection is closed. Cannot prevent the disconnect. Useful for logging and metrics. |
|
||||
|
||||
## How It Works
|
||||
|
||||
Unlike the client adapter, the server adapter manages a `Set<WebSocket>` of active connections:
|
||||
The server adapter manages a `Set<WebSocket>` of active connections and a `Map<string, Set<WebSocket>>` (the `subscriptions` map) tracking which connections are subscribed to which topic strings.
|
||||
|
||||
- `dispatchEvent` → iterates all connected WebSockets, sends JSON envelope to each
|
||||
- `addEventListener` → registers local listeners. The server doesn't subscribe to individual spokes — it listens for events from any spoke
|
||||
- `dispatchEvent` → looks up connections subscribed to the event type, sends JSON envelope to each
|
||||
- `addEventListener` → registers local listeners. The server listens for events from any spoke
|
||||
- `removeEventListener` → removes local listeners
|
||||
|
||||
### Connection Lifecycle
|
||||
|
||||
The caller handles the HTTP upgrade (framework-specific) and passes connected `WebSocket` instances to the adapter:
|
||||
|
||||
```ts
|
||||
import { createWebSocketServerEventTarget } from "@alkdev/pubsub/event-target-websocket-server";
|
||||
|
||||
const serverTarget = createWebSocketServerEventTarget({});
|
||||
|
||||
// Hono example — the adapter doesn't know about Hono
|
||||
app.get("/ws", (c) => {
|
||||
return c.upgrade(async (ws) => {
|
||||
serverTarget.addConnection(ws);
|
||||
ws.addEventListener("close", () => {
|
||||
serverTarget.removeConnection(ws);
|
||||
});
|
||||
});
|
||||
});
|
||||
```
|
||||
|
||||
The adapter only handles raw `WebSocket` instances. It does not depend on any server framework (Hono, Express, Bun, Deno, etc.). The caller is responsible for:
|
||||
- HTTP upgrade
|
||||
- Passing connected `WebSocket`s to `addConnection`
|
||||
- Removing connections on close via `removeConnection`
|
||||
|
||||
### `addConnection` / `removeConnection`
|
||||
|
||||
```ts
|
||||
addConnection(ws: WebSocket): void
|
||||
removeConnection(ws: WebSocket): void
|
||||
```
|
||||
|
||||
The server adapter exposes these methods on the returned `WebSocketServerEventTarget` for the caller to register and unregister connections. When a connection is added, the adapter sets up `onmessage` and `onclose` handlers. When removed, it cleans up all topic subscriptions for that connection.
|
||||
|
||||
`removeConnection` cleans up internal state (subscription maps, event handlers) but does **not** close the `WebSocket`. The caller is responsible for closing the connection if needed. Typically it's called from a `close` event handler, where the connection is already closing.
|
||||
|
||||
### Error Handling
|
||||
|
||||
- **Malformed JSON from a spoke** → the message is silently ignored. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed — a single malformed message should not disconnect a client.
|
||||
- **Duplicate `__subscribe`** → idempotent. Adding a connection to a topic set it's already in is a no-op. The `Set<WebSocket>` data structure handles this naturally.
|
||||
- **Invalid topic format in control events** → silently ignored. An empty topic string or malformed topic is logged and discarded.
|
||||
- **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`.
|
||||
- **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`.
|
||||
|
||||
### Concurrency Model
|
||||
|
||||
This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently.
|
||||
|
||||
### Subscription Tracking
|
||||
|
||||
The server adapter maintains a `subscriptions` map (`Map<string, Set<WebSocket>>`) from topic string to subscribed connections. Spokes subscribe to topics they're interested in, and `dispatchEvent` only sends to connections that have subscribed to that topic.
|
||||
|
||||
**Why subscription tracking?** Without it, `dispatchEvent` would send every event to all connected spokes, regardless of whether they care about that topic. This wastes bandwidth and — worse — leaks data to clients that shouldn't receive it (e.g., a chat room message sent to clients not in that room).
|
||||
|
||||
**Topic scoping alignment:** The `type:id` topic pattern already used by `createPubSub` (e.g., `"message.sent:conv-123"`) is the routing key. A spoke subscribes to `"message.sent:conv-123"`, and the server only sends events for that topic to that spoke. This is the same pattern Redis uses for channel subscriptions — it's just implemented at the adapter level instead of delegated to an external broker.
|
||||
|
||||
**Direct messaging:** A spoke subscribing to `"direct:${spokeId}"` effectively creates a "room of one." The server can target that specific spoke by dispatching an event with that topic type. No special API needed — topic scoping handles it.
|
||||
|
||||
### Control Protocol
|
||||
|
||||
Spokes communicate subscription changes to the hub using control events in the `EventEnvelope` format with reserved `__`-prefixed types:
|
||||
|
||||
```json
|
||||
{ "type": "__subscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
|
||||
{ "type": "__unsubscribe", "id": "", "payload": { "topic": "message.sent:conv-123" } }
|
||||
```
|
||||
|
||||
Convention: event types starting with `__` are reserved control messages. They are not dispatched to local listeners — they are handled internally by the adapter to update the subscription map.
|
||||
|
||||
When a spoke's `addEventListener` is called, the client adapter sends a `__subscribe` control event to the server. When `removeEventListener` is called and no listeners remain for that topic, the client adapter sends an `__unsubscribe` control event. See [WebSocket Client Event Target](websocket-client.md) for the client-side behavior.
|
||||
|
||||
### Incoming Messages
|
||||
|
||||
Each connected spoke sends JSON envelopes. The server listens on `ws.onmessage` for each connection:
|
||||
|
||||
```ts
|
||||
for (const ws of this.connections) {
|
||||
ws.onmessage = (msg) => {
|
||||
const envelope = JSON.parse(msg.data);
|
||||
const topic = `${envelope.type}:${envelope.id}`;
|
||||
const event = new CustomEvent(topic, { detail: envelope });
|
||||
this.dispatchEvent(event); // dispatches to local listeners
|
||||
};
|
||||
}
|
||||
ws.onmessage = (msg) => {
|
||||
const envelope = JSON.parse(msg.data);
|
||||
|
||||
// Control protocol
|
||||
if (envelope.type === "__subscribe") {
|
||||
addConnectionToTopic(ws, envelope.payload.topic);
|
||||
return;
|
||||
}
|
||||
if (envelope.type === "__unsubscribe") {
|
||||
removeConnectionFromTopic(ws, envelope.payload.topic);
|
||||
return;
|
||||
}
|
||||
|
||||
// Regular event — dispatch to local listeners
|
||||
const topic = `${envelope.type}:${envelope.id}`;
|
||||
const event = new CustomEvent(topic, { detail: envelope });
|
||||
this.dispatchEvent(event);
|
||||
};
|
||||
```
|
||||
|
||||
### Outgoing Messages (Fan-out)
|
||||
### Outgoing Messages (Topic-Based Fan-out)
|
||||
|
||||
```ts
|
||||
dispatchEvent(event) {
|
||||
// event.type is the full topic string, e.g. "message.sent:conv-123"
|
||||
// This matches the topics that spokes subscribe to via __subscribe
|
||||
const message = JSON.stringify(event.detail);
|
||||
for (const ws of this.connections) {
|
||||
ws.send(message);
|
||||
|
||||
// Send only to connections subscribed to this topic
|
||||
const subscribers = this.subscriptions.get(event.type);
|
||||
if (subscribers) {
|
||||
for (const ws of subscribers) {
|
||||
sendWithBackpression(ws, message);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
```
|
||||
|
||||
The routing key for fan-out is the full `CustomEvent.type` string (e.g., `"message.sent:conv-123"`), which matches the topic strings that spokes subscribe to via `__subscribe`. This is the same `type:id` pattern used by `createPubSub`.
|
||||
|
||||
**Local listeners:** `dispatchEvent` also delivers to local listeners registered via `addEventListener` on the server itself, via the standard `EventTarget.prototype.dispatchEvent` mechanism. Local listeners use the same `type:id` topic strings.
|
||||
|
||||
### Backpressure
|
||||
|
||||
WebSocket `send()` never blocks — it silently buffers until memory is exhausted. Unbounded buffering is the primary cause of OOM in production WebSocket servers. This adapter handles backpressure with a configurable threshold policy:
|
||||
|
||||
**Default policy: disconnect slow consumers**
|
||||
1. Before each `ws.send()`, check `ws.bufferedAmount`
|
||||
2. If `bufferedAmount > maxBufferedAmount` (default 1 MB), close the connection with code `1013` (Try Again Later) — the current event is **not** sent
|
||||
3. Call `onBackpressure` callback (if provided) before closing, for observability (logging, metrics). The connection is always closed after the callback runs; the callback cannot prevent the disconnect.
|
||||
|
||||
**Why disconnect, not drop silently:** A slow consumer that's still subscribed will continue receiving events. Silently dropping doesn't solve the underlying problem — it just delays it. Disconnecting is honest and gives the client a chance to reconnect.
|
||||
|
||||
**Why 1 MB default:** Enough headroom for brief network hiccups (a few hundred messages), but low enough to prevent runaway memory growth. This matches the production-tested defaults in uWebSockets.js and is far below Redis's 8 MB soft limit.
|
||||
|
||||
**`bufferedAmount` caveats:** In Node.js `ws`, `bufferedAmount` is updated asynchronously and may not reflect the exact current state. This is acceptable for threshold-based backpressure — the check is conservative, not precise.
|
||||
|
||||
## Per-Connection Spoke Targets
|
||||
|
||||
The server adapter creates a `WebSocketClientEventTarget` for each incoming connection. This allows the hub to target specific spokes if needed (e.g., responding to a specific request).
|
||||
The server adapter does **not** create `WebSocketClientEventTarget` instances for each connection. The per-connection `TypedEventTarget` available in the `onConnection` callback is a minimal facade that:
|
||||
- Provides `addEventListener`/`removeEventListener` that listens only for events **received from that specific spoke** — not events from other spokes or from the server's own `dispatchEvent`
|
||||
- Dispatches events from that spoke to the server's local listeners
|
||||
|
||||
Direct messaging to a specific spoke is achieved through topic scoping: `"direct:${spokeId}"`. The spoke subscribes to that topic; the hub dispatches to it.
|
||||
|
||||
## Key Properties
|
||||
|
||||
- **Fan-out** — dispatchEvent sends to all connected spokes
|
||||
- **Topic-based fan-out** — dispatchEvent sends only to connections subscribed to the event type, not all connections
|
||||
- **Aggregate subscription** — addEventListener listens for events from any spoke
|
||||
- **Connection lifecycle** — manages add/remove of WebSocket connections
|
||||
- **Connection lifecycle** — `addConnection`/`removeConnection` for the caller to register/unregister WebSocket instances
|
||||
- **Backpressure protection** — configurable threshold with disconnect policy
|
||||
- **No native deps** — works with any WebSocket server (Node ws, Bun, Deno, Hono)
|
||||
- **Framework-agnostic** — takes raw `WebSocket` instances, doesn't handle HTTP upgrade
|
||||
|
||||
## Open Questions
|
||||
## Design Decisions
|
||||
|
||||
1. **Server framework coupling** — Should this adapter take a raw `WebSocketServer` or just handle `WebSocket` instances? Raw `WebSocket` instances keeps it framework-agnostic. The caller (hub code) handles the HTTP upgrade and passes connected `WebSocket`s to the adapter.
|
||||
2. **Backpressure** — What happens when `ws.send()` blocks or buffers? Should there be a max buffer size per connection?
|
||||
3. **Selective fan-out** — Should `dispatchEvent` always send to all connections, or should there be a way to target a specific spoke?
|
||||
### ADR: Framework-Agnostic Raw WebSocket Interface
|
||||
|
||||
**Decision:** Accept raw `WebSocket` instances via `addConnection`/`removeConnection`, not a `WebSocketServer`.
|
||||
|
||||
**Rationale:** The adapter should work with any server framework. Hono, Bun, Deno, Node `ws`, and Cloudflare Workers all produce `WebSocket`-like objects but have incompatible server APIs. By accepting raw `WebSocket` instances, the caller handles the framework-specific HTTP upgrade and passes connected sockets to the adapter.
|
||||
|
||||
### ADR: Topic-Based Fan-out with Subscription Tracking
|
||||
|
||||
**Decision:** `dispatchEvent` sends only to connections subscribed to the event type, not all connections. Spokes declare subscriptions via `__subscribe`/`__unsubscribe` control events.
|
||||
|
||||
**Rationale:**
|
||||
- Broadcast-all wastes bandwidth and leaks data to uninterested clients
|
||||
- The `type:id` topic pattern already provides the routing abstraction — topics like `"message.sent:conv-123"` are natural fan-out keys
|
||||
- This is how Redis works internally (channel subscriptions) — we're just implementing the same pattern at the adapter level
|
||||
- Direct messaging falls out naturally: a spoke subscribing to `"direct:${spokeId}"` creates a "room of one"
|
||||
|
||||
### ADR: Disconnect Slow Consumers
|
||||
|
||||
**Decision:** When a connection's `bufferedAmount` exceeds the threshold, close the connection.
|
||||
|
||||
**Rationale:**
|
||||
- Unbounded buffering causes OOM — the most common production failure mode for WebSocket servers
|
||||
- Silently dropping messages doesn't solve the problem; the slow client keeps receiving new events
|
||||
- Disconnecting is honest: the client can reconnect and re-subscribe
|
||||
- This matches the production-proven behavior of uWebSockets.js and Redis's `client-output-buffer-limit`
|
||||
|
||||
## Test Plan
|
||||
|
||||
1. **Fan-out** — dispatchEvent sends to all connected WebSockets
|
||||
2. **Incoming aggregation** — messages from any spoke dispatch to local listeners
|
||||
3. **Connection add/remove** — new connections are tracked, disconnections are cleaned up
|
||||
4. **Mixed topology** — server adapter and client adapters can communicate bidirectionally
|
||||
1. **Topic-based fan-out** — dispatchEvent sends only to connections subscribed to that event type
|
||||
2. **Subscription protocol** — `__subscribe`/`__unsubscribe` control events correctly update the subscription map
|
||||
3. **Incoming aggregation** — messages from any spoke dispatch to local listeners
|
||||
4. **Connection add/remove** — new connections are tracked, disconnections clean up all subscriptions
|
||||
5. **Backpressure disconnect** — slow consumers exceeding threshold are disconnected
|
||||
6. **Backpressure callback** — `onBackpressure` is called before disconnecting
|
||||
7. **Direct messaging** — events dispatched to `"direct:${spokeId}"` reach only the target spoke
|
||||
8. **Mixed topology** — server adapter and client adapters can communicate bidirectionally
|
||||
Reference in New Issue
Block a user