import { describe, it, expect, vi } from "vitest"; import { PendingRequestMap, buildCallHandler } from "../src/call.js"; import { CallError, InfrastructureErrorCode } from "../src/error.js"; import { OperationRegistry } from "../src/registry.js"; import { Type } from "@alkdev/typebox"; import { OperationType } from "../src/types.js"; import type { Identity } from "../src/types.js"; import { localEnvelope, httpEnvelope, mcpEnvelope, isResponseEnvelope } from "../src/response-envelope.js"; import type { ResponseEnvelope } from "../src/response-envelope.js"; describe("PendingRequestMap", () => { it("creates instance without event target", () => { const map = new PendingRequestMap(); expect(map.getPendingCount()).toBe(0); }); it("creates instance with event target", () => { const target = new EventTarget(); const map = new PendingRequestMap(target); expect(map.getPendingCount()).toBe(0); }); it("call() resolves when respond() is called with a ResponseEnvelope", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope({ result: "world" }, "test.op")); }, 10); const result = await callPromise; expect(isResponseEnvelope(result)).toBe(true); expect(result.meta.source).toBe("local"); expect(result.data).toEqual({ result: "world" }); }); it("call() resolves with the full ResponseEnvelope", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); const envelope = httpEnvelope({ status: "ok" }, { statusCode: 200, headers: { "content-type": "application/json" }, contentType: "application/json", }); setTimeout(() => { const requestId = [...map["entries"].keys()][0]; map.respond(requestId, envelope); }, 10); const result = await callPromise; expect(result.meta.source).toBe("http"); expect(result.data).toEqual({ status: "ok" }); if (result.meta.source === "http") { expect(result.meta.statusCode).toBe(200); } }); it("respond() throws when called with non-envelope value", () => { const map = new PendingRequestMap(); expect(() => map.respond("req-1", { result: "world" } as any)).toThrow("ResponseEnvelope"); }); it("call() rejects when emitError() is called", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { const requestId = [...map["entries"].keys()][0]; map.emitError(requestId, "CUSTOM_ERROR", "Something went wrong"); }, 10); await expect(callPromise).rejects.toThrow("Something went wrong"); await expect(callPromise).rejects.toBeInstanceOf(CallError); }); it("abort() rejects the pending call", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { const requestId = [...map["entries"].keys()][0]; map.abort(requestId); }, 10); await expect(callPromise).rejects.toThrow("was aborted"); await expect(callPromise).rejects.toBeInstanceOf(CallError); }); it("call() with deadline times out", async () => { const map = new PendingRequestMap(); const deadline = Date.now() + 50; const callPromise = map.call("test.op", { value: "hello" }, { deadline }); await expect(callPromise).rejects.toThrow("timed out"); await expect(callPromise).rejects.toBeInstanceOf(CallError); }); it("tracks pending requests", () => { const map = new PendingRequestMap(); map.call("test.op1", {}); map.call("test.op2", {}); expect(map.getPendingCount()).toBe(2); }); it("cleans up after call resolves", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); expect(map.getPendingCount()).toBe(1); const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope({ result: "done" }, "test.op")); await callPromise; expect(map.getPendingCount()).toBe(0); }); it("respond() throws when called with a non-envelope value", () => { const map = new PendingRequestMap(); expect(() => map.respond("r1", { result: "raw" } as any)).toThrow( "PendingRequestMap.respond() requires a ResponseEnvelope" ); }); it("respond() throws when called with a null value", () => { const map = new PendingRequestMap(); expect(() => map.respond("r1", null as any)).toThrow( "PendingRequestMap.respond() requires a ResponseEnvelope" ); }); it("respond() throws when called with a string value", () => { const map = new PendingRequestMap(); expect(() => map.respond("r1", "raw string" as any)).toThrow( "PendingRequestMap.respond() requires a ResponseEnvelope" ); }); it("respond() accepts a localEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, localEnvelope("data", "test.op"))).not.toThrow(); }); it("respond() accepts an httpEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, httpEnvelope("data", { statusCode: 200, headers: {}, contentType: "text/plain", }))).not.toThrow(); }); it("respond() accepts an mcpEnvelope", () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", {}); const requestId = [...map["entries"].keys()][0]; expect(() => map.respond(requestId, mcpEnvelope("data", { isError: false, content: [], }))).not.toThrow(); }); it("call() returns Promise", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope(42, "test.op")); }, 10); const result = await callPromise; expect(isResponseEnvelope(result)).toBe(true); expect(result.data).toBe(42); expect(result.meta.source).toBe("local"); }); }); describe("CallHandler", () => { function makeRegistry(ops?: Array>) { const registry = new OperationRegistry(); registry.register({ name: "echo", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "echo op", inputSchema: Type.Object({ value: Type.String() }), outputSchema: Type.Object({ value: Type.String() }), accessControl: { requiredScopes: [] }, handler: async (input: unknown) => ({ value: (input as { value: string }).value }), }); registry.register({ name: "voidOp", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "void op", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => undefined, }); registry.register({ name: "guarded", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "guarded op", inputSchema: Type.Object({}), outputSchema: Type.Object({ ok: Type.Boolean() }), accessControl: { requiredScopes: [], resourceType: "project", resourceAction: "read", }, handler: async () => ({ ok: true }), }); registry.register({ name: "open", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "open op", inputSchema: Type.Object({}), outputSchema: Type.Object({ ok: Type.Boolean() }), accessControl: { requiredScopes: [] }, handler: async () => ({ ok: true }), }); if (ops) { for (const op of ops) { registry.register(op as any); } } return registry; } it("wraps handler return value in localEnvelope and publishes call.responded", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.echo", { value: "hello" }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.echo", input: { value: "hello" }, }); const result = await callPromise; expect(result.meta.source).toBe("local"); if (result.meta.source === "local") { expect(result.meta.operationId).toBe("test.echo"); expect(typeof result.meta.timestamp).toBe("number"); } expect(result.data).toEqual({ value: "hello" }); }); it("wraps undefined handler return value and publishes call.responded", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.voidOp", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.voidOp", input: {}, }); const result = await callPromise; expect(result.meta.source).toBe("local"); if (result.meta.source === "local") { expect(result.meta.operationId).toBe("test.voidOp"); } expect(result.data).toBeUndefined(); }); it("passes through a pre-built ResponseEnvelope from handler", async () => { const registry = new OperationRegistry(); const mcpResult = mcpEnvelope({ structured: true }, { isError: false, content: [{ type: "text" as const, text: "result" }], structuredContent: { structured: true }, }); registry.register({ name: "mcpOp", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "mcp op", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => mcpResult, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.mcpOp", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.mcpOp", input: {}, }); const result = await callPromise; expect(result.meta.source).toBe("mcp"); expect(result.data).toEqual({ structured: true }); }); it("passes through httpEnvelope from handler", async () => { const registry = new OperationRegistry(); const httpResult = httpEnvelope({ items: [1, 2, 3] }, { statusCode: 200, headers: { "content-type": "application/json" }, contentType: "application/json", }); registry.register({ name: "httpOp", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "http op", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => httpResult, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.httpOp", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.httpOp", input: {}, }); const result = await callPromise; expect(result.meta.source).toBe("http"); if (result.meta.source === "http") { expect(result.meta.statusCode).toBe(200); } expect(result.data).toEqual({ items: [1, 2, 3] }); }); it("publishes call.error when handler throws", async () => { const registry = new OperationRegistry(); registry.register({ name: "throws", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "throws op", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => { throw new Error("handler failed"); }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.throws", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.throws", input: {}, }); await expect(callPromise).rejects.toThrow("handler failed"); }); it("publishes call.error with CallError when handler throws CallError", async () => { const registry = new OperationRegistry(); registry.register({ name: "throwsCallError", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "throws call error", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => { throw new CallError("CUSTOM_CODE", "custom error message", { detail: "info" }); }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.throwsCallError", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.throwsCallError", input: {}, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe("CUSTOM_CODE"); expect((error as CallError).message).toBe("custom error message"); } }); it("publishes call.error when operation not found", async () => { const registry = new OperationRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.nonexistent", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.nonexistent", input: {}, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND); } }); it("publishes call.error when handler not registered", async () => { const registry = new OperationRegistry(); registry.registerSpec({ name: "noHandler", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "op without handler", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.noHandler", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.noHandler", input: {}, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND); } }); it("publishes call.error on access denied", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [] }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); } }); it("publishes call.error for unknown operations with mapError defaults", async () => { const registry = new OperationRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.nonexistent", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.nonexistent", input: {}, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND); } }); it("applies Value.Cast normalization via execute()", async () => { const registry = new OperationRegistry(); registry.register({ name: "defaultsFields", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "op with default fields", inputSchema: Type.Object({}), outputSchema: Type.Object({ name: Type.String(), count: Type.Number({ default: 0 }) }), accessControl: { requiredScopes: [] }, handler: async () => ({ name: "test" }), }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.defaultsFields", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.defaultsFields", input: {}, }); const result = await callPromise; expect(result.data).toEqual({ name: "test", count: 0 }); }); it("does not normalize with Value.Cast when outputSchema is Unknown", async () => { const registry = new OperationRegistry(); registry.register({ name: "unknownOutput", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "op with unknown output", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async () => ({ name: "test", extra: "field" }), }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.unknownOutput", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.unknownOutput", input: {}, }); const result = await callPromise; expect(result.data).toEqual({ name: "test", extra: "field" }); }); it("maps errors using spec errorSchemas", async () => { const registry = new OperationRegistry(); registry.register({ name: "customError", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "op with custom errors", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, errorSchemas: [ { code: "NOT_FOUND", description: "Item not found", schema: Type.Object({ id: Type.String() }) }, ], handler: async () => { throw new Error("NOT_FOUND: item 42"); }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const callPromise = callMap.call("test.customError", {}); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.customError", input: {}, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe("NOT_FOUND"); expect((error as CallError).message).toBe("NOT_FOUND: item 42"); } }); }); describe("checkAccess resource access control", () => { function makeRegistry(accessControlOverrides: Record = {}) { const registry = new OperationRegistry(); registry.register({ name: "guarded", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "guarded op", inputSchema: Type.Object({}), outputSchema: Type.Object({ ok: Type.Boolean() }), accessControl: { requiredScopes: [], resourceType: "project", resourceAction: "read", ...accessControlOverrides, }, handler: async () => ({ ok: true }), }); registry.register({ name: "open", namespace: "test", version: "1.0.0", type: OperationType.QUERY, description: "open op", inputSchema: Type.Object({}), outputSchema: Type.Object({ ok: Type.Boolean() }), accessControl: { requiredScopes: [], }, handler: async () => ({ ok: true }), }); return registry; } it("denies access when resourceType/resourceAction are set and identity.resources is undefined", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [] }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); } }); it("denies access when resourceType/resourceAction are set and identity.resources is empty", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [], resources: {} }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); } }); it("denies access when identity.resources has no matching resource type", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [], resources: { "document:abc": ["read"] }, }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); } }); it("denies access when identity.resources has matching type but wrong action", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [], resources: { "project:abc": ["write"] }, }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); try { await callPromise; expect.fail("Should have thrown"); } catch (error) { expect(error).toBeInstanceOf(CallError); expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); } }); it("grants access when identity.resources has matching type and action", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [], resources: { "project:abc": ["read"] }, }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); const result = await callPromise; expect(result.data).toEqual({ ok: true }); }); it("grants access when neither resourceType nor resourceAction are set", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [] }; const callPromise = callMap.call("test.open", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.open", input: {}, identity, }); const result = await callPromise; expect(result.data).toEqual({ ok: true }); }); it("grants access when identity.resources matches and identity has no scopes required", async () => { const registry = makeRegistry(); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: ["some:scope"], resources: { "project:xyz": ["read", "write"] }, }; const callPromise = callMap.call("test.guarded", {}, { identity }); await handler({ requestId: [...callMap["entries"].keys()][0], operationId: "test.guarded", input: {}, identity, }); const result = await callPromise; expect(result.data).toEqual({ ok: true }); }); }); describe("PendingRequestMap.subscribe()", () => { it("yields each envelope from call.responded events", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.stream", { filter: "all" }); const results: ResponseEnvelope[] = []; const consumePromise = (async () => { for await (const envelope of subscribeIter) { results.push(envelope); if (results.length === 3) break; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope({ event: 1 }, "test.stream")); map.respond(requestId, localEnvelope({ event: 2 }, "test.stream")); map.respond(requestId, localEnvelope({ event: 3 }, "test.stream")); await consumePromise; expect(results).toHaveLength(3); expect(results[0].data).toEqual({ event: 1 }); expect(results[1].data).toEqual({ event: 2 }); expect(results[2].data).toEqual({ event: 3 }); expect(results[0].meta.source).toBe("local"); }); it("publishes call.aborted when consumer stops iterating", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.stream", {}); let abortedReceived = false; const abortedIter = map["pubsub"].subscribe("call.aborted", ""); (async () => { for await (const envelope of abortedIter) { abortedReceived = true; } })(); const results: ResponseEnvelope[] = []; const consumePromise = (async () => { for await (const envelope of subscribeIter) { results.push(envelope); if (results.length === 1) break; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope("first", "test.stream")); await consumePromise; await new Promise((r) => setTimeout(r, 20)); expect(abortedReceived).toBe(true); }); it("throws CallError when call.error event arrives", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.failing", {}); let caughtError: unknown; const consumePromise = (async () => { try { for await (const _ of subscribeIter) { // should not reach } } catch (error) { caughtError = error; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; map.emitError(requestId, "CUSTOM_ERROR", "Subscription failed"); await consumePromise; expect(caughtError).toBeInstanceOf(CallError); expect((caughtError as CallError).code).toBe("CUSTOM_ERROR"); }); it("closes iterator on call.aborted event", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.stream", {}); let iterationCompleted = false; const consumePromise = (async () => { for await (const _ of subscribeIter) { // will receive abort } iterationCompleted = true; })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; const entry = map["entries"].get(requestId); expect(entry).toBeDefined(); expect(entry!.type).toBe("subscribe"); map["pubsub"].publish("call.aborted", "", { requestId }); await consumePromise; expect(iterationCompleted).toBe(true); }); it("times out on idle timeout", async () => { const map = new PendingRequestMap(); const idleTimeout = 80; const subscribeIter = map.subscribe("test.slow", {}, { idleTimeout }); let caughtError: unknown; const consumePromise = (async () => { try { for await (const _ of subscribeIter) { // should not receive any events } } catch (error) { caughtError = error; } })(); await consumePromise; expect(caughtError).toBeInstanceOf(CallError); expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.TIMEOUT); }); it("resets idle timeout on each envelope", async () => { const map = new PendingRequestMap(); const idleTimeout = 150; const subscribeIter = map.subscribe("test.heartbeat", {}, { idleTimeout }); const results: ResponseEnvelope[] = []; const consumePromise = (async () => { for await (const envelope of subscribeIter) { results.push(envelope); if (results.length === 3) break; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; await new Promise((r) => setTimeout(r, 50)); map.respond(requestId, localEnvelope("event1", "test.heartbeat")); await new Promise((r) => setTimeout(r, 50)); map.respond(requestId, localEnvelope("event2", "test.heartbeat")); await new Promise((r) => setTimeout(r, 50)); map.respond(requestId, localEnvelope("event3", "test.heartbeat")); await consumePromise; expect(results).toHaveLength(3); expect(results[0].data).toBe("event1"); expect(results[1].data).toBe("event2"); expect(results[2].data).toBe("event3"); }); it("abort() closes subscription iterator", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.stream", {}); let iterationCompleted = false; const consumePromise = (async () => { for await (const _ of subscribeIter) { // will receive abort } iterationCompleted = true; })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...map["entries"].keys()][0]; map.abort(requestId); await consumePromise; expect(iterationCompleted).toBe(true); }); it("tracks subscribe entries in pending count", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.stream", {}); const consumePromise = (async () => { for await (const _ of subscribeIter) { break; } })(); await new Promise((r) => setTimeout(r, 30)); expect(map.getPendingCount()).toBe(1); const requestId = [...map["entries"].keys()][0]; map.abort(requestId); await new Promise((r) => setTimeout(r, 20)); await consumePromise; expect(map.getPendingCount()).toBe(0); }); it("subscribe without idleTimeout never times out", async () => { const map = new PendingRequestMap(); const subscribeIter = map.subscribe("test.persistent", {}); let eventCount = 0; const consumePromise = (async () => { for await (const _ of subscribeIter) { eventCount++; if (eventCount >= 1) break; } })(); await new Promise((r) => setTimeout(r, 50)); const requestId = [...map["entries"].keys()][0]; map.respond(requestId, localEnvelope("event", "test.persistent")); await consumePromise; expect(eventCount).toBe(1); }); it("call() with past deadline times out immediately", async () => { const map = new PendingRequestMap(); const deadline = Date.now() - 100; const callPromise = map.call("test.op", { value: "hello" }, { deadline }); await expect(callPromise).rejects.toThrow("timed out"); await expect(callPromise).rejects.toBeInstanceOf(CallError); }); }); describe("CallHandler SUBSCRIPTION dispatch", () => { it("dispatches SUBSCRIPTION operations to subscribe()", async () => { const registry = new OperationRegistry(); registry.register({ name: "events", namespace: "test", version: "1.0.0", type: OperationType.SUBSCRIPTION, description: "event stream", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: [] }, handler: async function* (_input: unknown, _context: unknown) { yield "event1"; yield "event2"; }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const subscribeIter = callMap.subscribe("test.events", {}); const results: ResponseEnvelope[] = []; const consumePromise = (async () => { for await (const envelope of subscribeIter) { results.push(envelope); if (results.length === 2) break; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...callMap["entries"].keys()][0]; await handler({ requestId, operationId: "test.events", input: {}, }); await consumePromise; expect(results).toHaveLength(2); expect(results[0].data).toBe("event1"); expect(results[0].meta.source).toBe("local"); expect(results[1].data).toBe("event2"); }); it("publishes call.error for SUBSCRIPTION access denied", async () => { const registry = new OperationRegistry(); registry.register({ name: "guarded", namespace: "test", version: "1.0.0", type: OperationType.SUBSCRIPTION, description: "guarded sub", inputSchema: Type.Object({}), outputSchema: Type.Unknown(), accessControl: { requiredScopes: ["admin"] }, handler: async function* (_input: unknown, _context: unknown) { yield "secret"; }, }); const callMap = new PendingRequestMap(); const handler = buildCallHandler({ registry, callMap }); const identity: Identity = { id: "user1", scopes: [] }; const subscribeIter = callMap.subscribe("test.guarded", {}, { identity }); let caughtError: unknown; const consumePromise = (async () => { try { for await (const _ of subscribeIter) { // should not reach } } catch (error) { caughtError = error; } })(); await new Promise((r) => setTimeout(r, 20)); const requestId = [...callMap["entries"].keys()][0]; await handler({ requestId, operationId: "test.guarded", input: {}, identity, }); await consumePromise; expect(caughtError).toBeInstanceOf(CallError); expect((caughtError as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED); }); });