pubsub/tasks/008-websocket-server-adapter.md
glm-5.1 1306716897 Decompose architecture into atomic, dependency-ordered tasks
19 tasks covering core testing, Redis hardening, WebSocket client/server
adapters, Worker adapter, and final review gates. Iroh adapters are tracked
as a deferred placeholder blocked on the @alkdev/iroh fork.

Phases: core validation → Redis hardening → review gate → WebSocket
adapters → review gate → Worker adapter → review gate → final validation.
2026-05-08 05:50:43 +00:00

---
id: websocket-server-adapter
name: Implement WebSocket server event target adapter
status: pending
depends_on: [websocket-client-adapter]
scope: broad
risk: medium
impact: component
level: implementation
---
## Description
Implement the `createWebSocketServerEventTarget` adapter as specified in `docs/architecture/event-targets/websocket-server.md`.
This is a fan-out (multi-connection) adapter that manages multiple WebSocket connections for the hub/server side. It must implement topic-based fan-out with subscription tracking using `__subscribe`/`__unsubscribe` control events.
Key requirements from the architecture:
- Factory returns `WebSocketServerEventTarget` which extends `TypedEventTarget` with `addConnection(ws)` and `removeConnection(ws)`
- `addConnection` sets up `onmessage` and `onclose` handlers on the WebSocket
- `removeConnection` cleans up subscription maps and event handlers (but does NOT close the WebSocket)
- `dispatchEvent` sends to only connections subscribed to the event type (topic-based fan-out)
- `addEventListener` registers local listeners, which receive events aggregated from all spokes
- Subscription tracking: `Map<string, Set<WebSocket>>` from topic to subscribed connections
- Control protocol: `__subscribe`/`__unsubscribe` messages from spokes update subscription map
- Backpressure: configurable `bufferedAmount` threshold (default 1MB); slow consumers that exceed it are disconnected
- `onConnection` and `onDisconnection` callbacks for lifecycle events
- Framework-agnostic: takes raw `WebSocket` instances, doesn't handle HTTP upgrade
- `dispatchEvent` always returns `true` (errors handled via side effects)
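
The subscription tracking and topic-based fan-out described above could be sketched roughly as follows. This is a minimal illustration, not the adapter itself: `SocketLike`, `SubscriptionRegistry`, `subscriberCount`, and `fanOut` are hypothetical names introduced here, and the sketch leaves out the control-message wire format, backpressure, and callbacks. The comments are for explanation only; the project convention of comment-free source would apply to the real file.

```typescript
// Hypothetical minimal connection shape (an assumption for this sketch).
// A real WebSocket is expected to satisfy it structurally.
interface SocketLike {
  send(data: string): void;
}

// Tracks which connections are subscribed to which topic,
// mirroring the Map<string, Set<WebSocket>> named in the requirements.
class SubscriptionRegistry<S extends SocketLike> {
  private readonly topics = new Map<string, Set<S>>();

  // Adding the same connection twice is a no-op because Set deduplicates.
  subscribe(topic: string, ws: S): void {
    let subscribers = this.topics.get(topic);
    if (!subscribers) {
      subscribers = new Set<S>();
      this.topics.set(topic, subscribers);
    }
    subscribers.add(ws);
  }

  // Removes one connection from one topic and drops empty topic entries.
  unsubscribe(topic: string, ws: S): void {
    const subscribers = this.topics.get(topic);
    if (!subscribers) return;
    subscribers.delete(ws);
    if (subscribers.size === 0) this.topics.delete(topic);
  }

  // Removes a connection from every topic. It does not close the socket.
  removeConnection(ws: S): void {
    this.topics.forEach((subscribers, topic) => {
      subscribers.delete(ws);
      if (subscribers.size === 0) this.topics.delete(topic);
    });
  }

  subscriberCount(topic: string): number {
    const subscribers = this.topics.get(topic);
    return subscribers ? subscribers.size : 0;
  }

  // Sends the message only to connections subscribed to the topic
  // and returns how many sends were attempted.
  fanOut(topic: string, message: string): number {
    const subscribers = this.topics.get(topic);
    if (!subscribers) return 0;
    let attempted = 0;
    subscribers.forEach((ws) => {
      ws.send(message);
      attempted++;
    });
    return attempted;
  }
}
```

Keeping the registry separate from the event target is only one possible design; it makes the `__subscribe`/`__unsubscribe` bookkeeping testable without a live socket.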
## Acceptance Criteria
- [ ] `src/event-target-websocket-server.ts` exists
- [ ] `createWebSocketServerEventTarget(options)` returns `WebSocketServerEventTarget`
- [ ] `WebSocketServerEventTarget` extends `TypedEventTarget` with `addConnection` and `removeConnection`
- [ ] `addConnection(ws)` sets up `onmessage` and `onclose` handlers
- [ ] `removeConnection(ws)` cleans up internal state but does not close the WebSocket
- [ ] `addConnection` `onclose` handler automatically calls `removeConnection`
- [ ] `dispatchEvent` sends only to connections subscribed to the event type
- [ ] `addEventListener` registers local listeners, which receive events aggregated from all spokes
- [ ] `__subscribe` control events from spokes add connection to topic's subscriber set
- [ ] `__unsubscribe` control events from spokes remove connection from topic's subscriber set
- [ ] Duplicate `__subscribe` is idempotent (the `Set` deduplicates naturally)
- [ ] Invalid topic format in control events is silently ignored
- [ ] Malformed JSON from spokes is logged and otherwise ignored (no error is thrown, connection stays open)
- [ ] Backpressure: before `ws.send()`, check `bufferedAmount` against threshold
- [ ] Backpressure: exceeding threshold closes connection with code 1013
- [ ] `onBackpressure` callback called before disconnecting
- [ ] `onConnection` callback receives spoke event target and raw WebSocket
- [ ] `onDisconnection` callback receives spoke event target and raw WebSocket
- [ ] `dispatchEvent` always returns `true`
- [ ] Send failure (ws.send throws) catches error, removes connection, fires `onDisconnection`
- [ ] No comments in source code (project convention)
- [ ] Sub-path export added to `package.json` and `tsup.config.ts`
- [ ] Barrel re-export added to `src/index.ts`
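
The backpressure and send-failure criteria above might be approached along these lines. This is a sketch under stated assumptions: `SocketLike`, `guardedSend`, `SendHooks`, and `onSendFailure` are hypothetical names, the default of `1024 * 1024` bytes is one reading of "1MB", and the hook signatures are simplified (the real `onDisconnection` callback also receives the spoke event target). Comments are illustrative only and would not appear in the project source.

```typescript
// Hypothetical minimal connection shape (an assumption for this sketch).
// A real server-side WebSocket is expected to satisfy it structurally.
interface SocketLike {
  readonly bufferedAmount: number;
  send(data: string): void;
  close(code?: number, reason?: string): void;
}

type SendOutcome = "sent" | "backpressure" | "failed";

// Simplified hooks; the real adapter would remove the connection and
// fire onDisconnection where onSendFailure is called here.
interface SendHooks<S> {
  onBackpressure?: (ws: S, bufferedAmount: number) => void;
  onSendFailure?: (ws: S, error: unknown) => void;
}

// Assumed interpretation of the "default 1MB" threshold.
const DEFAULT_THRESHOLD_BYTES = 1024 * 1024;

// Close code named in the acceptance criteria.
const CLOSE_CODE_BACKPRESSURE = 1013;

function guardedSend<S extends SocketLike>(
  ws: S,
  message: string,
  hooks: SendHooks<S> = {},
  thresholdBytes: number = DEFAULT_THRESHOLD_BYTES,
): SendOutcome {
  // Check the outgoing buffer before sending; a slow consumer is
  // reported through the hook first and then disconnected.
  if (ws.bufferedAmount > thresholdBytes) {
    if (hooks.onBackpressure) hooks.onBackpressure(ws, ws.bufferedAmount);
    ws.close(CLOSE_CODE_BACKPRESSURE, "backpressure");
    return "backpressure";
  }
  try {
    ws.send(message);
    return "sent";
  } catch (error) {
    // A throwing send is contained here so the caller can keep
    // returning true from dispatchEvent.
    if (hooks.onSendFailure) hooks.onSendFailure(ws, error);
    return "failed";
  }
}
```

Returning an outcome instead of throwing keeps `dispatchEvent` free to return `true` unconditionally while errors surface through the callbacks.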
## References
- docs/architecture/event-targets/websocket-server.md
- docs/architecture/decisions/003-subscription-control-protocol.md
- src/types.ts
## Notes
> To be filled by implementation agent
## Summary
> To be filled on completion