diff --git a/README.md b/README.md new file mode 100644 index 0000000..f68ea46 --- /dev/null +++ b/README.md @@ -0,0 +1,95 @@ +# @alkdev/operations + +Typed operations registry, call protocol, and adapters (MCP, OpenAPI). + +Every API endpoint, agent action, and tool is an **operation** with a TypeBox schema, access control metadata, and a handler. The registry stores specs and handlers independently. The call protocol provides unified event-based invocation — `call` and `subscribe` use the same events, same `PendingRequestMap`. Adapters generate operations from OpenAPI specs, MCP servers, and filesystem manifests. + +## Install + +```bash +npm install @alkdev/operations +``` + +Optional peer dependencies (only if you need them): + +```bash +npm install @alkdev/typemap # for Zod/Valibot schema adapters +npm install @modelcontextprotocol/sdk # for MCP client integration +``` + +## Quick Start + +```ts +import { Type } from "@alkdev/typebox"; +import { OperationRegistry, OperationType } from "@alkdev/operations"; + +const registry = new OperationRegistry(); + +registry.register({ + name: "create", + namespace: "task", + version: "1.0.0", + type: OperationType.MUTATION, + description: "Create a new task", + inputSchema: Type.Object({ + title: Type.String(), + priority: Type.Optional(Type.Union([Type.Literal("low"), Type.Literal("high")])), + }), + outputSchema: Type.Object({ + id: Type.String(), + title: Type.String(), + }), + accessControl: { requiredScopes: ["task:write"] }, + handler: async (input) => { + return { id: crypto.randomUUID(), title: input.title }; + }, +}); + +const result = await registry.execute( + "task.create", + { title: "Ship it", priority: "high" }, + { identity: { id: "user-1", scopes: ["task:write"] } }, +); + +console.log(result.data); +``` + +## Core Concepts + +- **OperationSpec** — serializable descriptor: name, namespace, type, schemas, access control +- **OperationHandler** — the function that runs when the operation is called +- **OperationRegistry** — register specs and handlers, execute operations, validate I/O +- **PendingRequestMap** — event-based call protocol: `call()`, `respond()`, `emitError()`, `abort()` +- **ResponseEnvelope** — universal result wrapper with transport metadata (`local`, `http`, `mcp`) +- **buildEnv()** — generate a nested call surface for inter-operation composition + +## Entry Points + +| Import | Purpose | +|--------|---------| +| `@alkdev/operations` | Core: registry, call protocol, envelopes, env, validation, errors, FromSchema | +| `@alkdev/operations/from-mcp` | MCP client integration (requires `@modelcontextprotocol/sdk`) | +| `@alkdev/operations/from-openapi` | OpenAPI integration | +| `@alkdev/operations/from-typemap` | Zod/Valibot schema adapters (requires `@alkdev/typemap`) | + +## Guides + +| Guide | Topic | +|-------|-------| +| [Registry](docs/registry.md) | Defining, registering, and executing operations | +| [Call Protocol](docs/call-protocol.md) | PendingRequestMap, CallHandler, call/subscribe events | +| [Subscriptions](docs/subscriptions.md) | Real-time streaming with async generators | +| [Response Envelopes](docs/response-envelopes.md) | Universal result wrapper and transport metadata | +| [Access Control](docs/access-control.md) | Scope and resource-based authorization | +| [Composition](docs/env-and-composition.md) | Inter-operation calls with buildEnv | +| [Error Handling](docs/errors.md) | CallError, infrastructure codes, mapError | +| [Adapters](docs/adapters.md) | MCP, OpenAPI, FromSchema, scanner, typemap | +| [Validation](docs/validation.md) | Schema validation helpers | + +## API Reference + +For detailed type signatures, see [docs/architecture/api-surface.md](docs/architecture/api-surface.md). + +## License + +MIT OR Apache-2.0 \ No newline at end of file diff --git a/docs/access-control.md b/docs/access-control.md new file mode 100644 index 0000000..3b037fa --- /dev/null +++ b/docs/access-control.md @@ -0,0 +1,103 @@ +# Access Control + +Operations declare their access requirements in `accessControl`. The registry and call handler enforce these before executing the handler. + +## AccessControl Fields + +```ts +interface AccessControl { + requiredScopes: string[]; // ALL must be present (AND) + requiredScopesAny?: string[]; // At least ONE must match (OR) + resourceType?: string; // e.g., "project", "tool" + resourceAction?: string; // e.g., "read", "write", "execute" +} +``` + +### requiredScopes (AND) + +Every scope in the array must be present in the caller's identity: + +```ts +accessControl: { + requiredScopes: ["task:read", "task:write"], +} +``` + +The caller must have **both** `task:read` and `task:write`. + +### requiredScopesAny (OR) + +At least one scope must match: + +```ts +accessControl: { + requiredScopes: ["admin"], + requiredScopesAny: ["task:read", "task:write"], +} +``` + +The caller needs `admin` AND either `task:read` or `task:write`. + +### Resource-based access + +When both `resourceType` and `resourceAction` are set, the caller's `resources` map is checked: + +```ts +accessControl: { + requiredScopes: [], + resourceType: "project", + resourceAction: "read", +} +``` + +The identity must have `resources` with a key matching `project:*` and `"read"` in the actions array: + +```ts +identity: { + id: "user-1", + scopes: [], + resources: { "project:abc": ["read", "write"] }, +} +``` + +## Identity + +```ts +interface Identity { + id: string; + scopes: string[]; + resources?: Record; +} +``` + +## Enforcement + +### enforceAccess() + +Throws `CallError(ACCESS_DENIED)` if access is denied: + +```ts +import { enforceAccess } from "@alkdev/operations"; + +enforceAccess(spec.accessControl, context.identity, operationId, context.trusted); +``` + +Used internally by `registry.execute()` and `subscribe()`. Passes automatically if `context.trusted` is `true`. + +### checkAccess() + +Returns a boolean without throwing: + +```ts +import { checkAccess } from "@alkdev/operations"; + +if (!checkAccess(spec.accessControl, identity)) { + // deny access +} +``` + +## Trusted Contexts + +When `buildEnv()` creates an `OperationEnv` for inter-operation calls, it sets `trusted: true` on the context. This bypasses all access control checks, allowing internal operations to call each other without needing every scope. + +Direct `registry.execute()` calls within a process can also pass `trusted: true`, but **untrusted callers should go through the call protocol** which always enforces access control. \ No newline at end of file diff --git a/docs/adapters.md b/docs/adapters.md new file mode 100644 index 0000000..724fe1f --- /dev/null +++ b/docs/adapters.md @@ -0,0 +1,238 @@ +# Adapters + +Adapters register operations from external sources. Each adapter converts external definitions (JSON Schema, OpenAPI, MCP tools) into `OperationSpec` + handler pairs that plug into the registry. + +## FromSchema + +`FromSchema` converts JSON Schema to TypeBox schemas. Used internally by OpenAPI and MCP adapters, but also useful standalone: + +```ts +import { FromSchema } from "@alkdev/operations"; + +const typeboxSchema = FromSchema({ + type: "object", + properties: { + name: { type: "string" }, + age: { type: "integer" }, + }, + required: ["name"], +}); +``` + +Supports: `object`, `array`, `string`, `number`, `integer`, `boolean`, `null`, `enum`, `allOf`, `anyOf`, `oneOf`, `$ref`, `const`, and tuples. + +## Schema Adapters (from-typemap) + +If you use Zod or Valibot schemas instead of TypeBox, the `SchemaAdapter` interface lets you register operations with your preferred schema library: + +```ts +import { OperationRegistry } from "@alkdev/operations"; +import { zodAdapter } from "@alkdev/operations/from-typemap"; +import { z } from "zod"; + +const adapter = zodAdapter(); +await adapter.init(); + +const registry = new OperationRegistry({ schemaAdapter: adapter }); + +registry.register({ + name: "create", + namespace: "task", + version: "1.0.0", + type: OperationType.MUTATION, + description: "Create a task", + inputSchema: z.object({ title: z.string() }), // Zod schema! + outputSchema: z.object({ id: z.string() }), + accessControl: { requiredScopes: [] }, + handler: async (input) => ({ id: crypto.randomUUID(), ...input }), +}); +``` + +### Available adapters + +| Import | Adapter | Requires | +|--------|---------|----------| +| `defaultAdapter` | Pass-through TypeBox | Nothing | +| `zodAdapter()` | Zod → TypeBox via `@alkdev/typemap` | `@alkdev/typemap` + `zod` | +| `valibotAdapter()` | Valibot → TypeBox via `@alkdev/typemap` | `@alkdev/typemap` + `valibot` | + +Import from the `from-typemap` sub-path: + +```ts +import { zodAdapter, valibotAdapter, defaultAdapter } from "@alkdev/operations/from-typemap"; +``` + +> `defaultAdapter` is used by `OperationRegistry` when no adapter is specified. It passes TypeBox schemas through unchanged and throws for non-TypeBox schemas. + +## MCP Client (from-mcp) + +Connect to MCP servers and register their tools as operations. + +### createMCPClient + +Create a single MCP client connection: + +```ts +import { createMCPClient, closeMCPClient } from "@alkdev/operations/from-mcp"; + +const wrapper = await createMCPClient("my-server", { + command: "npx", + args: ["my-mcp-server"], + env: { API_KEY: "..." }, +}); + +// wrapper.tools is an array of OperationSpec + handler +registry.registerAll(wrapper.tools); + +// When done: +await closeMCPClient(wrapper); +``` + +For HTTP-based servers: + +```ts +const wrapper = await createMCPClient("remote-server", { + url: "https://example.com/mcp", + headers: { Authorization: "Bearer ..." }, +}); +``` + +### MCPClientLoader + +Manages multiple MCP clients: + +```ts +import { MCPClientLoader } from "@alkdev/operations/from-mcp"; + +const loader = new MCPClientLoader(); +await loader.load({ + "filesystem": { command: "npx", args: ["@modelcontextprotocol/server-filesystem", "/tmp"] }, + "github": { command: "npx", args: ["@modelcontextprotocol/server-github"], env: { GITHUB_TOKEN: "..." } }, +}); + +// Register all tools into a registry +loader.registerAll(registry); + +// Access individual clients: +const fsClient = loader.getClient("filesystem"); + +// Cleanup +await loader.closeAll(); +``` + +### MCPClientConfig + +| Field | Type | Description | +|-------|------|-------------| +| `command` | `string` | Stdio transport: command to run | +| `args` | `string[]` | Arguments for the command | +| `env` | `Record` | Environment variables | +| `cwd` | `string` | Working directory | +| `url` | `string` | HTTP transport: server URL | +| `headers` | `Record` | HTTP headers (for `url` transport) | +| `version` | `string` | Version string for the operation specs | + +MCP tool results are wrapped in `mcpEnvelope()` with structured content blocks. If `structuredContent` is present and the output schema is not `Unknown`, it's cast through the schema. + +## OpenAPI (from-openapi) + +Import REST API operations from OpenAPI specs. + +### FromOpenAPI + +```ts +import { FromOpenAPI } from "@alkdev/operations/from-openapi"; + +const operations = FromOpenAPI(spec, { + namespace: "petstore", + baseUrl: "https://petstore.example.com", + headers: { Authorization: "Bearer ..." }, +}); + +registry.registerAll(operations); +``` + +### FromOpenAPIFile + +```ts +import { FromOpenAPIFile } from "@alkdev/operations/from-openapi"; + +const operations = await FromOpenAPIFile("./openapi.json", { + namespace: "petstore", + baseUrl: "https://petstore.example.com", +}); +``` + +For Deno or other non-Node runtimes, inject an `OpenAPIFS`: + +```ts +const operations = await FromOpenAPIFile("./openapi.json", config, { + readFile: async (path) => Deno.readTextFile(path), +}); +``` + +### FromOpenAPIUrl + +```ts +import { FromOpenAPIUrl } from "@alkdev/operations/from-openapi"; + +const operations = await FromOpenAPIUrl("https://petstore.example.com/openapi.json", config); +``` + +### OpenAPIServiceRegistry + +Manages multiple OpenAPI services: + +```ts +import { OpenAPIServiceRegistry } from "@alkdev/operations/from-openapi"; + +const serviceRegistry = new OpenAPIServiceRegistry(); +serviceRegistry.add("petstore", spec, config); +await serviceRegistry.addFromUrl("github", "https://api.github.com/openapi.json", config); +serviceRegistry.registerAll(registry); +``` + +### HTTPServiceConfig + +| Field | Type | Description | +|-------|------|-------------| +| `namespace` | `string` | Namespace for generated operations | +| `baseUrl` | `string` | Base URL for HTTP requests | +| `headers` | `Record` | Default headers | +| `auth` | `object` | Auth config: `bearer`, `apiKey`, or `basic` | +| `timeout` | `number` | Request timeout in ms | +| `fetch` | `typeof fetch` | Custom fetch implementation | + +### SSE Support + +Operations with `text/event-stream` response content type are automatically typed as `SUBSCRIPTION`. The handler returns an async generator that parses SSE frames and yields `httpEnvelope` events. + +### Operation Type Detection + +- `GET` → `QUERY` +- `POST`/`PUT`/`PATCH`/`DELETE` → `MUTATION` +- Any method with `text/event-stream` response → `SUBSCRIPTION` + +## Scanner + +`scanOperations` auto-discovers operations from the filesystem by importing `.ts` files that export a default `OperationSpec`: + +```ts +import { scanOperations } from "@alkdev/operations"; + +const specs = await scanOperations("./operations", { + readdir: async function* (path) { /* ... */ }, + cwd: () => process.cwd(), +}); +``` + +The `ScannerFS` interface makes it runtime-agnostic: + +```ts +interface ScannerFS { + readdir(path: string): AsyncIterable<{ name: string; isFile: boolean; isDirectory: boolean }>; + cwd(): string; +} +``` + +Each `.ts` file must export a default that validates against `OperationSpecSchema`. Files that don't pass validation are skipped with a warning. \ No newline at end of file diff --git a/docs/call-protocol.md b/docs/call-protocol.md new file mode 100644 index 0000000..c34aa66 --- /dev/null +++ b/docs/call-protocol.md @@ -0,0 +1,152 @@ +# Call Protocol + +The call protocol provides event-based operation invocation via `@alkdev/pubsub`. It uses the same events and `PendingRequestMap` for both one-shot calls and streaming subscriptions. The key insight: **call ≡ subscribe** — a call resolves after one response event; a subscription yields events until completed or aborted. + +## PendingRequestMap + +`PendingRequestMap` is the core of the call protocol. It manages pending calls and subscriptions through a pubsub layer. + +### Creating a CallMap + +```ts +import { PendingRequestMap } from "@alkdev/operations"; + +const callMap = new PendingRequestMap(); +``` + +Optionally pass an `EventTarget` for cross-window/cross-worker communication: + +```ts +const callMap = new PendingRequestMap(myEventTarget); +``` + +### Making a Call + +```ts +const envelope = await callMap.call("task.create", { title: "Ship it" }, { + deadline: Date.now() + 5000, + identity: { id: "user-1", scopes: ["task:write"] }, +}); +``` + +This: +1. Creates a unique `requestId` +2. Publishes a `call.requested` event +3. Returns a `Promise` that resolves when `respond()` is called with that `requestId` + +### Subscribing + +```ts +const stream = callMap.subscribe("events.watch", { filter: "important" }, { + idleTimeout: 30000, + identity: { id: "user-1", scopes: ["events:read"] }, +}); + +for await (const envelope of stream) { + console.log(envelope.data); +} +``` + +This: +1. Creates a unique `requestId` +2. Publishes a `call.requested` event (same as a call) +3. Returns an `AsyncIterable` that yields events until `completed`, `aborted`, or idle timeout + +### Responding + +```ts +callMap.respond(requestId, envelope); +``` + +Sends a `call.responded` event. For calls, this resolves the promise. For subscriptions, this yields the next event. + +### Error Handling + +```ts +callMap.emitError(requestId, "VALIDATION_ERROR", "Input was invalid", { field: "title" }); +``` + +Sends a `call.error` event. For calls, this rejects the promise with a `CallError`. For subscriptions, this stops the stream with an error. + +### Completing a Subscription + +```ts +callMap.complete(requestId); +``` + +Signals that no more events will be sent. Ends the subscription stream. + +### Aborting + +```ts +callMap.abort(requestId); +``` + +Aborts a pending call or subscription. Rejects the call promise or stops the subscription stream with an `ABORTED` error code. + +## CallHandler + +`buildCallHandler()` wires a `PendingRequestMap` to an `OperationRegistry`. It listens for `call.requested` events and routes them through the registry. + +```ts +import { PendingRequestMap, buildCallHandler } from "@alkdev/operations"; + +const callMap = new PendingRequestMap(); +const handler = buildCallHandler({ registry, callMap }); + +callMap["call.requested"].subscribe(handler); +``` + +The handler: +1. Extracts `operationId`, `input`, and `identity` from the event +2. For queries/mutations: calls `registry.execute()` and responds with the result +3. For subscriptions: delegates to `subscribe()` and streams results back +4. On error: calls `callMap.emitError()` with a mapped `CallError` + +## Event Types + +| Event | Purpose | Producer | Consumer | +|-------|---------|----------|----------| +| `call.requested` | Initiate a call or subscription | Caller | Handler | +| `call.responded` | Deliver a result | Handler | Caller | +| `call.completed` | Signal end of subscription | Handler | Caller | +| `call.aborted` | Cancel request | Caller or Handler | Opposite side | +| `call.error` | Signal an error | Handler | Caller | + +Each event carries a `requestId` for correlation. + +## Event Schemas + +All events are typed TypeBox schemas available as `CallEventMap`: + +```ts +import { CallEventMap } from "@alkdev/operations"; + +// Access type schemas: +CallEventMap["call.requested"] // TypeBox schema +CallEventMap["call.responded"] // TypeBox schema +``` + +## Timeout Behavior + +**Calls** use `deadline` — an absolute timestamp (ms since epoch). If the deadline passes without a response, the promise rejects with a `TIMEOUT` error. + +**Subscriptions** use `idleTimeout` — a relative duration (ms). If no event is received within this window, the subscription is aborted with a `TIMEOUT` error. + +## Usage Pattern + +A typical server setup: + +```ts +const registry = new OperationRegistry(); +const callMap = new PendingRequestMap(); +const handler = buildCallHandler({ registry, callMap }); + +// Wire to your transport (WebSocket, HTTP, etc.) +transport.on("call.requested", handler); + +// Or directly use the registry for in-process calls: +const result = await registry.execute("task.create", input, ctx); +``` + +See [Subscriptions](subscriptions.md) for the direct async generator approach (no pubsub needed). \ No newline at end of file diff --git a/docs/env-and-composition.md b/docs/env-and-composition.md new file mode 100644 index 0000000..d784faf --- /dev/null +++ b/docs/env-and-composition.md @@ -0,0 +1,68 @@ +# Composition (buildEnv) + +Operations often need to call other operations. `buildEnv()` generates an `OperationEnv` — a nested record of namespace → operation name → caller — that lets handlers invoke other operations without knowing the registry. + +## Creating an Environment + +```ts +import { buildEnv } from "@alkdev/operations"; + +const env = buildEnv({ + registry, + context: { identity: { id: "user-1", scopes: ["task:read"] } }, + allowedNamespaces: ["task", "user"], +}); +``` + +This walks all non-subscription specs in the registry and creates a callable function for each one. + +### Result Shape + +```ts +// env mirrors the registry's namespace.name structure: +env.task.create({ title: "New task" }); // Promise +env.task.list({ filter: "active" }); // Promise +env.user.get({ id: "user-1" }); // Promise +``` + +Each function calls `registry.execute()` under the hood. + +## Trusted Calls + +All calls made through `buildEnv()` set `trusted: true` on the context. This means: + +- Access control checks are **skipped** +- The caller's identity propagates, but scopes are not enforced +- Input/output validation still runs + +This is intentional — internal composition shouldn't require every internal operation to have every scope. Access control is enforced at the boundary (the call protocol), not between internal operations. + +## Usage in Handlers + +```ts +const createAndNotify = { + name: "createAndNotify", + namespace: "task", + version: "1.0.0", + type: OperationType.MUTATION, + description: "Create a task and send a notification", + inputSchema: Type.Object({ title: Type.String(), assignee: Type.String() }), + outputSchema: Type.Object({ taskId: Type.String(), notified: Type.Boolean() }), + accessControl: { requiredScopes: ["task:write"] }, + handler: async (input, context) => { + const task = await context.env!.task.create(input); + await context.env!.notification.send({ userId: input.assignee, message: "New task" }); + return { taskId: task.data.id, notified: true }; + }, +}; +``` + +## Options + +| Option | Type | Description | +|--------|------|-------------| +| `registry` | `OperationRegistry` | Required. The registry to build from. | +| `context` | `OperationContext` | Required. Base context (identity, metadata, etc.) | +| `allowedNamespaces` | `string[]` | Optional. Only include operations from these namespaces. | + +Subscriptions are excluded from the env (they can't be awaited). Use `subscribe()` directly for subscription composition. \ No newline at end of file diff --git a/docs/errors.md b/docs/errors.md new file mode 100644 index 0000000..663b868 --- /dev/null +++ b/docs/errors.md @@ -0,0 +1,77 @@ +# Error Handling + +## CallError + +All operational errors are represented as `CallError`, which extends `Error` with a structured `code` and optional `details`: + +```ts +class CallError extends Error { + readonly code: CallErrorCode; + readonly details?: unknown; +} +``` + +```ts +import { CallError, InfrastructureErrorCode } from "@alkdev/operations"; + +throw new CallError(InfrastructureErrorCode.OPERATION_NOT_FOUND, "Operation not found: foo.bar", { operationId: "foo.bar" }); +``` + +## Infrastructure Error Codes + +Built-in codes for framework-level errors: + +| Code | When | +|------|------| +| `OPERATION_NOT_FOUND` | No spec or handler registered for the operation ID | +| `ACCESS_DENIED` | Caller lacks required scopes or resource access | +| `VALIDATION_ERROR` | Input fails schema validation | +| `TIMEOUT` | Call or subscription timed out (deadline or idle) | +| `ABORTED` | Request was explicitly aborted | +| `EXECUTION_ERROR` | Handler threw an Error that didn't match any declared error code | +| `UNKNOWN_ERROR` | Non-Error value thrown from handler | + +You can also use custom error codes as strings: + +```ts +throw new CallError("INVALID_INPUT", "Title is required", { field: "title" }); +``` + +## Declared Errors + +Operations can declare expected error codes in their spec: + +```ts +{ + errorSchemas: [ + { code: "INVALID_INPUT", description: "Input validation failed", schema: Type.Object({ field: Type.String() }) }, + { code: "NOT_FOUND", description: "Task not found", schema: Type.Object({ id: Type.String() }) }, + ], +} +``` + +## mapError + +`mapError()` normalizes thrown values into `CallError`: + +```ts +import { mapError } from "@alkdev/operations"; + +const callError = mapError(thrownValue, spec.errorSchemas); +``` + +Logic: +1. If already a `CallError`, returns it as-is +2. If an `Error`, checks if its message matches any declared error code prefix (`CODE:` or exact `CODE`) — returns a `CallError` with that code +3. Otherwise, returns `CallError(EXECUTION_ERROR, error.message, error)` + +This is used internally by `buildCallHandler()` to map handler errors into the call protocol error format. + +## Error Propagation + +| Context | Behavior | +|---------|----------| +| `registry.execute()` | Throws `CallError` directly | +| `subscribe()` | Throws `CallError` into the async generator | +| `PendingRequestMap` | Emits `call.error` event, rejected promise or stopped stream | +| `buildEnv()` calls | Propagates `CallError` from the nested `execute()` | \ No newline at end of file diff --git a/docs/registry.md b/docs/registry.md new file mode 100644 index 0000000..e366f61 --- /dev/null +++ b/docs/registry.md @@ -0,0 +1,174 @@ +# Registry + +The `OperationRegistry` is the central store for operation specs and handlers. It handles registration, validation, access control enforcement, and execution. + +## Operation Types + +```ts +enum OperationType { + QUERY = "query", + MUTATION = "mutation", + SUBSCRIPTION = "subscription", +} +``` + +- **Query** — read-only, no side effects +- **Mutation** — writes state, side effects +- **Subscription** — streams results over time (async generator handler) + +## Defining an Operation + +Every operation has a **spec** (serializable metadata) and optionally a **handler** (the function that runs). + +```ts +import { Type } from "@alkdev/typebox"; +import { OperationType } from "@alkdev/operations"; + +const createTask = { + name: "create", + namespace: "task", + version: "1.0.0", + type: OperationType.MUTATION, + description: "Create a new task", + inputSchema: Type.Object({ + title: Type.String(), + priority: Type.Optional(Type.Union([Type.Literal("low"), Type.Literal("high")])), + }), + outputSchema: Type.Object({ + id: Type.String(), + title: Type.String(), + }), + accessControl: { + requiredScopes: ["task:write"], + }, + handler: async (input: { title: string; priority?: string }) => { + return { id: crypto.randomUUID(), title: input.title }; + }, +}; +``` + +### Spec Fields + +| Field | Type | Required | Description | +|-------|------|----------|-------------| +| `name` | `string` | yes | Operation name within its namespace | +| `namespace` | `string` | yes | Grouping (e.g. `"task"`, `"user"`) | +| `version` | `string` | yes | Semantic version | +| `type` | `OperationType` | yes | `query`, `mutation`, or `subscription` | +| `description` | `string` | yes | Human-readable description | +| `inputSchema` | `TSchema` | yes | TypeBox schema for input validation | +| `outputSchema` | `TSchema` | yes | TypeBox schema for output; use `Type.Unknown()` if untyped | +| `accessControl` | `AccessControl` | yes | Scopes and resource requirements (see [Access Control](access-control.md)) | +| `title` | `string` | no | Human-readable title | +| `tags` | `string[]` | no | Tags for filtering/grouping | +| `errorSchemas` | `ErrorDefinition[]` | no | Declared error codes and their schemas | +| `_meta` | `Record` | no | Arbitrary metadata | + +### Handler Signature + +**Query/Mutation** handlers return a value (or `ResponseEnvelope`): + +```ts +type OperationHandler = ( + input: TInput, + context: OperationContext, +) => Promise | TOutput; +``` + +**Subscription** handlers must be async generators: + +```ts +type SubscriptionHandler = ( + input: TInput, + context: OperationContext, +) => AsyncGenerator; +``` + +## Registering Operations + +### Combined registration + +```ts +const registry = new OperationRegistry(); +registry.register(createTask); +``` + +`register()` stores both the spec and the handler. The `handler` field is optional — you can register the spec first and add the handler later. + +### Batch registration + +```ts +registry.registerAll([createTask, listTasks, deleteTask]); +``` + +### Separate spec and handler + +```ts +registry.registerSpec(mySpec); +registry.registerHandler("task.create", myHandler); +``` + +This is useful when specs come from one source (e.g., OpenAPI import) and handlers from another. + +> `registerHandler` throws if no spec exists for the operation ID. + +## Executing Operations + +```ts +const envelope = await registry.execute( + "task.create", + { title: "Ship it" }, + { identity: { id: "user-1", scopes: ["task:write"] } }, +); +``` + +What `execute()` does: + +1. Looks up the spec by `namespace.name` +2. Looks up the handler +3. **Enforces access control** (unless `context.trusted` is `true`) +4. **Validates input** against the spec's `inputSchema` +5. Runs the handler +6. **Wraps the result** in a `ResponseEnvelope` if not already one +7. **Casts the output** through `outputSchema` and warns on validation errors + +Returns `ResponseEnvelope`. See [Response Envelopes](response-envelopes.md). + +### Execution context + +```ts +interface OperationContext { + requestId?: string; + parentRequestId?: string; + identity?: Identity; + trusted?: boolean; // set by buildEnv, not by callers + metadata?: Record; + env?: OperationEnv; // injected by buildEnv for inter-op calls +} +``` + +## Querying the Registry + +```ts +registry.get("task.create"); // spec + handler, or undefined +registry.getSpec("task.create"); // spec only +registry.getHandler("task.create"); // handler only +registry.getByName("task", "create"); // same as get("task.create") +registry.list(); // all specs + handlers +registry.getAllSpecs(); // all specs only +``` + +## Schema Adapters + +By default, the registry expects TypeBox schemas. If you use Zod or Valibot, pass a schema adapter: + +```ts +import { zodAdapter } from "@alkdev/operations/from-typemap"; + +const adapter = zodAdapter(); +await adapter.init(); + +const registry = new OperationRegistry({ schemaAdapter: adapter }); +``` + +See [Adapters](adapters.md) for details. \ No newline at end of file diff --git a/docs/response-envelopes.md b/docs/response-envelopes.md new file mode 100644 index 0000000..d8297f9 --- /dev/null +++ b/docs/response-envelopes.md @@ -0,0 +1,111 @@ +# Response Envelopes + +All operation results are wrapped in a `ResponseEnvelope` that carries transport metadata alongside the data. This provides a uniform result type regardless of whether the operation ran locally, over HTTP, or via MCP. + +## Structure + +```ts +interface ResponseEnvelope { + data: T; + meta: ResponseMeta; +} +``` + +`meta.source` is the discriminant: + +| Source | Type | Carries | +|--------|------|---------| +| `"local"` | `LocalResponseMeta` | `operationId`, `timestamp` | +| `"http"` | `HTTPResponseMeta` | `statusCode`, `headers`, `contentType` | +| `"mcp"` | `MCPResponseMeta` | `isError`, `content[]`, `structuredContent?`, `_meta?` | + +## Creating Envelopes + +### localEnvelope + +For in-process results: + +```ts +import { localEnvelope } from "@alkdev/operations"; + +const env = localEnvelope({ id: "123", title: "My task" }, "task.create"); +``` + +### httpEnvelope + +For HTTP-sourced results (e.g., OpenAPI adapter): + +```ts +import { httpEnvelope } from "@alkdev/operations"; + +const env = httpEnvelope(data, { + statusCode: 200, + headers: { "content-type": "application/json" }, + contentType: "application/json", +}); +``` + +### mcpEnvelope + +For MCP-sourced results: + +```ts +import { mcpEnvelope } from "@alkdev/operations"; + +const env = mcpEnvelope(data, { + isError: false, + content: [{ type: "text", text: "Done" }], +}); +``` + +## Unwrapping + +```ts +import { unwrap } from "@alkdev/operations"; + +const result = unwrap(envelope); +``` + +Returns `envelope.data`, discarding transport metadata. + +## Detecting Envelopes + +```ts +import { isResponseEnvelope } from "@alkdev/operations"; + +if (isResponseEnvelope(maybeEnvelope)) { + // TypeScript narrows to ResponseEnvelope +} +``` + +Checks that the value has `data` and `meta` with a recognized `source` ("local" | "http" | "mcp") and the appropriate source-specific fields. + +## Automatic Wrapping + +When an operation handler returns a plain value (not a `ResponseEnvelope`), `registry.execute()` and `subscribe()` wrap it in a `localEnvelope()` automatically. If the handler already returns a `ResponseEnvelope`, it passes through unchanged. + +## MCP Content Blocks + +MCP responses include structured content blocks: + +```ts +type MCPContentBlock = + | { type: "text"; text: string; annotations?: MCPAnnotations } + | { type: "image"; data: string; mimeType: string; annotations?: MCPAnnotations } + | { type: "audio"; data: string; mimeType: string; annotations?: MCPAnnotations } + | { type: "resource"; resource: MCPResourceContent; annotations?: MCPAnnotations } + | { type: "resource_link"; uri: string; name: string; description?: string; mimeType?: string } +``` + +## TypeBox Schemas + +Both `ResponseEnvelopeSchema` and `ResponseMetaSchema` are available for runtime validation: + +```ts +import { ResponseEnvelopeSchema, ResponseMetaSchema } from "@alkdev/operations"; +import { Value } from "@alkdev/typebox/value"; + +if (Value.Check(ResponseEnvelopeSchema, value)) { + // valid envelope +} +``` \ No newline at end of file diff --git a/docs/subscriptions.md b/docs/subscriptions.md new file mode 100644 index 0000000..45cae75 --- /dev/null +++ b/docs/subscriptions.md @@ -0,0 +1,92 @@ +# Subscriptions + +Subscription operations stream results over time using async generators. There are two ways to use them: + +1. **Direct** — `subscribe()` calls the handler in-process, no pubsub needed +2. **Via call protocol** — `PendingRequestMap.subscribe()` routes through the pubsub layer + +## Defining a Subscription Operation + +Subscription handlers must be **async generator functions** (`async function*`): + +```ts +import { Type } from "@alkdev/typebox"; +import { OperationType } from "@alkdev/operations"; + +const watchTasks = { + name: "watch", + namespace: "task", + version: "1.0.0", + type: OperationType.SUBSCRIPTION, + description: "Watch for task changes", + inputSchema: Type.Object({ filter: Type.Optional(Type.String()) }), + outputSchema: Type.Object({ id: Type.String(), title: Type.String(), status: Type.String() }), + accessControl: { requiredScopes: ["task:read"] }, + handler: async function* (input, context) { + for (const event of someEventSource(input.filter)) { + yield event; + } + }, +}; +``` + +> The registry validates at registration time that subscription handlers are async generators. A regular async function will throw. + +## Direct Subscription: `subscribe()` + +The `subscribe()` function executes a subscription handler and returns an async generator. No pubsub or call protocol needed. + +```ts +import { subscribe } from "@alkdev/operations"; + +const stream = subscribe(registry, "task.watch", { filter: "important" }, context); + +for await (const envelope of stream) { + console.log(envelope.data); +} +``` + +`subscribe()`: +1. Looks up the spec and handler +2. Enforces access control +3. Validates input +4. Verifies the handler returns an async iterable +5. Yields each value, wrapping non-envelope values in `localEnvelope()` + +### Error handling + +If the handler throws, the error propagates as a `CallError`. If the handler yields a non-iterable, `subscribe()` throws `EXECUTION_ERROR`. + +### Cleanup + +When the consumer breaks out of the `for await` loop, `subscribe()` calls `generator.return()` to clean up the handler. + +## Call Protocol Subscription + +When routing through `PendingRequestMap`: + +```ts +const stream = callMap.subscribe("task.watch", { filter: "important" }, { + idleTimeout: 30000, + identity: { id: "user-1", scopes: ["task:read"] }, +}); + +for await (const envelope of stream) { + console.log(envelope.data); +} +``` + +The `CallHandler` will: +1. Call `subscribe()` on the registry +2. Stream each yielded envelope via `callMap.respond()` +3. Call `callMap.complete()` when the generator finishes +4. Call `callMap.emitError()` if an error occurs + +## Choosing Between Direct and Call Protocol + +| | Direct (`subscribe()`) | Call Protocol (`callMap.subscribe()`) | +|---|---|---| +| **Use when** | In-process, same runtime | Cross-process, remote callers | +| **Transport** | Direct function call | PubSub events | +| **Timeout** | Managed by consumer | `idleTimeout` parameter | +| **Abort** | Consumer breaks loop | `callMap.abort(requestId)` | \ No newline at end of file diff --git a/docs/validation.md b/docs/validation.md new file mode 100644 index 0000000..3273148 --- /dev/null +++ b/docs/validation.md @@ -0,0 +1,53 @@ +# Validation + +TypeBox-based schema validation helpers for operation inputs and outputs. + +## validateOrThrow + +Validates a value against a TypeBox schema. Throws on failure with formatted error messages: + +```ts +import { validateOrThrow } from "@alkdev/operations"; + +validateOrThrow(schema, input, "Input validation failed for task.create"); +``` + +Used internally by `registry.execute()` to validate inputs before running handlers. + +## collectErrors + +Returns an array of errors without throwing: + +```ts +import { collectErrors } from "@alkdev/operations"; + +const errors = collectErrors(schema, value); +// [{ path: "/title", message: "Expected string" }, ...] +``` + +Used internally by `registry.execute()` to warn on output validation failures. + +## assertIsSchema + +Validates that a value is a TypeBox schema. Throws if not: + +```ts +import { assertIsSchema } from "@alkdev/operations"; + +assertIsSchema(maybeSchema, "task.create input"); +``` + +Useful when receiving schemas from external sources (JSON, OpenAPI) before passing them to the registry. + +## formatValueErrors + +Formats an iterable of `{ path, message }` errors into a human-readable string: + +```ts +import { formatValueErrors } from "@alkdev/operations"; + +const formatted = formatValueErrors(errors); +// " - /title: Expected string\n - /priority: Expected union member" +``` + +Accepts an optional indent prefix (default `" - "`). \ No newline at end of file