diff --git a/src/call.ts b/src/call.ts index 7aa268e..c733fd0 100644 --- a/src/call.ts +++ b/src/call.ts @@ -1,10 +1,12 @@ -import { Type, type Static } from "@alkdev/typebox"; +import { Type, type Static, KindGuard } from "@alkdev/typebox"; +import { Value } from "@alkdev/typebox/value"; import { createPubSub, type PubSub } from "@alkdev/pubsub"; import { getLogger } from "@logtape/logtape"; import { OperationRegistry } from "./registry.js"; import { CallError, InfrastructureErrorCode, mapError } from "./error.js"; -import { validateOrThrow } from "./validation.js"; +import { validateOrThrow, collectErrors, formatValueErrors } from "./validation.js"; import type { Identity, OperationContext, AccessControl, OperationSpec } from "./types.js"; +import { ResponseEnvelopeSchema, isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js"; const logger = getLogger("operations:call"); @@ -23,7 +25,7 @@ export const CallEventSchema = { }), "call.responded": Type.Object({ requestId: Type.String(), - output: Type.Unknown(), + output: ResponseEnvelopeSchema, }), "call.aborted": Type.Object({ requestId: Type.String(), @@ -52,7 +54,7 @@ type CallPubSubMap = { }; interface PendingRequest { - resolve: (value: unknown) => void; + resolve: (value: ResponseEnvelope) => void; reject: (reason: unknown) => void; deadline?: number; timer?: ReturnType; @@ -121,7 +123,7 @@ export class PendingRequestMap { operationId: string, input: unknown, options?: { parentRequestId?: string; deadline?: number; identity?: Identity }, - ): Promise { + ): Promise { const requestId = crypto.randomUUID(); return new Promise((resolve, reject) => { @@ -148,7 +150,10 @@ export class PendingRequestMap { }); } - respond(requestId: string, output: unknown): void { + respond(requestId: string, output: ResponseEnvelope): void { + if (!isResponseEnvelope(output)) { + throw new Error(`PendingRequestMap.respond() requires a ResponseEnvelope, got: ${typeof output}`); + } this.pubsub.publish("call.responded", "", { requestId, output, @@ -180,11 +185,16 @@ export class PendingRequestMap { } export function buildCallHandler(config: CallHandlerConfig): CallHandler { - const { registry } = config; + const { registry, eventTarget } = config; return async (event: CallRequestedEvent): Promise => { const { requestId, operationId, input, identity } = event; + let callMap: PendingRequestMap | undefined; + if (eventTarget) { + callMap = new PendingRequestMap(eventTarget); + } + try { const spec = registry.getSpec(operationId); @@ -223,10 +233,33 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { validateOrThrow(spec.inputSchema, input, `Input validation for ${operationId}`); - await handler(input, context); + const result = await handler(input, context); + + let envelope: ResponseEnvelope; + if (isResponseEnvelope(result)) { + envelope = result as ResponseEnvelope; + } else { + envelope = localEnvelope(result, operationId); + } + + if (!KindGuard.IsUnknown(spec.outputSchema)) { + envelope.data = Value.Cast(spec.outputSchema, envelope.data); + } + + const errors = collectErrors(spec.outputSchema, envelope.data); + if (errors.length > 0) { + logger.warn(`Output validation failed for ${operationId}:\n${formatValueErrors(errors)}`); + } + + if (callMap) { + callMap.respond(requestId, envelope); + } } catch (error) { const callError = mapError(error); + if (callMap) { + callMap.emitError(requestId, callError.code, callError.message, callError.details); + } throw callError; } }; diff --git a/src/env.ts b/src/env.ts index ff78cd6..71c7a7d 100644 --- a/src/env.ts +++ b/src/env.ts @@ -1,12 +1,13 @@ import { OperationType } from "./types.js"; import type { OperationContext, OperationEnv, Identity } from "./types.js"; import type { OperationRegistry } from "./registry.js"; +import type { ResponseEnvelope } from "./response-envelope.js"; import { getLogger } from "@logtape/logtape"; const logger = getLogger("operations:env"); export interface CallMap { - call(operationId: string, input: unknown, options?: { parentRequestId?: string; deadline?: number; identity?: Identity }): Promise; + call(operationId: string, input: unknown, options?: { parentRequestId?: string; deadline?: number; identity?: Identity }): Promise; } export interface EnvOptions { diff --git a/src/registry.ts b/src/registry.ts index 3e18e25..7eb0539 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -1,7 +1,9 @@ import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler } from "./types.js"; import { getLogger } from "@logtape/logtape"; import { Value } from "@alkdev/typebox/value"; +import { KindGuard } from "@alkdev/typebox"; import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js"; +import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js"; const logger = getLogger("operations:registry"); @@ -81,7 +83,7 @@ export class OperationRegistry { operationId: string, input: TInput, context: OperationContext, - ): Promise { + ): Promise> { const spec = this.specs.get(operationId); if (!spec) { throw new Error(`Operation not found: ${operationId}`); @@ -94,13 +96,24 @@ export class OperationRegistry { validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`); - const result = await handler(input, context) as TOutput; + const result = await handler(input, context); - const errors = collectErrors(spec.outputSchema, result); + let envelope: ResponseEnvelope; + if (isResponseEnvelope(result)) { + envelope = result as ResponseEnvelope; + } else { + envelope = localEnvelope(result as TOutput, operationId); + } + + if (!KindGuard.IsUnknown(spec.outputSchema)) { + envelope.data = Value.Cast(spec.outputSchema, envelope.data) as TOutput; + } + + const errors = collectErrors(spec.outputSchema, envelope.data); if (errors.length > 0) { logger.warn(`Output validation failed for ${operationId}:\n${formatValueErrors(errors)}`); } - return result; + return envelope; } } \ No newline at end of file diff --git a/src/subscribe.ts b/src/subscribe.ts index 7e6feec..9b51c15 100644 --- a/src/subscribe.ts +++ b/src/subscribe.ts @@ -1,12 +1,13 @@ import type { OperationContext } from "./types.js"; import { OperationRegistry } from "./registry.js"; +import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "./response-envelope.js"; export async function* subscribe( registry: OperationRegistry, operationId: string, input: unknown, context: OperationContext, -): AsyncGenerator { +): AsyncGenerator { const spec = registry.getSpec(operationId); if (!spec) { @@ -23,7 +24,11 @@ export async function* subscribe( try { for await (const value of generator) { - yield value; + if (isResponseEnvelope(value)) { + yield value; + } else { + yield localEnvelope(value, operationId); + } } } finally { if (generator.return) { diff --git a/src/types.ts b/src/types.ts index becf464..875d49c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,5 @@ import { Type, type Static, type TSchema } from "@alkdev/typebox"; +import type { ResponseEnvelope } from "./response-envelope.js"; export enum OperationType { QUERY = "query", @@ -12,7 +13,7 @@ export interface Identity { resources?: Record } -export type OperationEnv = Record Promise>> +export type OperationEnv = Record Promise>> export const OperationContextSchema = Type.Object({ metadata: Type.Optional(Type.Record(Type.String(), Type.Unknown())), diff --git a/test/call.test.ts b/test/call.test.ts index 84f918e..f762281 100644 --- a/test/call.test.ts +++ b/test/call.test.ts @@ -5,6 +5,7 @@ import { OperationRegistry } from "../src/registry.js"; import { Type } from "@alkdev/typebox"; import { OperationType } from "../src/types.js"; import type { Identity } from "../src/types.js"; +import { localEnvelope, isResponseEnvelope, type ResponseEnvelope } from "../src/response-envelope.js"; describe("PendingRequestMap", () => { it("creates instance without event target", () => { @@ -18,18 +19,25 @@ describe("PendingRequestMap", () => { expect(map.getPendingCount()).toBe(0); }); - it("call() resolves when respond() is called", async () => { + it("call() resolves when respond() is called with envelope", async () => { const map = new PendingRequestMap(); const callPromise = map.call("test.op", { value: "hello" }); setTimeout(() => { const requestId = [...map["requests"].keys()][0]; - map.respond(requestId, { result: "world" }); + map.respond(requestId, localEnvelope({ result: "world" }, "test.op")); }, 10); const result = await callPromise; - expect(result).toEqual({ result: "world" }); + expect(isResponseEnvelope(result)).toBe(true); + expect(result.meta.source).toBe("local"); + expect(result.data).toEqual({ result: "world" }); + }); + + it("respond() throws when called with non-envelope value", () => { + const map = new PendingRequestMap(); + expect(() => map.respond("req-1", { result: "world" } as any)).toThrow("ResponseEnvelope"); }); it("call() rejects when emitError() is called", async () => { @@ -83,7 +91,7 @@ describe("PendingRequestMap", () => { expect(map.getPendingCount()).toBe(1); const requestId = [...map["requests"].keys()][0]; - map.respond(requestId, { result: "done" }); + map.respond(requestId, localEnvelope({ result: "done" }, "test.op")); await callPromise; expect(map.getPendingCount()).toBe(0); diff --git a/test/env.test.ts b/test/env.test.ts index 2c18de1..b0ab153 100644 --- a/test/env.test.ts +++ b/test/env.test.ts @@ -2,6 +2,7 @@ import { describe, it, expect } from "vitest"; import { OperationRegistry, OperationType, buildEnv, type IOperationDefinition, type OperationContext } from "../src/index.js"; import * as Type from "@alkdev/typebox"; import { PendingRequestMap } from "../src/call.js"; +import { localEnvelope, isResponseEnvelope, type ResponseEnvelope } from "../src/response-envelope.js"; function makeOperation(name: string, handler?: any): IOperationDefinition { return { @@ -33,7 +34,9 @@ describe("buildEnv", () => { expect(typeof env.test.writeFile).toBe("function"); const result = await env.test.readFile({ value: "test" }); - expect(result).toEqual({ result: "test" }); + expect(isResponseEnvelope(result)).toBe(true); + expect(result.meta.source).toBe("local"); + expect(result.data).toEqual({ result: "test" }); }); it("filters out SUBSCRIPTION operations", () => { @@ -76,8 +79,8 @@ describe("buildEnv", () => { registry.register(makeOperation("readFile")); const callMap = { - call: async (opId: string, input: unknown, opts?: any) => { - return { result: `routed: ${opId}` }; + call: async (opId: string, input: unknown, opts?: any): Promise => { + return localEnvelope({ result: `routed: ${opId}` }, opId); }, }; @@ -88,6 +91,7 @@ describe("buildEnv", () => { }); const result = await env.test.readFile({ value: "test" }); - expect(result).toEqual({ result: "routed: test.readFile" }); + expect(isResponseEnvelope(result)).toBe(true); + expect(result.data).toEqual({ result: "routed: test.readFile" }); }); }); \ No newline at end of file diff --git a/test/registry.test.ts b/test/registry.test.ts index ddbebf1..0ded768 100644 --- a/test/registry.test.ts +++ b/test/registry.test.ts @@ -1,6 +1,7 @@ import { describe, it, expect } from "vitest"; import { OperationRegistry } from "../src/registry.js"; import { OperationType, type IOperationDefinition, type OperationContext, type OperationSpec, type OperationHandler } from "../src/index.js"; +import { isResponseEnvelope, localEnvelope, type ResponseEnvelope } from "../src/response-envelope.js"; import * as Type from "@alkdev/typebox"; import { Value } from "@alkdev/typebox/value"; @@ -89,11 +90,64 @@ describe("OperationRegistry", () => { expect(registry.getAllSpecs()).toHaveLength(2); }); - it("executes an operation and validates input", async () => { + it("executes an operation and returns ResponseEnvelope", async () => { const registry = new OperationRegistry(); registry.register(makeOperation()); - const result = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); - expect(result).toEqual({ result: "processed: hello" }); + const envelope = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); + expect(isResponseEnvelope(envelope)).toBe(true); + expect(envelope.meta.source).toBe("local"); + expect((envelope.meta as any).operationId).toBe("test.testOp"); + expect(envelope.data).toEqual({ result: "processed: hello" }); + }); + + it("wraps raw handler result in localEnvelope", async () => { + const registry = new OperationRegistry(); + registry.register(makeOperation()); + const envelope = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); + expect(envelope.meta.source).toBe("local"); + expect(envelope.data).toEqual({ result: "processed: hello" }); + expect(typeof (envelope.meta as any).timestamp).toBe("number"); + }); + + it("passes through pre-built ResponseEnvelope from handler", async () => { + const registry = new OperationRegistry(); + const preBuilt = localEnvelope({ custom: "data" }, "custom.op"); + registry.register(makeOperation({ + handler: async () => preBuilt, + })); + const envelope = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); + expect(envelope).toBe(preBuilt); + }); + + it("normalizes output with Value.Cast when outputSchema is not Unknown", async () => { + const registry = new OperationRegistry(); + registry.register(makeOperation({ + outputSchema: Type.Object({ result: Type.String(), count: Type.Number() }), + handler: async () => ({ result: "ok" }), + })); + const envelope = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); + expect((envelope.data as any).result).toBe("ok"); + expect((envelope.data as any).count).toBe(0); + }); + + it("does not normalize when outputSchema is Unknown", async () => { + const registry = new OperationRegistry(); + registry.register(makeOperation({ + outputSchema: Type.Unknown(), + handler: async () => ({ anything: "goes", extra: 42 }), + })); + const envelope = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); + expect(envelope.data).toEqual({ anything: "goes", extra: 42 }); + }); + + it("normalizes mismatched output via Value.Cast and returns envelope", async () => { + const registry = new OperationRegistry(); + registry.register(makeOperation({ + handler: async () => ({ unexpected: "field" }), + })); + const envelope = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); + expect(isResponseEnvelope(envelope)).toBe(true); + expect((envelope.data as any).result).toBe(""); }); it("throws on invalid input", async () => { @@ -119,15 +173,6 @@ describe("OperationRegistry", () => { ).rejects.toThrow("No handler registered"); }); - it("warns on output mismatch but returns result", async () => { - const registry = new OperationRegistry(); - registry.register(makeOperation({ - handler: async () => ({ unexpected: "field" }), - })); - const result = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); - expect(result).toEqual({ unexpected: "field" }); - }); - it("registerSpec and registerHandler separately", async () => { const registry = new OperationRegistry(); const spec = makeSpec(); @@ -138,8 +183,9 @@ describe("OperationRegistry", () => { expect(retrieved.name).toBe("testOp"); expect(retrieved.handler).toBeDefined(); - const result = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); - expect(result).toEqual({ result: "processed: hello" }); + const envelope = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); + expect(envelope.data).toEqual({ result: "processed: hello" }); + expect(envelope.meta.source).toBe("local"); }); it("registerHandler throws for unknown operation", () => {