feat: implement ADR-007 subscription transport — PendingRequestMap.subscribe(), CallHandler dispatch, SSE AsyncGenerator handlers

Add remote subscription support so spokes can consume streaming operations
over pubsub transports (WebSocket, Redis). Extract checkAccess to access.ts
to break circular dep between call.ts and subscribe.ts.
This commit is contained in:
2026-05-16 06:03:21 +00:00
parent 5ec6c380a7
commit 92936f4232
14 changed files with 907 additions and 106 deletions

27
src/access.ts Normal file
View File

@@ -0,0 +1,27 @@
import type { AccessControl, Identity } from "./types.js";
export function checkAccess(accessControl: AccessControl, identity: Identity): boolean {
const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl;
if (requiredScopes.length > 0) {
const hasAll = requiredScopes.every((scope: string) => identity.scopes.includes(scope));
if (!hasAll) return false;
}
if (requiredScopesAny && requiredScopesAny.length > 0) {
const hasAny = requiredScopesAny.some((scope: string) => identity.scopes.includes(scope));
if (!hasAny) return false;
}
if (resourceType && resourceAction) {
if (!identity.resources) return false;
for (const [key, actions] of Object.entries(identity.resources)) {
if (key.startsWith(`${resourceType}:`) && actions.includes(resourceAction)) {
return true;
}
}
return false;
}
return true;
}

View File

