flowgraph/docs/architecture/reactive-execution.md
glm-5.1 f3e084d02f resolve all remaining open questions (OQ-03–OQ-29), add ADR-006
Resolve all 19 remaining open questions across the architecture. Every
question now has a documented resolution with rationale:

- OQ-004/OQ-029: edgeType is a universal required attribute on all edges,
  single graph per FlowGraph instance (ADR-006)
- OQ-011: No OR preconditions for v1; preconditionMode as v2 extension
- OQ-012: maxConcurrency enforced via reactive counting semaphore
- OQ-014: Unknown operationId creates node with pending status
- OQ-017: Expose common graphology traversal methods on FlowGraph (80/20)
- OQ-020: condition as Type.Unknown() with string/function documentation
- OQ-022: Identity imported from @alkdev/operations peer dep
- All other questions resolved with documented rationale

Fix three critical issues found by architecture review:
1. edgeType serialization/validation gap: document two-step validation
2. CallEdgeAttrs runtime discrimination: edgeType as runtime discriminant,
   depends_on edges clarified as observability-only (not execution)
3. ADR-005 signal mutation inconsistency: explicitly distinguish call-level
   statuses (event-log-driven) from workflow-derived statuses (signal-mutation)

Additional clarifications:
- dataFlow inference uses conservative strategy (defaults false)
- Conditional.test string resolution: operationName → status === completed
- Add negated field to TemplateEdgeAttrs for else-branch conditions
- Document edge key priority convention for composite keys
- Add maxConcurrency semaphore design to reactive-execution.md
2026-05-21 09:25:55 +00:00


---
status: draft
last_updated: 2026-05-22
---
# Reactive Execution
Signal-driven status propagation, computed preconditions, and failure propagation for workflow template execution, built on the event log as single source of truth (ADR-005).
## Overview
The reactive execution layer bridges workflow template structure (DAG) to runtime behavior (call execution). It uses `@preact/signals-core` (via ujsx's reactive layer) to create a signal-backed execution model where:
- Each `<Operation>` node gets a `signal<NodeStatus>` tracking its lifecycle state
- Preconditions are `computed<boolean>` values that automatically resolve when upstream dependencies complete
- Failure propagation follows dependency edges — a failed predecessor causes downstream dependents to abort, while independent branches continue running
- Conditionals can serve as error boundaries, catching failures and redirecting to fallback paths
### Event Log as Source of Truth
Per [ADR-005](decisions/005-event-log-as-source-of-truth.md), the reactive execution layer is a **projection** of the call protocol event log. The hub coordinator appends call protocol events (`call.requested`, `call.responded`, `call.error`, `call.aborted`, `call.completed`), and the reactive layer derives its state from these events:
```
┌─────────────────────────────────────────────┐
│             Execution Event Log             │
│      (append-only CallEventMapValue[],      │
│          the call protocol events)          │
└──────────────────┬──────────────────────────┘
     ┌─────────────┼──────────────┐
     │             │              │
     ▼             ▼              ▼
┌─────────┐   ┌──────────┐   ┌──────────┐
│ Status  │   │ Result   │   │ Call     │
│ Proj.   │   │ Proj.    │   │ Graph    │
│         │   │          │   │ Proj.    │
│ nodeId: │   │ nodeId:  │   │          │
│ status  │   │ output   │   │ nodes +  │
│         │   │          │   │ edges    │
└────┬────┘   └────┬─────┘   └──────────┘
     │             │
     ▼             ▼
┌─────────────────────────────────────────────┐
│          Reactive Execution Layer           │
│                                             │
│ preconditions → "does the log show          │
│                  all predecessors           │
│                  completed?"                │
│                                             │
│ result resolution → "does the log           │
│                      have A's output?"      │
│                                             │
│ Conditional.test → reads from result proj.  │
│ Map.over         → reads from result proj.  │
└─────────────────────────────────────────────┘
```
**The hub coordinator appends events; the reactive layer projects them.** This replaces the previous design where the coordinator directly set signal values. Under ADR-005, the coordinator's responsibility is:
1. Start a call (which emits `call.requested`)
2. Receive the result (which emits `call.responded` or `call.error`)
3. Append these events to the log
The reactive layer's projections derive `NodeStatus` and `CallResult` from the log. The coordinator no longer calls `status.value = "running"` — the status projection derives this from `call.requested` events.
### Hybrid Status Model
While ADR-005 positions the event log as the single source of truth, not all `NodeStatus` values correspond to call protocol events. The model is hybrid:
**Event-log-driven statuses** (derived directly from `CallEventMapValue` events):
| Call protocol event | Derived NodeStatus |
|---------------------|--------------------|
| `call.requested` | `running` |
| `call.responded` | `completed` |
| `call.error` | `failed` |
| `call.aborted` | `aborted` |
**Projection-driven statuses** (derived from the event log combined with template structure and reactive state):
| NodeStatus | Derived from |
|------------|-------------|
| `idle` | No events for this node yet; no predecessors are running |
| `waiting` | At least one predecessor is `running`, none have completed |
| `ready` | All predecessors are `completed` or `skipped`; no `call.requested` event yet |
| `skipped` | Conditional branch not taken (template-level decision, no call event) |
**Signal-mutation statuses** (set by the reactive engine, not derived from events):
| Trigger | NodeStatus | Rationale |
|---------|------------|-----------|
| `blockedByFailure` effect | `aborted` | A predecessor failed; the node is aborted by failure propagation. This is a projection policy decision, not a call protocol event. |
This distinction is important: the event log records **what happened at the call level**, while the reactive engine derives **workflow-level state** from the log combined with template structure. The `WorkflowReactiveRoot` maintains `signal<NodeStatus>` values, but these signals are set by:
1. The status projection when call events arrive (event-log-driven)
2. The reactive engine for workflow-level states (projection-driven or signal-mutation)
The `getStatus(nodeId)` method on `EventLogProjection` checks the event log first (for call-level statuses), then falls back to the signal map (for workflow-level statuses). The `getResult(nodeId)` method is purely event-log-driven.
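The lookup order described above can be sketched as a pure function. This is a minimal illustration rather than the actual implementation: the `CallEvent` shape is a simplified stand-in for `CallEventMapValue`, and `workflowStatus` stands for whatever value the node's signal currently holds (both are assumptions made for the example).

```typescript
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

// Simplified stand-in for CallEventMapValue (assumption).
type CallEvent = {
  type: "call.requested" | "call.responded" | "call.error" | "call.aborted";
  requestId: string;
};

// Call-level statuses, mirroring the event-log-driven table.
const EVENT_TO_STATUS: Record<CallEvent["type"], NodeStatus> = {
  "call.requested": "running",
  "call.responded": "completed",
  "call.error": "failed",
  "call.aborted": "aborted",
};

function getStatus(
  requestId: string | undefined,
  log: readonly CallEvent[],
  workflowStatus: NodeStatus,
): NodeStatus {
  // 1. Event log first: the most recent event for this request wins.
  const events = log.filter((e) => e.requestId === requestId);
  const last = events[events.length - 1];
  if (last !== undefined) return EVENT_TO_STATUS[last.type];
  // 2. No call events yet: fall back to the workflow-level status.
  return workflowStatus;
}

const log: CallEvent[] = [
  { type: "call.requested", requestId: "r1" },
  { type: "call.responded", requestId: "r1" },
];

const fromLog = getStatus("r1", log, "ready");           // "completed"
const fromSignal = getStatus(undefined, log, "skipped"); // "skipped"
```

A node aborted by `blockedByFailure` never receives a call event, so it takes the second path and reports the signal's `aborted` value.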
## ReactiveRoot for Workflows
```typescript
class WorkflowReactiveRoot implements EventLogProjection {
  private statusMap: Map<string, Signal<NodeStatus>>;
  private preconditions: Map<string, Computed<boolean>>;
  private blockedByFailure: Map<string, Computed<boolean>>;
  private resultMap: Map<string, Computed<CallResult | undefined>>;
  private graph: DirectedGraph;
  private effectDisposers: (() => void)[];
  private eventLog: CallEventMapValue[];
  private nodeKeyToRequestId: Map<string, string>;
  private failurePolicy: FailurePolicy;
  constructor(graph: DirectedGraph, options?: { failurePolicy?: FailurePolicy }) {
    this.graph = graph;
    this.statusMap = new Map();
    this.preconditions = new Map();
    this.blockedByFailure = new Map();
    this.resultMap = new Map();
    this.effectDisposers = [];
    this.eventLog = [];
    this.nodeKeyToRequestId = new Map();
    this.failurePolicy = options?.failurePolicy ?? "continue-running";
    this.initializeSignals();
  }
}
```
`WorkflowReactiveRoot` wraps the reactive state for an entire workflow execution. It takes the structural DAG (from the GraphologyHost) and creates reactive state for each operation node. It implements the `EventLogProjection` interface from ADR-005, meaning the hub coordinator appends call protocol events and the root derives status and results from them.
### FailurePolicy
The failure policy determines what happens to running nodes when a predecessor fails. Per ADR-005 and OQ-010, this is a **projection policy**, not a hardcoded rule:
```typescript
type FailurePolicy =
  | "continue-running"  // Running nodes continue. Only idle/waiting dependents abort. (default)
  | "abort-dependents"; // Running dependents of the failed node also abort.
```
The default policy (`continue-running`) means a node that has already started execution completes normally, even if a sibling or predecessor fails. Only nodes that haven't started (`idle` or `waiting`) transition to `aborted`.
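As a rough illustration of the two policies, the decision "should this dependent of a failed node be aborted?" can be written as a small pure function. The function name `shouldAbort` is hypothetical, and the text names only `idle` and `waiting` as not-yet-started states, so this sketch leaves every other status (including `ready`) unchanged.

```typescript
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

type FailurePolicy = "continue-running" | "abort-dependents";

// Should a dependent of a failed node transition to "aborted"?
function shouldAbort(status: NodeStatus, policy: FailurePolicy): boolean {
  switch (status) {
    case "idle":
    case "waiting":
      return true; // not started yet: aborted under both policies
    case "running":
      return policy === "abort-dependents"; // only the stricter policy interrupts work in flight
    default:
      return false; // every other status is left as it is in this sketch
  }
}

const waitingDefault = shouldAbort("waiting", "continue-running"); // true
const runningDefault = shouldAbort("running", "continue-running"); // false
const runningStrict = shouldAbort("running", "abort-dependents");  // true
```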
### EventLogProjection Interface
```typescript
interface EventLogProjection {
  /** Append an event. Events are processed idempotently. */
  append(event: CallEventMapValue): void;
  /** Current status of a node, derived from the most recent event. */
  getStatus(nodeId: string): NodeStatus;
  /** Result of a completed node, derived from call.responded events. */
  getResult(nodeId: string): CallResult | undefined;
  /** All events for a node, in order. */
  getEvents(nodeId: string): CallEventMapValue[];
}
```
The `append()` method is the primary entry point for the hub coordinator. When a call protocol event arrives (`call.requested`, `call.responded`, etc.), the coordinator appends it to the log. The projections automatically update: `getStatus()` scans the log for the most recent event per node, and `getResult()` extracts the output from `call.responded` events.
### Request ID Mapping
The event log uses `requestId` (from the call protocol), while the reactive engine uses node keys (from the template DAG). The `nodeKeyToRequestId` map bridges these:
```typescript
// When starting a call:
const requestId = crypto.randomUUID();
workflowRoot.nodeKeyToRequestId.set(nodeKey, requestId);
// When appending events:
workflowRoot.append({ type: "call.requested", requestId, operationId, input, timestamp: new Date().toISOString() });
```
This mapping is necessary because a single template node may have multiple requests (retries), and the event log records all of them.
### initializeSignals()
```typescript
private initializeSignals(): void {
  for (const node of this.graph.nodes()) {
    const attrs = this.graph.getNodeAttributes(node);
    // In the flattened DAG from GraphologyHostConfig, all nodes represent
    // operations (structural containers like Sequential/Parallel are transparent
    // and create no nodes). No filtering needed: every node gets a signal.
    const status = signal<NodeStatus>("idle");
    const predecessors = this.graph.inNeighbors(node);
    // Preconditions: all predecessors completed or skipped
    const preconditions = computed(() => {
      return predecessors.every(pred => {
        const predStatus = this.statusMap.get(pred);
        return predStatus && (predStatus.value === "completed" || predStatus.value === "skipped");
      });
    });
    // Blocked by failure: any predecessor failed or aborted (uncaught)
    const blockedByFailure = computed(() => {
      return predecessors.some(pred => {
        const predStatus = this.statusMap.get(pred);
        return predStatus && (predStatus.value === "failed" || predStatus.value === "aborted");
      });
    });
    // Result: derived from the event log's result projection.
    // Uses the most recent terminal event of the node's current request (respects retries).
    const result = computed(() => {
      const requestId = this.nodeKeyToRequestId.get(node);
      if (!requestId) return undefined;
      const nodeEvents = this.eventLog
        .filter(e => "requestId" in e && e.requestId === requestId);
      // Find the most recent terminal event (call.responded, call.error, or call.aborted).
      // Events are in chronological order, so findLast would work in ES2023.
      // Here we scan backwards to find the latest terminal event.
      let latestTerminalEvent: CallEventMapValue | undefined;
      for (let i = nodeEvents.length - 1; i >= 0; i--) {
        const e = nodeEvents[i];
        if (e.type === "call.responded" || e.type === "call.error" || e.type === "call.aborted") {
          latestTerminalEvent = e;
          break;
        }
      }
      if (!latestTerminalEvent) return undefined;
      if (latestTerminalEvent.type === "call.error") {
        return {
          status: "failed",
          output: undefined,
          error: latestTerminalEvent.error,
        } satisfies CallResult;
      }
      if (latestTerminalEvent.type === "call.responded") {
        return {
          status: "completed",
          output: latestTerminalEvent.output,
        } satisfies CallResult;
      }
      if (latestTerminalEvent.type === "call.aborted") {
        return {
          status: "aborted",
          output: undefined,
        } satisfies CallResult;
      }
      return undefined;
    });
    this.statusMap.set(node, status);
    this.preconditions.set(node, preconditions);
    this.blockedByFailure.set(node, blockedByFailure);
    this.resultMap.set(node, result);
  }
}
```
For each operation node in the DAG:
1. Create a `signal<NodeStatus>` starting at `"idle"`
2. Create a `computed<boolean>` that's `true` when all predecessor nodes have status `"completed"` (or `"skipped"` — a skipped node satisfies its dependents' preconditions)
3. Create a `computed<boolean>` that detects whether any predecessor has failed or been aborted, which is what triggers the failure cascade
4. Create a `computed<CallResult | undefined>` that derives the node's result from the event log (for use by `Conditional.test` and `Map.over`)
5. Register an abort function that cascades to all descendants (not shown in the listing above)
### Status lifecycle
The signal-based status lifecycle mirrors `CallStatus` with workflow-specific additions. Under ADR-005, status transitions are **derived from the event log** — the coordinator appends events, and the status projection maps events to states:
| Event log signals | NodeStatus | Meaning |
|-------------------|------------|---------|
| (no events) | `idle` | Node just created, no call activity yet |
| Predecessor events arriving | `waiting` | At least one predecessor is running, none have completed yet |
| All predecessors completed/skipped | `ready` | All preconditions met, eligible to start |
| `call.requested` received | `running` | Call executing |
| `call.responded` received | `completed` | Call succeeded |
| `call.error` received | `failed` | Call failed (uncaught error) |
| `call.aborted` received | `aborted` | Call cancelled |
| Conditional branch not taken | `skipped` | Conditional branch not taken |
```
            ┌──────┐
┌───────────│ idle │──────────────┐
│           └──┬───┘              │
│              │ predecessor      │ (no predecessors:
│              │ starts running   │  root node)
│              ▼                  │
│          ┌───────┐              │
│          │waiting│              │
│          └───┬───┘              │
│              │ all preds        │
│              │ completed/       │
│   ┌──────────┤ skipped          │
│   │          │                  ▼
│   │          │              ┌──────┐
│   │          └─────────────►│ready │
│   │                         └───┬──┘
│   │                             │ hub starts call
│   │                             │ (appends call.requested)
│   │                             ▼
│   │                        ┌────────┐
│   │                        │running │
│   │                        └────┬───┘
│   │               ┌─────────────┼───────────────┐
│   │               │ call        │ call          │ call
│   │               │ responded   │ failed        │ aborted
│   │               ▼             ▼               ▼
│   │         ┌───────────┐   ┌──────┐       ┌────────┐
│   │         │ completed │   │failed│       │aborted │
│   │         └───────────┘   └───┬──┘       └────────┘
│   │                             │ (uncaught)    ▲
│   │                             ▼               │
│   │                      cascades to all        │
│   │                   downstream dependents     │
│   │                   via blockedByFailure      │
└───┼─────────────────────────────────────────────┘
    │
    │    ┌─────────┐
    └───►│ skipped │  (Conditional branch not taken)
         └─────────┘

completed, failed, aborted, and skipped are all terminal states
```
### Retry semantics (ADR-005)
Retries are natural with the event log. A retry is NOT a state mutation — it's a new sequence of events appended to the log:
```
call.requested(A, reqId=1) → fact: A was requested
call.error(A, reqId=1)     → fact: A failed on first attempt
call.requested(A, reqId=2) → fact: A was retried with a new request
call.responded(A, reqId=2) → fact: A succeeded on retry
```
The status projection derives the current state by scanning for the **most recent event per node**. No `retried` status needed; no state machine mutation; the log preserves full history. The `nodeKeyToRequestId` map tracks which `requestId` corresponds to each node's current attempt.
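To make the "log preserves full history" point concrete, here is a small sketch that replays the four events above. It assumes the caller keeps the list of every `requestId` issued for a node (the `nodeKeyToRequestId` map described in this document tracks only the current attempt), and it uses a simplified event shape; `attemptHistory` is a hypothetical helper name.

```typescript
// Simplified stand-in for CallEventMapValue (assumption).
type CallEvent = {
  type: "call.requested" | "call.responded" | "call.error" | "call.aborted";
  requestId: string;
};

type AttemptOutcome = "running" | "completed" | "failed" | "aborted";

const OUTCOME: Record<CallEvent["type"], AttemptOutcome> = {
  "call.requested": "running",
  "call.responded": "completed",
  "call.error": "failed",
  "call.aborted": "aborted",
};

// Outcome of every attempt for one node, oldest first.
function attemptHistory(
  requestIds: readonly string[],
  log: readonly CallEvent[],
): AttemptOutcome[] {
  const outcomes: AttemptOutcome[] = [];
  for (const id of requestIds) {
    const events = log.filter((e) => e.requestId === id);
    const last = events[events.length - 1];
    if (last !== undefined) outcomes.push(OUTCOME[last.type]);
  }
  return outcomes;
}

const retryLog: CallEvent[] = [
  { type: "call.requested", requestId: "1" },
  { type: "call.error", requestId: "1" },
  { type: "call.requested", requestId: "2" },
  { type: "call.responded", requestId: "2" },
];

const history = attemptHistory(["1", "2"], retryLog); // ["failed", "completed"]
const current = history[history.length - 1];          // "completed"
```

The node's current status comes from the last attempt only, while the earlier failure remains available for diagnostics because nothing in the log was overwritten.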
## Computed Preconditions
The core innovation of reactive execution: each node's "can I start?" question is a `computed` signal that automatically resolves based on upstream states.
```typescript
const preconditions = computed(() => {
  const predecessors = graph.inNeighbors(node);
  return predecessors.every(pred => {
    const status = statusMap.get(pred)!.value;
    return status === "completed" || status === "skipped";
  });
});
```
A node's preconditions are met when **all predecessors have reached a satisfying terminal state** (`completed` or `skipped`). A `failed` or `aborted` predecessor does NOT satisfy preconditions — it prevents the dependent from ever becoming `ready`.
This means:
- Adding a new predecessor automatically includes it in the check (if the DAG changes)
- A predecessor completing automatically re-evaluates all dependent preconditions
- An aborted predecessor prevents dependents from becoming `ready`
- A skipped predecessor satisfies preconditions (the branch was deliberately bypassed, not broken)
- No manual event wiring or callback chains
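The rules in this list can be checked without any signal library by writing the two predicates as plain functions over a status snapshot. This is only a sketch: in the design above these bodies would sit inside `computed(...)`, and the diamond-shaped `predecessors` table is made up for the example.

```typescript
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

type StatusMap = Record<string, NodeStatus>;

// Hypothetical diamond DAG: A → B, A → C, B → D, C → D.
const predecessors: Record<string, string[]> = {
  A: [],
  B: ["A"],
  C: ["A"],
  D: ["B", "C"],
};

function preconditionsMet(node: string, status: StatusMap): boolean {
  return (predecessors[node] ?? []).every(
    (p) => status[p] === "completed" || status[p] === "skipped",
  );
}

function blockedByFailure(node: string, status: StatusMap): boolean {
  return (predecessors[node] ?? []).some(
    (p) => status[p] === "failed" || status[p] === "aborted",
  );
}

const allGood: StatusMap = { A: "completed", B: "completed", C: "skipped", D: "waiting" };
const oneFailed: StatusMap = { A: "completed", B: "completed", C: "failed", D: "waiting" };

console.log(preconditionsMet("D", allGood));   // true: a skipped predecessor satisfies preconditions
console.log(preconditionsMet("D", oneFailed)); // false: a failed predecessor never does
console.log(blockedByFailure("D", oneFailed)); // true: D would be aborted
console.log(preconditionsMet("A", allGood));   // true: root node, no predecessors
```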
### Sequential preconditions
In a sequential group (A → B → C):
- A's preconditions: `true` (no predecessors, or root-level)
- B's preconditions: `A.status === "completed"`
- C's preconditions: `B.status === "completed"`
When A completes → B's preconditions become true → hub starts B → B completes → C's preconditions become true → hub starts C. All without manual event wiring.
### Parallel preconditions
In a parallel group (A starts B and C simultaneously):
- B's preconditions: `A.status === "completed"` (same as any sequential dependency)
- C's preconditions: `A.status === "completed"` (shared predecessor)
Both B and C become `ready` at the same time, and the hub starts them in parallel.
### Join preconditions
When a node depends on multiple predecessors (fork-join):
```
               ┌── B (completed) ──┐
A (completed) ─┤                   ├── D (aborted)
               └── C (failed) ─────┘
```
D's `preconditions` requires both B and C to be completed/skipped. Since C is `failed`, D's preconditions can never be met. D transitions to `aborted`.
The alternative would be "partial success" — D starts with B's output even though C failed. This is NOT supported by the precondition model. If partial execution is needed, the template author should use a `Conditional` to handle the failure case explicitly.
### `maxConcurrency` for Parallel groups
A `Parallel` group with `maxConcurrency: 3` should only start 3 nodes at a time, even though all preconditions are met. This is a scheduling constraint, not a structural one — the DAG doesn't encode it.
The `WorkflowReactiveRoot` enforces `maxConcurrency` via a reactive counting semaphore:
```typescript
// For each node in a Parallel group with maxConcurrency:
const groupKey = getParallelGroup(nodeId);   // from parentMap/siblingMap
const maxConc = getMaxConcurrency(groupKey); // from template props
const canStart = computed(() => {
  const siblingRunningCount = siblings.filter(
    sib => statusMap.get(sib)!.value === "running"
  ).length;
  return preconditions.value && siblingRunningCount < maxConc;
});
```
A node becomes `ready` only when both its `preconditions` are met AND the number of currently running siblings is below `maxConcurrency`. When a sibling completes and a slot opens, the next ready node starts.
For `Parallel` groups without `maxConcurrency` (the default), all siblings start immediately when their preconditions are met — no semaphore is needed.
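One detail worth sketching is what happens when several siblings become eligible at the same moment. If each node's check is evaluated against the same snapshot, all of them could see `siblingRunningCount < maxConc` and start together; whether that can happen depends on how effects are scheduled. A batch-oriented variant that hands out the free slots explicitly avoids the question. `pickStartable` is a hypothetical helper, not part of the described API.

```typescript
// Pick which eligible siblings may start now, without exceeding maxConcurrency.
function pickStartable(
  eligible: readonly string[], // preconditions met, not yet started, in template order
  runningCount: number,        // siblings currently "running"
  maxConcurrency: number,
): string[] {
  const freeSlots = Math.max(0, maxConcurrency - runningCount);
  return eligible.slice(0, freeSlots);
}

// Five siblings become eligible at once, with a limit of 3:
const first = pickStartable(["n1", "n2", "n3", "n4", "n5"], 0, 3); // ["n1", "n2", "n3"]
// One of them completes, two are still running, so exactly one slot opens:
const next = pickStartable(["n4", "n5"], 2, 3);                     // ["n4"]
// Limit reached: nothing starts.
const none = pickStartable(["n5"], 3, 3);                           // []
```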
### Conditional as error boundary
A `Conditional` can catch a failure and redirect to a fallback path:
```typescript
h(Sequential, {},
  h(Operation, { name: "fetch-data" }),
  h(Conditional, {
      test: (results) => results["fetch-data"].status !== "failed",
    },
    // then: proceed with data processing
    h(Sequential, {},
      h(Operation, { name: "transform" }),
      h(Operation, { name: "store" }),
    ),
    // else: fallback path
    h(Operation, { name: "notify-error" }),
  ),
)
```
If `fetch-data` fails:
1. The `Conditional`'s `test` function receives the results map from the **result projection** (derived from the event log)
2. `test` evaluates to `false` (the operation failed)
3. The `then`-branch transitions to `skipped`
4. The `else`-branch (`notify-error`) becomes `ready`
5. Downstream nodes after the `Conditional` see the `Conditional` as `completed` (it resolved successfully, just on a different branch)
The result projection (from ADR-005) provides `CallResult` values to `Conditional.test` and `Map.over`. These are computed from the event log, not from direct signal reads. This ensures that `Conditional.test` always sees the most recent state — if a node is retried, the test sees the retry's result, not the original failure.
Without a `Conditional`, the failure is **uncaught**. It cascades through dependency edges to all dependents, which transition to `aborted`.
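The branch selection in steps 1 to 4 can be sketched as a function from the results map to branch statuses. The `CallResult` shape follows the fields used elsewhere in this document; `resolveConditional` and `fetchSucceeded` are hypothetical names, and the optional chaining is a defensive assumption for the case where the node has no result yet.

```typescript
type CallResult = {
  status: "completed" | "failed" | "aborted";
  output?: unknown;
  error?: unknown;
};
type Results = Record<string, CallResult | undefined>;
type BranchStatus = "ready" | "skipped";

// Evaluate the test against the result projection and pick a branch.
function resolveConditional(
  test: (results: Results) => boolean,
  results: Results,
): { thenBranch: BranchStatus; elseBranch: BranchStatus } {
  return test(results)
    ? { thenBranch: "ready", elseBranch: "skipped" }
    : { thenBranch: "skipped", elseBranch: "ready" };
}

const fetchSucceeded = (results: Results) =>
  results["fetch-data"]?.status !== "failed";

const afterFailure = resolveConditional(fetchSucceeded, {
  "fetch-data": { status: "failed", error: { message: "timeout" } },
});
// afterFailure: { thenBranch: "skipped", elseBranch: "ready" }

const afterSuccess = resolveConditional(fetchSucceeded, {
  "fetch-data": { status: "completed", output: [1, 2, 3] },
});
// afterSuccess: { thenBranch: "ready", elseBranch: "skipped" }
```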
### Systemic failure: aborting the entire workflow
For failures that should cancel everything (e.g., provider outage, authentication failure), the hub coordinator can abort the entire `WorkflowReactiveRoot`:
```typescript
workflowRoot.abortAll(); // Sets all non-terminal nodes to "aborted"
```
This is separate from dependency-edge failure propagation. It's for systemic failures where the workflow cannot meaningfully continue regardless of which branches are independent.
### Interaction with call protocol abort
There are three abort mechanisms:
1. **Signal cascade** (this layer): `blockedByFailure` effects transition dependents to `aborted`. This is automatic and follows dependency edges.
2. **Call protocol abort** (operations layer): `PendingRequestMap.abort(requestId)` propagates `call.aborted` events through the pub/sub layer. This is network-aware and handles remote calls.
3. **Full workflow abort** (this layer): `workflowRoot.abortAll()` aborts all non-terminal nodes. This is for systemic failures.
The hub coordinator should invoke signal cascade and protocol abort together:
```typescript
// When aborting a call:
workflowRoot.abortNode(nodeId); // Signal: transition dependents to aborted
prm.abort(requestId); // Protocol: cancel the remote call
// When aborting entire workflow:
workflowRoot.abortAll(); // Signal: abort everything
prm.abortAll(pendingRequestIds); // Protocol: cancel all pending calls
```
Signal cascades are instant. Protocol aborts may take time to propagate. They're complementary — the signal cascade ensures local state is immediately consistent, while the protocol abort ensures remote state eventually catches up.
## NodeStatus vs CallStatus
`NodeStatus` extends `CallStatus` with workflow-specific states that have no call protocol equivalent:
| NodeStatus | Meaning | CallStatus equivalent |
|-----------|---------|----------------------|
| `idle` | Not started, no preconditions evaluated | None (call doesn't exist yet) |
| `waiting` | Preconditions not met (upstream still running) | None |
| `ready` | Preconditions met, eligible to start | None |
| `running` | Call in progress | `running` |
| `completed` | Call succeeded | `completed` |
| `failed` | Call failed | `failed` |
| `aborted` | Call cancelled | `aborted` |
| `skipped` | Conditional branch not taken | None |
The hub coordinator maps between these:
```typescript
// NodeStatus → coordinator action (what the hub should do for a node in this state)
function nodeStatusToCallAction(status: NodeStatus): "start" | "skip" | "abort" | "none" {
  switch (status) {
    case "ready": return "start";
    case "skipped": return "skip";
    case "aborted": return "abort";
    default: return "none";
  }
}
// CallStatus → NodeStatus (when call event arrives)
function callStatusToNodeStatus(callStatus: CallStatus): NodeStatus {
  // Direct mapping for shared states
  return callStatus as NodeStatus;
}
```
## Event-Driven Execution
Under ADR-005, the hub coordinator's responsibility shifts from directly setting signal values to **appending events to the log**. The reactive layer drives execution via `effect()`s that watch projections and invoke calls when preconditions are met.
### Coordinator Flow
```typescript
// 1. Create the reactive root from the DAG
const workflowRoot = new WorkflowReactiveRoot(dag, { failurePolicy: "continue-running" });
// 2. Register effects that start calls when preconditions are met
for (const [nodeId, preconditions, blockedByFailure] of workflowRoot.nodes) {
  // Start the call when preconditions are met
  effect(() => {
    if (preconditions.value) {
      const status = workflowRoot.statusMap.get(nodeId)!;
      if (status.value === "idle" || status.value === "waiting") {
        // All preconditions satisfied: start the call
        const operationId = dag.getNodeAttributes(nodeId).name;
        const requestId = crypto.randomUUID();
        workflowRoot.nodeKeyToRequestId.set(nodeId, requestId);
        // Append event to the log (the status projection updates automatically)
        workflowRoot.append({
          type: "call.requested",
          requestId,
          operationId,
          input: getInput(nodeId),
          timestamp: new Date().toISOString(),
        });
      }
    }
  });
  // Abort when a predecessor fails (uncaught failure propagation)
  effect(() => {
    if (blockedByFailure.value) {
      const status = workflowRoot.statusMap.get(nodeId)!;
      if (status.value === "idle" || status.value === "waiting") {
        // A predecessor failed and no Conditional caught it: abort
        status.value = "aborted";
      }
    }
  });
}
// 3. When a call completes, append the result event
prm.call(operationId, input, { parentRequestId })
  .then(result => {
    workflowRoot.append({
      type: "call.responded",
      requestId,
      output: result,
      timestamp: new Date().toISOString(),
    });
  })
  .catch(error => {
    workflowRoot.append({
      type: "call.error",
      requestId,
      error: { code: error.code, message: error.message },
      timestamp: new Date().toISOString(),
    });
  });
```
Both effects are reactive. When a predecessor completes, the `preconditions` computed re-evaluates, potentially triggering the start effect. When a predecessor fails, the `blockedByFailure` computed re-evaluates, potentially triggering the abort effect.
The call's promise resolution appends events to the log. The status projection derives state from events. There is no direct `status.value = "running"` or `status.value = "completed"` — the projection handles these transitions by scanning the event log.
### Event-to-Status Mapping
The status projection maps events to `NodeStatus` values:
| Last event for node | Derived NodeStatus |
|---------------------|--------------------|
| No events | `idle` (or `waiting` if predecessors are running) |
| `call.requested` | `running` |
| `call.responded` | `completed` |
| `call.error` | `failed` |
| `call.aborted` | `aborted` |
| `call.completed` | `completed` |
For retries, the projection scans for the most recent event per node. A node with both `call.error` and `call.requested` (with a new `requestId`) is `running`, not `failed`.
### Effect disposal
Each `effect()` returns a dispose function. The `WorkflowReactiveRoot` tracks all effect disposers and provides a `dispose()` method that tears down the entire reactive graph:
```typescript
dispose(): void {
  for (const disposer of this.effectDisposers) {
    disposer();
  }
  this.effectDisposers.length = 0; // a second dispose() becomes a no-op
  this.statusMap.clear();
  this.preconditions.clear();
  this.blockedByFailure.clear();
  this.resultMap.clear();
}
```
This is critical for cleaning up when a workflow completes, fails, or is aborted. Without disposal, signal subscriptions leak.
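The disposal pattern itself is independent of the signal library. The sketch below uses a hypothetical `DisposerBag` class and a counter in place of real `effect()` subscriptions, to show that every registered disposer runs exactly once and that calling `dispose()` again is harmless.

```typescript
class DisposerBag {
  private disposers: Array<() => void> = [];

  // Register the dispose function returned by effect() (or any cleanup callback).
  add(disposer: () => void): void {
    this.disposers.push(disposer);
  }

  // Run every disposer once; a second call finds an empty list and does nothing.
  dispose(): void {
    const pending = this.disposers;
    this.disposers = [];
    for (const disposer of pending) disposer();
  }
}

let active = 0;
const bag = new DisposerBag();
for (let i = 0; i < 3; i++) {
  active++;                     // stands in for subscribing an effect
  bag.add(() => { active--; }); // stands in for the effect's dispose function
}
bag.dispose();
bag.dispose(); // safe to call again
// active is back to 0
```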
### Full workflow abort
For systemic failures (provider outage, authentication failure), `WorkflowReactiveRoot` provides `abortAll()`:
```typescript
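abortAll(): void {
  for (const [nodeId, status] of this.statusMap) {
    // Leave terminal states untouched, including "skipped" and already-"aborted" nodes
    const isTerminal = ["completed", "failed", "aborted", "skipped"].includes(status.value);
    if (!isTerminal) {
      status.value = "aborted";
    }
  }
  // Effects will fire and clean up any waiting/ready nodes
}
```
This transitions every non-terminal node to `aborted`; nodes that are already `completed`, `failed`, `aborted`, or `skipped` keep their status. It's for cases where the entire workflow should stop, regardless of which branches are independent.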
## Reactive Error Boundaries
The reactive execution layer has three levels of error handling, each with distinct scope and semantics:
### Level 1: Signal-level errors (per-node)
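When a call fails, the hub coordinator appends a `call.error` event to the log, and the status projection derives `"failed"` for the node (under ADR-005 the coordinator does not set the signal directly):
```typescript
// Individual node failure: the status projection derives "failed" from this event
workflowRoot.append({ type: "call.error", requestId, error, timestamp: new Date().toISOString() });
```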
This triggers `blockedByFailure` in all downstream dependents, causing them to transition to `"aborted"`. The failure propagates through the signal graph reactively — no manual error handling is needed.
### Level 2: Conditional error boundaries (branch-level)
A `Conditional` node catches failures and redirects to an alternative branch:
```typescript
h(Conditional, {
    test: (results) => results["fetch-data"].status !== "failed",
  },
  // then-branch (happy path)
  h(Operation, { name: "process" }),
  // else-branch (fallback)
  h(Operation, { name: "handle-error" }),
)
```
When the `Conditional`'s `test` function evaluates to `false` (because a predecessor failed), the then-branch transitions to `skipped` and the else-branch becomes `ready`. Downstream nodes after the `Conditional` see it as `completed` — the failure is contained.
This is the reactive equivalent of a `try/catch` block. Without a `Conditional`, failures cascade uncaught through dependency edges.
### Level 3: Workflow abort (system-level)
For failures that should cancel everything, the hub calls `workflowRoot.abortAll()`:
```typescript
workflowRoot.abortAll(); // All non-terminal nodes → "aborted"
```
This is for system-level failures: provider outage, authentication failure, or any condition where the workflow cannot meaningfully continue regardless of branch independence.
### WorkflowErrorBoundary (coordinator-level)
The hub coordinator wraps the entire reactive execution in a `WorkflowErrorBoundary` — a conceptual boundary, not a signal:
```typescript
try {
  // Drive the workflow
  for (const [nodeId, preconditions, blockedByFailure] of workflowRoot.nodes) {
    effect(() => { /* start calls when ready */ });
    effect(() => { /* abort when blocked */ });
  }
} catch (error) {
  // Unhandled reactive error: signal graph inconsistency.
  // This shouldn't happen in normal operation.
  workflowRoot.abortAll();
  prm.abortAll(pendingRequestIds);
  workflowRoot.dispose();
  console.error("Workflow aborted after reactive error", error); // logger choice is an assumption
}
```
The `WorkflowErrorBoundary` catches errors that escape the signal graph (e.g., a `computed` that throws, an `effect` that errors). These are catastrophic — the reactive state is inconsistent. The boundary's job is to:
1. Abort all calls via `prm.abortAll()`
2. Set all non-terminal nodes to `"aborted"` via `workflowRoot.abortAll()`
3. Dispose the reactive root
4. Log the error for diagnostics
**Error propagation summary**:
| Error type | Scope | Mechanism | Recovery |
|------------|-------|-----------|----------|
| Call failure | Single node | `call.error` event appended; status projection derives `failed` | Cascades to dependents via `blockedByFailure` |
| Caught by Conditional | Branch | `Conditional.test` evaluates against failed status | Redirect to else-branch, downstream sees `completed` |
| Uncaught cascade | Downstream chain | `blockedByFailure` effects | Downstream nodes transition to `aborted` |
| System failure | Entire workflow | `abortAll()` | All non-terminal nodes to `aborted` |
| Reactive error | Signal graph | `WorkflowErrorBoundary` catch | Abort everything, dispose, log |
## Constraints
- **Events are the source of truth for call-level statuses** (ADR-005) — the hub coordinator appends call protocol events. Call-level statuses (`running`, `completed`, `failed`, `aborted` from `call.aborted`) are derived from the event log by the status projection. The coordinator does NOT directly set signal values for these statuses.
- **Workflow-derived statuses use signal mutation** — statuses that have no call protocol equivalent (`idle`, `waiting`, `ready`, `skipped`, and `aborted` from `blockedByFailure`) are set directly on signals by the reactive engine. This is not a violation of ADR-005's event-log principle — these statuses represent workflow-level concerns (scheduling, failure propagation) that exist outside the call protocol's scope. ADR-005's principle applies to *call protocol events*; it does not forbid the reactive layer from managing its own workflow-level state. See the "Hybrid Status Model" section for the full categorization.
- **Event processing is idempotent** — processing the same event twice produces the same projected state. The status projection scans for the most recent event per node.
- **Signals are in-memory** — `WorkflowReactiveRoot` state is not persisted. If the hub restarts, the reactive state is reconstructed from call protocol events + template re-render. The event log itself can be reconstructed from the call protocol event stream.
- **Failure policy is configurable** — the `FailurePolicy` determines what happens to running nodes when a predecessor fails. Default is `continue-running` (only idle/waiting nodes abort). Alternative is `abort-dependents` (running dependents also abort).
- **Failure follows dependency edges, not structural scope** — a failed node causes only its downstream dependents (via DAG edges) to abort. Sibling branches in a `Parallel` group are independent and continue running. This enables partial success: one branch can fail while another completes.
- **Conditionals are error boundaries** — a `Conditional` whose test evaluates against a failed predecessor can redirect to an else branch, catching the failure. Without a `Conditional`, failures cascade uncaught through dependency edges.
- **Abort is immediate in signals, delayed in protocol** — transitioning a signal to `aborted` is instant, but `prm.abort(requestId)` takes time to propagate through the call protocol. The hub should invoke both.
- **`skipped` satisfies preconditions** — a `skipped` predecessor is treated as "completed for the purpose of preconditions." It means the branch was deliberately bypassed, not broken.
- **`failed` and `aborted` block preconditions** — a `failed` or `aborted` predecessor means the dependent's preconditions can never be met. The `blockedByFailure` effect transitions the dependent to `aborted`.
- **`NodeStatus` and `CallStatus` share the call-level states**: `running`, `completed`, `failed`, and `aborted` map directly. `idle`, `waiting`, `ready`, and `skipped` are workflow-specific additions with no call protocol equivalent.
- **Edge key format uses composite keys for call graph** — `triggered` edges use `${source}->${target}`, `depends_on` edges use `${source}->${target}:depends_on`. See [schema.md](schema.md) for the full key convention.
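The precondition and failure-propagation rules above can be sketched as plain functions over a status map. This is a minimal illustration and not the reactive implementation: it uses ordinary data instead of signals, and the names `Edge`, `preconditionsMet`, `isBlockedByFailure`, and `propagateFailure` are hypothetical helpers introduced here. It assumes that every node named in an edge has an entry in the status map.

```typescript
// Sketch only: plain data stands in for the signal graph.
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

type FailurePolicy = "continue-running" | "abort-dependents";

// Hypothetical edge shape: `target` depends on `source`.
type Edge = { source: string; target: string };

// `completed` and `skipped` both satisfy a precondition.
const satisfies = (s: NodeStatus): boolean => s === "completed" || s === "skipped";

// `failed` and `aborted` mean the precondition can never be met.
const blocks = (s: NodeStatus): boolean => s === "failed" || s === "aborted";

function predecessorsOf(node: string, edges: Edge[]): string[] {
  return edges.filter((e) => e.target === node).map((e) => e.source);
}

// AND logic: every predecessor must be in a satisfying state.
function preconditionsMet(
  node: string, statuses: Map<string, NodeStatus>, edges: Edge[],
): boolean {
  return predecessorsOf(node, edges).every((p) => satisfies(statuses.get(p)!));
}

function isBlockedByFailure(
  node: string, statuses: Map<string, NodeStatus>, edges: Edge[],
): boolean {
  return predecessorsOf(node, edges).some((p) => blocks(statuses.get(p)!));
}

// Returns a new map; repeats until stable so aborts cascade down the chain.
// Only dependency edges are followed, so unrelated branches are untouched.
function propagateFailure(
  statuses: Map<string, NodeStatus>, edges: Edge[], policy: FailurePolicy,
): Map<string, NodeStatus> {
  const next = new Map(statuses);
  let changed = true;
  while (changed) {
    changed = false;
    for (const [node, status] of Array.from(next)) {
      if (!isBlockedByFailure(node, next, edges)) continue;
      const abortable =
        status === "idle" || status === "waiting" ||
        (status === "running" && policy === "abort-dependents");
      if (abortable) {
        next.set(node, "aborted");
        changed = true;
      }
    }
  }
  return next;
}
```

With edges `a -> b -> c` and `a -> d`, and `a` in `failed`, the default policy aborts a waiting `b` and an idle `c` but leaves a running `d` alone. `abort-dependents` aborts `d` as well. A node on an unrelated branch keeps its status in both cases.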
## Lifecycle and Ownership
The reactive execution pipeline has a clear creation order and ownership model:
### Creation Order
```
1. Template (UNode tree)
↓ GraphologyHostConfig
2. DAG (DirectedGraph)
↓ WorkflowReactiveRoot constructor
3. Signal graph (statusMap, preconditions, blockedByFailure)
↓ ReactiveHostConfig.render()
4. WorkflowNode tree (with effects registered)
```
1. **Template → DAG**: The consumer provides a template and renders it through `GraphologyHostConfig`. This produces a `DirectedGraph` stored in the `GraphContext`.
2. **DAG → Signal graph**: The consumer creates a `WorkflowReactiveRoot` from the DAG. The constructor iterates over all operation nodes in the DAG and creates `signal<NodeStatus>`, `computed<boolean>` (preconditions), and `computed<boolean>` (blockedByFailure) for each.
3. **Signal graph → WorkflowNode tree**: The consumer renders the template through `ReactiveHostConfig`. The `createInstance` call for each `Operation` node looks up the corresponding signal in the `ReactiveRoot` and wires the node's effects.
### Ownership
| Object | Owned by | Disposed by |
|--------|----------|-------------|
| Template (`UNode` tree) | Consumer | Consumer (not a reactive resource) |
| DAG (`DirectedGraph`) | GraphologyHostConfig's `GraphContext` | Consumer (static, no disposal needed) |
| `WorkflowReactiveRoot` | Consumer (typically the hub coordinator) | Consumer calls `root.dispose()` |
| Signal graph (statusMap, preconditions, etc.) | `WorkflowReactiveRoot` | `root.dispose()` clears all maps |
| `WorkflowNode` tree | `ReactiveContext` (created by ReactiveHostConfig) | Cleared when `ReactiveContext` is garbage collected |
| Effects | `WorkflowReactiveRoot.effectDisposers` | `root.dispose()` calls all disposers |
**Key ownership rules**:
- `WorkflowReactiveRoot` owns the signal graph. It creates every `signal` and `computed`, tracks every `effect` disposer, and is responsible for cleaning them all up.
- `ReactiveHostConfig` is stateless after rendering. It creates `WorkflowNode` instances and registers effects, but the effects are tracked by `WorkflowReactiveRoot`, not by the HostConfig.
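- The consumer owns the `WorkflowReactiveRoot` lifecycle. It creates it, drives execution (for call-level statuses, by appending call protocol events that the status projection turns into signal values, per ADR-005), and disposes it when done.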
### Disposal
```typescript
// When workflow completes or is cancelled:
workflowRoot.dispose();
```
`dispose()` performs the following in order:
1. Calls every `effect()` disposer, unsubscribing all reactive effects.
2. Clears `statusMap`, `preconditions`, and `blockedByFailure` maps, releasing signal references.
3. The `WorkflowNode` tree becomes inert — status signals no longer exist, so no updates propagate.
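The disposal order above can be sketched with a small stand-in class. This is an illustration under assumptions, not the actual `WorkflowReactiveRoot`: `SketchReactiveRoot` and `trackEffect` are hypothetical names, `{ value }` objects stand in for signals, and the guard against a second `dispose()` call is an assumption that the text does not state. In `@preact/signals-core`, `effect()` returns a dispose function, which is the kind of disposer tracked here.

```typescript
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

type Disposer = () => void;

// Hypothetical stand-in for the root; plain objects replace real signals.
class SketchReactiveRoot {
  readonly statusMap = new Map<string, { value: NodeStatus }>();
  readonly preconditions = new Map<string, { readonly value: boolean }>();
  readonly blockedByFailure = new Map<string, { readonly value: boolean }>();
  private effectDisposers: Disposer[] = [];
  private disposed = false;

  // Register the disposer returned when an effect is created.
  trackEffect(disposer: Disposer): void {
    this.effectDisposers.push(disposer);
  }

  dispose(): void {
    if (this.disposed) return; // assumption: a second call is a no-op
    this.disposed = true;
    // 1. Unsubscribe every effect before dropping the signals it reads.
    for (const disposeEffect of this.effectDisposers) disposeEffect();
    this.effectDisposers = [];
    // 2. Release signal references.
    this.statusMap.clear();
    this.preconditions.clear();
    this.blockedByFailure.clear();
    // 3. Nothing further to do: nodes that still hold old references are inert.
  }
}
```

Running the disposers before clearing the maps means no effect can fire against a half-cleared root.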
**When to dispose**:
- Workflow completes successfully (all nodes `completed` or `skipped`)
- Workflow is aborted (consumer calls `abortAll()`, then `dispose()`)
- Template is being re-rendered (dispose the old root before creating a new one; this is required until the ujsx reconciler supports re-rendering)
**What NOT to dispose**:
- The DAG (`DirectedGraph`) is not a reactive resource. It doesn't need disposal.
- The template (`UNode` tree) is plain data. It doesn't need disposal.
### Interaction with ReactiveHostConfig
The `ReactiveHostConfig` does NOT own the reactive state. It creates `WorkflowNode` instances during rendering, but these nodes reference signals that belong to `WorkflowReactiveRoot`. The rendering flow is:
```typescript
// 1. Create ReactiveRoot from DAG
const workflowRoot = new WorkflowReactiveRoot(dag);
// 2. Create ReactiveHostConfig with reference to ReactiveRoot's signals
const hostConfig = new ReactiveHostConfig(operationRegistry, workflowRoot);
// 3. Render template
const root = createRoot(hostConfig, {});
root.render(template);
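// 4. Drive execution. Direct assignment is shown here for brevity: `ready` is a
//    workflow-derived status, while call-level statuses such as `completed` are
//    normally produced by the status projection over the event log (ADR-005).
workflowRoot.statusMap.get("architect")!.value = "ready";
// ... external code starts the call, eventually:
workflowRoot.statusMap.get("architect")!.value = "completed";
// ... which triggers downstream preconditions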
// 5. Cleanup
workflowRoot.dispose();
```
The `ReactiveContext` passed to `ReactiveHostConfig` includes a reference to `workflowRoot.statusMap` so that `createInstance` can look up and wire the signal for each node. The context does not own these signals; it only serves as a lookup table.
**Important**: `WorkflowNode.status` and `WorkflowReactiveRoot.statusMap.get(nodeId)` reference the **same** `Signal<NodeStatus>` instance. There is one signal per node, owned by `WorkflowReactiveRoot`, and both the `WorkflowNode` and the `statusMap` hold references to it. Setting `workflowRoot.statusMap.get("architect")!.value = "running"` and setting `workflowNode.status.value = "running"` (where `workflowNode.key === "architect"`) are equivalent operations on the same signal. Similarly, `WorkflowNode.preconditions` and `WorkflowReactiveRoot.preconditions.get(nodeId)` reference the **same** `Computed<boolean>` instance.
## Open Questions
1. ~~**Should preconditions support OR logic?**~~ **Resolved (OQ-011)**: No for v1. All preconditions use AND logic — a node becomes `ready` only when ALL predecessors have reached a satisfying terminal state (`completed` or `skipped`). OR logic (`anyOf`) would introduce significant complexity (what happens when one predecessor completes but another fails? Is the node ready or blocked?) and is already partially addressed by `Conditional` (which provides branch-level either/or semantics). For v2, if OR logic becomes necessary, it should be added as a `preconditionMode: "allOf" | "anyOf"` attribute on `Operation` (node-level, not edge-level), defaulting to `"allOf"`. This is a clean extension point that doesn't change the current precondition model.
2. ~~**How are retries handled at the signal level?**~~ **Resolved by ADR-005**: Retries are natural append events. A retry creates a new `call.requested` with a new `requestId`. The status projection derives the current state by scanning for the most recent event per node. No `retried` status needed. See the Retry semantics section above.
3. ~~**Should the reactive graph support partial re-rendering?**~~ **Resolved (OQ-025)**: Blocked on ujsx reconciler. Currently mount-only. When the reconciler is implemented, flowgraph gains re-rendering through the standard `prepareUpdate`/`commitUpdate` HostConfig methods. The event log persists across re-renders (ADR-005), so re-rendered nodes pick up where they left off. No special reactive-graph re-rendering logic is needed — the reconciler handles tree diffing, and the HostConfig applies mutations.
4. ~~**How does `maxConcurrency` interact with preconditions?**~~ **Resolved (OQ-012)**: `maxConcurrency` is a `Parallel` prop enforced by the `WorkflowReactiveRoot` via a counting semaphore in the reactive layer. When the root initializes signals for nodes in a `Parallel` group with `maxConcurrency: N`, it wraps the precondition logic: a node's effective `ready` transition requires both `preconditions.value === true` AND `runningCount < maxConcurrency`. The `runningCount` is a reactive computed derived from counting sibling nodes currently in the `running` state. This is entirely a reactive-engine concern — the DAG doesn't encode `maxConcurrency` (it's not structural), and the call graph doesn't need to know about it. The `Parallel` component's `maxConcurrency` prop is already part of the template definition; the reactive engine just needs to honor it.
5. ~~**Should `blockedByFailure` be a separate `computed` or derived from `preconditions`?**~~ **Resolved (OQ-013)**: Keep two separate `computed` values (current design). Two separate computeds are more composable — you can check preconditions independently of failure status, and you can compose different effects for each. A single `computed<NodeReadiness>` would require every consumer to destructure the result, losing the clean `if (preconditions.value) { ... }` pattern. The implementation cost of two effects per node is negligible. The current design is the right one.
6. ~~**What happens to running nodes when a predecessor fails?**~~ **Resolved by ADR-005/OQ-010**: This is a `FailurePolicy` configuration of the projection. The default policy (`continue-running`) means running nodes continue. An alternative policy (`abort-dependents`) would abort running dependents. The event log makes both strategies expressible — only the projection logic changes.
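The `maxConcurrency` gate from item 4 can be sketched as two pure functions. This is a minimal illustration using plain values rather than computeds. `runningCount` and `canBecomeReady` are hypothetical helper names, and treating an omitted `maxConcurrency` as unlimited is an assumption made here.

```typescript
type NodeStatus =
  | "idle" | "waiting" | "ready" | "running"
  | "completed" | "failed" | "aborted" | "skipped";

// Counts the siblings of a `Parallel` group that are currently `running`.
function runningCount(
  siblings: string[], statuses: Map<string, NodeStatus>,
): number {
  return siblings.filter((id) => statuses.get(id) === "running").length;
}

// A node may move to `ready` only if its preconditions hold AND a slot is free.
// Assumption: an undefined `maxConcurrency` means no limit.
function canBecomeReady(
  preconditionsMet: boolean, running: number, maxConcurrency?: number,
): boolean {
  if (!preconditionsMet) return false;
  return maxConcurrency === undefined || running < maxConcurrency;
}
```

As in the description above, only `running` siblings occupy a slot in this sketch. Whether siblings already in `ready` should also count toward the limit is not specified here, so the sketch leaves that out.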
## References
- ujsx reactive layer: `@alkdev/ujsx/docs/architecture/reactive-layer.md`
- ujsx reconciler: `@alkdev/ujsx/docs/architecture/reconciler.md`
- Schema: [schema.md](schema.md) — `NodeStatus`, `CallStatus`
- Host configs: [host-configs.md](host-configs.md)
- Workflow templates: [workflow-templates.md](workflow-templates.md)
- Call protocol: `@alkdev/alkhub_ts/docs/architecture/call-graph.md`