Add remote subscription support so spokes can consume streaming operations over pubsub transports (WebSocket, Redis). Extract checkAccess to access.ts to break circular dep between call.ts and subscribe.ts.
Task: Implement ADR-007 Subscription Transport
Priority: High (blocks hub↔spoke streaming)
Dependencies: ADR-005 (done), ADR-006 (done). Heartbeat idle-timeout semantics were settled in ADR-007 (amended 2026-05-13).
Architecture docs: ADR-007, call-protocol.md, adapters.md
Status: ✅ Completed (2026-05-16)
Scope
Three changes, all in source. No new modules needed.
1. PendingRequestMap.subscribe() — Remote subscription transport (src/call.ts)
Add a subscribe() method that returns AsyncIterable<ResponseEnvelope> using @alkdev/pubsub's Repeater.
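The core idea, exposing pushed events as something a consumer can pull with for await, can be illustrated without the library. The sketch below is a hand-rolled stand-in and not the @alkdev/pubsub Repeater API; createChannel and its push, close, and iterable members are hypothetical names chosen for illustration.

```typescript
// Hypothetical stand-in for a Repeater: values pushed from event handlers
// are pulled by a consumer through the async iterator protocol.
function createChannel<T>() {
  const buffered: T[] = [];
  const waiting: Array<(result: IteratorResult<T>) => void> = [];
  let closed = false;

  const iterable: AsyncIterable<T> = {
    [Symbol.asyncIterator]() {
      return {
        next(): Promise<IteratorResult<T>> {
          if (buffered.length > 0) {
            return Promise.resolve({ value: buffered.shift() as T, done: false });
          }
          if (closed) return Promise.resolve({ value: undefined, done: true });
          // Nothing buffered yet: park the consumer until push() or close().
          return new Promise((resolve) => waiting.push(resolve));
        },
        return(): Promise<IteratorResult<T>> {
          // The consumer left the loop early (break or throw): drop pending state.
          closed = true;
          buffered.length = 0;
          return Promise.resolve({ value: undefined, done: true });
        },
      };
    },
  };

  return {
    iterable,
    push(value: T): void {
      if (closed) return;
      const waiter = waiting.shift();
      if (waiter) waiter({ value, done: false });
      else buffered.push(value);
    },
    close(): void {
      closed = true;
      for (const waiter of waiting.splice(0)) waiter({ value: undefined, done: true });
    },
  };
}
```

In this picture, subscribe() would hand the iterable to the caller and keep the push and close side in the pending map, so that incoming events can feed it.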
What to build:
- Add a PendingEntry discriminated union to track whether a requestId is a single-resolution call or a multi-yield subscribe:

      type PendingEntry =
        | { type: "call"; promise: PendingRequest }
        | { type: "subscribe"; repeater: Repeater<ResponseEnvelope> }

  Replace Map<string, PendingRequest> with Map<string, PendingEntry>.
- Add subscribe() method on PendingRequestMap:

      subscribe(
        operationId: string,
        input: unknown,
        options?: { parentRequestId?: string; deadline?: number; identity?: Identity },
      ): AsyncIterable<ResponseEnvelope>

  - Generate requestId via crypto.randomUUID()
  - Create a Repeater<ResponseEnvelope> from @alkdev/pubsub
  - Store { type: "subscribe", repeater } in the map
  - Publish call.requested (same as call() does)
  - Return the Repeater as AsyncIterable
- Update setupSubscriptions() event routing to handle both entry types:
  - call.responded:{requestId}: If type: "call" → resolve promise + delete. If type: "subscribe" → push to repeater (keep entry).
  - call.error:{requestId}: If type: "call" → reject promise + delete. If type: "subscribe" → push error to repeater then close repeater + delete.
  - call.aborted:{requestId}: If type: "call" → reject promise + delete. If type: "subscribe" → close repeater + delete.
  - Orphaned events (requestId not in map): silently ignore.
- Implement heartbeat-based idle timeout for subscribe():
  - When deadline is set on subscribe(), store it in the PendingEntry.subscribe.
  - In the call.responded handler for subscription entries, reset the deadline timer on each received envelope.
  - If the deadline timer fires (no envelope received within deadline ms), treat it as TIMEOUT: close the repeater, publish call.aborted, delete the entry.
  - Heartbeats from handlers are indistinguishable from data at the protocol level: a ResponseEnvelope with _meta: { heartbeat: true } is just another envelope that resets the deadline. Consumers filter heartbeats by inspecting envelope._meta.
- Update abort() method: handle type: "subscribe" by closing the repeater.
- Update getPendingCount(): count both call and subscribe entries.
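The routing and idle-timeout rules above can be sketched in isolation. This is a minimal illustration under stated assumptions, not the PendingRequestMap implementation: PendingSketch, Sink, settle, and onIdleTimeout are hypothetical names, the call entry stores resolve/reject callbacks instead of the project's PendingRequest, and Sink stands in for the producer side of a Repeater, whose real API may differ.

```typescript
// Hypothetical minimal shapes; the real ResponseEnvelope lives in the project.
interface ResponseEnvelope {
  data: unknown;
  _meta?: { heartbeat?: boolean };
}

// Stand-in for the producer side of a Repeater (assumed, not the library API).
interface Sink {
  push(envelope: ResponseEnvelope): void;
  close(error?: Error): void;
}

type CallEntry = {
  type: "call";
  resolve: (envelope: ResponseEnvelope) => void;
  reject: (error: Error) => void;
};
type SubscribeEntry = {
  type: "subscribe";
  sink: Sink;
  deadline?: number;
  timer?: ReturnType<typeof setTimeout>;
};
type Entry = CallEntry | SubscribeEntry;

class PendingSketch {
  private entries = new Map<string, Entry>();
  // Invoked on idle timeout; the real map would publish call.aborted here.
  onIdleTimeout: (requestId: string) => void = () => {};

  add(requestId: string, entry: Entry): void {
    this.entries.set(requestId, entry);
    if (entry.type === "subscribe") this.arm(requestId, entry);
  }

  responded(requestId: string, envelope: ResponseEnvelope): void {
    const entry = this.entries.get(requestId);
    if (!entry) return; // orphaned event: silently ignore
    if (entry.type === "call") {
      this.entries.delete(requestId);
      entry.resolve(envelope);
    } else {
      this.arm(requestId, entry); // any envelope, heartbeat or data, resets the timer
      entry.sink.push(envelope); // entry stays in the map
    }
  }

  // Shared by call.error (error set) and call.aborted (no error for subscriptions).
  settle(requestId: string, error?: Error): void {
    const entry = this.entries.get(requestId);
    if (!entry) return;
    this.entries.delete(requestId);
    if (entry.type === "call") {
      entry.reject(error ?? new Error("aborted"));
    } else {
      clearTimeout(entry.timer);
      entry.sink.close(error);
    }
  }

  get pendingCount(): number {
    return this.entries.size;
  }

  private arm(requestId: string, entry: SubscribeEntry): void {
    if (entry.deadline === undefined) return;
    clearTimeout(entry.timer);
    entry.timer = setTimeout(() => {
      this.entries.delete(requestId);
      entry.sink.close(new Error("TIMEOUT"));
      this.onIdleTimeout(requestId);
    }, entry.deadline);
  }
}
```

Re-arming the timer inside responded() is what turns deadline into an idle timeout rather than a total-duration limit: a handler that emits heartbeats more often than deadline ms keeps the subscription alive indefinitely.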
2. CallHandler type dispatch (src/call.ts)
Update buildCallHandler() to check the operation's type and route accordingly:
- QUERY / MUTATION: Call registry.execute(), publish a single call.responded or call.error (unchanged from current behavior)
- SUBSCRIPTION: Call subscribe(), iterate the generator, publish call.responded per yield, handle call.aborted by calling generator.return()
    return async (event: CallRequestedEvent): Promise<void> => {
      const { requestId, operationId, input, identity } = event;
      const context: OperationContext = {
        requestId,
        parentRequestId: event.parentRequestId,
        identity,
      };
      try {
        const spec = registry.getSpec(operationId);
        if (!spec) {
          throw new CallError(InfrastructureErrorCode.OPERATION_NOT_FOUND, `Operation not found: ${operationId}`);
        }
        if (spec.type === OperationType.SUBSCRIPTION) {
          // Subscription path: iterate generator, publish per yield
          const generator = subscribe(registry, operationId, input, context);
          for await (const envelope of generator) {
            callMap.respond(requestId, envelope);
          }
        } else {
          // QUERY / MUTATION: single response
          const envelope = await registry.execute(operationId, input, context);
          callMap.respond(requestId, envelope);
        }
      } catch (error) {
        const spec = registry.getSpec(operationId);
        const callError = mapError(error, spec?.errorSchemas);
        callMap.emitError(requestId, callError.code, callError.message, callError.details);
      }
    };
For subscription abort handling: CallHandler should subscribe to call.aborted:{requestId} (or use the existing callMap mechanism) and call generator.return() when an abort arrives. This can be deferred to a follow-on — the initial implementation can let the generator naturally complete or error out.
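One way the deferred abort handling might look is sketched below, assuming the abort is surfaced to the handler as a standard AbortSignal. That wiring, and the helper name pumpSubscription, are assumptions for illustration; the task only specifies that generator.return() should run when call.aborted arrives.

```typescript
// Hypothetical helper: publish each yielded value until the generator
// finishes or the signal is aborted. Returns true if it ran to completion.
async function pumpSubscription<T>(
  generator: AsyncGenerator<T>,
  publish: (value: T) => void,
  signal: AbortSignal,
): Promise<boolean> {
  for await (const value of generator) {
    // Leaving a for await loop early calls generator.return() implicitly,
    // which runs the generator's finally blocks.
    if (signal.aborted) break;
    publish(value);
  }
  return !signal.aborted;
}
```

A limitation of this shape is that the abort is only observed when the generator next yields. A generator blocked in a long await keeps running until then, so a handler that needs prompt cancellation would also have to pass the signal into whatever it is awaiting.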
3. FromOpenAPI SSE AsyncGenerator handlers (src/from_openapi.ts)
Replace the single-return OperationHandler for SUBSCRIPTION-type operations with an AsyncGenerator-based SubscriptionHandler.
What to build:
- Add SSE parser function parseSSEFrames(buffer: string): { events: SSEEvent[], remaining: string }:
  - Handle BOM (U+FEFF) at stream start
  - Split on \n, \r\n, \r
  - Parse data:, event:, id: fields per WHATWG spec
  - Lines prefixed with : (comments) are ignored
  - Empty line dispatches buffered event
  - Partial lines across read() calls: retain remaining buffer, prepend to next chunk
- Add SSEEvent interface:

      interface SSEEvent {
        data: string
        eventType: string
        lastEventId: string
      }

- In createHTTPOperation(), when opType === OperationType.SUBSCRIPTION, generate a SubscriptionHandler instead of OperationHandler. The handler should:
  - Call fetch() with URL/params/auth (same as current)
  - On HTTP error, throw CallError("EXECUTION_ERROR", ...)
  - Read response body as ReadableStream via response.body.getReader()
  - Decode chunks with TextDecoder
  - Parse SSE frames from the text stream via parseSSEFrames()
  - Yield each parsed event wrapped in httpEnvelope(data, { statusCode, headers, contentType: "text/event-stream" })
  - In finally block: release the reader lock
  - Handle heartbeat yield if needed for long-lived SSE streams (TBD; initial pass can skip)
- Update the return type of createHTTPOperation() and callers to handle the OperationHandler | SubscriptionHandler union:
  - FromOpenAPI() return type: Array<OperationSpec & { handler: OperationHandler | SubscriptionHandler }>
  - FromOpenAPIFile() and FromOpenAPIUrl() return types similarly
- SSE event type and id fields: The parser parses them but they are NOT carried in the current httpEnvelope(); the data field value is the primary payload. If consumers need per-event metadata, a future SSEResponseMeta type can be added. This is out of scope for this task. Heartbeat detection (for filterable no-op envelopes) is also deferred.
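The parser and read loop described above might be sketched as follows. This is one possible reading of the stateless parseSSEFrames signature rather than the shipped code: it consumes only complete events and returns everything after the last blank line as remaining, so lastEventId does not carry over between calls. ChunkReader and readSSE are hypothetical names, the retry field is ignored, and wrapping events in httpEnvelope() is left out.

```typescript
interface SSEEvent {
  data: string;
  eventType: string;
  lastEventId: string;
}

// Minimal structural view of a stream reader (hypothetical name).
interface ChunkReader {
  read(): Promise<{ done: boolean; value?: Uint8Array }>;
  releaseLock(): void;
}

function parseSSEFrames(buffer: string): { events: SSEEvent[]; remaining: string } {
  const events: SSEEvent[] = [];
  // Strip a leading BOM if this string carries one.
  const text = buffer.startsWith("\uFEFF") ? buffer.slice(1) : buffer;
  const lineBreak = /\r\n|\n|\r/g;
  let data: string[] = [];
  let eventType = "";
  let lastEventId = "";
  let lineStart = 0;
  let consumed = 0; // index just past the last blank line seen
  let match: RegExpExecArray | null;
  while ((match = lineBreak.exec(text)) !== null) {
    const line = text.slice(lineStart, match.index);
    lineStart = match.index + match[0].length;
    if (line === "") {
      // Blank line: dispatch the buffered event unless it has no data.
      if (data.length > 0) {
        events.push({ data: data.join("\n"), eventType: eventType || "message", lastEventId });
      }
      data = [];
      eventType = "";
      consumed = lineStart;
      continue;
    }
    if (line.startsWith(":")) continue; // comment line
    const colon = line.indexOf(":");
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? "" : line.slice(colon + 1);
    if (value.startsWith(" ")) value = value.slice(1); // drop one leading space
    if (field === "data") data.push(value);
    else if (field === "event") eventType = value;
    else if (field === "id" && !value.includes("\0")) lastEventId = value;
    // Unknown fields are ignored.
  }
  return { events, remaining: text.slice(consumed) };
}

async function* readSSE(reader: ChunkReader): AsyncGenerator<SSEEvent> {
  const decoder = new TextDecoder();
  let pending = "";
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break; // an incomplete trailing event is discarded
      // stream: true keeps multi-byte characters split across chunks intact.
      pending += decoder.decode(value, { stream: true });
      const { events, remaining } = parseSSEFrames(pending);
      pending = remaining;
      yield* events;
    }
  } finally {
    reader.releaseLock();
  }
}
```

Keeping whole incomplete events in remaining, rather than only the incomplete line, lets the parser stay stateless at the cost of re-scanning a few lines per chunk. If lastEventId must persist across events in different chunks, the parser would need to accept and return that state.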
Tests
- PendingRequestMap.subscribe(): test with in-process EventTarget, verify Repeater yields each envelope, verify deadline kills after idle interval, verify abort cleans up, verify heartbeat resets deadline
- CallHandler dispatch: test QUERY/MUTATION path (execute), test SUBSCRIPTION path (subscribe + multiple responds), test error propagation
- SSE parser: test BOM, all line endings, multiple data: lines, event:/id: fields, comments, empty data, partial-line buffering, malformed lines
- FromOpenAPI SUBSCRIPTION handler: mock fetch returning a readable SSE stream, verify each event is yielded as httpEnvelope, verify error handling
Acceptance Criteria
- PendingRequestMap.subscribe() exists and returns AsyncIterable<ResponseEnvelope>
- Multiple call.responded events with the same requestId route to the Repeater, not a single-resolution promise
- Deadline timer fires as idle timeout (resets on each received envelope) for subscriptions
- CallHandler dispatches QUERY/MUTATION to execute(), SUBSCRIPTION to subscribe()
- SSE operations from FromOpenAPI are consumable as subscriptions
- All existing tests still pass
- TypeScript compiles clean (npm run lint)
References
- @alkdev/pubsub Repeater API: see src/repeater.ts in @alkdev/pubsub source
- Heartbeat idle timeout design: ADR-007 § Open Questions #2
- SSE spec: https://html.spec.whatwg.org/multipage/server-sent-events.html