Separate handler from spec in OperationRegistry, update pubsub API

- Split OperationRegistry into separate specs and handlers maps
- Add registerSpec(), registerHandler(), getHandler() methods
- register() still accepts IOperationDefinition (backward compatible)
- execute() now requires both spec and handler, throws if missing
- Update @alkdev/pubsub integration for v0.1.0 API:
  - subscribe(type, id) now requires id parameter (use  for all events)
  - publish(type, id, payload) now requires 3 args
  - Events unwrapped from EventEnvelope via .payload
- Update buildCallHandler to use getSpec() + getHandler() separately
- Update subscribe.ts to use getHandler()
- Update buildEnv to use getAllSpecs() instead of list()
- Update scanner to validate against OperationSpecSchema
- Update from_mcp and from_openapi to use OperationSpec & { handler } types
- Remove OperationDefinitionSchema from public exports
- Add 7 new registry tests for handler separation
This commit is contained in:
2026-05-09 08:25:59 +00:00
parent c5979ecd63
commit 4f11f8e7a0
9 changed files with 210 additions and 91 deletions

View File

@@ -4,7 +4,7 @@ import { getLogger } from "@logtape/logtape";
import { OperationRegistry } from "./registry.js";
import { CallError, InfrastructureErrorCode, mapError } from "./error.js";
import { validateOrThrow } from "./validation.js";
import type { IOperationDefinition, Identity, OperationContext, AccessControl } from "./types.js";
import type { Identity, OperationContext, AccessControl, OperationSpec } from "./types.js";
const logger = getLogger("operations:call");
@@ -45,10 +45,10 @@ export type CallEventMapValue = CallRequestedEvent | CallRespondedEvent | CallAb
export const CallEventMap = CallEventSchema;
type CallPubSubMap = {
"call.requested": [CallRequestedEvent];
"call.responded": [CallRespondedEvent];
"call.aborted": [CallAbortedEvent];
"call.error": [CallErrorEvent];
"call.requested": CallRequestedEvent;
"call.responded": CallRespondedEvent;
"call.aborted": CallAbortedEvent;
"call.error": CallErrorEvent;
};
interface PendingRequest {
@@ -77,10 +77,10 @@ export class PendingRequestMap {
}
private setupSubscriptions(): void {
const respondedIter = this.pubsub.subscribe("call.responded");
const respondedIter = this.pubsub.subscribe("call.responded", "");
(async () => {
for await (const event of respondedIter) {
const responded = event as CallRespondedEvent;
for await (const envelope of respondedIter) {
const responded = envelope.payload;
const pending = this.requests.get(responded.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
@@ -90,10 +90,10 @@ export class PendingRequestMap {
}
})();
const errorIter = this.pubsub.subscribe("call.error");
const errorIter = this.pubsub.subscribe("call.error", "");
(async () => {
for await (const event of errorIter) {
const err = event as CallErrorEvent;
for await (const envelope of errorIter) {
const err = envelope.payload;
const pending = this.requests.get(err.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
@@ -103,10 +103,10 @@ export class PendingRequestMap {
}
})();
const abortedIter = this.pubsub.subscribe("call.aborted");
const abortedIter = this.pubsub.subscribe("call.aborted", "");
(async () => {
for await (const event of abortedIter) {
const aborted = event as CallAbortedEvent;
for await (const envelope of abortedIter) {
const aborted = envelope.payload;
const pending = this.requests.get(aborted.requestId);
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
@@ -137,7 +137,7 @@ export class PendingRequestMap {
this.requests.set(requestId, pending);
this.pubsub.publish("call.requested", {
this.pubsub.publish("call.requested", "", {
requestId,
operationId,
input,
@@ -149,14 +149,14 @@ export class PendingRequestMap {
}
respond(requestId: string, output: unknown): void {
this.pubsub.publish("call.responded", {
this.pubsub.publish("call.responded", "", {
requestId,
output,
});
}
emitError(requestId: string, code: string, message: string, details?: unknown): void {
this.pubsub.publish("call.error", {
this.pubsub.publish("call.error", "", {
requestId,
code,
message,
@@ -169,7 +169,7 @@ export class PendingRequestMap {
if (pending) {
if (pending.timer) clearTimeout(pending.timer);
this.requests.delete(requestId);
this.pubsub.publish("call.aborted", { requestId });
this.pubsub.publish("call.aborted", "", { requestId });
pending.reject(new CallError(InfrastructureErrorCode.ABORTED, `Request ${requestId} was aborted`));
}
}
@@ -186,9 +186,9 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
const { requestId, operationId, input, identity } = event;
try {
const operation = registry.get(operationId);
const spec = registry.getSpec(operationId);
if (!operation) {
if (!spec) {
throw new CallError(
InfrastructureErrorCode.OPERATION_NOT_FOUND,
`Operation not found: ${operationId}`,
@@ -196,7 +196,16 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
);
}
const accessControl: AccessControl = operation.accessControl as AccessControl;
const handler = registry.getHandler(operationId);
if (!handler) {
throw new CallError(
InfrastructureErrorCode.OPERATION_NOT_FOUND,
`No handler registered for operation: ${operationId}`,
{ operationId },
);
}
const accessControl: AccessControl = spec.accessControl as AccessControl;
if (identity && !checkAccess(accessControl, identity)) {
throw new CallError(
@@ -212,9 +221,9 @@ export function buildCallHandler(config: CallHandlerConfig): CallHandler {
identity,
};
validateOrThrow(operation.inputSchema, input, `Input validation for ${operationId}`);
validateOrThrow(spec.inputSchema, input, `Input validation for ${operationId}`);
await operation.handler(input, context);
await handler(input, context);
} catch (error) {
const callError = mapError(error);

View File

@@ -18,34 +18,34 @@ export interface EnvOptions {
export function buildEnv(options: EnvOptions): OperationEnv {
const { registry, context, allowedNamespaces, callMap } = options;
const operations = registry.list();
const specs = registry.getAllSpecs();
const namespaces: OperationEnv = {};
for (const operation of operations) {
if (allowedNamespaces && !allowedNamespaces.includes(operation.namespace)) {
for (const spec of specs) {
if (allowedNamespaces && !allowedNamespaces.includes(spec.namespace)) {
continue;
}
if (operation.type === OperationType.SUBSCRIPTION) {
if (spec.type === OperationType.SUBSCRIPTION) {
continue;
}
if (!namespaces[operation.namespace]) {
namespaces[operation.namespace] = {};
if (!namespaces[spec.namespace]) {
namespaces[spec.namespace] = {};
}
const operationId = `${operation.namespace}.${operation.name}`;
const operationId = `${spec.namespace}.${spec.name}`;
if (callMap) {
namespaces[operation.namespace][operation.name] = async (input: unknown) => {
namespaces[spec.namespace][spec.name] = async (input: unknown) => {
logger.debug(`Call protocol: ${operationId}`);
return await callMap.call(operationId, input, {
parentRequestId: context.requestId,
});
};
} else {
namespaces[operation.namespace][operation.name] = async (input: unknown) => {
namespaces[spec.namespace][spec.name] = async (input: unknown) => {
logger.debug(`Executing: ${operationId}`);
return await registry.execute(operationId, input, context);
};

View File

@@ -1,4 +1,4 @@
import type { IOperationDefinition } from "./types.js";
import type { OperationSpec, OperationHandler, OperationContext } from "./types.js";
import { OperationType } from "./types.js";
import { Type, type TSchema } from "@alkdev/typebox";
import { FromSchema } from "./from_schema.js";
@@ -18,7 +18,7 @@ export interface MCPClientConfig {
export interface MCPClientWrapper {
name: string;
client: unknown;
tools: IOperationDefinition[];
tools: Array<OperationSpec & { handler: OperationHandler }>;
}
export async function createMCPClient(
@@ -54,7 +54,7 @@ export async function createMCPClient(
logger.info(`Connected to MCP server: ${name}`);
const toolsResult = await client.listTools();
const operations: IOperationDefinition[] = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown }) => {
const operations: Array<OperationSpec & { handler: OperationHandler }> = toolsResult.tools.map((tool: { name: string; description?: string; inputSchema: unknown }) => {
return {
name: tool.name,
namespace: name,
@@ -78,7 +78,7 @@ export async function createMCPClient(
return result.content;
},
} satisfies IOperationDefinition;
} satisfies OperationSpec & { handler: OperationHandler };
});
return {
@@ -126,8 +126,8 @@ export class MCPClientLoader {
return Array.from(this.clients.values());
}
getAllOperations(): IOperationDefinition[] {
const allOps: IOperationDefinition[] = [];
getAllOperations(): Array<OperationSpec & { handler: OperationHandler }> {
const allOps: Array<OperationSpec & { handler: OperationHandler }> = [];
for (const wrapper of this.clients.values()) {
for (const op of wrapper.tools) {
allOps.push(op);

View File

@@ -1,6 +1,6 @@
import * as Type from "@alkdev/typebox";
import { FromSchema } from "./from_schema.js";
import { OperationType, type IOperationDefinition, type OperationHandler, type OperationContext } from "./types.js";
import { OperationType, type OperationSpec, type OperationHandler, type OperationContext } from "./types.js";
export interface OpenAPIFS {
readFile(path: string): Promise<string>;
@@ -225,7 +225,7 @@ function createHTTPOperation(
method: string,
path: string,
config: HTTPServiceConfig,
): IOperationDefinition {
): OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> } {
const operationId = normalizeOperationId(operation, method, path);
const opType = detectOperationType(method, operation);
const authHeaders = getAuthHeaders(config);
@@ -298,8 +298,8 @@ function createHTTPOperation(
};
}
export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): IOperationDefinition[] {
const operations: IOperationDefinition[] = [];
export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }> {
const operations: Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }> = [];
const basePath = spec.basePath || "";
for (const [path, methods] of Object.entries(spec.paths)) {
@@ -320,7 +320,7 @@ export function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): IOper
return operations;
}
export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise<IOperationDefinition[]> {
export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, fs?: OpenAPIFS): Promise<Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }>> {
let content: string;
if (fs) {
content = await fs.readFile(path);
@@ -332,7 +332,7 @@ export async function FromOpenAPIFile(path: string, config: HTTPServiceConfig, f
return FromOpenAPI(spec, config);
}
export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<IOperationDefinition[]> {
export async function FromOpenAPIUrl(url: string, config: HTTPServiceConfig): Promise<Array<OperationSpec & { handler: OperationHandler<unknown, unknown, OperationContext> }>> {
const response = await fetch(url);
const spec = await response.json() as OpenAPISpec;
return FromOpenAPI(spec, config);

View File

@@ -1,4 +1,4 @@
export { OperationType, OperationContextSchema, OperationDefinitionSchema, OperationSpecSchema, AccessControlSchema, ErrorDefinitionSchema } from "./types.js";
export { OperationType, OperationContextSchema, OperationSpecSchema, AccessControlSchema, ErrorDefinitionSchema } from "./types.js";
export type { IOperationDefinition, OperationHandler, SubscriptionHandler, Identity, OperationEnv, OperationContext, OperationSpec, AccessControl, ErrorDefinition } from "./types.js";
export { OperationRegistry } from "./registry.js";
export { formatValueErrors, assertIsSchema, validateOrThrow, collectErrors } from "./validation.js";

View File

@@ -1,4 +1,4 @@
import type { IOperationDefinition, OperationContext, OperationSpec } from "./types.js";
import type { OperationContext, OperationSpec, OperationHandler, SubscriptionHandler } from "./types.js";
import { getLogger } from "@logtape/logtape";
import { Value } from "@alkdev/typebox/value";
import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } from "./validation.js";
@@ -6,51 +6,75 @@ import { assertIsSchema, validateOrThrow, collectErrors, formatValueErrors } fro
const logger = getLogger("operations:registry");
export class OperationRegistry {
private operations = new Map<string, IOperationDefinition>();
private specs = new Map<string, OperationSpec>();
private handlers = new Map<string, OperationHandler | SubscriptionHandler>();
private getOperationId(operation: IOperationDefinition): string {
return `${operation.namespace}.${operation.name}`;
private opId(namespace: string, name: string): string {
return `${namespace}.${name}`;
}
register(operation: IOperationDefinition): void {
const opId = `${operation.namespace}.${operation.name}`;
assertIsSchema(operation.inputSchema, `${opId} inputSchema`);
assertIsSchema(operation.outputSchema, `${opId} outputSchema`);
const id = this.getOperationId(operation);
this.operations.set(id, operation);
register(operation: OperationSpec & { handler?: OperationHandler | SubscriptionHandler }): void {
const id = this.opId(operation.namespace, operation.name);
assertIsSchema(operation.inputSchema, `${id} inputSchema`);
assertIsSchema(operation.outputSchema, `${id} outputSchema`);
const { handler, ...spec } = operation;
this.specs.set(id, spec);
if (handler) {
this.handlers.set(id, handler);
}
logger.info(`Registered operation: ${id}`);
}
registerAll(operations: IOperationDefinition[]): void {
registerAll(operations: Array<OperationSpec & { handler?: OperationHandler | SubscriptionHandler }>): void {
for (const op of operations) {
this.register(op);
}
}
get(id: string): IOperationDefinition | undefined {
return this.operations.get(id);
registerSpec(spec: OperationSpec): void {
const id = this.opId(spec.namespace, spec.name);
assertIsSchema(spec.inputSchema, `${id} inputSchema`);
assertIsSchema(spec.outputSchema, `${id} outputSchema`);
this.specs.set(id, spec);
logger.info(`Registered spec: ${id}`);
}
getByName(namespace: string, name: string): IOperationDefinition | undefined {
return this.operations.get(`${namespace}.${name}`);
registerHandler(id: string, handler: OperationHandler | SubscriptionHandler): void {
if (!this.specs.has(id)) {
throw new Error(`Cannot register handler for unknown operation: ${id}`);
}
this.handlers.set(id, handler);
logger.info(`Registered handler: ${id}`);
}
list(): IOperationDefinition[] {
return Array.from(this.operations.values());
}
private extractSpec(operation: IOperationDefinition): OperationSpec {
const { handler: _handler, ...spec } = operation;
return spec;
get(id: string): (OperationSpec & { handler?: OperationHandler | SubscriptionHandler }) | undefined {
const spec = this.specs.get(id);
if (!spec) return undefined;
const handler = this.handlers.get(id);
return { ...spec, handler };
}
getSpec(id: string): OperationSpec | undefined {
const operation = this.operations.get(id);
return operation ? this.extractSpec(operation) : undefined;
return this.specs.get(id);
}
getHandler(id: string): OperationHandler | SubscriptionHandler | undefined {
return this.handlers.get(id);
}
getByName(namespace: string, name: string): (OperationSpec & { handler?: OperationHandler | SubscriptionHandler }) | undefined {
return this.get(this.opId(namespace, name));
}
list(): Array<OperationSpec & { handler?: OperationHandler | SubscriptionHandler }> {
return Array.from(this.specs.entries()).map(([id, spec]) => ({
...spec,
handler: this.handlers.get(id),
}));
}
getAllSpecs(): OperationSpec[] {
return this.list().map(op => this.extractSpec(op));
return Array.from(this.specs.values());
}
async execute<TInput = unknown, TOutput = unknown>(
@@ -58,17 +82,21 @@ export class OperationRegistry {
input: TInput,
context: OperationContext,
): Promise<TOutput> {
const operation = this.operations.get(operationId);
if (!operation) {
const spec = this.specs.get(operationId);
if (!spec) {
throw new Error(`Operation not found: ${operationId}`);
}
validateOrThrow(operation.inputSchema, input, `Input validation failed for ${operationId}`);
const handler = this.handlers.get(operationId);
if (!handler) {
throw new Error(`No handler registered for operation: ${operationId}`);
}
const result = await operation.handler(input, context) as TOutput;
validateOrThrow(spec.inputSchema, input, `Input validation failed for ${operationId}`);
const errors = collectErrors(operation.outputSchema, result);
const result = await handler(input, context) as TOutput;
const errors = collectErrors(spec.outputSchema, result);
if (errors.length > 0) {
logger.warn(`Output validation failed for ${operationId}:\n${formatValueErrors(errors)}`);
}

View File

@@ -1,5 +1,5 @@
import type { IOperationDefinition } from "./types.js";
import { OperationDefinitionSchema } from "./types.js";
import type { OperationSpec } from "./types.js";
import { OperationSpecSchema } from "./types.js";
import { collectErrors, formatValueErrors } from "./validation.js";
import { getLogger } from "@logtape/logtape";
@@ -11,15 +11,15 @@ export interface ScannerFS {
}
export interface OperationManifest {
operations: Record<string, IOperationDefinition>;
operations: Record<string, OperationSpec>;
baseUrl?: string;
}
export async function scanOperations(
dirPath: string,
fs: ScannerFS,
): Promise<IOperationDefinition[]> {
const operations: IOperationDefinition[] = [];
): Promise<OperationSpec[]> {
const operations: OperationSpec[] = [];
try {
await processDirectory(dirPath, operations, fs);
@@ -37,7 +37,7 @@ export async function scanOperations(
async function processDirectory(
dirPath: string,
operations: IOperationDefinition[],
operations: OperationSpec[],
fs: ScannerFS,
): Promise<void> {
try {
@@ -53,9 +53,9 @@ async function processDirectory(
const module = await import(moduleUrl);
if (module.default) {
const operation = module.default as IOperationDefinition;
const operation = module.default as OperationSpec;
const errors = collectErrors(OperationDefinitionSchema, operation);
const errors = collectErrors(OperationSpecSchema, operation);
if (errors.length > 0) {
logger.warn(`${fullPath}: Invalid operation definition - ${formatValueErrors(errors, "")}`);

View File

@@ -1,4 +1,4 @@
import type { IOperationDefinition, OperationContext } from "./types.js";
import type { OperationContext } from "./types.js";
import { OperationRegistry } from "./registry.js";
export async function* subscribe(
@@ -7,13 +7,18 @@ export async function* subscribe(
input: unknown,
context: OperationContext,
): AsyncGenerator<unknown, void, unknown> {
const operation = registry.get(operationId);
const spec = registry.getSpec(operationId);
if (!operation) {
if (!spec) {
throw new Error(`Operation not found: ${operationId}`);
}
const handler = operation.handler;
const handler = registry.getHandler(operationId);
if (!handler) {
throw new Error(`No handler registered for operation: ${operationId}`);
}
const generator = handler(input, context) as AsyncGenerator<unknown, void, unknown>;
try {

View File

@@ -1,6 +1,6 @@
import { describe, it, expect } from "vitest";
import { OperationRegistry } from "../src/registry.js";
import { OperationType, type IOperationDefinition, type OperationContext } from "../src/index.js";
import { OperationType, type IOperationDefinition, type OperationContext, type OperationSpec, type OperationHandler } from "../src/index.js";
import * as Type from "@alkdev/typebox";
import { Value } from "@alkdev/typebox/value";
@@ -19,19 +19,40 @@ function makeOperation(overrides: Partial<IOperationDefinition> = {}): IOperatio
};
}
function makeSpec(overrides: Partial<OperationSpec> = {}): OperationSpec {
return {
name: "testOp",
namespace: "test",
version: "1.0.0",
type: OperationType.QUERY,
description: "A test operation",
inputSchema: Type.Object({ value: Type.String() }),
outputSchema: Type.Object({ result: Type.String() }),
accessControl: { requiredScopes: [] },
...overrides,
};
}
const testHandler: OperationHandler = async (input: any) => ({ result: `processed: ${input.value}` });
describe("OperationRegistry", () => {
it("registers and retrieves an operation", () => {
const registry = new OperationRegistry();
const op = makeOperation();
registry.register(op);
expect(registry.get("test.testOp")).toBe(op);
const retrieved = registry.get("test.testOp")!;
expect(retrieved).toStrictEqual(op);
expect(retrieved.name).toBe("testOp");
expect(retrieved.handler).toBeDefined();
});
it("retrieves by namespace and name", () => {
const registry = new OperationRegistry();
const op = makeOperation();
registry.register(op);
expect(registry.getByName("test", "testOp")).toBe(op);
const retrieved = registry.getByName("test", "testOp")!;
expect(retrieved).toStrictEqual(op);
expect(retrieved.name).toBe("testOp");
});
it("returns undefined for missing operations", () => {
@@ -90,6 +111,14 @@ describe("OperationRegistry", () => {
).rejects.toThrow("Operation not found");
});
it("throws on missing handler", async () => {
const registry = new OperationRegistry();
registry.registerSpec(makeSpec());
await expect(
registry.execute("test.testOp", { value: "hello" }, {} as OperationContext)
).rejects.toThrow("No handler registered");
});
it("warns on output mismatch but returns result", async () => {
const registry = new OperationRegistry();
registry.register(makeOperation({
@@ -98,4 +127,52 @@ describe("OperationRegistry", () => {
const result = await registry.execute("test.testOp", { value: "x" }, {} as OperationContext);
expect(result).toEqual({ unexpected: "field" });
});
it("registerSpec and registerHandler separately", async () => {
const registry = new OperationRegistry();
const spec = makeSpec();
registry.registerSpec(spec);
registry.registerHandler("test.testOp", testHandler);
const retrieved = registry.get("test.testOp")!;
expect(retrieved.name).toBe("testOp");
expect(retrieved.handler).toBeDefined();
const result = await registry.execute("test.testOp", { value: "hello" }, {} as OperationContext);
expect(result).toEqual({ result: "processed: hello" });
});
it("registerHandler throws for unknown operation", () => {
const registry = new OperationRegistry();
expect(() => registry.registerHandler("unknown.op", testHandler)).toThrow("Cannot register handler for unknown operation");
});
it("getHandler returns handler", () => {
const registry = new OperationRegistry();
registry.register(makeOperation());
expect(registry.getHandler("test.testOp")).toBeDefined();
});
it("getHandler returns undefined for spec-only registration", () => {
const registry = new OperationRegistry();
registry.registerSpec(makeSpec());
expect(registry.getHandler("test.testOp")).toBeUndefined();
});
it("register with spec-only (no handler)", () => {
const registry = new OperationRegistry();
const spec = makeSpec();
registry.register(spec);
const retrieved = registry.get("test.testOp")!;
expect(retrieved.name).toBe("testOp");
expect(retrieved.handler).toBeUndefined();
});
it("getSpec returns spec without handler after combined register", () => {
const registry = new OperationRegistry();
registry.register(makeOperation());
const spec = registry.getSpec("test.testOp")!;
expect(spec.name).toBe("testOp");
expect((spec as any).handler).toBeUndefined();
});
});