Files
pubsub/docs/architecture/api-surface.md
glm-5.1 a12c52b407 fix: add close() lifecycle methods to all adapters, fix WS client handler preservation, add Worker thread context guard
- Add close() to Redis, WS Client, WS Server, Worker Host, Worker Thread adapters
  for graceful teardown (cleanup subscriptions, restore handlers, clear maps)
- WS Client now saves/restores original onmessage (consistent with WS Server)
- WS Client dispatchEvent/addEventListener/removeEventListener are no-ops after close()
- WS Server close() removes all connections and clears local listeners
- Redis close() unsubscribes all channels and removes message listener
- Worker Host/Thread close() restore original onmessage and clear callbacks
- Worker Thread throws clear error if globalThis.postMessage is unavailable
- Add double-call guard to WS Server removeConnection
- Export new adapter interface types (RedisEventTarget, WebSocketClientEventTarget, etc.)
- Add sideEffects: false to package.json for tree-shaking
- Update architecture docs: lifecycle section, close() contract, adapter status updates
- 22 new tests covering close(), handler restoration, idempotency, and context guard
2026-05-08 16:19:16 +00:00

240 lines
8.3 KiB
Markdown

---
status: draft
last_updated: 2026-05-08
---
# API Surface
Core pubsub creation, types, and operators. No transport dependencies.
## `createPubSub`
```ts
function createPubSub<TEventMap extends PubSubEventMap>(
config?: PubSubConfig<TEventMap>,
): PubSub<TEventMap>;
```
Factory function. Accepts an optional `eventTarget` config. If none is provided, uses `new EventTarget()` (in-process).
### Event Envelope
Every event dispatched through pubsub uses the `EventEnvelope` format:
```ts
interface EventEnvelope<TType extends string = string, TPayload = unknown> {
readonly type: TType;
readonly id: string;
readonly payload: TPayload;
}
```
The envelope is the cross-platform serialization contract. All transport adapters serialize/deserialize this format. Domain-specific data goes in `payload`.
### Reserved Event Types
Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. `createPubSub.publish()` should reject or warn on event types starting with `__`. See [ADR-003](decisions/003-subscription-control-protocol.md).
### Topic Scoping
Topics are scoped by `id` using the `type:id` convention:
```ts
pubsub.publish("call.responded", requestId, { output });
// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload }
const stream = pubsub.subscribe("call.responded", requestId);
// → subscribes to topic "call.responded:{requestId}"
```
Unlike the previous tuple-based model, `id` is always required. This simplifies the type system and makes correlation explicit.
### `PubSubEventMap`
The type parameter that defines the event map. Maps event type strings to their payload types:
```ts
type PubSubEventMap = {
[eventType: string]: unknown;
};
```
### `PubSub.publish()`
Publishes an event to the pubsub. Throws if the event type starts with `__` (reserved for adapter control messages).
```ts
pubsub.publish("call.responded", requestId, { output });
// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload }
```
The `CustomEvent.type` is the composite `type:id` string. This is the key that `addEventListener` and `dispatchEvent` use for matching. The `EventEnvelope` in `detail` preserves the separate `type` and `id` fields for transport adapters that need them.
### `PubSub.subscribe()`
Returns a `Repeater<EventEnvelope<TKey, TPayload>>` (async iterable). Consumers iterate with `for await`:
```ts
for await (const envelope of pubsub.subscribe("session.status", sessionId)) {
// envelope.type === "session.status"
// envelope.id === sessionId
// envelope.payload === the typed payload
}
```
The `Repeater` automatically cleans up its `addEventListener` when the consumer breaks out of the loop (the `stop` promise resolves).
## Types
| Export | Source | Description |
|--------|--------|-------------|
| `EventEnvelope<TType, TPayload>` | `types.ts` | Cross-platform envelope: `{ type, id, payload }`. JSON-serializable. |
| `TypedEvent<TType, TDetail>` | `types.ts` | Event with typed `type` and `detail`. Omits `CustomEvent`'s untyped fields. |
| `TypedEventTarget<TEvent>` | `types.ts` | Extends `EventTarget` with typed `addEventListener`, `dispatchEvent`, `removeEventListener`. All adapters' `dispatchEvent` returns `true` (events are non-cancelable). |
| `TypedEventListener<TEvent>` | `types.ts` | `(evt: TEvent) => void` |
| `TypedEventListenerObject<TEvent>` | `types.ts` | `{ handleEvent(object: TEvent): void }` |
| `TypedEventListenerOrEventListenerObject<TEvent>` | `types.ts` | Union of the above |
| `PubSub<TEventMap>` | `create_pubsub.ts` | `{ publish, subscribe }` — publish takes `(type, id, payload)`, subscribe takes `(type, id)` and returns `Repeater<EventEnvelope>` |
| `PubSubConfig<TEventMap>` | `create_pubsub.ts` | `{ eventTarget?: PubSubEventTarget }` |
| `PubSubEvent<TEventMap, TType>` | `create_pubsub.ts` | Derived `TypedEvent` for a specific event type, with `detail` as `EventEnvelope<TType, TPayload>` |
| `PubSubEventTarget<TEventMap>` | `create_pubsub.ts` | `TypedEventTarget<PubSubEvent<...>>` |
## Adapter Lifecycle
All transport adapters provide a `close()` method for graceful teardown. After `close()`:
- The adapter is unusable (no-op for `addEventListener`, `removeEventListener`, `dispatchEvent`)
- All subscriptions are cleaned up (Redis channels unsubscribed, `__unsubscribe` sent for WebSocket topics, callbacks cleared)
- Intercepted handlers are restored to their originals
- The underlying transport (Redis connection, WebSocket, Worker) is **not** destroyed — the caller owns it
`close()` is idempotent. Calling it multiple times is safe.
Adapter return types reflect this:
| Adapter | Return type |
|---------|-------------|
| Redis | `RedisEventTarget<TEvent>` (extends `TypedEventTarget<TEvent>`, adds `close()`) |
| WebSocket Client | `WebSocketClientEventTarget<TEvent>` (extends `TypedEventTarget<TEvent>`, adds `close()`) |
| WebSocket Server | `WebSocketServerEventTarget<TEvent>` (extends `TypedEventTarget<TEvent>`, adds `addConnection`, `removeConnection`, `close()`) |
| Worker Host | `WorkerHostEventTarget<TEvent>` (extends `TypedEventTarget<TEvent>`, adds `close()`) |
| Worker Thread | `WorkerThreadEventTarget<TEvent>` (extends `TypedEventTarget<TEvent>`, adds `close()`) |
## Operators
All operators work with any `AsyncIterable`. Operators that return `Repeater` provide backpressure-aware push semantics.
### Repeater-returning operators
These wrap source iterables in a `Repeater` with explicit push/stop control:
#### `filter`
```ts
function filter<T>(filterFn: (value: T) => Promise<boolean> | boolean): (source: AsyncIterable<T>) => Repeater<T>;
```
Type-narrowing overload available: `filter<T, U extends T>(fn: (input: T) => input is U)`.
#### `map`
```ts
function map<T, O>(mapper: (input: T) => Promise<O> | O): (source: AsyncIterable<T>) => Repeater<O>;
```
#### `pipe`
```ts
function pipe<A, B>(a: A, ab: (a: A) => B): B;
// up to 5 arguments
```
Compose operators: `pipe(pubsub.subscribe("myEvent", id), filter(isRelevant), map(transform))`
### AsyncGenerator operators
These use native `async function*` generators for simpler stream transformations:
#### `take`
Yields only the first `count` items from the source.
```ts
async function* take<T>(source: AsyncIterable<T>, count: number): AsyncIterable<T>
```
#### `reduce`
Reduces the stream to a single value.
```ts
async function reduce<T, U>(source: AsyncIterable<T>, reducer: (acc: U, value: T) => Promise<U> | U, initialValue: U): Promise<U>
```
#### `toArray`
Collects all items into an array.
```ts
async function toArray<T>(source: AsyncIterable<T>): Promise<T[]>
```
#### `batch`
Groups items into arrays of `size`.
```ts
async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncIterable<T[]>
```
#### `dedupe`
Yields only unique items (uses `Set` for deduplication).
```ts
async function* dedupe<T>(source: AsyncIterable<T>): AsyncIterable<T>
```
#### `window`
Sliding window of `size` items, advancing by `step` (default 1).
```ts
async function* window<T>(source: AsyncIterable<T>, size: number, step?: number): AsyncIterable<T[]>
```
#### `flat`
Flattens an `AsyncIterable<T[]>` into `AsyncIterable<T>`.
```ts
async function* flat<T>(source: AsyncIterable<T[]>): AsyncIterable<T>
```
#### `groupBy`
Groups items by key into a `Map`. Terminal operation (consumes entire stream).
```ts
async function groupBy<T, K>(source: AsyncIterable<T>, keyFn: (value: T) => K): Promise<Map<K, T[]>>
```
#### `chain`
Concatenates multiple async iterables into one.
```ts
async function* chain<T>(...sources: AsyncIterable<T>[]): AsyncIterable<T>
```
#### `join`
Streaming join between two sources on matching keys.
```ts
async function* join<T, U, K>(source1: AsyncIterable<T>, source2: AsyncIterable<U>, keyFn1: (value: T) => K, keyFn2: (value: U) => K): AsyncIterable<[T, U]>
```
## Attribution
`createPubSub`, `filter`, `map`, and `pipe` are adapted from `@graphql-yoga/subscription` (MIT). `TypedEventTarget` types are adapted from `@graphql-yoga/typed-event-target` (MIT). `Repeater` is inlined from `@repeaterjs/repeater` (MIT). See file headers for full license text.