--- status: draft last_updated: 2026-04-30 --- # Call Protocol Unified event-based protocol for request/response and streaming operations. Built on `@alkdev/pubsub`'s `TypedEventTarget` and `Repeater` primitives. ## Overview The call protocol provides a single event-based mechanism that works identically whether the operation is local (in-process), remote (hub/spoke over WebSocket or Iroh), or streamed (subscription). It is transport-agnostic — the same event shapes, same `requestId` correlation, same `PendingRequestMap`. Only the `EventTarget` changes. Two consumption patterns share the same protocol: - **`call()`**: Publish `call.requested`, subscribe to response events scoped by `requestId`, resolve on first response → `Promise` - **`subscribe()`**: Publish `call.requested`, subscribe to `call.part` events scoped by `requestId`, yield each part until `call.completed` or `call.error` → `Repeater` Both use `call.requested` as the trigger. The `operationId` and `operation.type` on the handler side determine which pattern applies. The protocol itself doesn't distinguish — it's the handler that decides whether to respond once (`respond()`) or stream (`part()` + `complete()`). ## Event Types All events use TypeBox schemas, compatible with `@alkdev/pubsub`'s `PubSubPublishArgsByKey`. Schemas are exported as `CallEventSchema` for runtime validation. ### `CallEventSchema` ```ts const CallEventSchema = { "call.requested": Type.Object({ requestId: Type.String(), operationId: Type.String(), input: Type.Unknown(), parentRequestId: Type.Optional(Type.String()), deadline: Type.Optional(Type.Number()), identity: Type.Optional(Type.Object({ id: Type.String(), scopes: Type.Array(Type.String()), resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), })), }), "call.responded": Type.Object({ requestId: Type.String(), output: Type.Unknown(), }), "call.part": Type.Object({ requestId: Type.String(), output: Type.Unknown(), index: Type.Optional(Type.Number()), }), "call.completed": Type.Object({ requestId: Type.String(), }), "call.aborted": Type.Object({ requestId: Type.String(), }), "call.error": Type.Object({ requestId: Type.String(), code: Type.String(), message: Type.String(), details: Type.Optional(Type.Unknown()), }), } ``` ### Topic Scoping Response events are scoped by `requestId` using pubsub's built-in topic scoping: | Event | Publish | Subscribe (caller) | Subscribe (handler) | |-------|---------|--------------------|--------------------| | `call.requested` | `pubsub.publish("call.requested", event)` | — | Unscoped: `pubsub.subscribe("call.requested")` | | `call.responded` | `pubsub.publish("call.responded", requestId, event)` | Scoped: `pubsub.subscribe("call.responded", requestId)` | — | | `call.part` | `pubsub.publish("call.part", requestId, event)` | Scoped: `pubsub.subscribe("call.part", requestId)` | — | | `call.completed` | `pubsub.publish("call.completed", requestId, event)` | Scoped: `pubsub.subscribe("call.completed", requestId)` | — | | `call.aborted` | `pubsub.publish("call.aborted", requestId, event)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | | `call.error` | `pubsub.publish("call.error", requestId, event)` | Scoped: `pubsub.subscribe("call.error", requestId)` | — | This gives every `requestId` its own event channel. On Redis, this maps to `call.responded:{uuid}` channels. On WebSocket or Iroh, the topic string is a routing key. In-process, it's a `CustomEvent` with `type: "call.responded:{uuid}"`. **Why scoped instead of unscoped + manual matching?** Scoped topics avoid O(n) fanout. A caller only receives events for its own request. This matters especially on Redis (pub/sub channels) and Iroh (topic订阅), where unscoped subscriptions would deliver every response to every listener. ## Event Flow ### Call (request/response) ``` Caller Handler │ │ │─── call.requested ───────────────>│ │ {requestId, operationId, │ │ input, identity, deadline} │ │ │ │<── call.responded:{requestId} ────│ │ {requestId, output} │ ``` On error: ``` │<── call.error:{requestId} ───────│ │ {requestId, code, message, │ │ details} │ ``` On timeout or caller cancellation: ``` │─── call.aborted:{requestId} ────>│ │ {requestId} │ ``` ### Subscribe (request/stream) ``` Caller Handler │ │ │─── call.requested ───────────────>│ │ {requestId, operationId, │ │ input, identity} │ │ │ │<── call.part:{requestId} ────────│ │ {requestId, output, index?} │ │ │ │<── call.part:{requestId} ────────│ │ {requestId, output, index?} │ │ │ │<── call.completed:{requestId} ────│ ← stream ends normally │ {requestId} │ ``` On stream error: ``` │<── call.error:{requestId} ───────│ │ {requestId, code, message} │ ``` On caller cancellation (consumer breaks out of `for await`): ``` │─── call.aborted:{requestId} ────>│ │ {requestId} │ ``` ### Nesting Nested calls include `parentRequestId` to track the call chain: ``` │─── call.requested ───────────────>│ {requestId: A, parentRequestId: P} ``` This enables call graph reconstruction and abort cascading — every nested call includes its parent's `requestId`. ## `PendingRequestMap` The primary consumer interface. Wraps `createPubSub` internally and manages the full call/subscribe lifecycle. ### Construction ```ts const callMap = new PendingRequestMap(eventTarget?) ``` - Creates an internal `PubSub` - If `eventTarget` is provided, passes it to `createPubSub` for transport-level event routing ### `call(operationId, input, options?)` → `Promise` 1. Generate `requestId` via `crypto.randomUUID()` 2. Subscribe to `call.responded:{requestId}`, `call.error:{requestId}`, `call.aborted:{requestId}` (scoped) 3. If `deadline` is set, start a timeout timer that publishes `call.aborted` on expiry 4. Publish `call.requested` 5. Return a Promise — resolves on `call.responded`, rejects on `call.error` or `call.aborted` 6. Cleanup: close all scoped subscriptions on settlement ### `subscribe(operationId, input, options?)` → `Repeater` 1. Generate `requestId` via `crypto.randomUUID()` 2. Publish `call.requested` 3. Create scoped subscriptions: `call.part:{requestId}`, `call.completed:{requestId}`, `call.error:{requestId}` 4. Return a `Repeater` that: - Yields `output` from each `call.part` event - Completes on `call.completed` - Rejects on `call.error` - On consumer break (Repeater `stop`), publishes `call.aborted:{requestId}` and closes all subscriptions This means consumers can use operators: ```ts const stream = callMap.subscribe("events.live", { topic: "sensors" }); const filtered = pipe(stream, filter(isRelevant), map(transform)); for await (const value of filtered) { // handle each filtered/mapped stream value } ``` ### Handler-side methods | Method | Description | |--------|-------------| | `respond(requestId, output)` | Publish `call.responded:{requestId}` — single response for call | | `part(requestId, output, index?)` | Publish `call.part:{requestId}` — next chunk in subscription stream | | `complete(requestId)` | Publish `call.completed:{requestId}` — stream ended normally | | `emitError(requestId, code, message, details?)` | Publish `call.error:{requestId}` — error response | | `abort(requestId)` | Publish `call.aborted:{requestId}` — caller cancellation | ## Transport Mapping Same protocol, same event shapes, same `PendingRequestMap` — different `EventTarget`: | Transport | Use Case | EventTarget impl | |-----------|----------|-----------------| | In-process | Local operations | Browser `EventTarget` (default) | | Redis | Cross-process events | `RedisEventTarget` from `@alkdev/pubsub/event-target-redis` | | WebSocket | Hub ↔ spoke bidirectional | `WebSocketEventTarget` (future) | | Iroh | P2P QUIC | `IrohEventTarget` (future) | | SSE | Server → client streaming | `SSEEventTarget` (future) | ## Error Model ### `CallError` ```ts class CallError extends Error { readonly code: string; readonly details?: unknown; } ``` ### Infrastructure Error Codes | Code | When | Details | |------|------|---------| | `OPERATION_NOT_FOUND` | No operation matches `operationId` | `{ operationId: string }` | | `ACCESS_DENIED` | Missing scopes | `{ requiredScopes?: string[] }` | | `VALIDATION_ERROR` | Input fails schema check | Wrapped from `Value.Errors` | | `TIMEOUT` | Deadline exceeded | `{ deadline: number }` | | `ABORTED` | Call/stream cancelled | — | | `EXECUTION_ERROR` | Handler threw, no `errorSchemas` match | `{ message: string }` | | `UNKNOWN_ERROR` | Non-Error thrown | `{ raw: string }` | ## TypeBox Schemas and Validation All event shapes are defined as TypeBox schemas in `CallEventSchema`. Consumers can use `Value.Check()` or `Value.Errors()` from `@alkdev/typebox` for runtime validation: ```ts import { Value } from "@alkdev/typebox"; import { CallEventSchema } from "@alkdev/pubsub/call"; if (!Value.Check(CallEventSchema["call.requested"], incoming)) { const errors = [...Value.Errors(CallEventSchema["call.requested"], incoming)]; // reject with VALIDATION_ERROR } ``` This enables validation on the Iroh and SSE transports where incoming data is untrusted JSON. ## Relationship to `@alkdev/operations` `@alkdev/operations` provides the `OperationRegistry`, access control, and handler dispatch. It uses `@alkdev/pubsub/call` for: - `PendingRequestMap` — call/subscribe client interface - `CallEventSchema` — runtime validation of incoming events - `CallError` and `CallErrorCode` — error construction and matching - Type exports — `CallRequestedEvent`, etc. for handler signatures The `CallHandler` in operations receives `call.requested` events, looks up the operation, validates input, checks access, and dispatches to the handler. For query/mutation handlers, it calls `respond()`. For subscription handlers, it calls `part()` and `complete()`. ## Operators and Stream Composition Since `subscribe()` returns a `Repeater` (which implements `AsyncIterable`), all pubsub operators work on streams: ```ts import { pipe, filter, map } from "@alkdev/pubsub"; const stream = callMap.subscribe("events.live", { topic: "sensors" }); const filtered = pipe( stream, filter((e) => e.priority > 5), map((e) => ({ ...e, enriched: true })), ); ``` This works the same regardless of whether the stream source is in-process, remote via Redis, or remote via Iroh/SSE.