From be7fe67145167a3ca2b897ef88f0bdcdb4043764 Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Fri, 8 May 2026 05:17:43 +0000 Subject: [PATCH] Fix critical publish() bug, address review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CRITICAL: createPubSub.publish() was dispatching CustomEvent with just the event type (e.g. 'call.responded') instead of the composite topic string ('call.responded:uuid-123'). This broke all adapters that rely on topic-scoped dispatch — Redis subscribe/publish channels didn't match, and WS server fan-out routing would fail. Fixed to dispatch with the full type:id composite. Other fixes: - Add __ prefix runtime guard in publish() (reserved for control) - Add Redis barrel re-export to src/index.ts (ADR-002 compliance) - Clarify WS server: adapter's onclose calls removeConnection internally; user doesn't need to - WS client: document null callback no-op, removeEventListener edge cases (unregistered callback, null callback) - WS server: document dispatchEvent always returns true - Redis spec: document in-flight message edge case after unsubscribe - Worker adapter: rename createMainThreadEventTarget to createWorkerThreadEventTarget, createWorkerEventTarget to createWorkerHostEventTarget (fix inverted naming) - api-surface.md: add PubSub.publish() section documenting the type:id composite and __ guard --- docs/architecture/api-surface.md | 15 +++++++++-- docs/architecture/event-targets/redis.md | 3 ++- .../event-targets/websocket-client.md | 7 +++-- .../event-targets/websocket-server.md | 9 ++++--- docs/architecture/event-targets/worker.md | 27 ++++++++++++------- src/create_pubsub.ts | 8 +++++- src/index.ts | 3 ++- 7 files changed, 52 insertions(+), 20 deletions(-) diff --git a/docs/architecture/api-surface.md b/docs/architecture/api-surface.md index d43615a..e1da375 100644 --- a/docs/architecture/api-surface.md +++ b/docs/architecture/api-surface.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-05-01 +last_updated: 2026-05-08 --- # API Surface @@ -33,7 +33,7 @@ The envelope is the cross-platform serialization contract. All transport adapter ### Reserved Event Types -Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. See [ADR-003](decisions/003-subscription-control-protocol.md). +Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. `createPubSub.publish()` should reject or warn on event types starting with `__`. See [ADR-003](decisions/003-subscription-control-protocol.md). ### Topic Scoping @@ -59,6 +59,17 @@ type PubSubEventMap = { }; ``` +### `PubSub.publish()` + +Publishes an event to the pubsub. Throws if the event type starts with `__` (reserved for adapter control messages). + +```ts +pubsub.publish("call.responded", requestId, { output }); +// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload } +``` + +The `CustomEvent.type` is the composite `type:id` string. This is the key that `addEventListener` and `dispatchEvent` use for matching. The `EventEnvelope` in `detail` preserves the separate `type` and `id` fields for transport adapters that need them. + ### `PubSub.subscribe()` Returns a `Repeater>` (async iterable). Consumers iterate with `for await`: diff --git a/docs/architecture/event-targets/redis.md b/docs/architecture/event-targets/redis.md index 57b65ba..5d72529 100644 --- a/docs/architecture/event-targets/redis.md +++ b/docs/architecture/event-targets/redis.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-05-07 +last_updated: 2026-05-08 --- # Redis Event Target @@ -45,6 +45,7 @@ Currently uses the topic string directly as the Redis channel name (e.g., `call. - **No error handling** — connection failures, reconnection, and message parse errors are not handled - **No channel prefix** — raw event types as channel names risk collision in shared Redis instances - **No unsubscribe cleanup on client disconnect** — if the subscribe client disconnects, registered callbacks remain in the map but will never fire +- **In-flight messages after unsubscribe** — if `removeEventListener` triggers an `unsubscribe` while a Redis message is in flight, the message may arrive after the callback is removed. This is harmless (the callback set is empty, so the message is a no-op) but worth noting for implementers ## Test Coverage diff --git a/docs/architecture/event-targets/websocket-client.md b/docs/architecture/event-targets/websocket-client.md index 79a3d0f..51b0c0a 100644 --- a/docs/architecture/event-targets/websocket-client.md +++ b/docs/architecture/event-targets/websocket-client.md @@ -61,7 +61,8 @@ This is how the server adapter knows which events to forward to this spoke. With When `addEventListener(type, callback)` is called: 1. Register the local listener (standard `EventTarget` behavior) -2. If this is the first listener for this topic (no previous listeners registered), send a `__subscribe` control event to the server: +2. If `callback` is `null`, this is a no-op — no listener is registered and no `__subscribe` is sent +3. If this is the first listener for this topic (no previous listeners registered), send a `__subscribe` control event to the server: ```json { "type": "__subscribe", "id": "", "payload": { "topic": "call.responded:uuid-123" } } @@ -74,7 +75,9 @@ The `id` field is the empty string (`""`) for control events by convention. The When `removeEventListener(type, callback)` is called: 1. Remove the local listener (standard `EventTarget` behavior) -2. If no listeners remain for this topic, send an `__unsubscribe` control event: +2. If `callback` was never registered for this type, this is a no-op — no `__unsubscribe` is sent (the reference count wasn't incremented, so it shouldn't be decremented) +3. If `callback` is `null`, remove all listeners for this type. If no listeners remain after removal, send `__unsubscribe` +4. If this was the last registered listener for this topic, send an `__unsubscribe` control event: ```json { "type": "__unsubscribe", "id": "", "payload": { "topic": "call.responded:uuid-123" } } diff --git a/docs/architecture/event-targets/websocket-server.md b/docs/architecture/event-targets/websocket-server.md index dd872e9..4ff3ded 100644 --- a/docs/architecture/event-targets/websocket-server.md +++ b/docs/architecture/event-targets/websocket-server.md @@ -54,9 +54,8 @@ const serverTarget = createWebSocketServerEventTarget({}); app.get("/ws", (c) => { return c.upgrade(async (ws) => { serverTarget.addConnection(ws); - ws.addEventListener("close", () => { - serverTarget.removeConnection(ws); - }); + // removeConnection is called automatically by the adapter's onclose handler. + // Only call removeConnection manually for forced disconnections (backpressure, auth failures). }); }); ``` @@ -77,6 +76,8 @@ The server adapter exposes these methods on the returned `WebSocketServerEventTa `removeConnection` cleans up internal state (subscription maps, event handlers) but does **not** close the `WebSocket`. The caller is responsible for closing the connection if needed. Typically it's called from a `close` event handler, where the connection is already closing. +**Note:** The adapter's `onclose` handler (set up by `addConnection`) calls `removeConnection` internally. This means the caller does not need to call `removeConnection` from their own `close` handler — the adapter handles cleanup automatically when a connection closes. The `removeConnection` method is exposed for cases where the caller needs to manually disconnect a connection (e.g., for backpressure or authentication failures). + ### Error Handling - **Malformed JSON from a spoke** → the message is silently ignored. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed — a single malformed message should not disconnect a client. @@ -85,6 +86,8 @@ The server adapter exposes these methods on the returned `WebSocketServerEventTa - **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`. - **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`. +- **`dispatchEvent` return value** — always returns `true`, regardless of subscriber count or send failures. Errors are handled via side effects (connection removal, `onDisconnection` callback), not via the return value. This matches the `EventTarget` contract where `return false` means `preventDefault` was called, not "send failed." + ### Concurrency Model This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently. diff --git a/docs/architecture/event-targets/worker.md b/docs/architecture/event-targets/worker.md index d643d1e..0edffb2 100644 --- a/docs/architecture/event-targets/worker.md +++ b/docs/architecture/event-targets/worker.md @@ -1,6 +1,6 @@ --- status: draft -last_updated: 2026-05-07 +last_updated: 2026-05-08 --- # Worker Event Target @@ -14,15 +14,17 @@ Enables `createPubSub` to work across Worker boundaries. Two factory functions: ## API ```ts -// Main thread — wraps a Worker instance -function createWorkerEventTarget( +// Main thread side — wraps a Worker instance +function createWorkerHostEventTarget( worker: Worker, ): TypedEventTarget; -// Worker thread — wraps parent message port -function createMainThreadEventTarget(): TypedEventTarget; +// Worker thread side — wraps parent message port +function createWorkerThreadEventTarget(): TypedEventTarget; ``` +The naming convention: `Host` is the side that owns the `Worker` object (typically the main thread). `Thread` is the side that runs inside the worker (accessing `self.onmessage` / `parentPort`). + ## Protocol Worker messages use the `EventEnvelope` format over `postMessage`: @@ -31,9 +33,10 @@ Worker messages use the `EventEnvelope` format over `postMessage`: { "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } } ``` -### Main Thread → Worker +### Host → Worker Thread ```ts +// Host side (createWorkerHostEventTarget) dispatchEvent(event) { this.worker.postMessage(event.detail); // event.detail is the EventEnvelope @@ -41,19 +44,20 @@ dispatchEvent(event) { } ``` -### Worker → Main Thread +### Worker Thread → Host ```ts +// Worker thread side (createWorkerThreadEventTarget) dispatchEvent(event) { globalThis.postMessage(event.detail); return true; } ``` -### Receiving +### Receiving on Host Side ```ts -// Main thread side +// Host side (createWorkerHostEventTarget) this.worker.onmessage = (msg) => { const envelope = msg.data; const topic = `${envelope.type}:${envelope.id}`; @@ -61,7 +65,10 @@ this.worker.onmessage = (msg) => { // dispatch to listeners }; -// Worker thread side +### Receiving on Worker Thread Side + +```ts +// Worker thread side (createWorkerThreadEventTarget) globalThis.onmessage = (msg) => { const envelope = msg.data; const topic = `${envelope.type}:${envelope.id}`; diff --git a/src/create_pubsub.ts b/src/create_pubsub.ts index 3a52536..4afc3fd 100644 --- a/src/create_pubsub.ts +++ b/src/create_pubsub.ts @@ -69,8 +69,14 @@ export function createPubSub( id: string, payload: TEventMap[TType], ) { + if (type.startsWith("__")) { + throw new Error( + `Event types starting with "__" are reserved for adapter control messages. Received: "${type}"`, + ); + } const envelope: EventEnvelope = { type, id, payload }; - const event = new CustomEvent(type, { detail: envelope }) as PubSubEvent< + const topic = `${type}:${id}`; + const event = new CustomEvent(topic, { detail: envelope }) as PubSubEvent< TEventMap, TType >; diff --git a/src/index.ts b/src/index.ts index 88596c1..d4fa1b7 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,5 @@ export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubEventMap } from "./create_pubsub.js"; export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js"; export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js"; -export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; \ No newline at end of file +export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js"; +export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js"; \ No newline at end of file