From 04b3464c36103eaf23b8a20f59ec96fa4b45f1c7 Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Thu, 30 Apr 2026 13:46:39 +0000 Subject: [PATCH] Add call protocol module with streaming support New sub-path export @alkdev/pubsub/call providing: - CallEventSchema (TypeBox schemas) for call.requested/responded/part/completed/aborted/error - PendingRequestMap with call() (request/response) and subscribe() (streaming via Repeater) - CallError class and CallErrorCode constants - Scoped topic subscriptions (call.responded:{requestId}) to avoid O(n) fanout - subscribe() yields call.part events until call.completed or call.error, with automatic call.aborted on consumer break Also adds @alkdev/typebox as runtime dependency and architecture doc. --- docs/architecture/README.md | 6 +- docs/architecture/call-protocol.md | 285 ++++++++++++++++++++++++++ package-lock.json | 9 + package.json | 14 +- src/call.ts | 307 +++++++++++++++++++++++++++++ tsup.config.ts | 1 + 6 files changed, 619 insertions(+), 3 deletions(-) create mode 100644 docs/architecture/call-protocol.md create mode 100644 src/call.ts diff --git a/docs/architecture/README.md b/docs/architecture/README.md index 1d416d6..d13a87e 100644 --- a/docs/architecture/README.md +++ b/docs/architecture/README.md @@ -30,7 +30,8 @@ const pubsub = createPubSub({ ## What This Package Provides -- **Core** — `createPubSub`, `TypedEventTarget`, `TypedEvent`, topic scoping, `filter`/`map`/`pipe` operators +- **Core** — `createPubSub`, `TypedEventTarget`, `TypedEvent`, topic scoping, `filter`/`map`/`pipe` operators, `Repeater` (inlined from @repeaterjs/repeater) +- **Call protocol** (`@alkdev/pubsub/call`) — `PendingRequestMap`, `CallEventSchema`, `CallError`, event types for request/response and streaming operations - **Adapters** (each is a peer-dep island, importable via sub-path export): - In-process (default `EventTarget`, no adapter needed) - Redis (`@alkdev/pubsub/event-target-redis`, peer dep: `ioredis`) @@ -51,7 +52,7 @@ The hub uses pubsub for event routing between operations, runners, and the SSE i ### Future: standalone spoke SDK -Spokes will import `@alkdev/pubsub` directly to create their event target (WebSocket or Iroh) and wire it into `createPubSub`. The call protocol types live in a separate `@alkdev/call-protocol` package (not yet extracted). +Spokes will import `@alkdev/pubsub` directly to create their event target (WebSocket or Iroh) and wire it into `createPubSub`. Call protocol types and `PendingRequestMap` are available from `@alkdev/pubsub/call`. ## Threat Model @@ -64,6 +65,7 @@ Spokes will import `@alkdev/pubsub` directly to create their event target (WebSo | Document | Content | |----------|---------| | [api-surface.md](api-surface.md) | createPubSub factory, PubSub types, operators, TypedEventTarget types | +| [call-protocol.md](call-protocol.md) | Call/subscribe protocol — event types, PendingRequestMap, streaming, error model, transport mapping | | [event-targets.md](event-targets.md) | In-process, Redis, WebSocket adapters — interface, configuration, limitations | | [iroh-transport.md](iroh-transport.md) | Iroh P2P QUIC transport — protocol, framing, identity, hub/spoke sides, reconnection | | [build-distribution.md](build-distribution.md) | Dependencies, project structure, tree-shaking, sub-path exports, targets | diff --git a/docs/architecture/call-protocol.md b/docs/architecture/call-protocol.md new file mode 100644 index 0000000..f66556d --- /dev/null +++ b/docs/architecture/call-protocol.md @@ -0,0 +1,285 @@ +--- +status: draft +last_updated: 2026-04-30 +--- + +# Call Protocol + +Unified event-based protocol for request/response and streaming operations. Built on `@alkdev/pubsub`'s `TypedEventTarget` and `Repeater` primitives. + +## Overview + +The call protocol provides a single event-based mechanism that works identically whether the operation is local (in-process), remote (hub/spoke over WebSocket or Iroh), or streamed (subscription). It is transport-agnostic — the same event shapes, same `requestId` correlation, same `PendingRequestMap`. Only the `EventTarget` changes. + +Two consumption patterns share the same protocol: + +- **`call()`**: Publish `call.requested`, subscribe to response events scoped by `requestId`, resolve on first response → `Promise` +- **`subscribe()`**: Publish `call.requested`, subscribe to `call.part` events scoped by `requestId`, yield each part until `call.completed` or `call.error` → `Repeater` + +Both use `call.requested` as the trigger. The `operationId` and `operation.type` on the handler side determine which pattern applies. The protocol itself doesn't distinguish — it's the handler that decides whether to respond once (`respond()`) or stream (`part()` + `complete()`). + +## Event Types + +All events use TypeBox schemas, compatible with `@alkdev/pubsub`'s `PubSubPublishArgsByKey`. Schemas are exported as `CallEventSchema` for runtime validation. + +### `CallEventSchema` + +```ts +const CallEventSchema = { + "call.requested": Type.Object({ + requestId: Type.String(), + operationId: Type.String(), + input: Type.Unknown(), + parentRequestId: Type.Optional(Type.String()), + deadline: Type.Optional(Type.Number()), + identity: Type.Optional(Type.Object({ + id: Type.String(), + scopes: Type.Array(Type.String()), + resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), + })), + }), + "call.responded": Type.Object({ + requestId: Type.String(), + output: Type.Unknown(), + }), + "call.part": Type.Object({ + requestId: Type.String(), + output: Type.Unknown(), + index: Type.Optional(Type.Number()), + }), + "call.completed": Type.Object({ + requestId: Type.String(), + }), + "call.aborted": Type.Object({ + requestId: Type.String(), + }), + "call.error": Type.Object({ + requestId: Type.String(), + code: Type.String(), + message: Type.String(), + details: Type.Optional(Type.Unknown()), + }), +} +``` + +### Topic Scoping + +Response events are scoped by `requestId` using pubsub's built-in topic scoping: + +| Event | Publish | Subscribe (caller) | Subscribe (handler) | +|-------|---------|--------------------|--------------------| +| `call.requested` | `pubsub.publish("call.requested", event)` | — | Unscoped: `pubsub.subscribe("call.requested")` | +| `call.responded` | `pubsub.publish("call.responded", requestId, event)` | Scoped: `pubsub.subscribe("call.responded", requestId)` | — | +| `call.part` | `pubsub.publish("call.part", requestId, event)` | Scoped: `pubsub.subscribe("call.part", requestId)` | — | +| `call.completed` | `pubsub.publish("call.completed", requestId, event)` | Scoped: `pubsub.subscribe("call.completed", requestId)` | — | +| `call.aborted` | `pubsub.publish("call.aborted", requestId, event)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | Scoped: `pubsub.subscribe("call.aborted", requestId)` | +| `call.error` | `pubsub.publish("call.error", requestId, event)` | Scoped: `pubsub.subscribe("call.error", requestId)` | — | + +This gives every `requestId` its own event channel. On Redis, this maps to `call.responded:{uuid}` channels. On WebSocket or Iroh, the topic string is a routing key. In-process, it's a `CustomEvent` with `type: "call.responded:{uuid}"`. + +**Why scoped instead of unscoped + manual matching?** Scoped topics avoid O(n) fanout. A caller only receives events for its own request. This matters especially on Redis (pub/sub channels) and Iroh (topic订阅), where unscoped subscriptions would deliver every response to every listener. + +## Event Flow + +### Call (request/response) + +``` +Caller Handler + │ │ + │─── call.requested ───────────────>│ + │ {requestId, operationId, │ + │ input, identity, deadline} │ + │ │ + │<── call.responded:{requestId} ────│ + │ {requestId, output} │ +``` + +On error: + +``` + │<── call.error:{requestId} ───────│ + │ {requestId, code, message, │ + │ details} │ +``` + +On timeout or caller cancellation: + +``` + │─── call.aborted:{requestId} ────>│ + │ {requestId} │ +``` + +### Subscribe (request/stream) + +``` +Caller Handler + │ │ + │─── call.requested ───────────────>│ + │ {requestId, operationId, │ + │ input, identity} │ + │ │ + │<── call.part:{requestId} ────────│ + │ {requestId, output, index?} │ + │ │ + │<── call.part:{requestId} ────────│ + │ {requestId, output, index?} │ + │ │ + │<── call.completed:{requestId} ────│ ← stream ends normally + │ {requestId} │ +``` + +On stream error: + +``` + │<── call.error:{requestId} ───────│ + │ {requestId, code, message} │ +``` + +On caller cancellation (consumer breaks out of `for await`): + +``` + │─── call.aborted:{requestId} ────>│ + │ {requestId} │ +``` + +### Nesting + +Nested calls include `parentRequestId` to track the call chain: + +``` + │─── call.requested ───────────────>│ {requestId: A, parentRequestId: P} +``` + +This enables call graph reconstruction and abort cascading — every nested call includes its parent's `requestId`. + +## `PendingRequestMap` + +The primary consumer interface. Wraps `createPubSub` internally and manages the full call/subscribe lifecycle. + +### Construction + +```ts +const callMap = new PendingRequestMap(eventTarget?) +``` + +- Creates an internal `PubSub` +- If `eventTarget` is provided, passes it to `createPubSub` for transport-level event routing + +### `call(operationId, input, options?)` → `Promise` + +1. Generate `requestId` via `crypto.randomUUID()` +2. Subscribe to `call.responded:{requestId}`, `call.error:{requestId}`, `call.aborted:{requestId}` (scoped) +3. If `deadline` is set, start a timeout timer that publishes `call.aborted` on expiry +4. Publish `call.requested` +5. Return a Promise — resolves on `call.responded`, rejects on `call.error` or `call.aborted` +6. Cleanup: close all scoped subscriptions on settlement + +### `subscribe(operationId, input, options?)` → `Repeater` + +1. Generate `requestId` via `crypto.randomUUID()` +2. Publish `call.requested` +3. Create scoped subscriptions: `call.part:{requestId}`, `call.completed:{requestId}`, `call.error:{requestId}` +4. Return a `Repeater` that: + - Yields `output` from each `call.part` event + - Completes on `call.completed` + - Rejects on `call.error` + - On consumer break (Repeater `stop`), publishes `call.aborted:{requestId}` and closes all subscriptions + +This means consumers can use operators: + +```ts +const stream = callMap.subscribe("events.live", { topic: "sensors" }); +const filtered = pipe(stream, filter(isRelevant), map(transform)); +for await (const value of filtered) { + // handle each filtered/mapped stream value +} +``` + +### Handler-side methods + +| Method | Description | +|--------|-------------| +| `respond(requestId, output)` | Publish `call.responded:{requestId}` — single response for call | +| `part(requestId, output, index?)` | Publish `call.part:{requestId}` — next chunk in subscription stream | +| `complete(requestId)` | Publish `call.completed:{requestId}` — stream ended normally | +| `emitError(requestId, code, message, details?)` | Publish `call.error:{requestId}` — error response | +| `abort(requestId)` | Publish `call.aborted:{requestId}` — caller cancellation | + +## Transport Mapping + +Same protocol, same event shapes, same `PendingRequestMap` — different `EventTarget`: + +| Transport | Use Case | EventTarget impl | +|-----------|----------|-----------------| +| In-process | Local operations | Browser `EventTarget` (default) | +| Redis | Cross-process events | `RedisEventTarget` from `@alkdev/pubsub/event-target-redis` | +| WebSocket | Hub ↔ spoke bidirectional | `WebSocketEventTarget` (future) | +| Iroh | P2P QUIC | `IrohEventTarget` (future) | +| SSE | Server → client streaming | `SSEEventTarget` (future) | + +## Error Model + +### `CallError` + +```ts +class CallError extends Error { + readonly code: string; + readonly details?: unknown; +} +``` + +### Infrastructure Error Codes + +| Code | When | Details | +|------|------|---------| +| `OPERATION_NOT_FOUND` | No operation matches `operationId` | `{ operationId: string }` | +| `ACCESS_DENIED` | Missing scopes | `{ requiredScopes?: string[] }` | +| `VALIDATION_ERROR` | Input fails schema check | Wrapped from `Value.Errors` | +| `TIMEOUT` | Deadline exceeded | `{ deadline: number }` | +| `ABORTED` | Call/stream cancelled | — | +| `EXECUTION_ERROR` | Handler threw, no `errorSchemas` match | `{ message: string }` | +| `UNKNOWN_ERROR` | Non-Error thrown | `{ raw: string }` | + +## TypeBox Schemas and Validation + +All event shapes are defined as TypeBox schemas in `CallEventSchema`. Consumers can use `Value.Check()` or `Value.Errors()` from `@alkdev/typebox` for runtime validation: + +```ts +import { Value } from "@alkdev/typebox"; +import { CallEventSchema } from "@alkdev/pubsub/call"; + +if (!Value.Check(CallEventSchema["call.requested"], incoming)) { + const errors = [...Value.Errors(CallEventSchema["call.requested"], incoming)]; + // reject with VALIDATION_ERROR +} +``` + +This enables validation on the Iroh and SSE transports where incoming data is untrusted JSON. + +## Relationship to `@alkdev/operations` + +`@alkdev/operations` provides the `OperationRegistry`, access control, and handler dispatch. It uses `@alkdev/pubsub/call` for: + +- `PendingRequestMap` — call/subscribe client interface +- `CallEventSchema` — runtime validation of incoming events +- `CallError` and `CallErrorCode` — error construction and matching +- Type exports — `CallRequestedEvent`, etc. for handler signatures + +The `CallHandler` in operations receives `call.requested` events, looks up the operation, validates input, checks access, and dispatches to the handler. For query/mutation handlers, it calls `respond()`. For subscription handlers, it calls `part()` and `complete()`. + +## Operators and Stream Composition + +Since `subscribe()` returns a `Repeater` (which implements `AsyncIterable`), all pubsub operators work on streams: + +```ts +import { pipe, filter, map } from "@alkdev/pubsub"; + +const stream = callMap.subscribe("events.live", { topic: "sensors" }); +const filtered = pipe( + stream, + filter((e) => e.priority > 5), + map((e) => ({ ...e, enriched: true })), +); +``` + +This works the same regardless of whether the stream source is in-process, remote via Redis, or remote via Iroh/SSE. \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index cd80765..c4986cd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,9 @@ "name": "@alkdev/pubsub", "version": "0.1.0", "license": "MIT OR Apache-2.0", + "dependencies": { + "@alkdev/typebox": "^0.34.49" + }, "devDependencies": { "@types/node": "^22.0.0", "@vitest/coverage-v8": "^3.2.4", @@ -28,6 +31,12 @@ } } }, + "node_modules/@alkdev/typebox": { + "version": "0.34.49", + "resolved": "https://registry.npmjs.org/@alkdev/typebox/-/typebox-0.34.49.tgz", + "integrity": "sha512-hMidpI6GlMgQMlW9KEd8I3ywgewV6mva9iJaDuBfGtgeRAGrB8yyu6T/fHmgmyQineZ8l4/1PdH/VNr3S2er2g==", + "license": "MIT" + }, "node_modules/@ampproject/remapping": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/@ampproject/remapping/-/remapping-2.3.0.tgz", diff --git a/package.json b/package.json index 8700c50..5b48d36 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,16 @@ "types": "./dist/event-target-redis.d.cts", "default": "./dist/event-target-redis.cjs" } + }, + "./call": { + "import": { + "types": "./dist/call.d.ts", + "default": "./dist/call.js" + }, + "require": { + "types": "./dist/call.d.cts", + "default": "./dist/call.cjs" + } } }, "publishConfig": { @@ -52,7 +62,9 @@ "quic" ], "license": "MIT OR Apache-2.0", - "dependencies": {}, + "dependencies": { + "@alkdev/typebox": "^0.34.49" + }, "peerDependencies": { "ioredis": "^5.0.0" }, diff --git a/src/call.ts b/src/call.ts new file mode 100644 index 0000000..3206bd5 --- /dev/null +++ b/src/call.ts @@ -0,0 +1,307 @@ +import { Type, type Static } from "@alkdev/typebox"; +import { createPubSub, type PubSub } from "./create_pubsub.js"; +import { Repeater, type Push, type Stop } from "./repeater.js"; + +export const CallEventSchema = { + "call.requested": Type.Object({ + requestId: Type.String(), + operationId: Type.String(), + input: Type.Unknown(), + parentRequestId: Type.Optional(Type.String()), + deadline: Type.Optional(Type.Number()), + identity: Type.Optional(Type.Object({ + id: Type.String(), + scopes: Type.Array(Type.String()), + resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))), + })), + }), + "call.responded": Type.Object({ + requestId: Type.String(), + output: Type.Unknown(), + }), + "call.part": Type.Object({ + requestId: Type.String(), + output: Type.Unknown(), + index: Type.Optional(Type.Number()), + }), + "call.completed": Type.Object({ + requestId: Type.String(), + }), + "call.aborted": Type.Object({ + requestId: Type.String(), + }), + "call.error": Type.Object({ + requestId: Type.String(), + code: Type.String(), + message: Type.String(), + details: Type.Optional(Type.Unknown()), + }), +} as const; + +export type CallRequestedEvent = Static; +export type CallRespondedEvent = Static; +export type CallPartEvent = Static; +export type CallCompletedEvent = Static; +export type CallAbortedEvent = Static; +export type CallErrorEvent = Static; + +type CallPubSubMap = { + "call.requested": [CallRequestedEvent]; + "call.responded": [string, CallRespondedEvent]; + "call.part": [string, CallPartEvent]; + "call.completed": [string, CallCompletedEvent]; + "call.aborted": [string, CallAbortedEvent]; + "call.error": [string, CallErrorEvent]; +}; + +export const CallErrorCode = { + OPERATION_NOT_FOUND: "OPERATION_NOT_FOUND", + ACCESS_DENIED: "ACCESS_DENIED", + VALIDATION_ERROR: "VALIDATION_ERROR", + TIMEOUT: "TIMEOUT", + ABORTED: "ABORTED", + EXECUTION_ERROR: "EXECUTION_ERROR", + UNKNOWN_ERROR: "UNKNOWN_ERROR", +} as const; + +export type CallErrorCodeType = (typeof CallErrorCode)[keyof typeof CallErrorCode]; + +export class CallError extends Error { + readonly code: string; + readonly details?: unknown; + + constructor(code: string, message: string, details?: unknown) { + super(message); + this.name = "CallError"; + this.code = code; + this.details = details; + } +} + +interface PendingRequest { + resolve: (value: unknown) => void; + reject: (reason: unknown) => void; + deadline?: number; + timer?: ReturnType; + unsubscribe: () => void; +} + +export class PendingRequestMap { + private requests = new Map(); + private pubsub: PubSub; + + constructor(eventTarget?: EventTarget) { + this.pubsub = createPubSub( + eventTarget ? { eventTarget: eventTarget as any } : undefined, + ); + } + + async call( + operationId: string, + input: unknown, + options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] }, + ): Promise { + const requestId = crypto.randomUUID(); + + const respondedIter = this.pubsub.subscribe("call.responded", requestId); + const errorIter = this.pubsub.subscribe("call.error", requestId); + const abortedIter = this.pubsub.subscribe("call.aborted", requestId); + + const cleanup = (): void => { + respondedIter.return?.(); + errorIter.return?.(); + abortedIter.return?.(); + }; + + let timer: ReturnType | undefined; + if (options?.deadline) { + timer = setTimeout(() => { + cleanup(); + this.pubsub.publish("call.aborted", requestId, { requestId }); + }, options.deadline - Date.now()); + } + + this.pubsub.publish("call.requested", { + requestId, + operationId, + input, + parentRequestId: options?.parentRequestId, + deadline: options?.deadline, + identity: options?.identity, + }); + + try { + const result = await new Promise((resolve, reject) => { + const pending: PendingRequest = { + resolve: (value: unknown) => { + if (timer) clearTimeout(timer); + cleanup(); + resolve(value); + }, + reject: (reason: unknown) => { + if (timer) clearTimeout(timer); + cleanup(); + reject(reason); + }, + deadline: options?.deadline, + timer, + unsubscribe: cleanup, + }; + + this.requests.set(requestId, pending); + + (async () => { + for await (const event of respondedIter) { + const responded = event as CallRespondedEvent; + const p = this.requests.get(responded.requestId); + if (p) { + this.requests.delete(responded.requestId); + p.resolve(responded.output); + } + return; + } + })(); + + (async () => { + for await (const event of errorIter) { + const err = event as CallErrorEvent; + const p = this.requests.get(err.requestId); + if (p) { + this.requests.delete(err.requestId); + p.reject(new CallError(err.code, err.message, err.details)); + } + return; + } + })(); + + (async () => { + for await (const event of abortedIter) { + const aborted = event as CallAbortedEvent; + const p = this.requests.get(aborted.requestId); + if (p) { + this.requests.delete(aborted.requestId); + p.reject(new CallError(CallErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`)); + } + return; + } + })(); + }); + + return result; + } finally { + if (timer) clearTimeout(timer); + } + } + + subscribe( + operationId: string, + input: unknown, + options?: { parentRequestId?: string; deadline?: number; identity?: CallRequestedEvent["identity"] }, + ): Repeater { + const requestId = crypto.randomUUID(); + const map = this; + + return new Repeater(async function (push: Push, stop: Stop) { + map.pubsub.publish("call.requested", { + requestId, + operationId, + input, + parentRequestId: options?.parentRequestId, + deadline: options?.deadline, + identity: options?.identity, + }); + + const partIter = map.pubsub.subscribe("call.part", requestId); + const completedIter = map.pubsub.subscribe("call.completed", requestId); + const errorIter = map.pubsub.subscribe("call.error", requestId); + + let settled = false; + + const cleanup = (): void => { + if (!settled) { + settled = true; + map.pubsub.publish("call.aborted", requestId, { requestId }); + } + partIter.return?.(); + completedIter.return?.(); + errorIter.return?.(); + }; + + stop.then(cleanup); + + try { + const partPromise = (async (): Promise => { + for await (const event of partIter) { + const part = event as CallPartEvent; + await push(part.output); + } + throw new Error("part stream ended unexpectedly"); + })(); + + const completedPromise = (async () => { + for await (const _ of completedIter) { + return; + } + })(); + + const errorPromise = (async (): Promise => { + for await (const event of errorIter) { + const err = event as CallErrorEvent; + throw new CallError(err.code, err.message, err.details); + } + throw new Error("error stream ended unexpectedly"); + })(); + + await Promise.race([completedPromise, errorPromise, partPromise]); + } finally { + cleanup(); + stop(); + } + }); + } + + respond(requestId: string, output: unknown): void { + this.pubsub.publish("call.responded", requestId, { + requestId, + output, + }); + } + + part(requestId: string, output: unknown, index?: number): void { + this.pubsub.publish("call.part", requestId, { + requestId, + output, + index, + }); + } + + complete(requestId: string): void { + this.pubsub.publish("call.completed", requestId, { requestId }); + } + + emitError(requestId: string, code: string, message: string, details?: unknown): void { + this.pubsub.publish("call.error", requestId, { + requestId, + code, + message, + details, + }); + } + + abort(requestId: string): void { + const pending = this.requests.get(requestId); + if (pending) { + if (pending.timer) clearTimeout(pending.timer); + this.requests.delete(requestId); + pending.unsubscribe(); + this.pubsub.publish("call.aborted", requestId, { requestId }); + pending.reject(new CallError(CallErrorCode.ABORTED, `Request ${requestId} was aborted`)); + } else { + this.pubsub.publish("call.aborted", requestId, { requestId }); + } + } + + getPendingCount(): number { + return this.requests.size; + } +} \ No newline at end of file diff --git a/tsup.config.ts b/tsup.config.ts index fd4ae46..2a75871 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -4,6 +4,7 @@ export default defineConfig({ entry: [ 'src/index.ts', 'src/event-target-redis.ts', + 'src/call.ts', ], format: ['esm', 'cjs'], dts: true,