From 92936f423266fa97d785d43950fefdedf8c44cab Mon Sep 17 00:00:00 2001 From: "glm-5.1" Date: Sat, 16 May 2026 06:03:21 +0000 Subject: [PATCH] =?UTF-8?q?feat:=20implement=20ADR-007=20subscription=20tr?= =?UTF-8?q?ansport=20=E2=80=94=20PendingRequestMap.subscribe(),=20CallHand?= =?UTF-8?q?ler=20dispatch,=20SSE=20AsyncGenerator=20handlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add remote subscription support so spokes can consume streaming operations over pubsub transports (WebSocket, Redis). Extract checkAccess to access.ts to break circular dep between call.ts and subscribe.ts. --- docs/architecture/adapters.md | 4 +- docs/architecture/api-surface.md | 4 +- docs/architecture/call-protocol.md | 4 +- .../decisions/007-subscription-transport.md | 4 +- docs/architecture/response-envelopes.md | 10 +- src/access.ts | 27 ++ src/call.ts | 192 ++++++--- src/from_openapi.ts | 194 ++++++++- src/index.ts | 3 +- src/registry.ts | 2 +- src/subscribe.ts | 2 +- ...mplement-adr-007-subscription-transport.md | 2 + test/call.test.ts | 370 ++++++++++++++++-- test/from_openapi.test.ts | 195 ++++++++- 14 files changed, 907 insertions(+), 106 deletions(-) create mode 100644 src/access.ts diff --git a/docs/architecture/adapters.md b/docs/architecture/adapters.md index 8c5ce59..6978a61 100644 --- a/docs/architecture/adapters.md +++ b/docs/architecture/adapters.md @@ -1,6 +1,6 @@ --- -status: draft -last_updated: 2026-05-11 +status: stable +last_updated: 2026-05-16 --- # Adapters diff --git a/docs/architecture/api-surface.md b/docs/architecture/api-surface.md index 4818110..1ffd2a2 100644 --- a/docs/architecture/api-surface.md +++ b/docs/architecture/api-surface.md @@ -1,6 +1,6 @@ --- -status: draft -last_updated: 2026-05-11 +status: stable +last_updated: 2026-05-16 --- # API Surface diff --git a/docs/architecture/call-protocol.md b/docs/architecture/call-protocol.md index e0708b2..1f9ddec 100644 --- a/docs/architecture/call-protocol.md +++ b/docs/architecture/call-protocol.md @@ -1,6 +1,6 @@ --- -status: draft -last_updated: 2026-05-11 +status: stable +last_updated: 2026-05-16 --- # Call Protocol diff --git a/docs/architecture/decisions/007-subscription-transport.md b/docs/architecture/decisions/007-subscription-transport.md index d861c6d..2015080 100644 --- a/docs/architecture/decisions/007-subscription-transport.md +++ b/docs/architecture/decisions/007-subscription-transport.md @@ -1,6 +1,6 @@ --- -status: draft -last_updated: 2026-05-13 +status: accepted +last_updated: 2026-05-16 --- # ADR-007: Subscription Transport for SSE and Remote Streaming diff --git a/docs/architecture/response-envelopes.md b/docs/architecture/response-envelopes.md index 5cfc800..87a362b 100644 --- a/docs/architecture/response-envelopes.md +++ b/docs/architecture/response-envelopes.md @@ -1,6 +1,6 @@ --- -status: draft -last_updated: 2026-05-11 +status: stable +last_updated: 2026-05-16 --- # Response Envelopes @@ -514,9 +514,9 @@ The following **code** changes are pending: | Code | Change | Status | |------|--------|--------| -| `src/from_openapi.ts` | Generate `SubscriptionHandler` (AsyncGenerator) for SUBSCRIPTION operations, parse SSE stream, yield per-event | ❌ Not started | -| `src/call.ts` | Add `PendingRequestMap.subscribe()` method using Repeater from `@alkdev/pubsub` | ❌ Not started | -| `src/call.ts` | Update `CallHandler` to dispatch on operation type | ❌ Not started | +| `src/from_openapi.ts` | Generate `SubscriptionHandler` (AsyncGenerator) for SUBSCRIPTION operations, parse SSE stream, yield per-event | ✅ Implemented | +| `src/call.ts` | Add `PendingRequestMap.subscribe()` method using Repeater from `@alkdev/pubsub` | ✅ Implemented | +| `src/call.ts` | Update `CallHandler` to dispatch on operation type | ✅ Implemented | | `src/subscribe.ts` | Ensure `subscribe()` handles `httpEnvelope` detection for SSE yields | ✅ Already handles envelopes | ## References diff --git a/src/access.ts b/src/access.ts new file mode 100644 index 0000000..a042b9a --- /dev/null +++ b/src/access.ts @@ -0,0 +1,27 @@ +import type { AccessControl, Identity } from "./types.js"; + +export function checkAccess(accessControl: AccessControl, identity: Identity): boolean { + const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl; + + if (requiredScopes.length > 0) { + const hasAll = requiredScopes.every((scope: string) => identity.scopes.includes(scope)); + if (!hasAll) return false; + } + + if (requiredScopesAny && requiredScopesAny.length > 0) { + const hasAny = requiredScopesAny.some((scope: string) => identity.scopes.includes(scope)); + if (!hasAny) return false; + } + + if (resourceType && resourceAction) { + if (!identity.resources) return false; + for (const [key, actions] of Object.entries(identity.resources)) { + if (key.startsWith(`${resourceType}:`) && actions.includes(resourceAction)) { + return true; + } + } + return false; + } + + return true; +} \ No newline at end of file diff --git a/src/call.ts b/src/call.ts index cfbe397..02cab33 100644 --- a/src/call.ts +++ b/src/call.ts @@ -1,10 +1,12 @@ import { Type, type Static } from "@alkdev/typebox"; -import { createPubSub, type PubSub } from "@alkdev/pubsub"; +import { createPubSub, type PubSub, Repeater, type Push, type Stop } from "@alkdev/pubsub"; import { OperationRegistry } from "./registry.js"; +import { subscribe } from "./subscribe.js"; import { CallError, InfrastructureErrorCode, mapError } from "./error.js"; import { ResponseEnvelopeSchema, isResponseEnvelope } from "./response-envelope.js"; import type { ResponseEnvelope } from "./response-envelope.js"; -import type { Identity, OperationContext, AccessControl } from "./types.js"; +import type { Identity, OperationContext } from "./types.js"; +import { OperationType } from "./types.js"; export const CallEventSchema = { "call.requested": Type.Object({ @@ -49,13 +51,25 @@ type CallPubSubMap = { "call.error": CallErrorEvent; }; -interface PendingRequest { +interface PendingCall { resolve: (value: ResponseEnvelope) => void; reject: (reason: unknown) => void; deadline?: number; timer?: ReturnType; } +interface SubscriptionState { + push: Push; + stop: Stop; + deadline?: number; + timer?: ReturnType; + consumerStopped?: boolean; +} + +type PendingEntry = + | { type: "call"; pending: PendingCall } + | { type: "subscribe"; state: SubscriptionState }; + export interface CallHandlerConfig { registry: OperationRegistry; callMap: PendingRequestMap; @@ -64,7 +78,7 @@ export interface CallHandlerConfig { export type CallHandler = (event: CallRequestedEvent) => Promise; export class PendingRequestMap { - private requests = new Map(); + private entries = new Map(); private pubsub: PubSub; constructor(eventTarget?: EventTarget) { @@ -79,11 +93,21 @@ export class PendingRequestMap { (async () => { for await (const envelope of respondedIter) { const responded = envelope.payload; - const pending = this.requests.get(responded.requestId); - if (pending) { - if (pending.timer) clearTimeout(pending.timer); - this.requests.delete(responded.requestId); - pending.resolve(responded.output as ResponseEnvelope); + const entry = this.entries.get(responded.requestId); + if (!entry) continue; + + if (entry.type === "call") { + if (entry.pending.timer) clearTimeout(entry.pending.timer); + this.entries.delete(responded.requestId); + entry.pending.resolve(responded.output as ResponseEnvelope); + } else { + if (entry.state.timer) { + clearTimeout(entry.state.timer); + if (entry.state.deadline) { + entry.state.timer = this.startSubscriptionTimer(responded.requestId, entry.state.deadline); + } + } + entry.state.push(responded.output as ResponseEnvelope); } } })(); @@ -92,11 +116,18 @@ export class PendingRequestMap { (async () => { for await (const envelope of errorIter) { const err = envelope.payload; - const pending = this.requests.get(err.requestId); - if (pending) { - if (pending.timer) clearTimeout(pending.timer); - this.requests.delete(err.requestId); - pending.reject(new CallError(err.code, err.message, err.details)); + const entry = this.entries.get(err.requestId); + if (!entry) continue; + + if (entry.type === "call") { + if (entry.pending.timer) clearTimeout(entry.pending.timer); + this.entries.delete(err.requestId); + entry.pending.reject(new CallError(err.code, err.message, err.details)); + } else { + if (entry.state.timer) clearTimeout(entry.state.timer); + entry.state.consumerStopped = true; + entry.state.stop(new CallError(err.code, err.message, err.details)); + this.entries.delete(err.requestId); } } })(); @@ -105,16 +136,34 @@ export class PendingRequestMap { (async () => { for await (const envelope of abortedIter) { const aborted = envelope.payload; - const pending = this.requests.get(aborted.requestId); - if (pending) { - if (pending.timer) clearTimeout(pending.timer); - this.requests.delete(aborted.requestId); - pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`)); + const entry = this.entries.get(aborted.requestId); + if (!entry) continue; + + if (entry.type === "call") { + if (entry.pending.timer) clearTimeout(entry.pending.timer); + this.entries.delete(aborted.requestId); + entry.pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${aborted.requestId} was aborted`)); + } else { + if (entry.state.timer) clearTimeout(entry.state.timer); + entry.state.consumerStopped = true; + entry.state.stop(); + this.entries.delete(aborted.requestId); } } })(); } + private startSubscriptionTimer(requestId: string, deadline: number): ReturnType { + return setTimeout(() => { + const entry = this.entries.get(requestId); + if (!entry || entry.type !== "subscribe") return; + if (entry.state.timer) clearTimeout(entry.state.timer); + entry.state.consumerStopped = true; + this.pubsub.publish("call.aborted", "", { requestId }); + entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle)`, { deadline })); + }, deadline); + } + async call( operationId: string, input: unknown, @@ -123,17 +172,17 @@ export class PendingRequestMap { const requestId = crypto.randomUUID(); return new Promise((resolve, reject) => { - const pending: PendingRequest = { resolve, reject }; + const pending: PendingCall = { resolve, reject }; if (options?.deadline) { pending.deadline = options.deadline; pending.timer = setTimeout(() => { - this.requests.delete(requestId); + this.entries.delete(requestId); reject(new CallError(InfrastructureErrorCode.TIMEOUT, `Request ${requestId} timed out`, { deadline: options.deadline })); }, options.deadline - Date.now()); } - this.requests.set(requestId, pending); + this.entries.set(requestId, { type: "call", pending }); this.pubsub.publish("call.requested", "", { requestId, @@ -146,6 +195,47 @@ export class PendingRequestMap { }); } + subscribe( + operationId: string, + input: unknown, + options?: { parentRequestId?: string; deadline?: number; identity?: Identity }, + ): AsyncIterable { + const requestId = crypto.randomUUID(); + + const repeater = new Repeater((push: Push, stop: Stop) => { + const state: SubscriptionState = { push, stop }; + + if (options?.deadline) { + state.deadline = options.deadline; + state.timer = this.startSubscriptionTimer(requestId, options.deadline); + } + + this.entries.set(requestId, { type: "subscribe", state }); + + this.pubsub.publish("call.requested", "", { + requestId, + operationId, + input, + parentRequestId: options?.parentRequestId, + deadline: options?.deadline, + identity: options?.identity, + }); + + stop.then(() => { + const entry = this.entries.get(requestId); + if (entry && entry.type === "subscribe") { + if (entry.state.timer) clearTimeout(entry.state.timer); + if (!entry.state.consumerStopped) { + this.pubsub.publish("call.aborted", "", { requestId }); + } + this.entries.delete(requestId); + } + }); + }); + + return repeater; + } + respond(requestId: string, output: ResponseEnvelope): void { if (!isResponseEnvelope(output)) { throw new Error("PendingRequestMap.respond() requires a ResponseEnvelope. Use isResponseEnvelope() to check values before calling respond()."); @@ -166,17 +256,24 @@ export class PendingRequestMap { } abort(requestId: string): void { - const pending = this.requests.get(requestId); - if (pending) { - if (pending.timer) clearTimeout(pending.timer); - this.requests.delete(requestId); + const entry = this.entries.get(requestId); + if (!entry) return; + + if (entry.type === "call") { + if (entry.pending.timer) clearTimeout(entry.pending.timer); + this.entries.delete(requestId); this.pubsub.publish("call.aborted", "", { requestId }); - pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`)); + entry.pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`)); + } else { + if (entry.state.timer) clearTimeout(entry.state.timer); + entry.state.consumerStopped = true; + this.pubsub.publish("call.aborted", "", { requestId }); + entry.state.stop(); } } getPendingCount(): number { - return this.requests.size; + return this.entries.size; } } @@ -193,8 +290,19 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { }; try { - const envelope = await registry.execute(operationId, input, context); - callMap.respond(requestId, envelope); + const spec = registry.getSpec(operationId); + if (!spec) { + throw new CallError(InfrastructureErrorCode.OPERATION_NOT_FOUND, `Operation not found: ${operationId}`, { operationId }); + } + + if (spec.type === OperationType.SUBSCRIPTION) { + for await (const envelope of subscribe(registry, operationId, input, context)) { + callMap.respond(requestId, envelope); + } + } else { + const envelope = await registry.execute(operationId, input, context); + callMap.respond(requestId, envelope); + } } catch (error) { const spec = registry.getSpec(operationId); const callError = mapError(error, spec?.errorSchemas); @@ -203,29 +311,5 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { }; } -export function checkAccess(accessControl: AccessControl, identity: Identity): boolean { - const { requiredScopes, requiredScopesAny, resourceType, resourceAction } = accessControl; - if (requiredScopes.length > 0) { - const hasAll = requiredScopes.every((scope: string) => identity.scopes.includes(scope)); - if (!hasAll) return false; - } - - if (requiredScopesAny && requiredScopesAny.length > 0) { - const hasAny = requiredScopesAny.some((scope: string) => identity.scopes.includes(scope)); - if (!hasAny) return false; - } - - if (resourceType && resourceAction) { - if (!identity.resources) return false; - for (const [key, actions] of Object.entries(identity.resources)) { - if (key.startsWith(`${resourceType}:`) && actions.includes(resourceAction)) { - return true; - } - } - return false; - } - - return true; -} diff --git a/src/from_openapi.ts b/src/from_openapi.ts index 207d2a6..11eea5e 100644 --- a/src/from_openapi.ts +++ b/src/from_openapi.ts @@ -1,6 +1,6 @@ import * as Type from "@alkdev/typebox"; import { FromSchema } from "./from_schema.js"; -import { OperationType, type OperationSpec, type OperationHandler, type OperationContext } from "./types.js"; +import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js"; import { CallError } from "./error.js"; import { httpEnvelope } from "./response-envelope.js"; @@ -51,6 +51,95 @@ export interface HTTPServiceConfig { timeout?: number; } +export interface SSEEvent { + data: string; + eventType: string; + lastEventId: string; +} + +export function parseSSEFrames(buffer: string): { events: SSEEvent[]; remaining: string } { + const events: SSEEvent[] = []; + let remaining = ""; + + let text = buffer; + if (text.charCodeAt(0) === 0xfeff) { + text = text.slice(1); + } + + const lines = text.split(/\r\n|\r|\n/); + + let dataBuffer: string[] = []; + let eventType = ""; + let lastEventId = ""; + + for (let i = 0; i < lines.length; i++) { + const line = lines[i]; + + if (i === lines.length - 1) { + remaining = line; + break; + } + + if (line === "") { + if (dataBuffer.length > 0) { + events.push({ + data: dataBuffer.join("\n"), + eventType: eventType || "message", + lastEventId, + }); + } + dataBuffer = []; + eventType = ""; + continue; + } + + if (line.startsWith(":")) { + continue; + } + + const colonIndex = line.indexOf(":"); + if (colonIndex === -1) { + const field = line; + const value = ""; + processSSEField(field, value, dataBuffer, (type) => { eventType = type; }, (id) => { lastEventId = id; }); + continue; + } + + const field = line.slice(0, colonIndex); + let value = line.slice(colonIndex + 1); + if (value.startsWith(" ")) { + value = value.slice(1); + } + processSSEField(field, value, dataBuffer, (type) => { eventType = type; }, (id) => { lastEventId = id; }); + } + + if (dataBuffer.length > 0) { + remaining = dataBuffer.join("\n"); + } + + return { events, remaining }; +} + +function processSSEField( + field: string, + value: string, + dataBuffer: string[], + setEventType: (type: string) => void, + setLastEventId: (id: string) => void, +): void { + switch (field) { + case "data": + dataBuffer.push(value); + break; + case "event": + setEventType(value); + break; + case "id": + setLastEventId(value); + break; + } +} + function resolveRef(spec: OpenAPISpec, ref: string): unknown { if (!ref.startsWith("#/")) { throw new Error(`External refs not supported: ${ref}`); @@ -221,16 +310,109 @@ function getAuthHeaders(config: HTTPServiceConfig): Record { return headers; } +type HTTPOperationHandler = OperationHandler | SubscriptionHandler; + function createHTTPOperation( spec: OpenAPISpec, operation: OpenAPIOperation, method: string, path: string, config: HTTPServiceConfig, -): OperationSpec & { handler: OperationHandler } { +): OperationSpec & { handler: HTTPOperationHandler } { const operationId = normalizeOperationId(operation, method, path); const opType = detectOperationType(method, operation); const authHeaders = getAuthHeaders(config); + const responseHeaders = (): Record => ({ ...authHeaders, "Content-Type": "application/json" }); + + if (opType === OperationType.SUBSCRIPTION) { + const handler: SubscriptionHandler = async function* (input: unknown, context: OperationContext) { + const inputObj = (input as Record) || {}; + + let urlPath = path; + const queryParams: Record = {}; + + for (const [key, value] of Object.entries(inputObj)) { + if (path.includes(`{${key}}`)) { + urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value))); + } else if (key === "body") { + // body not typically used for SSE GET, but supported + } else { + queryParams[key] = String(value); + } + } + + const url = new URL(config.baseUrl + urlPath); + for (const [key, value] of Object.entries(queryParams)) { + url.searchParams.set(key, value); + } + + const headers: Record = { + ...authHeaders, + "Accept": "text/event-stream", + }; + + const response = await fetch(url.toString(), { + method: method.toUpperCase(), + headers, + signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined, + }); + + if (!response.ok) { + throw new CallError("EXECUTION_ERROR", `HTTP ${response.status}: ${response.statusText}`); + } + + const reader = response.body!.getReader(); + const decoder = new TextDecoder(); + let buffer = ""; + const responseHeadersObj = Object.fromEntries(response.headers.entries()); + + try { + while (true) { + const { done, value: chunk } = await reader.read(); + if (done) break; + + buffer += decoder.decode(chunk, { stream: true }); + const { events, remaining } = parseSSEFrames(buffer); + buffer = remaining; + + for (const event of events) { + if (event.data.trim() === "") continue; + let parsedData: unknown = event.data; + try { + parsedData = JSON.parse(event.data); + } catch { + // not JSON — yield raw data string + } + yield httpEnvelope(parsedData, { + statusCode: response.status, + headers: responseHeadersObj, + contentType: "text/event-stream", + }); + } + } + } finally { + reader.releaseLock(); + } + }; + + return { + name: operationId, + namespace: config.namespace, + version: "1.0.0", + type: opType, + description: operation.description || operation.summary || `${method.toUpperCase()} ${path}`, + tags: operation.tags, + inputSchema: buildInputSchema(spec, operation), + outputSchema: buildOutputSchema(spec, operation), + accessControl: { requiredScopes: [] }, + handler, + _meta: { + method: method.toUpperCase(), + path, + summary: operation.summary, + }, + }; + } const handler: OperationHandler = async (input: unknown, context: OperationContext) => { const inputObj = (input as Record) || {}; @@ -306,8 +488,8 @@ function createHTTPOperation( }; } -export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array }> { - const operations: Array }> = []; +export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array { + const operations: Array = []; const basePath = spec.basePath || ""; for (const [path, methods] of Object.entries(spec.paths)) { @@ -328,7 +510,7 @@ export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array return operations; } -export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise }>> { +export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise> { let content: string; if (fs) { content = await fs.readFile(path); @@ -340,7 +522,7 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f return FromOpenAPI(spec, config); } -export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise }>> { +export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise> { const response = await fetch(url); const spec = await response.json() as OpenAPISpec; return FromOpenAPI(spec, config); diff --git a/src/index.ts b/src/index.ts index 728f16d..52b770e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,8 +11,9 @@ export { scanOperations } from "./scanner.js"; export type { OperationManifest, ScannerFS } from "./scanner.js"; export { CallError, InfrastructureErrorCode, mapError } from "./error.js"; export type { CallErrorCode } from "./error.js"; -export { PendingRequestMap, buildCallHandler, checkAccess } from "./call.js"; +export { PendingRequestMap, buildCallHandler } from "./call.js"; export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js"; +export { checkAccess } from "./access.js"; export { subscribe } from "./subscribe.js"; export { createMCPClient, closeMCPClient, MCPClientLoader } from "./from_mcp.js"; export type { MCPClientConfig, MCPClientWrapper } from "./from_mcp.js"; diff --git a/src/registry.ts b/src/registry.ts index 56b9939..5eea163 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -5,7 +5,7 @@ import { KindGuard } from "@alkdev/typebox"; import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js"; import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js"; import { CallError, InfrastructureErrorCode } from "./error.js"; -import { checkAccess } from "./call.js"; +import { checkAccess } from "./access.js"; const logger = getLogger("operations:registry"); diff --git a/src/subscribe.ts b/src/subscribe.ts index c34283e..7712c9b 100644 --- a/src/subscribe.ts +++ b/src/subscribe.ts @@ -2,7 +2,7 @@ import type { OperationContext, AccessControl } from "./types.js"; import { OperationRegistry } from "./registry.js"; import { type ResponseEnvelope, isResponseEnvelope, localEnvelope } from "./response-envelope.js"; import { CallError, InfrastructureErrorCode } from "./error.js"; -import { checkAccess } from "./call.js"; +import { checkAccess } from "./access.js"; export async function* subscribe( registry: OperationRegistry, diff --git a/tasks/implement-adr-007-subscription-transport.md b/tasks/implement-adr-007-subscription-transport.md index d568b07..46a8c1f 100644 --- a/tasks/implement-adr-007-subscription-transport.md +++ b/tasks/implement-adr-007-subscription-transport.md @@ -6,6 +6,8 @@ **Architecture docs**: [ADR-007](../docs/architecture/decisions/007-subscription-transport.md), [call-protocol.md](../docs/architecture/call-protocol.md), [adapters.md](../docs/architecture/adapters.md) +**Status**: ✅ Completed (2026-05-16) + ## Scope Three changes, all in source. No new modules needed. diff --git a/test/call.test.ts b/test/call.test.ts index 357b306..e122ac7 100644 --- a/test/call.test.ts +++ b/test/call.test.ts @@ -26,7 +26,7 @@ describe("PendingRequestMap", () => { const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope({ result: "world" }, "test.op")); }, 10); @@ -48,7 +48,7 @@ describe("PendingRequestMap", () => { }); setTimeout(() => { - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.respond(requestId, envelope); }, 10); @@ -71,7 +71,7 @@ describe("PendingRequestMap", () => { const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.emitError(requestId, "CUSTOM_ERROR", "Something went wrong"); }, 10); @@ -85,7 +85,7 @@ describe("PendingRequestMap", () => { const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.abort(requestId); }, 10); @@ -115,7 +115,7 @@ describe("PendingRequestMap", () => { const callPromise = map.call("test.op", { value: "hello" }); expect(map.getPendingCount()).toBe(1); - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope({ result: "done" }, "test.op")); await callPromise; @@ -146,7 +146,7 @@ describe("PendingRequestMap", () => { it("respond() accepts a localEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, localEnvelope("data", "test.op"))).not.toThrow(); }); @@ -154,7 +154,7 @@ describe("PendingRequestMap", () => { it("respond() accepts an httpEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, httpEnvelope("data", { statusCode: 200, @@ -166,7 +166,7 @@ describe("PendingRequestMap", () => { it("respond() accepts an mcpEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, mcpEnvelope("data", { isError: false, @@ -180,7 +180,7 @@ describe("PendingRequestMap", () => { const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { - const requestId = [...map["requests"].keys()][0]; + const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope(42, "test.op")); }, 10); @@ -259,7 +259,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.echo", { value: "hello" }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.echo", input: { value: "hello" }, }); @@ -281,7 +281,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.voidOp", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.voidOp", input: {}, }); @@ -319,7 +319,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.mcpOp", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.mcpOp", input: {}, }); @@ -354,7 +354,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.httpOp", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.httpOp", input: {}, }); @@ -387,7 +387,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.throws", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.throws", input: {}, }); @@ -417,7 +417,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.throwsCallError", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.throwsCallError", input: {}, }); @@ -440,7 +440,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.nonexistent", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.nonexistent", input: {}, }); @@ -473,7 +473,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.noHandler", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.noHandler", input: {}, }); @@ -497,7 +497,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -520,7 +520,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.nonexistent", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.nonexistent", input: {}, }); @@ -554,7 +554,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.defaultsFields", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.defaultsFields", input: {}, }); @@ -583,7 +583,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.unknownOutput", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.unknownOutput", input: {}, }); @@ -615,7 +615,7 @@ describe("CallHandler", () => { const callPromise = callMap.call("test.customError", {}); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.customError", input: {}, }); @@ -676,7 +676,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -701,7 +701,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -730,7 +730,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -759,7 +759,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -788,7 +788,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -808,7 +808,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.open", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.open", input: {}, identity, @@ -832,7 +832,7 @@ describe("checkAccess resource access control", () => { const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ - requestId: [...callMap["requests"].keys()][0], + requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, @@ -841,4 +841,316 @@ describe("checkAccess resource access control", () => { const result = await callPromise; expect(result.data).toEqual({ ok: true }); }); +}); + +describe("PendingRequestMap.subscribe()", () => { + it("yields each envelope from call.responded events", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.stream", { filter: "all" }); + + const results: ResponseEnvelope[] = []; + const consumePromise = (async () => { + for await (const envelope of subscribeIter) { + results.push(envelope); + if (results.length === 3) break; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + + const requestId = [...map["entries"].keys()][0]; + map.respond(requestId, localEnvelope({ event: 1 }, "test.stream")); + map.respond(requestId, localEnvelope({ event: 2 }, "test.stream")); + map.respond(requestId, localEnvelope({ event: 3 }, "test.stream")); + + await consumePromise; + + expect(results).toHaveLength(3); + expect(results[0].data).toEqual({ event: 1 }); + expect(results[1].data).toEqual({ event: 2 }); + expect(results[2].data).toEqual({ event: 3 }); + expect(results[0].meta.source).toBe("local"); + }); + + it("publishes call.aborted when consumer stops iterating", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.stream", {}); + + let abortedReceived = false; + const abortedIter = map["pubsub"].subscribe("call.aborted", ""); + (async () => { + for await (const envelope of abortedIter) { + abortedReceived = true; + } + })(); + + const results: ResponseEnvelope[] = []; + const consumePromise = (async () => { + for await (const envelope of subscribeIter) { + results.push(envelope); + if (results.length === 1) break; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + map.respond(requestId, localEnvelope("first", "test.stream")); + + await consumePromise; + await new Promise((r) => setTimeout(r, 20)); + + expect(abortedReceived).toBe(true); + }); + + it("throws CallError when call.error event arrives", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.failing", {}); + let caughtError: unknown; + + const consumePromise = (async () => { + try { + for await (const _ of subscribeIter) { + // should not reach + } + } catch (error) { + caughtError = error; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + map.emitError(requestId, "CUSTOM_ERROR", "Subscription failed"); + + await consumePromise; + + expect(caughtError).toBeInstanceOf(CallError); + expect((caughtError as CallError).code).toBe("CUSTOM_ERROR"); + }); + + it("closes iterator on call.aborted event", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.stream", {}); + let iterationCompleted = false; + + const consumePromise = (async () => { + for await (const _ of subscribeIter) { + // will receive abort + } + iterationCompleted = true; + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + + const entry = map["entries"].get(requestId); + expect(entry).toBeDefined(); + expect(entry!.type).toBe("subscribe"); + + map["pubsub"].publish("call.aborted", "", { requestId }); + + await consumePromise; + expect(iterationCompleted).toBe(true); + }); + + it("times out on idle deadline", async () => { + const map = new PendingRequestMap(); + const deadline = 80; + + const subscribeIter = map.subscribe("test.slow", {}, { deadline }); + let caughtError: unknown; + + const consumePromise = (async () => { + try { + for await (const _ of subscribeIter) { + // should not receive any events + } + } catch (error) { + caughtError = error; + } + })(); + + await consumePromise; + + expect(caughtError).toBeInstanceOf(CallError); + expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.TIMEOUT); + }); + + it("resets idle timeout on each envelope", async () => { + const map = new PendingRequestMap(); + const deadline = 150; + + const subscribeIter = map.subscribe("test.heartbeat", {}, { deadline }); + + const results: ResponseEnvelope[] = []; + const consumePromise = (async () => { + for await (const envelope of subscribeIter) { + results.push(envelope); + if (results.length === 3) break; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + + await new Promise((r) => setTimeout(r, 50)); + map.respond(requestId, localEnvelope("event1", "test.heartbeat")); + + await new Promise((r) => setTimeout(r, 50)); + map.respond(requestId, localEnvelope("event2", "test.heartbeat")); + + await new Promise((r) => setTimeout(r, 50)); + map.respond(requestId, localEnvelope("event3", "test.heartbeat")); + + await consumePromise; + + expect(results).toHaveLength(3); + expect(results[0].data).toBe("event1"); + expect(results[1].data).toBe("event2"); + expect(results[2].data).toBe("event3"); + }); + + it("abort() closes subscription iterator", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.stream", {}); + let iterationCompleted = false; + + const consumePromise = (async () => { + for await (const _ of subscribeIter) { + // will receive abort + } + iterationCompleted = true; + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + + map.abort(requestId); + + await consumePromise; + expect(iterationCompleted).toBe(true); + }); + + it("tracks subscribe entries in pending count", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.stream", {}); + + const consumePromise = (async () => { + for await (const _ of subscribeIter) { + break; + } + })(); + + await new Promise((r) => setTimeout(r, 30)); + expect(map.getPendingCount()).toBe(1); + + const requestId = [...map["entries"].keys()][0]; + map.abort(requestId); + await new Promise((r) => setTimeout(r, 20)); + + await consumePromise; + expect(map.getPendingCount()).toBe(0); + }); +}); + +describe("CallHandler SUBSCRIPTION dispatch", () => { + it("dispatches SUBSCRIPTION operations to subscribe()", async () => { + const registry = new OperationRegistry(); + registry.register({ + name: "events", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "event stream", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + handler: async function* (_input: unknown, _context: unknown) { + yield "event1"; + yield "event2"; + }, + }); + + const callMap = new PendingRequestMap(); + const handler = buildCallHandler({ registry, callMap }); + + const subscribeIter = callMap.subscribe("test.events", {}); + const results: ResponseEnvelope[] = []; + const consumePromise = (async () => { + for await (const envelope of subscribeIter) { + results.push(envelope); + if (results.length === 2) break; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...callMap["entries"].keys()][0]; + + await handler({ + requestId, + operationId: "test.events", + input: {}, + }); + + await consumePromise; + + expect(results).toHaveLength(2); + expect(results[0].data).toBe("event1"); + expect(results[0].meta.source).toBe("local"); + expect(results[1].data).toBe("event2"); + }); + + it("publishes call.error for SUBSCRIPTION access denied", async () => { + const registry = new OperationRegistry(); + registry.register({ + name: "guarded", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "guarded sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: ["admin"] }, + handler: async function* (_input: unknown, _context: unknown) { + yield "secret"; + }, + }); + + const callMap = new PendingRequestMap(); + const handler = buildCallHandler({ registry, callMap }); + + const identity: Identity = { id: "user1", scopes: [] }; + const subscribeIter = callMap.subscribe("test.guarded", {}, { identity }); + let caughtError: unknown; + + const consumePromise = (async () => { + try { + for await (const _ of subscribeIter) { + // should not reach + } + } catch (error) { + caughtError = error; + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...callMap["entries"].keys()][0]; + + await handler({ + requestId, + operationId: "test.guarded", + input: {}, + identity, + }); + + await consumePromise; + + expect(caughtError).toBeInstanceOf(CallError); + expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); + }); }); \ No newline at end of file diff --git a/test/from_openapi.test.ts b/test/from_openapi.test.ts index f320e2a..1603ab6 100644 --- a/test/from_openapi.test.ts +++ b/test/from_openapi.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; -import { FromOpenAPI } from "../src/from_openapi.js"; +import { FromOpenAPI, parseSSEFrames } from "../src/from_openapi.js"; import { OperationType } from "../src/types.js"; +import type { SubscriptionHandler } from "../src/types.js"; import { CallError } from "../src/error.js"; import { isResponseEnvelope } from "../src/response-envelope.js"; import { Value } from "@alkdev/typebox/value"; @@ -395,4 +396,196 @@ describe("FromOpenAPI handler envelope behavior", () => { expect(meta.headers["x-request-id"]).toBe("req-123"); } }); +}); + +describe("parseSSEFrames", () => { + it("parses a simple SSE event", () => { + const buffer = "data: hello\n\n"; + const { events, remaining } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("hello"); + expect(events[0].eventType).toBe("message"); + }); + + it("parses multiple SSE events", () => { + const buffer = "data: first\n\ndata: second\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(2); + expect(events[0].data).toBe("first"); + expect(events[1].data).toBe("second"); + }); + + it("parses multi-line data fields (joined with \\n)", () => { + const buffer = "data: line1\ndata: line2\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("line1\nline2"); + }); + + it("parses event type field", () => { + const buffer = "event: custom\ndata: payload\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].eventType).toBe("custom"); + expect(events[0].data).toBe("payload"); + }); + + it("parses id field", () => { + const buffer = "id: 42\ndata: payload\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].lastEventId).toBe("42"); + }); + + it("ignores comment lines (starting with :)", () => { + const buffer = ": this is a comment\ndata: hello\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("hello"); + }); + + it("handles CRLF line endings", () => { + const buffer = "data: hello\r\n\r\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("hello"); + }); + + it("handles CR line endings", () => { + const buffer = "data: hello\r\r"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("hello"); + }); + + it("strips BOM at stream start", () => { + const buffer = "\uFEFFdata: hello\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe("hello"); + }); + + it("removes single leading space after data: per WHATWG spec", () => { + const buffer = "data: two spaces\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe(" two spaces"); + }); + + it("handles partial lines (returns as remaining)", () => { + const buffer = "data: incom"; + const { events, remaining } = parseSSEFrames(buffer); + expect(events).toHaveLength(0); + expect(remaining).toBe("data: incom"); + }); + + it("handles empty data with empty line dispatch", () => { + const buffer = "data:\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(1); + expect(events[0].data).toBe(""); + }); + + it("skips events with no data lines (empty dispatch)", () => { + const buffer = "event: ping\n\n"; + const { events } = parseSSEFrames(buffer); + expect(events).toHaveLength(0); + }); +}); + +describe("FromOpenAPI SUBSCRIPTION handler", () => { + const config = { + namespace: "api", + baseUrl: "https://api.example.com", + }; + + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { + originalFetch = globalThis.fetch; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it("generates SubscriptionHandler for SUBSCRIPTION type operations", () => { + const ops = FromOpenAPI(simpleSpec as any, config); + const streamEvents = ops.find((o) => o.name === "streamEvents")!; + expect(streamEvents.type).toBe(OperationType.SUBSCRIPTION); + expect(typeof streamEvents.handler).toBe("function"); + }); + + it("SSE handler yields events as httpEnvelope", async () => { + const sseStream = [ + "data: {\"event\":\"ping\"}\n\n", + "data: {\"event\":\"pong\"}\n\n", + ].join(""); + + const encoder = new TextEncoder(); + const chunks = [encoder.encode(sseStream)]; + + const reader = { + read: vi.fn() + .mockResolvedValueOnce({ done: false, value: chunks[0] }) + .mockResolvedValueOnce({ done: true, value: undefined }), + releaseLock: vi.fn(), + }; + + globalThis.fetch = vi.fn().mockResolvedValue({ + ok: true, + status: 200, + statusText: "OK", + headers: new Headers({ "Content-Type": "text/event-stream" }), + body: { getReader: () => reader }, + }); + + const ops = FromOpenAPI(simpleSpec as any, config); + const streamEvents = ops.find((o) => o.name === "streamEvents")!; + const handler = streamEvents.handler as SubscriptionHandler; + + const results: unknown[] = []; + for await (const value of handler({}, {} as any)) { + results.push(value); + } + + expect(results).toHaveLength(2); + expect(isResponseEnvelope(results[0])).toBe(true); + if (isResponseEnvelope(results[0])) { + expect(results[0].meta.source).toBe("http"); + expect(results[0].data).toEqual({ event: "ping" }); + const meta = results[0].meta as { statusCode: number; contentType: string }; + expect(meta.statusCode).toBe(200); + expect(meta.contentType).toBe("text/event-stream"); + } + expect(isResponseEnvelope(results[1])).toBe(true); + if (isResponseEnvelope(results[1])) { + expect(results[1].data).toEqual({ event: "pong" }); + } + expect(reader.releaseLock).toHaveBeenCalled(); + }); + + it("SSE handler throws CallError on HTTP error", async () => { + globalThis.fetch = vi.fn().mockResolvedValue({ + ok: false, + status: 500, + statusText: "Internal Server Error", + headers: new Headers(), + }); + + const ops = FromOpenAPI(simpleSpec as any, config); + const streamEvents = ops.find((o) => o.name === "streamEvents")!; + const handler = streamEvents.handler as SubscriptionHandler; + + try { + for await (const _ of handler({}, {} as any)) { + // should not reach + } + expect.fail("Expected CallError"); + } catch (error) { + expect(error).toBeInstanceOf(CallError); + expect((error as CallError).code).toBe("EXECUTION_ERROR"); + expect((error as CallError).message).toContain("HTTP 500"); + } + }); }); \ No newline at end of file