diff --git a/docs/reviews/pre-release-review-2025-05-16.md b/docs/reviews/pre-release-review-2025-05-16.md index a3049a8..a900838 100644 --- a/docs/reviews/pre-release-review-2025-05-16.md +++ b/docs/reviews/pre-release-review-2025-05-16.md @@ -2,7 +2,7 @@ **Date:** 2026-05-16 **Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations -**Status:** C-01 through C-05, H-01 through H-06, M-01 through M-03, M-05 through M-08, and L-01 through L-05 resolved. M-04 (call.completed signaling) and L-04 (SSE GET requestBody) deferred. M-02 resolved (subpath export added, OpenAPI re-exported from main entry removed). +**Status:** C-01 through C-05, H-01 through H-06, M-01 through M-08, and L-01 through L-05 resolved. L-04 resolved. --- @@ -129,13 +129,11 @@ Both adapters have access to version information (MCP server info, OpenAPI `info `CallRequestedEvent` schema now has both `deadline?: number` and `idleTimeout?: number` as optional fields. `PendingRequestMap.subscribe()` accepts `{ idleTimeout }` instead of `{ deadline }`. The idle timer is correctly reset on each event using the stored `idleTimeout` value. -### M-04. No unsubscribe/completion signaling in call protocol +### M-04. No unsubscribe/completion signaling in call protocol ✅ RESOLVED -**File:** `src/call.ts:298-301` +**File:** `src/call.ts` -**Status:** DEFERRED — will be implemented in a dedicated session due to protocol-level impact. - -When a subscription ends (the `for await` loop completes), `buildCallHandler` simply stops calling `callMap.respond()`. There is no `call.completed` or `call.finished` event type to explicitly signal subscription completion. The consumer must detect iterator completion, which works with `Repeater` but may not work with all pubsub implementations. +**Resolution:** Added `call.completed` event type to the call protocol. When a subscription's async generator finishes naturally, `buildCallHandler` now calls `callMap.complete(requestId)` which publishes a `call.completed` event. The `PendingRequestMap` handles this event by closing the consumer's Repeater iterator (for subscriptions) or rejecting the pending promise (for calls). Added `CallCompletedEvent` type and schema, `complete()` method on `PendingRequestMap`, and a pubsub listener for `call.completed` in `setupSubscriptions()`. The `consumerStopped` flag is set before `stop()` to prevent spurious `call.aborted` events. ### M-05. mapError uses fragile message.includes(code) matching ✅ RESOLVED @@ -177,11 +175,11 @@ When a subscription ends (the `for await` loop completes), `buildCallHandler` si **Resolution:** Added `logger.warn()` call when the fallback `Type.Unknown()` is reached, logging the unrecognized schema via `JSON.stringify()`. Uses `@logtape/logtape` with category `"operations:from_schema"`, matching the existing pattern. -### L-04. from_openapi.ts SSE GET includes requestBody handling +### L-04. from_openapi.ts SSE handler now forwards requestBody ✅ RESOLVED **File:** `src/from_openapi.ts` -**Status:** NOTED — The `else if (key === "body")` branch in the SSE handler silently accepts but does not forward body parameters. This is intentional for unusual SSE-over-POST use cases. No action needed; the behavior is self-documenting from the code structure. +**Resolution:** The SSE subscription handler previously had an empty `else if (key === "body")` branch that silently dropped body parameters. Fixed by adding `body` variable extraction and forwarding it as a JSON request body (with `Content-Type: application/json` header) when present. This correctly supports SSE-over-POST patterns while preserving GET-without-body behavior for standard SSE endpoints. ### L-05. No convenience registerAll methods on MCP or OpenAPI adapters ✅ RESOLVED diff --git a/src/call.ts b/src/call.ts index 9aa9c19..3408942 100644 --- a/src/call.ts +++ b/src/call.ts @@ -30,6 +30,9 @@ export const CallEventSchema = { requestId: Type.String(), output: ResponseEnvelopeSchema, }), + "call.completed": Type.Object({ + requestId: Type.String(), + }), "call.aborted": Type.Object({ requestId: Type.String(), }), @@ -43,15 +46,17 @@ export const CallEventSchema = { export type CallRequestedEvent = Static; export type CallRespondedEvent = Static; +export type CallCompletedEvent = Static; export type CallAbortedEvent = Static; export type CallErrorEvent = Static; -export type CallEventMapValue = CallRequestedEvent | CallRespondedEvent | CallAbortedEvent | CallErrorEvent; +export type CallEventMapValue = CallRequestedEvent | CallRespondedEvent | CallCompletedEvent | CallAbortedEvent | CallErrorEvent; export const CallEventMap = CallEventSchema; type CallPubSubMap = { "call.requested": CallRequestedEvent; "call.responded": CallRespondedEvent; + "call.completed": CallCompletedEvent; "call.aborted": CallAbortedEvent; "call.error": CallErrorEvent; }; @@ -141,6 +146,27 @@ export class PendingRequestMap { logger.error(`call.error listener error: ${error instanceof Error ? error.message : String(error)}`); }); + const completedIter = this.pubsub.subscribe("call.completed", ""); + (async () => { + for await (const envelope of completedIter) { + const completed = envelope.payload; + const entry = this.entries.get(completed.requestId); + if (!entry) continue; + + if (entry.type === "subscribe") { + if (entry.state.timer) clearTimeout(entry.state.timer); + entry.state.consumerStopped = true; + entry.state.stop(); + } else { + if (entry.pending.timer) clearTimeout(entry.pending.timer); + entry.pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${completed.requestId} completed without response`)); + } + this.entries.delete(completed.requestId); + } + })().catch((error) => { + logger.error(`call.completed listener error: ${error instanceof Error ? error.message : String(error)}`); + }); + const abortedIter = this.pubsub.subscribe("call.aborted", ""); (async () => { for await (const envelope of abortedIter) { @@ -267,6 +293,10 @@ export class PendingRequestMap { }); } + complete(requestId: string): void { + this.pubsub.publish("call.completed", "", { requestId }); + } + abort(requestId: string): void { const entry = this.entries.get(requestId); if (!entry) return; @@ -312,6 +342,7 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { for await (const envelope of subscribe(registry, operationId, input, context)) { callMap.respond(requestId, envelope); } + callMap.complete(requestId); } else { const envelope = await registry.execute(operationId, input, context); callMap.respond(requestId, envelope); diff --git a/src/from_openapi.ts b/src/from_openapi.ts index 952ad8d..19a24b4 100644 --- a/src/from_openapi.ts +++ b/src/from_openapi.ts @@ -334,11 +334,13 @@ function createHTTPOperation( let urlPath = path; const queryParams: Record = {}; + let body: unknown = undefined; for (const [key, value] of Object.entries(inputObj)) { if (path.includes(`{${key}}`)) { urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value))); } else if (key === "body") { + body = value; } else { queryParams[key] = String(value); } @@ -351,12 +353,14 @@ function createHTTPOperation( const headers: Record = { ...authHeaders, + ...(body ? { "Content-Type": "application/json" } : {}), "Accept": "text/event-stream", }; const response = await httpClient(url.toString(), { method: method.toUpperCase(), headers, + body: body ? JSON.stringify(body) : undefined, signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined, }); diff --git a/src/index.ts b/src/index.ts index 1c04c0e..8c92c1e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -11,7 +11,7 @@ export type { OperationManifest, ScannerFS } from "./scanner.js"; export { CallError, InfrastructureErrorCode, mapError } from "./error.js"; export type { CallErrorCode } from "./error.js"; export { PendingRequestMap, buildCallHandler } from "./call.js"; -export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js"; +export type { CallEventMap, CallEventMapValue, CallRequestedEvent, CallRespondedEvent, CallCompletedEvent, CallAbortedEvent, CallErrorEvent, CallHandler, CallHandlerConfig } from "./call.js"; export { checkAccess, enforceAccess } from "./access.js"; export { subscribe } from "./subscribe.js"; export { defaultAdapter, zodAdapter, valibotAdapter } from "./from_typemap.js"; diff --git a/test/call.test.ts b/test/call.test.ts index 52411ca..1a7fa6e 100644 --- a/test/call.test.ts +++ b/test/call.test.ts @@ -1162,7 +1162,6 @@ describe("CallHandler SUBSCRIPTION dispatch", () => { const consumePromise = (async () => { try { for await (const _ of subscribeIter) { - // should not reach } } catch (error) { caughtError = error; @@ -1184,4 +1183,149 @@ describe("CallHandler SUBSCRIPTION dispatch", () => { expect(caughtError).toBeInstanceOf(CallError); expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); }); +}); + +describe("call.completed signaling", () => { + it("complete() closes subscription iterator", async () => { + const map = new PendingRequestMap(); + let completedRequestId: string | undefined; + + const completedIter = map["pubsub"].subscribe("call.completed", ""); + const completedPromise = (async () => { + for await (const envelope of completedIter) { + completedRequestId = envelope.payload.requestId; + break; + } + })(); + + const subscribeIter = map.subscribe("test.stream", {}); + let iterationCompleted = false; + + const consumePromise = (async () => { + for await (const _ of subscribeIter) { + } + iterationCompleted = true; + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...map["entries"].keys()][0]; + + map.respond(requestId, localEnvelope("event1", "test.stream")); + + await new Promise((r) => setTimeout(r, 20)); + map.complete(requestId); + + await consumePromise; + await completedPromise; + + expect(iterationCompleted).toBe(true); + expect(completedRequestId).toBe(requestId); + expect(map.getPendingCount()).toBe(0); + }); + + it("buildCallHandler publishes call.completed when subscription generator finishes", async () => { + const registry = new OperationRegistry(); + registry.register({ + name: "finite", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "finite stream", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + handler: async function* (_input: unknown, _context: unknown) { + yield "a"; + yield "b"; + }, + }); + + const callMap = new PendingRequestMap(); + const handler = buildCallHandler({ registry, callMap }); + + let completedRequestId: string | undefined; + const completedIter = callMap["pubsub"].subscribe("call.completed", ""); + const completedPromise = (async () => { + for await (const envelope of completedIter) { + completedRequestId = envelope.payload.requestId; + break; + } + })(); + + const subscribeIter = callMap.subscribe("test.finite", {}); + const results: ResponseEnvelope[] = []; + const consumePromise = (async () => { + for await (const envelope of subscribeIter) { + results.push(envelope); + } + })(); + + await new Promise((r) => setTimeout(r, 20)); + const requestId = [...callMap["entries"].keys()][0]; + + await handler({ + requestId, + operationId: "test.finite", + input: {}, + }); + + await consumePromise; + await completedPromise; + + expect(results).toHaveLength(2); + expect(results[0].data).toBe("a"); + expect(results[1].data).toBe("b"); + expect(completedRequestId).toBe(requestId); + expect(callMap.getPendingCount()).toBe(0); + }); + + it("complete() on a call entry rejects the pending promise", async () => { + const map = new PendingRequestMap(); + + let completedRequestId: string | undefined; + const completedIter = map["pubsub"].subscribe("call.completed", ""); + const completedPromise = (async () => { + for await (const envelope of completedIter) { + completedRequestId = envelope.payload.requestId; + break; + } + })(); + + const callPromise = map.call("test.op", {}); + const requestId = [...map["entries"].keys()][0]; + + map.complete(requestId); + + await completedPromise; + expect(completedRequestId).toBe(requestId); + + try { + await callPromise; + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(CallError); + expect((error as CallError).code).toBe(InfrastructureErrorCode.ABORTED); + expect((error as CallError).message).toContain("completed without response"); + } + + expect(map.getPendingCount()).toBe(0); + }); + + it("complete() with invalid requestId still publishes event", async () => { + const map = new PendingRequestMap(); + + let completedReceived = false; + const completedIter = map["pubsub"].subscribe("call.completed", ""); + const completedPromise = (async () => { + for await (const envelope of completedIter) { + completedReceived = true; + break; + } + })(); + + map.complete("nonexistent-id"); + + await completedPromise; + expect(completedReceived).toBe(true); + }); }); \ No newline at end of file diff --git a/test/from_openapi.test.ts b/test/from_openapi.test.ts index 1603ab6..9a6adca 100644 --- a/test/from_openapi.test.ts +++ b/test/from_openapi.test.ts @@ -579,7 +579,6 @@ describe("FromOpenAPI SUBSCRIPTION handler", () => { try { for await (const _ of handler({}, {} as any)) { - // should not reach } expect.fail("Expected CallError"); } catch (error) { @@ -588,4 +587,112 @@ describe("FromOpenAPI SUBSCRIPTION handler", () => { expect((error as CallError).message).toContain("HTTP 500"); } }); + + it("SSE handler forwards body in request when provided", async () => { + const sseStream = "data: {\"ok\":true}\n\n"; + const encoder = new TextEncoder(); + + const reader = { + read: vi.fn() + .mockResolvedValueOnce({ done: false, value: encoder.encode(sseStream) }) + .mockResolvedValueOnce({ done: true, value: undefined }), + releaseLock: vi.fn(), + }; + + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + status: 200, + statusText: "OK", + headers: new Headers({ "Content-Type": "text/event-stream" }), + body: { getReader: () => reader }, + }); + globalThis.fetch = fetchMock; + + const specWithBody = { + openapi: "3.0.0", + info: { title: "Test", version: "1.0.0" }, + paths: { + "/events": { + post: { + operationId: "postEvents", + description: "Post and stream events", + requestBody: { + content: { + "application/json": { + schema: { type: "object", properties: { filter: { type: "string" } } }, + }, + }, + }, + responses: { + "200": { + content: { + "text/event-stream": { + schema: { type: "object", properties: { ok: { type: "boolean" } } }, + }, + }, + }, + }, + }, + }, + }, + }; + + const ops = FromOpenAPI(specWithBody as any, config); + const postEvents = ops.find((o) => o.name === "postEvents")!; + const handler = postEvents.handler as SubscriptionHandler; + + const results: unknown[] = []; + for await (const value of handler({ body: { filter: "important" } }, {} as any)) { + results.push(value); + } + + expect(fetchMock).toHaveBeenCalledTimes(1); + const fetchArgs = fetchMock.mock.calls[0]; + expect(fetchArgs[1].method).toBe("POST"); + expect(fetchArgs[1].body).toBe(JSON.stringify({ filter: "important" })); + expect(fetchArgs[1].headers["Content-Type"]).toBe("application/json"); + + expect(results).toHaveLength(1); + if (isResponseEnvelope(results[0])) { + expect(results[0].data).toEqual({ ok: true }); + } + }); + + it("SSE handler does not send body when input has no body key", async () => { + const sseStream = "data: {\"event\":\"ping\"}\n\n"; + const encoder = new TextEncoder(); + + const reader = { + read: vi.fn() + .mockResolvedValueOnce({ done: false, value: encoder.encode(sseStream) }) + .mockResolvedValueOnce({ done: true, value: undefined }), + releaseLock: vi.fn(), + }; + + const fetchMock = vi.fn().mockResolvedValue({ + ok: true, + status: 200, + statusText: "OK", + headers: new Headers({ "Content-Type": "text/event-stream" }), + body: { getReader: () => reader }, + }); + globalThis.fetch = fetchMock; + + const ops = FromOpenAPI(simpleSpec as any, config); + const streamEvents = ops.find((o) => o.name === "streamEvents")!; + const handler = streamEvents.handler as SubscriptionHandler; + + const results: unknown[] = []; + for await (const value of handler({}, {} as any)) { + results.push(value); + } + + expect(fetchMock).toHaveBeenCalledTimes(1); + const fetchArgs = fetchMock.mock.calls[0]; + expect(fetchArgs[1].method).toBe("GET"); + expect(fetchArgs[1].body).toBeUndefined(); + expect(fetchArgs[1].headers["Content-Type"]).toBeUndefined(); + + expect(results).toHaveLength(1); + }); }); \ No newline at end of file