diff --git a/docs/reviews/pre-release-review-2025-05-16.md b/docs/reviews/pre-release-review-2025-05-16.md index a474d9c..a292290 100644 --- a/docs/reviews/pre-release-review-2025-05-16.md +++ b/docs/reviews/pre-release-review-2025-05-16.md @@ -2,7 +2,7 @@ **Date:** 2026-05-16 **Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations -**Status:** C-01 through C-05 and H-01 through H-06 resolved. M-01 through M-08 and L-01 through L-05 remain as follow-ups. +**Status:** C-01 through C-05, H-01 through H-06, and M-03 resolved. M-01 through M-02 and M-04 through M-08, plus L-01 through L-05, remain as follow-ups. --- @@ -116,11 +116,16 @@ These two schemas are nearly identical, differing only in the `handler` field. A **File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking. -### M-03. Subscription deadline semantics are ambiguous +### M-03. Subscription deadline semantics are ambiguous ✅ RESOLVED -**File:** `src/call.ts:103-109` +**File:** `src/call.ts` -When a `call.responded` event arrives for a subscription, the deadline timer is reset to the same `entry.state.deadline` value. If `deadline` is an absolute timestamp (e.g., `Date.now() + 5000`), the new timer delay would be `deadline - Date.now()`, which shrinks over time and could become negative. The documentation doesn't specify whether `deadline` is absolute or relative. This should be clarified and the reset logic adjusted. +**Resolution:** Split the overloaded `deadline` field into two distinct concepts: + +- `deadline` (absolute timestamp, ms since epoch) — used by `call()`. Total time limit for a request/response cycle. Computed as `setTimeout(fn, Math.max(0, deadline - Date.now()))`. +- `idleTimeout` (relative duration in ms) — used by `subscribe()`. Maximum ms between events before the subscription is considered idle. Reset on each `call.responded` event. Omit for no idle timeout (subscription lives until explicit abort). + +`CallRequestedEvent` schema now has both `deadline?: number` and `idleTimeout?: number` as optional fields. `PendingRequestMap.subscribe()` accepts `{ idleTimeout }` instead of `{ deadline }`. The idle timer is correctly reset on each event using the stored `idleTimeout` value. ### M-04. No unsubscribe/completion signaling in call protocol diff --git a/src/call.ts b/src/call.ts index 2d1f162..9aa9c19 100644 --- a/src/call.ts +++ b/src/call.ts @@ -17,7 +17,8 @@ export const CallEventSchema = { operationId: Type.String(), input: Type.Unknown(), parentRequestId: Type.Optional(Type.String()), - deadline: Type.Optional(Type.Number()), + deadline: Type.Optional(Type.Number({ description: "Absolute timestamp (ms since epoch) for call/response timeout. Used by calls." })), + idleTimeout: Type.Optional(Type.Number({ description: "Relative duration (ms) between events before subscription is considered idle. Used by subscriptions. Omit for no idle timeout." })), identity: Type.Optional(Type.Object({ id: Type.String(), scopes: Type.Array(Type.String()), @@ -65,7 +66,7 @@ interface PendingCall { interface SubscriptionState { push: Push; stop: Stop; - deadline?: number; + idleTimeout?: number; timer?: ReturnType; consumerStopped?: boolean; } @@ -107,8 +108,8 @@ export class PendingRequestMap { } else { if (entry.state.timer) { clearTimeout(entry.state.timer); - if (entry.state.deadline) { - entry.state.timer = this.startSubscriptionTimer(responded.requestId, entry.state.deadline); + if (entry.state.idleTimeout != null) { + entry.state.timer = this.startIdleTimer(responded.requestId, entry.state.idleTimeout); } } entry.state.push(responded.output as ResponseEnvelope); @@ -163,15 +164,15 @@ export class PendingRequestMap { }); } - private startSubscriptionTimer(requestId: string, deadline: number): ReturnType { + private startIdleTimer(requestId: string, idleTimeout: number): ReturnType { return setTimeout(() => { const entry = this.entries.get(requestId); if (!entry || entry.type !== "subscribe") return; if (entry.state.timer) clearTimeout(entry.state.timer); entry.state.consumerStopped = true; this.pubsub.publish("call.aborted", "", { requestId }); - entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle)`, { deadline })); - }, deadline); + entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle after ${idleTimeout}ms)`, { idleTimeout })); + }, idleTimeout); } async call( @@ -186,10 +187,11 @@ export class PendingRequestMap { if (options?.deadline) { pending.deadline = options.deadline; + const delay = Math.max(0, options.deadline - Date.now()); pending.timer = setTimeout(() => { this.entries.delete(requestId); reject(new CallError(InfrastructureErrorCode.TIMEOUT, `Request ${requestId} timed out`, { deadline: options.deadline })); - }, options.deadline - Date.now()); + }, delay); } this.entries.set(requestId, { type: "call", pending }); @@ -208,16 +210,16 @@ export class PendingRequestMap { subscribe( operationId: string, input: unknown, - options?: { parentRequestId?: string; deadline?: number; identity?: Identity }, + options?: { parentRequestId?: string; idleTimeout?: number; identity?: Identity }, ): AsyncIterable { const requestId = crypto.randomUUID(); const repeater = new Repeater((push: Push, stop: Stop) => { const state: SubscriptionState = { push, stop }; - if (options?.deadline) { - state.deadline = options.deadline; - state.timer = this.startSubscriptionTimer(requestId, options.deadline); + if (options?.idleTimeout != null) { + state.idleTimeout = options.idleTimeout; + state.timer = this.startIdleTimer(requestId, options.idleTimeout); } this.entries.set(requestId, { type: "subscribe", state }); @@ -227,7 +229,7 @@ export class PendingRequestMap { operationId, input, parentRequestId: options?.parentRequestId, - deadline: options?.deadline, + idleTimeout: options?.idleTimeout, identity: options?.identity, }); diff --git a/test/call.test.ts b/test/call.test.ts index e122ac7..52411ca 100644 --- a/test/call.test.ts +++ b/test/call.test.ts @@ -956,11 +956,11 @@ describe("PendingRequestMap.subscribe()", () => { expect(iterationCompleted).toBe(true); }); - it("times out on idle deadline", async () => { + it("times out on idle timeout", async () => { const map = new PendingRequestMap(); - const deadline = 80; + const idleTimeout = 80; - const subscribeIter = map.subscribe("test.slow", {}, { deadline }); + const subscribeIter = map.subscribe("test.slow", {}, { idleTimeout }); let caughtError: unknown; const consumePromise = (async () => { @@ -981,9 +981,9 @@ describe("PendingRequestMap.subscribe()", () => { it("resets idle timeout on each envelope", async () => { const map = new PendingRequestMap(); - const deadline = 150; + const idleTimeout = 150; - const subscribeIter = map.subscribe("test.heartbeat", {}, { deadline }); + const subscribeIter = map.subscribe("test.heartbeat", {}, { idleTimeout }); const results: ResponseEnvelope[] = []; const consumePromise = (async () => { @@ -1056,6 +1056,37 @@ describe("PendingRequestMap.subscribe()", () => { await consumePromise; expect(map.getPendingCount()).toBe(0); }); + + it("subscribe without idleTimeout never times out", async () => { + const map = new PendingRequestMap(); + + const subscribeIter = map.subscribe("test.persistent", {}); + let eventCount = 0; + + const consumePromise = (async () => { + for await (const _ of subscribeIter) { + eventCount++; + if (eventCount >= 1) break; + } + })(); + + await new Promise((r) => setTimeout(r, 50)); + const requestId = [...map["entries"].keys()][0]; + map.respond(requestId, localEnvelope("event", "test.persistent")); + + await consumePromise; + expect(eventCount).toBe(1); + }); + + it("call() with past deadline times out immediately", async () => { + const map = new PendingRequestMap(); + + const deadline = Date.now() - 100; + const callPromise = map.call("test.op", { value: "hello" }, { deadline }); + + await expect(callPromise).rejects.toThrow("timed out"); + await expect(callPromise).rejects.toBeInstanceOf(CallError); + }); }); describe("CallHandler SUBSCRIPTION dispatch", () => {