---
status: draft
last_updated: 2026-05-25
---

# Call Protocol, Call Graph & Operation Graph

## Overview

The call protocol is the unified transport layer for all operation invocations. It provides a single event-based mechanism that works the same whether the call is local (in-process), remote (hub ↔ spoke over websocket), or streamed (subscription). The call graph and operation graph are built on top of it — and `@alkdev/flowgraph` provides the graph construction, analysis, and reactive execution primitives.

Websockets are the primary transport for hub-spoke communication, not SSE. SSE is unidirectional (server → client) and needs separate HTTP requests or polling for the reverse path; websockets give us bidirectional channels where hub → spoke dispatch and spoke → hub results flow through the same connection. The call protocol's `call ≡ subscribe` semantics map naturally: a websocket frame comes in, and the protocol resolves or streams depending on the consumption pattern.

**Transport distinction**: WebSocket is the primary bidirectional transport for hub↔spoke and hub↔client-spoke communication. SSE support exists for compatibility (e.g., OpenAI proxy streams, legacy clients) but is not the preferred transport. A client (browser, CLI) that connects as a spoke gets full bidirectional communication over a single WebSocket — no SSE needed.

## call ≡ subscribe

At the protocol level, `call` and `subscribe` are the same thing with different consumption patterns:

- **`call`**: Publish `call.requested`, subscribe to `call.responded:{requestId}`, resolve on first response → `Promise<ResponseEnvelope>`
- **`subscribe`**: Publish `call.requested`, subscribe to `call.responded:{requestId}`, yield each response → `AsyncIterable<ResponseEnvelope>`

Both use the same event types, the same `requestId` correlation, and the same `PendingRequestMap`. The only difference is that `call` resolves after the first `call.responded` and unsubscribes, while `subscribe` stays open and yields each `call.responded` until `call.completed`, `call.aborted`, or `call.error`.
This means `call` is semantically `subscribe().next()` — a subscription that completes after one event.

**HTTP endpoint**: An HTTP `POST /api/{namespace}/{operation}` is just `call` over HTTP — publish a `call.requested`, wait for `call.responded`, return the output as JSON.

**WebSocket endpoint**: A websocket connection carries bidirectional call protocol events. The hub pushes `call.requested` to spoke runners; runners push `call.responded`/`call.error` back. Same protocol, different transport. This is the hub-spoke "rpc-mode": persistent connection, no polling, natural streaming support.

## Why We Keep the Call Protocol (Not Just the Graphs)

1. **SDD process requires it** — the coordinator models development workflows between agents using the call graph. When the architect calls the decomposer, which calls the coordinator, which spawns implementation specialists, that's a call graph. The call protocol is what populates it automatically.
2. **Abort cascading** — when a parent operation fails or is aborted, all child operations should be notified. The call protocol propagates `call.aborted` through `parentRequestId` chains. Without it, each coordination operation handles errors ad hoc (e.g., `coord.spawn` chains 5 `registry.execute()` calls — if the 3rd fails, there's no structured abort of the first two or the pending 4th/5th).
3. **Observability** — seeing which operations called which, how long they took, and what failed is essential for debugging agent workflows. The call protocol auto-tracks calls via `PendingRequestMap`; the call graph is populated as a side effect.
4. **Unified error handling** — `mapError` + `InfrastructureErrors` + `errorSchemas` declarations give structured, typed errors across all transports. Without it, each consumer invents its own error format.
5. **Transport flexibility** — the `TypedEventTarget` plug point means the same protocol works over in-process `EventTarget`, Redis channels, or websockets. The hub uses all three: in-process for local operations, Redis for cross-process events, websockets for spoke runner dispatch.
6. **Future Rust rewrite** — the API contract needs to be stable. The call protocol is a small, well-defined event contract. Building it now means the Rust rewrite has a spec to implement against.

## Call Event Types

All communication flows through typed events. The call event TypeBox schemas are defined in `@alkdev/operations` as `CallEventSchema` (flat dot-notation keys like `"call.requested"`). The shapes below match the library's actual event types.

```ts
import { Type } from "@alkdev/typebox"

// CallEventSchema uses flat dot-notation keys (not nested objects):
//   "call.requested", "call.responded", "call.completed", "call.aborted", "call.error"
// The shapes below show the event payload for each type.

// call.requested
Type.Object({
  type: Type.Literal("call.requested"),
  requestId: Type.String(),
  operationId: Type.String(),
  input: Type.Unknown(),
  timestamp: Type.String(), // ISO 8601 — used as source for startedAt/completedAt
  parentRequestId: Type.Optional(Type.String()),
  identity: Type.Optional(Type.Object({
    id: Type.String(),
    scopes: Type.Array(Type.String()),
    resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String()))),
  })),
  startedAt: Type.Optional(Type.String()), // ISO 8601 — if provided, overrides timestamp for startedAt
})

// call.responded
Type.Object({
  type: Type.Literal("call.responded"),
  requestId: Type.String(),
  output: Type.Unknown(), // ResponseEnvelope from @alkdev/operations
  timestamp: Type.String(), // ISO 8601
})

// call.completed
Type.Object({
  type: Type.Literal("call.completed"),
  requestId: Type.String(),
  output: Type.Optional(Type.Unknown()), // Optional final output
  timestamp: Type.String(), // ISO 8601
})

// call.aborted
Type.Object({
  type: Type.Literal("call.aborted"),
  requestId: Type.String(),
  timestamp: Type.String(), // ISO 8601
})

// call.error
Type.Object({
  type: Type.Literal("call.error"),
  requestId: Type.String(),
  error: Type.Object({ // Error is nested under "error" key
    code: Type.String(),
    message: Type.String(),
    details: Type.Optional(Type.Unknown()),
  }),
  timestamp: Type.String(), // ISO 8601
})
```

**Note on `deadline`**: The `call.requested` event above does not include a `deadline` field. Deadlines are a `PendingRequestMap` concept (timeouts are applied at the protocol layer, not persisted in the call graph). If a call times out, the `PendingRequestMap` emits a `call.error` with code `TIMEOUT`.

**Note: no `call.running` event**: The library's `CallEventMapValue` union has only five event types; there is no `call.running` event. When the hub dispatches an operation handler, it calls `flowGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() })` directly. This is a hub-initiated state transition, not an event. See [Write Path](#write-path) for details.

### Event Semantics

- **`call.requested`** — Initiates a call. Creates a call graph node (status: `pending`) and adds a `triggered` edge if `parentRequestId` is present. If `startedAt` is provided in the event, it overrides `timestamp` for the node's `startedAt`.
- **`call.responded`** — Carries the call result. For one-shot calls, this is the terminal event that resolves the `Promise`. The `output` field contains a `ResponseEnvelope` (with `data` and `meta` fields) from `@alkdev/operations`. Sets `status: "completed"` and `completedAt: event.timestamp`.
- **`call.completed`** — Terminal completion signal, idempotent if `call.responded` was already received. For subscriptions, fires after the last `call.responded` to signal stream end. May include optional `output`. Sets `completedAt: event.timestamp` if not already set.
- **`call.aborted`** — Call was cancelled. Sets status to `aborted` and `completedAt: event.timestamp`. **Note**: flowgraph's `updateFromEvent()` does NOT cascade aborts to children — the hub's `CallHandler` is responsible for cascading `call.aborted` to descendant calls via the pubsub layer.
- **`call.error`** — Call failed with an error. Sets status to `failed`, stores the error (nested under `error: { code, message, details? }`), and sets `completedAt: event.timestamp`.

**Note on `@alkdev/flowgraph`**: The `CallEventMapValue` type in `@alkdev/flowgraph/schema` defines the union of these event types. Flowgraph's `FlowGraph.fromCallEvents()` and `updateFromEvent()` consume these events directly to populate the call graph. The `CallStatus` enum in flowgraph (`pending`, `running`, `completed`, `failed`, `aborted`) aligns with the statuses in the call protocol events.

**Note on ResponseEnvelope unwrapping**: The `call.responded` event carries `output` as a `ResponseEnvelope` (from `@alkdev/operations`). When feeding events to `@alkdev/flowgraph`, the hub **unwraps the envelope** before calling `updateFromEvent()` — `CallNodeAttrs.output` stores the `ResponseEnvelope.data` value (the actual result), not the full envelope. The `ResponseEnvelope.meta` is discarded at the call graph level (it's available in `PendingRequestMap` for the caller, but not persisted in the graph node). This means `call_graph_nodes.output` contains the unwrapped result data.

**Identity**: The `Identity` type represents the caller's security context. It is derived from keypal's `ApiKeyMetadata`: `scopes` maps directly from keypal's global scopes, and `resources` maps from keypal's resource-scoped permissions (key format: `"type:id"`, value: scope array). It is passed through the call chain and checked by `CallHandler` against the operation's `AccessControl` definition. See operations.md for the `AccessControl` type.

**Request correlation**: Every call has a unique `requestId`. Nested calls include `parentRequestId` to track the call chain. Responses and errors are matched to requests by `requestId`.
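To make `call ≡ subscribe` and `requestId` correlation concrete, here is a minimal, self-contained sketch. `MiniBus`, `MiniCaller`, and `startFakeHandler` are hypothetical stand-ins for a `TypedEventTarget`, `PendingRequestMap`, and an operation handler; they omit deadlines, identity, and `ResponseEnvelope` wrapping, and they are not the `@alkdev/operations` API. `call()` is written literally as `subscribe().next()` followed by an early return that unsubscribes.

```typescript
// Hypothetical sketch: MiniBus and MiniCaller are illustrative stand-ins,
// not the @alkdev/operations API. No deadlines, identity, or ResponseEnvelope.

type CallEvent =
  | { type: "call.requested"; requestId: string; operationId: string; input: unknown }
  | { type: "call.responded"; requestId: string; output: unknown }
  | { type: "call.completed"; requestId: string }
  | { type: "call.aborted"; requestId: string }
  | { type: "call.error"; requestId: string; error: { code: string; message: string } }

type Listener = (event: CallEvent) => void

// Synchronous in-process pub/sub standing in for a TypedEventTarget.
class MiniBus {
  private listeners = new Set<Listener>()
  on(listener: Listener): () => void {
    this.listeners.add(listener)
    return () => { this.listeners.delete(listener) }
  }
  emit(event: CallEvent): void {
    for (const listener of Array.from(this.listeners)) listener(event)
  }
}

let requestCounter = 0

class MiniCaller {
  private bus: MiniBus
  constructor(bus: MiniBus) { this.bus = bus }

  // subscribe: publish call.requested, then yield each call.responded output for
  // this requestId until call.completed, call.aborted, or call.error arrives.
  async *subscribe(operationId: string, input: unknown): AsyncGenerator<unknown> {
    const requestId = `req-${++requestCounter}`
    const queue: CallEvent[] = []
    let wake: (() => void) | undefined
    const off = this.bus.on((event) => {
      // Correlate by requestId; ignore our own request and other calls' traffic.
      if (event.type === "call.requested" || event.requestId !== requestId) return
      queue.push(event)
      wake?.()
    })
    try {
      this.bus.emit({ type: "call.requested", requestId, operationId, input })
      while (true) {
        if (queue.length === 0) await new Promise<void>((resolve) => { wake = () => resolve() })
        const event = queue.shift()!
        if (event.type === "call.responded") yield event.output
        else if (event.type === "call.error") throw new Error(`${event.error.code}: ${event.error.message}`)
        else return // call.completed or call.aborted ends the stream
      }
    } finally {
      off() // unsubscribe on completion, error, or early return by the consumer
    }
  }

  // call is subscribe().next(): take the first response, then unsubscribe.
  async call(operationId: string, input: unknown): Promise<unknown> {
    const stream = this.subscribe(operationId, input)
    const first = await stream.next()
    await stream.return(undefined)
    if (first.done) throw new Error("stream ended before any call.responded")
    return first.value
  }
}

// Hypothetical operation handler answering on the same bus.
function startFakeHandler(bus: MiniBus): void {
  bus.on((event) => {
    if (event.type !== "call.requested") return
    const { requestId } = event
    if (event.operationId === "math.double") {
      bus.emit({ type: "call.responded", requestId, output: (event.input as number) * 2 })
    } else if (event.operationId === "clock.ticks") {
      for (let i = 1; i <= 3; i++) bus.emit({ type: "call.responded", requestId, output: i })
      bus.emit({ type: "call.completed", requestId })
    } else {
      const message = `no operation ${event.operationId}`
      bus.emit({ type: "call.error", requestId, error: { code: "OPERATION_NOT_FOUND", message } })
    }
  })
}
```

Because `call()` invokes `return()` on the generator, the `finally` block removes the listener, which mirrors how `call` unsubscribes after the first `call.responded` while a `subscribe` consumer keeps reading until a terminal event.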
## Error Model

The call protocol uses a **unified error model**: both infrastructure (protocol-level) and domain (operation-level) errors flow through the same `CallError` event. `CallError.code` is a `string`; the distinction between infrastructure and domain codes is by convention, not by type.

### Infrastructure Error Codes

Reserved codes produced by `CallHandler` itself, before or after operation execution:

| Code | When | `details` schema |
|------|------|------------------|
| `OPERATION_NOT_FOUND` | No operation matches `operationId` | `{ operationId: string }` |
| `ACCESS_DENIED` | Missing scopes | `{ requiredScopes?: string[] }` |
| `VALIDATION_ERROR` | Input fails `inputSchema` check | `{ errors: ValueError[] }` |
| `TIMEOUT` | Deadline exceeded | `{ deadline: number }` |
| `ABORTED` | Call cancelled | `{ reason?: string }` |
| `EXECUTION_ERROR` | Handler threw, no `errorSchemas` match | `{ message: string }` |
| `UNKNOWN_ERROR` | Non-Error thrown | `{ raw: string }` |

### Domain Error Propagation

Operations declare their possible errors via `errorSchemas` on `IOperationDefinition`. When a handler throws, `mapError` matches the thrown error against the declared schemas and falls back to `EXECUTION_ERROR` if nothing matches.

**`errorSchemas` is the contract**: An operation's `errorSchemas` declaration is the contract between the operation and its callers about what errors it might produce. No `errorSchemas` = safe default with `EXECUTION_ERROR` wrapper.

## PendingRequestMap

Manages in-flight requests and provides the `call()` interface:

```ts
// From @alkdev/operations
import { PendingRequestMap } from "@alkdev/operations"

// Construction — takes an optional EventTarget for pluggable transport (positional, not an options object)
const prm = new PendingRequestMap(eventTarget) // eventTarget may be omitted

// Call protocol — call() returns Promise<ResponseEnvelope>
const envelope = await prm.call(operationId, input, { deadline, identity })
// envelope.data contains the result, envelope.meta contains source + timestamp

// Subscribe protocol — returns AsyncIterable<ResponseEnvelope>
const stream = prm.subscribe(operationId, input, { idleTimeout, identity })
for await (const envelope of stream) {
  // yield each response
}

// Resolving calls
prm.respond(requestId, output) // output must be a ResponseEnvelope
prm.emitError(requestId, code, message, details) // details is optional
prm.complete(requestId)
prm.abort(requestId)
```

**Key behaviors**:

- `call()` returns `Promise<ResponseEnvelope>`, not a promise of the raw output
- `subscribe()` returns `AsyncIterable<ResponseEnvelope>`
- `respond()` requires `isResponseEnvelope(output)` to hold
- Built-in deadline and idle timeout support
- Constructor takes an optional `EventTarget` for pluggable transport (positional parameter, not an options object)

## CallHandler

Bridges pubsub events to `OperationRegistry.execute()`. Performs access control and error mapping:

```ts
import { buildCallHandler } from "@alkdev/operations"

const handler = buildCallHandler({ registry, callMap })
// callMap: PendingRequestMap instance (not raw EventTarget)
// subscribes to call.requested events
// checks access control (requiredScopes, resource permissions) against Identity
// executes via registry, dispatches call.responded on success
// maps errors via mapError, dispatches call.error
```

## Nested Call Wiring

Routing is an **env construction concern**, not a separate protocol layer. `buildEnv` is the single function that creates the `env`:

- **Direct mode**: `buildEnv({ registry, context })` → env functions call `registry.execute()` directly and return `Promise<ResponseEnvelope>`
- **Call protocol mode**: `PendingRequestMap` handles routing internally; `parentRequestId` is set via context

`buildEnv` no longer takes a `callMap` parameter. It sets `trusted: true` on the nested context (bypassing access control for internal calls). Env functions return `Promise<ResponseEnvelope>`, not a promise of the raw result. Callers must use `unwrap(envelope)` or access `envelope.data` for the result.

**parentRequestId propagation**: Every nested call includes `parentRequestId`, which enables call graph reconstruction and abort cascading.

## Operation Graph (Static)

Built once at startup from the `OperationRegistry`. Represents type-compatibility edges between operations. Implemented using `@alkdev/flowgraph`.

### Structure

```
Node      = OperationNodeAttrs (namespace.name, type, inputSchema, outputSchema)
Edge      = OperationEdgeAttrs (compatible: boolean, detail?, mismatches?)
Edge type = "typed" (from flowgraph EdgeType enum)
```

The operation graph is constructed via `FlowGraph.fromSpecs(specs)`, which takes an array of `OperationSpec` objects (derived from `OperationRegistry`) and:

1. Creates a node for each operation with `OperationNodeAttrs` attributes
2. Runs `buildTypeEdges(graph)` to create edges between operations whose output/input schemas are type-compatible
3. Throws `CycleError` if the resulting graph has cycles (DAG invariant)

### Type Compatibility

`typeCompat(outputSchema, inputSchema)` performs deep structural comparison of two TypeBox schemas. Returns:

- `{ compatible: true }` — output is a subtype of input
- `{ compatible: true, detail }` — compatible with notes (e.g., "output has extra fields")
- `{ compatible: false, mismatches: TypeMismatch[] }` — structural incompatibility
- `undefined` — one or both schemas are `unknown`/`any` (no meaningful check possible)

Edges where `compatible: false` are still added to the graph (with `compatible: false` and the mismatch details) so the graph is complete for observability, but the `compatible` attribute allows consumers to filter.
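The four result shapes can be illustrated with a toy checker. This is a hedged sketch, not `@alkdev/flowgraph`'s `typeCompat`: `ToySchema` is a hypothetical, heavily simplified schema representation standing in for TypeBox, and the rules (every required input field must be produced and required by the output, extra output fields become a `detail` note, `unknown` short-circuits to `undefined`) are assumptions chosen to mirror the outcomes listed.

```typescript
// Toy structural compatibility check. ToySchema is a hypothetical simplification
// of TypeBox schemas; this is not @alkdev/flowgraph's typeCompat implementation.

type ToySchema =
  | { kind: "unknown" }
  | { kind: "string" }
  | { kind: "number" }
  | { kind: "boolean" }
  | { kind: "array"; items: ToySchema }
  | { kind: "object"; properties: Record<string, ToySchema>; required: string[] }

interface TypeMismatch { path: string; expected: string; actual: string }

type CompatResult =
  | { compatible: true; detail?: string }
  | { compatible: false; mismatches: TypeMismatch[] }
  | undefined

function toyTypeCompat(output: ToySchema, input: ToySchema): CompatResult {
  // Nothing meaningful to compare when either side is unknown.
  if (output.kind === "unknown" || input.kind === "unknown") return undefined
  const mismatches: TypeMismatch[] = []
  const notes: string[] = []
  compare(output, input, "", mismatches, notes)
  if (mismatches.length > 0) return { compatible: false, mismatches }
  return notes.length > 0 ? { compatible: true, detail: notes.join("; ") } : { compatible: true }
}

function compare(out: ToySchema, inp: ToySchema, path: string, mismatches: TypeMismatch[], notes: string[]): void {
  if (out.kind === "unknown" || inp.kind === "unknown") return
  if (out.kind !== inp.kind) {
    mismatches.push({ path: path || "/", expected: inp.kind, actual: out.kind })
    return
  }
  if (out.kind === "array" && inp.kind === "array") {
    compare(out.items, inp.items, `${path}/items`, mismatches, notes)
    return
  }
  if (out.kind === "object" && inp.kind === "object") {
    for (const [key, inSchema] of Object.entries(inp.properties)) {
      const keyPath = `${path}/${key}`
      const outSchema = out.properties[key]
      const needed = inp.required.includes(key)
      if (outSchema === undefined) {
        // The input requires a field the output never produces.
        if (needed) mismatches.push({ path: keyPath, expected: inSchema.kind, actual: "missing" })
        continue
      }
      if (needed && !out.required.includes(key)) {
        // The output may omit a field the input requires.
        mismatches.push({ path: keyPath, expected: "required", actual: "optional" })
      }
      compare(outSchema, inSchema, keyPath, mismatches, notes)
    }
    const extra = Object.keys(out.properties).filter((key) => !(key in inp.properties))
    if (extra.length > 0) notes.push(`output has extra fields at ${path || "/"}: ${extra.join(", ")}`)
  }
}
```

Returning a `compatible: false` result with its mismatches, rather than dropping the pair, is what lets the operation graph keep incompatible edges for observability while consumers filter on `compatible`.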
### Call Templates for SDD

The SDD process defines a natural workflow:

```
architect → architecture-reviewer → decomposer → coordinator → implementation-specialist → code-reviewer
```

This is a call template — a validated path through the operation graph that the coordinator can instantiate as a call graph at runtime.

**Current approach**: Hardcoded workflow sequences. See "What We Defer" below.

**Future approach**: `@alkdev/flowgraph` provides ujsx workflow composition components (`Operation`, `Sequential`, `Parallel`, `Conditional`, `Map`) that can define templates declaratively. The `GraphologyHostConfig` renders templates to a `DirectedGraph` for validation, and `ReactiveHostConfig` renders them to reactive `WorkflowNode` trees for execution. When we adopt template-based workflows, flowgraph provides the validation (`validateTemplate`), type-compatibility checking, and DAG enforcement out of the box.

### API Summary

```ts
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { typeCompat, buildTypeEdges, topologicalOrder, validateGraph } from "@alkdev/flowgraph/analysis"

// Build operation graph from registered operations.
// Note: @alkdev/flowgraph's OperationSpec is a structural subset of
// @alkdev/operations' OperationSpec (it omits handler, accessControl, errorSchemas).
// The .map() transforms between the two types.
const opGraph = FlowGraph.fromSpecs(registry.list().map(spec => ({
  name: spec.name,
  namespace: spec.namespace,
  version: spec.version,
  type: spec.type, // "query" | "mutation" | "subscription"
  inputSchema: spec.inputSchema,
  outputSchema: spec.outputSchema,
  description: spec.description,
})))

// Query
const compatResult = typeCompat(opA.outputSchema, opB.inputSchema)
const order = topologicalOrder(opGraph.graph)
const issues = validateGraph(opGraph.graph)

// Serialization
const data = opGraph.export() // -> OperationGraphSerialized (graphology JSON format)
const restored = FlowGraph.fromJSON(data) // validates schema + DAG invariant
```

## Call Graph (Dynamic)

Created at runtime for each workflow execution. Populated automatically by the call protocol — every `call.requested` adds a node, and every `call.responded`/`call.error`/`call.aborted` updates its state and timestamp. Implemented using `@alkdev/flowgraph`.

### Structure

```
Node = CallNodeAttrs (requestId, operationId, status, input, output?, error?, identity?,
                      parentRequestId?, startedAt?, completedAt?)
Edge type "triggered"  = execution hierarchy (parentRequestId → child call)
Edge type "depends_on" = data dependency (call A waits on call B's output)
```

> **EdgeType scoping**: `@alkdev/flowgraph` defines five edge types in its `EdgeType` enum: `triggered`, `depends_on`, `typed`, `sequential`, `conditional`. Not all apply to every graph type:
>
> - **Call graph**: `triggered` and `depends_on` (plus the storage-layer `requested_by`)
> - **Operation graph**: `typed` (type compatibility between operations)
> - **Template graph**: `sequential` and `conditional` (workflow composition via ujsx)
>
> This document focuses on call graph edge types. See the [flowgraph architecture docs](https://git.alk.dev/alkdev/flowgraph) for the full type definitions.

The call graph is populated by `FlowGraph.fromCallEvents(events)` or incrementally via `updateFromEvent(event)`. Each call protocol event maps directly to a graph mutation:

| Event | Graph Mutation |
|-------|----------------|
| `call.requested` | `addCall(attrs)` — creates node (status: `pending`) + `triggered` edge if `parentRequestId` present |
| `call.responded` | `updateFromEvent()` sets `status: "completed"`, `output`, `completedAt: event.timestamp` (idempotent for terminal states) |
| `call.completed` | Sets `completedAt: event.timestamp` if not already set. Idempotent — if already `completed`, no-op. May also set `output` if present in event. |
| `call.error` | `updateFromEvent()` sets `status: "failed"`, `error: { code, message, details? }`, `completedAt: event.timestamp` |
| `call.aborted` | `updateFromEvent()` sets `status: "aborted"`, `completedAt: event.timestamp`. **No cascade**: `updateFromEvent()` does not abort children — the hub's `CallHandler` handles cascading. |
| _(hub-initiated)_ | `updateStatus(requestId, "running", { startedAt: now.toISOString() })` — **Not an event.** The hub's `CallHandler` calls `updateStatus()` directly when it dispatches the operation handler, transitioning `pending` → `running`. |

### Call Status State Machine

Flowgraph enforces valid status transitions via `updateStatus()`. The state machine is:

```
pending → running → completed
                  → failed
                  → aborted
running → aborted
```

Terminal states (`completed`, `failed`, `aborted`) are immutable. `InvalidTransitionError` is thrown on invalid transitions. This matches the storage layer's `call_graph_nodes.status` enum.

### Abort Cascading

When a call is aborted, all of its children should also be aborted. Flowgraph provides two mechanisms:

1. **`triggered` edge traversal**: `children(requestId)` returns direct children via `triggered` edges. Full cascading uses `descendants(requestId)` for all descendants.
2. **`WorkflowReactiveRoot`**: For running workflow executions, the reactive engine provides `abortNode(nodeId)` and `abortAll()` with `FailurePolicy` configuration (`"continue-running"` vs `"abort-dependents"`).

The hub's `CallHandler` wires `call.aborted` events to:

- `updateStatus(requestId, "aborted")` on the call graph
- Pubsub event propagation so downstream `PendingRequestMap` instances also call `abort()` on their in-flight requests
- `WorkflowReactiveRoot.abortNode(nodeId)` if a workflow execution is tracking this call

### `depends_on` Edges

While `triggered` edges represent the parent-child execution hierarchy, `depends_on` edges represent data dependencies — a call that needs another call's output before it can proceed. These are created by the coordinator when orchestrating workflows:

```ts
callGraph.addDependency(sourceRequestId, targetRequestId)
// Adds a "depends_on" edge (source depends on target's output)
// Cycle-checked — throws CycleError if the edge would create a cycle
```

`depends_on` edges are not created by the call protocol itself. They are added by coordination logic that knows the data flow between calls (e.g., the coordinator knows that `coord.spawn` step 3 depends on step 1's output). This gives the observability layer a richer graph for analysis without changing the protocol.
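The traversal, cycle check, and cascade described in this section can be sketched with a small hypothetical in-memory graph. `MiniCallGraph` is not the `@alkdev/flowgraph` API, and its `abortCascade()` is an assumed stand-in for the hub-side `CallHandler` logic. Sharing one cycle check across both edge types is an assumption about how a DAG invariant over the call graph would be enforced. The cascade also marks `pending` descendants as `aborted` directly; whether the real state machine permits `pending → aborted` is not stated in this document, so treat that as an assumption too.

```typescript
// Hypothetical in-memory call graph, not @alkdev/flowgraph. Edges point from
// parent to child (triggered) or from dependent to dependency (depends_on).

type EdgeType = "triggered" | "depends_on"
type Status = "pending" | "running" | "completed" | "failed" | "aborted"

class CycleError extends Error {
  constructor(message: string) {
    super(message)
    this.name = "CycleError"
  }
}

class MiniCallGraph {
  private statuses = new Map<string, Status>()
  private edges: { source: string; target: string; type: EdgeType }[] = []

  addCall(requestId: string, parentRequestId?: string): void {
    this.statuses.set(requestId, "pending")
    if (parentRequestId !== undefined) this.addEdge(parentRequestId, requestId, "triggered")
  }

  // source depends on target's output.
  addDependency(source: string, target: string): void {
    this.addEdge(source, target, "depends_on")
  }

  private addEdge(source: string, target: string, type: EdgeType): void {
    // source -> target closes a cycle iff source is already reachable from target.
    if (source === target || this.reachable(target, source)) {
      throw new CycleError(`${type} edge ${source} -> ${target} would create a cycle`)
    }
    this.edges.push({ source, target, type })
  }

  private reachable(from: string, to: string): boolean {
    const stack = [from]
    const seen = new Set<string>()
    while (stack.length > 0) {
      const node = stack.pop()!
      if (node === to) return true
      if (seen.has(node)) continue
      seen.add(node)
      for (const edge of this.edges) if (edge.source === node) stack.push(edge.target)
    }
    return false
  }

  // Direct children via triggered edges only.
  children(requestId: string): string[] {
    return this.edges
      .filter((edge) => edge.type === "triggered" && edge.source === requestId)
      .map((edge) => edge.target)
  }

  // Every call in the triggered subtree below requestId.
  descendants(requestId: string): string[] {
    const found: string[] = []
    const stack = this.children(requestId)
    while (stack.length > 0) {
      const node = stack.pop()!
      if (found.includes(node)) continue
      found.push(node)
      stack.push(...this.children(node))
    }
    return found
  }

  // Hub-side cascade: abort the call and every descendant that is not yet terminal.
  abortCascade(requestId: string): string[] {
    const aborted: string[] = []
    for (const id of [requestId, ...this.descendants(requestId)]) {
      const status = this.statuses.get(id)
      if (status === "pending" || status === "running") {
        this.statuses.set(id, "aborted")
        aborted.push(id)
      }
    }
    return aborted
  }

  getStatus(requestId: string): Status | undefined { return this.statuses.get(requestId) }
  setStatus(requestId: string, status: Status): void { this.statuses.set(requestId, status) }
}
```

Note that `descendants()` follows only `triggered` edges, so a `depends_on` edge never pulls an unrelated call into an abort cascade; it only constrains the cycle check.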
### API Summary

```ts
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { CallStatus } from "@alkdev/flowgraph/schema"

// Either build the call graph from stored events (e.g., after a hub restart, reconstruct from DB)...
const callGraph = FlowGraph.fromCallEvents(storedEvents)
// ...or build it incrementally as events arrive:
// const callGraph = new FlowGraph(CallNodeAttrs, CallEdgeAttrs)

// Process events
callGraph.updateFromEvent(event) // handles all 5 call.* event types

// Status management
callGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() }) // hub-initiated, not event-driven
callGraph.updateStatus(requestId, "completed") // validates state machine transition

// Call management (parentRequestId, input, identity and startedAt are optional)
callGraph.addCall({ requestId, operationId, status: "pending", parentRequestId, input, identity, startedAt })
callGraph.updateCall(requestId, attrs) // partial update of any CallNodeAttrs
callGraph.removeCall(requestId)
callGraph.addDependency(sourceRequestId, targetRequestId) // depends_on edge

// Queries
callGraph.children(requestId)     // direct children via triggered edges
callGraph.descendants(requestId)  // all descendants
callGraph.lineage(requestId)      // ancestor chain from root to this call
callGraph.getRoots()              // calls with no parentRequestId
callGraph.filterByStatus("running") // all running calls
callGraph.duration(requestId)     // completedAt - startedAt in ms

// Escape hatch
callGraph.graph // raw graphology DirectedGraph

// Serialization (for Postgres persistence)
const data = callGraph.export() // -> CallGraphSerialized
const restored = FlowGraph.fromJSON(data)
```

### Graph Size

At hub level, the call graph is small — just metadata nodes mapping to the actual process/call. Agent workflow call graphs will have tens of nodes at most for simple workflows, and potentially hundreds for complex coordination with many parallel tasks. Performance is a non-issue for call-level metadata; flowgraph wraps graphology, which handles thousands of nodes efficiently.
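Several of the queries in the API summary are simple derivations over `CallNodeAttrs`. The helpers below are a hypothetical sketch, not flowgraph's implementation, showing how `lineage()`, `getRoots()`, and `duration()` could be computed from `parentRequestId` and the ISO 8601 `startedAt`/`completedAt` strings. Including the call itself at the end of its lineage is an assumption.

```typescript
// Hypothetical helpers, not the flowgraph API. Timestamps are ISO 8601 strings,
// as carried by the call events.

interface CallNode {
  requestId: string
  parentRequestId?: string
  startedAt?: string
  completedAt?: string
}

// Ancestor chain from the root down to (and, by assumption, including) requestId.
function lineage(nodes: Map<string, CallNode>, requestId: string): string[] {
  const chain: string[] = []
  const seen = new Set<string>() // guards against malformed parent loops
  let current = nodes.get(requestId)
  while (current !== undefined && !seen.has(current.requestId)) {
    seen.add(current.requestId)
    chain.unshift(current.requestId)
    current = current.parentRequestId === undefined ? undefined : nodes.get(current.parentRequestId)
  }
  return chain
}

// Calls with no parentRequestId.
function getRoots(nodes: Map<string, CallNode>): string[] {
  return Array.from(nodes.values())
    .filter((node) => node.parentRequestId === undefined)
    .map((node) => node.requestId)
}

// completedAt - startedAt in milliseconds; undefined while either timestamp is missing.
function duration(node: CallNode): number | undefined {
  if (node.startedAt === undefined || node.completedAt === undefined) return undefined
  return Date.parse(node.completedAt) - Date.parse(node.startedAt)
}
```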
## Reactive Workflow Execution

For running workflow executions (not just observability), `@alkdev/flowgraph/reactive` provides `WorkflowReactiveRoot` — a signal-driven execution engine that:

1. Takes a `DirectedGraph` (from a ujsx template rendered via `GraphologyHostConfig`) and creates reactive state for every node
2. Processes call protocol events via `append(event)` — the event log is the source of truth; status and results are derived projections
3. Computes `preconditions` (all predecessors completed), `canStart` (preconditions met and not blocked by failure), and `blockedByFailure` (any predecessor failed/aborted) as reactive signals
4. Supports `FailurePolicy`: `"continue-running"` (only abort idle/waiting dependents) or `"abort-dependents"` (cascade abort to all non-terminal dependents)
5. Maps node keys to `requestId`s via `setRequestId(nodeKey, requestId)` — bridging template nodes to call protocol identifiers
6. Requires `dispose()` to release signal subscriptions

```ts
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive"

// WorkflowReactiveRoot takes the raw DirectedGraph (flowGraph.graph),
// not the FlowGraph wrapper
const workflowRoot = new WorkflowReactiveRoot(templateGraph.graph)
try {
  // Bridge template nodes to call protocol request IDs
  workflowRoot.setRequestId("step-1", requestId1)
  workflowRoot.setRequestId("step-2", requestId2)

  // Process events (appended by the hub's call protocol handler)
  workflowRoot.append(callRequestedEvent)
  workflowRoot.append(callRespondedEvent)

  // Query reactive state
  const status = workflowRoot.getStatus("step-1") // NodeStatus
  const canStart = workflowRoot.canStart.get("step-2") // ReadonlySignal<boolean>
  const isComplete = workflowRoot.isComplete() // all nodes terminal?
} finally {
  workflowRoot.dispose()
}
```

This is the execution engine for workflow-based coordination. The hub coordinator instantiates a `WorkflowReactiveRoot` for each running workflow, feeds it call protocol events, and uses its reactive state to determine what to do next (start the next step, handle failures, cascade aborts).

## Storage

Call graph nodes and edges are stored in Postgres. See `storage/call-graph.md` for the full schema definitions.

The storage layer persists individual `call_graph_nodes` and `call_graph_edges` rows. Flowgraph's `export()` produces graphology's native JSON format (`CallGraphSerialized`), which is suitable for snapshot/restore but not for incremental observability queries. The hub uses **both**:

- **Incremental storage**: Each call protocol event writes/updates a row in `call_graph_nodes` and creates `call_graph_edges` as needed. This supports real-time observability queries (what's running, what failed, what's blocked).
- **Reconstruction**: After a hub restart, the call graph can be reconstructed from stored events or from incremental rows using `FlowGraph.fromCallEvents()`.

### Write Path

The hub's `CallHandler` is responsible for writing call graph data to Postgres. When a call protocol event arrives:

1. **`call.requested`**: The `CallHandler` creates a row in `call_graph_nodes` (status: `pending`) and, if `parentRequestId` is present, a `triggered` edge in `call_graph_edges`. This write happens **synchronously before dispatching** to ensure the call is tracked even if the handler fails immediately.
2. **Handler dispatch**: The `CallHandler` dispatches the operation handler. At this point it updates the node status to `running` and sets `startedAt` — this is **not** an event, but a hub-initiated `updateStatus()` call on both the in-memory flowgraph and the DB row.
3. **`call.responded`**: Updates the node's status to `completed`, sets `output` (unwrapped from the `ResponseEnvelope` — only `data` is stored, not `meta`), and sets `completedAt`.
4. **`call.error`**: Updates status to `failed`, sets `error`, and sets `completedAt`.
5. **`call.aborted`**: Updates status to `aborted` and sets `completedAt`. The hub's `CallHandler` then cascades the abort to child calls (this is hub logic, not flowgraph's `updateFromEvent()`).
6. **`call.completed`**: Sets `completedAt` if not already set. Idempotent — no-op if the call is already `completed`.

**Error handling**: If a DB write fails, the call still proceeds; the hub logs the write failure and continues (for writes after dispatch, the handler has already been invoked). Call graph data is best-effort — the in-memory flowgraph is the authoritative source for running calls; the DB is for persistence and observability.

### Identifier Mapping

The `call_graph_nodes` table uses two identifiers:

- **`id`** (UUID, from `commonCols`): Internal primary key, used as the FK target for `call_graph_edges`.
- **`requestId`** (text, UNIQUE): Protocol-level correlation key, used as the flowgraph node key.

When reconstructing a flowgraph from the database, the hub uses `requestId` as the node key (matching `CallNodeAttrs.requestId`). The `call_graph_edges` table uses `sourceId`/`targetId` referencing `call_graph_nodes.id` (the UUID), so reconstruction requires resolving UUIDs to requestIds. The `call_graph_nodes.requestId` column has a UNIQUE index, making this lookup efficient.

### `call_graph_nodes` — One row per call

| Column | Type | Notes |
|--------|------|-------|
| commonCols | — | id, metadata, createdAt, updatedAt |
| requestId | text NOT NULL UNIQUE | Protocol-level correlation key. Also the flowgraph node key. |
| operationId | text NOT NULL | FK → operations.id (RESTRICT). NOT NULL — `CallNodeAttrs.operationId` is required in flowgraph. |
| parentRequestId | text | Denormalized parent — fast point lookup. Redundant with `triggered` edge. |
| identity | jsonb | Caller identity: `{ id, scopes, resources? }` |
| callerAccountId | text | FK → accounts.id (ON DELETE SET NULL). Nullable for system calls. |
| status | text NOT NULL | Matches `CallStatus` enum: `pending`, `running`, `completed`, `failed`, `aborted` |
| input | jsonb | Call input (redacted, truncated — see storage/call-graph.md) |
| output | jsonb | Call output (on success) |
| error | jsonb | `{ code, message, details? }` (on failure, nested under `error` key matching flowgraph) |
| startedAt | timestamp with tz | When call was dispatched. **Type conversion**: flowgraph stores as ISO 8601 string; DB stores as `timestamptz`. |
| completedAt | timestamp with tz | When call completed/failed/aborted. Same type conversion as `startedAt`. |

### `call_graph_edges` — Typed directed edges between calls

| Column | Type | Notes |
|--------|------|-------|
| commonCols | — | id, metadata, createdAt, updatedAt |
| sourceId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| targetId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| edgeType | text NOT NULL | `triggered`, `depends_on`, or `requested_by` |

**Edge type semantics**: `triggered` = execution hierarchy (parentRequestId), `depends_on` = data dependency, `requested_by` = identity/authorization chain. See storage/call-graph.md for details.

**Note on `depends_on` in flowgraph**: The flowgraph `CallEdgeAttrs` type is a union of `TriggeredEdgeAttrs` and `DependencyEdgeAttrs`, matching the `triggered` and `depends_on` edge types. The `requested_by` edge type is a storage-layer concept for identity tracing that doesn't have a corresponding flowgraph edge type — it's persisted in the database but not modeled in the in-memory graph.

## Transport Mapping

The call protocol is transport-agnostic. The `TypedEventTarget` plug point (same pattern as `RedisEventTarget` in the pubsub design) determines how events move:

| Transport | Use Case | `TypedEventTarget` impl |
|-----------|----------|-------------------------|
| In-process | Local hub operations | Browser `EventTarget` (default) |
| Redis | Cross-process events (e.g., hub → all processes) | `RedisEventTarget` |
| WebSocket | Hub ↔ spoke bidirectional | `createWebSocketServerEventTarget` (hub) / `createWebSocketClientEventTarget` (spoke) from `@alkdev/pubsub` |

A `WebSocketEventTarget` implementing `TypedEventTarget` makes each spoke runner's websocket connection a live bidirectional channel. The hub dispatches `call.requested` over the socket; the runner sends `call.responded`/`call.error` back. Same protocol, same event shapes, same `PendingRequestMap` — just a different `eventTarget`.

## What We Defer

1. **Full ujsx call templates** — currently using hardcoded workflow sequences. `@alkdev/flowgraph/component` provides `Operation`, `Sequential`, `Parallel`, `Conditional`, `Map` components for declarative template definition, and `GraphologyHostConfig` + `ReactiveHostConfig` for rendering. We'll adopt these when workflow complexity justifies it.
2. **Graph visualization** — API only, no Sigma.js UI
3. **Stream deduplication** — `Value.Hash({operationId, input})` deduplication for multiple subscribers to the same stream
4. **`requested_by` edge creation in flowgraph** — the `requested_by` edge type is a storage-layer concept for identity tracing. It's persisted in `call_graph_edges` but not modeled in `@alkdev/flowgraph`'s `CallEdgeAttrs` union. We may add it to flowgraph in the future.

The call protocol itself, `PendingRequestMap`, `CallHandler`, `buildEnv` dual-mode, call graph auto-tracking, and reactive workflow execution are **in the initial implementation**. They're not much code, and they remove the need to bolt ad hoc error handling and abort logic onto every coordination operation.
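Part of the error handling referred to above is the `mapError` fallback described under Error Model. As a minimal sketch, not the `@alkdev/operations` implementation, each declared error schema is modeled here as a hypothetical `DeclaredError` predicate; real `errorSchemas` are TypeBox schemas. Declared domain errors win, unmatched `Error`s become `EXECUTION_ERROR`, and non-`Error` throws become `UNKNOWN_ERROR`, with `details` shaped like the infrastructure error table.

```typescript
// Hypothetical mapError sketch; DeclaredError is an illustrative stand-in for
// TypeBox errorSchemas, not the @alkdev/operations API.

interface CallErrorPayload {
  code: string
  message: string
  details?: unknown
}

interface DeclaredError {
  code: string
  matches: (thrown: unknown) => boolean
}

function mapErrorSketch(thrown: unknown, errorSchemas: DeclaredError[] = []): CallErrorPayload {
  // 1. Declared domain errors take precedence: errorSchemas is the caller contract.
  for (const schema of errorSchemas) {
    if (schema.matches(thrown)) {
      const message = thrown instanceof Error ? thrown.message : String(thrown)
      return { code: schema.code, message }
    }
  }
  // 2. An Error that matches no declared schema is wrapped as EXECUTION_ERROR.
  if (thrown instanceof Error) {
    return { code: "EXECUTION_ERROR", message: thrown.message, details: { message: thrown.message } }
  }
  // 3. Anything else thrown (string, number, plain object) becomes UNKNOWN_ERROR.
  return { code: "UNKNOWN_ERROR", message: "Non-Error value thrown", details: { raw: String(thrown) } }
}

// Example declaration: a hypothetical domain error identified by a `code` property.
const taskNotFound: DeclaredError = {
  code: "TASK_NOT_FOUND",
  matches: (thrown) =>
    typeof thrown === "object" && thrown !== null && (thrown as { code?: unknown }).code === "TASK_NOT_FOUND",
}
```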
## Open Questions

1. **Operation deletion and call graph referential integrity**: The `call_graph_nodes.operationId` column has a RESTRICT FK to `operations.id`. An operation cannot be deleted while any call records reference it. For v1, the strategy is to deny removal while call records exist. If operation removal becomes necessary (e.g., cleanup of old operations), the hub would need to either:
   - (a) reassign all referencing call records to a sentinel `__removed__` operation (pre-seeded in migrations with `id='__removed__'`, `namespace='system'`), then delete the original operation, or
   - (b) accept that historical call records reference operations that may have been provided by disconnected spokes — in which case, consider making `operationId` nullable in flowgraph's `CallNodeAttrs` so the hub can NULL the FK instead of requiring a sentinel row. This requires coordination with the `@alkdev/flowgraph` package.
2. **Reactive vs. call graph `requested` semantics**: In `FlowGraph`, `call.requested` creates a node in `pending` state. In `WorkflowReactiveRoot`, `call.requested` maps to `NodeStatus.running` (the reactive model assumes a template node starts executing when requested). This is a deliberate semantic difference — the reactive model tracks execution progress, while the call graph model tracks protocol state. The spec documents this, but implementers should be aware that feeding the same event to both models produces different initial statuses.
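The difference in question 2 can be seen by feeding one `call.requested` event to two projections. `CallGraphProjection` and `ReactiveProjection` below are hypothetical, reduced stand-ins for `FlowGraph` and `WorkflowReactiveRoot`, not their APIs. The allowed-transition table encodes only the transitions drawn in the Call Status State Machine diagram; routing `call.responded` through the same check is an assumption about how strictly the call graph view enforces it.

```typescript
// Hypothetical projections, not the flowgraph API: one event, two initial statuses.

type Status = "pending" | "running" | "completed" | "failed" | "aborted"
type CallEvent =
  | { type: "call.requested"; requestId: string }
  | { type: "call.responded"; requestId: string }

// Only the transitions drawn in the state machine diagram; terminal states have none.
const ALLOWED: Record<Status, Status[]> = {
  pending: ["running"],
  running: ["completed", "failed", "aborted"],
  completed: [],
  failed: [],
  aborted: [],
}

class InvalidTransitionError extends Error {
  constructor(message: string) {
    super(message)
    this.name = "InvalidTransitionError"
  }
}

// Call graph view tracks protocol state: requested means pending until the hub dispatches.
class CallGraphProjection {
  readonly statuses = new Map<string, Status>()

  apply(event: CallEvent): void {
    if (event.type === "call.requested") this.statuses.set(event.requestId, "pending")
    else this.updateStatus(event.requestId, "completed")
  }

  // Also called directly by the hub for pending -> running (not an event).
  updateStatus(requestId: string, next: Status): void {
    const current = this.statuses.get(requestId)
    if (current === undefined || !ALLOWED[current].includes(next)) {
      throw new InvalidTransitionError(`${current ?? "missing"} -> ${next} is not allowed`)
    }
    this.statuses.set(requestId, next)
  }
}

// Reactive view tracks execution progress: a requested template node is already running.
class ReactiveProjection {
  readonly statuses = new Map<string, Status>()

  apply(event: CallEvent): void {
    this.statuses.set(event.requestId, event.type === "call.requested" ? "running" : "completed")
  }
}
```

The two views agree again once the hub's `pending → running` transition has been applied to the call graph, so comparing their statuses for a node is only meaningful after dispatch.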
## Dependencies

```
@alkdev/flowgraph    # DAG construction, reactive execution, call/operation graphs, type-compat analysis
@alkdev/operations   # Call protocol, PendingRequestMap, CallHandler
@alkdev/pubsub       # Event transport (Redis, WebSocket, Worker)
@alkdev/taskgraph    # Task graph construction and analysis (for task management, not call graphs)
```

**Why both `@alkdev/flowgraph` and `@alkdev/taskgraph`?** `@alkdev/taskgraph` is a domain-specific library for task DAG construction with categorical estimates (scope, risk, impact), frontmatter parsing, and task-specific analysis (critical path, bottleneck detection, risk assessment). `@alkdev/flowgraph` is a general-purpose workflow graph library for call/operation DAGs with ujsx template composition and reactive execution. They both wrap graphology, but serve different domains. The hub uses `@alkdev/taskgraph` for task management and `@alkdev/flowgraph` for call graph and operation graph management.

## Prior Art

The call protocol was adapted from `ade_spoke`'s call protocol design (which was pubsub-agnostic). The key difference here is that websockets are the primary transport for hub-spoke communication rather than SSE. The call graph and operation graph are now implemented using `@alkdev/flowgraph` rather than raw graphology, which provides DAG enforcement, type-compatibility analysis, and reactive execution out of the box.