Files
flowgraph/docs/architecture/reactive-execution.md
glm-5.1 d2253099ee add flowgraph architecture docs (Phase 1 SDD)
Draft architecture specification for @alkdev/flowgraph — a workflow graph library providing DAG-based orchestration over operations. Covers two graph types (operation graph, call graph), ujsx workflow templates, GraphologyHost and ReactiveHost configs, signal-driven execution, type-compatibility analysis, error hierarchy, and build/distribution. Includes 3 ADRs: ujsx as template IR, DAG-only enforcement, decoupled storage.
2026-05-19 09:36:22 +00:00

12 KiB

status, last_updated
status last_updated
draft 2026-05-19

Reactive Execution

Signal-driven status propagation, computed preconditions, and abort cascading for workflow template execution.

Overview

The reactive execution layer bridges workflow template structure (DAG) to runtime behavior (call execution). It uses @preact/signals-core (via ujsx's reactive layer) to create a signal-backed execution model where:

  • Each <Operation> node gets a signal<NodeStatus> tracking its lifecycle state
  • Preconditions are computed<boolean> values that automatically resolve when upstream dependencies complete
  • Abort cascades propagate through the signal graph — setting one node to "aborted" automatically prevents downstream nodes from starting

This layer does NOT execute operations directly. It provides reactive state that the hub coordinator reads and writes. The coordinator calls registry.execute() when a node's preconditions are met, and updates the node's status signal when the call completes.

ReactiveRoot for Workflows

class WorkflowReactiveRoot {
  private statusMap: Map<string, Signal<NodeStatus>>;
  private preconditions: Map<string, Computed<boolean>>;
  private graph: DirectedGraph;
  private abortMap: Map<string, () => void>;

  constructor(graph: DirectedGraph) {
    this.graph = graph;
    this.statusMap = new Map();
    this.preconditions = new Map();
    this.abortMap = new Map();
    this.initializeSignals();
  }
}

WorkflowReactiveRoot wraps the reactive state for an entire workflow execution. It takes the structural DAG (from the GraphologyHost) and creates reactive state for each operation node.

initializeSignals()

private initializeSignals(): void {
  for (const node of this.graph.nodes()) {
    const attrs = this.graph.getNodeAttributes(node);
    if (attrs.category !== "operation") continue;  // Skip structural nodes (already flattened)

    const status = signal<NodeStatus>("idle");

    const preconditions = computed(() => {
      const predecessors = this.graph.inNeighbors(node);
      return predecessors.every(pred => {
        const predStatus = this.statusMap.get(pred);
        return predStatus && predStatus.value === "completed";
      });
    });

    this.statusMap.set(node, status);
    this.preconditions.set(node, preconditions);
    this.abortMap.set(node, () => this.cascadeAbort(node));
  }
}

For each operation node in the DAG:

  1. Create a signal<NodeStatus> starting at "idle"
  2. Create a computed<boolean> that's true when all predecessor nodes have status "completed"
  3. Register an abort function that cascades to all descendants

Status lifecycle

The signal-based status lifecycle mirrors CallStatus with workflow-specific additions:

idle → waiting → ready → running → completed
                                       → failed
                    → aborted          → aborted
Status Meaning Signal trigger
idle Node just created, no parent completion yet Initial state
waiting At least one predecessor is running, none have completed Any predecessor status change
ready All predecessors completed (preconditions met) computed resolves to true
running Call executing Hub sets status.value = "running"
completed Call succeeded Hub sets status.value = "completed"
failed Call failed Hub sets status.value = "failed"
aborted Call cancelled (or parent cancelled) Hub or cascade sets status.value = "aborted"
skipped Conditional branch not taken Conditional evaluation sets this

The hub coordinator reads the ready state (via preconditions) and triggers execution. When the call completes, the hub writes the new status to the signal. The signal propagates to all downstream computed values automatically.

Computed Preconditions

The core innovation of reactive execution: each node's "can I start?" question is a computed signal that automatically resolves based on upstream states.

const preconditions = computed(() => {
  const predecessors = graph.inNeighbors(node);
  return predecessors.every(pred => statusMap.get(pred)!.value === "completed");
});

This means:

  • Adding a new predecessor automatically includes it in the check (if the DAG changes)
  • A predecessor completing automatically re-evaluates all dependent preconditions
  • An aborted predecessor prevents all dependents from becoming ready
  • No manual event wiring or callback chains

Sequential preconditions

In a sequential group (A → B → C):

  • A's preconditions: true (no predecessors, or root-level)
  • B's preconditions: A.status === "completed"
  • C's preconditions: B.status === "completed"

When A completes → B's preconditions become true → hub starts B → B completes → C's preconditions become true → hub starts C. All without manual event wiring.

Parallel preconditions

In a parallel group (A starts B and C simultaneously):

  • B's preconditions: A.status === "completed" (same as any sequential dependency)
  • C's preconditions: A.status === "completed" (shared predecessor)

Both B and C become ready at the same time, and the hub starts them in parallel.

Join preconditions

When a node depends on multiple predecessors (e.g., D depends on both B and C completing):

  • D's preconditions: B.status === "completed" && C.status === "completed"

D only becomes ready when all predecessors complete. This is the "join" in fork-join parallelism.

Abort Cascade

Abort cascading is signal-driven. When a node is aborted:

cascadeAbort(nodeId: string): void {
  const status = this.statusMap.get(nodeId);
  if (status && !isTerminal(status.value)) {
    status.value = "aborted";
    // Cascade to all descendants
    for (const desc of this.graph.descendants(nodeId)) {
      const descStatus = this.statusMap.get(desc);
      if (descStatus && !isTerminal(descStatus.value)) {
        descStatus.value = "aborted";
      }
    }
  }
}

This sets the status of the aborted node and all of its descendants to "aborted". The computed preconditions of these nodes automatically re-evaluate — but since aborted nodes never become "completed", their dependents will never become "ready".

Interaction with call protocol abort

There are two abort mechanisms:

  1. Signal cascade (this layer) — sets status.value = "aborted" for the node and all descendants. This is immediate and graph-based.
  2. Call protocol abort (operations layer) — PendingRequestMap.abort(requestId) propagates call.aborted events through the pub/sub layer. This is network-aware and handles remote calls.

The hub coordinator should invoke both:

// When aborting a call:
workflowRoot.cascadeAbort(nodeId);           // Signal cascade
prm.abort(requestId);                        // Protocol cascade

The signal cascade is for local state (the reactive graph). The protocol cascade is for remote state (the running calls). They're complementary — the protocol cascade may take time to propagate, but the signal cascade is instant.

NodeStatus vs CallStatus

NodeStatus extends CallStatus with workflow-specific states that have no call protocol equivalent:

NodeStatus Meaning CallStatus equivalent
idle Not started, no preconditions evaluated None (call doesn't exist yet)
waiting Preconditions not met (upstream still running) None
ready Preconditions met, eligible to start None
running Call in progress running
completed Call succeeded completed
failed Call failed failed
aborted Call cancelled aborted
skipped Conditional branch not taken None

The hub coordinator maps between these:

// NodeStatus → CallStatus (when starting a call)
function nodeStatusToCallAction(status: NodeStatus): "start" | "skip" | "abort" | "none" {
  switch (status) {
    case "ready": return "start";
    case "skipped": return "skip";
    case "aborted": return "abort";
    default: return "none";
  }
}

// CallStatus → NodeStatus (when call event arrives)
function callStatusToNodeStatus(callStatus: CallStatus): NodeStatus {
  // Direct mapping for shared states
  return callStatus as NodeStatus;
}

Effect-Driven Execution

The hub coordinator uses effect() to react to precondition changes:

for (const [nodeId, preconditions] of workflowRoot.preconditions) {
  effect(() => {
    if (preconditions.value) {
      const status = workflowRoot.statusMap.get(nodeId)!;
      if (status.value === "idle" || status.value === "waiting") {
        // All preconditions met — start the call
        status.value = "running";
        const operationId = graph.getNodeAttributes(nodeId).name;
        prm.call(operationId, getInput(nodeId), { parentRequestId: parentCallId })
          .then(result => { status.value = "completed"; })
          .catch(error => { status.value = "failed"; });
      }
    }
  });
}

Each node gets an effect() that watches its preconditions computed value. When preconditions resolve to true and the node is in a startable state (idle or waiting), the effect starts the call via PendingRequestMap.call().

The call's promise resolution updates the node's status signal, which triggers downstream preconditions to re-evaluate, which triggers their effects, and so on.

Effect disposal

Each effect() returns a dispose function. The WorkflowReactiveRoot tracks all effect disposers and provides a dispose() method that tears down the entire reactive graph:

dispose(): void {
  for (const disposer of this.effectDisposers) {
    disposer();
  }
  this.statusMap.clear();
  this.preconditions.clear();
  this.abortMap.clear();
}

This is critical for cleaning up when a workflow completes, fails, or is aborted. Without disposal, signal subscriptions leak.

Constraints

  • Signals are in-memoryWorkflowReactiveRoot state is not persisted. If the hub restarts, the reactive state is lost and must be reconstructed from call protocol events + template re-render.
  • Effect-driven execution is optional — the hub coordinator can choose not to use effect() and instead poll preconditions.value manually. The reactive layer provides the building blocks; the coordinator decides how to use them.
  • Abort is immediate in signals, delayed in protocol — setting status.value = "aborted" is instant, but prm.abort(requestId) takes time to propagate through the call protocol. The hub should invoke both.
  • skipped is set by conditional evaluation, not by the call protocol — a Conditional node whose test evaluates to false sets its child's status to skipped, which prevents the call from ever starting.
  • NodeStatus and CallStatus share terminal statesrunning, completed, failed, aborted map directly. idle, waiting, ready, skipped are workflow-specific additions.

Open Questions

  1. Should preconditions support OR logic? Currently all predecessors must complete. An anyOf predicate would allow "start this node as soon as any predecessor completes." This would require an edge attribute or node-level configuration.

  2. How are retries handled at the signal level? If an operation fails and should be retried, the status would go running → failed → ready → running. This requires resetting the status back to ready, which the current state machine doesn't support (failed is terminal). A retried status or a separate retryCount attribute may be needed.

  3. Should the reactive graph support partial re-rendering? If a template changes mid-execution (e.g., a step is added), the ujsx reconciler could diff the old and new trees. But the ReactiveHost only supports mount rendering. Re-rendering would require reconciler support.

  4. How does maxConcurrency interact with preconditions? A Parallel group with maxConcurrency: 3 should only start 3 nodes at a time, even though all preconditions are met. This is a scheduling concern, not a structural one. The reactive layer could implement this as a semaphore signal, or it could be the coordinator's responsibility.

References

  • ujsx reactive layer: @alkdev/ujsx/docs/architecture/reactive-layer.md
  • ujsx reconciler: @alkdev/ujsx/docs/architecture/reconciler.md
  • Schema: schema.mdNodeStatus, CallStatus
  • Host configs: host-configs.md
  • Workflow templates: workflow-templates.md
  • Call protocol: @alkdev/alkhub_ts/docs/architecture/call-graph.md