hub/docs/architecture/call-graph.md
glm-5.1 93e2286343 Align storage & architecture specs with published npm libraries
Systematically compared @alkdev/taskgraph, @alkdev/operations, and
@alkdev/flowgraph against storage/arch specs and fixed all mismatches.

Key changes:

Tasks (storage/tasks.md + ADR-011):
- Rename TaskFrontmatter → TaskInput to match library export
- Fix dependsOn (was depends_on) in field mappings — library uses
  camelCase; parseFrontmatter normalizes YAML snake_case on input
- Document DependencyEdge shape {from, to, qualityRetention?} and
  DB↔library field mapping
- Document graph node vs DB column distinction (TaskGraphNodeAttrs
  is a subset of TaskInput)
- Fix default risk fallback from low → medium (matches resolveDefaults)
- Fix cross-project guard column references (dependentTaskId, not taskId)
- Clarify @alkdev/taskgraph TS is source of truth; frontmatter is for
  LLM output parsing and legacy imports, not Rust CLI
- Add complete library exports reference

Operations (storage/spokes.md + operations.md):
- Add version, title, _meta columns to operations table (required by
  OperationSpec, were missing)
- Fix type casing: query/mutation/subscription (lowercase, matching
  OperationType runtime values)
- Make outputSchema and accessControl NOT NULL (matching library)
- Document ErrorDefinition shape {code, description, schema, httpStatus?}
- Document _meta vs commonCols.metadata distinction
- Add registerAll, get, getHandler, getByName, list, subscribe methods
- Fix buildCallHandler signature ({ registry, callMap })
- Fix OperationType values (lowercase)

Call graph (storage/call-graph.md + call-graph.md):
- Change operationId to NOT NULL with RESTRICT FK (was nullable/SET NULL)
  — matches flowgraph's required CallNodeAttrs.operationId
- Document sentinel __removed__ operation strategy for deletions
- Document ISO 8601 string ↔ timestamptz conversion requirement
- Rewrite CallEventMap to match actual library: flat dot-notation keys,
  timestamp on all events, nested error structure, optional output on
  completed event
- Remove call.running event (doesn't exist in library) — hub calls
  updateStatus(running) directly on dispatch
- Fix buildCallHandler({ registry, callMap }) signature
- Fix PendingRequestMap constructor (positional EventTarget)
- Add updateCall/removeCall/graph methods to API summary
- Document abort cascade as hub logic, not flowgraph logic
- Add open questions for operation deletion and reactive vs call graph
  semantics

Table reference (storage/table-reference.md):
- Update call_graph_nodes.operationId cascade to RESTRICT
- Update operations.type comment to lowercase
- Update status enum reference
2026-05-25 11:46:42 +00:00


status: draft
last_updated: 2026-05-25

Call Protocol, Call Graph & Operation Graph

Overview

The call protocol is the unified transport layer for all operation invocations. It provides a single event-based mechanism that works the same whether the call is local (in-process), remote (hub ↔ spoke over websocket), or streamed (subscription). The call graph and operation graph are built on top of it — and @alkdev/flowgraph provides the graph construction, analysis, and reactive execution primitives.

Websockets are the primary transport for hub-spoke communication, not SSE. SSE is unidirectional (server → client), so the reverse path needs a separate channel such as polling; websockets give us bidirectional channels where hub → spoke dispatch and spoke → hub results flow through the same connection. The call protocol's call ≡ subscribe semantics map naturally: a websocket frame comes in, the protocol resolves or streams depending on the consumption pattern.

Transport distinction: WebSocket is the primary bidirectional transport for hub↔spoke and hub↔client-spoke communication. SSE support exists for compatibility (e.g., OpenAI proxy streams, legacy clients) but is not the preferred transport. A client (browser, CLI) that connects as a spoke gets full bidirectional communication over a single WebSocket — no SSE needed.

call ≡ subscribe

At the protocol level, call and subscribe are the same thing with different consumption patterns:

  • call: Publish call.requested, subscribe to call.responded:{requestId}, resolve on first response → Promise<TOutput>
  • subscribe: Publish call.requested, subscribe to call.responded:{requestId}, yield each response → AsyncIterable<TOutput>

Both use the same event types, the same requestId correlation, and the same PendingRequestMap. The only difference is that call resolves after the first call.responded and unsubscribes, while subscribe stays open and yields each call.responded until call.completed, call.aborted, or call.error.

This means call is semantically subscribe().next() — a subscription that completes after one event.
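The equivalence can be sketched without the library. In the minimal example below, `subscribe` and `call` are hypothetical stand-ins written for illustration (they are not the @alkdev/operations API, and the in-memory `outputs` array replaces a real transport): `call` consumes the first item of the same async stream and then closes it.

```typescript
// Hypothetical stand-ins for illustration; not the @alkdev/operations API.
type Responded<T> = { type: "call.responded"; requestId: string; output: T }

// subscribe: yields one call.responded per output for a single requestId.
async function* subscribe<T>(requestId: string, outputs: T[]): AsyncGenerator<Responded<T>> {
  for (const output of outputs) {
    yield { type: "call.responded", requestId, output }
  }
}

// call: the same stream, consumed once. Resolves on the first response,
// then calls return() so the producer gets a chance to unsubscribe.
async function call<T>(stream: AsyncGenerator<Responded<T>>): Promise<T> {
  const first = await stream.next()
  await stream.return(undefined)
  if (first.done) throw new Error("stream ended before any call.responded")
  return first.value.output
}
```

Under this reading, a one-shot caller and a streaming caller differ only in how many items they pull before closing the iterator.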

HTTP endpoint: An HTTP POST /api/{namespace}/{operation} is just call over HTTP — publish a call.requested, wait for call.responded, return the output as JSON.

WebSocket endpoint: A websocket connection carries bidirectional call protocol events. The hub pushes call.requested to spoke runners; runners push call.responded/call.error back. Same protocol, different transport. This is the hub-spoke "rpc-mode": persistent connection, no polling, natural streaming support.

Why We Keep the Call Protocol (Not Just the Graphs)

  1. SDD process requires it — the coordinator models development workflows between agents using the call graph. When the architect calls the decomposer which calls the coordinator which spawns implementation specialists, that's a call graph. The call protocol is what populates it automatically.

  2. Abort cascading — when a parent operation fails or is aborted, all child operations should be notified. The call protocol propagates call.aborted through parentRequestId chains. Without it, each coordination operation handles errors ad-hoc (e.g., coord.spawn chains 5 registry.execute() calls — if the 3rd fails, there's no structured abort of the first two or the pending 4th/5th).

  3. Observability — seeing what operations called what, how long they took, what failed, is essential for debugging agent workflows. The call protocol auto-tracks calls via PendingRequestMap; the call graph is populated as a side effect.

  4. Unified error handling: mapError + InfrastructureErrors + errorSchemas declaration gives structured, typed errors across all transports. Without it, each consumer invents its own error format.

  5. Transport flexibility — the TypedEventTarget plug point means the same protocol works over in-process EventTarget, Redis channels, or websockets. The hub uses all three: in-process for local operations, Redis for cross-process events, websockets for spoke runner dispatch.

  6. Future Rust rewrite — the API contract needs to be stable. The call protocol is a small, well-defined event contract. Building it now means the Rust rewrite has a spec to implement against.

Call Event Types

All communication flows through typed events. The call event TypeBox schemas are defined in @alkdev/operations as CallEventSchema (flat dot-notation keys like "call.requested"). The shapes below match the library's actual event types.

import { Type } from "@alkdev/typebox"

// CallEventSchema uses flat dot-notation keys (not nested objects):
// "call.requested", "call.responded", "call.completed", "call.aborted", "call.error"
// The shapes below show the event payload for each type.

// call.requested
Type.Object({
  type: Type.Literal("call.requested"),
  requestId: Type.String(),
  operationId: Type.String(),
  input: Type.Unknown(),
  timestamp: Type.String(),  // ISO 8601 — used as source for startedAt/completedAt
  parentRequestId: Type.Optional(Type.String()),
  identity: Type.Optional(Type.Object({
    id: Type.String(),
    scopes: Type.Array(Type.String()),
    resources: Type.Optional(Type.Record(Type.String(), Type.Array(Type.String())))
  })),
  startedAt: Type.Optional(Type.String()),  // ISO 8601 — if provided, overrides timestamp for startedAt
})

// call.responded
Type.Object({
  type: Type.Literal("call.responded"),
  requestId: Type.String(),
  output: Type.Unknown(),  // ResponseEnvelope from @alkdev/operations
  timestamp: Type.String(), // ISO 8601
})

// call.completed
Type.Object({
  type: Type.Literal("call.completed"),
  requestId: Type.String(),
  output: Type.Optional(Type.Unknown()), // Optional final output
  timestamp: Type.String(), // ISO 8601
})

// call.aborted
Type.Object({
  type: Type.Literal("call.aborted"),
  requestId: Type.String(),
  timestamp: Type.String(), // ISO 8601
})

// call.error
Type.Object({
  type: Type.Literal("call.error"),
  requestId: Type.String(),
  error: Type.Object({       // Error is nested under "error" key
    code: Type.String(),
    message: Type.String(),
    details: Type.Optional(Type.Unknown()),
  }),
  timestamp: Type.String(), // ISO 8601
})

Note on deadline: The call.requested event above does not include a deadline field. Deadlines are a PendingRequestMap concept (timeouts are applied at the protocol layer, not persisted in the call graph). If a call times out, the PendingRequestMap emits a call.error with code TIMEOUT.

Note: no call.running event: The library's CallEventMapValue union only has 5 event types. There is no call.running event. When the hub dispatches an operation handler, it calls flowGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() }) directly. This is a hub-initiated state transition, not an event. See Write Path for details.

Event Semantics

  • call.requested — Initiates a call. Creates a call graph node (status: pending) and adds a triggered edge if parentRequestId is present. If startedAt is provided in the event, it overrides timestamp for the node's startedAt.
  • call.responded — Carries the call result. For one-shot calls, this is the terminal event that resolves the Promise<ResponseEnvelope>. The output field contains a ResponseEnvelope (with data and meta fields) from @alkdev/operations. Sets status: "completed" and completedAt: event.timestamp.
  • call.completed — Terminal completion signal, idempotent if call.responded was already received. For subscriptions, fires after the last call.responded to signal stream end. May include optional output. Sets completedAt: event.timestamp if not already set.
  • call.aborted — Call was cancelled. Sets status to aborted and completedAt: event.timestamp. Note: flowgraph's updateFromEvent() does NOT cascade aborts to children — the hub's CallHandler is responsible for cascading call.aborted to descendant calls via the pubsub layer.
  • call.error — Call failed with an error. Sets status to failed, stores the error (nested under error: { code, message, details? }), and sets completedAt: event.timestamp.

Note on @alkdev/flowgraph: The CallEventMapValue type in @alkdev/flowgraph/schema defines the union of these event types. Flowgraph's FlowGraph.fromCallEvents() and updateFromEvent() consume these events directly to populate the call graph. The CallStatus enum in flowgraph (pending, running, completed, failed, aborted) aligns with the statuses in the call protocol events.

Note on ResponseEnvelope unwrapping: The call.responded event carries output as a ResponseEnvelope (from @alkdev/operations). When feeding events to @alkdev/flowgraph, the hub unwraps the envelope before calling updateFromEvent(): CallNodeAttrs.output stores the ResponseEnvelope.data value (the actual result), not the full envelope. The ResponseEnvelope.meta is discarded at the call graph level (it's available in PendingRequestMap for the caller, but not persisted in the graph node). This means call_graph_nodes.output contains the unwrapped result data.
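A minimal sketch of that unwrapping step, assuming only the envelope shape described above (data and meta fields). The `unwrapForGraph` helper and the simplified interfaces are hypothetical and not part of either library.

```typescript
// Shapes assumed from the text above: an envelope with data and meta fields.
interface ResponseEnvelope<T = unknown> {
  data: T
  meta: Record<string, unknown>
}

interface CallRespondedEvent {
  type: "call.responded"
  requestId: string
  output: ResponseEnvelope
  timestamp: string
}

// Hypothetical helper: swap the envelope for its data before the event
// reaches the graph, so the node's output holds only the result.
function unwrapForGraph(event: CallRespondedEvent) {
  return { ...event, output: event.output.data }
}
```

The caller still receives the full envelope through PendingRequestMap; only the copy handed to the graph is stripped of meta.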

Identity: The Identity type represents the caller's security context. Derived from keypal's ApiKeyMetadata: scopes maps directly from keypal's global scopes, resources maps from keypal's resource-scoped permissions (key format: "type:id", value: scope array). Passed through the call chain and checked by CallHandler against the operation's AccessControl definition. See operations.md for the AccessControl type.

Request correlation: Every call has a unique requestId. Nested calls include parentRequestId to track the call chain. Responses and errors are matched to requests by requestId.

Error Model

The call protocol uses a unified error model: both infrastructure (protocol-level) and domain (operation-level) errors flow through the same CallError event. CallError.code is typed as string, so the distinction between infrastructure and domain codes is by convention, not by type.

Infrastructure Error Codes

Reserved codes produced by CallHandler itself, before or after operation execution:

| Code | When | Schema |
|---|---|---|
| OPERATION_NOT_FOUND | No operation matches operationId | { operationId: string } |
| ACCESS_DENIED | Missing scopes | { requiredScopes?: string[] } |
| VALIDATION_ERROR | Input fails inputSchema check | { errors: ValueError[] } |
| TIMEOUT | Deadline exceeded | { deadline: number } |
| ABORTED | Call cancelled | { reason?: string } |
| EXECUTION_ERROR | Handler threw, no errorSchemas match | { message: string } |
| UNKNOWN_ERROR | Non-Error thrown | { raw: string } |

Domain Error Propagation

Operations declare their possible errors via errorSchemas on IOperationDefinition. When a handler throws, mapError matches the thrown error against the declared schemas — falls back to EXECUTION_ERROR if no match.

errorSchemas is the contract: An operation's errorSchemas declaration is the contract between the operation and its callers about what errors it might produce. No errorSchemas = safe default with EXECUTION_ERROR wrapper.
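The fallback order can be sketched as follows. This is an illustration only: the real mapError matches thrown errors against TypeBox errorSchemas, whereas the `DeclaredError` predicate shape and the `mapErrorSketch` name here are hypothetical simplifications.

```typescript
// Hypothetical shapes; the real mapError matches against TypeBox errorSchemas.
interface DeclaredError {
  code: string
  matches: (thrown: Error) => boolean
}

interface CallErrorPayload {
  code: string
  message: string
  details?: unknown
}

function mapErrorSketch(thrown: unknown, declared: DeclaredError[] = []): CallErrorPayload {
  // Non-Error values (strings, numbers, plain objects) cannot be matched.
  if (!(thrown instanceof Error)) {
    return { code: "UNKNOWN_ERROR", message: "Non-Error thrown", details: { raw: String(thrown) } }
  }
  // A declared domain error wins if one of the declarations matches.
  const hit = declared.find(d => d.matches(thrown))
  if (hit) return { code: hit.code, message: thrown.message }
  // No declaration matched: wrap in the safe default.
  return { code: "EXECUTION_ERROR", message: thrown.message, details: { message: thrown.message } }
}
```

An operation with no declarations therefore only ever surfaces EXECUTION_ERROR or UNKNOWN_ERROR from its handler.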

PendingRequestMap

Manages in-flight requests and provides the call() interface:

// From @alkdev/operations
import { PendingRequestMap } from "@alkdev/operations"

// Construction — takes optional EventTarget for pluggable transport (positional, not options object)
const prm = new PendingRequestMap(eventTarget?)

// Call protocol — call() returns Promise<ResponseEnvelope>
const envelope = await prm.call(operationId, input, { deadline, identity })
// envelope.data contains the result, envelope.meta contains source + timestamp

// Subscribe protocol — returns AsyncIterable<ResponseEnvelope>
const stream = prm.subscribe(operationId, input, { idleTimeout, identity })
for await (const envelope of stream) {
  // yield each response
}

// Resolving calls
prm.respond(requestId, output)  // output must be ResponseEnvelope
prm.emitError(requestId, code, message, details?)
prm.complete(requestId)
prm.abort(requestId)

Key behaviors:

  • call() returns Promise<ResponseEnvelope> (not Promise<unknown>)
  • subscribe() returns AsyncIterable<ResponseEnvelope>
  • respond() requires isResponseEnvelope(output)
  • Built-in deadline and idle timeout support
  • Constructor takes optional EventTarget for pluggable transport (positional parameter, not options object)

CallHandler

Bridges pubsub events to OperationRegistry.execute(). Performs access control and error mapping:

import { buildCallHandler } from "@alkdev/operations"

const handler = buildCallHandler({ registry, callMap })
// callMap: PendingRequestMap instance (not raw EventTarget)
// subscribes to call.requested events
// checks access control (requiredScopes, resource permissions) against Identity
// executes via registry, dispatches call.responded on success
// maps errors via mapError, dispatches call.error

Nested Call Wiring

Routing is an env construction concern, not a separate protocol layer. buildEnv is the single function that creates the env:

  • Direct mode: buildEnv({ registry, context }) → env functions call registry.execute() directly, return Promise<ResponseEnvelope>
  • Call protocol mode: PendingRequestMap handles routing internally, parentRequestId is set via context

buildEnv no longer takes a callMap parameter. It sets trusted: true on nested context (bypasses access control for internal calls). Env functions return Promise<ResponseEnvelope>, not Promise<unknown>. Callers must use unwrap(envelope) or access envelope.data for the result.

parentRequestId propagation: Every nested call includes parentRequestId — enables call graph reconstruction and abort cascading.

Operation Graph (Static)

Built once at startup from the OperationRegistry. Represents type-compatibility edges between operations. Implemented using @alkdev/flowgraph.

Structure

Node = OperationNodeAttrs (namespace.name, type, inputSchema, outputSchema)
Edge = OperationEdgeAttrs (compatible: boolean, detail?, mismatches?)
Edge type = "typed" (from flowgraph EdgeType enum)

The operation graph is constructed via FlowGraph.fromSpecs(specs), which takes an array of OperationSpec objects (derived from OperationRegistry) and:

  1. Creates a node for each operation with OperationNodeAttrs attributes
  2. Runs buildTypeEdges(graph) to create edges between operations whose output/input schemas are type-compatible
  3. Throws CycleError if the resulting graph has cycles (DAG invariant)

Type Compatibility

typeCompat(outputSchema, inputSchema) performs deep structural comparison of two TypeBox schemas. Returns:

  • { compatible: true } — output is a subtype of input
  • { compatible: true, detail } — compatible with notes (e.g., "output has extra fields")
  • { compatible: false, mismatches: TypeMismatch[] } — structural incompatibility
  • undefined — one or both schemas are unknown/any (no meaningful check possible)

Edges where compatible: false are still added to the graph (with compatible: false and the mismatch details) so the graph is complete for observability, but the compatible attribute allows consumers to filter.

Call Templates for SDD

The SDD process defines a natural workflow:

architect → architecture-reviewer → decomposer → coordinator → implementation-specialist → code-reviewer

This is a call template — a validated path through the operation graph that the coordinator can instantiate as a call graph at runtime.

Current approach: Hardcoded workflow sequences. See "What We Defer" below.

Future approach: @alkdev/flowgraph provides ujsx workflow composition components (Operation, Sequential, Parallel, Conditional, Map) that can define templates declaratively. The GraphologyHostConfig renders templates to a DirectedGraph for validation, and ReactiveHostConfig renders them to reactive WorkflowNode trees for execution. When we adopt template-based workflows, flowgraph provides the validation (validateTemplate), type-compatibility checking, and DAG enforcement out of the box.

API Summary

import { FlowGraph } from "@alkdev/flowgraph/graph"
import { typeCompat, buildTypeEdges, topologicalOrder, validateGraph } from "@alkdev/flowgraph/analysis"

// Build operation graph from registered operations.
// Note: @alkdev/flowgraph's OperationSpec is a structural subset of
// @alkdev/operations' OperationSpec (it omits handler, accessControl, errorSchemas).
// The .map() transforms between the two types.
const opGraph = FlowGraph.fromSpecs(registry.list().map(spec => ({
  name: spec.name,
  namespace: spec.namespace,
  version: spec.version,
  type: spec.type,        // "query" | "mutation" | "subscription"
  inputSchema: spec.inputSchema,
  outputSchema: spec.outputSchema,
  description: spec.description,
})))

// Query
const compatResult = typeCompat(opA.outputSchema, opB.inputSchema)
const order = topologicalOrder(opGraph.graph)
const issues = validateGraph(opGraph.graph)

// Serialization
const data = opGraph.export()   // -> OperationGraphSerialized (graphology JSON format)
const restored = FlowGraph.fromJSON(data)  // validates schema + DAG invariant

Call Graph (Dynamic)

Created at runtime for each workflow execution. Populated automatically by the call protocol — every call.requested adds a node, every call.responded/call.error/call.aborted updates its state and timestamp. Implemented using @alkdev/flowgraph.

Structure

Node = CallNodeAttrs (requestId, operationId, status, input, output?, error?, identity?, parentRequestId?, startedAt?, completedAt?)
Edge type "triggered"  = execution hierarchy (parentRequestId → child call)
Edge type "depends_on" = data dependency (call A waits on call B's output)

EdgeType scoping: @alkdev/flowgraph defines five edge types in its EdgeType enum: triggered, depends_on, typed, sequential, conditional. Not all apply to every graph type:

  • Call graph: triggered and depends_on (plus the storage-layer requested_by)
  • Operation graph: typed (type compatibility between operations)
  • Template graph: sequential and conditional (workflow composition via ujsx)

This document focuses on call graph edge types. See the flowgraph architecture docs for the full type definitions.

The call graph is populated by FlowGraph.fromCallEvents(events) or incrementally via updateFromEvent(event). Each call protocol event maps directly to a graph mutation:

| Event | Graph Mutation |
|---|---|
| call.requested | addCall(attrs): creates node (status: pending) + triggered edge if parentRequestId present |
| call.responded | updateFromEvent() sets status: "completed", output, completedAt: event.timestamp (idempotent for terminal states) |
| call.completed | Sets completedAt: event.timestamp if not already set. Idempotent: if already completed, no-op. May also set output if present in event. |
| call.error | updateFromEvent() sets status: "failed", error: { code, message, details? }, completedAt: event.timestamp |
| call.aborted | updateFromEvent() sets status: "aborted", completedAt: event.timestamp. No cascade: updateFromEvent() does not abort children; the hub's CallHandler handles cascading. |
| (hub-initiated) | updateStatus(requestId, "running", { startedAt: now.toISOString() }). Not an event. The hub's CallHandler calls updateStatus() directly when it dispatches the operation handler, transitioning pending → running. |
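The event-to-mutation mapping can be read as a small reducer. The sketch below is a simplified stand-in for updateFromEvent(), not flowgraph's implementation: it tracks only status, output, error, and completedAt in a plain Map, and the `applyEvent` name and types are hypothetical.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted"
type CallErrorInfo = { code: string; message: string; details?: unknown }

interface CallNode {
  requestId: string
  status: CallStatus
  output?: unknown
  error?: CallErrorInfo
  completedAt?: string
}

type CallEvent =
  | { type: "call.requested"; requestId: string; timestamp: string }
  | { type: "call.responded"; requestId: string; output: unknown; timestamp: string }
  | { type: "call.completed"; requestId: string; output?: unknown; timestamp: string }
  | { type: "call.aborted"; requestId: string; timestamp: string }
  | { type: "call.error"; requestId: string; error: CallErrorInfo; timestamp: string }

const TERMINAL: CallStatus[] = ["completed", "failed", "aborted"]

function applyEvent(nodes: Map<string, CallNode>, event: CallEvent): void {
  if (event.type === "call.requested") {
    nodes.set(event.requestId, { requestId: event.requestId, status: "pending" })
    return
  }
  const node = nodes.get(event.requestId)
  if (!node) return // unknown requestId: ignored in this sketch
  if (event.type === "call.completed") {
    // Only fills in completedAt; never overwrites an earlier terminal event.
    node.completedAt ??= event.timestamp
    return
  }
  if (TERMINAL.includes(node.status)) return // terminal states are immutable
  node.completedAt = event.timestamp
  if (event.type === "call.responded") {
    node.status = "completed"
    node.output = event.output
  } else if (event.type === "call.error") {
    node.status = "failed"
    node.error = event.error
  } else {
    node.status = "aborted" // call.aborted; no cascade to children here
  }
}
```

The pending → running transition is deliberately absent, since it is a hub-initiated updateStatus() call and not an event.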

Call Status State Machine

Flowgraph enforces valid status transitions via updateStatus(). The state machine is:

pending → running → completed
                  → failed
         → aborted
running → aborted

Terminal states (completed, failed, aborted) are immutable. InvalidTransitionError is thrown on invalid transitions. This matches the storage layer's call_graph_nodes.status enum.
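A transition table makes the rules explicit. The sketch below encodes one reading of the diagram above (pending may move to running or aborted; running may move to completed, failed, or aborted). The `ALLOWED` table, `nextStatus`, and the error class name are hypothetical and not flowgraph's own.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted"

// Transition table as read from the diagram above (an interpretation).
const ALLOWED: Record<CallStatus, CallStatus[]> = {
  pending: ["running", "aborted"],
  running: ["completed", "failed", "aborted"],
  completed: [], // terminal
  failed: [],    // terminal
  aborted: [],   // terminal
}

// Hypothetical stand-in for flowgraph's InvalidTransitionError.
class InvalidTransitionSketchError extends Error {}

// Returns the new status, or throws if the move is not in the table.
function nextStatus(from: CallStatus, to: CallStatus): CallStatus {
  if (!ALLOWED[from].includes(to)) {
    throw new InvalidTransitionSketchError(`invalid transition: ${from} -> ${to}`)
  }
  return to
}
```

Keeping terminal states as empty arrays means immutability falls out of the same lookup, with no special case.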

Abort Cascading

When a call is aborted, all of its children should also be aborted. Flowgraph provides two mechanisms:

  1. triggered edge traversal: children(requestId) returns direct children via triggered edges. Full cascading uses descendants(requestId) for all descendants.
  2. WorkflowReactiveRoot: For running workflow executions, the reactive engine provides abortNode(nodeId) and abortAll() with FailurePolicy configuration ("continue-running" vs "abort-dependents").

The hub's CallHandler wires call.aborted events to:

  • updateStatus(requestId, "aborted") on the call graph
  • Pubsub event propagation so downstream PendingRequestMap instances also call abort() on their in-flight requests
  • WorkflowReactiveRoot.abortNode(nodeId) if a workflow execution is tracking this call
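The hub-side cascade can be sketched as a breadth-first walk over parentRequestId links. This is an illustration of the idea only: `descendantsOf` and `cascadeAbort` are hypothetical helpers over a plain array, not flowgraph's children()/descendants() methods, and the pubsub propagation is left out.

```typescript
type CallStatus = "pending" | "running" | "completed" | "failed" | "aborted"

interface CallNode {
  requestId: string
  parentRequestId?: string
  status: CallStatus
}

// Collect every descendant by following parentRequestId links breadth-first.
function descendantsOf(nodes: CallNode[], rootId: string): string[] {
  const childrenOf = new Map<string, string[]>()
  for (const n of nodes) {
    if (n.parentRequestId === undefined) continue
    const siblings = childrenOf.get(n.parentRequestId) ?? []
    siblings.push(n.requestId)
    childrenOf.set(n.parentRequestId, siblings)
  }
  const found: string[] = []
  const queue = [rootId]
  while (queue.length > 0) {
    const current = queue.shift() as string
    for (const child of childrenOf.get(current) ?? []) {
      found.push(child)
      queue.push(child)
    }
  }
  return found
}

// Abort the root and every descendant that has not already finished.
// Returns the requestIds that were actually changed.
function cascadeAbort(nodes: CallNode[], rootId: string): string[] {
  const targets = new Set([rootId, ...descendantsOf(nodes, rootId)])
  const aborted: string[] = []
  for (const n of nodes) {
    if (targets.has(n.requestId) && (n.status === "pending" || n.status === "running")) {
      n.status = "aborted"
      aborted.push(n.requestId)
    }
  }
  return aborted
}
```

Calls that already reached a terminal state are skipped, which keeps the cascade consistent with the state machine.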

depends_on Edges

While triggered edges represent the parent-child execution hierarchy, depends_on edges represent data dependencies — a call that needs another call's output before it can proceed. These are created by the coordinator when orchestrating workflows:

callGraph.addDependency(sourceRequestId, targetRequestId)
// Adds a "depends_on" edge (source depends on target's output)
// Cycle-checked — throws CycleError if the edge would create a cycle

depends_on edges are not created by the call protocol itself. They are added by coordination logic that knows the data flow between calls (e.g., the coordinator knows that coord.spawn step 3 depends on step 1's output). This gives the observability layer a richer graph for analysis without changing the protocol.
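The cycle check amounts to a reachability test before the edge is inserted. The sketch below uses a plain adjacency map; the standalone `addDependency` function and `CycleSketchError` class are hypothetical illustrations and do not reflect how flowgraph implements its method.

```typescript
// Hypothetical stand-in for flowgraph's CycleError.
class CycleSketchError extends Error {}

// Adjacency: edges.get(a) holds every call that a depends on.
function addDependency(edges: Map<string, Set<string>>, source: string, target: string): void {
  // Adding source -> target closes a cycle exactly when source is already
  // reachable from target (or the two are the same node).
  const stack = [target]
  const seen = new Set<string>()
  while (stack.length > 0) {
    const current = stack.pop() as string
    if (current === source) {
      throw new CycleSketchError(`${source} -> ${target} would create a cycle`)
    }
    if (seen.has(current)) continue
    seen.add(current)
    for (const next of edges.get(current) ?? []) stack.push(next)
  }
  // Safe to insert: the graph stays acyclic.
  const out = edges.get(source) ?? new Set<string>()
  out.add(target)
  edges.set(source, out)
}
```

Because the check runs before the mutation, a rejected edge leaves the graph unchanged.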

API Summary

import { FlowGraph } from "@alkdev/flowgraph/graph"
import { CallStatus } from "@alkdev/flowgraph/schema"

// Build call graph from events (e.g., after hub restart, reconstruct from DB)
const callGraph = FlowGraph.fromCallEvents(storedEvents)

// Or start from an empty graph and build incrementally as events arrive
const liveCallGraph = new FlowGraph(CallNodeAttrs, CallEdgeAttrs)

// Process events
callGraph.updateFromEvent(event)       // handles all 5 call.* event types

// Status management
callGraph.updateStatus(requestId, "running", { startedAt: now.toISOString() })  // hub-initiated, not event-driven
callGraph.updateStatus(requestId, "completed") // validates state machine transition

// Call management
callGraph.addCall({ requestId, operationId, status: "pending", parentRequestId?, input?, identity?, startedAt? })
callGraph.updateCall(requestId, attrs)  // partial update of any CallNodeAttrs
callGraph.removeCall(requestId)
callGraph.addDependency(sourceRequestId, targetRequestId)  // depends_on edge

// Queries
callGraph.children(requestId)           // direct children via triggered edges
callGraph.descendants(requestId)        // all descendants
callGraph.lineage(requestId)            // ancestor chain from root to this call
callGraph.getRoots()                     // calls with no parentRequestId
callGraph.filterByStatus("running")      // all running calls
callGraph.duration(requestId)           // completedAt - startedAt in ms

// Escape hatch
callGraph.graph                          // raw graphology DirectedGraph

// Serialization (for Postgres persistence)
const data = callGraph.export()          // -> CallGraphSerialized
const restored = FlowGraph.fromJSON(data)

Graph Size

At hub level, the call graph is small — just metadata nodes mapping to the actual process/call. Agent workflow call graphs will have tens of nodes at most for simple workflows, potentially hundreds for complex coordination with many parallel tasks. Performance is a non-issue for call-level metadata; flowgraph wraps graphology which handles thousands of nodes efficiently.

Reactive Workflow Execution

For running workflow executions (not just observability), @alkdev/flowgraph/reactive provides WorkflowReactiveRoot — a signal-driven execution engine that:

  1. Takes a DirectedGraph (from a ujsx template rendered via GraphologyHostConfig) and creates reactive state for every node
  2. Processes call protocol events via append(event) — the event log is the source of truth, status/results are derived projections
  3. Computes preconditions (all predecessors completed), canStart (preconditions met + not blocked by failure), and blockedByFailure (any predecessor failed/aborted) as reactive signals
  4. Supports FailurePolicy: "continue-running" (only abort idle/waiting dependents) or "abort-dependents" (cascade abort to all non-terminal dependents)
  5. Maps node keys to requestIds via setRequestId(nodeKey, requestId) — bridging template nodes to call protocol identifiers
  6. Requires dispose() to release signal subscriptions

import { WorkflowReactiveRoot } from "@alkdev/flowgraph/reactive"

// WorkflowReactiveRoot takes the raw DirectedGraph (flowGraph.graph),
// not the FlowGraph wrapper
const workflowRoot = new WorkflowReactiveRoot(templateGraph.graph)
try {
  // Bridge template nodes to call protocol request IDs
  workflowRoot.setRequestId("step-1", requestId1)
  workflowRoot.setRequestId("step-2", requestId2)

  // Process events (appended by the hub's call protocol handler)
  workflowRoot.append(callRequestedEvent)
  workflowRoot.append(callRespondedEvent)

  // Query reactive state
  const status = workflowRoot.getStatus("step-1")    // NodeStatus
  const canStart = workflowRoot.canStart.get("step-2") // ReadonlySignal<boolean>
  const isComplete = workflowRoot.isComplete()         // all nodes terminal?
} finally {
  workflowRoot.dispose()
}

This is the execution engine for workflow-based coordination. The hub coordinator instantiates a WorkflowReactiveRoot for each running workflow, feeds it call protocol events, and uses its reactive state to determine what to do next (start the next step, handle failures, cascade aborts).
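The three derived values from the list above (preconditions, blockedByFailure, canStart) can be written as a plain function over predecessor statuses. This sketch leaves out the signal machinery entirely; the `derive` function and its types are hypothetical and say nothing about how WorkflowReactiveRoot computes them internally.

```typescript
type Status = "pending" | "running" | "completed" | "failed" | "aborted"

interface Derived {
  preconditions: boolean    // all predecessors completed
  blockedByFailure: boolean // any predecessor failed or aborted
  canStart: boolean         // preconditions met and not blocked
}

// Predecessors with no recorded status are treated as pending.
function derive(predecessors: string[], statusOf: Map<string, Status>): Derived {
  const statuses = predecessors.map(p => statusOf.get(p) ?? "pending")
  const preconditions = statuses.every(s => s === "completed")
  const blockedByFailure = statuses.some(s => s === "failed" || s === "aborted")
  return { preconditions, blockedByFailure, canStart: preconditions && !blockedByFailure }
}
```

A node with no predecessors satisfies its preconditions trivially, so root steps can start immediately.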

Storage

Call graph nodes and edges are stored in Postgres. See storage/call-graph.md for the full schema definitions.

The storage layer persists individual call_graph_nodes and call_graph_edges rows. Flowgraph's export() produces graphology's native JSON format (CallGraphSerialized), which is suitable for snapshot/restore but not for incremental observability queries. The hub uses both:

  • Incremental storage: Each call protocol event writes/updates a row in call_graph_nodes and creates call_graph_edges as needed. This supports real-time observability queries (what's running, what failed, what's blocked).
  • Reconstruction: After a hub restart, the call graph can be reconstructed from stored events or from incremental rows using FlowGraph.fromCallEvents().

Write Path

The hub's CallHandler is responsible for writing call graph data to Postgres. When a call protocol event arrives:

  1. call.requested: The CallHandler creates a row in call_graph_nodes (status: pending) and, if parentRequestId is present, a triggered edge in call_graph_edges. This write happens synchronously before dispatching to ensure the call is tracked even if the handler fails immediately.
  2. Handler dispatch: The CallHandler dispatches the operation handler. At this point it updates the node status to running and sets startedAt — this is not an event, but a hub-initiated updateStatus() call on both the in-memory flowgraph and the DB row.
  3. call.responded: Updates the node's status to completed, sets output (unwrapped from the ResponseEnvelope — only data is stored, not meta), and sets completedAt.
  4. call.error: Updates status to failed, sets error, and sets completedAt.
  5. call.aborted: Updates status to aborted and sets completedAt. The hub's CallHandler then cascades the abort to child calls (this is hub logic, not flowgraph's updateFromEvent()).
  6. call.completed: Sets completedAt if not already set. Idempotent — no-op if the call is already completed.

Error handling: If a DB write fails, the call still proceeds (the handler has already been invoked). The hub logs the write failure and continues. Call graph data is best-effort — the in-memory flowgraph is the authoritative source for running calls; the DB is for persistence and observability.

Identifier Mapping

The call_graph_nodes table uses two identifiers:

  • id (UUID, from commonCols): Internal primary key, used as the FK target for call_graph_edges.
  • requestId (text, UNIQUE): Protocol-level correlation key, used as the flowgraph node key.

When reconstructing a flowgraph from the database, the hub uses requestId as the node key (matching CallNodeAttrs.requestId). The call_graph_edges table uses sourceId/targetId referencing call_graph_nodes.id (the UUID), so reconstruction requires resolving UUIDs to requestIds. The call_graph_nodes.requestId column has a UNIQUE index, making this lookup efficient.
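The UUID-to-requestId resolution can be sketched as a single lookup pass. The row interfaces below are trimmed to the columns involved, and `resolveEdges` is a hypothetical helper; it also drops requested_by edges on the assumption that only triggered and depends_on are modeled in the in-memory graph.

```typescript
interface NodeRow { id: string; requestId: string }
interface EdgeRow { sourceId: string; targetId: string; edgeType: string }
interface GraphEdge { source: string; target: string; edgeType: string }

// Translate UUID-keyed edge rows into requestId-keyed graph edges.
function resolveEdges(nodes: NodeRow[], edges: EdgeRow[]): GraphEdge[] {
  const requestIdByUuid = new Map(nodes.map(n => [n.id, n.requestId] as const))
  const resolved: GraphEdge[] = []
  for (const e of edges) {
    // requested_by is a storage-layer edge type with no in-memory counterpart.
    if (e.edgeType === "requested_by") continue
    const source = requestIdByUuid.get(e.sourceId)
    const target = requestIdByUuid.get(e.targetId)
    // Both ends must resolve; a dangling edge is skipped in this sketch.
    if (source !== undefined && target !== undefined) {
      resolved.push({ source, target, edgeType: e.edgeType })
    }
  }
  return resolved
}
```

Building the lookup map once keeps reconstruction linear in the number of rows.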

call_graph_nodes — One row per call

| Column | Type | Notes |
|---|---|---|
| commonCols | | id, metadata, createdAt, updatedAt |
| requestId | text NOT NULL UNIQUE | Protocol-level correlation key. Also the flowgraph node key. |
| operationId | text NOT NULL | FK → operations.id (RESTRICT). NOT NULL because CallNodeAttrs.operationId is required in flowgraph. |
| parentRequestId | text | Denormalized parent for fast point lookup. Redundant with the triggered edge. |
| identity | jsonb | Caller identity: { id, scopes, resources? } |
| callerAccountId | text | FK → accounts.id (ON DELETE SET NULL). Null for system calls. |
| status | text NOT NULL | Matches CallStatus enum: pending, running, completed, failed, aborted |
| input | jsonb | Call input (redacted, truncated; see storage/call-graph.md) |
| output | jsonb | Call output (on success) |
| error | jsonb | { code, message, details? } (on failure, nested under error key matching flowgraph) |
| startedAt | timestamp with tz | When call was dispatched. Type conversion: flowgraph stores as ISO 8601 string; DB stores as timestamptz. |
| completedAt | timestamp with tz | When call completed/failed/aborted. Same type conversion as startedAt. |
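The ISO 8601 ↔ timestamptz conversion can be sketched with the built-in Date type, on the assumption that the Postgres driver binds Date values to timestamptz columns (common, but driver-specific). Both helper names are hypothetical.

```typescript
// ISO 8601 string (graph side) -> Date (DB side).
// Rejects strings that do not parse, so bad data fails before the write.
function toDbTimestamp(iso: string): Date {
  const d = new Date(iso)
  if (Number.isNaN(d.getTime())) throw new Error(`not a valid timestamp: ${iso}`)
  return d
}

// Date (DB side) -> ISO 8601 string in UTC, the form the graph attrs use.
function toGraphTimestamp(d: Date): string {
  return d.toISOString()
}
```

Note that the round trip normalizes any offset to UTC, so a string written with +02:00 comes back with a Z suffix.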

call_graph_edges — Typed directed edges between calls

| Column | Type | Notes |
|---|---|---|
| commonCols | | id, metadata, createdAt, updatedAt |
| sourceId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| targetId | text NOT NULL | FK → call_graph_nodes.id (CASCADE) |
| edgeType | text NOT NULL | triggered, depends_on, or requested_by |

Edge type semantics: triggered = execution hierarchy (parentRequestId), depends_on = data dependency, requested_by = identity/authorization chain. See storage/call-graph.md for details.

Note on depends_on in flowgraph: The flowgraph CallEdgeAttrs type is a union of TriggeredEdgeAttrs and DependencyEdgeAttrs, matching the triggered and depends_on edge types. The requested_by edge type is a storage-layer concept for identity tracing that doesn't have a corresponding flowgraph edge type — it's persisted in the database but not modeled in the in-memory graph.
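One way to honor this split when loading persisted edges is to separate them before building the in-memory graph. The sketch below is illustrative only: the type names and `splitEdges` are hypothetical, and the edge objects are simplified stand-ins rather than flowgraph's CallEdgeAttrs.

```typescript
type StoredEdgeType = "triggered" | "depends_on" | "requested_by";
// The two edge types that have an in-memory counterpart.
type FlowgraphEdgeType = Exclude<StoredEdgeType, "requested_by">;

interface StoredEdge { source: string; target: string; edgeType: StoredEdgeType }
interface GraphEdge { source: string; target: string; edgeType: FlowgraphEdgeType }

// Separates edges the in-memory graph can model from storage-only edges.
function splitEdges(edges: StoredEdge[]): { graphEdges: GraphEdge[]; storageOnly: StoredEdge[] } {
  const graphEdges: GraphEdge[] = [];
  const storageOnly: StoredEdge[] = [];
  for (const edge of edges) {
    if (edge.edgeType === "requested_by") {
      storageOnly.push(edge);
    } else {
      graphEdges.push({ source: edge.source, target: edge.target, edgeType: edge.edgeType });
    }
  }
  return { graphEdges, storageOnly };
}
```

Encoding the exclusion in the type means that adding requested_by to the in-memory graph later would surface as a compile-time change rather than a silent behavior shift.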

Transport Mapping

The call protocol is transport-agnostic. The TypedEventTarget plug point (same pattern as RedisEventTarget in the pubsub design) determines how events move:

| Transport | Use Case | TypedEventTarget impl |
| --- | --- | --- |
| In-process | Local hub operations | Browser EventTarget (default) |
| Redis | Cross-process events (e.g., hub → all processes) | RedisEventTarget |
| WebSocket | Hub ↔ spoke bidirectional | createWebSocketServerEventTarget (hub) / createWebSocketClientEventTarget (spoke) from @alkdev/pubsub |

A WebSocketEventTarget implementing TypedEventTarget makes each spoke runner's websocket connection a live bidirectional channel. The hub dispatches call.requested over the socket; the runner sends call.responded/call.error back. Same protocol, same event shapes, same PendingRequestMap — just a different eventTarget.
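To illustrate what "just a different eventTarget" means in practice, the sketch below has the caller and the handler depend only on a small interface. Everything here is a stand-in: `CallEventTarget`, `InProcessTarget`, and the payload shapes are assumptions for illustration, and the real TypedEventTarget interface in @alkdev/pubsub may look different.

```typescript
// Event payload shapes are assumptions for this sketch.
interface CallEvents {
  "call.requested": { requestId: string; operationId: string; input: unknown };
  "call.responded": { requestId: string; output: unknown };
}

type Listener<T> = (detail: T) => void;

// Hypothetical stand-in for the TypedEventTarget plug point.
interface CallEventTarget {
  on<K extends keyof CallEvents>(type: K, listener: Listener<CallEvents[K]>): void;
  emit<K extends keyof CallEvents>(type: K, detail: CallEvents[K]): void;
}

// In-process implementation: synchronous fan-out to registered listeners.
class InProcessTarget implements CallEventTarget {
  private listeners = new Map<string, Listener<any>[]>();

  on<K extends keyof CallEvents>(type: K, listener: Listener<CallEvents[K]>): void {
    const list = this.listeners.get(type) ?? [];
    list.push(listener);
    this.listeners.set(type, list);
  }

  emit<K extends keyof CallEvents>(type: K, detail: CallEvents[K]): void {
    for (const listener of this.listeners.get(type) ?? []) listener(detail);
  }
}

// Handler side: answers every request by echoing its input.
function attachEchoHandler(target: CallEventTarget): void {
  target.on("call.requested", (req) => {
    target.emit("call.responded", { requestId: req.requestId, output: req.input });
  });
}

// Caller side: depends only on the interface, never on the transport.
function requestCall(
  target: CallEventTarget,
  requestId: string,
  operationId: string,
  input: unknown,
  onOutput: (output: unknown) => void
): void {
  target.on("call.responded", (res) => {
    if (res.requestId === requestId) onOutput(res.output);
  });
  target.emit("call.requested", { requestId, operationId, input });
}
```

A Redis- or WebSocket-backed class implementing the same interface could be passed to `requestCall` and `attachEchoHandler` unchanged. The sketch omits listener cleanup, timeouts, and call.error handling, which the source assigns to PendingRequestMap.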

What We Defer

  1. Full ujsx call templates — currently using hardcoded workflow sequences. @alkdev/flowgraph/component provides Operation, Sequential, Parallel, Conditional, Map components for declarative template definition, and GraphologyHostConfig + ReactiveHostConfig for rendering. We'll adopt these when workflow complexity justifies it.
  2. Graph visualization — API only, no Sigma.js UI
  3. Stream deduplication: Value.Hash({operationId, input})-based deduplication for multiple subscribers to the same stream
  4. requested_by edge creation in flowgraph — the requested_by edge type is a storage-layer concept for identity tracing. It's persisted in call_graph_edges but not modeled in @alkdev/flowgraph's CallEdgeAttrs union. We may add it to flowgraph in the future.

The call protocol itself, PendingRequestMap, CallHandler, buildEnv dual-mode, call graph auto-tracking, and reactive workflow execution are in the initial implementation. They're not much code and they prevent the need to bolt on ad-hoc error handling and abort logic in every coordination operation.

Open Questions

  1. Operation deletion and call graph referential integrity: The call_graph_nodes.operationId column has a RESTRICT FK to operations.id. An operation cannot be deleted while any call records reference it. For v1, the strategy is to deny removal while call records exist. If operation removal becomes necessary (e.g., cleanup of old operations), the hub would need to either: (a) reassign all referencing call records to a sentinel __removed__ operation (pre-seeded in migrations with id='__removed__', namespace='system'), then delete the original operation, or (b) accept that historical call records reference operations that may have been provided by disconnected spokes — in which case, consider making operationId nullable in flowgraph's CallNodeAttrs so the hub can NULL the FK instead of requiring a sentinel row. This requires coordination with the @alkdev/flowgraph package.

  2. Reactive vs. call graph requested semantics: In FlowGraph, call.requested creates a node in pending state. In WorkflowReactiveRoot, call.requested maps to NodeStatus.running (the reactive model assumes a template node starts executing when requested). This is a deliberate semantic difference — the reactive model tracks execution progress, while the call graph model tracks protocol state. The spec documents this, but implementers should be aware that feeding the same event to both models produces different initial statuses.
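Option (a) in the first open question, reassigning to the sentinel before deleting, can be sketched against in-memory stand-ins for the two tables. This is a sketch of the ordering only: `removeOperation` and `CallRow` are hypothetical, and a real implementation would run both steps in a single database transaction.

```typescript
// Sentinel id as described in the open question; pre-seeded by a migration.
const REMOVED_OPERATION_ID = "__removed__";

interface CallRow { requestId: string; operationId: string }

// Reassigns every call that references operationId to the sentinel,
// then deletes the operation. Returns the number of reassigned calls.
function removeOperation(operations: Set<string>, calls: CallRow[], operationId: string): number {
  if (!operations.has(REMOVED_OPERATION_ID)) {
    throw new Error("sentinel operation is not seeded");
  }
  if (operationId === REMOVED_OPERATION_ID) {
    throw new Error("the sentinel operation cannot be removed");
  }
  let reassigned = 0;
  for (const call of calls) {
    if (call.operationId === operationId) {
      call.operationId = REMOVED_OPERATION_ID;
      reassigned++;
    }
  }
  // Safe under RESTRICT semantics: no call references the operation any more.
  operations.delete(operationId);
  return reassigned;
}
```

The reassignment must come first because the RESTRICT FK rejects the delete while any referencing row remains.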

Dependencies

@alkdev/flowgraph      # DAG construction, reactive execution, call/operation graphs, type-compat analysis
@alkdev/operations     # Call protocol, PendingRequestMap, CallHandler
@alkdev/pubsub         # Event transport (Redis, WebSocket, Worker)
@alkdev/taskgraph      # Task graph construction and analysis (for task management, not call graphs)

Why both @alkdev/flowgraph and @alkdev/taskgraph? @alkdev/taskgraph is a domain-specific library for task DAG construction with categorical estimates (scope, risk, impact), frontmatter parsing, and task-specific analysis (critical path, bottleneck detection, risk assessment). @alkdev/flowgraph is a general-purpose workflow graph library for call/operation DAGs with ujsx template composition and reactive execution. They both wrap graphology, but serve different domains. The hub uses @alkdev/taskgraph for task management and @alkdev/flowgraph for call graph and operation graph management.

Prior Art

The call protocol was adapted from ade_spoke's call protocol design (which was pubsub-agnostic). The key difference here is that websockets are the primary transport for hub-spoke communication rather than SSE. The call graph and operation graph are now implemented using @alkdev/flowgraph rather than raw graphology, which provides DAG enforcement, type-compatibility analysis, and reactive execution out of the box.