diff --git a/docs/architecture.md b/docs/architecture.md index 5f7ad5f..b0aa702 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -6,7 +6,8 @@ |----------|---------| | [architecture/README.md](architecture/README.md) | Overview, why this exists, interface contract, consumer context | | [architecture/api-surface.md](architecture/api-surface.md) | createPubSub, EventEnvelope, PubSub types, operators | -| [architecture/event-targets.md](architecture/event-targets.md) | In-process, Redis, WebSocket, Worker adapters | +| [architecture/event-targets.md](architecture/event-targets.md) | Adapter index — links to individual adapter specs | +| [architecture/event-targets/](architecture/event-targets/) | Per-adapter docs (in-process, redis, websocket-client, websocket-server, worker, iroh-spoke, iroh-hub) | | [architecture/iroh-transport.md](architecture/iroh-transport.md) | Iroh P2P QUIC transport, framing, identity, hub/spoke | | [architecture/build-distribution.md](architecture/build-distribution.md) | Dependencies, project structure, tree-shaking, sub-path exports | diff --git a/docs/architecture/event-targets.md b/docs/architecture/event-targets.md index 7f973f9..7b337d5 100644 --- a/docs/architecture/event-targets.md +++ b/docs/architecture/event-targets.md @@ -1,11 +1,11 @@ --- status: draft -last_updated: 2026-05-01 +last_updated: 2026-05-07 --- # Event Target Adapters -In-process, Redis, WebSocket, and Worker event targets. All implement `TypedEventTarget`. +All adapters implement the `TypedEventTarget` interface and use the `EventEnvelope` format (`{ type, id, payload }`) as the serialization contract. ## Interface Contract @@ -17,139 +17,23 @@ Every adapter must implement: | `dispatchEvent(event)` | Send/dispatch event. Returns `boolean` (always `true` for non-cancelable events). | | `removeEventListener(type, callback)` | Unregister listener. Clean up underlying subscription when no listeners remain for a topic. | -All adapters use the `EventEnvelope` format (`{ type, id, payload }`) as the serialization contract. Adapters that cross process boundaries (Redis, WebSocket, Iroh) serialize/deserialize the full envelope as JSON. +## Topology Model -## In-Process (Default) +Adapters come in two shapes: -No adapter needed. `createPubSub` uses `new EventTarget()` by default. This works for single-process deployments where all pubsub participants share the same memory. +- **Symmetric** (single connection) — wraps one connection. Same interface on both sides. Examples: Redis, Iroh spoke, WebSocket client, Worker main-thread. +- **Fan-out** (multi-connection) — manages multiple connections. `dispatchEvent` sends to all; `addEventListener` aggregates from all. Examples: WebSocket server, Iroh hub, Worker pool manager. -No explicit `InProcessEventTarget` class — the web standard `EventTarget` already implements the interface. Could be formalized later if a name makes the API clearer, but `new EventTarget()` is already the standard. +The `createPubSub` layer is topology-agnostic. A hub composes multiple adapters and uses operators to combine streams — this is downstream application logic, not a package boundary. -## Redis +## Adapter Docs -**Import**: `@alkdev/pubsub/event-target-redis` -**Peer dep**: `ioredis@^5.0.0` (optional) - -### `createRedisEventTarget` - -```ts -function createRedisEventTarget( - args: CreateRedisEventTargetArgs, -): TypedEventTarget; -``` - -### `CreateRedisEventTargetArgs` - -| Field | Type | Required | Description | -|-------|------|----------|-------------| -| `publishClient` | `Redis \| Cluster` | Yes | ioredis client for publishing. Can share a connection. | -| `subscribeClient` | `Redis \| Cluster` | Yes | ioredis client for subscribing. Must be dedicated — Redis requires subscriber connections to only receive messages. | -| `serializer` | `{ stringify, parse }` | No | Custom serializer. Defaults to `JSON`. | - -### How It Works - -- `dispatchEvent` → `publishClient.publish(event.type, serializer.stringify(event.detail))` -- `addEventListener` → `subscribeClient.subscribe(topic)`, track callbacks per topic -- `removeEventListener` → remove callback; if no callbacks remain for topic, `subscribeClient.unsubscribe(topic)` -- On message: deserializes with `serializer.parse`, reconstructs `CustomEvent(channel, { detail: envelope })` - -The `detail` of the `CustomEvent` dispatched to local listeners is the full `EventEnvelope` object (`{ type, id, payload }`). - -### Channel Naming - -Currently uses the topic string directly as the Redis channel name (e.g., `session.status:proj_123`). Architecture recommends `alk:events:{eventType}` prefix but this is not yet implemented. Should be configurable: `createRedisEventTarget({ ..., prefix: "alk:events:" })`. - -### Limitations (Current) - -- **No error handling** — connection failures, reconnection, and message parse errors are not handled -- **No channel prefix** — raw event types as channel names risk collision in shared Redis instances -- **No unsubscribe cleanup on client disconnect** — if the subscribe client disconnects, registered callbacks remain in the map but will never fire - -### Test Coverage - -No tests yet (test directory is empty). Previous alkhub had 5 Redis tests (publish path only, mocked ioredis). - -## WebSocket - -**Import**: `@alkdev/pubsub/event-target-websocket` (not yet implemented) -**Peer dep**: none (WebSocket is a web standard) - -### Design (Spec from `spoke-runner.md`) - -```ts -class WebSocketEventTarget implements TypedEventTarget { - private listeners = new Map void>>() - - constructor(private ws: WebSocket) { - ws.onmessage = (msg) => { - const envelope = JSON.parse(msg.data as string) // { type, id, payload } - const topic = `${envelope.type}:${envelope.id}` - const event = new CustomEvent(topic, { detail: envelope }) - for (const listener of this.listeners.get(topic) ?? []) { - listener(event) - } - } - } - - dispatchEvent(event: CustomEvent): boolean { - this.ws.send(JSON.stringify(event.detail)) // sends { type, id, payload } - return true - } - - addEventListener(type: string, listener: (event: CustomEvent) => void): void { ... } - removeEventListener(type: string, listener: (event: CustomEvent) => void): void { ... } -} -``` - -### Key Properties - -- **Bidirectional** — `dispatchEvent` sends over WS, `addEventListener` receives from WS -- **Per-connection** — hub creates one per spoke connection -- **JSON framing** — WebSocket provides native message boundaries (no length-prefix needed) -- **No native deps** — works in browsers and Node -- **Envelope serialization** — sends/receives the full `EventEnvelope` JSON (`{ type, id, payload }`) - -### Gap: Reconnection - -WebSocket connections drop. On reconnect, the spoke must re-register with the hub (same `hub.register` flow). The `WebSocketEventTarget` itself is per-connection — a new connection means a new event target instance. Reconnection logic belongs to the spoke lifecycle, not the event target. - -## Worker - -**Import**: `@alkdev/pubsub/event-target-worker` (not yet implemented) -**Peer dep**: none (Web Worker API is standard) - -### Design - -A `WorkerEventTarget` implementing `TypedEventTarget` over `postMessage`/`onmessage`. This enables `createPubSub` to work across Web Worker boundaries. - -The worker message protocol uses the `EventEnvelope` format: - -```json -{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } -``` - -### Two-Sided Design - -- **Main thread** (`WorkerPoolManager` side): dispatches typed messages to workers via `worker.postMessage()`, receives responses via `worker.onmessage` -- **Worker thread**: dispatches to main thread via `parentPort.postMessage()`, receives from main thread via `globalThis.onmessage` - -Both sides wrap `postMessage`/`onmessage` to implement the `TypedEventTarget` interface: - -```ts -// Main thread side -const workerEventTarget = createWorkerEventTarget(worker); - -// Worker thread side -const mainEventTarget = createMainThreadEventTarget(); -``` - -### Key Properties - -- **Bidirectional** — both sides can publish and subscribe -- **Per-worker** — each worker gets its own event target -- **Structured clone** — Web Workers use structured clone algorithm for serialization, but JSON-serializable `EventEnvelope` ensures cross-platform compatibility -- **No native deps** — works in any environment with Web Worker support - -### Relationship to Taskgraph / Operations - -The worker event target enables distributed operation execution. Workers can subscribe to `call.requested` events and publish `call.responded` events through the event target, allowing `@alkdev/operations` to dispatch work to worker threads via the same pubsub transport. \ No newline at end of file +| Adapter | Import | Status | +|---------|--------|--------| +| [In-Process](in-process.md) | (default, no import) | Implemented (built-in `EventTarget`) | +| [Redis](redis.md) | `@alkdev/pubsub/event-target-redis` | Implemented. Needs tests. | +| [WebSocket Client](websocket-client.md) | `@alkdev/pubsub/event-target-websocket-client` | Not yet implemented | +| [WebSocket Server](websocket-server.md) | `@alkdev/pubsub/event-target-websocket-server` | Not yet implemented | +| [Worker](worker.md) | `@alkdev/pubsub/event-target-worker` | Not yet implemented (R&D on Node vs Web Worker) | +| [Iroh Spoke](iroh-spoke.md) | `@alkdev/pubsub/event-target-iroh-spoke` | Not yet implemented (R&D on binding) | +| [Iroh Hub](iroh-hub.md) | `@alkdev/pubsub/event-target-iroh-hub` | Not yet implemented (R&D on binding) | \ No newline at end of file diff --git a/docs/architecture/event-targets/in-process.md b/docs/architecture/event-targets/in-process.md new file mode 100644 index 0000000..224a7c1 --- /dev/null +++ b/docs/architecture/event-targets/in-process.md @@ -0,0 +1,25 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# In-Process Event Target + +No adapter needed. `createPubSub` uses `new EventTarget()` by default. + +This works for single-process deployments where all pubsub participants share the same memory. The web standard `EventTarget` already implements `addEventListener`/`dispatchEvent`/`removeEventListener`. + +```ts +const pubsub = createPubSub(); +// uses new EventTarget() by default +``` + +Could be formalized as an explicit `InProcessEventTarget` for documentation purposes, but there's no functional need — the browser/Node `EventTarget` is the reference implementation of the `TypedEventTarget` contract. + +## Design Notes + +- Topics use the `type:id` string convention (e.g., `"call.responded:uuid-123"`) +- `CustomEvent.detail` carries the full `EventEnvelope` object +- No serialization — objects are passed by reference +- Synchronous dispatch — listeners fire immediately in the current call stack +- No connection lifecycle, no reconnection concerns \ No newline at end of file diff --git a/docs/architecture/event-targets/iroh-hub.md b/docs/architecture/event-targets/iroh-hub.md new file mode 100644 index 0000000..a1ce631 --- /dev/null +++ b/docs/architecture/event-targets/iroh-hub.md @@ -0,0 +1,75 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# Iroh Hub Event Target + +**Import**: `@alkdev/pubsub/event-target-iroh-hub` +**Peer dep**: `@rayhanadev/iroh` (optional, NAPI-RS native addon) +**Status**: Not yet implemented. Needs R&D on binding stability, NAPI under Deno. + +P2P QUIC event target for the hub (server) side. The hub accepts incoming connections and bidirectional streams. Manages multiple connected spokes. + +## `createIrohHubEventTarget` + +```ts +async function createIrohHubEventTarget( + args: CreateIrohHubEventTargetArgs, +): Promise>; +``` + +### `CreateIrohHubEventTargetArgs` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `endpoint` | `Endpoint` | Yes | iroh endpoint (created with `Endpoint.create()`) | +| `alpn` | `string` | No | Application-layer protocol. Default: `"alkpubsub/1"` | + +## How It Works + +Similar to the WebSocket server adapter, the Iroh hub adapter manages multiple connections: + +- `dispatchEvent` → writes JSON envelope to all connected spokes' `SendStream`s +- `addEventListener` → registers local listeners for events from any spoke +- On incoming connection → `endpoint.accept()` → `connection.acceptBi()` → new spoke tracked + +Each spoke gets its own read loop that parses length-prefixed JSON messages from `RecvStream` and dispatches locally. + +## Connection Lifecycle + +1. Hub creates `Endpoint` and starts accepting +2. Spoke connects → hub gets `Connection` from `endpoint.accept()` +3. Hub accepts stream → `connection.acceptBi()` → `SendStream` + `RecvStream` +4. Hub creates per-spoke read loop +5. On disconnect → `RecvStream.readExact()` throws → remove spoke from set +6. Hub continues accepting new connections + +## Fan-Out + +```ts +dispatchEvent(event) { + const message = encodeEnvelope(event.detail); + for (const spoke of this.spokes) { + spoke.sendStream.writeAll(message); + } + return true; +} +``` + +## Key Properties + +- **Multi-connection** — manages a set of connected spokes +- **Fan-out** — dispatchEvent sends to all connected spokes +- **Accepts incoming** — endpoint.accept() loop runs continuously +- **Cryptographic identity** — each spoke verified by Ed25519 NodeId + +## R&D Needed + +1. **Binding stability** — same as spoke adapter. `@rayhanadev/iroh` needs testing. +2. **Concurrent accept** — can `endpoint.accept()` handle multiple simultaneous connections? +3. **Stream vs. Connection per spoke** — current design: one bidirectional stream per spoke on a single connection. Alternative: one connection per spoke. Need to benchmark which is better for the expected workload. +4. **1:N fan-out** — for hub to N spokes, each spoke gets its own stream. For true broadcast, `iroh-gossip` would be better (not yet available in TS). +5. **Connection rejection** — how to reject connections from unknown `NodeId`s. + +See [../iroh-transport.md](../iroh-transport.md) for full protocol details, identity, and comparison with WebSocket. \ No newline at end of file diff --git a/docs/architecture/event-targets/iroh-spoke.md b/docs/architecture/event-targets/iroh-spoke.md new file mode 100644 index 0000000..b89e862 --- /dev/null +++ b/docs/architecture/event-targets/iroh-spoke.md @@ -0,0 +1,66 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# Iroh Spoke Event Target + +**Import**: `@alkdev/pubsub/event-target-iroh-spoke` +**Peer dep**: `@rayhanadev/iroh` (optional, NAPI-RS native addon) +**Status**: Not yet implemented. Needs R&D on binding stability and Deno/NAPI compatibility. + +P2P QUIC event target for the spoke (client) side. The spoke initiates the connection and opens the bidirectional stream. + +## `createIrohSpokeEventTarget` + +```ts +async function createIrohSpokeEventTarget( + args: CreateIrohSpokeEventTargetArgs, +): Promise>; +``` + +### `CreateIrohSpokeEventTargetArgs` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `endpoint` | `Endpoint` | Yes | iroh endpoint (created with `Endpoint.create()`) | +| `hubNodeId` | `string` \| `NodeId` | Yes | The hub's public key (Ed25519) | +| `alpn` | `string` | No | Application-layer protocol. Default: `"alkpubsub/1"` | + +## Protocol + +Single bidirectional QUIC stream per connection. Length-prefixed JSON messages: + +``` +[4 bytes: length N][N bytes: JSON payload] +``` + +The JSON payload is the `EventEnvelope`: + +```json +{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } +``` + +## Connection Flow + +1. Spoke creates `Endpoint` +2. Spoke calls `endpoint.connect(hubNodeId, alpn)` → `Connection` +3. Spoke calls `connection.openBi()` → `SendStream` + `RecvStream` +4. Spoke wraps streams in `IrohSpokeEventTarget` +5. On disconnect: `RecvStream.readExact()` throws, spoke must reconnect + +## Key Properties + +- **NAT traversal** — spoke dials hub by `NodeId`, no public IP needed +- **Cryptographic identity** — `Connection.remoteNodeId()` verifies the hub +- **Bidirectional** — `dispatchEvent` writes to `SendStream`, `addEventListener` reads from `RecvStream` +- **Per-connection** — one event target per QUIC connection + +## R&D Needed + +1. **Binding stability** — `@rayhanadev/iroh` has one author and no tests. API surface is small (10 methods) but needs validation. +2. **NAPI under Deno** — NAPI-RS `.node` binaries need testing under Deno 2.x. +3. **Stream multiplexing** — multiple `openBi()` streams on one connection vs. single stream with multiplexed events. Single stream + JSON framing is simpler. +4. **Reconnection** — `RecvStream.readExact()` throws on connection close. Need to propagate this to listeners and support reconnect. + +See [../iroh-transport.md](../iroh-transport.md) for full protocol details, identity, and comparison with WebSocket. \ No newline at end of file diff --git a/docs/architecture/event-targets/redis.md b/docs/architecture/event-targets/redis.md new file mode 100644 index 0000000..57b65ba --- /dev/null +++ b/docs/architecture/event-targets/redis.md @@ -0,0 +1,59 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# Redis Event Target + +**Import**: `@alkdev/pubsub/event-target-redis` +**Peer dep**: `ioredis@^5.0.0` (optional) +**Status**: Implemented. Needs tests. + +Adapted from `@graphql-yoga/redis-event-target` (MIT). + +## `createRedisEventTarget` + +```ts +function createRedisEventTarget( + args: CreateRedisEventTargetArgs, +): TypedEventTarget; +``` + +### `CreateRedisEventTargetArgs` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `publishClient` | `Redis \| Cluster` | Yes | ioredis client for publishing. Can share a connection. | +| `subscribeClient` | `Redis \| Cluster` | Yes | ioredis client for subscribing. Must be dedicated — Redis requires subscriber connections to only receive messages. | +| `serializer` | `{ stringify, parse }` | No | Custom serializer. Defaults to `JSON`. | + +## How It Works + +- `dispatchEvent` → `publishClient.publish(event.type, serializer.stringify(event.detail))` +- `addEventListener` → `subscribeClient.subscribe(topic)`, track callbacks per topic +- `removeEventListener` → remove callback; if no callbacks remain for topic, `subscribeClient.unsubscribe(topic)` +- On message: deserializes with `serializer.parse`, reconstructs `CustomEvent(channel, { detail: envelope })` + +The `detail` of the `CustomEvent` dispatched to local listeners is the full `EventEnvelope` object (`{ type, id, payload }`). + +## Channel Naming + +Currently uses the topic string directly as the Redis channel name (e.g., `call.responded:uuid-123`). Should support a configurable prefix: `createRedisEventTarget({ ..., prefix: "alk:events:" })`. + +## Limitations (Current) + +- **No error handling** — connection failures, reconnection, and message parse errors are not handled +- **No channel prefix** — raw event types as channel names risk collision in shared Redis instances +- **No unsubscribe cleanup on client disconnect** — if the subscribe client disconnects, registered callbacks remain in the map but will never fire + +## Test Coverage + +No tests yet. Need: + +1. **Publish path** — dispatchEvent sends to Redis with correct channel and serialized envelope +2. **Subscribe path** — addEventListener subscribes to Redis, onMessage dispatches to local listeners +3. **Unsubscribe** — removeEventListener unsubscribes from Redis when no listeners remain for a topic +4. **Topic scoping** — type:id topics are correctly formed +5. **Envelope serialization** — full `{ type, id, payload }` round-trips through JSON +6. **Multiple listeners** — multiple listeners on same topic, single Redis subscribe +7. **Error propagation** — what happens on connection failure \ No newline at end of file diff --git a/docs/architecture/event-targets/websocket-client.md b/docs/architecture/event-targets/websocket-client.md new file mode 100644 index 0000000..a5d4ab2 --- /dev/null +++ b/docs/architecture/event-targets/websocket-client.md @@ -0,0 +1,73 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# WebSocket Client Event Target + +**Import**: `@alkdev/pubsub/event-target-websocket-client` +**Peer dep**: none (WebSocket is a web standard) +**Status**: Not yet implemented. + +Wraps a single `WebSocket` connection for the client (spoke) side. Bidirectional — can both send and receive events. + +## `createWebSocketClientEventTarget` + +```ts +function createWebSocketClientEventTarget( + ws: WebSocket, +): TypedEventTarget; +``` + +Takes an already-connected `WebSocket`. The caller is responsible for connection lifecycle (including reconnection — see below). + +## Protocol + +WebSocket provides native message boundaries (no length-prefix needed). Each message is a JSON-serialized `EventEnvelope`: + +```json +{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } +``` + +### Sending (dispatchEvent) + +```ts +dispatchEvent(event) { + this.ws.send(JSON.stringify(event.detail)); + // event.detail is the EventEnvelope { type, id, payload } + return true; +} +``` + +### Receiving (addEventListener) + +```ts +ws.onmessage = (msg) => { + const envelope = JSON.parse(msg.data); + const topic = `${envelope.type}:${envelope.id}`; + const event = new CustomEvent(topic, { detail: envelope }); + // dispatch to local listeners +}; +``` + +## Key Properties + +- **Bidirectional** — `dispatchEvent` sends over WS, `addEventListener` receives from WS +- **Per-connection** — one event target per WebSocket connection +- **JSON framing** — WebSocket provides native message boundaries +- **No native deps** — works in browsers and Node +- **Envelope serialization** — sends/receives the full `EventEnvelope` JSON + +## Reconnection + +WebSocket connections drop. On reconnect, the spoke must create a new `WebSocket` and a new `WebSocketClientEventTarget`. Reconnection logic belongs to the spoke lifecycle, not the event target. + +The event target itself is per-connection. A new connection means a new instance. + +## Test Plan + +1. **Send path** — dispatchEvent serializes envelope and calls ws.send +2. **Receive path** — ws.onmessage parses envelope, creates CustomEvent, dispatches to listeners +3. **Topic scoping** — type:id topics correctly formed from envelope +4. **Connection close** — ws.onclose propagates to listeners (error event?) +5. **Multiple listeners** — multiple addEventListener on same topic \ No newline at end of file diff --git a/docs/architecture/event-targets/websocket-server.md b/docs/architecture/event-targets/websocket-server.md new file mode 100644 index 0000000..e46a1f7 --- /dev/null +++ b/docs/architecture/event-targets/websocket-server.md @@ -0,0 +1,86 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# WebSocket Server Event Target + +**Import**: `@alkdev/pubsub/event-target-websocket-server` +**Peer dep**: none (WebSocket is a web standard, but a server framework like Hono may be needed for upgrade handling) +**Status**: Not yet implemented. + +Manages multiple WebSocket connections for the server (hub) side. Handles fan-out: `dispatchEvent` sends to all connected spokes; `addEventListener` aggregates subscriptions across all connections. + +## `createWebSocketServerEventTarget` + +```ts +function createWebSocketServerEventTarget( + options: CreateWebSocketServerEventTargetArgs, +): TypedEventTarget; +``` + +### `CreateWebSocketServerEventTargetArgs` + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `onConnection` | callback | No | Called when a new spoke connects. Receives the spoke's event target for per-connection customization. | +| `onDisconnection` | callback | No | Called when a spoke disconnects. Receives the spoke's event target for cleanup. | + +## How It Works + +Unlike the client adapter, the server adapter manages a `Set` of active connections: + +- `dispatchEvent` → iterates all connected WebSockets, sends JSON envelope to each +- `addEventListener` → registers local listeners. The server doesn't subscribe to individual spokes — it listens for events from any spoke +- `removeEventListener` → removes local listeners + +### Incoming Messages + +Each connected spoke sends JSON envelopes. The server listens on `ws.onmessage` for each connection: + +```ts +for (const ws of this.connections) { + ws.onmessage = (msg) => { + const envelope = JSON.parse(msg.data); + const topic = `${envelope.type}:${envelope.id}`; + const event = new CustomEvent(topic, { detail: envelope }); + this.dispatchEvent(event); // dispatches to local listeners + }; +} +``` + +### Outgoing Messages (Fan-out) + +```ts +dispatchEvent(event) { + const message = JSON.stringify(event.detail); + for (const ws of this.connections) { + ws.send(message); + } + return true; +} +``` + +## Per-Connection Spoke Targets + +The server adapter creates a `WebSocketClientEventTarget` for each incoming connection. This allows the hub to target specific spokes if needed (e.g., responding to a specific request). + +## Key Properties + +- **Fan-out** — dispatchEvent sends to all connected spokes +- **Aggregate subscription** — addEventListener listens for events from any spoke +- **Connection lifecycle** — manages add/remove of WebSocket connections +- **No native deps** — works with any WebSocket server (Node ws, Bun, Deno, Hono) + +## Open Questions + +1. **Server framework coupling** — Should this adapter take a raw `WebSocketServer` or just handle `WebSocket` instances? Raw `WebSocket` instances keeps it framework-agnostic. The caller (hub code) handles the HTTP upgrade and passes connected `WebSocket`s to the adapter. +2. **Backpressure** — What happens when `ws.send()` blocks or buffers? Should there be a max buffer size per connection? +3. **Selective fan-out** — Should `dispatchEvent` always send to all connections, or should there be a way to target a specific spoke? + +## Test Plan + +1. **Fan-out** — dispatchEvent sends to all connected WebSockets +2. **Incoming aggregation** — messages from any spoke dispatch to local listeners +3. **Connection add/remove** — new connections are tracked, disconnections are cleaned up +4. **Mixed topology** — server adapter and client adapters can communicate bidirectionally \ No newline at end of file diff --git a/docs/architecture/event-targets/worker.md b/docs/architecture/event-targets/worker.md new file mode 100644 index 0000000..d643d1e --- /dev/null +++ b/docs/architecture/event-targets/worker.md @@ -0,0 +1,127 @@ +--- +status: draft +last_updated: 2026-05-07 +--- + +# Worker Event Target + +**Import**: `@alkdev/pubsub/event-target-worker` +**Peer dep**: none (Web Worker / Node worker_threads are standard) +**Status**: Not yet implemented. Needs R&D on Node vs Web Worker API differences. + +Enables `createPubSub` to work across Worker boundaries. Two factory functions: one for the main thread side, one for the worker thread side. + +## API + +```ts +// Main thread — wraps a Worker instance +function createWorkerEventTarget( + worker: Worker, +): TypedEventTarget; + +// Worker thread — wraps parent message port +function createMainThreadEventTarget(): TypedEventTarget; +``` + +## Protocol + +Worker messages use the `EventEnvelope` format over `postMessage`: + +```json +{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } +``` + +### Main Thread → Worker + +```ts +dispatchEvent(event) { + this.worker.postMessage(event.detail); + // event.detail is the EventEnvelope + return true; +} +``` + +### Worker → Main Thread + +```ts +dispatchEvent(event) { + globalThis.postMessage(event.detail); + return true; +} +``` + +### Receiving + +```ts +// Main thread side +this.worker.onmessage = (msg) => { + const envelope = msg.data; + const topic = `${envelope.type}:${envelope.id}`; + const event = new CustomEvent(topic, { detail: envelope }); + // dispatch to listeners +}; + +// Worker thread side +globalThis.onmessage = (msg) => { + const envelope = msg.data; + const topic = `${envelope.type}:${envelope.id}`; + const event = new CustomEvent(topic, { detail: envelope }); + // dispatch to listeners +}; +``` + +## Key Properties + +- **Bidirectional** — both sides can publish and subscribe +- **Per-worker** — each worker gets its own event target on the main thread side +- **Structured clone** — Web Workers use structured clone for serialization, but the JSON-serializable `EventEnvelope` ensures cross-platform compatibility +- **No native deps** — works in any environment with Worker support + +## Open Questions / R&D Needed + +### Node vs Web Worker API + +The APIs differ significantly: + +| Feature | Web Worker | Node `worker_threads` | +|---------|-----------|----------------------| +| Create | `new Worker(url)` | `new Worker(path)` | +| Send | `worker.postMessage(msg)` | `worker.postMessage(msg)` | +| Receive | `worker.onmessage` | `worker.on('message')` | +| Worker send | `self.postMessage(msg)` | `parentPort.postMessage(msg)` | +| Worker receive | `self.onmessage` | `parentPort.on('message')` | +| Transfer | `postMessage(msg, [transfer])` | `postMessage(msg, [transferList])` | +| `MessagePort` | No built-in | Yes — `MessageChannel` for direct ports | + +Options: +1. **Two adapters** — `event-target-web-worker` and `event-target-node-worker` +2. **One adapter with runtime detection** — detect environment and use appropriate API +3. **One adapter abstracting both** — wrap the differences behind a common interface + +Recommendation: Start with a single adapter that targets Web Workers (browser + Deno + Bun all support this API). Add Node `worker_threads` support later if needed, potentially with a `MessagePort`-based approach for direct channels. + +### Worker Pool Pattern + +The original sandbox implementation used a worker pool pattern. A `WorkerPoolManager` would: +1. Maintain a pool of workers +2. Assign tasks to available workers +3. Collect results and fan out to subscribers + +This is **not** part of the `WorkerEventTarget` — it's a downstream concern for `@alkdev/operations`. The event target just wraps a single `postMessage`/`onmessage` channel. Pool management belongs higher. + +### Transferable Objects + +Web Workers support `Transferable` objects (ArrayBuffers, etc.) for zero-copy transfer. The current `EventEnvelope` is JSON, which gets structured-cloned. If large payloads need zero-copy transfer, the envelope could support a `Transferable` field, but this adds complexity and is not needed for the initial implementation. + +## Relationship to Downstream + +Workers can subscribe to events and publish results through the event target, allowing `@alkdev/operations` to dispatch work to worker threads via the same pubsub transport. The correlation (`id` field in the envelope) connects request to response. + +## Test Plan + +1. **Main → Worker send** — dispatchEvent from main posts message to worker +2. **Worker → Main send** — dispatchEvent from worker posts message to main +3. **Bidirectional** — both sides can subscribe and publish +4. **Topic scoping** — type:id topics correctly formed +5. **Envelope round-trip** — full envelope survives serialization +6. **Worker termination** — cleanup when worker exits \ No newline at end of file