fix: split deadline/idleTimeout semantics in call protocol (M-03)
This commit is contained in:
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
**Date:** 2026-05-16
|
**Date:** 2026-05-16
|
||||||
**Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations
|
**Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations
|
||||||
**Status:** C-01 through C-05 and H-01 through H-06 resolved. M-01 through M-08 and L-01 through L-05 remain as follow-ups.
|
**Status:** C-01 through C-05, H-01 through H-06, and M-03 resolved. M-01 through M-02 and M-04 through M-08, plus L-01 through L-05, remain as follow-ups.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
@@ -116,11 +116,16 @@ These two schemas are nearly identical, differing only in the `handler` field. A
|
|||||||
|
|
||||||
**File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking.
|
**File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking.
|
||||||
|
|
||||||
### M-03. Subscription deadline semantics are ambiguous
|
### M-03. Subscription deadline semantics are ambiguous ✅ RESOLVED
|
||||||
|
|
||||||
**File:** `src/call.ts:103-109`
|
**File:** `src/call.ts`
|
||||||
|
|
||||||
When a `call.responded` event arrives for a subscription, the deadline timer is reset to the same `entry.state.deadline` value. If `deadline` is an absolute timestamp (e.g., `Date.now() + 5000`), the new timer delay would be `deadline - Date.now()`, which shrinks over time and could become negative. The documentation doesn't specify whether `deadline` is absolute or relative. This should be clarified and the reset logic adjusted.
|
**Resolution:** Split the overloaded `deadline` field into two distinct concepts:
|
||||||
|
|
||||||
|
- `deadline` (absolute timestamp, ms since epoch) — used by `call()`. Total time limit for a request/response cycle. Computed as `setTimeout(fn, Math.max(0, deadline - Date.now()))`.
|
||||||
|
- `idleTimeout` (relative duration in ms) — used by `subscribe()`. Maximum ms between events before the subscription is considered idle. Reset on each `call.responded` event. Omit for no idle timeout (subscription lives until explicit abort).
|
||||||
|
|
||||||
|
`CallRequestedEvent` schema now has both `deadline?: number` and `idleTimeout?: number` as optional fields. `PendingRequestMap.subscribe()` accepts `{ idleTimeout }` instead of `{ deadline }`. The idle timer is correctly reset on each event using the stored `idleTimeout` value.
|
||||||
|
|
||||||
### M-04. No unsubscribe/completion signaling in call protocol
|
### M-04. No unsubscribe/completion signaling in call protocol
|
||||||
|
|
||||||
|
|||||||
28
src/call.ts
28
src/call.ts
@@ -17,7 +17,8 @@ export const CallEventSchema = {
|
|||||||
operationId: Type.String(),
|
operationId: Type.String(),
|
||||||
input: Type.Unknown(),
|
input: Type.Unknown(),
|
||||||
parentRequestId: Type.Optional(Type.String()),
|
parentRequestId: Type.Optional(Type.String()),
|
||||||
deadline: Type.Optional(Type.Number()),
|
deadline: Type.Optional(Type.Number({ description: "Absolute timestamp (ms since epoch) for call/response timeout. Used by calls." })),
|
||||||
|
idleTimeout: Type.Optional(Type.Number({ description: "Relative duration (ms) between events before subscription is considered idle. Used by subscriptions. Omit for no idle timeout." })),
|
||||||
identity: Type.Optional(Type.Object({
|
identity: Type.Optional(Type.Object({
|
||||||
id: Type.String(),
|
id: Type.String(),
|
||||||
scopes: Type.Array(Type.String()),
|
scopes: Type.Array(Type.String()),
|
||||||
@@ -65,7 +66,7 @@ interface PendingCall {
|
|||||||
interface SubscriptionState {
|
interface SubscriptionState {
|
||||||
push: Push<ResponseEnvelope>;
|
push: Push<ResponseEnvelope>;
|
||||||
stop: Stop;
|
stop: Stop;
|
||||||
deadline?: number;
|
idleTimeout?: number;
|
||||||
timer?: ReturnType<typeof setTimeout>;
|
timer?: ReturnType<typeof setTimeout>;
|
||||||
consumerStopped?: boolean;
|
consumerStopped?: boolean;
|
||||||
}
|
}
|
||||||
@@ -107,8 +108,8 @@ export class PendingRequestMap {
|
|||||||
} else {
|
} else {
|
||||||
if (entry.state.timer) {
|
if (entry.state.timer) {
|
||||||
clearTimeout(entry.state.timer);
|
clearTimeout(entry.state.timer);
|
||||||
if (entry.state.deadline) {
|
if (entry.state.idleTimeout != null) {
|
||||||
entry.state.timer = this.startSubscriptionTimer(responded.requestId, entry.state.deadline);
|
entry.state.timer = this.startIdleTimer(responded.requestId, entry.state.idleTimeout);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
entry.state.push(responded.output as ResponseEnvelope);
|
entry.state.push(responded.output as ResponseEnvelope);
|
||||||
@@ -163,15 +164,15 @@ export class PendingRequestMap {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private startSubscriptionTimer(requestId: string, deadline: number): ReturnType<typeof setTimeout> {
|
private startIdleTimer(requestId: string, idleTimeout: number): ReturnType<typeof setTimeout> {
|
||||||
return setTimeout(() => {
|
return setTimeout(() => {
|
||||||
const entry = this.entries.get(requestId);
|
const entry = this.entries.get(requestId);
|
||||||
if (!entry || entry.type !== "subscribe") return;
|
if (!entry || entry.type !== "subscribe") return;
|
||||||
if (entry.state.timer) clearTimeout(entry.state.timer);
|
if (entry.state.timer) clearTimeout(entry.state.timer);
|
||||||
entry.state.consumerStopped = true;
|
entry.state.consumerStopped = true;
|
||||||
this.pubsub.publish("call.aborted", "", { requestId });
|
this.pubsub.publish("call.aborted", "", { requestId });
|
||||||
entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle)`, { deadline }));
|
entry.state.stop(new CallError(InfrastructureErrorCode.TIMEOUT, `Subscription ${requestId} timed out (idle after ${idleTimeout}ms)`, { idleTimeout }));
|
||||||
}, deadline);
|
}, idleTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
async call(
|
async call(
|
||||||
@@ -186,10 +187,11 @@ export class PendingRequestMap {
|
|||||||
|
|
||||||
if (options?.deadline) {
|
if (options?.deadline) {
|
||||||
pending.deadline = options.deadline;
|
pending.deadline = options.deadline;
|
||||||
|
const delay = Math.max(0, options.deadline - Date.now());
|
||||||
pending.timer = setTimeout(() => {
|
pending.timer = setTimeout(() => {
|
||||||
this.entries.delete(requestId);
|
this.entries.delete(requestId);
|
||||||
reject(new CallError(InfrastructureErrorCode.TIMEOUT, `Request ${requestId} timed out`, { deadline: options.deadline }));
|
reject(new CallError(InfrastructureErrorCode.TIMEOUT, `Request ${requestId} timed out`, { deadline: options.deadline }));
|
||||||
}, options.deadline - Date.now());
|
}, delay);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.entries.set(requestId, { type: "call", pending });
|
this.entries.set(requestId, { type: "call", pending });
|
||||||
@@ -208,16 +210,16 @@ export class PendingRequestMap {
|
|||||||
subscribe(
|
subscribe(
|
||||||
operationId: string,
|
operationId: string,
|
||||||
input: unknown,
|
input: unknown,
|
||||||
options?: { parentRequestId?: string; deadline?: number; identity?: Identity },
|
options?: { parentRequestId?: string; idleTimeout?: number; identity?: Identity },
|
||||||
): AsyncIterable<ResponseEnvelope> {
|
): AsyncIterable<ResponseEnvelope> {
|
||||||
const requestId = crypto.randomUUID();
|
const requestId = crypto.randomUUID();
|
||||||
|
|
||||||
const repeater = new Repeater<ResponseEnvelope>((push: Push<ResponseEnvelope>, stop: Stop) => {
|
const repeater = new Repeater<ResponseEnvelope>((push: Push<ResponseEnvelope>, stop: Stop) => {
|
||||||
const state: SubscriptionState = { push, stop };
|
const state: SubscriptionState = { push, stop };
|
||||||
|
|
||||||
if (options?.deadline) {
|
if (options?.idleTimeout != null) {
|
||||||
state.deadline = options.deadline;
|
state.idleTimeout = options.idleTimeout;
|
||||||
state.timer = this.startSubscriptionTimer(requestId, options.deadline);
|
state.timer = this.startIdleTimer(requestId, options.idleTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.entries.set(requestId, { type: "subscribe", state });
|
this.entries.set(requestId, { type: "subscribe", state });
|
||||||
@@ -227,7 +229,7 @@ export class PendingRequestMap {
|
|||||||
operationId,
|
operationId,
|
||||||
input,
|
input,
|
||||||
parentRequestId: options?.parentRequestId,
|
parentRequestId: options?.parentRequestId,
|
||||||
deadline: options?.deadline,
|
idleTimeout: options?.idleTimeout,
|
||||||
identity: options?.identity,
|
identity: options?.identity,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -956,11 +956,11 @@ describe("PendingRequestMap.subscribe()", () => {
|
|||||||
expect(iterationCompleted).toBe(true);
|
expect(iterationCompleted).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("times out on idle deadline", async () => {
|
it("times out on idle timeout", async () => {
|
||||||
const map = new PendingRequestMap();
|
const map = new PendingRequestMap();
|
||||||
const deadline = 80;
|
const idleTimeout = 80;
|
||||||
|
|
||||||
const subscribeIter = map.subscribe("test.slow", {}, { deadline });
|
const subscribeIter = map.subscribe("test.slow", {}, { idleTimeout });
|
||||||
let caughtError: unknown;
|
let caughtError: unknown;
|
||||||
|
|
||||||
const consumePromise = (async () => {
|
const consumePromise = (async () => {
|
||||||
@@ -981,9 +981,9 @@ describe("PendingRequestMap.subscribe()", () => {
|
|||||||
|
|
||||||
it("resets idle timeout on each envelope", async () => {
|
it("resets idle timeout on each envelope", async () => {
|
||||||
const map = new PendingRequestMap();
|
const map = new PendingRequestMap();
|
||||||
const deadline = 150;
|
const idleTimeout = 150;
|
||||||
|
|
||||||
const subscribeIter = map.subscribe("test.heartbeat", {}, { deadline });
|
const subscribeIter = map.subscribe("test.heartbeat", {}, { idleTimeout });
|
||||||
|
|
||||||
const results: ResponseEnvelope[] = [];
|
const results: ResponseEnvelope[] = [];
|
||||||
const consumePromise = (async () => {
|
const consumePromise = (async () => {
|
||||||
@@ -1056,6 +1056,37 @@ describe("PendingRequestMap.subscribe()", () => {
|
|||||||
await consumePromise;
|
await consumePromise;
|
||||||
expect(map.getPendingCount()).toBe(0);
|
expect(map.getPendingCount()).toBe(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("subscribe without idleTimeout never times out", async () => {
|
||||||
|
const map = new PendingRequestMap();
|
||||||
|
|
||||||
|
const subscribeIter = map.subscribe("test.persistent", {});
|
||||||
|
let eventCount = 0;
|
||||||
|
|
||||||
|
const consumePromise = (async () => {
|
||||||
|
for await (const _ of subscribeIter) {
|
||||||
|
eventCount++;
|
||||||
|
if (eventCount >= 1) break;
|
||||||
|
}
|
||||||
|
})();
|
||||||
|
|
||||||
|
await new Promise((r) => setTimeout(r, 50));
|
||||||
|
const requestId = [...map["entries"].keys()][0];
|
||||||
|
map.respond(requestId, localEnvelope("event", "test.persistent"));
|
||||||
|
|
||||||
|
await consumePromise;
|
||||||
|
expect(eventCount).toBe(1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("call() with past deadline times out immediately", async () => {
|
||||||
|
const map = new PendingRequestMap();
|
||||||
|
|
||||||
|
const deadline = Date.now() - 100;
|
||||||
|
const callPromise = map.call("test.op", { value: "hello" }, { deadline });
|
||||||
|
|
||||||
|
await expect(callPromise).rejects.toThrow("timed out");
|
||||||
|
await expect(callPromise).rejects.toBeInstanceOf(CallError);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("CallHandler SUBSCRIPTION dispatch", () => {
|
describe("CallHandler SUBSCRIPTION dispatch", () => {
|
||||||
|
|||||||
Reference in New Issue
Block a user