Fix critical publish() bug, address review findings

CRITICAL: createPubSub.publish() was dispatching CustomEvent with
just the event type (e.g. 'call.responded') instead of the composite
topic string ('call.responded:uuid-123'). This broke all adapters
that rely on topic-scoped dispatch — Redis subscribe/publish
channels didn't match, and WS server fan-out routing would fail.
Fixed to dispatch with the full type:id composite.

Other fixes:
- Add __ prefix runtime guard in publish() (reserved for control)
- Add Redis barrel re-export to src/index.ts (ADR-002 compliance)
- Clarify WS server: adapter's onclose calls removeConnection
  internally; user doesn't need to
- WS client: document null callback no-op, removeEventListener
  edge cases (unregistered callback, null callback)
- WS server: document dispatchEvent always returns true
- Redis spec: document in-flight message edge case after unsubscribe
- Worker adapter: rename createMainThreadEventTarget to
  createWorkerThreadEventTarget, createWorkerEventTarget to
  createWorkerHostEventTarget (fix inverted naming)
- api-surface.md: add PubSub.publish() section documenting the
  type:id composite and __ guard
This commit is contained in:
2026-05-08 05:17:43 +00:00
parent e60f0a1aa0
commit be7fe67145
7 changed files with 52 additions and 20 deletions

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-05-01
last_updated: 2026-05-08
---
# API Surface
@@ -33,7 +33,7 @@ The envelope is the cross-platform serialization contract. All transport adapter
### Reserved Event Types
Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. See [ADR-003](decisions/003-subscription-control-protocol.md).
Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. `createPubSub.publish()` should reject or warn on event types starting with `__`. See [ADR-003](decisions/003-subscription-control-protocol.md).
### Topic Scoping
@@ -59,6 +59,17 @@ type PubSubEventMap = {
};
```
### `PubSub.publish()`
Publishes an event to the pubsub. Throws if the event type starts with `__` (reserved for adapter control messages).
```ts
pubsub.publish("call.responded", requestId, { output });
// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload }
```
The `CustomEvent.type` is the composite `type:id` string. This is the key that `addEventListener` and `dispatchEvent` use for matching. The `EventEnvelope` in `detail` preserves the separate `type` and `id` fields for transport adapters that need them.
### `PubSub.subscribe()`
Returns a `Repeater<EventEnvelope<TKey, TPayload>>` (async iterable). Consumers iterate with `for await`:

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-05-07
last_updated: 2026-05-08
---
# Redis Event Target
@@ -45,6 +45,7 @@ Currently uses the topic string directly as the Redis channel name (e.g., `call.
- **No error handling** — connection failures, reconnection, and message parse errors are not handled
- **No channel prefix** — raw event types as channel names risk collision in shared Redis instances
- **No unsubscribe cleanup on client disconnect** — if the subscribe client disconnects, registered callbacks remain in the map but will never fire
- **In-flight messages after unsubscribe** — if `removeEventListener` triggers an `unsubscribe` while a Redis message is in flight, the message may arrive after the callback is removed. This is harmless (the callback set is empty, so the message is a no-op) but worth noting for implementers
## Test Coverage

View File

@@ -61,7 +61,8 @@ This is how the server adapter knows which events to forward to this spoke. With
When `addEventListener(type, callback)` is called:
1. Register the local listener (standard `EventTarget` behavior)
2. If this is the first listener for this topic (no previous listeners registered), send a `__subscribe` control event to the server:
2. If `callback` is `null`, this is a no-op — no listener is registered and no `__subscribe` is sent
3. If this is the first listener for this topic (no previous listeners registered), send a `__subscribe` control event to the server:
```json
{ "type": "__subscribe", "id": "", "payload": { "topic": "call.responded:uuid-123" } }
@@ -74,7 +75,9 @@ The `id` field is the empty string (`""`) for control events by convention. The
When `removeEventListener(type, callback)` is called:
1. Remove the local listener (standard `EventTarget` behavior)
2. If no listeners remain for this topic, send an `__unsubscribe` control event:
2. If `callback` was never registered for this type, this is a no-op — no `__unsubscribe` is sent (the reference count wasn't incremented, so it shouldn't be decremented)
3. If `callback` is `null`, remove all listeners for this type. If no listeners remain after removal, send `__unsubscribe`
4. If this was the last registered listener for this topic, send an `__unsubscribe` control event:
```json
{ "type": "__unsubscribe", "id": "", "payload": { "topic": "call.responded:uuid-123" } }

View File

