# Research: `@alkdev/operations` Package Extraction

> **Status: COMPLETED.** This extraction is done. The `@alkdev/operations` package (v0.1.0) is published on npm and includes all functionality described here plus the call protocol (PendingRequestMap, ResponseEnvelope, access control, SchemaAdapter). See `docs/reviews/core-library-extraction-sync-2026-05-18.md` for the migration impact analysis.

## Goal

Extract `packages/core/operations/` and `packages/core/mcp/` into a standalone `@alkdev/operations` package that includes the call protocol (PendingRequestMap, CallHandler, call event types). The call protocol is not a separate module: `call ≡ subscribe` at the protocol level, so it belongs in the operations package. MCP is an operations adapter, not a separate concern.

## Current State

### Source: `packages/core/operations/`

| File | Lines | Key Exports | Dependencies |
|------|-------|-------------|--------------|
| `types.ts` | 212 | `OperationType`, `Identity`, `OperationEnv`, `OperationContext` (TypeBox + type), `ErrorDefinition`, `AccessControl`, `OperationHandler`, `SubscriptionHandler`, `OperationDefinition` (TypeBox schema), `OperationSpec`, `IOperationDefinition`, `OperationSpecSchema` | `@alkdev/typebox` |
| `registry.ts` | 82 | `OperationRegistry` (register, get, list, execute, getSpec, getAllSpecs) | `@alkdev/typebox/value`, `../logger/mod.ts`, `./validation.ts`, `./types.ts` |
| `validation.ts` | 115 | `assertIsSchema`, `validateOrThrow`, `collectErrors`, `formatValueErrors` | `@alkdev/typebox`, `@alkdev/typebox/value`, `@std/assert` |
| `env.ts` | 83 | `buildEnv`, `EnvOptions`, `PendingRequestMap` (interface only) | `./types.ts`, `./registry.ts`, `../logger/mod.ts` |
| `scanner.ts` | 89 | `scanOperations`, `OperationManifest` | `@std/path`, `./types.ts`, `./validation.ts`, `../logger/mod.ts`, `Deno.readDir`, `Deno.cwd` |
| `from_schema.ts` | 115 | `FromSchema` (JSON Schema → TypeBox converter) | `@alkdev/typebox` |
| `from_openapi.ts` | 333 | `FromOpenAPI`, `FromOpenAPIFile`, `FromOpenAPIUrl`, `OpenAPISpec`, `HTTPServiceConfig` | `@alkdev/typebox`, `./from_schema.ts`, `./types.ts`, `Deno.env.get` |

### Source: `packages/core/mcp/`

| File | Lines | Key Exports | Dependencies |
|------|-------|-------------|--------------|
| `wrapper.ts` | 88 | `createMCPClient`, `closeMCPClient`, `MCPClientWrapper` | `@modelcontextprotocol/sdk`, `./../operations/mod.ts`, `./../logger/mod.ts`, `@alkdev/typebox` |
| `loader.ts` | 59 | `MCPClientLoader` | `./wrapper.ts`, `./../operations/mod.ts`, `./../logger/mod.ts` |
| `mod.ts` | 2 | Re-exports | `./wrapper.ts`, `./loader.ts` |

### Test Coverage

| Test File | Tests | What it covers |
|-----------|-------|---------------|
| `tests/operations/registry.test.ts` | 7 | Registry CRUD, execute, getSpec, buildEnv direct mode, namespace filtering |
| `tests/operations/scanner.test.ts` | 3 | Directory scanning, empty directory, validation of scanned operations |
| No tests for | - | `from_schema.ts`, `from_openapi.ts`, `from_mcp` (wrapper/loader), `validation.ts` edge cases, subscription operations, call protocol mode |

### Cross-Module Dependencies (Must Be Decoupled)

| Dependency | Used In | Current Import | Extraction Strategy |
|-----------|---------|---------------|---------------------|
| Logger | `registry.ts`, `env.ts`, `scanner.ts` | `../logger/mod.ts` | Use `@logtape/logtape` directly (`import { getLogger } from "@logtape/logtape"`). Delete the wrapper. Configure sinks at the application level (hub/spoke entry point). |
| `Deno.env.get()` | `from_openapi.ts` line 67 | `Deno.env.get("BEARER_TOKEN")` | Inject auth resolution via `HTTPServiceConfig.auth.resolveToken?(): Promise` or make the caller pass the token explicitly. |
| `Deno.readDir()`, `Deno.cwd()` | `scanner.ts` | Filesystem discovery | Accept as injectable dependency: `scanOperations(dirPath, { readDir?, cwd? })`, or document as Deno-specific and provide a Node-compatible alternative (e.g., `fs.readdir`). |
| MCP ↔ Operations | `mcp/wrapper.ts` | `../operations/mod.ts` | MCP stays in the same package. It's an adapter that wraps MCP tools as operations. |
| MCP ↔ Logger | `mcp/wrapper.ts`, `mcp/loader.ts` | `../logger/mod.ts` | Same as operations: use logtape directly. |

## What Must Be Built (Not Yet in Code)

The call protocol is a **core part of operations**, not a separate package. It must be implemented for the system to work correctly, especially for subscriptions.

### 1. Call Event Types (`CallEventMap`)

Defined in `call-graph.md` but not implemented. These are TypeBox schemas:

```ts
call.requested → { requestId, operationId, input, parentRequestId?, deadline?, identity? }
call.responded → { requestId, output }
call.aborted   → { requestId }
call.error     → { requestId, code, message, details? }
```

### 2. PendingRequestMap

The current `env.ts` has only the `PendingRequestMap` interface (3 methods). The full class must:

- Hold `Map` for in-flight requests
- Take `PubSubConfig` on construction
- Auto-wire subscriptions to route `call.responded`/`call.aborted`/`call.error` back to waiting callers
- `call(operationId, input, options?) => Promise`: publishes `call.requested`, resolves on `call.responded`
- `subscribe() => AsyncIterable`: for subscription consumption (stays open, yields events until `call.aborted` or `call.error`)
- Deadline timeout support: auto-abort on timeout

This is the **key missing piece** that makes subscriptions work. Without it, `buildEnv` can't route calls through the event system, and there's no way to consume subscription operations.

### 3. CallHandler

`buildCallHandler(registry, eventTarget)` that:

- Subscribes to `call.requested` events
- Checks `AccessControl` against `Identity`
- Executes via `registry.execute()` on success
- Dispatches `call.responded` on success, `call.error` on failure
- Uses `mapError` against `errorSchemas` for domain error matching

### 4. Subscription Support

Currently broken/incomplete:

- `OperationType.SUBSCRIPTION` is defined but `registry.execute()` treats it the same as QUERY/MUTATION
- `SubscriptionHandler` type exists (returns `AsyncGenerator`) but no execution path handles it
- `buildEnv` explicitly filters out SUBSCRIPTION operations; there's no `subscribe()` equivalent
- `OperationContext.pubsub` is typed as `unknown`
- `OperationContext.stream` is defined but never populated

The fix: `call ≡ subscribe` means:

- `call` = publish `call.requested`, resolve `Promise` on first `call.responded`
- `subscribe` = publish `call.requested`, yield `AsyncIterable` of `call.responded` events until `call.aborted`
- Same event types, same `PendingRequestMap`, different consumption pattern

### 5. Error Model

`mapError` function and `CallError` codes (OPERATION_NOT_FOUND, ACCESS_DENIED, VALIDATION_ERROR, TIMEOUT, ABORTED, EXECUTION_ERROR, UNKNOWN_ERROR) are spec'd but not implemented. Used by `CallHandler` to produce structured errors.

### 6. SSE Handler Fix for FromOpenAPI

`from_openapi.ts` detects SSE endpoints but doesn't generate async generator handlers. The handler needs to stream SSE events for SUBSCRIPTION operations instead of doing a one-shot fetch.

## Proposed Package Structure

```
@alkdev/operations/
  src/
    index.ts              # Barrel: re-exports all public API

    # Core (always included)
    types.ts              # OperationType, IOperationDefinition, OperationContext, etc.
    registry.ts           # OperationRegistry class
    validation.ts         # assertIsSchema, validateOrThrow, collectErrors
    env.ts                # buildEnv, PendingRequestMap (interface + full class), CallHandler
    call-events.ts        # CallEventMap TypeBox schemas, error codes
    error-map.ts          # mapError function, CallError type, infrastructure error codes

    # Adapters (tree-shakeable, peer deps isolated)
    from_schema.ts        # JSON Schema → TypeBox converter (peer: @alkdev/typebox)
    from_openapi.ts       # OpenAPI spec → operations (peer: none beyond core)
    from_mcp.ts           # MCP tools → operations (peer: @modelcontextprotocol/sdk)
    scanner.ts            # Local TS file discovery (peer: Deno runtime OR injected fs)

    # Subscription support
    subscribe.ts          # subscribe() for SUBSCRIPTION operations, AsyncIterable handling
  tests/
    registry.test.ts      # Existing + subscription tests
    call-protocol.test.ts # PendingRequestMap, CallHandler, call/respond/abort flow
    from_schema.test.ts   # JSON Schema conversion
    from_openapi.test.ts  # OpenAPI spec handling
    from_mcp.test.ts      # MCP client wrapper/loader
    subscribe.test.ts     # AsyncIterable subscription flow
    env.test.ts           # buildEnv with callMap, namespace filtering, subscription filtering
  package.json
  tsconfig.json
```

### Adapter Peer Dependencies (following typemap pattern)

| Adapter Module | Peer Dependencies | Notes |
|---------------|------------------|-------|
| `from_schema.ts` | `@alkdev/typebox` (already a core dep) | No extra peer |
| `from_openapi.ts` | None beyond core | Auth token resolution injected (no `Deno.env`) |
| `from_mcp.ts` | `@modelcontextprotocol/sdk` | Only loaded when you import `from_mcp`. Tree-shakeable. |
| `scanner.ts` | `@std/path` (or inject fs) | Deno runtime for `Deno.readDir`. Could accept injected `readDir` + `import` functions for Node compat. |

### Dependencies

| Dependency | Type | Notes |
|-----------|------|-------|
| `@alkdev/typebox` | direct | Core schema engine. Used everywhere. |
| `@alkdev/typebox/value` | direct | `Value.Check`, `Value.Errors`, `Value.Hash` for validation. |
| `@alkdev/pubsub` | direct | `createPubSub`, `TypedEventTarget` for call protocol event routing. `PendingRequestMap` depends on this. |
| `@logtape/logtape` | direct | Replace `../logger/mod.ts` wrapper with direct `import { getLogger } from "@logtape/logtape"`. Zero-dep logger, consistent across packages. |
| `@std/assert` | direct | Used in `validation.ts` for `assertIsSchema`. |
| `@std/path` | peer | Used by `scanner.ts` for path resolution. |
| `@modelcontextprotocol/sdk` | peer | Only imported by `from_mcp.ts`. Tree-shakeable. |
| `graphology` | direct (future) | For call graph and operation graph. Not yet in deno.json. Needed for call graph tracking. |

### Logger Strategy

The current `packages/core/logger/mod.ts` is 27 lines: just `configure()` and `getLogger()` wrapping logtape. For the extracted package:

**Option A: Direct logtape import** (recommended)

- Each module does `import { getLogger } from "@logtape/logtape"`
- `configure()` stays in the application entry point (hub/spoke)
- Zero duplication, zero coupling
- logtape is already a direct dependency, not going through a wrapper

**Option B: `@alkdev/logger` package**

- Create a tiny shared logger config package
- Adds a package dependency for 27 lines
- Only justified if the config pattern is complex enough to warrant sharing

logtape's `getLogger("category")` is the same pattern used in the current wrapper. Option A is effectively what we're already doing, minus the unnecessary indirection of `../logger/mod.ts`.

## The Call ≡ Subscribe Contract

This is the central design decision for the package.
Here's how it works in detail:

### Current State (Broken)

- `OperationType.SUBSCRIPTION` exists as a type but `registry.execute()` calls `handler()` generically
- `buildEnv` filters out SUBSCRIPTION operations with no alternative
- No `subscribe()` method anywhere
- `OperationContext.pubsub` is `unknown`
- `PendingRequestMap` is just an interface with `call()`

### Target State

Same event types for both calls and subscriptions:

```
QUERY/MUTATION:
  caller → call.requested → [event system] → call.responded → caller (resolve Promise)

SUBSCRIPTION:
  caller → call.requested → [event system] → call.responded → caller (yield first)
                                           → call.responded → caller (yield next)
                                           → call.responded → caller (yield next)
                                           → call.aborted   → caller (done)
```

`PendingRequestMap` handles both:

- `call()` returns `Promise`: subscribes to `call.responded:{requestId}`, resolves on first event, unsubscribes
- `subscribe()` returns `AsyncIterable`: subscribes to `call.responded:{requestId}`, yields each event, stays open until `call.aborted`

`buildEnv` gets extended:

- Direct mode: `registry.execute()` for QUERY/MUTATION, `registry.subscribe()` for SUBSCRIPTION
- Call protocol mode: `callMap.call()` for QUERY/MUTATION, `callMap.subscribe()` for SUBSCRIPTION

The `OperationRegistry` needs a `subscribe()` method that:

1. Looks up the operation (must be SUBSCRIPTION type)
2. Creates an `AbortController` and passes it via `context.stream`
3. Populates `context.pubsub` with a scoped pubsub instance
4. Calls the `SubscriptionHandler` and returns the `AsyncGenerator`

## Migration Steps

### Phase 1: Decouple and set up package skeleton

1. **Create `@alkdev/operations` repo** (or directory in monorepo)
2. **Set up build pipeline** (tsup, package.json, tsconfig), same pattern as `@alkdev/taskgraph`
3. **Replace logger wrapper**: `import { getLogger } from "@logtape/logtape"` directly
4. **Inject `Deno.env`** in `from_openapi.ts`: pass auth token explicitly or via resolver function
5. **Make scanner Deno/Node agnostic**: accept injected `readDir` and `importModule` functions, with Deno defaults
6. **Move MCP module** from `core/mcp/` to `src/from_mcp.ts`; it's an operations adapter, same package
7. **Add `@alkdev/pubsub` as dependency**: needed for `PendingRequestMap` implementation
8. **Write missing tests**: `from_schema`, `from_openapi`, `from_mcp`

### Phase 2: Implement call protocol (the missing core)

9. **Implement `CallEventMap`** as TypeBox schemas in `call-events.ts`
10. **Implement `PendingRequestMap` class** in `env.ts` (replacing the interface):
    - Constructor takes `PubSubConfig`
    - Auto-wires subscriptions for `call.responded`, `call.aborted`, `call.error`
    - `call()` returns Promise, resolves on first response
    - `subscribe()` returns AsyncIterable, yields each response until abort/error
    - Deadline timeout support
11. **Implement `CallHandler`**: subscribes to `call.requested`, validates access, executes, dispatches response/error
12. **Implement `mapError`**: matches thrown errors against `errorSchemas`, falls back to infrastructure codes
13. **Implement `OperationRegistry.subscribe()`**: execute SUBSCRIPTION operations, return AsyncIterable via context.stream/context.pubsub
14. **Extend `buildEnv`**: add callMap mode for SUBSCRIPTION operations (callMap.subscribe instead of callMap.call)
15. **Write tests**: `call-protocol.test.ts`, `subscribe.test.ts`

### Phase 3: SSE handler and polish

16. **Fix `from_openapi.ts` SSE handler**: generate async generator for SUBSCRIPTION operations with SSE parsing
17. **Add `from_openapi.test.ts`**: OpenAPI spec conversion tests
18. **Publish v0.1.0 to npm**

### Phase 4: Integration back into alkhub_ts

19. **Replace** `packages/core/operations/` and `packages/core/mcp/` with `@alkdev/operations` dependency
20. **Update** `packages/core/deno.json` and `packages/core/mod.ts` to import from `@alkdev/operations`
21. **Update** hub and spoke to use `PendingRequestMap`, `CallHandler`, `buildEnv` from the package
22. **Implement hub-side WebSocket handling**: per-connection `WebSocketEventTarget` + `PendingRequestMap` per spoke

## Open Questions

1. **`buildEnv` API for subscriptions**: Should `buildEnv` return two objects (`{ call: OperationEnv, subscribe: SubscriptionEnv }`) or should it be a single env where SUBSCRIPTION operations have a different signature (returning `AsyncIterable` instead of `Promise`)? The latter keeps the env shape consistent but complicates typing. The former is more explicit.

2. **Scanner Deno/Node compatibility**: Should `scanner.ts` provide dual implementations (`scanOperations` for Deno with `Deno.readDir`, `scanOperationsNode` for Node with `fs.readdir`), or inject the filesystem dependency? Injection is cleaner but more verbose for the common case.

3. **Call graph storage (`graphology`)**: Should `@alkdev/operations` include call graph tracking (using `graphology`), or should that be a hub-level concern? The graph is populated as a side effect of the call protocol, but storage (Postgres) is a hub concern. Recommendation: graph tracking in operations, storage in hub.

4. **`@alkdev/pubsub` version coupling**: `PendingRequestMap` depends on `createPubSub` and `TypedEventTarget` from `@alkdev/pubsub`. Should operations pin to exact pubsub versions or use caret ranges? Since both are `@alkdev` packages we control, caret ranges should be fine, but breaking changes to the `TypedEventTarget` interface would cascade.

5. **`buildEnv` direct mode subscriptions**: In direct mode (no callMap), how do subscriptions work? The registry needs a `subscribe()` method that returns `AsyncIterable` for SUBSCRIPTION operations. This requires the registry to know about the subscription handler type. Currently `execute()` just calls `handler()` generically.

6. **Logger configuration**: logtape's `configure()` is async and sets up sinks.
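Should each `@alkdev` package just call `getLogger()` and trust that the application has called `configure()`, or should packages expose their own setup function? Recommendation: trust the application. logtape is designed for library use: until the application configures sinks, a package's log calls are effectively no-ops, so an unconfigured logger does not break anything.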
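
To make the trade-off in open question 1 concrete (and the direct-mode case from question 5), here is a minimal sketch of both env shapes. It uses simplified stand-in types rather than the package's real ones: `Operation`, `SplitEnv`, `UnifiedEnv`, `buildSplitEnv`, `buildUnifiedEnv`, `collect`, and the sample operations are all hypothetical names introduced for illustration, and handlers are invoked directly with no registry, validation, or call protocol.

```typescript
// Hypothetical stand-ins for the package's operation types (not the real API).
interface CallOperation {
  name: string;
  type: "query" | "mutation";
  handler: (input: unknown) => Promise<unknown>;
}

interface SubscriptionOperation {
  name: string;
  type: "subscription";
  handler: (input: unknown) => AsyncGenerator<unknown>;
}

type Operation = CallOperation | SubscriptionOperation;

// Shape A: two explicit objects, one per consumption pattern.
interface SplitEnv {
  call: Record<string, (input: unknown) => Promise<unknown>>;
  subscribe: Record<string, (input: unknown) => AsyncIterable<unknown>>;
}

function buildSplitEnv(operations: Operation[]): SplitEnv {
  const env: SplitEnv = { call: {}, subscribe: {} };
  for (const op of operations) {
    if (op.type === "subscription") {
      // Subscriptions are exposed as AsyncIterable producers.
      env.subscribe[op.name] = (input) => op.handler(input);
    } else {
      // Queries and mutations are exposed as Promise-returning functions.
      env.call[op.name] = (input) => op.handler(input);
    }
  }
  return env;
}

// Shape B: a single env; the caller must know which return type to expect.
type UnifiedEnv = Record<
  string,
  (input: unknown) => Promise<unknown> | AsyncIterable<unknown>
>;

function buildUnifiedEnv(operations: Operation[]): UnifiedEnv {
  const env: UnifiedEnv = {};
  for (const op of operations) {
    env[op.name] = (input) => op.handler(input);
  }
  return env;
}

// Helper that drains an AsyncIterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}

// Sample operations, made up for this sketch.
const operations: Operation[] = [
  {
    name: "user.get",
    type: "query",
    handler: async (input) => ({ id: input }),
  },
  {
    name: "user.events",
    type: "subscription",
    handler: async function* () {
      yield "created";
      yield "updated";
    },
  },
];

const splitEnv = buildSplitEnv(operations);
const unifiedEnv = buildUnifiedEnv(operations);
```

With shape A, `splitEnv.call["user.get"](7)` is statically a `Promise` and `splitEnv.subscribe["user.events"](undefined)` is statically an `AsyncIterable`, so `collect(...)` type-checks without narrowing. With shape B, every call site has to narrow the `Promise | AsyncIterable` union first, which is the typing complication the question refers to.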