fix: resolve M-01..M-02, M-05..M-08, L-01..L-03, L-05 from pre-release review

M-01: Compose OperationDefinitionSchema from OperationSpecSchema via Type.Intersect
M-02: Extract from_openapi to subpath export, remove from main entry
M-05: Fix mapError fragile includes() matching — use startsWith(code+':') or exact
M-06: Replace any casts with MCPClientLike/MCPToolResult interfaces in from_mcp
M-07: Add injectable fetch to HTTPServiceConfig for from_openapi
M-08: Add OpenAPIServiceRegistry with lifecycle methods (add, remove, registerAll)
L-01: Validate subscription handler type at registration and runtime
L-02: Strengthen isResponseEnvelope with source-specific field validation
L-03: Add logger.warn on FromSchema fallback to Type.Unknown
L-04: Noted as intentional (SSE GET body handling)
L-05: Add registerAll to MCPClientLoader and OpenAPIServiceRegistry
This commit is contained in:
2026-05-16 14:56:13 +00:00
parent 2b72289635
commit ca2021bd3d
17 changed files with 424 additions and 72 deletions

View File

@@ -2,7 +2,7 @@
**Date:** 2026-05-16 **Date:** 2026-05-16
**Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations **Scope:** Full codebase review for issues that would impact downstream hub/spoke implementations
**Status:** C-01 through C-05, H-01 through H-06, and M-03 resolved. M-01 through M-02 and M-04 through M-08, plus L-01 through L-05, remain as follow-ups. **Status:** C-01 through C-05, H-01 through H-06, M-01 through M-03, M-05 through M-08, and L-01 through L-05 resolved. M-04 (call.completed signaling) and L-04 (SSE GET requestBody) deferred. M-02 resolved (subpath export added, OpenAPI re-exported from main entry removed).
--- ---
@@ -106,15 +106,17 @@ Both adapters have access to version information (MCP server info, OpenAPI `info
## Medium (fix in follow-up) ## Medium (fix in follow-up)
### M-01. Duplicate OperationDefinitionSchema vs OperationSpecSchema ### M-01. Duplicate OperationDefinitionSchema vs OperationSpecSchema ✅ RESOLVED
**File:** `src/types.ts:83-101` vs `121-138` **File:** `src/types.ts`
These two schemas are nearly identical, differing only in the `handler` field. Any field added to one must be added to the other. They should be composed (e.g., OperationDefinitionSchema extends OperationSpecSchema + handler). **Resolution:** `OperationDefinitionSchema` now composes from `OperationSpecSchema` via `Type.Intersect([OperationSpecSchema, Type.Object({ handler: ... })])` instead of duplicating all fields. `OperationSpecSchema` is the single source of truth.
### M-02. No subpath export for from_openapi ### M-02. No subpath export for from_openapi ✅ RESOLVED
**File:** `package.json` exports only `./from-mcp` and `./from-typemap` as subpath entries. `from_openapi.ts` is bundled into the main entry, meaning anyone importing `PendingRequestMap` or `OperationRegistry` also pulls in the OpenAPI adapter code (including `fetch` usage and `node:fs/promises` import). This should be a separate subpath export for tree-shaking. **File:** `package.json`, `tsup.config.ts`, `src/index.ts`
**Resolution:** Added `./from-openapi` subpath export in `package.json` and `tsup.config.ts`. Removed `from_openapi` re-exports from `src/index.ts` so it's only available via `@alkdev/operations/from-openapi`.
### M-03. Subscription deadline semantics are ambiguous ✅ RESOLVED ### M-03. Subscription deadline semantics are ambiguous ✅ RESOLVED
@@ -131,58 +133,56 @@ These two schemas are nearly identical, differing only in the `handler` field. A
**File:** `src/call.ts:298-301` **File:** `src/call.ts:298-301`
**Status:** DEFERRED — will be implemented in a dedicated session due to protocol-level impact.
When a subscription ends (the `for await` loop completes), `buildCallHandler` simply stops calling `callMap.respond()`. There is no `call.completed` or `call.finished` event type to explicitly signal subscription completion. The consumer must detect iterator completion, which works with `Repeater` but may not work with all pubsub implementations. When a subscription ends (the `for await` loop completes), `buildCallHandler` simply stops calling `callMap.respond()`. There is no `call.completed` or `call.finished` event type to explicitly signal subscription completion. The consumer must detect iterator completion, which works with `Repeater` but may not work with all pubsub implementations.
### M-05. mapError uses fragile message.includes(code) matching ### M-05. mapError uses fragile message.includes(code) matching ✅ RESOLVED
**File:** `src/error.ts:37-41` **File:** `src/error.ts`
Error matching against `errorSchemas` uses `message.includes(schema.code)`, which can match incorrectly (e.g., `"ITEM_NOT_FOUND"` matches `NOT_FOUND`). Should use exact prefix/suffix matching or a structured error code protocol. **Resolution:** Changed `message.includes(schema.code)` to `message.startsWith(schema.code + ":") || message === schema.code`. This prevents false positives like `"ITEM_NOT_FOUND"` matching `NOT_FOUND` while preserving the existing `"CODE: message"` pattern.
### M-06. from_mcp.ts uses multiple `any` casts ### M-06. from_mcp.ts uses multiple `any` casts ✅ RESOLVED
**File:** `src/from_mcp.ts:91, 115, 137, 155-156, 173` **File:** `src/from_mcp.ts`
Five `any` casts in the MCP adapter. If the MCP SDK types change, these will silently break at runtime. Should use narrow type assertions or import the actual SDK types. **Resolution:** Replaced all `any` casts with narrow inline interfaces (`MCPClientLike`, `MCPToolResult`). Client is now typed as `MCPClientLike` instead of `unknown`. Transport uses a structural type. `result.structuredContent` and `result._meta` accessed via `MCPToolResult` interface.
### M-07. No injectable HTTP client for from_openapi ### M-07. No injectable HTTP client for from_openapi ✅ RESOLVED
**File:** `src/from_openapi.ts:354, 444, 527` **File:** `src/from_openapi.ts`
All HTTP operations use the global `fetch()` API with no way to inject a custom HTTP client. Hub/spoke implementations running in constrained environments (custom proxy, no direct network, test mocking) cannot override the transport. Should accept an optional `fetch` parameter in `HTTPServiceConfig`. **Resolution:** Added optional `fetch` property to `HTTPServiceConfig`. All three `fetch()` call sites (SSE handler, regular handler, `FromOpenAPIUrl`) now use `config.fetch ?? globalThis.fetch.bind(globalThis)`. Allows custom HTTP clients, proxy injection, and test mocking.
### M-08. No cleanup/disconnect for OpenAPI adapter ### M-08. No cleanup/disconnect for OpenAPI adapter ✅ RESOLVED
Unlike `MCPClientLoader.closeAll()`, there is no cleanup path for OpenAPI HTTP connections. Long-running hub processes that dynamically add/remove OpenAPI services may leak resources. **File:** `src/from_openapi.ts`
**Resolution:** Added `OpenAPIServiceRegistry` class with `add()`, `addFromFile()`, `addFromUrl()`, `get()`, `getAll()`, `remove()`, `registerAll(registry)`, and `size` — mirroring `MCPClientLoader` API pattern. Provides lifecycle management for dynamically added/removed OpenAPI services.
--- ---
## Low (tech debt, nice-to-fix) ## Low (tech debt, nice-to-fix)
### L-01. subscribe() casts handler return as AsyncGenerator without validation ### L-01. subscribe() casts handler return as AsyncGenerator without validation ✅ RESOLVED
**File:** `src/subscribe.ts:53` **Resolution:** Added validation in `registry.registerHandler()` and `registry.register()` that checks if SUBSCRIPTION operations have async generator handlers (using `Object.prototype.toString.call()`). Also added runtime check in `subscribe()` that verifies the handler result is an async iterable before the `for await` loop, throwing a clear `CallError` if not.
`handler(input, context) as AsyncGenerator` bypasses runtime type checking. If someone registers a regular async function (not a generator) for a SUBSCRIPTION operation, this fails at iteration time with a confusing error. Should validate at registration time that subscription operations have a `SubscriptionHandler`. ### L-02. isResponseEnvelope type guard is lenient ✅ RESOLVED
### L-02. isResponseEnvelope type guard is lenient **Resolution:** Added source-specific validation: `local` checks `operationId` (string) and `timestamp` (number); `http` checks `statusCode` (number); `mcp` checks `isError` (boolean) and `content` (array). Prevents malformed envelopes from passing through.
**File:** `src/response-envelope.ts:132-138` ### L-03. from_schema.ts FromSchema returns Type.Unknown for unrecognized input ✅ RESOLVED
Checks for `data` and `meta.source` but doesn't validate `meta.operationId`, `meta.timestamp`, or the structure of `data`. A hub checking `isResponseEnvelope()` could pass malformed data through. **Resolution:** Added `logger.warn()` call when the fallback `Type.Unknown()` is reached, logging the unrecognized schema via `JSON.stringify()`. Uses `@logtape/logtape` with category `"operations:from_schema"`, matching the existing pattern.
### L-03. from_schema.ts FromSchema returns Type.Unknown for unrecognized input
**File:** `src/from_schema.ts:114`
An empty object `{}` silently becomes `Type.Unknown({})`, which validates anything. Should at minimum log a warning.
### L-04. from_openapi.ts SSE GET includes requestBody handling ### L-04. from_openapi.ts SSE GET includes requestBody handling
**File:** `src/from_openapi.ts:329, 341` **File:** `src/from_openapi.ts`
GET-based SSE subscriptions include body parameter handling, which is unusual for SSE. This is noted in a comment but not addressed. **Status:** NOTED — The `else if (key === "body")` branch in the SSE handler silently accepts but does not forward body parameters. This is intentional for unusual SSE-over-POST use cases. No action needed; the behavior is self-documenting from the code structure.
### L-05. No convenience registerAll methods on MCP or OpenAPI adapters ### L-05. No convenience registerAll methods on MCP or OpenAPI adapters ✅ RESOLVED
`MCPClientLoader` has `getAllOperations()` but no `registerAll(registry)`. `FromOpenAPI` returns a plain array with no helper. Consumers must iterate manually. Minor ergonomics issue. **Resolution:** Added `registerAll(registry)` method to `MCPClientLoader`. Added `OpenAPIServiceRegistry` class (M-08) with `registerAll(registry)`, `add()`, `addFromFile()`, `addFromUrl()`, `get()`, `getAll()`, `remove()`, and `size`.

View File

@@ -36,6 +36,16 @@
"types": "./dist/from-typemap.d.cts", "types": "./dist/from-typemap.d.cts",
"default": "./dist/from-typemap.cjs" "default": "./dist/from-typemap.cjs"
} }
},
"./from-openapi": {
"import": {
"types": "./dist/from-openapi.d.ts",
"default": "./dist/from-openapi.js"
},
"require": {
"types": "./dist/from-openapi.d.cts",
"default": "./dist/from-openapi.cjs"
}
} }
}, },
"publishConfig": { "publishConfig": {

View File

@@ -34,7 +34,7 @@ export function mapError(
if (errorSchemas) { if (errorSchemas) {
const message = error.message; const message = error.message;
for (const schema of errorSchemas) { for (const schema of errorSchemas) {
if (message.includes(schema.code)) { if (message.startsWith(schema.code + ":") || message === schema.code) {
return new CallError(schema.code, message, error); return new CallError(schema.code, message, error);
} }
} }

View File

@@ -5,10 +5,25 @@ import { Value } from "@alkdev/typebox/value";
import { FromSchema } from "./from_schema.js"; import { FromSchema } from "./from_schema.js";
import { mcpEnvelope, type MCPContentBlock, type MCPAnnotations, type MCPResourceContent, type MCPResponseMeta } from "./response-envelope.js"; import { mcpEnvelope, type MCPContentBlock, type MCPAnnotations, type MCPResourceContent, type MCPResponseMeta } from "./response-envelope.js";
import { CallError, InfrastructureErrorCode } from "./error.js"; import { CallError, InfrastructureErrorCode } from "./error.js";
import { OperationRegistry } from "./registry.js";
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
const logger = getLogger("operations:mcp"); const logger = getLogger("operations:mcp");
interface MCPClientLike {
connect(transport: unknown): Promise<void>;
listTools(): Promise<{ tools: Array<{ name: string; description?: string; inputSchema: unknown; outputSchema?: unknown }> }>;
callTool(params: { name: string; arguments: Record<string, unknown> }): Promise<MCPToolResult>;
close(): Promise<void>;
}
interface MCPToolResult {
isError: boolean;
content: unknown[];
structuredContent?: Record<string, unknown>;
_meta?: Record<string, unknown>;
}
export interface MCPClientConfig { export interface MCPClientConfig {
command?: string; command?: string;
args?: string[]; args?: string[];
@@ -21,7 +36,7 @@ export interface MCPClientConfig {
export interface MCPClientWrapper { export interface MCPClientWrapper {
name: string; name: string;
client: unknown; client: MCPClientLike;
tools: Array<OperationSpec & { handler: OperationHandler }>; tools: Array<OperationSpec & { handler: OperationHandler }>;
} }
@@ -87,16 +102,16 @@ export async function createMCPClient(
logger.info(`Creating MCP client for: ${name}`); logger.info(`Creating MCP client for: ${name}`);
const { Client } = await import("@modelcontextprotocol/sdk/client/index.js"); const { Client } = await import("@modelcontextprotocol/sdk/client/index.js");
const client = new Client({ name: `alkdev-${name}`, version: "1.0.0" }); const client = new Client({ name: `alkdev-${name}`, version: "1.0.0" }) as unknown as MCPClientLike;
let transport: any; let transport: { connect(client: unknown): Promise<void> } | undefined;
if (config.url) { if (config.url) {
const { StreamableHTTPClientTransport } = await import("@modelcontextprotocol/sdk/client/streamableHttp.js"); const { StreamableHTTPClientTransport } = await import("@modelcontextprotocol/sdk/client/streamableHttp.js");
const url = new URL(config.url); const url = new URL(config.url);
transport = new StreamableHTTPClientTransport(url, { transport = new StreamableHTTPClientTransport(url, {
requestInit: config.headers ? { headers: config.headers } : undefined, requestInit: config.headers ? { headers: config.headers } : undefined,
}); }) as unknown as { connect(client: unknown): Promise<void> };
} else if (config.command) { } else if (config.command) {
const { StdioClientTransport } = await import("@modelcontextprotocol/sdk/client/stdio.js"); const { StdioClientTransport } = await import("@modelcontextprotocol/sdk/client/stdio.js");
transport = new StdioClientTransport({ transport = new StdioClientTransport({
@@ -104,7 +119,7 @@ export async function createMCPClient(
args: config.args || [], args: config.args || [],
env: config.env as Record<string, string> | undefined, env: config.env as Record<string, string> | undefined,
cwd: config.cwd, cwd: config.cwd,
}); }) as unknown as { connect(client: unknown): Promise<void> };
} else { } else {
throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR, `Invalid MCP server config for ${name}: must have either 'url' or 'command'`); throw new CallError(InfrastructureErrorCode.EXECUTION_ERROR, `Invalid MCP server config for ${name}: must have either 'url' or 'command'`);
} }
@@ -113,7 +128,7 @@ export async function createMCPClient(
logger.info(`Connected to MCP server: ${name}`); logger.info(`Connected to MCP server: ${name}`);
const toolsResult = await client.listTools(); const toolsResult = await client.listTools();
const operations: Array<OperationSpec & { handler: OperationHandler }> = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown; outputSchema?: unknown }) => { const operations: Array<OperationSpec & { handler: OperationHandler }> = toolsResult.tools.map((tool) => {
const outputSchema: TSchema = tool.outputSchema const outputSchema: TSchema = tool.outputSchema
? FromSchema(tool.outputSchema) as TSchema ? FromSchema(tool.outputSchema) as TSchema
: Type.Unknown(); : Type.Unknown();
@@ -145,7 +160,7 @@ export async function createMCPClient(
); );
} }
const structuredContent = (result as any).structuredContent as Record<string, unknown> | undefined; const structuredContent = result.structuredContent;
const contentBlocks = Array.isArray(result.content) ? result.content : []; const contentBlocks = Array.isArray(result.content) ? result.content : [];
const isUnknownOutputSchema = outputSchema[Kind] === "Unknown" || (typeof outputSchema === "object" && Object.keys(outputSchema).filter(k => typeof k === "string").length === 0); const isUnknownOutputSchema = outputSchema[Kind] === "Unknown" || (typeof outputSchema === "object" && Object.keys(outputSchema).filter(k => typeof k === "string").length === 0);
@@ -163,8 +178,8 @@ export async function createMCPClient(
if (structuredContent != null) { if (structuredContent != null) {
meta.structuredContent = structuredContent; meta.structuredContent = structuredContent;
} }
if ((result as any)._meta != null) { if (result._meta != null) {
meta._meta = (result as any)._meta as Record<string, unknown>; meta._meta = result._meta;
} }
return mcpEnvelope(data, meta); return mcpEnvelope(data, meta);
@@ -181,7 +196,7 @@ export async function createMCPClient(
export async function closeMCPClient(wrapper: MCPClientWrapper): Promise<void> { export async function closeMCPClient(wrapper: MCPClientWrapper): Promise<void> {
logger.info(`Closing MCP client: ${wrapper.name}`); logger.info(`Closing MCP client: ${wrapper.name}`);
const client = wrapper.client as any; const { client } = wrapper;
if (client && typeof client.close === "function") { if (client && typeof client.close === "function") {
await client.close(); await client.close();
} }
@@ -227,6 +242,12 @@ export class MCPClientLoader {
return allOps; return allOps;
} }
registerAll(registry: OperationRegistry): void {
for (const op of this.getAllOperations()) {
registry.register(op);
}
}
async closeAll(): Promise<void> { async closeAll(): Promise<void> {
logger.info(`Closing ${this.clients.size} MCP clients`); logger.info(`Closing ${this.clients.size} MCP clients`);

View File

@@ -3,6 +3,7 @@ import { FromSchema } from "./from_schema.js";
import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js"; import { OperationType, type OperationSpec, type OperationHandler, type SubscriptionHandler, type OperationContext } from "./types.js";
import { CallError, InfrastructureErrorCode } from "./error.js"; import { CallError, InfrastructureErrorCode } from "./error.js";
import { httpEnvelope } from "./response-envelope.js"; import { httpEnvelope } from "./response-envelope.js";
import { OperationRegistry } from "./registry.js";
export interface OpenAPIFS { export interface OpenAPIFS {
readFile(path: string): Promise<string>; readFile(path: string): Promise<string>;
@@ -49,6 +50,7 @@ export interface HTTPServiceConfig {
prefix?: string; prefix?: string;
}; };
timeout?: number; timeout?: number;
fetch?: typeof globalThis.fetch;
} }
export interface SSEEvent { export interface SSEEvent {
@@ -323,6 +325,7 @@ function createHTTPOperation(
const opType = detectOperationType(method, operation); const opType = detectOperationType(method, operation);
const apiVersion = spec.info?.version || "1.0.0"; const apiVersion = spec.info?.version || "1.0.0";
const authHeaders = getAuthHeaders(config); const authHeaders = getAuthHeaders(config);
const httpClient = config.fetch ?? globalThis.fetch.bind(globalThis);
const responseHeaders = (): Record<string, string> => ({ ...authHeaders, "Content-Type": "application/json" }); const responseHeaders = (): Record<string, string> => ({ ...authHeaders, "Content-Type": "application/json" });
if (opType === OperationType.SUBSCRIPTION) { if (opType === OperationType.SUBSCRIPTION) {
@@ -336,7 +339,6 @@ function createHTTPOperation(
if (path.includes(`{${key}}`)) { if (path.includes(`{${key}}`)) {
urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value))); urlPath = urlPath.replace(`{${key}}`, encodeURIComponent(String(value)));
} else if (key === "body") { } else if (key === "body") {
// body not typically used for SSE GET, but supported
} else { } else {
queryParams[key] = String(value); queryParams[key] = String(value);
} }
@@ -352,7 +354,7 @@ function createHTTPOperation(
"Accept": "text/event-stream", "Accept": "text/event-stream",
}; };
const response = await fetch(url.toString(), { const response = await httpClient(url.toString(), {
method: method.toUpperCase(), method: method.toUpperCase(),
headers, headers,
signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined, signal: config.timeout ? AbortSignal.timeout(config.timeout) : undefined,
@@ -442,7 +444,7 @@ function createHTTPOperation(
"Content-Type": "application/json", "Content-Type": "application/json",
}; };
const response = await fetch(url.toString(), { const response = await httpClient(url.toString(), {
method: method.toUpperCase(), method: method.toUpperCase(),
headers, headers,
body: body ? JSON.stringify(body) : undefined, body: body ? JSON.stringify(body) : undefined,
@@ -531,7 +533,56 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f
} }
export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> { export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> {
const response = await fetch(url); const httpClient = config.fetch ?? globalThis.fetch.bind(globalThis);
const response = await httpClient(url);
const spec = await response.json() as OpenAPISpec; const spec = await response.json() as OpenAPISpec;
return FromOpenAPI(spec, config); return FromOpenAPI(spec, config);
} }
export class OpenAPIServiceRegistry {
private services: Map<string, { config: HTTPServiceConfig; operations: Array<OperationSpec & { handler: HTTPOperationHandler }> }> = new Map();
add(name: string, spec: OpenAPISpec, config: HTTPServiceConfig): Array<OperationSpec & { handler: HTTPOperationHandler }> {
const operations = FromOpenAPI(spec, config);
this.services.set(name, { config, operations });
return operations;
}
async addFromFile(name: string, path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> {
const operations = await FromOpenAPIFile(path, config, fs);
this.services.set(name, { config, operations });
return operations;
}
async addFromUrl(name: string, url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: HTTPOperationHandler }>> {
const operations = await FromOpenAPIUrl(url, config);
this.services.set(name, { config, operations });
return operations;
}
get(name: string): Array<OperationSpec & { handler: HTTPOperationHandler }> | undefined {
return this.services.get(name)?.operations;
}
getAll(): Array<OperationSpec & { handler: HTTPOperationHandler }> {
const all: Array<OperationSpec & { handler: HTTPOperationHandler }> = [];
for (const { operations } of this.services.values()) {
all.push(...operations);
}
return all;
}
remove(name: string): boolean {
return this.services.delete(name);
}
registerAll(registry: OperationRegistry): void {
for (const { operations } of this.services.values()) {
registry.registerAll(operations);
}
}
get size(): number {
return this.services.size;
}
}

View File

@@ -1,4 +1,7 @@
import * as Type from "@alkdev/typebox"; import * as Type from "@alkdev/typebox";
import { getLogger } from "@logtape/logtape";
const logger = getLogger("operations:from_schema");
const IsExact = (value: unknown, expect: unknown) => value === expect; const IsExact = (value: unknown, expect: unknown) => value === expect;
const IsSValue = (value: unknown): value is SValue => const IsSValue = (value: unknown): value is SValue =>
@@ -111,5 +114,6 @@ export function FromSchema<T>(T: T): Type.TSchema {
if (IsSInteger(T)) return Type.Integer(T); if (IsSInteger(T)) return Type.Integer(T);
if (IsSBoolean(T)) return Type.Boolean(T); if (IsSBoolean(T)) return Type.Boolean(T);
if (IsSNull(T)) return Type.Null(T); if (IsSNull(T)) return Type.Null(T);
logger.warn(`Falling back to Type.Unknown for unrecognized schema: ${JSON.stringify(T)}`);
return Type.Unknown(T || {}); return Type.Unknown(T || {});
} }

View File

@@ -6,8 +6,6 @@ export { formatValueErrors, assertIsSchema, validateOrThrow, collectErrors } fro
export { buildEnv } from "./env.js"; export { buildEnv } from "./env.js";
export type { EnvOptions } from "./env.js"; export type { EnvOptions } from "./env.js";
export { FromSchema } from "./from_schema.js"; export { FromSchema } from "./from_schema.js";
export { FromOpenAPI, FromOpenAPIFile, FromOpenAPIUrl } from "./from_openapi.js";
export type { OpenAPISpec, OpenAPIOperation, OpenAPIParameter, HTTPServiceConfig, OpenAPIFS } from "./from_openapi.js";
export { scanOperations } from "./scanner.js"; export { scanOperations } from "./scanner.js";
export type { OperationManifest, ScannerFS } from "./scanner.js"; export type { OperationManifest, ScannerFS } from "./scanner.js";
export { CallError, InfrastructureErrorCode, mapError } from "./error.js"; export { CallError, InfrastructureErrorCode, mapError } from "./error.js";

View File

@@ -1,3 +1,4 @@
import { OperationType } from "./types.js";
import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler, Identity, AccessControl } from "./types.js"; import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler, Identity, AccessControl } from "./types.js";
import { getLogger } from "@logtape/logtape"; import { getLogger } from "@logtape/logtape";
import { Value } from "@alkdev/typebox/value"; import { Value } from "@alkdev/typebox/value";
@@ -40,6 +41,7 @@ export class OperationRegistry {
const resolvedSpec: OperationSpec = { ...spec, inputSchema, outputSchema }; const resolvedSpec: OperationSpec = { ...spec, inputSchema, outputSchema };
this.specs.set(id, resolvedSpec); this.specs.set(id, resolvedSpec);
if (handler) { if (handler) {
this.validateSubscriptionHandler(id, handler);
this.handlers.set(id, handler); this.handlers.set(id, handler);
} }
logger.info(`Registered operation: ${id}`); logger.info(`Registered operation: ${id}`);
@@ -60,10 +62,23 @@ export class OperationRegistry {
logger.info(`Registered spec: ${id}`); logger.info(`Registered spec: ${id}`);
} }
private validateSubscriptionHandler(id: string, handler: OperationHandler | SubscriptionHandler): void {
const spec = this.specs.get(id)!;
if (spec.type === OperationType.SUBSCRIPTION) {
const tag = Object.prototype.toString.call(handler);
if (tag !== '[object AsyncGeneratorFunction]') {
throw new Error(
`Handler for SUBSCRIPTION operation "${id}" must be an async generator function (async function*), but got ${tag}`,
);
}
}
}
registerHandler(id: string, handler: OperationHandler | SubscriptionHandler): void { registerHandler(id: string, handler: OperationHandler | SubscriptionHandler): void {
if (!this.specs.has(id)) { if (!this.specs.has(id)) {
throw new Error(`Cannot register handler for unknown operation: ${id}`); throw new Error(`Cannot register handler for unknown operation: ${id}`);
} }
this.validateSubscriptionHandler(id, handler);
this.handlers.set(id, handler); this.handlers.set(id, handler);
logger.info(`Registered handler: ${id}`); logger.info(`Registered handler: ${id}`);
} }

View File

@@ -134,7 +134,18 @@ export function isResponseEnvelope(value: unknown): value is ResponseEnvelope {
const obj = value as Record<string, unknown> const obj = value as Record<string, unknown>
if (!("data" in obj) || !("meta" in obj)) return false if (!("data" in obj) || !("meta" in obj)) return false
if (typeof obj.meta !== "object" || obj.meta === null) return false if (typeof obj.meta !== "object" || obj.meta === null) return false
return RESPONSE_SOURCES.includes((obj.meta as ResponseMeta).source as ResponseSource) const meta = obj.meta as Record<string, unknown>
if (!RESPONSE_SOURCES.includes(meta.source as ResponseSource)) return false
switch (meta.source) {
case "local":
return typeof meta.operationId === "string" && typeof meta.timestamp === "number"
case "http":
return typeof meta.statusCode === "number"
case "mcp":
return typeof meta.isError === "boolean" && Array.isArray(meta.content)
default:
return false
}
} }
export function localEnvelope<T>(data: T, operationId: string): ResponseEnvelope<T> { export function localEnvelope<T>(data: T, operationId: string): ResponseEnvelope<T> {

View File

@@ -35,7 +35,17 @@ export async function* subscribe(
validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`); validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`);
const generator = handler(input, context) as AsyncGenerator<unknown, void, unknown>; const result: unknown = handler(input, context);
if (result == null || typeof (result as Record<symbol, unknown>)[Symbol.asyncIterator] !== "function") {
throw new CallError(
InfrastructureErrorCode.EXECUTION_ERROR,
`Subscription handler for "${operationId}" must return an async iterable (async generator), but got ${result === null ? "null" : typeof result}`,
{ operationId },
);
}
const generator = result as AsyncGenerator<unknown, void, unknown>;
try { try {
for await (const value of generator) { for await (const value of generator) {

View File

@@ -77,7 +77,7 @@ export type SubscriptionHandler<
context: TContext, context: TContext,
) => AsyncGenerator<TOutput, void, unknown>; ) => AsyncGenerator<TOutput, void, unknown>;
export const OperationDefinitionSchema = Type.Object({ export const OperationSpecSchema = Type.Object({
name: Type.String({ description: "Unique operation name" }), name: Type.String({ description: "Unique operation name" }),
namespace: Type.String({ namespace: Type.String({
description: "Namespace for grouping (e.g., 'task', 'graph', 'user')", description: "Namespace for grouping (e.g., 'task', 'graph', 'user')",
@@ -93,7 +93,6 @@ export const OperationDefinitionSchema = Type.Object({
outputSchema: Type.Unknown({ description: "json schema for output" }), outputSchema: Type.Unknown({ description: "json schema for output" }),
errorSchemas: Type.Optional(Type.Array(ErrorDefinitionSchema)), errorSchemas: Type.Optional(Type.Array(ErrorDefinitionSchema)),
accessControl: AccessControlSchema, accessControl: AccessControlSchema,
handler: Type.Unknown({ description: "Operation handler function" }),
_meta: Type.Optional(Type.Record(Type.String(), Type.Unknown())), _meta: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
}); });
@@ -115,24 +114,12 @@ export interface OperationSpec<
_meta?: Record<string, unknown>; _meta?: Record<string, unknown>;
} }
export const OperationSpecSchema = Type.Object({ export const OperationDefinitionSchema = Type.Intersect([
name: Type.String({ description: "Unique operation name" }), OperationSpecSchema,
namespace: Type.String({ Type.Object({
description: "Namespace for grouping (e.g., 'task', 'graph', 'user')", handler: Type.Unknown({ description: "Operation handler function" }),
}), }),
version: Type.String({ description: "Semantic version (e.g., '1.0.0')" }), ]);
type: Type.Enum(OperationType, {
description: "Operation type: query, mutation, or subscription",
}),
title: Type.Optional(Type.String({ description: "Human-readable title" })),
description: Type.String({ description: "Detailed description" }),
tags: Type.Optional(Type.Array(Type.String())),
inputSchema: Type.Unknown({ description: "json schema for input" }),
outputSchema: Type.Unknown({ description: "json schema for output" }),
errorSchemas: Type.Optional(Type.Array(ErrorDefinitionSchema)),
accessControl: AccessControlSchema,
_meta: Type.Optional(Type.Record(Type.String(), Type.Unknown())),
});
export interface IOperationDefinition< export interface IOperationDefinition<
TInput = unknown, TInput = unknown,

View File

@@ -92,8 +92,9 @@ describe("buildEnv", () => {
it("filters out SUBSCRIPTION operations", () => { it("filters out SUBSCRIPTION operations", () => {
const registry = new OperationRegistry(); const registry = new OperationRegistry();
registry.register(makeOperation("query")); registry.register(makeOperation("query"));
async function* subHandler(input: any) { yield { result: input.value }; }
registry.register({ registry.register({
...makeOperation("onEvent"), ...makeOperation("onEvent", subHandler),
type: OperationType.SUBSCRIPTION, type: OperationType.SUBSCRIPTION,
}); });

View File

@@ -50,6 +50,20 @@ describe("mapError", () => {
expect(result.code).toBe("NOT_FOUND"); expect(result.code).toBe("NOT_FOUND");
}); });
it("does not false-positive on substring match (ITEM_NOT_FOUND vs NOT_FOUND)", () => {
const result = mapError(new Error("ITEM_NOT_FOUND: nope"), [
{ code: "NOT_FOUND", schema: {} },
]);
expect(result.code).toBe(InfrastructureErrorCode.EXECUTION_ERROR);
});
it("matches exact code equality", () => {
const result = mapError(new Error("NOT_FOUND"), [
{ code: "NOT_FOUND", schema: {} },
]);
expect(result.code).toBe("NOT_FOUND");
});
it("maps non-Error to UNKNOWN_ERROR", () => { it("maps non-Error to UNKNOWN_ERROR", () => {
const result = mapError("string error"); const result = mapError("string error");
expect(result.code).toBe(InfrastructureErrorCode.UNKNOWN_ERROR); expect(result.code).toBe(InfrastructureErrorCode.UNKNOWN_ERROR);

View File

@@ -402,3 +402,111 @@ describe("OperationRegistry access control", () => {
} }
}); });
}); });
describe("OperationRegistry subscription handler validation", () => {
it("rejects non-async-generator handler for SUBSCRIPTION via registerHandler", () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "badSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "bad sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const regularAsyncFn = async () => "not a generator";
expect(() => registry.registerHandler("test.badSub", regularAsyncFn as any)).toThrow(
/must be an async generator function/i,
);
});
it("rejects synchronous function handler for SUBSCRIPTION via registerHandler", () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "syncSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "sync sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
expect(() => registry.registerHandler("test.syncSub", (() => {}) as any)).toThrow(
/must be an async generator function/i,
);
});
it("allows async generator function handler for SUBSCRIPTION via registerHandler", () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "goodSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "good sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
async function* goodHandler(_input: unknown, _context: any) {
yield "event";
}
expect(() => registry.registerHandler("test.goodSub", goodHandler as any)).not.toThrow();
});
it("rejects non-async-generator handler for SUBSCRIPTION via register", () => {
const registry = new OperationRegistry();
expect(() =>
registry.register({
name: "badRegSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "bad sub via register",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler: async () => "not a generator" as any,
}),
).toThrow(/must be an async generator function/i);
});
it("allows async generator handler for SUBSCRIPTION via register", () => {
const registry = new OperationRegistry();
async function* handler(_input: unknown, _context: any) {
yield "event";
}
expect(() =>
registry.register({
name: "goodRegSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "good sub via register",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
handler,
}),
).not.toThrow();
});
it("allows regular async handler for QUERY via registerHandler", () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "queryOp",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "query op",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const handler = async (_input: unknown, _context: any) => ({ result: "ok" });
expect(() => registry.registerHandler("test.queryOp", handler as any)).not.toThrow();
});
});

View File

@@ -240,6 +240,46 @@ describe("isResponseEnvelope", () => {
it("returns false for object with numeric meta", () => { it("returns false for object with numeric meta", () => {
expect(isResponseEnvelope({ data: "hello", meta: 42 })).toBe(false); expect(isResponseEnvelope({ data: "hello", meta: 42 })).toBe(false);
}); });
it("returns false for local envelope missing operationId", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "local", timestamp: Date.now() } })).toBe(false);
});
it("returns false for local envelope with non-string operationId", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: 123, timestamp: Date.now() } })).toBe(false);
});
it("returns false for local envelope missing timestamp", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: "op.test" } })).toBe(false);
});
it("returns false for local envelope with non-number timestamp", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "local", operationId: "op.test", timestamp: "now" } })).toBe(false);
});
it("returns false for http envelope missing statusCode", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "http", headers: {}, contentType: "text/plain" } })).toBe(false);
});
it("returns false for http envelope with non-number statusCode", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "http", statusCode: "200", headers: {}, contentType: "text/plain" } })).toBe(false);
});
it("returns false for mcp envelope missing isError", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", content: [] } })).toBe(false);
});
it("returns false for mcp envelope with non-boolean isError", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: "true", content: [] } })).toBe(false);
});
it("returns false for mcp envelope missing content", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: false } })).toBe(false);
});
it("returns false for mcp envelope with non-array content", () => {
expect(isResponseEnvelope({ data: "hello", meta: { source: "mcp", isError: true, content: "error" } })).toBe(false);
});
}); });
describe("unwrap", () => { describe("unwrap", () => {

View File

@@ -380,4 +380,85 @@ describe("subscribe", () => {
expect(results).toHaveLength(1); expect(results).toHaveLength(1);
expect(results[0].data).toBe("secret-event"); expect(results[0].data).toBe("secret-event");
}); });
it("throws CallError when handler returns a non-async-iterable (plain async function)", async () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "badSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "bad sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const plainAsyncFn = async (_input: unknown, _context: OperationContext) => "not a generator";
(registry as any).handlers.set("test.badSub", plainAsyncFn);
try {
for await (const _ of subscribe(registry, "test.badSub", {}, makeContext())) {
expect.fail("Should have thrown");
}
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR);
expect((error as CallError).message).toContain("must return an async iterable");
}
});
it("throws CallError when handler returns null", async () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "nullSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "null sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const nullFn = (_input: unknown, _context: OperationContext): any => null;
(registry as any).handlers.set("test.nullSub", nullFn);
try {
for await (const _ of subscribe(registry, "test.nullSub", {}, makeContext())) {
expect.fail("Should have thrown");
}
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR);
expect((error as CallError).message).toContain("must return an async iterable");
}
});
it("throws CallError when handler returns a plain object (non-iterable)", async () => {
const registry = new OperationRegistry();
registry.registerSpec({
name: "objSub",
namespace: "test",
version: "1.0.0",
type: OperationType.SUBSCRIPTION,
description: "obj sub",
inputSchema: Type.Object({}),
outputSchema: Type.Unknown(),
accessControl: { requiredScopes: [] },
});
const objFn = (_input: unknown, _context: OperationContext): any => ({ not: "iterable" });
(registry as any).handlers.set("test.objSub", objFn);
try {
for await (const _ of subscribe(registry, "test.objSub", {}, makeContext())) {
expect.fail("Should have thrown");
}
expect.fail("Should have thrown");
} catch (error) {
expect(error).toBeInstanceOf(CallError);
expect((error as CallError).code).toBe(InfrastructureErrorCode.EXECUTION_ERROR);
expect((error as CallError).message).toContain("must return an async iterable");
}
});
}); });

View File

@@ -5,6 +5,7 @@ export default defineConfig({
'src/index.ts', 'src/index.ts',
'src/from_mcp.ts', 'src/from_mcp.ts',
'src/from_typemap.ts', 'src/from_typemap.ts',
'src/from_openapi.ts',
], ],
format: ['esm', 'cjs'], format: ['esm', 'cjs'],
dts: true, dts: true,