@@ -1,10 +1,12 @@
import { Type, type Static } from "@alkdev/typebox";
import { createPubSub, type PubSub } from "@alkdev/pubsub";
import { createPubSub, type PubSub, Repeater, type Push, type Stop } from "@alkdev/pubsub";
import { OperationRegistry } from "./registry.js";
import { subscribe } from "./subscribe.js";
import { CallError, InfrastructureErrorCode, mapError } from "./error.js";
import { ResponseEnvelopeSchema, isResponseEnvelope } from "./response-envelope.js";
import type { ResponseEnvelope } from "./response-envelope.js";
import type { Identity, OperationContext, AccessControl } from "./types.js";
import type { Identity, OperationContext } from "./types.js";
import { OperationType } from "./types.js";
export const CallEventSchema = {
"call.requested": Type.Object({
@@ -49,13 +51,25 @@ type CallPubSubMap = {
"call.error": CallErrorEvent;
};
interface PendingRequest {
interface PendingCall {
resolve: (value: ResponseEnvelope) => void;
reject: (reason: unknown) => void;
deadline?: number;
timer?: ReturnType<typeof setTimeout>;
}
interface SubscriptionState {
push: Push<ResponseEnvelope>;
stop: Stop;
deadline?: number;
timer?: ReturnType<typeof setTimeout>;
consumerStopped?: boolean;
}
type PendingEntry =
| { type: "call"; pending: PendingCall }
| { type: "subscribe"; state: SubscriptionState };
export interface CallHandlerConfig {
registry: OperationRegistry;
callMap: PendingRequestMap;
@@ -64,7 +78,7 @@ export interface CallHandlerConfig {
export type CallHandler = (event: CallRequestedEvent) => Promise<void>;
export class PendingRequestMap {
private requests = new Map<string, PendingRequest>();
private entries = new Map<string, PendingEntry>();
private pubsub: PubSub<CallPubSubMap>;
constructor(eventTarget?: EventTarget) {
@@ -79,11 +93,21 @@ export class PendingRequestMap {
(async () => {
for await (const envelope of respondedIter) {
const responded = envelope.payload;
const pending = this.requests.get(responded.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(responded.requestId);
pending.resolve(responded.output as ResponseEnvelope);
const entry = this.entries.get(responded.requestId);
if (!entry) continue;
if (entry.type === "call") {
if (entry.pending.timer) clearTimeout(entry.pending.timer);
this.entries.delete(responded.requestId);
entry.pending.resolve(responded.output as ResponseEnvelope);
} else {
if (entry.state.timer) {
clearTimeout(entry.state.timer);
if (entry.state.deadline) {
entry.state.timer = this.startSubscriptionTimer(responded.requestId, entry.state.deadline);
}
}
entry.state.push(responded.output as ResponseEnvelope);
}
}
})();
@@ -92,11 +116,18 @@ export class PendingRequestMap {
(async () => {
for await (const envelope of errorIter) {
const err = envelope.payload;
const pending = this.requests.get(err.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(err.requestId);
pending.reject(new CallError(err.code, err.message, err.details));
const entry = this.entries.get(err.requestId);
if (!entry) continue;
if (entry.type === "call") {
if (entry.pending.timer) clearTimeout(entry.pending.timer);
this.entries.delete(err.requestId);
entry.pending.reject(new CallError(err.code, err.message, err.details));
} else {
if (entry.state.timer) clearTimeout(entry.state.timer);
entry.state.consumerStopped = true;
entry.state.stop(new CallError(err.code, err.message, err.details));
this.entries.delete(err.requestId);
}
}
})();
@@ -105,16 +136,34 @@ export class PendingRequestMap {
(async () => {
for await (const envelope of abortedIter) {
const aborted = envelope.payload;
const pending = this.requests.get(aborted.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(aborted.requestId);
pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`));
const entry = this.entries.get(aborted.requestId);
if (!entry) continue;
if (entry.type === "call") {
if (entry.pending.timer) clearTimeout(entry.pending.timer);
this.entries.delete(aborted.requestId);
entry.pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`));
} else {
if (entry.state.timer) clearTimeout(entry.state.timer);
entry.state.consumerStopped = true;
entry.state.stop();
this.entries.delete(aborted.requestId);
}
}
})();
}
private startSubscriptionTimer(requestId: string, deadline: number): ReturnType<typeof setTimeout> {
return setTimeout(() => {
const entry = this.entries.get(requestId);
if (!entry || entry.type !== "subscribe") return;
if (entry.state.timer) clearTimeout(entry.state.timer);
entry.state.consumerStopped = true;
this.pubsub.publish("call.aborted", "", { requestId });
entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle)`, { deadline }));
}, deadline);
}
async call(
operationId: string,
input: unknown,
@@ -123,17 +172,17 @@ export class PendingRequestMap {
const requestId = crypto.randomUUID();
return new Promise((resolve, reject) => {
const pending: PendingRequest = { resolve, reject };
const pending: PendingCall = { resolve, reject };
if (options?.deadline) {
pending.deadline = options.deadline;
pending.timer = setTimeout(() => {
this.requests.delete(requestId);
this.entries.delete(requestId);
reject(new CallError(InfrastructureErrorCode.TIMEOUT, `Request ${requestId} timed out`, { deadline: options.deadline }));
}, options.deadline - Date.now());
}
this.requests.set(requestId, pending);
this.entries.set(requestId, { type: "call", pending });
this.pubsub.publish("call.requested", "", {
requestId,
@@ -146,6 +195,47 @@ export class PendingRequestMap {
});
}
subscribe(
operationId: string,
input: unknown,
options?: { parentRequestId?: string; deadline?: number; identity?: Identity },
): AsyncIterable<ResponseEnvelope> {
const requestId = crypto.randomUUID();
const repeater = new Repeater<ResponseEnvelope>((push: Push<ResponseEnvelope>, stop: Stop) => {
const state: SubscriptionState = { push, stop };
if (options?.deadline) {
state.deadline = options.deadline;
state.timer = this.startSubscriptionTimer(requestId, options.deadline);
}
this.entries.set(requestId, { type: "subscribe", state });
this.pubsub.publish("call.requested", "", {
requestId,
operationId,
input,
parentRequestId: options?.parentRequestId,
deadline: options?.deadline,
identity: options?.identity,
});
stop.then(() => {
const entry = this.entries.get(requestId);
if (entry && entry.type === "subscribe") {
if (entry.state.timer) clearTimeout(entry.state.timer);
if (!entry.state.consumerStopped) {
this.pubsub.publish("call.aborted", "", { requestId });
}
this.entries.delete(requestId);
}
});
});
return repeater;
}
respond(requestId: string, output: ResponseEnvelope): void {
if (!isResponseEnvelope(output)) {
throw new Error("PendingRequestMap.respond() requires a ResponseEnvelope. Use isResponseEnvelope() to check values before calling respond().");
@@ -166,17 +256,24 @@ export class PendingRequestMap {
}
abort(requestId: string): void {
const pending = this.requests.get(requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(requestId);
const entry = this.entries.get(requestId);
if (!entry) return;
if (entry.type === "call") {
if (entry.pending.timer) clearTimeout(entry.pending.timer);
this.entries.delete(requestId);
this.pubsub.publish("call.aborted", "", { requestId });
pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`));
entry.pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`));
} else {
if (entry.state.timer) clearTimeout(entry.state.timer);
entry.state.consumerStopped = true;
this.pubsub.publish("call.aborted", "", { requestId });
entry.state.stop();
}
}
getPendingCount(): number {
return this.requests.size;
return this.entries.size;
}
}
@@ -193,8 +290,19 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
};
try {
const envelope = await registry.execute(operationId, input, context);
callMap.respond(requestId, envelope);
const spec = registry.getSpec(operationId);
if (!spec) {
throw new CallError(InfrastructureErrorCode.OPERATION_NOT_FOUND, `Operation not found: ${operationId}`, { operationId });
}
if (spec.type === OperationType.SUBSCRIPTION) {
for await (const envelope of subscribe(registry, operationId, input, context)) {
callMap.respond(requestId, envelope);
}
} else {
const envelope = await registry.execute(operationId, input, context);
callMap.respond(requestId, envelope);
}
} catch (error) {
const spec = registry.getSpec(operationId);
const callError = mapError(error, spec?.errorSchemas);
@@ -203,29 +311,5 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
};
}
export function checkAccess(accessControl: AccessControl, identity: Identity): boolean {
const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl;
if (requiredScopes.length > 0) {
const hasAll = requiredScopes.every((scope: string) => identity.scopes.includes(scope));
if (!hasAll) return false;
}
if (requiredScopesAny && requiredScopesAny.length > 0) {
const hasAny = requiredScopesAny.some((scope: string) => identity.scopes.includes(scope));
if (!hasAny) return false;
}
if (resourceType && resourceAction) {
if (!identity.resources) return false;
for (const [key, actions] of Object.entries(identity.resources)) {
if (key.startsWith(`${resourceType}:`) && actions.includes(resourceAction)) {
return true;
}
}
return false;
}
return true;
}

View File

@@ -1,6 +1,6 @@
import * as Type from "@alkdev/typebox";
import { FromSchema } from "./from_schema.js";
import { OperationType, type OperationSpec, type OperationHandler, type OperationContext } from "./types.js";
import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js";
import { CallError } from "./error.js";
import { httpEnvelope } from "./response-envelope.js";
@@ -51,6 +51,95 @@ export interface HTTPServiceConfig {
timeout?: number;
}
export interface SSEEvent {
data: string;
eventType: string;
lastEventId: string;
}
export function parseSSEFrames(buffer: string): { events: SSEEvent[]; remaining: string } {
const events: SSEEvent[] = [];
let remaining = "";
let text = buffer;
if (text.charCodeAt(0) === 0xfeff) {
text = text.slice(1);
}
const lines = text.split(/\r\n|\r|\n/);
let dataBuffer: string[] = [];
let eventType = "";
let lastEventId = "";
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
if (i === lines.length - 1) {
remaining = line;
break;
}
if (line === "") {
if (dataBuffer.length > 0) {
events.push({
data: dataBuffer.join("\n"),
eventType: eventType || "message",
lastEventId,
});
}
dataBuffer = [];
eventType = "";
continue;
}
if (line.startsWith(":")) {
continue;
}
const colonIndex = line.indexOf(":");
if (colonIndex === -1) {
const field = line;
const value = "";
processSSEField(field, value, dataBuffer, (type) => { eventType = type; }, (id) => { lastEventId = id; });
continue;
}
const field = line.slice(0, colonIndex);
let value = line.slice(colonIndex + 1);
if (value.startsWith(" ")) {
value = value.slice(1);
}
processSSEField(field, value, dataBuffer, (type) => { eventType = type; }, (id) => { lastEventId = id; });
}
if (dataBuffer.length > 0) {
remaining = dataBuffer.join("\n");
}
return { events, remaining };
}
function processSSEField(
field: string,
value: string,
dataBuffer: string[],
setEventType: (type: string) => void,
setLastEventId: (id: string) => void,
): void {
switch (field) {
case "data":
dataBuffer.push(value);
break;
case "event":
setEventType(value);
break;
case "id":
setLastEventId(value);
break;
}
}
function resolveRef(spec: OpenAPISpec, ref: string): unknown {
if (!ref.startsWith("#/")) {
throw new Error(`External refs not supported: ${ref}`);
@@ -221,16 +310,109 @@ function getAuthHeaders(config: HTTPServiceConfig): Record<string, string> {
return headers;
}
type HTTPOperationHandler = OperationHandler<unknown, unknown, OperationContext> | SubscriptionHandler<unknown, unknown, OperationContext>;
function createHTTPOperation(
spec: OpenAPISpec,
operation: OpenAPIOperation,
method: string,
path: string,
config: HTTPServiceConfig,
): OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> } {
): OperationSpec & { handler: HTTPOperationHandler } {
const operationId = normalizeOperationId(operation, method, path);
const opType = detectOperationType(method, operation);
const authHeaders = getAuthHeaders(config);
const responseHeaders = (): Record<string, string> => ({ ...authHeaders, "Content-Type": "application/json" });
if (opType === OperationType.SUBSCRIPTION) {
const handler: SubscriptionHandler<unknown, unknown, OperationContext> = async function* (input: unknown, context: OperationContext) {
const inputObj = (input as Record<string, unknown>) || {};
let urlPath = path;
const queryParams: Record<string, string> = {};
for (const [key, value] of Object.entries(inputObj)) {
if (path.includes(`{${key}}`)) {
urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value)));
} else if (key === "body") {
// body not typically used for SSE GET, but supported
} else {
queryParams[key] = String(value);
}
}
const url = new URL(config.baseUrl + urlPath);
for (const [key, value] of Object.entries(queryParams)) {
url.searchParams.set(key, value);
}
const headers: Record<string, string> = {
...authHeaders,
"Accept": "text/event-stream",
};
const response = await fetch(url.toString(), {
method: method.toUpperCase(),
headers,
signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined,
});
if (!response.ok) {
throw new CallError("EXECUTION_ERROR", `HTTP ${response.status}: ${response.statusText}`);
}
const reader = response.body!.getReader();
const decoder = new TextDecoder();
let buffer = "";
const responseHeadersObj = Object.fromEntries(response.headers.entries());
try {
while (true) {
const { done, value: chunk } = await reader.read();
if (done) break;
buffer += decoder.decode(chunk, { stream: true });
const { events, remaining } = parseSSEFrames(buffer);
buffer = remaining;
for (const event of events) {
if (event.data.trim() === "") continue;
let parsedData: unknown = event.data;
try {
parsedData = JSON.parse(event.data);
} catch {
// not JSON — yield raw data string
}
yield httpEnvelope(parsedData, {
statusCode: response.status,
headers: responseHeadersObj,
contentType: "text/event-stream",
});
}
}
} finally {
reader.releaseLock();
}
};
return {
name: operationId,
namespace: config.namespace,
version: "1.0.0",
type: opType,
description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`,
tags: operation.tags,
inputSchema: buildInputSchema(spec, operation),
outputSchema: buildOutputSchema(spec, operation),
accessControl: { requiredScopes: [] },
handler,
_meta: {
method: method.toUpperCase(),
path,
summary: operation.summary,
},
};
}
const handler: OperationHandler<unknown, unknown, OperationContext> = async (input: unknown, context: OperationContext) => {
const inputObj = (input as Record<string, unknown>) || {};
@@ -306,8 +488,8 @@ function createHTTPOperation(
};
}
export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }> {
const operations: Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }> = [];
export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array<OperationSpec & { handler: HTTPOperationHandler }> {
const operations: Array<OperationSpec & { handler: HTTPOperationHandler }> = [];
const basePath = spec.basePath || "";
for (const [path, methods] of Object.entries(spec.paths)) {
@@ -328,7 +510,7 @@ export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array
return operations;
}
export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise<Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }>> {
export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> {
let content: string;
if (fs) {
content = await fs.readFile(path);
@@ -340,7 +522,7 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f
return FromOpenAPI(spec, config);
}
export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }>> {
export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> {
const response = await fetch(url);
const spec = await response.json() as OpenAPISpec;
return FromOpenAPI(spec, config);

View File

@@ -11,8 +11,9 @@ export { scanOperations } from "./scanner.js";
export type { OperationManifest, ScannerFS } from "./scanner.js";
export { CallError, InfrastructureErrorCode, mapError } from "./error.js";
export type { CallErrorCode } from "./error.js";
export { PendingRequestMap, buildCallHandler, checkAccess } from "./call.js";
export { PendingRequestMap, buildCallHandler } from "./call.js";
export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js";
export { checkAccess } from "./access.js";
export { subscribe } from "./subscribe.js";
export { createMCPClient, closeMCPClient, MCPClientLoader } from "./from_mcp.js";
export type { MCPClientConfig, MCPClientWrapper } from "./from_mcp.js";

View File

@@ -5,7 +5,7 @@ import { KindGuard } from "@alkdev/typebox";
import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js";
import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js";
import { CallError, InfrastructureErrorCode } from "./error.js";
import { checkAccess } from "./call.js";
import { checkAccess } from "./access.js";
const logger = getLogger("operations:registry");

View File

@@ -2,7 +2,7 @@ import type { OperationContext, AccessControl } from "./types.js";
import { OperationRegistry } from "./registry.js";
import { type ResponseEnvelope, isResponseEnvelope, localEnvelope } from "./response-envelope.js";
import { CallError, InfrastructureErrorCode } from "./error.js";
import { checkAccess } from "./call.js";
import { checkAccess } from "./access.js";
export async function* subscribe(
registry: OperationRegistry,