- Add close() to Redis, WS Client, WS Server, Worker Host, Worker Thread adapters for graceful teardown (cleanup subscriptions, restore handlers, clear maps) - WS Client now saves/restores original onmessage (consistent with WS Server) - WS Client dispatchEvent/addEventListener/removeEventListener are no-ops after close() - WS Server close() removes all connections and clears local listeners - Redis close() unsubscribes all channels and removes message listener - Worker Host/Thread close() restore original onmessage and clear callbacks - Worker Thread throws clear error if globalThis.postMessage is unavailable - Add double-call guard to WS Server removeConnection - Export new adapter interface types (RedisEventTarget, WebSocketClientEventTarget, etc.) - Add sideEffects: false to package.json for tree-shaking - Update architecture docs: lifecycle section, close() contract, adapter status updates - 22 new tests covering close(), handler restoration, idempotency, and context guard
8.3 KiB
status, last_updated
| status | last_updated |
|---|---|
| draft | 2026-05-08 |
API Surface
Core pubsub creation, types, and operators. No transport dependencies.
createPubSub
function createPubSub<TEventMap extends PubSubEventMap>(
config?: PubSubConfig<TEventMap>,
): PubSub<TEventMap>;
Factory function. Accepts an optional eventTarget config. If none is provided, uses new EventTarget() (in-process).
Event Envelope
Every event dispatched through pubsub uses the EventEnvelope format:
interface EventEnvelope<TType extends string = string, TPayload = unknown> {
readonly type: TType;
readonly id: string;
readonly payload: TPayload;
}
The envelope is the cross-platform serialization contract. All transport adapters serialize/deserialize this format. Domain-specific data goes in payload.
Reserved Event Types
Event types starting with __ (double underscore) are reserved for adapter control messages (e.g., __subscribe, __unsubscribe). User code must not define event types with this prefix. Control events use the empty string "" for the id field by convention — they use the topic field in their payload for routing instead. createPubSub.publish() should reject or warn on event types starting with __. See ADR-003.
Topic Scoping
Topics are scoped by id using the type:id convention:
pubsub.publish("call.responded", requestId, { output });
// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload }
const stream = pubsub.subscribe("call.responded", requestId);
// → subscribes to topic "call.responded:{requestId}"
Unlike the previous tuple-based model, id is always required. This simplifies the type system and makes correlation explicit.
PubSubEventMap
The type parameter that defines the event map. Maps event type strings to their payload types:
type PubSubEventMap = {
[eventType: string]: unknown;
};
PubSub.publish()
Publishes an event to the pubsub. Throws if the event type starts with __ (reserved for adapter control messages).
pubsub.publish("call.responded", requestId, { output });
// → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload }
The CustomEvent.type is the composite type:id string. This is the key that addEventListener and dispatchEvent use for matching. The EventEnvelope in detail preserves the separate type and id fields for transport adapters that need them.
PubSub.subscribe()
Returns a Repeater<EventEnvelope<TKey, TPayload>> (async iterable). Consumers iterate with for await:
for await (const envelope of pubsub.subscribe("session.status", sessionId)) {
// envelope.type === "session.status"
// envelope.id === sessionId
// envelope.payload === the typed payload
}
The Repeater automatically cleans up its addEventListener when the consumer breaks out of the loop (the stop promise resolves).
Types
| Export | Source | Description |
|---|---|---|
EventEnvelope<TType, TPayload> |
types.ts |
Cross-platform envelope: { type, id, payload }. JSON-serializable. |
TypedEvent<TType, TDetail> |
types.ts |
Event with typed type and detail. Omits CustomEvent's untyped fields. |
TypedEventTarget<TEvent> |
types.ts |
Extends EventTarget with typed addEventListener, dispatchEvent, removeEventListener. All adapters' dispatchEvent returns true (events are non-cancelable). |
TypedEventListener<TEvent> |
types.ts |
(evt: TEvent) => void |
TypedEventListenerObject<TEvent> |
types.ts |
{ handleEvent(object: TEvent): void } |
TypedEventListenerOrEventListenerObject<TEvent> |
types.ts |
Union of the above |
PubSub<TEventMap> |
create_pubsub.ts |
{ publish, subscribe } — publish takes (type, id, payload), subscribe takes (type, id) and returns Repeater<EventEnvelope> |
PubSubConfig<TEventMap> |
create_pubsub.ts |
{ eventTarget?: PubSubEventTarget } |
PubSubEvent<TEventMap, TType> |
create_pubsub.ts |
Derived TypedEvent for a specific event type, with detail as EventEnvelope<TType, TPayload> |
PubSubEventTarget<TEventMap> |
create_pubsub.ts |
TypedEventTarget<PubSubEvent<...>> |
Adapter Lifecycle
All transport adapters provide a close() method for graceful teardown. After close():
- The adapter is unusable (no-op for
addEventListener,removeEventListener,dispatchEvent) - All subscriptions are cleaned up (Redis channels unsubscribed,
__unsubscribesent for WebSocket topics, callbacks cleared) - Intercepted handlers are restored to their originals
- The underlying transport (Redis connection, WebSocket, Worker) is not destroyed — the caller owns it
close() is idempotent. Calling it multiple times is safe.
Adapter return types reflect this:
| Adapter | Return type |
|---|---|
| Redis | RedisEventTarget<TEvent> (extends TypedEventTarget<TEvent>, adds close()) |
| WebSocket Client | WebSocketClientEventTarget<TEvent> (extends TypedEventTarget<TEvent>, adds close()) |
| WebSocket Server | WebSocketServerEventTarget<TEvent> (extends TypedEventTarget<TEvent>, adds addConnection, removeConnection, close()) |
| Worker Host | WorkerHostEventTarget<TEvent> (extends TypedEventTarget<TEvent>, adds close()) |
| Worker Thread | WorkerThreadEventTarget<TEvent> (extends TypedEventTarget<TEvent>, adds close()) |
Operators
All operators work with any AsyncIterable. Operators that return Repeater provide backpressure-aware push semantics.
Repeater-returning operators
These wrap source iterables in a Repeater with explicit push/stop control:
filter
function filter<T>(filterFn: (value: T) => Promise<boolean> | boolean): (source: AsyncIterable<T>) => Repeater<T>;
Type-narrowing overload available: filter<T, U extends T>(fn: (input: T) => input is U).
map
function map<T, O>(mapper: (input: T) => Promise<O> | O): (source: AsyncIterable<T>) => Repeater<O>;
pipe
function pipe<A, B>(a: A, ab: (a: A) => B): B;
// up to 5 arguments
Compose operators: pipe(pubsub.subscribe("myEvent", id), filter(isRelevant), map(transform))
AsyncGenerator operators
These use native async function* generators for simpler stream transformations:
take
Yields only the first count items from the source.
async function* take<T>(source: AsyncIterable<T>, count: number): AsyncIterable<T>
reduce
Reduces the stream to a single value.
async function reduce<T, U>(source: AsyncIterable<T>, reducer: (acc: U, value: T) => Promise<U> | U, initialValue: U): Promise<U>
toArray
Collects all items into an array.
async function toArray<T>(source: AsyncIterable<T>): Promise<T[]>
batch
Groups items into arrays of size.
async function* batch<T>(source: AsyncIterable<T>, size: number): AsyncIterable<T[]>
dedupe
Yields only unique items (uses Set for deduplication).
async function* dedupe<T>(source: AsyncIterable<T>): AsyncIterable<T>
window
Sliding window of size items, advancing by step (default 1).
async function* window<T>(source: AsyncIterable<T>, size: number, step?: number): AsyncIterable<T[]>
flat
Flattens an AsyncIterable<T[]> into AsyncIterable<T>.
async function* flat<T>(source: AsyncIterable<T[]>): AsyncIterable<T>
groupBy
Groups items by key into a Map. Terminal operation (consumes entire stream).
async function groupBy<T, K>(source: AsyncIterable<T>, keyFn: (value: T) => K): Promise<Map<K, T[]>>
chain
Concatenates multiple async iterables into one.
async function* chain<T>(...sources: AsyncIterable<T>[]): AsyncIterable<T>
join
Streaming join between two sources on matching keys.
async function* join<T, U, K>(source1: AsyncIterable<T>, source2: AsyncIterable<U>, keyFn1: (value: T) => K, keyFn2: (value: U) => K): AsyncIterable<[T, U]>
Attribution
createPubSub, filter, map, and pipe are adapted from @graphql-yoga/subscription (MIT). TypedEventTarget types are adapted from @graphql-yoga/typed-event-target (MIT). Repeater is inlined from @repeaterjs/repeater (MIT). See file headers for full license text.