Systematically compared @alkdev/taskgraph, @alkdev/operations, and
@alkdev/flowgraph against storage/arch specs and fixed all mismatches.
Key changes:
Tasks (storage/tasks.md + ADR-011):
- Rename TaskFrontmatter → TaskInput to match library export
- Fix dependsOn (was depends_on) in field mappings — library uses
camelCase; parseFrontmatter normalizes YAML snake_case on input
- Document DependencyEdge shape {from, to, qualityRetention?} and
DB↔library field mapping
- Document graph node vs DB column distinction (TaskGraphNodeAttrs
is a subset of TaskInput)
- Fix default risk fallback from low → medium (matches resolveDefaults)
- Fix cross-project guard column references (dependentTaskId, not taskId)
- Clarify @alkdev/taskgraph TS is source of truth; frontmatter is for
LLM output parsing and legacy imports, not Rust CLI
- Add complete library exports reference
Operations (storage/spokes.md + operations.md):
- Add version, title, _meta columns to operations table (required by
OperationSpec, were missing)
- Fix type casing: query/mutation/subscription (lowercase, matching
OperationType runtime values)
- Make outputSchema and accessControl NOT NULL (matching library)
- Document ErrorDefinition shape {code, description, schema, httpStatus?}
- Document _meta vs commonCols.metadata distinction
- Add registerAll, get, getHandler, getByName, list, subscribe methods
- Fix buildCallHandler signature ({ registry, callMap })
- Fix OperationType values (lowercase)
Call graph (storage/call-graph.md + call-graph.md):
- Change operationId to NOT NULL with RESTRICT FK (was nullable/SET NULL)
— matches flowgraph's required CallNodeAttrs.operationId
- Document sentinel __removed__ operation strategy for deletions
- Document ISO 8601 string ↔ timestamptz conversion requirement
- Rewrite CallEventMap to match actual library: flat dot-notation keys,
timestamp on all events, nested error structure, optional output on
completed event
- Remove call.running event (doesn't exist in library) — hub calls
updateStatus(running) directly on dispatch
- Fix buildCallHandler({ registry, callMap }) signature
- Fix PendingRequestMap constructor (positional EventTarget)
- Add updateCall/removeCall/graph methods to API summary
- Document abort cascade as hub logic, not flowgraph logic
- Add open questions for operation deletion and reactive vs call graph
semantics
Table reference (storage/table-reference.md):
- Update call_graph_nodes.operationId cascade to RESTRICT
- Update operations.type comment to lowercase
- Update status enum reference
| status | last_updated |
|---|---|
| draft | 2026-05-25 |
Call Protocol, Call Graph & Operation Graph
Overview
The call protocol is the unified transport layer for all operation invocations. It provides a single event-based mechanism that works the same whether the call is local (in-process), remote (hub ↔ spoke over websocket), or streamed (subscription). The call graph and operation graph are built on top of it — and @alkdev/flowgraph provides the graph construction, analysis, and reactive execution primitives.
Websockets are the primary transport for hub-spoke communication, not SSE. SSE is half-duplex and requires polling for the reverse path; websockets give us bidirectional channels where hub → spoke dispatch and spoke → hub results flow through the same connection. The call protocol's call ≡ subscribe semantics map naturally: a websocket frame comes in, the protocol resolves or streams depending on the consumption pattern.
Transport distinction: WebSocket is the primary bidirectional transport for hub↔spoke and hub↔client-spoke communication. SSE support exists for compatibility (e.g., OpenAI proxy streams, legacy clients) but is not the preferred transport. A client (browser, CLI) that connects as a spoke gets full bidirectional communication over a single WebSocket — no SSE needed.
call ≡ subscribe
At the protocol level, call and subscribe are the same thing with different consumption patterns:
- call: Publish call.requested, subscribe to call.responded:{requestId}, resolve on first response → Promise<TOutput>
- subscribe: Publish call.requested, subscribe to call.responded:{requestId}, yield each response → AsyncIterable<TOutput>
Both use the same event types, the same requestId correlation, and the same PendingRequestMap. The only difference is that call resolves after the first call.responded and unsubscribes, while subscribe stays open and yields each call.responded until call.aborted or call.error.
This means call is semantically subscribe().next() — a subscription that completes after one event.
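The equivalence can be illustrated with a minimal sketch that does not touch the real PendingRequestMap. The names subscribeSketch and callSketch are hypothetical, and the generator only stands in for the stream of call.responded payloads belonging to one requestId.

```typescript
// Hypothetical stand-in for a subscription: yields every response for one request.
async function* subscribeSketch<T>(responses: T[]): AsyncGenerator<T> {
  for (const response of responses) {
    yield response;
  }
}

// "call" expressed as "take the first response, then stop listening".
async function callSketch<T>(stream: AsyncIterable<T>): Promise<T> {
  for await (const first of stream) {
    // Returning from inside for-await invokes the iterator's return(),
    // which plays the role of unsubscribing.
    return first;
  }
  throw new Error("stream ended before any response arrived");
}
```

Under these assumptions, callSketch(subscribeSketch([1, 2, 3])) resolves with the first element and never consumes the rest.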
HTTP endpoint: An HTTP POST /api/{namespace}/{operation} is just call over HTTP — publish a call.requested, wait for call.responded, return the output as JSON.
WebSocket endpoint: A websocket connection carries bidirectional call protocol events. The hub pushes call.requested to spoke runners; runners push call.responded/call.error back. Same protocol, different transport. This is the hub-spoke "rpc-mode": persistent connection, no polling, natural streaming support.
Why We Keep the Call Protocol (Not Just the Graphs)
- SDD process requires it: the coordinator models development workflows between agents using the call graph. When the architect calls the decomposer which calls the coordinator which spawns implementation specialists, that's a call graph. The call protocol is what populates it automatically.
- Abort cascading: when a parent operation fails or is aborted, all child operations should be notified. The call protocol propagates call.aborted through parentRequestId chains. Without it, each coordination operation handles errors ad hoc (e.g., coord.spawn chains 5 registry.execute() calls; if the 3rd fails, there's no structured abort of the first two or the pending 4th/5th).
- Observability: seeing what operations called what, how long they took, and what failed is essential for debugging agent workflows. The call protocol auto-tracks calls via PendingRequestMap; the call graph is populated as a side effect.
- Unified error handling: mapError + InfrastructureErrors + errorSchemas declaration gives structured, typed errors across all transports. Without it, each consumer invents its own error format.
- Transport flexibility: the TypedEventTarget plug point means the same protocol works over in-process EventTarget, Redis channels, or websockets. The hub uses all three: in-process for local operations, Redis for cross-process events, websockets for spoke runner dispatch.
- Future Rust rewrite: the API contract needs to be stable. The call protocol is a small, well-defined event contract. Building it now means the Rust rewrite has a spec to implement against.
Call Event Types
All communication flows through typed events. The call event TypeBox schemas are defined in @alkdev/operations as CallEventSchema (flat dot-notation keys like "call.requested"). The shapes below match the library's actual event types.
import { Type } from "@alkdev/typebox"
// CallEventSchema uses flat dot-notation keys (not nested objects):
// "call.requested", "call.responded", "call.completed", "call.aborted", "call.error"
// The shapes below show the event payload for each type.
// call.requested
Type.Object({
type: Type.Literal("call.requested"),
requestId: Type.String(),
operationId: Type.String(),
input: Type.Unknown(),
timestamp: Type.String(), // ISO 8601 — used as source for startedAt/completedAt
parentRequestId: Type.Optional(Type.String()),
identity: Type.Optional(Type.Object({
id: Type.String(),
scopes: Type.Array(Type.String()),
resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String())))
})),
startedAt: Type.Optional(Type.String()), // ISO 8601 — if provided, overrides timestamp for startedAt
})
// call.responded
Type.Object({
type: Type.Literal("call.responded"),
requestId: Type.String(),
output: Type.Unknown(), // ResponseEnvelope from @alkdev/operations
timestamp: Type.String(), // ISO 8601
})
// call.completed
Type.Object({
type: Type.Literal("call.completed"),
requestId: Type.String(),
output: Type.Optional(Type.Unknown()), // Optional final output
timestamp: Type.String(), // ISO 8601
})
// call.aborted
Type.Object({
type: Type.Literal("call.aborted"),
requestId: Type.String(),
timestamp: Type.String(), // ISO 8601
})
// call.error
Type.Object({
type: Type.Literal("call.error"),
requestId: Type.String(),
error: Type.Object({ // Error is nested under "error" key
code: Type.String(),
message: Type.String(),
details: Type.Optional(Type.Unknown()),
}),
timestamp: Type.String(), // ISO 8601
})
Note on deadline: The call.requested event above does not include a deadline field. Deadlines are a PendingRequestMap concept (timeouts are applied at the protocol layer, not persisted in the call graph). If a call times out, the PendingRequestMap emits a call.error with code TIMEOUT.
Note: no call.running event: The library's CallEventMapValue union only has 5 event types. There is no call.running event. When the hub dispatches an operation handler, it calls flowGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() }) directly. This is a hub-initiated state transition, not an event. See Write Path for details.
Event Semantics
- call.requested: Initiates a call. Creates a call graph node (status: pending) and adds a triggered edge if parentRequestId is present. If startedAt is provided in the event, it overrides timestamp for the node's startedAt.
- call.responded: Carries the call result. For one-shot calls, this is the terminal event that resolves the Promise<ResponseEnvelope>. The output field contains a ResponseEnvelope (with data and meta fields) from @alkdev/operations. Sets status: "completed" and completedAt: event.timestamp.
- call.completed: Terminal completion signal, idempotent if call.responded was already received. For subscriptions, fires after the last call.responded to signal stream end. May include optional output. Sets completedAt: event.timestamp if not already set.
- call.aborted: Call was cancelled. Sets status to aborted and completedAt: event.timestamp. Note: flowgraph's updateFromEvent() does NOT cascade aborts to children; the hub's CallHandler is responsible for cascading call.aborted to descendant calls via the pubsub layer.
- call.error: Call failed with an error. Sets status to failed, stores the error (nested under error: { code, message, details? }), and sets completedAt: event.timestamp.
Note on @alkdev/flowgraph: The CallEventMapValue type in @alkdev/flowgraph/schema defines the union of these event types. Flowgraph's FlowGraph.fromCallEvents() and updateFromEvent() consume these events directly to populate the call graph. The CallStatus enum in flowgraph (pending, running, completed, failed, aborted) aligns with the statuses in the call protocol events.
Note on ResponseEnvelope unwrapping: The call.responded event carries output as a ResponseEnvelope (from @alkdev/operations). When feeding events to @alkdev/flowgraph, the hub unwraps the envelope before calling updateFromEvent() — CallNodeAttrs.output stores the ResponseEnvelope.data value (the actual result), not the full envelope. The ResponseEnvelope.meta is discarded at the call graph level (it's available in PendingRequestMap for the caller, but not persisted in the graph node). This means call_graph_nodes.output contains the unwrapped result data.
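The event semantics and the envelope unwrapping can be summarised as a small reducer. This is a sketch under stated assumptions, not flowgraph's updateFromEvent(): the Envelope type is a simplified stand-in for ResponseEnvelope, applyEventSketch is a hypothetical name, and treating call.error and call.aborted as no-ops on terminal nodes is an assumption drawn from the rule that terminal states are immutable.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted";
type CallErrorInfo = { code: string; message: string; details?: unknown };
type Envelope = { data: unknown; meta: unknown }; // simplified stand-in for ResponseEnvelope

type UpdateEvent =
  | { type: "call.responded"; requestId: string; output: Envelope; timestamp: string }
  | { type: "call.completed"; requestId: string; output?: unknown; timestamp: string }
  | { type: "call.aborted"; requestId: string; timestamp: string }
  | { type: "call.error"; requestId: string; error: CallErrorInfo; timestamp: string };

interface NodeState {
  status: CallStatus;
  output?: unknown;
  error?: CallErrorInfo;
  completedAt?: string; // ISO 8601 string, as in the events
}

const isTerminal = (status: CallStatus): boolean =>
  status === "completed" || status === "failed" || status === "aborted";

function applyEventSketch(node: NodeState, event: UpdateEvent): NodeState {
  switch (event.type) {
    case "call.responded":
      if (isTerminal(node.status)) return node; // idempotent for terminal states
      // Unwrap the envelope: keep data, drop meta.
      return { ...node, status: "completed", output: event.output.data, completedAt: event.timestamp };
    case "call.completed":
      // Only fills completedAt when it is still missing.
      return node.completedAt ? node : { ...node, completedAt: event.timestamp };
    case "call.error":
      if (isTerminal(node.status)) return node; // assumption: terminal nodes are left untouched
      return { ...node, status: "failed", error: event.error, completedAt: event.timestamp };
    case "call.aborted":
      if (isTerminal(node.status)) return node; // assumption: terminal nodes are left untouched
      return { ...node, status: "aborted", completedAt: event.timestamp };
  }
}
```

Returning a new object rather than mutating keeps the sketch easy to test; the real graph mutates node attributes in place.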
Identity: The Identity type represents the caller's security context. Derived from keypal's ApiKeyMetadata — scopes maps directly from keypal's global scopes, resources maps from keypal's resource-scoped permissions (key format: "type:id", value: scope array). Passed through the call chain and checked by CallHandler against the operation's AccessControl definition. See operations.md for the AccessControl type.
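A scope check of this kind might look like the sketch below. The AccessControlSketch shape is a hypothetical reduction to global scopes only (the real AccessControl type is defined in operations.md and also covers resource permissions), checkScopesSketch is an invented name, and the trusted flag mirrors the trusted: true context that buildEnv sets for nested calls.

```typescript
interface IdentitySketch {
  id: string;
  scopes: string[];
  resources?: Record<string, string[]>; // key format "type:id", value: scope array
}

// Hypothetical reduced shape: only global scopes, no resource permissions.
interface AccessControlSketch {
  requiredScopes: string[];
}

type AccessResult =
  | { ok: true }
  | { ok: false; code: "ACCESS_DENIED"; details: { requiredScopes: string[] } };

function checkScopesSketch(
  identity: IdentitySketch | undefined,
  access: AccessControlSketch,
  trusted = false,
): AccessResult {
  if (trusted) return { ok: true }; // internal nested calls bypass the check
  const held = new Set(identity?.scopes ?? []);
  const missing = access.requiredScopes.filter((scope) => !held.has(scope));
  if (missing.length === 0) return { ok: true };
  return { ok: false, code: "ACCESS_DENIED", details: { requiredScopes: access.requiredScopes } };
}
```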
Request correlation: Every call has a unique requestId. Nested calls include parentRequestId to track the call chain. Responses and errors are matched to requests by requestId.
Error Model
The call protocol uses a unified error model: both infrastructure (protocol-level) and domain (operation-level) errors flow through the same CallError event. CallError.code is string — the distinction between infrastructure and domain codes is by convention, not by type.
Infrastructure Error Codes
Reserved codes produced by CallHandler itself, before or after operation execution:
| Code | When | Schema |
|---|---|---|
| OPERATION_NOT_FOUND | No operation matches operationId | { operationId: string } |
| ACCESS_DENIED | Missing scopes | { requiredScopes?: string[] } |
| VALIDATION_ERROR | Input fails inputSchema check | { errors: ValueError[] } |
| TIMEOUT | Deadline exceeded | { deadline: number } |
| ABORTED | Call cancelled | { reason?: string } |
| EXECUTION_ERROR | Handler threw, no errorSchemas match | { message: string } |
| UNKNOWN_ERROR | Non-Error thrown | { raw: string } |
Domain Error Propagation
Operations declare their possible errors via errorSchemas on IOperationDefinition. When a handler throws, mapError matches the thrown error against the declared schemas — falls back to EXECUTION_ERROR if no match.
errorSchemas is the contract: An operation's errorSchemas declaration is the contract between the operation and its callers about what errors it might produce. No errorSchemas = safe default with EXECUTION_ERROR wrapper.
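The fallback behaviour can be sketched as follows. This is not the library's mapError: how the real function matches a thrown error against errorSchemas is not specified here, so the sketch assumes the thrown Error carries a string code property and that declared errors are identified by code. mapErrorSketch and declaredCodes are hypothetical names.

```typescript
type CallErrorPayload = { code: string; message: string; details?: unknown };

function mapErrorSketch(thrown: unknown, declaredCodes: ReadonlySet<string>): CallErrorPayload {
  // Non-Error values (strings, numbers, plain objects) map to UNKNOWN_ERROR.
  if (!(thrown instanceof Error)) {
    return { code: "UNKNOWN_ERROR", message: "Non-Error value thrown", details: { raw: String(thrown) } };
  }
  // Assumption: domain errors expose their code as a string property.
  const code = (thrown as Error & { code?: unknown }).code;
  if (typeof code === "string" && declaredCodes.has(code)) {
    return { code, message: thrown.message };
  }
  // No declared error matched: wrap in the safe default.
  return { code: "EXECUTION_ERROR", message: thrown.message, details: { message: thrown.message } };
}
```

An operation with no declared errors passes an empty set, so every thrown Error ends up as EXECUTION_ERROR.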
PendingRequestMap
Manages in-flight requests and provides the call() interface:
// From @alkdev/operations
import { PendingRequestMap } from "@alkdev/operations"
// Construction — takes optional EventTarget for pluggable transport (positional, not options object)
const prm = new PendingRequestMap(eventTarget?)
// Call protocol — call() returns Promise<ResponseEnvelope>
const envelope = await prm.call(operationId, input, { deadline, identity })
// envelope.data contains the result, envelope.meta contains source + timestamp
// Subscribe protocol — returns AsyncIterable<ResponseEnvelope>
const stream = prm.subscribe(operationId, input, { idleTimeout, identity })
for await (const envelope of stream) {
// yield each response
}
// Resolving calls
prm.respond(requestId, output) // output must be ResponseEnvelope
prm.emitError(requestId, code, message, details?)
prm.complete(requestId)
prm.abort(requestId)
Key behaviors:
- call() returns Promise<ResponseEnvelope> (not Promise<unknown>)
- subscribe() returns AsyncIterable<ResponseEnvelope>
- respond() requires isResponseEnvelope(output)
- Built-in deadline and idle timeout support
- Constructor takes optional EventTarget for pluggable transport (positional parameter, not options object)
CallHandler
Bridges pubsub events to OperationRegistry.execute(). Performs access control and error mapping:
import { buildCallHandler } from "@alkdev/operations"
const handler = buildCallHandler({ registry, callMap })
// callMap: PendingRequestMap instance (not raw EventTarget)
// subscribes to call.requested events
// checks access control (requiredScopes, resource permissions) against Identity
// executes via registry, dispatches call.responded on success
// maps errors via mapError, dispatches call.error
Nested Call Wiring
Routing is an env construction concern, not a separate protocol layer. buildEnv is the single function that creates the env:
- Direct mode: buildEnv({ registry, context }) → env functions call registry.execute() directly, return Promise<ResponseEnvelope>
- Call protocol mode: PendingRequestMap handles routing internally, parentRequestId is set via context
buildEnv no longer takes a callMap parameter. It sets trusted: true on nested context (bypasses access control for internal calls). Env functions return Promise<ResponseEnvelope>, not Promise<unknown>. Callers must use unwrap(envelope) or access envelope.data for the result.
parentRequestId propagation: Every nested call includes parentRequestId — enables call graph reconstruction and abort cascading.
Operation Graph (Static)
Built once at startup from the OperationRegistry. Represents type-compatibility edges between operations. Implemented using @alkdev/flowgraph.
Structure
Node = OperationNodeAttrs (namespace.name, type, inputSchema, outputSchema)
Edge = OperationEdgeAttrs (compatible: boolean, detail?, mismatches?)
Edge type = "typed" (from flowgraph EdgeType enum)
The operation graph is constructed via FlowGraph.fromSpecs(specs), which takes an array of OperationSpec objects (derived from OperationRegistry) and:
- Creates a node for each operation with OperationNodeAttrs attributes
- Runs buildTypeEdges(graph) to create edges between operations whose output/input schemas are type-compatible
- Throws CycleError if the resulting graph has cycles (DAG invariant)
Type Compatibility
typeCompat(outputSchema, inputSchema) performs deep structural comparison of two TypeBox schemas. Returns:
- { compatible: true }: output is a subtype of input
- { compatible: true, detail }: compatible with notes (e.g., "output has extra fields")
- { compatible: false, mismatches: TypeMismatch[] }: structural incompatibility
- undefined: one or both schemas are unknown/any (no meaningful check possible)
Edges where compatible: false are still added to the graph (with compatible: false and the mismatch details) so the graph is complete for observability, but the compatible attribute allows consumers to filter.
Call Templates for SDD
The SDD process defines a natural workflow:
architect → architecture-reviewer → decomposer → coordinator → implementation-specialist → code-reviewer
This is a call template — a validated path through the operation graph that the coordinator can instantiate as a call graph at runtime.
Current approach: Hardcoded workflow sequences. See "What We Defer" below.
Future approach: @alkdev/flowgraph provides ujsx workflow composition components (Operation, Sequential, Parallel, Conditional, Map) that can define templates declaratively. The GraphologyHostConfig renders templates to a DirectedGraph for validation, and ReactiveHostConfig renders them to reactive WorkflowNode trees for execution. When we adopt template-based workflows, flowgraph provides the validation (validateTemplate), type-compatibility checking, and DAG enforcement out of the box.
API Summary
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { typeCompat, buildTypeEdges, topologicalOrder, validateGraph } from "@alkdev/flowgraph/analysis"
// Build operation graph from registered operations.
// Note: @alkdev/flowgraph's OperationSpec is a structural subset of
// @alkdev/operations' OperationSpec (it omits handler, accessControl, errorSchemas).
// The .map() transforms between the two types.
const opGraph = FlowGraph.fromSpecs(registry.list().map(spec => ({
name: spec.name,
namespace: spec.namespace,
version: spec.version,
type: spec.type, // "query" | "mutation" | "subscription"
inputSchema: spec.inputSchema,
outputSchema: spec.outputSchema,
description: spec.description,
})))
// Query
const compatResult = typeCompat(opA.outputSchema, opB.inputSchema)
const order = topologicalOrder(opGraph.graph)
const issues = validateGraph(opGraph.graph)
// Serialization
const data = opGraph.export() // -> OperationGraphSerialized (graphology JSON format)
const restored = FlowGraph.fromJSON(data) // validates schema + DAG invariant
Call Graph (Dynamic)
Created at runtime for each workflow execution. Populated automatically by the call protocol — every call.requested adds a node, every call.responded/call.error/call.aborted updates its state and timestamp. Implemented using @alkdev/flowgraph.
Structure
Node = CallNodeAttrs (requestId, operationId, status, input, output?, error?, identity?, parentRequestId?, startedAt?, completedAt?)
Edge type "triggered" = execution hierarchy (parentRequestId → child call)
Edge type "depends_on" = data dependency (call A waits on call B's output)
EdgeType scoping: @alkdev/flowgraph defines five edge types in its EdgeType enum: triggered, depends_on, typed, sequential, conditional. Not all apply to every graph type:
- Call graph: triggered and depends_on (plus the storage-layer requested_by)
- Operation graph: typed (type compatibility between operations)
- Template graph: sequential and conditional (workflow composition via ujsx)

This document focuses on call graph edge types. See the flowgraph architecture docs for the full type definitions.
The call graph is populated by FlowGraph.fromCallEvents(events) or incrementally via updateFromEvent(event). Each call protocol event maps directly to a graph mutation:
| Event | Graph Mutation |
|---|---|
| call.requested | addCall(attrs): creates node (status: pending) + triggered edge if parentRequestId present |
| call.responded | updateFromEvent() sets status: "completed", output, completedAt: event.timestamp (idempotent for terminal states) |
| call.completed | Sets completedAt: event.timestamp if not already set. Idempotent: if already completed, no-op. May also set output if present in event. |
| call.error | updateFromEvent() sets status: "failed", error: { code, message, details? }, completedAt: event.timestamp |
| call.aborted | updateFromEvent() sets status: "aborted", completedAt: event.timestamp. No cascade: updateFromEvent() does not abort children; the hub's CallHandler handles cascading. |
| (hub-initiated) | updateStatus(requestId, "running", { startedAt: now.toISOString() }) — Not an event. The hub's CallHandler calls updateStatus() directly when it dispatches the operation handler, transitioning pending → running. |
Call Status State Machine
Flowgraph enforces valid status transitions via updateStatus(). The state machine is:
pending → running → completed
→ failed
→ aborted
running → aborted
Terminal states (completed, failed, aborted) are immutable. InvalidTransitionError is thrown on invalid transitions. This matches the storage layer's call_graph_nodes.status enum.
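A transition table makes the rule concrete. The sketch below encodes one reading of the diagram, in which pending only moves to running and the three terminal states are reached from running; whether flowgraph also allows pending to jump straight to a terminal state (for example an abort before dispatch) is an assumption to verify against the library. allowedTransitions, InvalidTransitionSketchError and assertTransitionSketch are hypothetical names.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// One reading of the state machine; terminal states have no outgoing transitions.
const allowedTransitions: Record<CallStatus, readonly CallStatus[]> = {
  pending: ["running"],
  running: ["completed", "failed", "aborted"],
  completed: [],
  failed: [],
  aborted: [],
};

class InvalidTransitionSketchError extends Error {
  constructor(from: CallStatus, to: CallStatus) {
    super("invalid transition: " + from + " -> " + to);
    this.name = "InvalidTransitionSketchError";
  }
}

function assertTransitionSketch(from: CallStatus, to: CallStatus): void {
  if (allowedTransitions[from].indexOf(to) === -1) {
    throw new InvalidTransitionSketchError(from, to);
  }
}
```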
Abort Cascading
When a call is aborted, all of its children should also be aborted. Flowgraph provides two mechanisms:
- triggered edge traversal: children(requestId) returns direct children via triggered edges. Full cascading uses descendants(requestId) for all descendants.
- WorkflowReactiveRoot: For running workflow executions, the reactive engine provides abortNode(nodeId) and abortAll() with FailurePolicy configuration ("continue-running" vs "abort-dependents").
The hub's CallHandler wires call.aborted events to:
- updateStatus(requestId, "aborted") on the call graph
- Pubsub event propagation so downstream PendingRequestMap instances also call abort() on their in-flight requests
- WorkflowReactiveRoot.abortNode(nodeId) if a workflow execution is tracking this call
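The traversal behind a cascade can be sketched without flowgraph by walking parentRequestId links. This is an illustration only: descendantsSketch is a hypothetical helper, and the input map (requestId to parentRequestId) stands in for the triggered edges of the real graph.

```typescript
// Collects all descendants of `root` breadth-first from a requestId -> parentRequestId map.
function descendantsSketch(parentOf: Map<string, string | undefined>, root: string): string[] {
  // Invert the parent links into a parent -> children index.
  const childrenOf = new Map<string, string[]>();
  parentOf.forEach((parent, id) => {
    if (parent === undefined) return;
    const siblings = childrenOf.get(parent) ?? [];
    siblings.push(id);
    childrenOf.set(parent, siblings);
  });

  const found: string[] = [];
  const queue: string[] = (childrenOf.get(root) ?? []).slice();
  while (queue.length > 0) {
    const current = queue.shift() as string;
    found.push(current);
    for (const child of childrenOf.get(current) ?? []) {
      queue.push(child);
    }
  }
  return found;
}
```

A hub-side cascade would then publish call.aborted for each returned requestId that is not already in a terminal state.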
depends_on Edges
While triggered edges represent the parent-child execution hierarchy, depends_on edges represent data dependencies — a call that needs another call's output before it can proceed. These are created by the coordinator when orchestrating workflows:
callGraph.addDependency(sourceRequestId, targetRequestId)
// Adds a "depends_on" edge (source depends on target's output)
// Cycle-checked — throws CycleError if the edge would create a cycle
depends_on edges are not created by the call protocol itself. They are added by coordination logic that knows the data flow between calls (e.g., the coordinator knows that coord.spawn step 3 depends on step 1's output). This gives the observability layer a richer graph for analysis without changing the protocol.
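The cycle check can be illustrated with a small stand-alone structure. This is a sketch, not flowgraph's implementation: DependencyGraphSketch and CycleSketchError are hypothetical, and the sketch only considers depends_on edges, whereas the real graph may check across all edge types.

```typescript
class CycleSketchError extends Error {}

class DependencyGraphSketch {
  // source -> set of targets the source depends on
  private readonly dependsOn = new Map<string, Set<string>>();

  addDependency(source: string, target: string): void {
    // Adding source -> target closes a cycle if target already reaches source.
    if (source === target || this.reaches(target, source)) {
      throw new CycleSketchError("dependency " + source + " -> " + target + " would create a cycle");
    }
    const targets = this.dependsOn.get(source) ?? new Set<string>();
    targets.add(target);
    this.dependsOn.set(source, targets);
  }

  // Depth-first search along existing depends_on edges.
  private reaches(from: string, to: string): boolean {
    const stack: string[] = [from];
    const seen = new Set<string>();
    while (stack.length > 0) {
      const current = stack.pop() as string;
      if (current === to) return true;
      if (seen.has(current)) continue;
      seen.add(current);
      this.dependsOn.get(current)?.forEach((next) => stack.push(next));
    }
    return false;
  }
}
```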
API Summary
import { FlowGraph } from "@alkdev/flowgraph/graph"
import { CallStatus } from "@alkdev/flowgraph/schema"
// Build call graph from events (e.g., after hub restart, reconstruct from DB)
const callGraph = FlowGraph.fromCallEvents(storedEvents)
// Or build incrementally as events arrive
const callGraph = new FlowGraph(CallNodeAttrs, CallEdgeAttrs)
// Process events
callGraph.updateFromEvent(event) // handles all 5 call.* event types
// Status management
callGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() }) // hub-initiated, not event-driven
callGraph.updateStatus(requestId, "completed") // validates state machine transition
// Call management
callGraph.addCall({ requestId, operationId, status: "pending", parentRequestId?, input?, identity?, startedAt? })
callGraph.updateCall(requestId, attrs) // partial update of any CallNodeAttrs
callGraph.removeCall(requestId)
callGraph.addDependency(sourceRequestId, targetRequestId) // depends_on edge
// Queries
callGraph.children(requestId) // direct children via triggered edges
callGraph.descendants(requestId) // all descendants
callGraph.lineage(requestId) // ancestor chain from root to this call
callGraph.getRoots() // calls with no parentRequestId
callGraph.filterByStatus("running") // all running calls
callGraph.duration(requestId) // completedAt - startedAt in ms
// Escape hatch
callGraph.graph // raw graphology DirectedGraph
// Serialization (for Postgres persistence)
const data = callGraph.export() // -> CallGraphSerialized
const restored = FlowGraph.fromJSON(data)
Graph Size
At hub level, the call graph is small — just metadata nodes mapping to the actual process/call. Agent workflow call graphs will have tens of nodes at most for simple workflows, potentially hundreds for complex coordination with many parallel tasks. Performance is a non-issue for call-level metadata; flowgraph wraps graphology which handles thousands of nodes efficiently.
Reactive Workflow Execution
For running workflow executions (not just observability), @alkdev/flowgraph/reactive provides WorkflowReactiveRoot — a signal-driven execution engine that:
- Takes a DirectedGraph (from a ujsx template rendered via GraphologyHostConfig) and creates reactive state for every node
- Processes call protocol events via append(event); the event log is the source of truth, and status/results are derived projections
- Computes preconditions (all predecessors completed), canStart (preconditions met + not blocked by failure), and blockedByFailure (any predecessor failed/aborted) as reactive signals
- Supports FailurePolicy: "continue-running" (only abort idle/waiting dependents) or "abort-dependents" (cascade abort to all non-terminal dependents)
- Maps node keys to requestIds via setRequestId(nodeKey, requestId), bridging template nodes to call protocol identifiers
- Requires dispose() to release signal subscriptions
import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive"
// WorkflowReactiveRoot takes the raw DirectedGraph (flowGraph.graph),
// not the FlowGraph wrapper
const workflowRoot = new WorkflowReactiveRoot(templateGraph.graph)
try {
// Bridge template nodes to call protocol request IDs
workflowRoot.setRequestId("step-1", requestId1)
workflowRoot.setRequestId("step-2", requestId2)
// Process events (appended by the hub's call protocol handler)
workflowRoot.append(callRequestedEvent)
workflowRoot.append(callRespondedEvent)
// Query reactive state
const status = workflowRoot.getStatus("step-1") // NodeStatus
const canStart = workflowRoot.canStart.get("step-2") // ReadonlySignal<boolean>
const isComplete = workflowRoot.isComplete() // all nodes terminal?
} finally {
workflowRoot.dispose()
}
This is the execution engine for workflow-based coordination. The hub coordinator instantiates a WorkflowReactiveRoot for each running workflow, feeds it call protocol events, and uses its reactive state to determine what to do next (start the next step, handle failures, cascade aborts).
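The three derived values can be written as plain functions to show what the signals compute. This sketch is not reactive and does not use WorkflowReactiveRoot; NodeStatusSketch is a hypothetical status set, and deriveStartStateSketch is an invented helper.

```typescript
// Hypothetical status set for template nodes; the library's NodeStatus may differ.
type NodeStatusSketch = "idle" | "running" | "completed" | "failed" | "aborted";

interface StartState {
  preconditions: boolean;    // all predecessors completed
  blockedByFailure: boolean; // any predecessor failed or aborted
  canStart: boolean;         // preconditions met and not blocked by failure
}

function deriveStartStateSketch(
  predecessors: string[],
  statusOf: (nodeKey: string) => NodeStatusSketch,
): StartState {
  const statuses = predecessors.map(statusOf);
  const preconditions = statuses.every((status) => status === "completed");
  const blockedByFailure = statuses.some((status) => status === "failed" || status === "aborted");
  return { preconditions, blockedByFailure, canStart: preconditions && !blockedByFailure };
}
```

A node with no predecessors gets preconditions: true from the empty every(), so root steps can start immediately in this sketch.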
Storage
Call graph nodes and edges are stored in Postgres. See storage/call-graph.md for the full schema definitions.
The storage layer persists individual call_graph_nodes and call_graph_edges rows. Flowgraph's export() produces graphology's native JSON format (CallGraphSerialized), which is suitable for snapshot/restore but not for incremental observability queries. The hub uses both:
- Incremental storage: Each call protocol event writes/updates a row in call_graph_nodes and creates call_graph_edges as needed. This supports real-time observability queries (what's running, what failed, what's blocked).
- Reconstruction: After a hub restart, the call graph can be reconstructed from stored events or from incremental rows using FlowGraph.fromCallEvents().
Write Path
The hub's CallHandler is responsible for writing call graph data to Postgres. When a call protocol event arrives:
- call.requested: The CallHandler creates a row in call_graph_nodes (status: pending) and, if parentRequestId is present, a triggered edge in call_graph_edges. This write happens synchronously before dispatching to ensure the call is tracked even if the handler fails immediately.
- Handler dispatch: The CallHandler dispatches the operation handler. At this point it updates the node status to running and sets startedAt. This is not an event, but a hub-initiated updateStatus() call on both the in-memory flowgraph and the DB row.
- call.responded: Updates the node's status to completed, sets output (unwrapped from the ResponseEnvelope; only data is stored, not meta), and sets completedAt.
- call.error: Updates status to failed, sets error, and sets completedAt.
- call.aborted: Updates status to aborted and sets completedAt. The hub's CallHandler then cascades the abort to child calls (this is hub logic, not flowgraph's updateFromEvent()).
- call.completed: Sets completedAt if not already set. Idempotent: no-op if the call is already completed.
Error handling: If a DB write fails, the call still proceeds (the handler has already been invoked). The hub logs the write failure and continues. Call graph data is best-effort — the in-memory flowgraph is the authoritative source for running calls; the DB is for persistence and observability.
Identifier Mapping
The call_graph_nodes table uses two identifiers:
- id (UUID, from commonCols): Internal primary key, used as the FK target for call_graph_edges.
- requestId (text, UNIQUE): Protocol-level correlation key, used as the flowgraph node key.
When reconstructing a flowgraph from the database, the hub uses requestId as the node key (matching CallNodeAttrs.requestId). The call_graph_edges table uses sourceId/targetId referencing call_graph_nodes.id (the UUID), so reconstruction requires resolving UUIDs to requestIds. The call_graph_nodes.requestId column has a UNIQUE index, making this lookup efficient.
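The UUID-to-requestId resolution might be sketched as below, together with the timestamptz to ISO 8601 conversion that the column notes require. The row interfaces are hypothetical reductions of the tables, resolveEdgesSketch and toIsoSketch are invented names, and dropping requested_by edges reflects that this edge type is not modeled in the in-memory graph.

```typescript
interface NodeRowSketch { id: string; requestId: string; startedAt: Date | null }
interface EdgeRowSketch { sourceId: string; targetId: string; edgeType: string }
interface ResolvedEdge { source: string; target: string; edgeType: string }

// Translates UUID-based edge rows into requestId-keyed edges for the in-memory graph.
function resolveEdgesSketch(nodes: NodeRowSketch[], edges: EdgeRowSketch[]): ResolvedEdge[] {
  const requestIdByUuid = new Map<string, string>();
  for (const node of nodes) requestIdByUuid.set(node.id, node.requestId);

  return edges
    .filter((edge) => edge.edgeType !== "requested_by") // storage-only edge type
    .map((edge) => {
      const source = requestIdByUuid.get(edge.sourceId);
      const target = requestIdByUuid.get(edge.targetId);
      if (source === undefined || target === undefined) {
        throw new Error("edge references an unknown node row");
      }
      return { source, target, edgeType: edge.edgeType };
    });
}

// DB timestamptz values arrive as Date objects; the graph stores ISO 8601 strings.
function toIsoSketch(value: Date | null): string | undefined {
  return value === null ? undefined : value.toISOString();
}
```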
call_graph_nodes — One row per call
| Column | Type | Notes |
|---|---|---|
| commonCols | — | id, metadata, createdAt, updatedAt |
| requestId | text NOT NULL UNIQUE | Protocol-level correlation key. Also the flowgraph node key. |
| operationId | text NOT NULL | FK → operations.id (RESTRICT). NOT NULL — CallNodeAttrs.operationId is required in flowgraph. |
| parentRequestId | text | Denormalized parent — fast point lookup. Redundant with triggered edge. |
| identity | jsonb | Caller identity: { id, scopes, resources? } |
| callerAccountId | text | FK → accounts.id (ON DELETE SET NULL). NULL for system calls. |
| status | text NOT NULL | Matches CallStatus enum: pending, running, completed, failed, aborted |
| input | jsonb | Call input (redacted, truncated — see storage/call-graph.md) |
| output | jsonb | Call output (on success) |
| error | jsonb | { code, message, details? } (on failure, nested under error key matching flowgraph) |
| startedAt | timestamp with tz | When call was dispatched. Type conversion: flowgraph stores as ISO 8601 string; DB stores as timestamptz. |
| completedAt | timestamp with tz | When call completed/failed/aborted. Same type conversion as startedAt. |
call_graph_edges — Typed directed edges between calls
| Column | Type | Notes |
|---|---|---|
| commonCols | — | id, metadata, createdAt, updatedAt |
| sourceId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| targetId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| edgeType | text NOT NULL | triggered, depends_on, or requested_by |
Edge type semantics: triggered = execution hierarchy (parentRequestId), depends_on = data dependency, requested_by = identity/authorization chain. See storage/call-graph.md for details.
Note on depends_on in flowgraph: The flowgraph CallEdgeAttrs type is a union of TriggeredEdgeAttrs and DependencyEdgeAttrs, matching the triggered and depends_on edge types. The requested_by edge type is a storage-layer concept for identity tracing that doesn't have a corresponding flowgraph edge type — it's persisted in the database but not modeled in the in-memory graph.
Transport Mapping
The call protocol is transport-agnostic. The TypedEventTarget plug point (same pattern as RedisEventTarget in the pubsub design) determines how events move:
| Transport | Use Case | TypedEventTarget impl |
|---|---|---|
| In-process | Local hub operations | Browser EventTarget (default) |
| Redis | Cross-process events (e.g., hub → all processes) | RedisEventTarget |
| WebSocket | Hub ↔ spoke bidirectional | createWebSocketServerEventTarget (hub) / createWebSocketClientEventTarget (spoke) from @alkdev/pubsub |
A WebSocketEventTarget implementing TypedEventTarget makes each spoke runner's websocket connection a live bidirectional channel. The hub dispatches call.requested over the socket; the runner sends call.responded/call.error back. Same protocol, same event shapes, same PendingRequestMap — just a different eventTarget.
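To illustrate why the protocol does not care which transport is plugged in, here is a minimal in-process sketch that uses the standard EventTarget as a stand-in for a TypedEventTarget. The `CallEvent` class, the envelope shape, and the `call`/`serve` functions are hypothetical simplifications, not the @alkdev/operations API; the real PendingRequestMap also handles abort and timeouts, which are omitted here.

```typescript
// Hypothetical envelope; real event payloads are defined by the call protocol.
interface CallEnvelope {
  requestId: string;
  payload: unknown;
}

class CallEvent extends Event {
  readonly detail: CallEnvelope;
  constructor(type: string, detail: CallEnvelope) {
    super(type);
    this.detail = detail;
  }
}

/** Caller side: dispatch call.requested and settle on the matching reply. */
function call(target: EventTarget, requestId: string, input: unknown): Promise<unknown> {
  return new Promise((resolve, reject) => {
    function cleanup(): void {
      target.removeEventListener("call.responded", onResponded);
      target.removeEventListener("call.error", onError);
    }
    function onResponded(e: Event): void {
      const { detail } = e as CallEvent;
      if (detail.requestId !== requestId) return; // reply to another call
      cleanup();
      resolve(detail.payload);
    }
    function onError(e: Event): void {
      const { detail } = e as CallEvent;
      if (detail.requestId !== requestId) return;
      cleanup();
      reject(new Error(String(detail.payload)));
    }
    // Register listeners first: in-process dispatch is synchronous.
    target.addEventListener("call.responded", onResponded);
    target.addEventListener("call.error", onError);
    target.dispatchEvent(new CallEvent("call.requested", { requestId, payload: input }));
  });
}

/** Runner side: answer each call.requested with call.responded or call.error. */
function serve(target: EventTarget, handler: (input: unknown) => unknown): void {
  target.addEventListener("call.requested", (e) => {
    const { requestId, payload } = (e as CallEvent).detail;
    try {
      const output = handler(payload);
      target.dispatchEvent(new CallEvent("call.responded", { requestId, payload: output }));
    } catch (err) {
      target.dispatchEvent(new CallEvent("call.error", { requestId, payload: String(err) }));
    }
  });
}
```

Swapping the EventTarget for a Redis- or WebSocket-backed implementation would leave `call` and `serve` unchanged, which is the point of the plug point.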
What We Defer
- Full ujsx call templates: currently using hardcoded workflow sequences. @alkdev/flowgraph/component provides Operation, Sequential, Parallel, Conditional, Map components for declarative template definition, and GraphologyHostConfig + ReactiveHostConfig for rendering. We'll adopt these when workflow complexity justifies it.
- Graph visualization: API only, no Sigma.js UI
- Stream deduplication: Value.Hash({operationId, input}) deduplication for multiple subscribers to the same stream
- requested_by edge creation in flowgraph: the requested_by edge type is a storage-layer concept for identity tracing. It's persisted in call_graph_edges but not modeled in @alkdev/flowgraph's CallEdgeAttrs union. We may add it to flowgraph in the future.
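Although stream deduplication is deferred, the idea in the list above can be sketched briefly. To stay dependency-free, this sketch swaps Value.Hash for a sorted-key JSON string as the deduplication key; `stableKey` and `SharedStreams` are hypothetical names and nothing here reflects a planned API.

```typescript
/** Deterministic serialization: object keys are sorted so key order is irrelevant. */
function stableKey(value: unknown): string {
  if (value === undefined) return "null";
  if (value === null || typeof value !== "object") return JSON.stringify(value);
  if (Array.isArray(value)) return `[${value.map(stableKey).join(",")}]`;
  const obj = value as Record<string, unknown>;
  const entries = Object.keys(obj)
    .sort()
    .map((k) => `${JSON.stringify(k)}:${stableKey(obj[k])}`);
  return `{${entries.join(",")}}`;
}

/** Groups subscribers that asked for the same operation with the same input. */
class SharedStreams<T> {
  private readonly listeners = new Map<string, Set<(value: T) => void>>();

  /** Returns true for the first subscriber, i.e. when the upstream stream must be started. */
  subscribe(operationId: string, input: unknown, listener: (value: T) => void): boolean {
    const key = stableKey({ operationId, input });
    let set = this.listeners.get(key);
    const isFirst = set === undefined;
    if (set === undefined) {
      set = new Set<(value: T) => void>();
      this.listeners.set(key, set);
    }
    set.add(listener);
    return isFirst;
  }

  /** Fan one upstream value out to every subscriber sharing the key. */
  publish(operationId: string, input: unknown, value: T): void {
    const set = this.listeners.get(stableKey({ operationId, input }));
    if (set !== undefined) set.forEach((listener) => listener(value));
  }
}
```

A hash such as Value.Hash would give a compact fixed-size key; the string key used here trades size for the absence of collisions.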
The call protocol itself, PendingRequestMap, CallHandler, buildEnv dual-mode, call graph auto-tracking, and reactive workflow execution are in the initial implementation. They're not much code and they prevent the need to bolt on ad-hoc error handling and abort logic in every coordination operation.
Open Questions
- Operation deletion and call graph referential integrity: The call_graph_nodes.operationId column has a RESTRICT FK to operations.id. An operation cannot be deleted while any call records reference it. For v1, the strategy is to deny removal while call records exist. If operation removal becomes necessary (e.g., cleanup of old operations), the hub would need to either: (a) reassign all referencing call records to a sentinel __removed__ operation (pre-seeded in migrations with id='__removed__', namespace='system'), then delete the original operation, or (b) accept that historical call records reference operations that may have been provided by disconnected spokes, in which case consider making operationId nullable in flowgraph's CallNodeAttrs so the hub can NULL the FK instead of requiring a sentinel row. This requires coordination with the @alkdev/flowgraph package.
- Reactive vs. call graph requested semantics: In FlowGraph, call.requested creates a node in pending state. In WorkflowReactiveRoot, call.requested maps to NodeStatus.running (the reactive model assumes a template node starts executing when requested). This is a deliberate semantic difference: the reactive model tracks execution progress, while the call graph model tracks protocol state. The spec documents this, but implementers should be aware that feeding the same event to both models produces different initial statuses.
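Option (a) from the first open question, reassigning call records to the sentinel before deleting the operation, can be sketched against an in-memory model of the two tables. The row shapes and the `removeOperation` function are hypothetical; in the real hub both steps would need to run inside a single database transaction so the RESTRICT FK is never violated.

```typescript
// In-memory stand-ins for the operations and call_graph_nodes tables.
const REMOVED_OPERATION_ID = "__removed__";

interface OperationRow {
  id: string;
  namespace: string;
}

interface CallRow {
  requestId: string;
  operationId: string;
}

/** Reassign referencing calls to the sentinel, then delete the operation. Returns the reassigned count. */
function removeOperation(
  operations: Map<string, OperationRow>,
  calls: CallRow[],
  operationId: string,
): number {
  if (operationId === REMOVED_OPERATION_ID) {
    throw new Error("The sentinel operation itself cannot be removed");
  }
  if (!operations.has(REMOVED_OPERATION_ID)) {
    throw new Error("Sentinel operation is missing; it should be seeded by migrations");
  }
  if (!operations.has(operationId)) return 0; // nothing to remove

  let reassigned = 0;
  for (const call of calls) {
    if (call.operationId === operationId) {
      call.operationId = REMOVED_OPERATION_ID;
      reassigned += 1;
    }
  }
  // Safe now: no call row references the operation any more.
  operations.delete(operationId);
  return reassigned;
}
```

The cost of this approach is that historical calls lose the link to the specific operation they invoked, which is one reason the question remains open.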
Dependencies
@alkdev/flowgraph # DAG construction, reactive execution, call/operation graphs, type-compat analysis
@alkdev/operations # Call protocol, PendingRequestMap, CallHandler
@alkdev/pubsub # Event transport (Redis, WebSocket, Worker)
@alkdev/taskgraph # Task graph construction and analysis (for task management, not call graphs)
Why both @alkdev/flowgraph and @alkdev/taskgraph? @alkdev/taskgraph is a domain-specific library for task DAG construction with categorical estimates (scope, risk, impact), frontmatter parsing, and task-specific analysis (critical path, bottleneck detection, risk assessment). @alkdev/flowgraph is a general-purpose workflow graph library for call/operation DAGs with ujsx template composition and reactive execution. They both wrap graphology, but serve different domains. The hub uses @alkdev/taskgraph for task management and @alkdev/flowgraph for call graph and operation graph management.
Prior Art
The call protocol was adapted from ade_spoke's call protocol design (which was pubsub-agnostic). The key difference here is that websockets are the primary transport for hub-spoke communication rather than SSE. The call graph and operation graph are now implemented using @alkdev/flowgraph rather than raw graphology, which provides DAG enforcement, type-compatibility analysis, and reactive execution out of the box.