--- status: draft last_updated: 2026-05-08 --- # API Surface Core pubsub creation, types, and operators. No transport dependencies. ## `createPubSub` ```ts function createPubSub( config?: PubSubConfig, ): PubSub; ``` Factory function. Accepts an optional `eventTarget` config. If none is provided, uses `new EventTarget()` (in-process). ### Event Envelope Every event dispatched through pubsub uses the `EventEnvelope` format: ```ts interface EventEnvelope { readonly type: TType; readonly id: string; readonly payload: TPayload; } ``` The envelope is the cross-platform serialization contract. All transport adapters serialize/deserialize this format. Domain-specific data goes in `payload`. ### Reserved Event Types Event types starting with `__` (double underscore) are reserved for adapter control messages (e.g., `__subscribe`, `__unsubscribe`). User code must not define event types with this prefix. Control events use the empty string `""` for the `id` field by convention — they use the `topic` field in their `payload` for routing instead. `createPubSub.publish()` should reject or warn on event types starting with `__`. See [ADR-003](decisions/003-subscription-control-protocol.md). ### Topic Scoping Topics are scoped by `id` using the `type:id` convention: ```ts pubsub.publish("call.responded", requestId, { output }); // → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload } const stream = pubsub.subscribe("call.responded", requestId); // → subscribes to topic "call.responded:{requestId}" ``` Unlike the previous tuple-based model, `id` is always required. This simplifies the type system and makes correlation explicit. ### `PubSubEventMap` The type parameter that defines the event map. Maps event type strings to their payload types: ```ts type PubSubEventMap = { [eventType: string]: unknown; }; ``` ### `PubSub.publish()` Publishes an event to the pubsub. Throws if the event type starts with `__` (reserved for adapter control messages). ```ts pubsub.publish("call.responded", requestId, { output }); // → dispatches event with CustomEvent type "call.responded:{requestId}", detail = { type, id, payload } ``` The `CustomEvent.type` is the composite `type:id` string. This is the key that `addEventListener` and `dispatchEvent` use for matching. The `EventEnvelope` in `detail` preserves the separate `type` and `id` fields for transport adapters that need them. ### `PubSub.subscribe()` Returns a `Repeater>` (async iterable). Consumers iterate with `for await`: ```ts for await (const envelope of pubsub.subscribe("session.status", sessionId)) { // envelope.type === "session.status" // envelope.id === sessionId // envelope.payload === the typed payload } ``` The `Repeater` automatically cleans up its `addEventListener` when the consumer breaks out of the loop (the `stop` promise resolves). ## Types | Export | Source | Description | |--------|--------|-------------| | `EventEnvelope` | `types.ts` | Cross-platform envelope: `{ type, id, payload }`. JSON-serializable. | | `TypedEvent` | `types.ts` | Event with typed `type` and `detail`. Omits `CustomEvent`'s untyped fields. | | `TypedEventTarget` | `types.ts` | Extends `EventTarget` with typed `addEventListener`, `dispatchEvent`, `removeEventListener`. | | `TypedEventListener` | `types.ts` | `(evt: TEvent) => void` | | `TypedEventListenerObject` | `types.ts` | `{ handleEvent(object: TEvent): void }` | | `TypedEventListenerOrEventListenerObject` | `types.ts` | Union of the above | | `PubSub` | `create_pubsub.ts` | `{ publish, subscribe }` — publish takes `(type, id, payload)`, subscribe takes `(type, id)` and returns `Repeater` | | `PubSubConfig` | `create_pubsub.ts` | `{ eventTarget?: PubSubEventTarget }` | | `PubSubEvent` | `create_pubsub.ts` | Derived `TypedEvent` for a specific event type, with `detail` as `EventEnvelope` | | `PubSubEventTarget` | `create_pubsub.ts` | `TypedEventTarget>` | ## Operators All operators work with any `AsyncIterable`. Operators that return `Repeater` provide backpressure-aware push semantics. ### Repeater-returning operators These wrap source iterables in a `Repeater` with explicit push/stop control: #### `filter` ```ts function filter(filterFn: (value: T) => Promise | boolean): (source: AsyncIterable) => Repeater; ``` Type-narrowing overload available: `filter(fn: (input: T) => input is U)`. #### `map` ```ts function map(mapper: (input: T) => Promise | O): (source: AsyncIterable) => Repeater; ``` #### `pipe` ```ts function pipe(a: A, ab: (a: A) => B): B; // up to 5 arguments ``` Compose operators: `pipe(pubsub.subscribe("myEvent", id), filter(isRelevant), map(transform))` ### AsyncGenerator operators These use native `async function*` generators for simpler stream transformations: #### `take` Yields only the first `count` items from the source. ```ts async function* take(source: AsyncIterable, count: number): AsyncIterable ``` #### `reduce` Reduces the stream to a single value. ```ts async function reduce(source: AsyncIterable, reducer: (acc: U, value: T) => Promise | U, initialValue: U): Promise ``` #### `toArray` Collects all items into an array. ```ts async function toArray(source: AsyncIterable): Promise ``` #### `batch` Groups items into arrays of `size`. ```ts async function* batch(source: AsyncIterable, size: number): AsyncIterable ``` #### `dedupe` Yields only unique items (uses `Set` for deduplication). ```ts async function* dedupe(source: AsyncIterable): AsyncIterable ``` #### `window` Sliding window of `size` items, advancing by `step` (default 1). ```ts async function* window(source: AsyncIterable, size: number, step?: number): AsyncIterable ``` #### `flat` Flattens an `AsyncIterable` into `AsyncIterable`. ```ts async function* flat(source: AsyncIterable): AsyncIterable ``` #### `groupBy` Groups items by key into a `Map`. Terminal operation (consumes entire stream). ```ts async function groupBy(source: AsyncIterable, keyFn: (value: T) => K): Promise> ``` #### `chain` Concatenates multiple async iterables into one. ```ts async function* chain(...sources: AsyncIterable[]): AsyncIterable ``` #### `join` Streaming join between two sources on matching keys. ```ts async function* join(source1: AsyncIterable, source2: AsyncIterable, keyFn1: (value: T) => K, keyFn2: (value: U) => K): AsyncIterable<[T, U]> ``` ## Attribution `createPubSub`, `filter`, `map`, and `pipe` are adapted from `@graphql-yoga/subscription` (MIT). `TypedEventTarget` types are adapted from `@graphql-yoga/typed-event-target` (MIT). `Repeater` is inlined from `@repeaterjs/repeater` (MIT). See file headers for full license text.