diff --git a/src/call.ts b/src/call.ts index 14fe489..59723f1 100644 --- a/src/call.ts +++ b/src/call.ts @@ -4,7 +4,7 @@ import { getLogger } from "@logtape/logtape"; import { OperationRegistry } from "./registry.js"; import { CallError, InfrastructureErrorCode, mapError } from "./error.js"; import { validateOrThrow } from "./validation.js"; -import type { IOperationDefinition, Identity, OperationContext, AccessControl } from "./types.js"; +import type { Identity, OperationContext, AccessControl, OperationSpec } from "./types.js"; const logger = getLogger("operations:call"); @@ -45,10 +45,10 @@ export type CallEventMapValue = CallRequestedEvent | CallRespondedEvent | CallAb export const CallEventMap = CallEventSchema; type CallPubSubMap = { - "call.requested": [CallRequestedEvent]; - "call.responded": [CallRespondedEvent]; - "call.aborted": [CallAbortedEvent]; - "call.error": [CallErrorEvent]; + "call.requested": CallRequestedEvent; + "call.responded": CallRespondedEvent; + "call.aborted": CallAbortedEvent; + "call.error": CallErrorEvent; }; interface PendingRequest { @@ -77,10 +77,10 @@ export class PendingRequestMap { } private setupSubscriptions(): void { - const respondedIter = this.pubsub.subscribe("call.responded"); + const respondedIter = this.pubsub.subscribe("call.responded", ""); (async () => { - for await (const event of respondedIter) { - const responded = event as CallRespondedEvent; + for await (const envelope of respondedIter) { + const responded = envelope.payload; const pending = this.requests.get(responded.requestId); if (pending) { if (pending.timer) clearTimeout(pending.timer); @@ -90,10 +90,10 @@ export class PendingRequestMap { } })(); - const errorIter = this.pubsub.subscribe("call.error"); + const errorIter = this.pubsub.subscribe("call.error", ""); (async () => { - for await (const event of errorIter) { - const err = event as CallErrorEvent; + for await (const envelope of errorIter) { + const err = envelope.payload; const pending = this.requests.get(err.requestId); if (pending) { if (pending.timer) clearTimeout(pending.timer); @@ -103,10 +103,10 @@ export class PendingRequestMap { } })(); - const abortedIter = this.pubsub.subscribe("call.aborted"); + const abortedIter = this.pubsub.subscribe("call.aborted", ""); (async () => { - for await (const event of abortedIter) { - const aborted = event as CallAbortedEvent; + for await (const envelope of abortedIter) { + const aborted = envelope.payload; const pending = this.requests.get(aborted.requestId); if (pending) { if (pending.timer) clearTimeout(pending.timer); @@ -137,7 +137,7 @@ export class PendingRequestMap { this.requests.set(requestId, pending); - this.pubsub.publish("call.requested", { + this.pubsub.publish("call.requested", "", { requestId, operationId, input, @@ -149,14 +149,14 @@ export class PendingRequestMap { } respond(requestId: string, output: unknown): void { - this.pubsub.publish("call.responded", { + this.pubsub.publish("call.responded", "", { requestId, output, }); } emitError(requestId: string, code: string, message: string, details?: unknown): void { - this.pubsub.publish("call.error", { + this.pubsub.publish("call.error", "", { requestId, code, message, @@ -169,7 +169,7 @@ export class PendingRequestMap { if (pending) { if (pending.timer) clearTimeout(pending.timer); this.requests.delete(requestId); - this.pubsub.publish("call.aborted", { requestId }); + this.pubsub.publish("call.aborted", "", { requestId }); pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`)); } } @@ -186,9 +186,9 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { const { requestId, operationId, input, identity } = event; try { - const operation = registry.get(operationId); + const spec = registry.getSpec(operationId); - if (!operation) { + if (!spec) { throw new CallError( InfrastructureErrorCode.OPERATION_NOT_FOUND, `Operation not found: ${operationId}`, @@ -196,7 +196,16 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { ); } - const accessControl: AccessControl = operation.accessControl as AccessControl; + const handler = registry.getHandler(operationId); + if (!handler) { + throw new CallError( + InfrastructureErrorCode.OPERATION_NOT_FOUND, + `No handler registered for operation: ${operationId}`, + { operationId }, + ); + } + + const accessControl: AccessControl = spec.accessControl as AccessControl; if (identity && !checkAccess(accessControl, identity)) { throw new CallError( @@ -212,9 +221,9 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler { identity, }; - validateOrThrow(operation.inputSchema, input, `Input validation for ${operationId}`); + validateOrThrow(spec.inputSchema, input, `Input validation for ${operationId}`); - await operation.handler(input, context); + await handler(input, context); } catch (error) { const callError = mapError(error); diff --git a/src/env.ts b/src/env.ts index 34fee37..6f26fb9 100644 --- a/src/env.ts +++ b/src/env.ts @@ -18,34 +18,34 @@ export interface EnvOptions { export function buildEnv(options: EnvOptions): OperationEnv { const { registry, context, allowedNamespaces, callMap } = options; - const operations = registry.list(); + const specs = registry.getAllSpecs(); const namespaces: OperationEnv = {}; - for (const operation of operations) { - if (allowedNamespaces && !allowedNamespaces.includes(operation.namespace)) { + for (const spec of specs) { + if (allowedNamespaces && !allowedNamespaces.includes(spec.namespace)) { continue; } - if (operation.type === OperationType.SUBSCRIPTION) { + if (spec.type === OperationType.SUBSCRIPTION) { continue; } - if (!namespaces[operation.namespace]) { - namespaces[operation.namespace] = {}; + if (!namespaces[spec.namespace]) { + namespaces[spec.namespace] = {}; } - const operationId = `${operation.namespace}.${operation.name}`; + const operationId = `${spec.namespace}.${spec.name}`; if (callMap) { - namespaces[operation.namespace][operation.name] = async (input: unknown) => { + namespaces[spec.namespace][spec.name] = async (input: unknown) => { logger.debug(`Call protocol: ${operationId}`); return await callMap.call(operationId, input, { parentRequestId: context.requestId, }); }; } else { - namespaces[operation.namespace][operation.name] = async (input: unknown) => { + namespaces[spec.namespace][spec.name] = async (input: unknown) => { logger.debug(`Executing: ${operationId}`); return await registry.execute(operationId, input, context); }; diff --git a/src/from_mcp.ts b/src/from_mcp.ts index daedfe0..5117070 100644 --- a/src/from_mcp.ts +++ b/src/from_mcp.ts @@ -1,4 +1,4 @@ -import type { IOperationDefinition } from "./types.js"; +import type { OperationSpec, OperationHandler, OperationContext } from "./types.js"; import { OperationType } from "./types.js"; import { Type, type TSchema } from "@alkdev/typebox"; import { FromSchema } from "./from_schema.js"; @@ -18,7 +18,7 @@ export interface MCPClientConfig { export interface MCPClientWrapper { name: string; client: unknown; - tools: IOperationDefinition[]; + tools: Array; } export async function createMCPClient( @@ -54,7 +54,7 @@ export async function createMCPClient( logger.info(`Connected to MCP server: ${name}`); const toolsResult = await client.listTools(); - const operations: IOperationDefinition[] = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown }) => { + const operations: Array = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown }) => { return { name: tool.name, namespace: name, @@ -78,7 +78,7 @@ export async function createMCPClient( return result.content; }, - } satisfies IOperationDefinition; + } satisfies OperationSpec & { handler: OperationHandler }; }); return { @@ -126,8 +126,8 @@ export class MCPClientLoader { return Array.from(this.clients.values()); } - getAllOperations(): IOperationDefinition[] { - const allOps: IOperationDefinition[] = []; + getAllOperations(): Array { + const allOps: Array = []; for (const wrapper of this.clients.values()) { for (const op of wrapper.tools) { allOps.push(op); diff --git a/src/from_openapi.ts b/src/from_openapi.ts index 6733474..cac3cf0 100644 --- a/src/from_openapi.ts +++ b/src/from_openapi.ts @@ -1,6 +1,6 @@ import * as Type from "@alkdev/typebox"; import { FromSchema } from "./from_schema.js"; -import { OperationType, type IOperationDefinition, type OperationHandler, type OperationContext } from "./types.js"; +import { OperationType, type OperationSpec, type OperationHandler, type OperationContext } from "./types.js"; export interface OpenAPIFS { readFile(path: string): Promise; @@ -225,7 +225,7 @@ function createHTTPOperation( method: string, path: string, config: HTTPServiceConfig, -): IOperationDefinition { +): OperationSpec & { handler: OperationHandler } { const operationId = normalizeOperationId(operation, method, path); const opType = detectOperationType(method, operation); const authHeaders = getAuthHeaders(config); @@ -298,8 +298,8 @@ function createHTTPOperation( }; } -export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): IOperationDefinition[] { - const operations: IOperationDefinition[] = []; +export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array }> { + const operations: Array }> = []; const basePath = spec.basePath || ""; for (const [path, methods] of Object.entries(spec.paths)) { @@ -320,7 +320,7 @@ export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): IOper return operations; } -export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise { +export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise }>> { let content: string; if (fs) { content = await fs.readFile(path); @@ -332,7 +332,7 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f return FromOpenAPI(spec, config); } -export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise { +export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise }>> { const response = await fetch(url); const spec = await response.json() as OpenAPISpec; return FromOpenAPI(spec, config); diff --git a/src/index.ts b/src/index.ts index 6fbe732..50e6a8d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,4 @@ -export { OperationType, OperationContextSchema, OperationDefinitionSchema, OperationSpecSchema, AccessControlSchema, ErrorDefinitionSchema } from "./types.js"; +export { OperationType, OperationContextSchema, OperationSpecSchema, AccessControlSchema, ErrorDefinitionSchema } from "./types.js"; export type { IOperationDefinition, OperationHandler, SubscriptionHandler, Identity, OperationEnv, OperationContext, OperationSpec, AccessControl, ErrorDefinition } from "./types.js"; export { OperationRegistry } from "./registry.js"; export { formatValueErrors, assertIsSchema, validateOrThrow, collectErrors } from "./validation.js"; diff --git a/src/registry.ts b/src/registry.ts index 7f3ac15..3e18e25 100644 --- a/src/registry.ts +++ b/src/registry.ts @@ -1,4 +1,4 @@ -import type { IOperationDefinition, OperationContext, OperationSpec } from "./types.js"; +import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler } from "./types.js"; import { getLogger } from "@logtape/logtape"; import { Value } from "@alkdev/typebox/value"; import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js"; @@ -6,51 +6,75 @@ import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } fro const logger = getLogger("operations:registry"); export class OperationRegistry { - private operations = new Map(); + private specs = new Map(); + private handlers = new Map(); - private getOperationId(operation: IOperationDefinition): string { - return `${operation.namespace}.${operation.name}`; + private opId(namespace: string, name: string): string { + return `${namespace}.${name}`; } - register(operation: IOperationDefinition): void { - const opId = `${operation.namespace}.${operation.name}`; - assertIsSchema(operation.inputSchema, `${opId} inputSchema`); - assertIsSchema(operation.outputSchema, `${opId} outputSchema`); - const id = this.getOperationId(operation); - this.operations.set(id, operation); + register(operation: OperationSpec & { handler?: OperationHandler | SubscriptionHandler }): void { + const id = this.opId(operation.namespace, operation.name); + assertIsSchema(operation.inputSchema, `${id} inputSchema`); + assertIsSchema(operation.outputSchema, `${id} outputSchema`); + const { handler, ...spec } = operation; + this.specs.set(id, spec); + if (handler) { + this.handlers.set(id, handler); + } logger.info(`Registered operation: ${id}`); } - registerAll(operations: IOperationDefinition[]): void { + registerAll(operations: Array): void { for (const op of operations) { this.register(op); } } - get(id: string): IOperationDefinition | undefined { - return this.operations.get(id); + registerSpec(spec: OperationSpec): void { + const id = this.opId(spec.namespace, spec.name); + assertIsSchema(spec.inputSchema, `${id} inputSchema`); + assertIsSchema(spec.outputSchema, `${id} outputSchema`); + this.specs.set(id, spec); + logger.info(`Registered spec: ${id}`); } - getByName(namespace: string, name: string): IOperationDefinition | undefined { - return this.operations.get(`${namespace}.${name}`); + registerHandler(id: string, handler: OperationHandler | SubscriptionHandler): void { + if (!this.specs.has(id)) { + throw new Error(`Cannot register handler for unknown operation: ${id}`); + } + this.handlers.set(id, handler); + logger.info(`Registered handler: ${id}`); } - list(): IOperationDefinition[] { - return Array.from(this.operations.values()); - } - - private extractSpec(operation: IOperationDefinition): OperationSpec { - const { handler: _handler, ...spec } = operation; - return spec; + get(id: string): (OperationSpec & { handler?: OperationHandler | SubscriptionHandler }) | undefined { + const spec = this.specs.get(id); + if (!spec) return undefined; + const handler = this.handlers.get(id); + return { ...spec, handler }; } getSpec(id: string): OperationSpec | undefined { - const operation = this.operations.get(id); - return operation ? this.extractSpec(operation) : undefined; + return this.specs.get(id); + } + + getHandler(id: string): OperationHandler | SubscriptionHandler | undefined { + return this.handlers.get(id); + } + + getByName(namespace: string, name: string): (OperationSpec & { handler?: OperationHandler | SubscriptionHandler }) | undefined { + return this.get(this.opId(namespace, name)); + } + + list(): Array { + return Array.from(this.specs.entries()).map(([id, spec]) => ({ + ...spec, + handler: this.handlers.get(id), + })); } getAllSpecs(): OperationSpec[] { - return this.list().map(op => this.extractSpec(op)); + return Array.from(this.specs.values()); } async execute( @@ -58,17 +82,21 @@ export class OperationRegistry { input: TInput, context: OperationContext, ): Promise { - const operation = this.operations.get(operationId); - - if (!operation) { + const spec = this.specs.get(operationId); + if (!spec) { throw new Error(`Operation not found: ${operationId}`); } - validateOrThrow(operation.inputSchema, input, `Input validation failed for ${operationId}`); + const handler = this.handlers.get(operationId); + if (!handler) { + throw new Error(`No handler registered for operation: ${operationId}`); + } - const result = await operation.handler(input, context) as TOutput; + validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`); - const errors = collectErrors(operation.outputSchema, result); + const result = await handler(input, context) as TOutput; + + const errors = collectErrors(spec.outputSchema, result); if (errors.length > 0) { logger.warn(`Output validation failed for ${operationId}:\n${formatValueErrors(errors)}`); } diff --git a/src/scanner.ts b/src/scanner.ts index 4fcc313..180ac48 100644 --- a/src/scanner.ts +++ b/src/scanner.ts @@ -1,5 +1,5 @@ -import type { IOperationDefinition } from "./types.js"; -import { OperationDefinitionSchema } from "./types.js"; +import type { OperationSpec } from "./types.js"; +import { OperationSpecSchema } from "./types.js"; import { collectErrors, formatValueErrors } from "./validation.js"; import { getLogger } from "@logtape/logtape"; @@ -11,15 +11,15 @@ export interface ScannerFS { } export interface OperationManifest { - operations: Record; + operations: Record; baseUrl?: string; } export async function scanOperations( dirPath: string, fs: ScannerFS, -): Promise { - const operations: IOperationDefinition[] = []; +): Promise { + const operations: OperationSpec[] = []; try { await processDirectory(dirPath, operations, fs); @@ -37,7 +37,7 @@ export async function scanOperations( async function processDirectory( dirPath: string, - operations: IOperationDefinition[], + operations: OperationSpec[], fs: ScannerFS, ): Promise { try { @@ -53,9 +53,9 @@ async function processDirectory( const module = await import(moduleUrl); if (module.default) { - const operation = module.default as IOperationDefinition; + const operation = module.default as OperationSpec; - const errors = collectErrors(OperationDefinitionSchema, operation); + const errors = collectErrors(OperationSpecSchema, operation); if (errors.length > 0) { logger.warn(`${fullPath}: Invalid operation definition - ${formatValueErrors(errors, "")}`); diff --git a/src/subscribe.ts b/src/subscribe.ts index cc30d4e..7e6feec 100644 --- a/src/subscribe.ts +++ b/src/subscribe.ts @@ -1,4 +1,4 @@ -import type { IOperationDefinition, OperationContext } from "./types.js"; +import type { OperationContext } from "./types.js"; import { OperationRegistry } from "./registry.js"; export async function* subscribe( @@ -7,13 +7,18 @@ export async function* subscribe( input: unknown, context: OperationContext, ): AsyncGenerator { - const operation = registry.get(operationId); + const spec = registry.getSpec(operationId); - if (!operation) { + if (!spec) { throw new Error(`Operation not found: ${operationId}`); } - const handler = operation.handler; + const handler = registry.getHandler(operationId); + + if (!handler) { + throw new Error(`No handler registered for operation: ${operationId}`); + } + const generator = handler(input, context) as AsyncGenerator; try { diff --git a/test/registry.test.ts b/test/registry.test.ts index 4359bbf..ddbebf1 100644 --- a/test/registry.test.ts +++ b/test/registry.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect } from "vitest"; import { OperationRegistry } from "../src/registry.js"; -import { OperationType, type IOperationDefinition, type OperationContext } from "../src/index.js"; +import { OperationType, type IOperationDefinition, type OperationContext, type OperationSpec, type OperationHandler } from "../src/index.js"; import * as Type from "@alkdev/typebox"; import { Value } from "@alkdev/typebox/value"; @@ -19,19 +19,40 @@ function makeOperation(overrides: Partial = {}): IOperatio }; } +function makeSpec(overrides: Partial = {}): OperationSpec { + return { + name: "testOp", + namespace: "test", + version: "1.0.0", + type: OperationType.QUERY, + description: "A test operation", + inputSchema: Type.Object({ value: Type.String() }), + outputSchema: Type.Object({ result: Type.String() }), + accessControl: { requiredScopes: [] }, + ...overrides, + }; +} + +const testHandler: OperationHandler = async (input: any) => ({ result: `processed: ${input.value}` }); + describe("OperationRegistry", () => { it("registers and retrieves an operation", () => { const registry = new OperationRegistry(); const op = makeOperation(); registry.register(op); - expect(registry.get("test.testOp")).toBe(op); + const retrieved = registry.get("test.testOp")!; + expect(retrieved).toStrictEqual(op); + expect(retrieved.name).toBe("testOp"); + expect(retrieved.handler).toBeDefined(); }); it("retrieves by namespace and name", () => { const registry = new OperationRegistry(); const op = makeOperation(); registry.register(op); - expect(registry.getByName("test", "testOp")).toBe(op); + const retrieved = registry.getByName("test", "testOp")!; + expect(retrieved).toStrictEqual(op); + expect(retrieved.name).toBe("testOp"); }); it("returns undefined for missing operations", () => { @@ -90,6 +111,14 @@ describe("OperationRegistry", () => { ).rejects.toThrow("Operation not found"); }); + it("throws on missing handler", async () => { + const registry = new OperationRegistry(); + registry.registerSpec(makeSpec()); + await expect( + registry.execute("test.testOp", { value: "hello" }, {} as OperationContext) + ).rejects.toThrow("No handler registered"); + }); + it("warns on output mismatch but returns result", async () => { const registry = new OperationRegistry(); registry.register(makeOperation({ @@ -98,4 +127,52 @@ describe("OperationRegistry", () => { const result = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext); expect(result).toEqual({ unexpected: "field" }); }); + + it("registerSpec and registerHandler separately", async () => { + const registry = new OperationRegistry(); + const spec = makeSpec(); + registry.registerSpec(spec); + registry.registerHandler("test.testOp", testHandler); + + const retrieved = registry.get("test.testOp")!; + expect(retrieved.name).toBe("testOp"); + expect(retrieved.handler).toBeDefined(); + + const result = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext); + expect(result).toEqual({ result: "processed: hello" }); + }); + + it("registerHandler throws for unknown operation", () => { + const registry = new OperationRegistry(); + expect(() => registry.registerHandler("unknown.op", testHandler)).toThrow("Cannot register handler for unknown operation"); + }); + + it("getHandler returns handler", () => { + const registry = new OperationRegistry(); + registry.register(makeOperation()); + expect(registry.getHandler("test.testOp")).toBeDefined(); + }); + + it("getHandler returns undefined for spec-only registration", () => { + const registry = new OperationRegistry(); + registry.registerSpec(makeSpec()); + expect(registry.getHandler("test.testOp")).toBeUndefined(); + }); + + it("register with spec-only (no handler)", () => { + const registry = new OperationRegistry(); + const spec = makeSpec(); + registry.register(spec); + const retrieved = registry.get("test.testOp")!; + expect(retrieved.name).toBe("testOp"); + expect(retrieved.handler).toBeUndefined(); + }); + + it("getSpec returns spec without handler after combined register", () => { + const registry = new OperationRegistry(); + registry.register(makeOperation()); + const spec = registry.getSpec("test.testOp")!; + expect(spec.name).toBe("testOp"); + expect((spec as any).handler).toBeUndefined(); + }); }); \ No newline at end of file