---
status: draft
last_updated: 2026-05-25
---
# Call Protocol, Call Graph & Operation Graph
## Overview
The call protocol is the unified transport layer for all operation invocations. It provides a single event-based mechanism that works the same whether the call is local (in-process), remote (hub ↔ spoke over websocket), or streamed (subscription). The call graph and operation graph are built on top of it — and `@alkdev/flowgraph` provides the graph construction, analysis, and reactive execution primitives.
Websockets are the primary transport for hub-spoke communication, not SSE. SSE is one-way (server to client), so the reverse path needs separate HTTP requests; websockets give us bidirectional channels where hub → spoke dispatch and spoke → hub results flow through the same connection. The call protocol's `call ≡ subscribe` semantics map naturally: a websocket frame comes in, and the protocol resolves or streams depending on the consumption pattern.
**Transport distinction**: WebSocket is the primary bidirectional transport for hub↔spoke and hub↔client-spoke communication. SSE support exists for compatibility (e.g., OpenAI proxy streams, legacy clients) but is not the preferred transport. A client (browser, CLI) that connects as a spoke gets full bidirectional communication over a single WebSocket — no SSE needed.
## call ≡ subscribe
At the protocol level, `call` and `subscribe` are the same thing with different consumption patterns:
- **`call`**: Publish `call.requested`, subscribe to `call.responded:{requestId}`, resolve on first response → `Promise<TOutput>`
- **`subscribe`**: Publish `call.requested`, subscribe to `call.responded:{requestId}`, yield each response → `AsyncIterable<TOutput>`
Both use the same event types, the same `requestId` correlation, and the same `PendingRequestMap`. The only difference is that `call` resolves after the first `call.responded` and unsubscribes, while `subscribe` stays open and yields each `call.responded` until `call.completed`, `call.aborted`, or `call.error`.
This means `call` is semantically `subscribe().next()` — a subscription that completes after one event.
**HTTP endpoint**: An HTTP `POST /api/{namespace}/{operation}` is just `call` over HTTP — publish a `call.requested`, wait for `call.responded`, return the output as JSON.
**WebSocket endpoint**: A websocket connection carries bidirectional call protocol events. The hub pushes `call.requested` to spoke runners; runners push `call.responded`/`call.error` back. Same protocol, different transport. This is the hub-spoke "rpc-mode": persistent connection, no polling, natural streaming support.
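
The `call`/`subscribe` equivalence can be made concrete with a small in-process sketch. Everything in it (`TinyBus`, `subscribeTo`, `callOnce`) is hypothetical and callback-based for brevity; it is not the `@alkdev/operations` API, which works with `Promise` and `AsyncIterable` values.

```typescript
// Hypothetical in-process bus; names are illustrative, not from @alkdev/operations.
type Listener = (output: unknown) => void;

class TinyBus {
  private listeners = new Map<string, Set<Listener>>();

  // Register a listener for call.responded:{requestId}; returns an unsubscribe function.
  on(requestId: string, listener: Listener): () => void {
    const set = this.listeners.get(requestId) ?? new Set<Listener>();
    set.add(listener);
    this.listeners.set(requestId, set);
    return () => {
      set.delete(listener);
    };
  }

  // Deliver one call.responded payload to the current listeners.
  respond(requestId: string, output: unknown): void {
    const set = this.listeners.get(requestId);
    if (!set) return;
    // Copy first so listeners may unsubscribe while being notified.
    Array.from(set).forEach((listener) => listener(output));
  }
}

// subscribe: stay attached and see every response.
function subscribeTo(bus: TinyBus, requestId: string, onOutput: Listener): () => void {
  return bus.on(requestId, onOutput);
}

// call: the same subscription, detached after the first response.
function callOnce(bus: TinyBus, requestId: string, onOutput: Listener): void {
  const detach = subscribeTo(bus, requestId, (output) => {
    detach();
    onOutput(output);
  });
}
```

With two responses published for the same `requestId`, a `subscribeTo` listener receives both while a `callOnce` listener receives only the first.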
## Why We Keep the Call Protocol (Not Just the Graphs)
1. **SDD process requires it** — the coordinator models development workflows between agents using the call graph. When the architect calls the decomposer which calls the coordinator which spawns implementation specialists, that's a call graph. The call protocol is what populates it automatically.
2. **Abort cascading** — when a parent operation fails or is aborted, all child operations should be notified. The call protocol propagates `call.aborted` through `parentRequestId` chains. Without it, each coordination operation handles errors ad-hoc (e.g., `coord.spawn` chains 5 `registry.execute()` calls — if the 3rd fails, there's no structured abort of the first two or the pending 4th/5th).
3. **Observability** — seeing what operations called what, how long they took, what failed, is essential for debugging agent workflows. The call protocol auto-tracks calls via `PendingRequestMap`; the call graph is populated as a side effect.
4. **Unified error handling**: `mapError` + `InfrastructureErrors` + `errorSchemas` declaration gives structured, typed errors across all transports. Without it, each consumer invents its own error format.
5. **Transport flexibility** — the `TypedEventTarget` plug point means the same protocol works over in-process `EventTarget`, Redis channels, or websockets. The hub uses all three: in-process for local operations, Redis for cross-process events, websockets for spoke runner dispatch.
6. **Future Rust rewrite** — the API contract needs to be stable. The call protocol is a small, well-defined event contract. Building it now means the Rust rewrite has a spec to implement against.
## Call Event Types
All communication flows through typed events. The call event TypeBox schemas are defined in `@alkdev/operations` as `CallEventSchema` (flat dot-notation keys like `"call.requested"`). The shapes below match the library's actual event types.
```ts
import { Type } from "@alkdev/typebox"

// CallEventSchema uses flat dot-notation keys (not nested objects):
// "call.requested", "call.responded", "call.completed", "call.aborted", "call.error"
// The shapes below show the event payload for each type.

// call.requested
Type.Object({
  type: Type.Literal("call.requested"),
  requestId: Type.String(),
  operationId: Type.String(),
  input: Type.Unknown(),
  timestamp: Type.String(), // ISO 8601 — used as source for startedAt/completedAt
  parentRequestId: Type.Optional(Type.String()),
  identity: Type.Optional(Type.Object({
    id: Type.String(),
    scopes: Type.Array(Type.String()),
    resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String())))
  })),
  startedAt: Type.Optional(Type.String()), // ISO 8601 — if provided, overrides timestamp for startedAt
})

// call.responded
Type.Object({
  type: Type.Literal("call.responded"),
  requestId: Type.String(),
  output: Type.Unknown(), // ResponseEnvelope from @alkdev/operations
  timestamp: Type.String(), // ISO 8601
})

// call.completed
Type.Object({
  type: Type.Literal("call.completed"),
  requestId: Type.String(),
  output: Type.Optional(Type.Unknown()), // Optional final output
  timestamp: Type.String(), // ISO 8601
})

// call.aborted
Type.Object({
  type: Type.Literal("call.aborted"),
  requestId: Type.String(),
  timestamp: Type.String(), // ISO 8601
})

// call.error
Type.Object({
  type: Type.Literal("call.error"),
  requestId: Type.String(),
  error: Type.Object({ // Error is nested under "error" key
    code: Type.String(),
    message: Type.String(),
    details: Type.Optional(Type.Unknown()),
  }),
  timestamp: Type.String(), // ISO 8601
})
```
**Note on `deadline`**: The `call.requested` event above does not include a `deadline` field. Deadlines are a `PendingRequestMap` concept (timeouts are applied at the protocol layer, not persisted in the call graph). If a call times out, the `PendingRequestMap` emits a `call.error` with code `TIMEOUT`.
**Note: no `call.running` event**: The library's `CallEventMapValue` union only has 5 event types. There is no `call.running` event. When the hub dispatches an operation handler, it calls `flowGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() })` directly. This is a hub-initiated state transition, not an event. See [Write Path](#write-path) for details.
### Event Semantics
- **`call.requested`** — Initiates a call. Creates a call graph node (status: `pending`) and adds a `triggered` edge if `parentRequestId` is present. If `startedAt` is provided in the event, it overrides `timestamp` for the node's `startedAt`.
- **`call.responded`** — Carries the call result. For one-shot calls, this is the terminal event that resolves the `Promise<ResponseEnvelope>`. The `output` field contains a `ResponseEnvelope` (with `data` and `meta` fields) from `@alkdev/operations`. Sets `status: "completed"` and `completedAt: event.timestamp`.
- **`call.completed`** — Terminal completion signal, idempotent if `call.responded` was already received. For subscriptions, fires after the last `call.responded` to signal stream end. May include optional `output`. Sets `completedAt: event.timestamp` if not already set.
- **`call.aborted`** — Call was cancelled. Sets status to `aborted` and `completedAt: event.timestamp`. **Note**: flowgraph's `updateFromEvent()` does NOT cascade aborts to children — the hub's `CallHandler` is responsible for cascading `call.aborted` to descendant calls via the pubsub layer.
- **`call.error`** — Call failed with an error. Sets status to `failed`, stores the error (nested under `error: { code, message, details? }`), and sets `completedAt: event.timestamp`.
**Note on `@alkdev/flowgraph`**: The `CallEventMapValue` type in `@alkdev/flowgraph/schema` defines the union of these event types. Flowgraph's `FlowGraph.fromCallEvents()` and `updateFromEvent()` consume these events directly to populate the call graph. The `CallStatus` enum in flowgraph (`pending`, `running`, `completed`, `failed`, `aborted`) aligns with the statuses in the call protocol events.
**Note on ResponseEnvelope unwrapping**: The `call.responded` event carries `output` as a `ResponseEnvelope` (from `@alkdev/operations`). When feeding events to `@alkdev/flowgraph`, the hub **unwraps the envelope** before calling `updateFromEvent()`: `CallNodeAttrs.output` stores the `ResponseEnvelope.data` value (the actual result), not the full envelope. The `ResponseEnvelope.meta` is discarded at the call graph level (it's available in `PendingRequestMap` for the caller, but not persisted in the graph node). This means `call_graph_nodes.output` contains the unwrapped result data.
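
The event semantics and the envelope unwrapping can be read as a reducer over a single node. The sketch below is a simplified stand-in, not flowgraph's `updateFromEvent()`: the type names are hypothetical, the optional `output` on `call.completed` is left out, and treating a `call.completed` on a non-terminal node as a move to `completed` is an assumption.

```typescript
// Sketch only: simplified types, not the schemas exported by the libraries.
type SketchStatus = "pending" | "running" | "completed" | "failed" | "aborted";
interface SketchError { code: string; message: string; details?: unknown }

interface SketchNode {
  requestId: string;
  status: SketchStatus;
  startedAt?: string;
  completedAt?: string;
  output?: unknown;
  error?: SketchError;
}

type SketchEvent =
  | { type: "call.requested"; requestId: string; timestamp: string; startedAt?: string }
  | { type: "call.responded"; requestId: string; timestamp: string; output: { data: unknown; meta?: unknown } }
  | { type: "call.completed"; requestId: string; timestamp: string }
  | { type: "call.aborted"; requestId: string; timestamp: string }
  | { type: "call.error"; requestId: string; timestamp: string; error: SketchError };

const isTerminalStatus = (s: SketchStatus): boolean =>
  s === "completed" || s === "failed" || s === "aborted";

function applyCallEvent(node: SketchNode | undefined, ev: SketchEvent): SketchNode {
  if (ev.type === "call.requested") {
    // An explicit startedAt wins over the event timestamp.
    return { requestId: ev.requestId, status: "pending", startedAt: ev.startedAt ?? ev.timestamp };
  }
  if (!node) throw new Error(`unknown requestId ${ev.requestId}`);
  if (ev.type === "call.completed") {
    // Idempotent: only fills in completedAt when it is still missing.
    if (node.completedAt) return node;
    const status = isTerminalStatus(node.status) ? node.status : "completed"; // assumption
    return { ...node, status, completedAt: ev.timestamp };
  }
  if (isTerminalStatus(node.status)) return node; // terminal states are immutable
  if (ev.type === "call.responded") {
    // Store the unwrapped result (envelope.data); envelope.meta is dropped.
    return { ...node, status: "completed", output: ev.output.data, completedAt: ev.timestamp };
  }
  if (ev.type === "call.aborted") {
    return { ...node, status: "aborted", completedAt: ev.timestamp };
  }
  return { ...node, status: "failed", error: ev.error, completedAt: ev.timestamp };
}
```

Abort cascading is deliberately absent here, mirroring the note that cascading is the hub's job rather than the graph update's.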
**Identity**: The `Identity` type represents the caller's security context. Derived from keypal's `ApiKeyMetadata`: `scopes` maps directly from keypal's global scopes, `resources` maps from keypal's resource-scoped permissions (key format: `"type:id"`, value: scope array). Passed through the call chain and checked by `CallHandler` against the operation's `AccessControl` definition. See operations.md for the `AccessControl` type.
**Request correlation**: Every call has a unique `requestId`. Nested calls include `parentRequestId` to track the call chain. Responses and errors are matched to requests by `requestId`.
## Error Model
The call protocol uses a **unified error model**: both infrastructure (protocol-level) and domain (operation-level) errors flow through the same `CallError` event. `CallError.code` is `string` — the distinction between infrastructure and domain codes is by convention, not by type.
### Infrastructure Error Codes
Reserved codes produced by `CallHandler` itself, before or after operation execution:
| Code | When | Schema |
|------|------|--------|
| `OPERATION_NOT_FOUND` | No operation matches `operationId` | `{ operationId: string }` |
| `ACCESS_DENIED` | Missing scopes | `{ requiredScopes?: string[] }` |
| `VALIDATION_ERROR` | Input fails `inputSchema` check | `{ errors: ValueError[] }` |
| `TIMEOUT` | Deadline exceeded | `{ deadline: number }` |
| `ABORTED` | Call cancelled | `{ reason?: string }` |
| `EXECUTION_ERROR` | Handler threw, no `errorSchemas` match | `{ message: string }` |
| `UNKNOWN_ERROR` | Non-Error thrown | `{ raw: string }` |
### Domain Error Propagation
Operations declare their possible errors via `errorSchemas` on `IOperationDefinition`. When a handler throws, `mapError` matches the thrown error against the declared schemas and falls back to `EXECUTION_ERROR` if none match.
**`errorSchemas` is the contract**: An operation's `errorSchemas` declaration is the contract between the operation and its callers about what errors it might produce. No `errorSchemas` = safe default with `EXECUTION_ERROR` wrapper.
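
As a rough illustration of the fallback behavior, the sketch below maps a thrown value to a `{ code, message, details? }` error. It assumes that domain errors carry a string `code` property and that matching is done by code; how the real `mapError` matches against `errorSchemas` may differ, and `mapErrorSketch` is a hypothetical name.

```typescript
// Hypothetical shapes; the real mapError in @alkdev/operations may match differently.
interface MappedError { code: string; message: string; details?: unknown }

// Assumption: a thrown domain error exposes a string `code` that is looked up
// in the list of codes the operation declared.
function mapErrorSketch(thrown: unknown, declaredCodes: readonly string[]): MappedError {
  if (!(thrown instanceof Error)) {
    // Non-Error values (strings, numbers, ...) become UNKNOWN_ERROR.
    return { code: "UNKNOWN_ERROR", message: "Non-Error value thrown", details: { raw: String(thrown) } };
  }
  const code = (thrown as Error & { code?: unknown }).code;
  if (typeof code === "string" && declaredCodes.indexOf(code) !== -1) {
    // Declared domain error: pass the code through unchanged.
    return { code, message: thrown.message };
  }
  // Undeclared error: wrap it in the safe default.
  return { code: "EXECUTION_ERROR", message: thrown.message, details: { message: thrown.message } };
}
```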
## PendingRequestMap
Manages in-flight requests and provides the `call()` interface:
```ts
// From @alkdev/operations
import { PendingRequestMap } from "@alkdev/operations"
// Construction — takes optional EventTarget for pluggable transport (positional, not options object)
const prm = new PendingRequestMap(eventTarget?)
// Call protocol — call() returns Promise<ResponseEnvelope>
const envelope = await prm.call(operationId, input, { deadline, identity })
// envelope.data contains the result, envelope.meta contains source + timestamp
// Subscribe protocol — returns AsyncIterable<ResponseEnvelope>
const stream = prm.subscribe(operationId, input, { idleTimeout, identity })
for await (const envelope of stream) {
  // yield each response
}
// Resolving calls
prm.respond(requestId, output) // output must be ResponseEnvelope
prm.emitError(requestId, code, message, details?)
prm.complete(requestId)
prm.abort(requestId)
```
**Key behaviors**:
- `call()` returns `Promise<ResponseEnvelope>` (not `Promise<unknown>`)
- `subscribe()` returns `AsyncIterable<ResponseEnvelope>`
- `respond()` requires `isResponseEnvelope(output)`
- Built-in deadline and idle timeout support
- Constructor takes optional `EventTarget` for pluggable transport (positional parameter, not options object)
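
To show the correlation idea in isolation, here is a stripped-down pending map keyed by `requestId`. It is an illustration only: `PendingSketch` and its methods are hypothetical, and it omits the event transport, deadlines, idle timeouts, and envelope checks that the real `PendingRequestMap` provides.

```typescript
// Illustrative only; not the PendingRequestMap from @alkdev/operations.
interface Settler {
  resolve: (output: unknown) => void;
  reject: (err: Error) => void;
}

class PendingSketch {
  private pending = new Map<string, Settler>();
  private nextId = 0;

  // Start tracking a request; the promise settles on respond() or fail().
  track(): { requestId: string; result: Promise<unknown> } {
    const requestId = `req-${++this.nextId}`;
    const result = new Promise<unknown>((resolve, reject) => {
      this.pending.set(requestId, { resolve, reject });
    });
    return { requestId, result };
  }

  // Settle with a response. Returns false if the id is unknown or already settled.
  respond(requestId: string, output: unknown): boolean {
    const entry = this.pending.get(requestId);
    if (!entry) return false;
    this.pending.delete(requestId);
    entry.resolve(output);
    return true;
  }

  // Settle with an error code such as "TIMEOUT" or "ABORTED".
  fail(requestId: string, code: string): boolean {
    const entry = this.pending.get(requestId);
    if (!entry) return false;
    this.pending.delete(requestId);
    entry.reject(new Error(code));
    return true;
  }

  // Number of requests still in flight.
  get size(): number {
    return this.pending.size;
  }
}
```

Removing the entry before settling means a late or duplicate response for the same `requestId` is ignored rather than settling the promise twice.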
## CallHandler
Bridges pubsub events to `OperationRegistry.execute()`. Performs access control and error mapping:
```ts
import { buildCallHandler } from "@alkdev/operations"
const handler = buildCallHandler({ registry, callMap })
// callMap: PendingRequestMap instance (not raw EventTarget)
// subscribes to call.requested events
// checks access control (requiredScopes, resource permissions) against Identity
// executes via registry, dispatches call.responded on success
// maps errors via mapError, dispatches call.error
```
## Nested Call Wiring
Routing is an **env construction concern**, not a separate protocol layer. `buildEnv` is the single function that creates the `env`:
- **Direct mode**: `buildEnv({ registry, context })` → env functions call `registry.execute()` directly, return `Promise<ResponseEnvelope>`
- **Call protocol mode**: `PendingRequestMap` handles routing internally, `parentRequestId` is set via context
`buildEnv` no longer takes a `callMap` parameter. It sets `trusted: true` on nested context (bypasses access control for internal calls). Env functions return `Promise<ResponseEnvelope>`, not `Promise<unknown>`. Callers must use `unwrap(envelope)` or access `envelope.data` for the result.
**parentRequestId propagation**: Every nested call includes `parentRequestId` — enables call graph reconstruction and abort cascading.
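
The propagation rule can be pictured with a toy env builder. This is a sketch under assumptions: `buildEnvSketch`, `SketchContext`, and the dotted request-id scheme are invented for illustration and do not reflect the real `buildEnv` signature or how request IDs are generated.

```typescript
// Hypothetical context and env shapes, for illustration only.
interface SketchContext { requestId: string; trusted: boolean }
interface RecordedCall {
  requestId: string;
  parentRequestId?: string;
  operationId: string;
  trusted: boolean;
}

function buildEnvSketch(parentCtx: SketchContext, calls: RecordedCall[]) {
  let counter = 0;
  return {
    // Each nested call gets a fresh requestId and records the caller's id as its parent.
    invoke(operationId: string): SketchContext {
      const child: SketchContext = {
        requestId: `${parentCtx.requestId}.${++counter}`, // invented id scheme
        trusted: true, // nested calls are marked trusted
      };
      calls.push({
        requestId: child.requestId,
        parentRequestId: parentCtx.requestId,
        operationId,
        trusted: child.trusted,
      });
      return child;
    },
  };
}
```

Because every recorded call carries its parent's `requestId`, the list of calls is enough to rebuild the `triggered` hierarchy afterwards.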
## Operation Graph (Static)
Built once at startup from the `OperationRegistry`. Represents type-compatibility edges between operations. Implemented using `@alkdev/flowgraph`.
### Structure
```
Node = OperationNodeAttrs (namespace.name, type, inputSchema, outputSchema)
Edge = OperationEdgeAttrs (compatible: boolean, detail?, mismatches?)
Edge type = "typed" (from flowgraph EdgeType enum)
```
The operation graph is constructed via `FlowGraph.fromSpecs(specs)`, which takes an array of `OperationSpec` objects (derived from `OperationRegistry`) and:
1. Creates a node for each operation with `OperationNodeAttrs` attributes
2. Runs `buildTypeEdges(graph)` to create edges between operations whose output/input schemas are type-compatible
3. Throws `CycleError` if the resulting graph has cycles (DAG invariant)
### Type Compatibility
`typeCompat(outputSchema, inputSchema)` performs deep structural comparison of two TypeBox schemas. Returns:
- `{ compatible: true }` — output is a subtype of input
- `{ compatible: true, detail }` — compatible with notes (e.g., "output has extra fields")
- `{ compatible: false, mismatches: TypeMismatch[] }` — structural incompatibility
- `undefined` — one or both schemas are `unknown`/`any` (no meaningful check possible)
Edges where `compatible: false` are still added to the graph (with `compatible: false` and the mismatch details) so the graph is complete for observability, but the `compatible` attribute allows consumers to filter.
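
The four result shapes are easier to see on a toy schema format. The sketch below is a much-simplified stand-in for `typeCompat`: it handles only flat object schemas with primitive property types, whereas the real function compares TypeBox schemas deeply. `FlatSchema`, `flatCompat`, and the mismatch fields are hypothetical.

```typescript
// Simplified stand-in for typeCompat; all names and shapes are assumptions.
type Prim = "string" | "number" | "boolean";
interface FlatSchema {
  type: "object" | "unknown";
  properties?: { [key: string]: Prim | undefined };
  required?: string[];
}
interface Mismatch { path: string; expected: string; actual: string }
type CompatResult =
  | { compatible: true; detail?: string }
  | { compatible: false; mismatches: Mismatch[] }
  | undefined;

function flatCompat(output: FlatSchema, input: FlatSchema): CompatResult {
  // No meaningful check is possible when either side is unknown.
  if (output.type === "unknown" || input.type === "unknown") return undefined;
  const outProps = output.properties ?? {};
  const inProps = input.properties ?? {};
  const mismatches: Mismatch[] = [];
  for (const key of input.required ?? []) {
    const expected = inProps[key] ?? "unknown";
    const actual = outProps[key];
    if (actual === undefined) mismatches.push({ path: key, expected, actual: "missing" });
    else if (actual !== expected) mismatches.push({ path: key, expected, actual });
  }
  if (mismatches.length > 0) return { compatible: false, mismatches };
  // Extra output fields do not break compatibility but are worth noting.
  const extras = Object.keys(outProps).filter((k) => !(k in inProps));
  return extras.length > 0
    ? { compatible: true, detail: `output has extra fields: ${extras.join(", ")}` }
    : { compatible: true };
}
```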
### Call Templates for SDD
The SDD process defines a natural workflow:
```
architect → architecture-reviewer → decomposer → coordinator → implementation-specialist → code-reviewer
```
This is a call template — a validated path through the operation graph that the coordinator can instantiate as a call graph at runtime.
**Current approach**: Hardcoded workflow sequences. See "What We Defer" below.
**Future approach**: `@alkdev/flowgraph` provides ujsx workflow composition components (`Operation`, `Sequential`, `Parallel`, `Conditional`, `Map`) that can define templates declaratively. The `GraphologyHostConfig` renders templates to a `DirectedGraph` for validation, and `ReactiveHostConfig` renders them to reactive `WorkflowNode` trees for execution. When we adopt template-based workflows, flowgraph provides the validation (`validateTemplate`), type-compatibility checking, and DAG enforcement out of the box.
### API Summary
```ts
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { typeCompat, buildTypeEdges, topologicalOrder, validateGraph } from "@alkdev/flowgraph/analysis"
// Build operation graph from registered operations.
// Note: @alkdev/flowgraph's OperationSpec is a structural subset of
// @alkdev/operations' OperationSpec (it omits handler, accessControl, errorSchemas).
// The .map() transforms between the two types.
const opGraph = FlowGraph.fromSpecs(registry.list().map(spec => ({
  name: spec.name,
  namespace: spec.namespace,
  version: spec.version,
  type: spec.type, // "query" | "mutation" | "subscription"
  inputSchema: spec.inputSchema,
  outputSchema: spec.outputSchema,
  description: spec.description,
})))
// Query
const compatResult = typeCompat(opA.outputSchema, opB.inputSchema)
const order = topologicalOrder(opGraph.graph)
const issues = validateGraph(opGraph.graph)
// Serialization
const data = opGraph.export() // -> OperationGraphSerialized (graphology JSON format)
const restored = FlowGraph.fromJSON(data) // validates schema + DAG invariant
```
## Call Graph (Dynamic)
Created at runtime for each workflow execution. Populated automatically by the call protocol — every `call.requested` adds a node, every `call.responded`/`call.error`/`call.aborted` updates its state and timestamp. Implemented using `@alkdev/flowgraph`.
### Structure
```
Node = CallNodeAttrs (requestId, operationId, status, input, output?, error?, identity?, parentRequestId?, startedAt?, completedAt?)
Edge type "triggered" = execution hierarchy (parentRequestId → child call)
Edge type "depends_on" = data dependency (call A waits on call B's output)
```
> **EdgeType scoping**: `@alkdev/flowgraph` defines five edge types in its `EdgeType` enum: `triggered`, `depends_on`, `typed`, `sequential`, `conditional`. Not all apply to every graph type:
> - **Call graph**: `triggered` and `depends_on` (plus the storage-layer `requested_by`)
> - **Operation graph**: `typed` (type compatibility between operations)
> - **Template graph**: `sequential` and `conditional` (workflow composition via ujsx)
>
> This document focuses on call graph edge types. See the [flowgraph architecture docs](https://git.alk.dev/alkdev/flowgraph) for the full type definitions.
The call graph is populated by `FlowGraph.fromCallEvents(events)` or incrementally via `updateFromEvent(event)`. Each call protocol event maps directly to a graph mutation:
| Event | Graph Mutation |
|-------|---------------|
| `call.requested` | `addCall(attrs)` — creates node (status: `pending`) + `triggered` edge if `parentRequestId` present |
| `call.responded` | `updateFromEvent()` sets `status: "completed"`, `output`, `completedAt: event.timestamp` (idempotent for terminal states) |
| `call.completed` | Sets `completedAt: event.timestamp` if not already set. Idempotent — if already `completed`, no-op. May also set `output` if present in event. |
| `call.error` | `updateFromEvent()` sets `status: "failed"`, `error: { code, message, details? }`, `completedAt: event.timestamp` |
| `call.aborted` | `updateFromEvent()` sets `status: "aborted"`, `completedAt: event.timestamp`. **No cascade**: `updateFromEvent()` does not abort children — the hub's `CallHandler` handles cascading. |
| _(hub-initiated)_ | `updateStatus(requestId, "running", { startedAt: now.toISOString() })`. **Not an event.** The hub's `CallHandler` calls `updateStatus()` directly when it dispatches the operation handler, transitioning `pending` → `running`. |
### Call Status State Machine
Flowgraph enforces valid status transitions via `updateStatus()`. The state machine is:
```
pending → running → completed
→ failed
→ aborted
running → aborted
```
Terminal states (`completed`, `failed`, `aborted`) are immutable. `InvalidTransitionError` is thrown on invalid transitions. This matches the storage layer's `call_graph_nodes.status` enum.
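
A table-driven guard is one way to picture this enforcement. The sketch below is not flowgraph's implementation, and its transition table is an assumption that encodes only `pending → running` and the three exits from `running`; whether `pending` may move directly to a terminal state is deliberately left out. `nextStatus` and `InvalidTransitionSketchError` are hypothetical names.

```typescript
type SmStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Stand-in for the library's InvalidTransitionError.
class InvalidTransitionSketchError extends Error {}

// Assumed table: only the transitions this sketch is sure of.
// Terminal states map to an empty list, which makes them immutable.
const allowedNext: Record<SmStatus, SmStatus[]> = {
  pending: ["running"],
  running: ["completed", "failed", "aborted"],
  completed: [],
  failed: [],
  aborted: [],
};

// Returns the target status if the move is allowed, otherwise throws.
function nextStatus(current: SmStatus, target: SmStatus): SmStatus {
  if (allowedNext[current].indexOf(target) === -1) {
    throw new InvalidTransitionSketchError(`${current} -> ${target} is not allowed`);
  }
  return target;
}
```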
### Abort Cascading
When a call is aborted, all of its children should also be aborted. Flowgraph provides two mechanisms:
1. **`triggered` edge traversal**: `children(requestId)` returns direct children via `triggered` edges. Full cascading uses `descendants(requestId)` for all descendants.
2. **`WorkflowReactiveRoot`**: For running workflow executions, the reactive engine provides `abortNode(nodeId)` and `abortAll()` with `FailurePolicy` configuration (`"continue-running"` vs `"abort-dependents"`).
The hub's `CallHandler` wires `call.aborted` events to:
- `updateStatus(requestId, "aborted")` on the call graph
- Pubsub event propagation so downstream `PendingRequestMap` instances also call `abort()` on their in-flight requests
- `WorkflowReactiveRoot.abortNode(nodeId)` if a workflow execution is tracking this call
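
The traversal part of cascading can be sketched without any library: walk the `triggered` hierarchy from the aborted call and mark every in-flight descendant. The maps and the `cascadeAbort` function below are hypothetical; in the hub the equivalent walk would use the call graph's `descendants()` plus pubsub propagation.

```typescript
type CascadeStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// childrenOf: parent requestId -> child requestIds (the "triggered" edges).
// statusOf: requestId -> current status; mutated in place.
// Returns the requestIds that were actually moved to "aborted".
function cascadeAbort(
  rootId: string,
  childrenOf: Map<string, string[]>,
  statusOf: Map<string, CascadeStatus>,
): string[] {
  const abortedIds: string[] = [];
  const queue: string[] = [rootId];
  const seen = new Set<string>();
  while (queue.length > 0) {
    const id = queue.shift() as string;
    if (seen.has(id)) continue;
    seen.add(id);
    const current = statusOf.get(id);
    // Terminal calls are left untouched; only in-flight calls are aborted.
    if (current === "pending" || current === "running") {
      statusOf.set(id, "aborted");
      abortedIds.push(id);
    }
    // Keep walking even below terminal nodes, which may have live children.
    for (const child of childrenOf.get(id) ?? []) queue.push(child);
  }
  return abortedIds;
}
```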
### `depends_on` Edges
While `triggered` edges represent the parent-child execution hierarchy, `depends_on` edges represent data dependencies — a call that needs another call's output before it can proceed. These are created by the coordinator when orchestrating workflows:
```ts
callGraph.addDependency(sourceRequestId, targetRequestId)
// Adds a "depends_on" edge (source depends on target's output)
// Cycle-checked — throws CycleError if the edge would create a cycle
```
`depends_on` edges are not created by the call protocol itself. They are added by coordination logic that knows the data flow between calls (e.g., the coordinator knows that `coord.spawn` step 3 depends on step 1's output). This gives the observability layer a richer graph for analysis without changing the protocol.
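
The cycle check behind `addDependency` amounts to a reachability test before inserting the edge. The sketch below shows that idea on a plain adjacency map; `DependencySketch` and `CycleSketchError` are hypothetical stand-ins, not the flowgraph classes.

```typescript
// Stand-in for the library's CycleError.
class CycleSketchError extends Error {}

class DependencySketch {
  // source requestId -> set of requestIds it depends on
  private deps = new Map<string, Set<string>>();

  // True if `to` is reachable from `from` by following depends_on edges.
  private reaches(from: string, to: string): boolean {
    const stack: string[] = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const cur = stack.pop() as string;
      if (cur === to) return true;
      if (seen.has(cur)) continue;
      seen.add(cur);
      const next = this.deps.get(cur);
      if (next) next.forEach((n) => { stack.push(n); });
    }
    return false;
  }

  addDependency(source: string, target: string): void {
    // Adding source -> target closes a cycle if target already reaches source
    // (this also rejects a call depending on itself).
    if (this.reaches(target, source)) {
      throw new CycleSketchError(`${source} -> ${target} would create a cycle`);
    }
    const set = this.deps.get(source) ?? new Set<string>();
    set.add(target);
    this.deps.set(source, set);
  }
}
```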
### API Summary
```ts
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { CallStatus } from "@alkdev/flowgraph/schema"
// Build call graph from events (e.g., after hub restart, reconstruct from DB)
const restoredCallGraph = FlowGraph.fromCallEvents(storedEvents)
// Or build incrementally as events arrive
const callGraph = new FlowGraph(CallNodeAttrs, CallEdgeAttrs)
// Process events
callGraph.updateFromEvent(event) // handles all 5 call.* event types
// Status management
callGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() }) // hub-initiated, not event-driven
callGraph.updateStatus(requestId, "completed") // validates state machine transition
// Call management
callGraph.addCall({ requestId, operationId, status: "pending", parentRequestId?, input?, identity?, startedAt? })
callGraph.updateCall(requestId, attrs) // partial update of any CallNodeAttrs
callGraph.removeCall(requestId)
callGraph.addDependency(sourceRequestId, targetRequestId) // depends_on edge
// Queries
callGraph.children(requestId) // direct children via triggered edges
callGraph.descendants(requestId) // all descendants
callGraph.lineage(requestId) // ancestor chain from root to this call
callGraph.getRoots() // calls with no parentRequestId
callGraph.filterByStatus("running") // all running calls
callGraph.duration(requestId) // completedAt - startedAt in ms
// Escape hatch
callGraph.graph // raw graphology DirectedGraph
// Serialization (for Postgres persistence)
const data = callGraph.export() // -> CallGraphSerialized
const restored = FlowGraph.fromJSON(data)
```
### Graph Size
At hub level, the call graph is small — just metadata nodes mapping to the actual process/call. Agent workflow call graphs will have tens of nodes at most for simple workflows, potentially hundreds for complex coordination with many parallel tasks. Performance is a non-issue for call-level metadata; flowgraph wraps graphology which handles thousands of nodes efficiently.
## Reactive Workflow Execution
For running workflow executions (not just observability), `@alkdev/flowgraph/reactive` provides `WorkflowReactiveRoot` — a signal-driven execution engine that:
1. Takes a `DirectedGraph` (from a ujsx template rendered via `GraphologyHostConfig`) and creates reactive state for every node
2. Processes call protocol events via `append(event)` — the event log is the source of truth, status/results are derived projections
3. Computes `preconditions` (all predecessors completed), `canStart` (preconditions met + not blocked by failure), and `blockedByFailure` (any predecessor failed/aborted) as reactive signals
4. Supports `FailurePolicy`: `"continue-running"` (only abort idle/waiting dependents) or `"abort-dependents"` (cascade abort to all non-terminal dependents)
5. Maps node keys to `requestId`s via `setRequestId(nodeKey, requestId)` — bridging template nodes to call protocol identifiers
6. Requires `dispose()` to release signal subscriptions
```ts
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive"
// WorkflowReactiveRoot takes the raw DirectedGraph (flowGraph.graph),
// not the FlowGraph wrapper
const workflowRoot = new WorkflowReactiveRoot(templateGraph.graph)
try {
  // Bridge template nodes to call protocol request IDs
  workflowRoot.setRequestId("step-1", requestId1)
  workflowRoot.setRequestId("step-2", requestId2)
  // Process events (appended by the hub's call protocol handler)
  workflowRoot.append(callRequestedEvent)
  workflowRoot.append(callRespondedEvent)
  // Query reactive state
  const status = workflowRoot.getStatus("step-1") // NodeStatus
  const canStart = workflowRoot.canStart.get("step-2") // ReadonlySignal<boolean>
  const isComplete = workflowRoot.isComplete() // all nodes terminal?
} finally {
  workflowRoot.dispose()
}
```
This is the execution engine for workflow-based coordination. The hub coordinator instantiates a `WorkflowReactiveRoot` for each running workflow, feeds it call protocol events, and uses its reactive state to determine what to do next (start the next step, handle failures, cascade aborts).
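
The three derived values (`preconditions`, `blockedByFailure`, `canStart`) can be written as a plain function of predecessor statuses. The sketch below is non-reactive and hypothetical: the real engine exposes these as signals, and the `WfStatus` values used here are assumed rather than taken from the library's `NodeStatus`.

```typescript
// Assumed status values; the library's NodeStatus may differ.
type WfStatus = "idle" | "running" | "completed" | "failed" | "aborted";

interface Readiness {
  preconditions: boolean;
  blockedByFailure: boolean;
  canStart: boolean;
}

// predecessors: node key -> keys it waits on; statuses: node key -> current status.
function readiness(
  nodeKey: string,
  predecessors: Record<string, string[] | undefined>,
  statuses: Record<string, WfStatus | undefined>,
): Readiness {
  const preds = predecessors[nodeKey] ?? [];
  // All predecessors completed (trivially true for a node with none).
  const preconditions = preds.every((p) => statuses[p] === "completed");
  // Any predecessor failed or aborted.
  const blockedByFailure = preds.some((p) => statuses[p] === "failed" || statuses[p] === "aborted");
  return { preconditions, blockedByFailure, canStart: preconditions && !blockedByFailure };
}
```

In a reactive engine these values would be recomputed automatically whenever an appended event changes a predecessor's status; here the caller simply calls `readiness` again.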
## Storage
Call graph nodes and edges are stored in Postgres. See `storage/call-graph.md` for the full schema definitions.
The storage layer persists individual `call_graph_nodes` and `call_graph_edges` rows. Flowgraph's `export()` produces graphology's native JSON format (`CallGraphSerialized`), which is suitable for snapshot/restore but not for incremental observability queries. The hub uses **both**:
- **Incremental storage**: Each call protocol event writes/updates a row in `call_graph_nodes` and creates `call_graph_edges` as needed. This supports real-time observability queries (what's running, what failed, what's blocked).
- **Reconstruction**: After a hub restart, the call graph can be reconstructed from stored events or from incremental rows using `FlowGraph.fromCallEvents()`.
### Write Path
The hub's `CallHandler` is responsible for writing call graph data to Postgres. When a call protocol event arrives:
1. **`call.requested`**: The `CallHandler` creates a row in `call_graph_nodes` (status: `pending`) and, if `parentRequestId` is present, a `triggered` edge in `call_graph_edges`. This write happens **synchronously before dispatching** to ensure the call is tracked even if the handler fails immediately.
2. **Handler dispatch**: The `CallHandler` dispatches the operation handler. At this point it updates the node status to `running` and sets `startedAt` — this is **not** an event, but a hub-initiated `updateStatus()` call on both the in-memory flowgraph and the DB row.
3. **`call.responded`**: Updates the node's status to `completed`, sets `output` (unwrapped from the `ResponseEnvelope` — only `data` is stored, not `meta`), and sets `completedAt`.
4. **`call.error`**: Updates status to `failed`, sets `error`, and sets `completedAt`.
5. **`call.aborted`**: Updates status to `aborted` and sets `completedAt`. The hub's `CallHandler` then cascades the abort to child calls (this is hub logic, not flowgraph's `updateFromEvent()`).
6. **`call.completed`**: Sets `completedAt` if not already set. Idempotent — no-op if the call is already `completed`.
Error handling: If a DB write fails, the call still proceeds (the handler has already been invoked). The hub logs the write failure and continues. Call graph data is best-effort — the in-memory flowgraph is the authoritative source for running calls; the DB is for persistence and observability.
### Identifier Mapping
The `call_graph_nodes` table uses two identifiers:
- **`id`** (UUID, from `commonCols`): Internal primary key, used as the FK target for `call_graph_edges`.
- **`requestId`** (text, UNIQUE): Protocol-level correlation key, used as the flowgraph node key.
When reconstructing a flowgraph from the database, the hub uses `requestId` as the node key (matching `CallNodeAttrs.requestId`). The `call_graph_edges` table uses `sourceId`/`targetId` referencing `call_graph_nodes.id` (the UUID), so reconstruction requires resolving UUIDs to requestIds. The `call_graph_nodes.requestId` column has a UNIQUE index, making this lookup efficient.
### `call_graph_nodes` — One row per call
| Column | Type | Notes |
|--------|------|-------|
| commonCols | — | id, metadata, createdAt, updatedAt |
| requestId | text NOT NULL UNIQUE | Protocol-level correlation key. Also the flowgraph node key. |
| operationId | text NOT NULL | FK → operations.id (RESTRICT). NOT NULL — `CallNodeAttrs.operationId` is required in flowgraph. |
| parentRequestId | text | Denormalized parent — fast point lookup. Redundant with `triggered` edge. |
| identity | jsonb | Caller identity: `{ id, scopes, resources? }` |
| callerAccountId | text | FK → accounts.id (ON DELETE SET NULL). NULL for system calls. |
| status | text NOT NULL | Matches `CallStatus` enum: `pending`, `running`, `completed`, `failed`, `aborted` |
| input | jsonb | Call input (redacted, truncated — see storage/call-graph.md) |
| output | jsonb | Call output (on success) |
| error | jsonb | `{ code, message, details? }` (on failure, nested under `error` key matching flowgraph) |
| startedAt | timestamp with tz | When call was dispatched. **Type conversion**: flowgraph stores as ISO 8601 string; DB stores as `timestamptz`. |
| completedAt | timestamp with tz | When call completed/failed/aborted. Same type conversion as `startedAt`. |
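
The ISO 8601 string ↔ `timestamptz` conversion for `startedAt` and `completedAt` could look like the sketch below. It assumes the database driver represents `timestamptz` values as JavaScript `Date` objects (a common driver default, but an assumption here); `toDbTimestamp` and `fromDbTimestamp` are hypothetical helper names.

```typescript
// Flowgraph side: ISO 8601 string (or undefined when not yet set).
// DB side: Date (assumed driver mapping for timestamptz) or null.

function toDbTimestamp(iso: string | undefined): Date | null {
  if (iso === undefined) return null;
  const parsed = new Date(iso);
  // new Date() never throws on bad input; it yields an Invalid Date instead.
  if (Number.isNaN(parsed.getTime())) {
    throw new RangeError(`invalid ISO 8601 timestamp: ${iso}`);
  }
  return parsed;
}

function fromDbTimestamp(value: Date | null): string | undefined {
  // toISOString() always emits UTC with a trailing "Z".
  return value === null ? undefined : value.toISOString();
}
```

A round trip normalizes any offset to UTC, so a value written as `2024-05-01T14:34:56.789+02:00` reads back as `2024-05-01T12:34:56.789Z`. The instant is preserved, but string equality with the original is not.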
### `call_graph_edges` — Typed directed edges between calls
| Column | Type | Notes |
|--------|------|-------|
| commonCols | — | id, metadata, createdAt, updatedAt |
| sourceId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| targetId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| edgeType | text NOT NULL | `triggered`, `depends_on`, or `requested_by` |
**Edge type semantics**: `triggered` = execution hierarchy (parentRequestId), `depends_on` = data dependency, `requested_by` = identity/authorization chain. See storage/call-graph.md for details.
**Note on `depends_on` in flowgraph**: The flowgraph `CallEdgeAttrs` type is a union of `TriggeredEdgeAttrs` and `DependencyEdgeAttrs`, matching the `triggered` and `depends_on` edge types. The `requested_by` edge type is a storage-layer concept for identity tracing that doesn't have a corresponding flowgraph edge type — it's persisted in the database but not modeled in the in-memory graph.
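
One way to handle the storage-only edge type when loading rows into the in-memory graph is to partition edges by type first. This is a sketch under stated assumptions: `FlowgraphEdgeType`, `StoredEdgeType`, `isFlowgraphEdgeType`, and `splitEdges` are hypothetical names, not exports of `@alkdev/flowgraph`.

```typescript
// Edge types that have a counterpart in flowgraph's CallEdgeAttrs union.
type FlowgraphEdgeType = "triggered" | "depends_on";
// All edge types persisted in call_graph_edges.
type StoredEdgeType = FlowgraphEdgeType | "requested_by";

function isFlowgraphEdgeType(edgeType: StoredEdgeType): edgeType is FlowgraphEdgeType {
  return edgeType !== "requested_by";
}

function splitEdges<E extends { edgeType: StoredEdgeType }>(
  edges: E[],
): { inMemory: E[]; storageOnly: E[] } {
  const inMemory: E[] = [];
  const storageOnly: E[] = [];
  for (const edge of edges) {
    // requested_by edges stay in the database and never enter the graph.
    (isFlowgraphEdgeType(edge.edgeType) ? inMemory : storageOnly).push(edge);
  }
  return { inMemory, storageOnly };
}
```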
## Transport Mapping
The call protocol is transport-agnostic. The `TypedEventTarget` plug point (same pattern as `RedisEventTarget` in the pubsub design) determines how events move:
| Transport | Use Case | `TypedEventTarget` impl |
|-----------|----------|------------------------|
| In-process | Local hub operations | Browser `EventTarget` (default) |
| Redis | Cross-process events (e.g., hub → all processes) | `RedisEventTarget` |
| WebSocket | Hub ↔ spoke bidirectional | `createWebSocketServerEventTarget` (hub) / `createWebSocketClientEventTarget` (spoke) from `@alkdev/pubsub` |
A `WebSocketEventTarget` implementing `TypedEventTarget` makes each spoke runner's websocket connection a live bidirectional channel. The hub dispatches `call.requested` over the socket; the runner sends `call.responded`/`call.error` back. Same protocol, same event shapes, same `PendingRequestMap` — just a different `eventTarget`.
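
The transport-agnostic idea can be illustrated with the platform's built-in `EventTarget` standing in for the in-process transport. Everything below is a simplified sketch: `EventTargetLike`, `CallEnvelopeEvent`, `attachRunner`, and `call` are hypothetical names, the message shape is assumed, and the real `TypedEventTarget` and `PendingRequestMap` in the `@alkdev` packages may differ.

```typescript
// Assumed message shape: a correlation key plus an opaque payload.
interface CallMessage {
  requestId: string;
  payload: unknown;
}

class CallEnvelopeEvent extends Event {
  detail: CallMessage;
  constructor(type: string, detail: CallMessage) {
    super(type);
    this.detail = detail;
  }
}

// Minimal stand-in for the plug point: anything with these two methods works,
// whether it is backed by memory, Redis, or a websocket.
interface EventTargetLike {
  addEventListener(type: string, listener: (event: Event) => void): void;
  dispatchEvent(event: Event): boolean;
}

// Runner side: answer every call.requested with a call.responded.
function attachRunner(target: EventTargetLike, handler: (input: unknown) => unknown): void {
  target.addEventListener("call.requested", (event) => {
    const { requestId, payload } = (event as CallEnvelopeEvent).detail;
    const output = handler(payload);
    target.dispatchEvent(new CallEnvelopeEvent("call.responded", { requestId, payload: output }));
  });
}

// Caller side: dispatch a request and match the response by requestId.
function call(
  target: EventTargetLike,
  requestId: string,
  input: unknown,
  onResponse: (output: unknown) => void,
): void {
  target.addEventListener("call.responded", (event) => {
    const message = (event as CallEnvelopeEvent).detail;
    if (message.requestId === requestId) onResponse(message.payload);
  });
  target.dispatchEvent(new CallEnvelopeEvent("call.requested", { requestId, payload: input }));
}
```

Neither `attachRunner` nor `call` knows which transport is underneath. Swapping `new EventTarget()` for a websocket-backed implementation of the same two methods would leave both functions unchanged, which is the property the table above relies on.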
## What We Defer
1. **Full ujsx call templates** — currently using hardcoded workflow sequences. `@alkdev/flowgraph/component` provides `Operation`, `Sequential`, `Parallel`, `Conditional`, `Map` components for declarative template definition, and `GraphologyHostConfig` + `ReactiveHostConfig` for rendering. We'll adopt these when workflow complexity justifies it.
2. **Graph visualization** — API only, no Sigma.js UI
3. **Stream deduplication**: `Value.Hash({operationId, input})`-based deduplication for multiple subscribers to the same stream
4. **`requested_by` edge creation in flowgraph** — the `requested_by` edge type is a storage-layer concept for identity tracing. It's persisted in `call_graph_edges` but not modeled in `@alkdev/flowgraph`'s `CallEdgeAttrs` union. We may add it to flowgraph in the future.
The call protocol itself, `PendingRequestMap`, `CallHandler`, `buildEnv` dual-mode, call graph auto-tracking, and reactive workflow execution are **in the initial implementation**. They're not much code and they prevent the need to bolt on ad-hoc error handling and abort logic in every coordination operation.
## Open Questions
1. **Operation deletion and call graph referential integrity**: The `call_graph_nodes.operationId` column has a RESTRICT FK to `operations.id`. An operation cannot be deleted while any call records reference it. For v1, the strategy is to deny removal while call records exist. If operation removal becomes necessary (e.g., cleanup of old operations), the hub would need to either: (a) reassign all referencing call records to a sentinel `__removed__` operation (pre-seeded in migrations with `id='__removed__'`, `namespace='system'`), then delete the original operation, or (b) accept that historical call records reference operations that may have been provided by disconnected spokes — in which case, consider making `operationId` nullable in flowgraph's `CallNodeAttrs` so the hub can NULL the FK instead of requiring a sentinel row. This requires coordination with the `@alkdev/flowgraph` package.
2. **Reactive vs. call graph `requested` semantics**: In `FlowGraph`, `call.requested` creates a node in `pending` state. In `WorkflowReactiveRoot`, `call.requested` maps to `NodeStatus.running` (the reactive model assumes a template node starts executing when requested). This is a deliberate semantic difference — the reactive model tracks execution progress, while the call graph model tracks protocol state. The spec documents this, but implementers should be aware that feeding the same event to both models produces different initial statuses.
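
To make option (a) of the first open question concrete, the sketch below simulates the RESTRICT constraint and the sentinel reassignment in memory. It is an illustration only: `InMemoryStore` and its methods are hypothetical, and in a real database the reassignment and the delete would presumably run inside a single transaction.

```typescript
const REMOVED_OPERATION_ID = "__removed__";

interface CallRecord {
  requestId: string;
  operationId: string;
}

// In-memory stand-in for the operations and call_graph_nodes tables.
class InMemoryStore {
  // The sentinel row is pre-seeded, as it would be by a migration.
  operations = new Set<string>([REMOVED_OPERATION_ID]);
  calls: CallRecord[] = [];

  // Mimics the RESTRICT FK: deletion fails while any call references the operation.
  deleteOperation(id: string): void {
    if (this.calls.some((call) => call.operationId === id)) {
      throw new Error(`RESTRICT: operation ${id} is still referenced by call records`);
    }
    this.operations.delete(id);
  }

  // Option (a): repoint referencing calls at the sentinel, then delete.
  removeViaSentinel(id: string): void {
    if (id === REMOVED_OPERATION_ID) {
      throw new Error("the sentinel operation cannot be removed");
    }
    for (const call of this.calls) {
      if (call.operationId === id) call.operationId = REMOVED_OPERATION_ID;
    }
    this.deleteOperation(id);
  }
}
```

The trade-off is visible in the data: after reassignment the call record survives and `operationId` stays non-null, but the link to the original operation is lost unless it is copied elsewhere (for example into `metadata`) before the update.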
## Dependencies
```
@alkdev/flowgraph # DAG construction, reactive execution, call/operation graphs, type-compat analysis
@alkdev/operations # Call protocol, PendingRequestMap, CallHandler
@alkdev/pubsub # Event transport (Redis, WebSocket, Worker)
@alkdev/taskgraph # Task graph construction and analysis (for task management, not call graphs)
```
**Why both `@alkdev/flowgraph` and `@alkdev/taskgraph`?** `@alkdev/taskgraph` is a domain-specific library for task DAG construction with categorical estimates (scope, risk, impact), frontmatter parsing, and task-specific analysis (critical path, bottleneck detection, risk assessment). `@alkdev/flowgraph` is a general-purpose workflow graph library for call/operation DAGs with ujsx template composition and reactive execution. They both wrap graphology, but serve different domains. The hub uses `@alkdev/taskgraph` for task management and `@alkdev/flowgraph` for call graph and operation graph management.
## Prior Art
The call protocol was adapted from `ade_spoke`'s call protocol design (which was pubsub-agnostic). The key difference here is that websockets are the primary transport for hub-spoke communication rather than SSE. The call graph and operation graph are now implemented using `@alkdev/flowgraph` rather than raw graphology, which provides DAG enforcement, type-compatibility analysis, and reactive execution out of the box.