Update architecture docs for handler separation and pubsub API changes

- api-surface.md: Updated registry API table (registerSpec, registerHandler,
  getHandler, separated spec/handler storage), OperationSpec description,
  IOperationDefinition marked as convenience type, adapter return types
- call-protocol.md: Added pubsub EventEnvelope unwrapping details,
  subscribe(type, id) 2-arg API, handler separation in buildCallHandler
  and subscribe(), handler separation section
- adapters.md: Updated return types (OperationSpec & { handler }),
  scanner validates against OperationSpecSchema, new module shape examples
  showing spec-only and spec+handler patterns, typemap mention
- README.md: Core principle updated for spec/handler separation
- build-distribution.md: Updated pubsub dep description, registry.ts description
- AGENTS.md: Updated key points, source layout, provenance status
This commit is contained in:
2026-05-09 08:34:41 +00:00
parent 4f11f8e7a0
commit d0017df2bf
6 changed files with 109 additions and 58 deletions

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-04-30
last_updated: 2026-05-09
---
# @alkdev/operations Architecture
@@ -18,7 +18,7 @@ Extracted from `@alkdev/alkhub_ts/packages/core/operations/` and `packages/core/
## Core Principle
**The operation definition is the contract.** Every API endpoint, agent action, coordination tool, and MCP tool is an `IOperationDefinition` with typed input/output schemas, access control, and a handler. The registry executes them. The call protocol routes them. Adapters generate them from external specs.
**The spec is the contract; the handler is the runtime.** Every API endpoint, agent action, coordination tool, and MCP tool has an `OperationSpec` (serializable, hashable descriptor) and optionally a handler function. The registry stores specs and handlers separately — they can be registered together with `register()` or independently with `registerSpec()` and `registerHandler()`. The call protocol routes invocations through specs. Adapters generate specs (and handlers) from external definitions.
All paths funnel into the same registry:
@@ -34,8 +34,8 @@ Access control, validation, and error handling are consistent regardless of entr
## What This Package Provides
- **Core types** — `IOperationDefinition`, `OperationSpec`, `OperationType`, `AccessControl`, `Identity`, `OperationContext`
- **Registry** — `OperationRegistry` with register, execute, validate, spec extraction
- **Core types** — `OperationSpec`, `IOperationDefinition`, `OperationType`, `AccessControl`, `Identity`, `OperationContext`, `OperationHandler`, `SubscriptionHandler`
- **Registry** — `OperationRegistry` with `register`, `registerSpec`, `registerHandler`, `execute`, `getSpec`, `getHandler`, spec extraction
- **Call protocol** — `PendingRequestMap`, `CallHandler`, `call≡subscribe` event semantics
- **Subscribe** — `subscribe()` for `AsyncGenerator`-based subscription operations
- **Env builder** — `buildEnv()` for nested operation calls (direct or call protocol mode)

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-04-30
last_updated: 2026-05-09
---
# Adapters
@@ -14,7 +14,7 @@ How `FromSchema`, `FromOpenAPI`, `from_mcp`, and `scanner` work. How to add new
### Purpose
Converts JSON Schema to TypeBox `TSchema`. Required because `IOperationDefinition.inputSchema` and `outputSchema` must be TypeBox schemas (for `Value.Check` validation), but external specs (OpenAPI, MCP) provide JSON Schema.
Converts JSON Schema to TypeBox `TSchema`. Required because `OperationSpec.inputSchema` and `outputSchema` must be TypeBox schemas (for `Value.Check` validation), but external specs (OpenAPI, MCP) provide JSON Schema. In the future, `@alkdev/typemap` may replace or supplement `FromSchema` to support Zod and Valibot input schemas as well.
### Conversion Rules
@@ -57,12 +57,12 @@ const typeboxSchema = FromSchema({
### Purpose
Generates `IOperationDefinition[]` from OpenAPI specs. Each path+method combination becomes an operation with an auto-generated `fetch` handler.
Generates `OperationSpec & { handler }[]` from OpenAPI specs. Each path+method combination becomes an operation with an auto-generated `fetch` handler.
### `FromOpenAPI(spec, config)`
```ts
function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): IOperationDefinition[]
function FromOpenAPI(spec: OpenAPISpec, config: HTTPServiceConfig): Array<OperationSpec & { handler: OperationHandler }>
```
Processes all paths in the spec. For each path and method combination:
@@ -86,7 +86,7 @@ async function FromOpenAPIFile(
path: string,
config: HTTPServiceConfig,
fs?: OpenAPIFS,
): Promise<IOperationDefinition[]>
): Promise<Array<OperationSpec & { handler: OperationHandler }>>
```
Reads an OpenAPI JSON file. If `fs` is provided, uses `fs.readFile()` (runtime-agnostic). Otherwise, uses Node.js `node:fs/promises`.
@@ -97,7 +97,7 @@ Reads an OpenAPI JSON file. If `fs` is provided, uses `fs.readFile()` (runtime-a
async function FromOpenAPIUrl(
url: string,
config: HTTPServiceConfig,
): Promise<IOperationDefinition[]>
): Promise<Array<OperationSpec & { handler: OperationHandler }>>
```
Fetches an OpenAPI JSON spec from a URL.
@@ -151,7 +151,7 @@ Injectable filesystem interface for runtime-agnostic file reading. See [ADR-002]
### Purpose
Connects to MCP (Model Context Protocol) servers and wraps their tools as `IOperationDefinition[]`. Supports both stdio and HTTP transports.
Connects to MCP (Model Context Protocol) servers and wraps their tools as `OperationSpec & { handler }[]`. Supports both stdio and HTTP transports.
### `createMCPClient(name, config)`
@@ -166,7 +166,7 @@ async function createMCPClient(
2. Create transport: `StreamableHTTPClientTransport` for `url` config, `StdioClientTransport` for `command` config
3. Connect the client
4. Call `client.listTools()` to discover available tools
5. For each tool, create an `IOperationDefinition`:
5. For each tool, create a `OperationSpec & { handler }`:
- `name`: tool name
- `namespace`: the `name` parameter (used as grouping)
- `type`: `MUTATION` (all MCP tools are mutations)
@@ -197,7 +197,7 @@ class MCPClientLoader {
async load(config: Record<string, MCPClientConfig>): Promise<MCPClientWrapper[]>
getClient(name: string): MCPClientWrapper | undefined
getAllWrappers(): MCPClientWrapper[]
getAllOperations(): IOperationDefinition[]
getAllOperations(): Array<OperationSpec & { handler: OperationHandler }>
async closeAll(): Promise<void>
}
```
@@ -215,7 +215,7 @@ Manages multiple MCP client connections. `load()` connects to all configured ser
### Purpose
Auto-discovers operation definitions from the filesystem. Recursively scans `.ts` files, imports them, and validates that the default export satisfies `OperationDefinitionSchema`.
Auto-discovers operation specs from the filesystem. Recursively scans `.ts` files, imports them, and validates that the default export satisfies `OperationSpecSchema`. Handlers must be registered separately via `registry.registerHandler()`.
### `scanOperations(dirPath, fs)`
@@ -223,15 +223,17 @@ Auto-discovers operation definitions from the filesystem. Recursively scans `.ts
async function scanOperations(
dirPath: string,
fs: ScannerFS,
): Promise<IOperationDefinition[]>
): Promise<OperationSpec[]>
```
1. Walk directory tree using `fs.readdir()`
2. For each `.ts` file, construct a `file://` URL and dynamic `import()`
3. If the module has a default export, validate it against `OperationDefinitionSchema` using `collectErrors`
4. Valid operations are added to the result array; invalid ones log a warning and are skipped
3. If the module has a default export, validate it against `OperationSpecSchema` using `collectErrors`
4. Valid specs are added to the result array; invalid ones log a warning and are skipped
5. Directories are recursed
Note: The scanner validates against `OperationSpecSchema` (no handler field). If a scanned module exports both spec and handler, use `registry.register()` instead.
### `ScannerFS`
```ts
@@ -245,6 +247,8 @@ Injectable filesystem interface. No `Deno.*` globals or Node-specific imports in
### Expected Module Shape
#### Spec + handler together (legacy, still supported)
```ts
// operations/myOperation.ts
import { Type } from "@alkdev/typebox"
@@ -263,18 +267,45 @@ export default {
} satisfies IOperationDefinition
```
#### Spec only (recommended for scanned modules)
```ts
// operations/myOperation.ts
import { Type } from "@alkdev/typebox"
import { OperationType, type OperationSpec } from "@alkdev/operations"
export default {
name: "myOperation",
namespace: "myapp",
version: "1.0.0",
type: OperationType.QUERY,
description: "Does something useful",
inputSchema: Type.Object({ name: Type.String() }),
outputSchema: Type.Object({ result: Type.String() }),
accessControl: { requiredScopes: ["read"] },
} satisfies OperationSpec
```
Then register the handler separately:
```ts
registry.registerSpec(scannedSpec)
registry.registerHandler("myapp.myOperation", myHandler)
```
## Adding a New Adapter
To add a new adapter (e.g., `from_grpc`):
1. **Create `src/from_grpc.ts`** — implement the adapter that produces `IOperationDefinition[]` from gRPC service definitions
1. **Create `src/from_grpc.ts`** — implement the adapter that produces `OperationSpec[]` (spec-only) or `Array<OperationSpec & { handler }>` (spec+handler)
2. **Export from `src/index.ts`** — add named exports to the barrel
3. **If the adapter has peer dependencies**:
- Add to `peerDependencies` and `peerDependenciesMeta` in `package.json`
- Add a sub-path entry in `exports` (e.g., `"./from-grpc"`)
- Add a separate entry in `tsup.config.ts`
- See [ADR-003](decisions/003-peer-dep-adapters.md)
4. **Inject runtime dependencies** — follow the `ScannerFS` / `OpenAPIFS` pattern for any filesystem or platform-specific APIs. See [ADR-002](decisions/002-fs-injection.md)
5. **Use `FromSchema`** for any JSON Schema → TypeBox conversion needed by the adapter
6. **Write tests** — test the adapter in isolation, mock external services
7. **Update architecture docs**add adapter section here and update the API surface table
4. **If handlers are provided**, they can be registered alongside specs or separately via `registry.registerHandler()`
5. **Inject runtime dependencies** follow the `ScannerFS` / `OpenAPIFS` pattern for any filesystem or platform-specific APIs. See [ADR-002](decisions/002-fs-injection.md)
6. **Use `FromSchema`** for any JSON Schema → TypeBox conversion needed by the adapter (or `@alkdev/typemap` for Zod/Valibot)
7. **Write tests**test the adapter in isolation, mock external services
8. **Update architecture docs** — add adapter section here and update the API surface table

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-04-30
last_updated: 2026-05-09
---
# API Surface
@@ -103,7 +103,7 @@ interface OperationSpec<TInput = unknown, TOutput = unknown> {
}
```
Serializable, hashable subset of an operation definition. No handler — safe to send over the wire.
Serializable, hashable descriptor. No handler — safe to send over the wire, persist, or use as a template for ujsx tree interpretation. `Value.Hash(inputSchema)` provides structural deduplication keys.
### `IOperationDefinition`
@@ -113,7 +113,7 @@ interface IOperationDefinition<TInput, TOutput, TContext> extends OperationSpec<
}
```
Full definition including the runtime handler. Registered with `OperationRegistry`.
Convenience type combining spec and handler. Still supported by `register()` for backward compatibility, but the registry now stores them separately internally.
### `OperationHandler` / `SubscriptionHandler`
@@ -141,19 +141,26 @@ Namespace-keyed operation map. Accessed as `env.namespace.operationName(input)`.
### `OperationRegistry`
The registry stores specs and handlers in separate internal maps. Specs are serializable descriptors; handlers are runtime functions. They can be registered together or separately.
| Method | Signature | Description |
|--------|-----------|-------------|
| `register(operation)` | `(operation: IOperationDefinition) => void` | Register by `{namespace}.{name}` key. Validates schemas. |
| `registerAll(operations)` | `(operations: IOperationDefinition[]) => void` | Bulk register. |
| `get(id)` | `(id: string) => IOperationDefinition \| undefined` | Get by full id (`"namespace.name"`). |
| `getByName(namespace, name)` | `(namespace: string, name: string) => IOperationDefinition \| undefined` | Get by parts. |
| `list()` | `() => IOperationDefinition[]` | All registered operations. |
| `register(operation)` | `(operation: OperationSpec & { handler?: OperationHandler \| SubscriptionHandler }) => void` | Register spec + optional handler by `{namespace}.{name}` key. Validates schemas. |
| `registerAll(operations)` | `(operations: Array<OperationSpec & { handler?: ... }>) => void` | Bulk register. |
| `registerSpec(spec)` | `(spec: OperationSpec) => void` | Register spec only (no handler). Validates schemas. |
| `registerHandler(id, handler)` | `(id: string, handler: OperationHandler \| SubscriptionHandler) => void` | Register handler for existing spec. Throws if spec not found. |
| `get(id)` | `(id: string) => (OperationSpec & { handler?: ... }) \| undefined` | Get spec + handler (if registered) by full id. |
| `getSpec(id)` | `(id: string) => OperationSpec \| undefined` | Serializable spec (no handler). |
| `getHandler(id)` | `(id: string) => OperationHandler \| SubscriptionHandler \| undefined` | Handler only. `undefined` if spec registered without handler. |
| `getByName(namespace, name)` | `(namespace: string, name: string) => (OperationSpec & { handler?: ... }) \| undefined` | Get by parts. |
| `list()` | `() => Array<OperationSpec & { handler?: ... }>` | All registered entries (spec + handler if present). |
| `getAllSpecs()` | `() => OperationSpec[]` | All serializable specs. |
| `execute(operationId, input, context)` | `(id: string, input: TInput, ctx: OperationContext) => Promise<TOutput>` | Validate input, run handler, warn on output mismatch. Throws if not found or validation fails. |
| `execute(operationId, input, context)` | `(id: string, input: TInput, ctx: OperationContext) => Promise<TOutput>` | Validate input, run handler, warn on output mismatch. Throws if spec or handler not found. |
Registration key format: `{namespace}.{name}`. Overwrite on duplicate.
Specs and handlers can be registered independently: `registerSpec()` then `registerHandler()` for the same id, or `register()` with `{ ...spec, handler }` in one call. `execute()` requires both — throws `"Operation not found"` if spec missing, `"No handler registered"` if handler missing.
`execute` validates input with `validateOrThrow` before calling the handler. Output validation uses `collectErrors` and logs warnings — it does not throw.
## Call Protocol
@@ -305,10 +312,10 @@ See [adapters.md](adapters.md) for detailed adapter documentation.
| Adapter | Import | Description |
|---------|--------|-------------|
| `FromOpenAPI` | Main barrel | OpenAPI spec → `IOperationDefinition[]` |
| `FromOpenAPIFile` | Main barrel | OpenAPI file → `IOperationDefinition[]` |
| `FromOpenAPIUrl` | Main barrel | OpenAPI URL → `IOperationDefinition[]` |
| `FromOpenAPI` | Main barrel | OpenAPI spec → `OperationSpec & { handler }[]` |
| `FromOpenAPIFile` | Main barrel | OpenAPI file → `OperationSpec & { handler }[]` |
| `FromOpenAPIUrl` | Main barrel | OpenAPI URL → `OperationSpec & { handler }[]` |
| `createMCPClient` | `from-mcp` sub-path | MCP server → `MCPClientWrapper` with tool operations |
| `closeMCPClient` | `from-mcp` sub-path | Close MCP client connection |
| `MCPClientLoader` | `from-mcp` sub-path | Manage multiple MCP servers |
| `scanOperations` | Main barrel | Filesystem auto-discovery of operation definitions |
| `scanOperations` | Main barrel | Filesystem auto-discovery of operation specs |

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-04-30
last_updated: 2026-05-09
---
# Build & Distribution
@@ -14,7 +14,7 @@ Dependencies, project structure, sub-path exports, peer deps, and build tooling.
| Package | Purpose |
|---------|---------|
| `@alkdev/typebox` | Schema system. `Type` for building schemas, `Value` for validation, `KindGuard` for schema assertion. |
| `@alkdev/pubsub` | Call protocol transport. `PendingRequestMap` creates an internal `PubSub` for event routing. |
| `@alkdev/pubsub` | Call protocol transport. `PendingRequestMap` creates an internal `PubSub` for event routing. Uses `subscribe(type, id)` and `publish(type, id, payload)` API with `EventEnvelope` wrapping. |
| `@logtape/logtape` | Structured logging. Direct import, no wrapper. See [ADR-001](decisions/001-logger-direct-import.md). |
### Peer (Optional)
@@ -41,7 +41,7 @@ Dependencies, project structure, sub-path exports, peer deps, and build tooling.
src/
index.ts # Barrel: re-exports all public API
types.ts # Core types: IOperationDefinition, OperationSpec, OperationType, etc.
registry.ts # OperationRegistry: register, execute, get, list
registry.ts # OperationRegistry: registerSpec, registerHandler, execute, get, list
validation.ts # assertIsSchema, validateOrThrow, collectErrors, formatValueErrors
call.ts # PendingRequestMap, buildCallHandler, CallEventMap, event types
subscribe.ts # subscribe(): direct AsyncGenerator execution

View File

@@ -1,6 +1,6 @@
---
status: draft
last_updated: 2026-04-30
last_updated: 2026-05-09
---
# Call Protocol
@@ -103,6 +103,7 @@ const callMap = new PendingRequestMap(eventTarget?)
- Creates an internal `PubSub<CallPubSubMap>` using `createPubSub`
- If `eventTarget` is provided, passes it to `createPubSub` for transport-level event routing (Redis, WebSocket, etc.)
- Wires subscription handlers for `call.responded`, `call.error`, and `call.aborted` to route events back to waiting callers
- Subscriptions use empty-string id (`subscribe("call.responded", "")`) to receive all events of each type. Events are unwrapped from `EventEnvelope` via `.payload`
### `call(operationId, input, options?)`
@@ -158,13 +159,15 @@ type CallHandler = (event: CallRequestedEvent) => Promise<void>
### Handler Flow
1. Look up operation by `operationId` from the registry
1. Look up spec by `operationId` from the registry via `getSpec()`
2. If not found, throw `CallError(OPERATION_NOT_FOUND, ...)`
3. Check access control (see below)
4. Validate input with `validateOrThrow`
5. Execute operation handler
6. On success: the handler is expected to have published `call.responded` through whatever mechanism
7. On failure: `mapError` converts the thrown value to `CallError`
3. Look up handler by `operationId` via `getHandler()`
4. If not found, throw `CallError(OPERATION_NOT_FOUND, "No handler registered for operation: ...")`
5. Check access control (see below)
6. Validate input with `validateOrThrow`
7. Execute operation handler
8. On success: the handler is expected to have published `call.responded` through whatever mechanism
9. On failure: `mapError` converts the thrown value to `CallError`
The `CallHandler` is designed to be wired into a pubsub subscription:
@@ -280,4 +283,13 @@ async function* subscribe(
Gets the operation from the registry, casts its handler to `AsyncGenerator`, and yields values. Properly cleans up with `generator.return()` in a `finally` block.
Use `subscribe()` for in-process consumption. Use `PendingRequestMap.call()` for cross-transport invocation that resolves after one event. For cross-transport streaming, use `PendingRequestMap.subscribe()` to yield multiple events.
Use `subscribe()` for in-process consumption. Use `PendingRequestMap.call()` for cross-transport invocation that resolves after one event. For cross-transport streaming, use `PendingRequestMap.subscribe()` to yield multiple events.
### Handler Separation
The `subscribe()` function looks up both spec and handler separately from the registry:
1. `registry.getSpec(operationId)` — throws if spec not found
2. `registry.getHandler(operationId)` — throws if handler not found
This allows spec-only registration for scenarios where handlers are provided separately (e.g., ujsx host interpretation, dynamic handler injection).