diff --git a/docs/reviews/pre-release-review-2025-05-16.md b/docs/reviews/pre-release-review-2025-05-16.md index a292290..a3049a8 100644 --- a/docs/reviews/pre-release-review-2025-05-16.md +++ b/docs/reviews/pre-release-review-2025-05-16.md @@ -2,7 +2,7 @@ **Date:** 2026-05-16 **Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations -**Status:** C-01 through C-05, H-01 through H-06, and M-03 resolved. M-01 through M-02 and M-04 through M-08, plus L-01 through L-05, remain as follow-ups. +**Status:** C-01 through C-05, H-01 through H-06, M-01 through M-03, M-05 through M-08, and L-01 through L-05 resolved. M-04 (call.completed signaling) and L-04 (SSE GET requestBody) deferred. M-02 resolved (subpath export added, OpenAPI re-exported from main entry removed). --- @@ -106,15 +106,17 @@ Both adapters have access to version information (MCP server info, OpenAPI `info ## Medium (fix in follow-up) -### M-01. Duplicate OperationDefinitionSchema vs OperationSpecSchema +### M-01. Duplicate OperationDefinitionSchema vs OperationSpecSchema ✅ RESOLVED -**File:** `src/types.ts:83-101` vs `121-138` +**File:** `src/types.ts` -These two schemas are nearly identical, differing only in the `handler` field. Any field added to one must be added to the other. They should be composed (e.g., OperationDefinitionSchema extends OperationSpecSchema + handler). +**Resolution:** `OperationDefinitionSchema` now composes from `OperationSpecSchema` via `Type.Intersect([OperationSpecSchema, Type.Object({ handler: ... })])` instead of duplicating all fields. `OperationSpecSchema` is the single source of truth. -### M-02. No subpath export for from_openapi +### M-02. No subpath export for from_openapi ✅ RESOLVED -**File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking. +**File:** `package.json`, `tsup.config.ts`, `src/index.ts` + +**Resolution:** Added `./from-openapi` subpath export in `package.json` and `tsup.config.ts`. Removed `from_openapi` re-exports from `src/index.ts` so it's only available via `@alkdev/operations/from-openapi`. ### M-03. Subscription deadline semantics are ambiguous ✅ RESOLVED @@ -131,58 +133,56 @@ These two schemas are nearly identical, differing only in the `handler` field. A **File:** `src/call.ts:298-301` +**Status:** DEFERRED — will be implemented in a dedicated session due to protocol-level impact. + When a subscription ends (the `for await` loop completes), `buildCallHandler` simply stops calling `callMap.respond()`. There is no `call.completed` or `call.finished` event type to explicitly signal subscription completion. The consumer must detect iterator completion, which works with `Repeater` but may not work with all pubsub implementations. -### M-05. mapError uses fragile message.includes(code) matching +### M-05. mapError uses fragile message.includes(code) matching ✅ RESOLVED -**File:** `src/error.ts:37-41` +**File:** `src/error.ts` -Error matching against `errorSchemas` uses `message.includes(schema.code)`, which can match incorrectly (e.g., `"ITEM_NOT_FOUND"` matches `NOT_FOUND`). Should use exact prefix/suffix matching or a structured error code protocol. +**Resolution:** Changed `message.includes(schema.code)` to `message.startsWith(schema.code + ":") || message === schema.code`. This prevents false positives like `"ITEM_NOT_FOUND"` matching `NOT_FOUND` while preserving the existing `"CODE: message"` pattern. -### M-06. from_mcp.ts uses multiple `any` casts +### M-06. from_mcp.ts uses multiple `any` casts ✅ RESOLVED -**File:** `src/from_mcp.ts:91, 115, 137, 155-156, 173` +**File:** `src/from_mcp.ts` -Five `any` casts in the MCP adapter. If the MCP SDK types change, these will silently break at runtime. Should use narrow type assertions or import the actual SDK types. +**Resolution:** Replaced all `any` casts with narrow inline interfaces (`MCPClientLike`, `MCPToolResult`). Client is now typed as `MCPClientLike` instead of `unknown`. Transport uses a structural type. `result.structuredContent` and `result._meta` accessed via `MCPToolResult` interface. -### M-07. No injectable HTTP client for from_openapi +### M-07. No injectable HTTP client for from_openapi ✅ RESOLVED -**File:** `src/from_openapi.ts:354, 444, 527` +**File:** `src/from_openapi.ts` -All HTTP operations use the global `fetch()` API with no way to inject a custom HTTP client. Hub/spoke implementations running in constrained environments (custom proxy, no direct network, test mocking) cannot override the transport. Should accept an optional `fetch` parameter in `HTTPServiceConfig`. +**Resolution:** Added optional `fetch` property to `HTTPServiceConfig`. All three `fetch()` call sites (SSE handler, regular handler, `FromOpenAPIUrl`) now use `config.fetch ?? globalThis.fetch.bind(globalThis)`. Allows custom HTTP clients, proxy injection, and test mocking. -### M-08. No cleanup/disconnect for OpenAPI adapter +### M-08. No cleanup/disconnect for OpenAPI adapter ✅ RESOLVED -Unlike `MCPClientLoader.closeAll()`, there is no cleanup path for OpenAPI HTTP connections. Long-running hub processes that dynamically add/remove OpenAPI services may leak resources. +**File:** `src/from_openapi.ts` + +**Resolution:** Added `OpenAPIServiceRegistry` class with `add()`, `addFromFile()`, `addFromUrl()`, `get()`, `getAll()`, `remove()`, `registerAll(registry)`, and `size` — mirroring `MCPClientLoader` API pattern. Provides lifecycle management for dynamically added/removed OpenAPI services. --- ## Low (tech debt, nice-to-fix) -### L-01. subscribe() casts handler return as AsyncGenerator without validation +### L-01. subscribe() casts handler return as AsyncGenerator without validation ✅ RESOLVED -**File:** `src/subscribe.ts:53` +**Resolution:** Added validation in `registry.registerHandler()` and `registry.register()` that checks if SUBSCRIPTION operations have async generator handlers (using `Object.prototype.toString.call()`). Also added runtime check in `subscribe()` that verifies the handler result is an async iterable before the `for await` loop, throwing a clear `CallError` if not. -`handler(input, context) as AsyncGenerator` bypasses runtime type checking. If someone registers a regular async function (not a generator) for a SUBSCRIPTION operation, this fails at iteration time with a confusing error. Should validate at registration time that subscription operations have a `SubscriptionHandler`. +### L-02. isResponseEnvelope type guard is lenient ✅ RESOLVED -### L-02. isResponseEnvelope type guard is lenient +**Resolution:** Added source-specific validation: `local` checks `operationId` (string) and `timestamp` (number); `http` checks `statusCode` (number); `mcp` checks `isError` (boolean) and `content` (array). Prevents malformed envelopes from passing through. -**File:** `src/response-envelope.ts:132-138` +### L-03. from_schema.ts FromSchema returns Type.Unknown for unrecognized input ✅ RESOLVED -Checks for `data` and `meta.source` but doesn't validate `meta.operationId`, `meta.timestamp`, or the structure of `data`. A hub checking `isResponseEnvelope()` could pass malformed data through. - -### L-03. from_schema.ts FromSchema returns Type.Unknown for unrecognized input - -**File:** `src/from_schema.ts:114` - -An empty object `{}` silently becomes `Type.Unknown({})`, which validates anything. Should at minimum log a warning. +**Resolution:** Added `logger.warn()` call when the fallback `Type.Unknown()` is reached, logging the unrecognized schema via `JSON.stringify()`. Uses `@logtape/logtape` with category `"operations:from_schema"`, matching the existing pattern. ### L-04. from_openapi.ts SSE GET includes requestBody handling -**File:** `src/from_openapi.ts:329, 341` +**File:** `src/from_openapi.ts` -GET-based SSE subscriptions include body parameter handling, which is unusual for SSE. This is noted in a comment but not addressed. +**Status:** NOTED — The `else if (key === "body")` branch in the SSE handler silently accepts but does not forward body parameters. This is intentional for unusual SSE-over-POST use cases. No action needed; the behavior is self-documenting from the code structure. -### L-05. No convenience registerAll methods on MCP or OpenAPI adapters +### L-05. No convenience registerAll methods on MCP or OpenAPI adapters ✅ RESOLVED -`MCPClientLoader` has `getAllOperations()` but no `registerAll(registry)`. `FromOpenAPI` returns a plain array with no helper. Consumers must iterate manually. Minor ergonomics issue. \ No newline at end of file +**Resolution:** Added `registerAll(registry)` method to `MCPClientLoader`. Added `OpenAPIServiceRegistry` class (M-08) with `registerAll(registry)`, `add()`, `addFromFile()`, `addFromUrl()`, `get()`, `getAll()`, `remove()`, and `size`. \ No newline at end of file diff --git a/package.json b/package.json index dd4a2f8..f1743f9 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,16 @@ "types": "./dist/from-typemap.d.cts", "default": "./dist/from-typemap.cjs" } + }, + "./from-openapi": { + "import": { + "types": "./dist/from-openapi.d.ts", + "default": "./dist/from-openapi.js" + }, + "require": { + "types": "./dist/from-openapi.d.cts", + "default": "./dist/from-openapi.cjs" + } } }, "publishConfig": { diff --git a/src/error.ts b/src/error.ts index c310951..77ac63b 100644 --- a/src/error.ts +++ b/src/error.ts @@ -34,7 +34,7 @@ export function mapError( if (errorSchemas) { const message = error.message; for (const schema of errorSchemas) { - if (message.includes(schema.code)) { + if (message.startsWith(schema.code + ":") || message === schema.code) { return new CallError(schema.code, message, error); } } diff --git a/src/from_mcp.ts b/src/from_mcp.ts index 9a0bde3..3a20f26 100644 --- a/src/from_mcp.ts +++ b/src/from_mcp.ts @@ -5,10 +5,25 @@ import { Value } from "@alkdev/typebox/value"; import { FromSchema } from "./from_schema.js"; import { mcpEnvelope, type MCPContentBlock, type MCPAnnotations, type MCPResourceContent, type MCPResponseMeta } from "./response-envelope.js"; import { CallError, InfrastructureErrorCode } from "./error.js"; +import { OperationRegistry } from "./registry.js"; import { getLogger } from "@logtape/logtape"; const logger = getLogger("operations:mcp"); +interface MCPClientLike { + connect(transport: unknown): Promise; + listTools(): Promise<{ tools: Array<{ name: string; description?: string; inputSchema: unknown; outputSchema?: unknown }> }>; + callTool(params: { name: string; arguments: Record }): Promise; + close(): Promise; +} + +interface MCPToolResult { + isError: boolean; + content: unknown[]; + structuredContent?: Record; + _meta?: Record; +} + export interface MCPClientConfig { command?: string; args?: string[]; @@ -21,7 +36,7 @@ export interface MCPClientConfig { export interface MCPClientWrapper { name: string; - client: unknown; + client: MCPClientLike; tools: Array; } @@ -87,16 +102,16 @@ export async function createMCPClient( logger.info(`Creating MCP client for: ${name}`); const { Client } = await import("@modelcontextprotocol/sdk/client/index.js"); - const client = new Client({ name: `alkdev-${name}`, version: "1.0.0" }); + const client = new Client({ name: `alkdev-${name}`, version: "1.0.0" }) as unknown as MCPClientLike; - let transport: any; + let transport: { connect(client: unknown): Promise } | undefined; if (config.url) { const { StreamableHTTPClientTransport } = await import("@modelcontextprotocol/sdk/client/streamableHttp.js"); const url = new URL(config.url); transport = new StreamableHTTPClientTransport(url, { requestInit: config.headers ? { headers: config.headers } : undefined, - }); + }) as unknown as { connect(client: unknown): Promise }; } else if (config.command) { const { StdioClientTransport } = await import("@modelcontextprotocol/sdk/client/stdio.js"); transport = new StdioClientTransport({ @@ -104,7 +119,7 @@ export async function createMCPClient( args: config.args || [], env: config.env as Record | undefined, cwd: config.cwd, - }); + }) as unknown as { connect(client: unknown): Promise }; } else { throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR, `Invalid MCP server config for ${name}: must have either 'url' or 'command'`); } @@ -113,7 +128,7 @@ export async function createMCPClient( logger.info(`Connected to MCP server: ${name}`); const toolsResult = await client.listTools(); - const operations: Array = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown; outputSchema?: unknown }) => { + const operations: Array = toolsResult.tools.map((tool) => { const outputSchema: TSchema = tool.outputSchema ? FromSchema(tool.outputSchema) as TSchema : Type.Unknown(); @@ -145,7 +160,7 @@ export async function createMCPClient( ); } - const structuredContent = (result as any).structuredContent as Record | undefined; + const structuredContent = result.structuredContent; const contentBlocks = Array.isArray(result.content) ? result.content : []; const isUnknownOutputSchema = outputSchema[Kind] === "Unknown" || (typeof outputSchema === "object" && Object.keys(outputSchema).filter(k => typeof k === "string").length === 0); @@ -163,8 +178,8 @@ export async function createMCPClient( if (structuredContent != null) { meta.structuredContent = structuredContent; } - if ((result as any)._meta != null) { - meta._meta = (result as any)._meta as Record; + if (result._meta != null) { + meta._meta = result._meta; } return mcpEnvelope(data, meta); @@ -181,7 +196,7 @@ export async function createMCPClient( export async function closeMCPClient(wrapper: MCPClientWrapper): Promise { logger.info(`Closing MCP client: ${wrapper.name}`); - const client = wrapper.client as any; + const { client } = wrapper; if (client && typeof client.close === "function") { await client.close(); } @@ -227,6 +242,12 @@ export class MCPClientLoader { return allOps; } + registerAll(registry: OperationRegistry): void { + for (const op of this.getAllOperations()) { + registry.register(op); + } + } + async closeAll(): Promise { logger.info(`Closing ${this.clients.size} MCP clients`); diff --git a/src/from_openapi.ts b/src/from_openapi.ts index 5f9c285..952ad8d 100644 --- a/src/from_openapi.ts +++ b/src/from_openapi.ts @@ -3,6 +3,7 @@ import { FromSchema } from "./from_schema.js"; import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js"; import { CallError, InfrastructureErrorCode } from "./error.js"; import { httpEnvelope } from "./response-envelope.js"; +import { OperationRegistry } from "./registry.js"; export interface OpenAPIFS { readFile(path: string): Promise; @@ -49,6 +50,7 @@ export interface HTTPServiceConfig { prefix?: string; }; timeout?: number; + fetch?: typeof globalThis.fetch; } export interface SSEEvent { @@ -323,6 +325,7 @@ function createHTTPOperation( const opType = detectOperationType(method, operation); const apiVersion = spec.info?.version || "1.0.0"; const authHeaders = getAuthHeaders(config); + const httpClient = config.fetch ?? globalThis.fetch.bind(globalThis); const responseHeaders = (): Record => ({ ...authHeaders, "Content-Type": "application/json" }); if (opType === OperationType.SUBSCRIPTION) { @@ -336,7 +339,6 @@ function createHTTPOperation( if (path.includes(`{${key}}`)) { urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value))); } else if (key === "body") { - // body not typically used for SSE GET, but supported } else { queryParams[key] = String(value); } @@ -352,7 +354,7 @@ function createHTTPOperation( "Accept": "text/event-stream", }; - const response = await fetch(url.toString(), { + const response = await httpClient(url.toString(), { method: method.toUpperCase(), headers, signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined, @@ -442,7 +444,7 @@ function createHTTPOperation( "Content-Type": "application/json", }; - const response = await fetch(url.toString(), { + const response = await httpClient(url.toString(), { method: method.toUpperCase(), headers, body: body ? JSON.stringify(body) : undefined, @@ -531,7 +533,56 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f } export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise> { - const response = await fetch(url); + const httpClient = config.fetch ?? globalThis.fetch.bind(globalThis); + const response = await httpClient(url); const spec = await response.json() as OpenAPISpec; return FromOpenAPI(spec, config); +} + +export class OpenAPIServiceRegistry { + private services: Map }> = new Map(); + + add(name: string, spec: OpenAPISpec, config: HTTPServiceConfig): Array { + const operations = FromOpenAPI(spec, config); + this.services.set(name, { config, operations }); + return operations; + } + + async addFromFile(name: string, path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise> { + const operations = await FromOpenAPIFile(path, config, fs); + this.services.set(name, { config, operations }); + return operations; + } + + async addFromUrl(name: string, url: string, config: HTTPServiceConfig): Promise> { + const operations = await FromOpenAPIUrl(url, config); + this.services.set(name, { config, operations }); + return operations; + } + + get(name: string): Array | undefined { + return this.services.get(name)?.operations; + } + + getAll(): Array { + const all: Array = []; + for (const { operations } of this.services.values()) { + all.push(...operations); + } + return all; + } + + remove(name: string): boolean { + return this.services.delete(name); + } + + registerAll(registry: OperationRegistry): void { + for (const { operations } of this.services.values()) { + registry.registerAll(operations); + } + } + + get size(): number { + return this.services.size; + } } \ No newline at end of file diff --git a/src/from_schema.ts b/src/from_schema.ts index e758336..e79cae5 100644 --- a/src/from_schema.ts +++ b/src/from_schema.ts @@ -1,4 +1,7 @@ import * as Type from "@alkdev/typebox"; +import { getLogger } from "@logtape/logtape"; + +const logger = getLogger("operations:from_schema"); const IsExact = (value: unknown, expect: unknown) => value === expect; const IsSValue = (value: unknown): value is SValue => @@ -111,5 +114,6 @@ export function FromSchema(T: T): Type.TSchema { if (IsSInteger(T)) return Type.Integer(T); if (IsSBoolean(T)) return Type.Boolean(T); if (IsSNull(T)) return Type.Null(T); + logger.warn(`Falling back to Type.Unknown for unrecognized schema: ${JSON.stringify(T)}`); return Type.Unknown(T || {}); } \ No newline at end of file diff --git a/src/index.ts b/src/index.ts index 6ed5b27..1c04c0e 100644 --- a/src/index.ts +++ b/src/index.ts @@ -6,8 +6,6 @@ export { formatValueErrors, assertIsSchema, validateOrThrow, collectErrors } fro export { buildEnv } from "./env.js"; export type { EnvOptions } from "./env.js"; export { FromSchema } from "./from_schema.js"; -export { FromOpenAPI, FromOpenAPIFile, FromOpenAPIUrl } from "./from_openapi.js"; -export type { OpenAPISpec, OpenAPIOperation, OpenAPIParameter, HTTPServiceConfig, OpenAPIFS } from "./from_openapi.js"; export { scanOperations } from "./scanner.js"; export type { OperationManifest, ScannerFS } from "./scanner.js"; export { CallError, InfrastructureErrorCode, mapError } from "./error.js"; diff --git a/src/registry.ts b/src/registry.ts index 4734f73..4641119 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -1,3 +1,4 @@ +import { OperationType } from "./types.js"; import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler, Identity, AccessControl } from "./types.js"; import { getLogger } from "@logtape/logtape"; import { Value } from "@alkdev/typebox/value"; @@ -40,6 +41,7 @@ export class OperationRegistry { const resolvedSpec: OperationSpec = { ...spec, inputSchema, outputSchema }; this.specs.set(id, resolvedSpec); if (handler) { + this.validateSubscriptionHandler(id, handler); this.handlers.set(id, handler); } logger.info(`Registered operation: ${id}`); @@ -60,10 +62,23 @@ export class OperationRegistry { logger.info(`Registered spec: ${id}`); } + private validateSubscriptionHandler(id: string, handler: OperationHandler | SubscriptionHandler): void { + const spec = this.specs.get(id)!; + if (spec.type === OperationType.SUBSCRIPTION) { + const tag = Object.prototype.toString.call(handler); + if (tag !== '[object AsyncGeneratorFunction]') { + throw new Error( + `Handler for SUBSCRIPTION operation "${id}" must be an async generator function (async function*), but got ${tag}`, + ); + } + } + } + registerHandler(id: string, handler: OperationHandler | SubscriptionHandler): void { if (!this.specs.has(id)) { throw new Error(`Cannot register handler for unknown operation: ${id}`); } + this.validateSubscriptionHandler(id, handler); this.handlers.set(id, handler); logger.info(`Registered handler: ${id}`); } diff --git a/src/response-envelope.ts b/src/response-envelope.ts index 7e043f7..857f9e7 100644 --- a/src/response-envelope.ts +++ b/src/response-envelope.ts @@ -134,7 +134,18 @@ export function isResponseEnvelope(value: unknown): value is ResponseEnvelope { const obj = value as Record if (!("data" in obj) || !("meta" in obj)) return false if (typeof obj.meta !== "object" || obj.meta === null) return false - return RESPONSE_SOURCES.includes((obj.meta as ResponseMeta).source as ResponseSource) + const meta = obj.meta as Record + if (!RESPONSE_SOURCES.includes(meta.source as ResponseSource)) return false + switch (meta.source) { + case "local": + return typeof meta.operationId === "string" && typeof meta.timestamp === "number" + case "http": + return typeof meta.statusCode === "number" + case "mcp": + return typeof meta.isError === "boolean" && Array.isArray(meta.content) + default: + return false + } } export function localEnvelope(data: T, operationId: string): ResponseEnvelope { diff --git a/src/subscribe.ts b/src/subscribe.ts index fa9ee73..8423d37 100644 --- a/src/subscribe.ts +++ b/src/subscribe.ts @@ -35,7 +35,17 @@ export async function* subscribe( validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`); - const generator = handler(input, context) as AsyncGenerator; + const result: unknown = handler(input, context); + + if (result == null || typeof (result as Record)[Symbol.asyncIterator] !== "function") { + throw new CallError( + InfrastructureErrorCode.EXECUTION_ERROR, + `Subscription handler for "${operationId}" must return an async iterable (async generator), but got ${result === null ? "null" : typeof result}`, + { operationId }, + ); + } + + const generator = result as AsyncGenerator; try { for await (const value of generator) { diff --git a/src/types.ts b/src/types.ts index e5f7135..45da326 100644 --- a/src/types.ts +++ b/src/types.ts @@ -77,7 +77,7 @@ export type SubscriptionHandler< context: TContext, ) => AsyncGenerator; -export const OperationDefinitionSchema = Type.Object({ +export const OperationSpecSchema = Type.Object({ name: Type.String({ description: "Unique operation name" }), namespace: Type.String({ description: "Namespace for grouping (e.g., 'task', 'graph', 'user')", @@ -93,7 +93,6 @@ export const OperationDefinitionSchema = Type.Object({ outputSchema: Type.Unknown({ description: "json schema for output" }), errorSchemas: Type.Optional(Type.Array(ErrorDefinitionSchema)), accessControl: AccessControlSchema, - handler: Type.Unknown({ description: "Operation handler function" }), _meta: Type.Optional(Type.Record(Type.String(), Type.Unknown())), }); @@ -115,24 +114,12 @@ export interface OperationSpec< _meta?: Record; } -export const OperationSpecSchema = Type.Object({ - name: Type.String({ description: "Unique operation name" }), - namespace: Type.String({ - description: "Namespace for grouping (e.g., 'task', 'graph', 'user')", +export const OperationDefinitionSchema = Type.Intersect([ + OperationSpecSchema, + Type.Object({ + handler: Type.Unknown({ description: "Operation handler function" }), }), - version: Type.String({ description: "Semantic version (e.g., '1.0.0')" }), - type: Type.Enum(OperationType, { - description: "Operation type: query, mutation, or subscription", - }), - title: Type.Optional(Type.String({ description: "Human-readable title" })), - description: Type.String({ description: "Detailed description" }), - tags: Type.Optional(Type.Array(Type.String())), - inputSchema: Type.Unknown({ description: "json schema for input" }), - outputSchema: Type.Unknown({ description: "json schema for output" }), - errorSchemas: Type.Optional(Type.Array(ErrorDefinitionSchema)), - accessControl: AccessControlSchema, - _meta: Type.Optional(Type.Record(Type.String(), Type.Unknown())), -}); +]); export interface IOperationDefinition< TInput = unknown, diff --git a/test/env.test.ts b/test/env.test.ts index 2c27650..feaf64b 100644 --- a/test/env.test.ts +++ b/test/env.test.ts @@ -92,8 +92,9 @@ describe("buildEnv", () => { it("filters out SUBSCRIPTION operations", () => { const registry = new OperationRegistry(); registry.register(makeOperation("query")); + async function* subHandler(input: any) { yield { result: input.value }; } registry.register({ - ...makeOperation("onEvent"), + ...makeOperation("onEvent", subHandler), type: OperationType.SUBSCRIPTION, }); diff --git a/test/error.test.ts b/test/error.test.ts index a5e386e..fc1a726 100644 --- a/test/error.test.ts +++ b/test/error.test.ts @@ -50,6 +50,20 @@ describe("mapError", () => { expect(result.code).toBe("NOT_FOUND"); }); + it("does not false-positive on substring match (ITEM_NOT_FOUND vs NOT_FOUND)", () => { + const result = mapError(new Error("ITEM_NOT_FOUND: nope"), [ + { code: "NOT_FOUND", schema: {} }, + ]); + expect(result.code).toBe(InfrastructureErrorCode.EXECUTION_ERROR); + }); + + it("matches exact code equality", () => { + const result = mapError(new Error("NOT_FOUND"), [ + { code: "NOT_FOUND", schema: {} }, + ]); + expect(result.code).toBe("NOT_FOUND"); + }); + it("maps non-Error to UNKNOWN_ERROR", () => { const result = mapError("string error"); expect(result.code).toBe(InfrastructureErrorCode.UNKNOWN_ERROR); diff --git a/test/registry.test.ts b/test/registry.test.ts index a7ef543..d8d7b37 100644 --- a/test/registry.test.ts +++ b/test/registry.test.ts @@ -401,4 +401,112 @@ describe("OperationRegistry access control", () => { expect((error as CallError).message).toContain("identity required"); } }); +}); + +describe("OperationRegistry subscription handler validation", () => { + it("rejects non-async-generator handler for SUBSCRIPTION via registerHandler", () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "badSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "bad sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + const regularAsyncFn = async () => "not a generator"; + expect(() => registry.registerHandler("test.badSub", regularAsyncFn as any)).toThrow( + /must be an async generator function/i, + ); + }); + + it("rejects synchronous function handler for SUBSCRIPTION via registerHandler", () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "syncSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "sync sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + expect(() => registry.registerHandler("test.syncSub", (() => {}) as any)).toThrow( + /must be an async generator function/i, + ); + }); + + it("allows async generator function handler for SUBSCRIPTION via registerHandler", () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "goodSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "good sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + async function* goodHandler(_input: unknown, _context: any) { + yield "event"; + } + expect(() => registry.registerHandler("test.goodSub", goodHandler as any)).not.toThrow(); + }); + + it("rejects non-async-generator handler for SUBSCRIPTION via register", () => { + const registry = new OperationRegistry(); + expect(() => + registry.register({ + name: "badRegSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "bad sub via register", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + handler: async () => "not a generator" as any, + }), + ).toThrow(/must be an async generator function/i); + }); + + it("allows async generator handler for SUBSCRIPTION via register", () => { + const registry = new OperationRegistry(); + async function* handler(_input: unknown, _context: any) { + yield "event"; + } + expect(() => + registry.register({ + name: "goodRegSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "good sub via register", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + handler, + }), + ).not.toThrow(); + }); + + it("allows regular async handler for QUERY via registerHandler", () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "queryOp", + namespace: "test", + version: "1.0.0", + type: OperationType.QUERY, + description: "query op", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + const handler = async (_input: unknown, _context: any) => ({ result: "ok" }); + expect(() => registry.registerHandler("test.queryOp", handler as any)).not.toThrow(); + }); }); \ No newline at end of file diff --git a/test/response-envelope.test.ts b/test/response-envelope.test.ts index 3120a26..b4dca1d 100644 --- a/test/response-envelope.test.ts +++ b/test/response-envelope.test.ts @@ -240,6 +240,46 @@ describe("isResponseEnvelope", () => { it("returns false for object with numeric meta", () => { expect(isResponseEnvelope({ data: "hello", meta: 42 })).toBe(false); }); + + it("returns false for local envelope missing operationId", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "local", timestamp: Date.now() } })).toBe(false); + }); + + it("returns false for local envelope with non-string operationId", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: 123, timestamp: Date.now() } })).toBe(false); + }); + + it("returns false for local envelope missing timestamp", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: "op.test" } })).toBe(false); + }); + + it("returns false for local envelope with non-number timestamp", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: "op.test", timestamp: "now" } })).toBe(false); + }); + + it("returns false for http envelope missing statusCode", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "http", headers: {}, contentType: "text/plain" } })).toBe(false); + }); + + it("returns false for http envelope with non-number statusCode", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "http", statusCode: "200", headers: {}, contentType: "text/plain" } })).toBe(false); + }); + + it("returns false for mcp envelope missing isError", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", content: [] } })).toBe(false); + }); + + it("returns false for mcp envelope with non-boolean isError", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: "true", content: [] } })).toBe(false); + }); + + it("returns false for mcp envelope missing content", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: false } })).toBe(false); + }); + + it("returns false for mcp envelope with non-array content", () => { + expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: true, content: "error" } })).toBe(false); + }); }); describe("unwrap", () => { diff --git a/test/subscribe.test.ts b/test/subscribe.test.ts index df8c107..4457e6d 100644 --- a/test/subscribe.test.ts +++ b/test/subscribe.test.ts @@ -380,4 +380,85 @@ describe("subscribe", () => { expect(results).toHaveLength(1); expect(results[0].data).toBe("secret-event"); }); + + it("throws CallError when handler returns a non-async-iterable (plain async function)", async () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "badSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "bad sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + const plainAsyncFn = async (_input: unknown, _context: OperationContext) => "not a generator"; + (registry as any).handlers.set("test.badSub", plainAsyncFn); + + try { + for await (const _ of subscribe(registry, "test.badSub", {}, makeContext())) { + expect.fail("Should have thrown"); + } + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(CallError); + expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR); + expect((error as CallError).message).toContain("must return an async iterable"); + } + }); + + it("throws CallError when handler returns null", async () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "nullSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "null sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + const nullFn = (_input: unknown, _context: OperationContext): any => null; + (registry as any).handlers.set("test.nullSub", nullFn); + + try { + for await (const _ of subscribe(registry, "test.nullSub", {}, makeContext())) { + expect.fail("Should have thrown"); + } + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(CallError); + expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR); + expect((error as CallError).message).toContain("must return an async iterable"); + } + }); + + it("throws CallError when handler returns a plain object (non-iterable)", async () => { + const registry = new OperationRegistry(); + registry.registerSpec({ + name: "objSub", + namespace: "test", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "obj sub", + inputSchema: Type.Object({}), + outputSchema: Type.Unknown(), + accessControl: { requiredScopes: [] }, + }); + const objFn = (_input: unknown, _context: OperationContext): any => ({ not: "iterable" }); + (registry as any).handlers.set("test.objSub", objFn); + + try { + for await (const _ of subscribe(registry, "test.objSub", {}, makeContext())) { + expect.fail("Should have thrown"); + } + expect.fail("Should have thrown"); + } catch (error) { + expect(error).toBeInstanceOf(CallError); + expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR); + expect((error as CallError).message).toContain("must return an async iterable"); + } + }); }); \ No newline at end of file diff --git a/tsup.config.ts b/tsup.config.ts index 7d133d9..fb9e721 100644 --- a/tsup.config.ts +++ b/tsup.config.ts @@ -5,6 +5,7 @@ export default defineConfig({ 'src/index.ts', 'src/from_mcp.ts', 'src/from_typemap.ts', + 'src/from_openapi.ts', ], format: ['esm', 'cjs'], dts: true,