Merge branch 'feat/call-envelope-integration'

This commit is contained in:
2026-05-11 02:15:59 +00:00
3 changed files with 669 additions and 16 deletions

View File

@@ -1,9 +1,12 @@
import { Type, type Static } from "@alkdev/typebox";
import { Type, type Static, KindGuard } from "@alkdev/typebox";
import { Value } from "@alkdev/typebox/value";
import { createPubSub, type PubSub } from "@alkdev/pubsub";
import { getLogger } from "@logtape/logtape";
import { OperationRegistry } from "./registry.js";
import { CallError, InfrastructureErrorCode, mapError } from "./error.js";
import { validateOrThrow } from "./validation.js";
import { validateOrThrow, collectErrors, formatValueErrors } from "./validation.js";
import { ResponseEnvelopeSchema, isResponseEnvelope, localEnvelope } from "./response-envelope.js";
import type { ResponseEnvelope } from "./response-envelope.js";
import type { Identity, OperationContext, AccessControl, OperationSpec } from "./types.js";
const logger = getLogger("operations:call");
@@ -23,7 +26,7 @@ export const CallEventSchema = {
}),
"call.responded": Type.Object({
requestId: Type.String(),
output: Type.Unknown(),
output: ResponseEnvelopeSchema,
}),
"call.aborted": Type.Object({
requestId: Type.String(),
@@ -52,7 +55,7 @@ type CallPubSubMap = {
};
interface PendingRequest {
resolve: (value: unknown) => void;
resolve: (value: ResponseEnvelope) => void;
reject: (reason: unknown) => void;
deadline?: number;
timer?: ReturnType<typeof setTimeout>;
@@ -60,7 +63,7 @@ interface PendingRequest {
export interface CallHandlerConfig {
registry: OperationRegistry;
eventTarget?: EventTarget;
callMap?: PendingRequestMap;
}
export type CallHandler = (event: CallRequestedEvent) => Promise<void>;
@@ -85,7 +88,7 @@ export class PendingRequestMap {
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(responded.requestId);
pending.resolve(responded.output);
pending.resolve(responded.output as ResponseEnvelope);
}
}
})();
@@ -121,7 +124,7 @@ export class PendingRequestMap {
operationId: string,
input: unknown,
options?: { parentRequestId?: string; deadline?: number; identity?: Identity },
): Promise<unknown> {
): Promise<ResponseEnvelope> {
const requestId = crypto.randomUUID();
return new Promise((resolve, reject) => {
@@ -148,7 +151,10 @@ export class PendingRequestMap {
});
}
respond(requestId: string, output: unknown): void {
respond(requestId: string, output: ResponseEnvelope): void {
if (!isResponseEnvelope(output)) {
throw new Error("PendingRequestMap.respond() requires a ResponseEnvelope. Use isResponseEnvelope() to check values before calling respond().");
}
this.pubsub.publish("call.responded", "", {
requestId,
output,
@@ -180,7 +186,7 @@ export class PendingRequestMap {
}
export function buildCallHandler(config: CallHandlerConfig): CallHandler {
const { registry } = config;
const { registry, callMap } = config;
return async (event: CallRequestedEvent): Promise<void> => {
const { requestId, operationId, input, identity } = event;
@@ -223,11 +229,35 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
validateOrThrow(spec.inputSchema, input, `Input validation for ${operationId}`);
await handler(input, context);
const result = await handler(input, context);
let envelope: ResponseEnvelope;
if (isResponseEnvelope(result)) {
envelope = result as ResponseEnvelope;
} else {
envelope = localEnvelope(result, operationId);
}
if (!KindGuard.IsUnknown(spec.outputSchema)) {
envelope.data = Value.Cast(spec.outputSchema, envelope.data);
}
const errors = collectErrors(spec.outputSchema, envelope.data);
if (errors.length > 0) {
logger.warn(`Output validation failed for ${operationId}:\n${formatValueErrors(errors)}`);
}
if (callMap) {
callMap.respond(requestId, envelope);
}
} catch (error) {
const callError = mapError(error);
throw callError;
if (callMap) {
callMap.emitError(requestId, callError.code, callError.message, callError.details);
} else {
throw callError;
}
}
};
}

View File

@@ -0,0 +1,86 @@
---
id: call-envelope-integration
name: Update PendingRequestMap and CallHandler for ResponseEnvelope
status: completed
depends_on: [response-envelope-tests]
scope: broad
risk: medium
impact: project
level: implementation
---
## Description
Update `src/call.ts` to integrate `ResponseEnvelope` throughout the call protocol. This covers all changes documented in call-protocol.md and api-surface.md's "Source vs. Spec Drift" sections for ADR-005.
Changes needed:
### `CallEventSchema["call.responded"]`
- Change `output` from `Type.Unknown()` to `ResponseEnvelopeSchema`
### `PendingRequestMap.respond()`
- Add `isResponseEnvelope()` guard — throws if `output` is not a valid envelope
- This enforces the invariant that all call protocol responses carry source metadata
### `PendingRequestMap.call()`
- Return type changes from `Promise<unknown>` to `Promise<ResponseEnvelope>`
- Resolution: the `call.responded` subscription handler resolves with the `ResponseEnvelope` from `output` field (which is already validated by `respond()`)
### `CallHandler`
- **Handler model change**: Handler returns a value; `CallHandler` wraps and publishes. Handler does NOT publish `call.responded` itself.
- Capture handler return value
- Apply shared result pipeline:
1. Detect: `isResponseEnvelope(result)` → pass through
2. Wrap: `localEnvelope(result, operationId)`
3. Normalize: `Value.Cast(spec.outputSchema, envelope.data)` when `outputSchema !== Type.Unknown()`
4. Validate: `collectErrors` on `envelope.data` — warning-only
- Publish `call.responded` via `callMap.respond(requestId, envelope)`
- On handler exception: `mapError()` converts to `CallError`, publish `call.error`
**Note**: Access control in `CallHandler` stays as-is (checking `identity` and `checkAccess`). ADR-006's change to make `CallHandler` call `execute()` is a separate task.
## Acceptance Criteria
- [x] `CallEventSchema["call.responded"].output` is `ResponseEnvelopeSchema`
- [x] `PendingRequestMap.respond()` validates `output` with `isResponseEnvelope()`, throws on raw values
- [x] `PendingRequestMap.call()` return type is `Promise<ResponseEnvelope>`
- [x] `CallHandler` captures handler return value instead of discarding it
- [x] `CallHandler` applies result pipeline: detect → wrap → normalize → validate
- [x] `CallHandler` publishes `call.responded` via `callMap.respond()` with the envelope
- [x] `CallHandler` on handler exception publishes `call.error` (not re-throws)
- [x] Adapter handlers (pre-built envelopes via `mcpEnvelope`/`httpEnvelope`) pass through via `isResponseEnvelope()`
- [x] Existing `call.test.ts` tests updated for new return types and behavior
- [x] New tests for: envelope validation in `respond()`, envelope wrapping in `CallHandler`, envelope passthrough, Value.Cast normalization
- [x] `npm run build` passes
- [x] `npm run lint` passes
- [x] `npm test` passes
## References
- docs/architecture/call-protocol.md § Source vs. Spec Drift (ADR-005 items)
- docs/architecture/api-surface.md § Source vs. Spec Drift (ADR-005 items)
- docs/architecture/response-envelopes.md § CallHandler, § PendingRequestMap
- src/call.ts
## Notes
`CallHandlerConfig` changed from `{ registry, eventTarget? }` to `{ registry, callMap? }` to support publishing `call.responded` via `callMap.respond()`. When `callMap` is not provided, errors are thrown directly (backward-compatible for direct use without the call protocol).
`Value.Cast` in TypeBox does not strip excess properties from objects — it only fills defaults and upcasts values. The test was updated to verify default-filling behavior rather than property stripping.
## Summary
Integrated ResponseEnvelope throughout the call protocol in `src/call.ts`.
- Modified: `src/call.ts` (52 lines changed)
- `CallEventSchema["call.responded"].output``ResponseEnvelopeSchema`
- `PendingRequest.call()``Promise<ResponseEnvelope>`
- `PendingRequestMap.respond()` → validates with `isResponseEnvelope()`, throws on raw values
- `CallHandler` → captures return value, applies detect→wrap→normalize→validate pipeline, publishes via `callMap`
- `CallHandlerConfig``{ registry, callMap? }` replacing `eventTarget?`
- Added imports: `KindGuard`, `Value`, `collectErrors`, `formatValueErrors`, `ResponseEnvelopeSchema`, `isResponseEnvelope`, `localEnvelope`
- Modified: `test/call.test.ts` (547 lines added)
- Updated existing tests to pass `ResponseEnvelope` to `respond()`
- Added 23 new tests: envelope validation in `respond()`, envelope wrapping in `CallHandler`, envelope passthrough (mcp/http), `Value.Cast` normalization, error publishing, and no-callMap fallback behavior
- All 189 tests passing
- Build and lint passing

View File

@@ -1,10 +1,12 @@
import { describe, it, expect } from "vitest";
import { describe, it, expect, vi } from "vitest";
import { PendingRequestMap, buildCallHandler } from "../src/call.js";
import { CallError, InfrastructureErrorCode } from "../src/error.js";
import { OperationRegistry } from "../src/registry.js";
import { Type } from "@alkdev/typebox";
import { OperationType } from "../src/types.js";
import type { Identity } from "../src/types.js";
import { localEnvelope, httpEnvelope, mcpEnvelope, isResponseEnvelope } from "../src/response-envelope.js";
import type { ResponseEnvelope } from "../src/response-envelope.js";
describe("PendingRequestMap", () => {
it("creates instance without event target", () => {
@@ -18,18 +20,43 @@ describe("PendingRequestMap", () => {
expect(map.getPendingCount()).toBe(0);
});
it("call() resolves when respond() is called", async () => {
it("call() resolves when respond() is called with a ResponseEnvelope", async () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", { value: "hello" });
setTimeout(() => {
const requestId = [...map["requests"].keys()][0];
map.respond(requestId, { result: "world" });
map.respond(requestId, localEnvelope({ result: "world" }, "test.op"));
}, 10);
const result = await callPromise;
expect(result).toEqual({ result: "world" });
expect(result.meta.source).toBe("local");
expect(result.data).toEqual({ result: "world" });
});
it("call() resolves with the full ResponseEnvelope", async () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", { value: "hello" });
const envelope = httpEnvelope({ status: "ok" }, {
statusCode: 200,
headers: { "content-type": "application/json" },
contentType: "application/json",
});
setTimeout(() => {
const requestId = [...map["requests"].keys()][0];
map.respond(requestId, envelope);
}, 10);
const result = await callPromise;
expect(result.meta.source).toBe("http");
expect(result.data).toEqual({ status: "ok" });
if (result.meta.source === "http") {
expect(result.meta.statusCode).toBe(200);
}
});
it("call() rejects when emitError() is called", async () => {
@@ -83,11 +110,521 @@ describe("PendingRequestMap", () => {
expect(map.getPendingCount()).toBe(1);
const requestId = [...map["requests"].keys()][0];
map.respond(requestId, { result: "done" });
map.respond(requestId, localEnvelope({ result: "done" }, "test.op"));
await callPromise;
expect(map.getPendingCount()).toBe(0);
});
it("respond() throws when called with a non-envelope value", () => {
const map = new PendingRequestMap();
expect(() => map.respond("r1", { result: "raw" } as any)).toThrow(
"PendingRequestMap.respond() requires a ResponseEnvelope"
);
});
it("respond() throws when called with a null value", () => {
const map = new PendingRequestMap();
expect(() => map.respond("r1", null as any)).toThrow(
"PendingRequestMap.respond() requires a ResponseEnvelope"
);
});
it("respond() throws when called with a string value", () => {
const map = new PendingRequestMap();
expect(() => map.respond("r1", "raw string" as any)).toThrow(
"PendingRequestMap.respond() requires a ResponseEnvelope"
);
});
it("respond() accepts a localEnvelope", () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", {});
const requestId = [...map["requests"].keys()][0];
expect(() => map.respond(requestId, localEnvelope("data", "test.op"))).not.toThrow();
});
it("respond() accepts an httpEnvelope", () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", {});
const requestId = [...map["requests"].keys()][0];
expect(() => map.respond(requestId, httpEnvelope("data", {
statusCode: 200,
headers: {},
contentType: "text/plain",
}))).not.toThrow();
});
it("respond() accepts an mcpEnvelope", () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", {});
const requestId = [...map["requests"].keys()][0];
expect(() => map.respond(requestId, mcpEnvelope("data", {
isError: false,
content: [],
}))).not.toThrow();
});
it("call() returns Promise<ResponseEnvelope>", async () => {
const map = new PendingRequestMap();
const callPromise = map.call("test.op", { value: "hello" });
setTimeout(() => {
const requestId = [...map["requests"].keys()][0];
map.respond(requestId, localEnvelope(42, "test.op"));
}, 10);
const result = await callPromise;
expect(isResponseEnvelope(result)).toBe(true);
expect(result.data).toBe(42);
expect(result.meta.source).toBe("local");
});
});
describe("CallHandler", () => {
function makeRegistry(ops?: Array<Record<string, unknown>>) {
const registry = new OperationRegistry();
registry.register({
name: "echo",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "echo op",
inputSchema: Type.Object({ value: Type.String() }),
outputSchema: Type.Object({ value: Type.String() }),
accessControl: { requiredScopes: [] },
handler: async (input: unknown) => ({ value: (input as { value: string }).value }),
});
registry.register({
name: "voidOp",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "void op",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => undefined,
});
registry.register({
name: "guarded",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "guarded op",
inputSchema: Type.Object({}),
outputSchema: Type.Object({ ok: Type.Boolean() }),
accessControl: {
requiredScopes: [],
resourceType: "project",
resourceAction: "read",
},
handler: async () => ({ ok: true }),
});
registry.register({
name: "open",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "open op",
inputSchema: Type.Object({}),
outputSchema: Type.Object({ ok: Type.Boolean() }),
accessControl: { requiredScopes: [] },
handler: async () => ({ ok: true }),
});
if (ops) {
for (const op of ops) {
registry.register(op as any);
}
}
return registry;
}
it("wraps handler return value in localEnvelope", async () => {
const registry = makeRegistry();
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.echo", { value: "hello" });
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.echo",
input: { value: "hello" },
});
const result = await callPromise;
expect(result.meta.source).toBe("local");
if (result.meta.source === "local") {
expect(result.meta.operationId).toBe("test.echo");
expect(typeof result.meta.timestamp).toBe("number");
}
expect(result.data).toEqual({ value: "hello" });
});
it("wraps undefined handler return value in localEnvelope", async () => {
const registry = makeRegistry();
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.voidOp", {});
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.voidOp",
input: {},
});
const result = await callPromise;
expect(result.meta.source).toBe("local");
if (result.meta.source === "local") {
expect(result.meta.operationId).toBe("test.voidOp");
}
expect(result.data).toBeUndefined();
});
it("passes through a pre-built ResponseEnvelope from handler", async () => {
const registry = new OperationRegistry();
const mcpResult = mcpEnvelope({ structured: true }, {
isError: false,
content: [{ type: "text" as const, text: "result" }],
structuredContent: { structured: true },
});
registry.register({
name: "mcpOp",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "mcp op",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => mcpResult,
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.mcpOp", {});
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.mcpOp",
input: {},
});
const result = await callPromise;
expect(result.meta.source).toBe("mcp");
expect(result.data).toEqual({ structured: true });
});
it("passes through httpEnvelope from handler", async () => {
const registry = new OperationRegistry();
const httpResult = httpEnvelope({ items: [1, 2, 3] }, {
statusCode: 200,
headers: { "content-type": "application/json" },
contentType: "application/json",
});
registry.register({
name: "httpOp",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "http op",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => httpResult,
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.httpOp", {});
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.httpOp",
input: {},
});
const result = await callPromise;
expect(result.meta.source).toBe("http");
if (result.meta.source === "http") {
expect(result.meta.statusCode).toBe(200);
}
expect(result.data).toEqual({ items: [1, 2, 3] });
});
it("publishes call.error when handler throws", async () => {
const registry = new OperationRegistry();
registry.register({
name: "throws",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "throws op",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => { throw new Error("handler failed"); },
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.throws", {});
await handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.throws",
input: {},
});
await expect(callPromise).rejects.toThrow("handler failed");
});
it("publishes call.error with CallError when handler throws CallError", async () => {
const registry = new OperationRegistry();
registry.register({
name: "throwsCallError",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "throws call error",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => {
throw new CallError("CUSTOM_CODE", "custom error message", { detail: "info" });
},
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.throwsCallError", {});
await handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.throwsCallError",
input: {},
});
try {
await callPromise;
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe("CUSTOM_CODE");
expect((error as CallError).message).toBe("custom error message");
}
});
it("applies Value.Cast normalization when outputSchema is not Unknown", async () => {
const registry = new OperationRegistry();
registry.register({
name: "defaultsFields",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "op with default fields",
inputSchema: Type.Object({}),
outputSchema: Type.Object({ name: Type.String(), count: Type.Number({ default: 0 }) }),
accessControl: { requiredScopes: [] },
handler: async () => ({ name: "test" }),
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.defaultsFields", {});
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.defaultsFields",
input: {},
});
const result = await callPromise;
expect(result.data).toEqual({ name: "test", count: 0 });
});
it("does not normalize with Value.Cast when outputSchema is Unknown", async () => {
const registry = new OperationRegistry();
registry.register({
name: "unknownOutput",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "op with unknown output",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => ({ name: "test", extra: "field" }),
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.unknownOutput", {});
handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.unknownOutput",
input: {},
});
const result = await callPromise;
expect(result.data).toEqual({ name: "test", extra: "field" });
});
it("publishes call.error when operation not found", async () => {
const registry = new OperationRegistry();
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.nonexistent", {});
await handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.nonexistent",
input: {},
});
try {
await callPromise;
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND);
}
});
it("publishes call.error when handler not registered", async () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "noHandler",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "op without handler",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const callPromise = callMap.call("test.noHandler", {});
await handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.noHandler",
input: {},
});
try {
await callPromise;
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.OPERATION_NOT_FOUND);
}
});
it("publishes call.error on access denied", async () => {
const registry = makeRegistry();
const callMap = new PendingRequestMap();
const handler = buildCallHandler({ registry, callMap });
const identity: Identity = { id: "user1", scopes: [] };
const callPromise = callMap.call("test.guarded", {}, { identity });
await handler({
requestId: [...callMap["requests"].keys()][0],
operationId: "test.guarded",
input: {},
identity,
});
try {
await callPromise;
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.ACCESS_DENIED);
}
});
it("throws error when no callMap and operation not found", async () => {
const registry = new OperationRegistry();
const handler = buildCallHandler({ registry });
await expect(
handler({
requestId: "r1",
operationId: "test.nonexistent",
input: {},
}),
).rejects.toThrow("Operation not found");
});
it("throws error when no callMap and access denied", async () => {
const registry = makeRegistry();
const handler = buildCallHandler({ registry });
const identity: Identity = { id: "user1", scopes: [] };
await expect(
handler({
requestId: "r1",
operationId: "test.guarded",
input: {},
identity,
}),
).rejects.toThrow("Access denied");
});
it("works without callMap for successful operations", async () => {
const registry = makeRegistry();
const handler = buildCallHandler({ registry });
const identity: Identity = {
id: "user1",
scopes: [],
resources: { "project:abc": ["read"] },
};
await expect(
handler({
requestId: "r1",
operationId: "test.guarded",
input: {},
identity,
}),
).resolves.toBeUndefined();
});
it("works without callMap for open operations", async () => {
const registry = makeRegistry();
const handler = buildCallHandler({ registry });
await expect(
handler({
requestId: "r1",
operationId: "test.open",
input: {},
}),
).resolves.toBeUndefined();
});
});
describe("checkAccess resource access control", () => {