| status | last_updated |
|---|---|
| draft | 2026-04-30 |
Call Protocol
PendingRequestMap, CallHandler, call≡subscribe semantics, event types, error model, and access control.
Overview
The call protocol is the unified transport layer for all operation invocations. It provides a single event-based mechanism that works the same whether the call is local (in-process), remote (hub↔spoke over websocket), or streamed (subscription). It is built on @alkdev/pubsub.
At the protocol level, call and subscribe are the same thing with different consumption patterns:
- call: Publish call.requested, subscribe to call.responded:{requestId}, resolve on first response → Promise<TOutput>
- subscribe: Publish call.requested, subscribe to call.responded:{requestId}, yield each response → AsyncIterable<TOutput>
Both use the same event types, the same requestId correlation, and the same PendingRequestMap. call is semantically subscribe().next().
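The equivalence can be illustrated without any pubsub machinery. The sketch below is not the library's code: `responses`, `collectAll`, and `first` are hypothetical names, and an in-memory async generator stands in for the call.responded:{requestId} stream. It only shows that the two consumption patterns differ in how many values the caller takes.

```typescript
// Hypothetical stand-in for the stream of responses tied to one requestId.
async function* responses<T>(values: T[]): AsyncGenerator<T, void, unknown> {
  for (const value of values) {
    yield value;
  }
}

// "subscribe" consumption: iterate every response until the stream ends.
async function collectAll<T>(stream: AsyncIterable<T>): Promise<T[]> {
  const out: T[] = [];
  for await (const value of stream) {
    out.push(value);
  }
  return out;
}

// "call" consumption: take the first response, then stop listening.
async function first<T>(stream: AsyncGenerator<T, void, unknown>): Promise<T> {
  try {
    const step = await stream.next();
    if (step.done) {
      throw new Error("stream ended before any response arrived");
    }
    return step.value;
  } finally {
    // Release the underlying subscription whether we got a value or not.
    await stream.return(undefined);
  }
}
```

With this shape, `first(responses([1, 2, 3]))` resolves to the first value while `collectAll` drains the whole stream.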
Event Types
All communication flows through typed events. The event map is defined as CallEventMap using TypeBox schemas, compatible with @alkdev/pubsub's PubSubPublishArgsByKey.
CallEventMap
const CallEventMap = {
  "call.requested": Type.Object({
    requestId: Type.String(),
    operationId: Type.String(),
    input: Type.Unknown(),
    parentRequestId: Type.Optional(Type.String()),
    deadline: Type.Optional(Type.Number()),
    identity: Type.Optional(Type.Object({
      id: Type.String(),
      scopes: Type.Array(Type.String()),
      resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))),
    })),
  }),
  "call.responded": Type.Object({
    requestId: Type.String(),
    output: Type.Unknown(),
  }),
  "call.aborted": Type.Object({
    requestId: Type.String(),
  }),
  "call.error": Type.Object({
    requestId: Type.String(),
    code: Type.String(),
    message: Type.String(),
    details: Type.Optional(Type.Unknown()),
  }),
}
Request Correlation
Every call has a unique requestId (UUID). Nested calls include parentRequestId to track the call chain. Responses and errors match to requests by requestId.
Event Flow
Caller                        Handler
│                                   │
│─── call.requested ───────────────>│
│    {requestId, operationId,       │
│     input, identity, deadline}    │
│                                   │
│<── call.responded ────────────────│
│    {requestId, output}            │
On error:
│<── call.error ────────────────────│
│    {requestId, code, message,     │
│     details}                      │
On abort (caller cancels):
│─── call.aborted ─────────────────>│
│    {requestId}                    │
Identity
The identity field in call.requested carries the caller's security context through the call chain. It is derived from keypal's ApiKeyMetadata: scopes maps directly, and resources is keyed by "type:id" with an array of scopes per key. CallHandler checks it against the operation's AccessControl.
PendingRequestMap
PendingRequestMap manages in-flight requests and provides the call() interface. It wraps @alkdev/pubsub internally.
Construction
const callMap = new PendingRequestMap(eventTarget?)
- Creates an internal PubSub<CallPubSubMap> using createPubSub
- If eventTarget is provided, passes it to createPubSub for transport-level event routing (Redis, WebSocket, etc.)
- Wires subscription handlers for call.responded, call.error, and call.aborted to route events back to waiting callers
call(operationId, input, options?)
async call(
  operationId: string,
  input: unknown,
  options?: { parentRequestId?: string; deadline?: number; identity?: Identity },
): Promise<unknown>
- Generate requestId via crypto.randomUUID()
- Create a PendingRequest with resolve/reject from a new Promise
- If deadline is set, start a timeout timer that rejects with TIMEOUT
- Store PendingRequest in the internal map
- Publish call.requested event with all fields
- Return the Promise (resolves on call.responded, rejects on call.error or call.aborted)
Internal Subscription Wiring
On construction, three async loops subscribe to pubsub topics:
- call.responded: Look up PendingRequest by requestId, clear timer if set, resolve with output
- call.error: Look up PendingRequest, clear timer, reject with CallError(code, message, details)
- call.aborted: Look up PendingRequest, clear timer, reject with CallError(ABORTED, ...)
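The call() steps and the routing rules above can be sketched in a few dozen lines. This is a simplified illustration, not the real PendingRequestMap: `PendingSketch`, its `publish` hook, and `route` are hypothetical names, the request id is a counter instead of crypto.randomUUID(), and the deadline is treated as a relative timeout in milliseconds, which is an assumption the document does not confirm.

```typescript
type CallEvent =
  | { type: "call.responded"; requestId: string; output: unknown }
  | { type: "call.error"; requestId: string; code: string; message: string; details?: unknown }
  | { type: "call.aborted"; requestId: string };

class CallError extends Error {
  code: string;
  details?: unknown;
  constructor(code: string, message: string, details?: unknown) {
    super(message);
    this.code = code;
    this.details = details;
  }
}

interface PendingRequest {
  resolve: (output: unknown) => void;
  reject: (error: CallError) => void;
  timer?: ReturnType<typeof setTimeout>;
}

class PendingSketch {
  private pending = new Map<string, PendingRequest>();
  private nextId = 0;
  // Hypothetical hook standing in for publishing call.requested on the pubsub.
  private publish: (requestId: string, operationId: string, input: unknown) => void;

  constructor(publish: (requestId: string, operationId: string, input: unknown) => void) {
    this.publish = publish;
  }

  call(operationId: string, input: unknown, deadlineMs?: number): Promise<unknown> {
    const requestId = `req-${++this.nextId}`; // stand-in for crypto.randomUUID()
    const promise = new Promise<unknown>((resolve, reject) => {
      const entry: PendingRequest = { resolve, reject };
      if (deadlineMs !== undefined) {
        // Assumption: deadline is a relative timeout in milliseconds.
        entry.timer = setTimeout(() => {
          this.pending.delete(requestId);
          reject(new CallError("TIMEOUT", "deadline exceeded", { deadline: deadlineMs }));
        }, deadlineMs);
      }
      this.pending.set(requestId, entry);
    });
    this.publish(requestId, operationId, input);
    return promise;
  }

  // Route one incoming event to the caller waiting on its requestId.
  route(event: CallEvent): void {
    const entry = this.pending.get(event.requestId);
    if (!entry) return; // unknown requestId or already settled
    this.pending.delete(event.requestId);
    if (entry.timer !== undefined) clearTimeout(entry.timer);
    if (event.type === "call.responded") {
      entry.resolve(event.output);
    } else if (event.type === "call.error") {
      entry.reject(new CallError(event.code, event.message, event.details));
    } else {
      entry.reject(new CallError("ABORTED", "call aborted"));
    }
  }
}
```

Removing the entry before settling keeps a late or duplicate event for the same requestId from touching a promise that has already resolved.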
respond(requestId, output)
Publishes call.responded. Used by handlers to send results back through the protocol.
emitError(requestId, code, message, details?)
Publishes call.error. Used by handlers to send errors.
abort(requestId)
Looks up the PendingRequest, clears its timer, publishes call.aborted, rejects the Promise with CallError(ABORTED, ...).
CallHandler
buildCallHandler creates a function that bridges pubsub events to OperationRegistry.execute().
function buildCallHandler(config: CallHandlerConfig): CallHandler
interface CallHandlerConfig {
  registry: OperationRegistry
  eventTarget?: EventTarget
}
type CallHandler = (event: CallRequestedEvent) => Promise<void>
Handler Flow
- Look up operation by operationId from the registry
- If not found, throw CallError(OPERATION_NOT_FOUND, ...)
- Check access control (see below)
- Validate input with validateOrThrow
- Execute operation handler
- On success: the handler is expected to have published call.responded itself, for example via respond()
- On failure: mapError converts the thrown value to CallError
The CallHandler is designed to be wired into a pubsub subscription:
const callHandler = buildCallHandler({ registry, eventTarget })
pubsub.subscribe("call.requested", callHandler)
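The handler flow can be sketched as one function that runs the checks in order. Everything here is a simplified assumption: `OperationSketch`, `RequestSketch`, and `handleRequest` are hypothetical, a plain Map replaces OperationRegistry, a boolean `validate` callback replaces validateOrThrow, only requiredScopes is checked, and the function returns the output instead of publishing events.

```typescript
class CallError extends Error {
  code: string;
  details?: unknown;
  constructor(code: string, message: string, details?: unknown) {
    super(message);
    this.code = code;
    this.details = details;
  }
}

// Hypothetical, reduced operation shape for illustration only.
interface OperationSketch {
  requiredScopes: string[];
  validate: (input: unknown) => boolean;
  handler: (input: unknown) => Promise<unknown>;
}

interface RequestSketch {
  requestId: string;
  operationId: string;
  input: unknown;
  identity?: { id: string; scopes: string[] };
}

async function handleRequest(
  operations: Map<string, OperationSketch>,
  event: RequestSketch,
): Promise<unknown> {
  // 1. Look up the operation.
  const operation = operations.get(event.operationId);
  if (!operation) {
    throw new CallError("OPERATION_NOT_FOUND", `no operation ${event.operationId}`, {
      operationId: event.operationId,
    });
  }
  // 2. Access control (AND check over required scopes only, in this sketch).
  const scopes = event.identity?.scopes ?? [];
  if (!operation.requiredScopes.every((scope) => scopes.includes(scope))) {
    throw new CallError("ACCESS_DENIED", "missing scopes", {
      requiredScopes: operation.requiredScopes,
    });
  }
  // 3. Input validation.
  if (!operation.validate(event.input)) {
    throw new CallError("VALIDATION_ERROR", "input failed schema check");
  }
  // 4. Execute, converting any thrown value into a CallError.
  try {
    return await operation.handler(event.input);
  } catch (error) {
    if (error instanceof CallError) throw error;
    if (error instanceof Error) throw new CallError("EXECUTION_ERROR", error.message, error);
    throw new CallError("UNKNOWN_ERROR", String(error), { raw: String(error) });
  }
}
```

The ordering matters: a caller without the required scopes gets ACCESS_DENIED before its input is validated, so validation errors never leak schema details to unauthorized callers.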
Access Control
Enforcement Point
CallHandler enforces AccessControl before dispatching to registry.execute(). Direct registry.execute() calls bypass access control — this is by design for trusted internal calls.
Flow
call.requested event arrives with Identity
→ Look up operation's AccessControl
→ Check requiredScopes (caller has ALL?)
→ Check requiredScopesAny (caller has ANY?)
→ Check resourceType/resourceAction against identity.resources
→ All pass → proceed to execute
→ Any fail → throw CallError(ACCESS_DENIED, ...)
checkAccess Implementation
function checkAccess(accessControl: AccessControl, identity: Identity): boolean
- If requiredScopes is non-empty, verify identity.scopes contains every entry (AND)
- If requiredScopesAny is non-empty, verify identity.scopes contains at least one entry (OR)
- If resourceType and resourceAction are set, verify identity.resources["{resourceType}:{resourceId}"] includes resourceAction
- Return true if all applicable checks pass
Note: A call.requested event without an identity is not rejected outright. Unauthenticated calls are permitted whenever the AccessControl check passes (e.g., operations with empty requiredScopes).
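The three checks can be sketched as a pure function. The shapes below are assumptions: `AccessControlSketch` and `IdentitySketch` mirror the field names used in this section, and `resourceId` is passed as a hypothetical third argument because the documented signature does not show where the id comes from. A missing identity is treated as having no scopes and no resources, in line with the note above.

```typescript
interface AccessControlSketch {
  requiredScopes?: string[];
  requiredScopesAny?: string[];
  resourceType?: string;
  resourceAction?: string;
}

interface IdentitySketch {
  id: string;
  scopes: string[];
  resources?: Record<string, string[]>;
}

function checkAccessSketch(
  access: AccessControlSketch,
  identity: IdentitySketch | undefined,
  resourceId?: string, // hypothetical: source of the id is not specified in the document
): boolean {
  const scopes = identity?.scopes ?? [];

  // AND: every required scope must be present.
  const all = access.requiredScopes ?? [];
  if (all.length > 0 && !all.every((scope) => scopes.includes(scope))) return false;

  // OR: at least one of the alternative scopes must be present.
  const any = access.requiredScopesAny ?? [];
  if (any.length > 0 && !any.some((scope) => scopes.includes(scope))) return false;

  // Resource check: the "type:id" entry must list the required action.
  if (access.resourceType && access.resourceAction) {
    const key = `${access.resourceType}:${resourceId ?? ""}`;
    const granted = identity?.resources?.[key] ?? [];
    if (!granted.includes(access.resourceAction)) return false;
  }
  return true;
}
```

For example, an operation with an empty AccessControl passes for an undefined identity, while one with `requiredScopes: ["admin"]` does not.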
Error Model
The call protocol uses a unified error model. Both infrastructure and domain errors flow through CallError.
CallError
class CallError extends Error {
  readonly code: CallErrorCode // InfrastructureErrorCode | string
  readonly details?: unknown
}
Infrastructure Error Codes
Reserved codes produced by CallHandler and PendingRequestMap:
| Code | When | Details |
|---|---|---|
| OPERATION_NOT_FOUND | No operation matches operationId | { operationId: string } |
| ACCESS_DENIED | Missing scopes | { requiredScopes?: string[] } |
| VALIDATION_ERROR | Input fails inputSchema check | Wrapped from Value.Errors |
| TIMEOUT | Deadline exceeded | { deadline: number } |
| ABORTED | Call cancelled | (none) |
| EXECUTION_ERROR | Handler threw, no errorSchemas match | { message: string } |
| UNKNOWN_ERROR | Non-Error thrown | { raw: string } |
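Because CallErrorCode is InfrastructureErrorCode | string, callers may want to tell reserved codes apart from domain codes at runtime. The following is one possible way to do that; `INFRASTRUCTURE_CODES` and `isInfrastructureCode` are hypothetical names, and the list simply restates the table above.

```typescript
// Reserved codes, restated from the table; the constant name is an assumption.
const INFRASTRUCTURE_CODES = [
  "OPERATION_NOT_FOUND",
  "ACCESS_DENIED",
  "VALIDATION_ERROR",
  "TIMEOUT",
  "ABORTED",
  "EXECUTION_ERROR",
  "UNKNOWN_ERROR",
] as const;

type InfrastructureErrorCode = (typeof INFRASTRUCTURE_CODES)[number];

// Type guard: narrows an arbitrary code string to the reserved set.
function isInfrastructureCode(code: string): code is InfrastructureErrorCode {
  return (INFRASTRUCTURE_CODES as readonly string[]).includes(code);
}
```

A caller could use the guard to decide whether an error came from the protocol layer or from the operation's own declared errors.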
Domain Error Propagation
Operations declare their possible errors via errorSchemas on IOperationDefinition. When a handler throws, mapError matches the thrown error against the declared schemas and falls back to EXECUTION_ERROR if none match.
errorSchemas is the contract between an operation and its callers about which errors it might produce. An operation that declares no errorSchemas gets the safe default: any thrown Error is wrapped as EXECUTION_ERROR.
mapError Resolution
- If already a CallError, return as-is
- If Error instance and errorSchemas provided, check if error.message includes any declared error code → return CallError(code, message, error)
- If Error instance, return CallError(EXECUTION_ERROR, error.message, error)
- Otherwise, return CallError(UNKNOWN_ERROR, String(error), { raw: String(error) })
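The resolution order above can be sketched as follows. This is an illustration under one stated assumption: the declared errorSchemas are reduced to a plain list of code strings (`declaredCodes`), since the actual schema shape is not shown in this document. `mapErrorSketch` is a hypothetical name.

```typescript
class CallError extends Error {
  code: string;
  details?: unknown;
  constructor(code: string, message: string, details?: unknown) {
    super(message);
    this.code = code;
    this.details = details;
  }
}

function mapErrorSketch(error: unknown, declaredCodes: string[] = []): CallError {
  // 1. Already a CallError: pass through untouched.
  if (error instanceof CallError) return error;

  if (error instanceof Error) {
    const message = error.message;
    // 2. Match a declared domain code by substring of the message.
    const code = declaredCodes.find((candidate) => message.includes(candidate));
    if (code !== undefined) return new CallError(code, message, error);
    // 3. Any other Error becomes EXECUTION_ERROR.
    return new CallError("EXECUTION_ERROR", message, error);
  }

  // 4. Non-Error values (strings, numbers, objects) become UNKNOWN_ERROR.
  return new CallError("UNKNOWN_ERROR", String(error), { raw: String(error) });
}
```

Under this substring rule, a handler that throws `new Error("USER_NOT_FOUND: id 42")` maps to the USER_NOT_FOUND code only if the operation declared it; otherwise the same error surfaces as EXECUTION_ERROR.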
Nested Call Wiring
Routing is an env construction concern, not a separate protocol layer. buildEnv creates the OperationEnv:
- Direct mode: buildEnv({ registry, context }). Env functions call registry.execute() directly
- Call protocol mode: buildEnv({ registry, context, callMap }). Env functions call callMap.call(), publishing call.requested events with parentRequestId propagation
parentRequestId enables call graph reconstruction and abort cascading — every nested call includes it.
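How the implementation cascades aborts is not described here, but the reconstruction step itself is straightforward. The sketch below, using hypothetical names `CallRecord` and `collectCallTree`, shows how a set of observed call.requested events could be turned into the list of request ids that an abort of one call would need to visit.

```typescript
interface CallRecord {
  requestId: string;
  parentRequestId?: string;
}

// Return rootId followed by every call nested under it, parents before children.
function collectCallTree(records: CallRecord[], rootId: string): string[] {
  // Index children by their parentRequestId.
  const children = new Map<string, string[]>();
  for (const record of records) {
    if (record.parentRequestId === undefined) continue;
    const siblings = children.get(record.parentRequestId) ?? [];
    siblings.push(record.requestId);
    children.set(record.parentRequestId, siblings);
  }

  // Breadth-first walk from the root.
  const ordered: string[] = [];
  const queue: string[] = [rootId];
  while (queue.length > 0) {
    const current = queue.shift() as string;
    ordered.push(current);
    queue.push(...(children.get(current) ?? []));
  }
  return ordered;
}
```

A cascade could then call abort(requestId) for each id in the returned list; whether parents or children should be aborted first is a design choice this sketch does not settle.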
Transport Mapping
The call protocol is transport-agnostic. The PubSub event target determines how events move:
| Transport | Use Case | EventTarget impl |
|---|---|---|
| In-process | Local hub operations | Browser EventTarget (default) |
| Redis | Cross-process events | RedisEventTarget (from @alkdev/pubsub) |
| WebSocket | Hub ↔ spoke bidirectional | WebSocketEventTarget (future) |
Same protocol, same event shapes, same PendingRequestMap — different eventTarget.
Subscribe (Direct)
The subscribe() function provides direct in-process subscription consumption:
async function* subscribe(
  registry: OperationRegistry,
  operationId: string,
  input: unknown,
  context: OperationContext,
): AsyncGenerator<unknown, void, unknown>
Gets the operation from the registry, casts its handler to AsyncGenerator, and yields values. Properly cleans up with generator.return() in a finally block.
Use subscribe() for in-process consumption. Use PendingRequestMap.call() for cross-transport invocation that resolves after one event. For cross-transport streaming, use PendingRequestMap.subscribe() to yield multiple events.
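The cleanup behavior described for the direct subscribe() can be sketched like this. It is a reduced illustration, not the actual function: `StreamHandler` and `subscribeSketch` are hypothetical, a plain Map replaces OperationRegistry, and the context argument is omitted.

```typescript
// Hypothetical handler shape: an operation whose handler is an async generator.
type StreamHandler = (input: unknown) => AsyncGenerator<unknown, void, unknown>;

async function* subscribeSketch(
  handlers: Map<string, StreamHandler>,
  operationId: string,
  input: unknown,
): AsyncGenerator<unknown, void, unknown> {
  const handler = handlers.get(operationId);
  if (!handler) {
    throw new Error(`OPERATION_NOT_FOUND: ${operationId}`);
  }
  const generator = handler(input);
  try {
    for (;;) {
      const step = await generator.next();
      if (step.done) return;
      yield step.value;
    }
  } finally {
    // Runs on normal completion and when the consumer stops early (break/return/throw),
    // so the handler's own finally blocks get a chance to release resources.
    await generator.return(undefined);
  }
}
```

When a consumer breaks out of a `for await` loop over `subscribeSketch(...)`, the runtime calls return() on the outer generator, which triggers the finally block and in turn closes the handler's generator.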