@@ -54,9 +54,8 @@ const serverTarget = createWebSocketServerEventTarget({});
app.get("/ws", (c) => {
return c.upgrade(async (ws) => {
serverTarget.addConnection(ws);
ws.addEventListener("close", () => {
serverTarget.removeConnection(ws);
});
// removeConnection is called automatically by the adapter's onclose handler.
// Only call removeConnection manually for forced disconnections (backpressure, auth failures).
});
});
```
@@ -77,6 +76,8 @@ The server adapter exposes these methods on the returned `WebSocketServerEventTa
`removeConnection` cleans up internal state (subscription maps, event handlers) but does **not** close the `WebSocket`. The caller is responsible for closing the connection if needed. Typically it's called from a `close` event handler, where the connection is already closing.
**Note:** The adapter's `onclose` handler (set up by `addConnection`) calls `removeConnection` internally. This means the caller does not need to call `removeConnection` from their own `close` handler — the adapter handles cleanup automatically when a connection closes. The `removeConnection` method is exposed for cases where the caller needs to manually disconnect a connection (e.g., for backpressure or authentication failures).
### Error Handling
- **Malformed JSON from a spoke** → the message is silently ignored. The adapter logs a warning (via `console.warn` or a configurable logger) and continues processing other messages from that connection. The connection is not closed — a single malformed message should not disconnect a client.
@@ -85,6 +86,8 @@ The server adapter exposes these methods on the returned `WebSocketServerEventTa
- **Send failure** → if `ws.send()` throws (connection died between the `bufferedAmount` check and the send), the adapter catches the error, removes the connection from the subscription maps, and fires `onDisconnection`.
- **`onclose` from client** → the adapter removes the connection from all subscription maps and fires `onDisconnection`.
- **`dispatchEvent` return value** — always returns `true`, regardless of subscriber count or send failures. Errors are handled via side effects (connection removal, `onDisconnection` callback), not via the return value. This matches the `EventTarget` contract where `return false` means `preventDefault` was called, not "send failed."
### Concurrency Model
This adapter assumes a single-threaded event loop (Node.js, Bun, Deno, browsers). In environments with worker threads, the caller must ensure `addConnection`/`removeConnection` and `dispatchEvent` are not called concurrently.

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-05-07
last_updated: 2026-05-08
---
# Worker Event Target
@@ -14,15 +14,17 @@ Enables `createPubSub` to work across Worker boundaries. Two factory functions:
## API
```ts
// Main thread — wraps a Worker instance
function createWorkerEventTarget<TEvent extends TypedEvent>(
// Main thread side — wraps a Worker instance
function createWorkerHostEventTarget<TEvent extends TypedEvent>(
worker: Worker,
): TypedEventTarget<TEvent>;
// Worker thread — wraps parent message port
function createMainThreadEventTarget<TEvent extends TypedEvent>(): TypedEventTarget<TEvent>;
// Worker thread side — wraps parent message port
function createWorkerThreadEventTarget<TEvent extends TypedEvent>(): TypedEventTarget<TEvent>;
```
The naming convention: `Host` is the side that owns the `Worker` object (typically the main thread). `Thread` is the side that runs inside the worker (accessing `self.onmessage` / `parentPort`).
## Protocol
Worker messages use the `EventEnvelope` format over `postMessage`:
@@ -31,9 +33,10 @@ Worker messages use the `EventEnvelope` format over `postMessage`:
{ "type": "call.responded", "id": "uuid-123", "payload": { "output": 42 } }
```
### Main Thread → Worker
### Host → Worker Thread
```ts
// Host side (createWorkerHostEventTarget)
dispatchEvent(event) {
this.worker.postMessage(event.detail);
// event.detail is the EventEnvelope
@@ -41,19 +44,20 @@ dispatchEvent(event) {
}
```
### Worker → Main Thread
### Worker Thread → Host
```ts
// Worker thread side (createWorkerThreadEventTarget)
dispatchEvent(event) {
globalThis.postMessage(event.detail);
return true;
}
```
### Receiving
### Receiving on Host Side
```ts
// Main thread side
// Host side (createWorkerHostEventTarget)
this.worker.onmessage = (msg) => {
const envelope = msg.data;
const topic = `${envelope.type}:${envelope.id}`;
@@ -61,7 +65,10 @@ this.worker.onmessage = (msg) => {
// dispatch to listeners
};
// Worker thread side
### Receiving on Worker Thread Side
```ts
// Worker thread side (createWorkerThreadEventTarget)
globalThis.onmessage = (msg) => {
const envelope = msg.data;
const topic = `${envelope.type}:${envelope.id}`;

View File

@@ -69,8 +69,14 @@ export function createPubSub<TEventMap extends PubSubEventMap>(
id: string,
payload: TEventMap[TType],
) {
if (type.startsWith("__")) {
throw new Error(
`Event types starting with "__" are reserved for adapter control messages. Received: "${type}"`,
);
}
const envelope: EventEnvelope<TType, TEventMap[TType]> = { type, id, payload };
const event = new CustomEvent(type, { detail: envelope }) as PubSubEvent<
const topic = `${type}:${id}`;
const event = new CustomEvent(topic, { detail: envelope }) as PubSubEvent<
TEventMap,
TType
>;

View File

@@ -1,4 +1,5 @@
export { createPubSub, type PubSub, type PubSubConfig, type PubSubEvent, type PubSubEventTarget, type PubSubEventMap } from "./create_pubsub.js";
export { type EventEnvelope, type TypedEvent, type TypedEventTarget, type TypedEventListener, type TypedEventListenerObject, type TypedEventListenerOrEventListenerObject } from "./types.js";
export { filter, map, pipe, take, reduce, toArray, batch, dedupe, window, flat, groupBy, chain, join } from "./operators.js";
export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js";
export { Repeater, RepeaterOverflowError, type Push, type Stop, type RepeaterExecutor, type RepeaterBuffer } from "./repeater.js";
export { createRedisEventTarget, type CreateRedisEventTargetArgs } from "./event-target-redis.js";