add flowgraph architecture docs (Phase 1 SDD)
Draft architecture specification for @alkdev/flowgraph — a workflow graph library providing DAG-based orchestration over operations. Covers two graph types (operation graph, call graph), ujsx workflow templates, GraphologyHost and ReactiveHost configs, signal-driven execution, type-compatibility analysis, error hierarchy, and build/distribution. Includes 3 ADRs: ujsx as template IR, DAG-only enforcement, decoupled storage.
This commit is contained in:
279
docs/architecture/reactive-execution.md
Normal file
279
docs/architecture/reactive-execution.md
Normal file
@@ -0,0 +1,279 @@
|
||||
---
|
||||
status: draft
|
||||
last_updated: 2026-05-19
|
||||
---
|
||||
|
||||
# Reactive Execution
|
||||
|
||||
Signal-driven status propagation, computed preconditions, and abort cascading for workflow template execution.
|
||||
|
||||
## Overview
|
||||
|
||||
The reactive execution layer bridges workflow template structure (DAG) to runtime behavior (call execution). It uses `@preact/signals-core` (via ujsx's reactive layer) to create a signal-backed execution model where:
|
||||
|
||||
- Each `<Operation>` node gets a `signal<NodeStatus>` tracking its lifecycle state
|
||||
- Preconditions are `computed<boolean>` values that automatically resolve when upstream dependencies complete
|
||||
- Abort cascades propagate through the signal graph — setting one node to `"aborted"` automatically prevents downstream nodes from starting
|
||||
|
||||
This layer does NOT execute operations directly. It provides reactive state that the hub coordinator reads and writes. The coordinator calls `registry.execute()` when a node's preconditions are met, and updates the node's status signal when the call completes.
|
||||
|
||||
## ReactiveRoot for Workflows
|
||||
|
||||
```typescript
|
||||
class WorkflowReactiveRoot {
|
||||
private statusMap: Map<string, Signal<NodeStatus>>;
|
||||
private preconditions: Map<string, Computed<boolean>>;
|
||||
private graph: DirectedGraph;
|
||||
private abortMap: Map<string, () => void>;
|
||||
|
||||
constructor(graph: DirectedGraph) {
|
||||
this.graph = graph;
|
||||
this.statusMap = new Map();
|
||||
this.preconditions = new Map();
|
||||
this.abortMap = new Map();
|
||||
this.initializeSignals();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
`WorkflowReactiveRoot` wraps the reactive state for an entire workflow execution. It takes the structural DAG (from the GraphologyHost) and creates reactive state for each operation node.
|
||||
|
||||
### initializeSignals()
|
||||
|
||||
```typescript
|
||||
private initializeSignals(): void {
|
||||
for (const node of this.graph.nodes()) {
|
||||
const attrs = this.graph.getNodeAttributes(node);
|
||||
if (attrs.category !== "operation") continue; // Skip structural nodes (already flattened)
|
||||
|
||||
const status = signal<NodeStatus>("idle");
|
||||
|
||||
const preconditions = computed(() => {
|
||||
const predecessors = this.graph.inNeighbors(node);
|
||||
return predecessors.every(pred => {
|
||||
const predStatus = this.statusMap.get(pred);
|
||||
return predStatus && predStatus.value === "completed";
|
||||
});
|
||||
});
|
||||
|
||||
this.statusMap.set(node, status);
|
||||
this.preconditions.set(node, preconditions);
|
||||
this.abortMap.set(node, () => this.cascadeAbort(node));
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
For each operation node in the DAG:
|
||||
1. Create a `signal<NodeStatus>` starting at `"idle"`
|
||||
2. Create a `computed<boolean>` that's `true` when all predecessor nodes have status `"completed"`
|
||||
3. Register an abort function that cascades to all descendants
|
||||
|
||||
### Status lifecycle
|
||||
|
||||
The signal-based status lifecycle mirrors `CallStatus` with workflow-specific additions:
|
||||
|
||||
```
|
||||
idle → waiting → ready → running → completed
|
||||
→ failed
|
||||
→ aborted → aborted
|
||||
```
|
||||
|
||||
| Status | Meaning | Signal trigger |
|
||||
|--------|---------|---------------|
|
||||
| `idle` | Node just created, no parent completion yet | Initial state |
|
||||
| `waiting` | At least one predecessor is running, none have completed | Any predecessor status change |
|
||||
| `ready` | All predecessors completed (preconditions met) | `computed` resolves to `true` |
|
||||
| `running` | Call executing | Hub sets `status.value = "running"` |
|
||||
| `completed` | Call succeeded | Hub sets `status.value = "completed"` |
|
||||
| `failed` | Call failed | Hub sets `status.value = "failed"` |
|
||||
| `aborted` | Call cancelled (or parent cancelled) | Hub or cascade sets `status.value = "aborted"` |
|
||||
| `skipped` | Conditional branch not taken | Conditional evaluation sets this |
|
||||
|
||||
The hub coordinator reads the `ready` state (via `preconditions`) and triggers execution. When the call completes, the hub writes the new status to the signal. The signal propagates to all downstream `computed` values automatically.
|
||||
|
||||
## Computed Preconditions
|
||||
|
||||
The core innovation of reactive execution: each node's "can I start?" question is a `computed` signal that automatically resolves based on upstream states.
|
||||
|
||||
```typescript
|
||||
const preconditions = computed(() => {
|
||||
const predecessors = graph.inNeighbors(node);
|
||||
return predecessors.every(pred => statusMap.get(pred)!.value === "completed");
|
||||
});
|
||||
```
|
||||
|
||||
This means:
|
||||
- Adding a new predecessor automatically includes it in the check (if the DAG changes)
|
||||
- A predecessor completing automatically re-evaluates all dependent preconditions
|
||||
- An aborted predecessor prevents all dependents from becoming `ready`
|
||||
- No manual event wiring or callback chains
|
||||
|
||||
### Sequential preconditions
|
||||
|
||||
In a sequential group (A → B → C):
|
||||
|
||||
- A's preconditions: `true` (no predecessors, or root-level)
|
||||
- B's preconditions: `A.status === "completed"`
|
||||
- C's preconditions: `B.status === "completed"`
|
||||
|
||||
When A completes → B's preconditions become true → hub starts B → B completes → C's preconditions become true → hub starts C. All without manual event wiring.
|
||||
|
||||
### Parallel preconditions
|
||||
|
||||
In a parallel group (A starts B and C simultaneously):
|
||||
|
||||
- B's preconditions: `A.status === "completed"` (same as any sequential dependency)
|
||||
- C's preconditions: `A.status === "completed"` (shared predecessor)
|
||||
|
||||
Both B and C become `ready` at the same time, and the hub starts them in parallel.
|
||||
|
||||
### Join preconditions
|
||||
|
||||
When a node depends on multiple predecessors (e.g., D depends on both B and C completing):
|
||||
|
||||
- D's preconditions: `B.status === "completed" && C.status === "completed"`
|
||||
|
||||
D only becomes `ready` when all predecessors complete. This is the "join" in fork-join parallelism.
|
||||
|
||||
## Abort Cascade
|
||||
|
||||
Abort cascading is signal-driven. When a node is aborted:
|
||||
|
||||
```typescript
|
||||
cascadeAbort(nodeId: string): void {
|
||||
const status = this.statusMap.get(nodeId);
|
||||
if (status && !isTerminal(status.value)) {
|
||||
status.value = "aborted";
|
||||
// Cascade to all descendants
|
||||
for (const desc of this.graph.descendants(nodeId)) {
|
||||
const descStatus = this.statusMap.get(desc);
|
||||
if (descStatus && !isTerminal(descStatus.value)) {
|
||||
descStatus.value = "aborted";
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
This sets the status of the aborted node and all of its descendants to `"aborted"`. The `computed` preconditions of these nodes automatically re-evaluate — but since aborted nodes never become "completed", their dependents will never become "ready".
|
||||
|
||||
### Interaction with call protocol abort
|
||||
|
||||
There are two abort mechanisms:
|
||||
|
||||
1. **Signal cascade** (this layer) — sets `status.value = "aborted"` for the node and all descendants. This is immediate and graph-based.
|
||||
2. **Call protocol abort** (operations layer) — `PendingRequestMap.abort(requestId)` propagates `call.aborted` events through the pub/sub layer. This is network-aware and handles remote calls.
|
||||
|
||||
The hub coordinator should invoke both:
|
||||
```typescript
|
||||
// When aborting a call:
|
||||
workflowRoot.cascadeAbort(nodeId); // Signal cascade
|
||||
prm.abort(requestId); // Protocol cascade
|
||||
```
|
||||
|
||||
The signal cascade is for local state (the reactive graph). The protocol cascade is for remote state (the running calls). They're complementary — the protocol cascade may take time to propagate, but the signal cascade is instant.
|
||||
|
||||
## NodeStatus vs CallStatus
|
||||
|
||||
`NodeStatus` extends `CallStatus` with workflow-specific states that have no call protocol equivalent:
|
||||
|
||||
| NodeStatus | Meaning | CallStatus equivalent |
|
||||
|-----------|---------|----------------------|
|
||||
| `idle` | Not started, no preconditions evaluated | None (call doesn't exist yet) |
|
||||
| `waiting` | Preconditions not met (upstream still running) | None |
|
||||
| `ready` | Preconditions met, eligible to start | None |
|
||||
| `running` | Call in progress | `running` |
|
||||
| `completed` | Call succeeded | `completed` |
|
||||
| `failed` | Call failed | `failed` |
|
||||
| `aborted` | Call cancelled | `aborted` |
|
||||
| `skipped` | Conditional branch not taken | None |
|
||||
|
||||
The hub coordinator maps between these:
|
||||
|
||||
```typescript
|
||||
// NodeStatus → CallStatus (when starting a call)
|
||||
function nodeStatusToCallAction(status: NodeStatus): "start" | "skip" | "abort" | "none" {
|
||||
switch (status) {
|
||||
case "ready": return "start";
|
||||
case "skipped": return "skip";
|
||||
case "aborted": return "abort";
|
||||
default: return "none";
|
||||
}
|
||||
}
|
||||
|
||||
// CallStatus → NodeStatus (when call event arrives)
|
||||
function callStatusToNodeStatus(callStatus: CallStatus): NodeStatus {
|
||||
// Direct mapping for shared states
|
||||
return callStatus as NodeStatus;
|
||||
}
|
||||
```
|
||||
|
||||
## Effect-Driven Execution
|
||||
|
||||
The hub coordinator uses `effect()` to react to precondition changes:
|
||||
|
||||
```typescript
|
||||
for (const [nodeId, preconditions] of workflowRoot.preconditions) {
|
||||
effect(() => {
|
||||
if (preconditions.value) {
|
||||
const status = workflowRoot.statusMap.get(nodeId)!;
|
||||
if (status.value === "idle" || status.value === "waiting") {
|
||||
// All preconditions met — start the call
|
||||
status.value = "running";
|
||||
const operationId = graph.getNodeAttributes(nodeId).name;
|
||||
prm.call(operationId, getInput(nodeId), { parentRequestId: parentCallId })
|
||||
.then(result => { status.value = "completed"; })
|
||||
.catch(error => { status.value = "failed"; });
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
```
|
||||
|
||||
Each node gets an `effect()` that watches its `preconditions` computed value. When preconditions resolve to `true` and the node is in a startable state (`idle` or `waiting`), the effect starts the call via `PendingRequestMap.call()`.
|
||||
|
||||
The call's promise resolution updates the node's status signal, which triggers downstream preconditions to re-evaluate, which triggers their effects, and so on.
|
||||
|
||||
### Effect disposal
|
||||
|
||||
Each `effect()` returns a dispose function. The `WorkflowReactiveRoot` tracks all effect disposers and provides a `dispose()` method that tears down the entire reactive graph:
|
||||
|
||||
```typescript
|
||||
dispose(): void {
|
||||
for (const disposer of this.effectDisposers) {
|
||||
disposer();
|
||||
}
|
||||
this.statusMap.clear();
|
||||
this.preconditions.clear();
|
||||
this.abortMap.clear();
|
||||
}
|
||||
```
|
||||
|
||||
This is critical for cleaning up when a workflow completes, fails, or is aborted. Without disposal, signal subscriptions leak.
|
||||
|
||||
## Constraints
|
||||
|
||||
- **Signals are in-memory** — `WorkflowReactiveRoot` state is not persisted. If the hub restarts, the reactive state is lost and must be reconstructed from call protocol events + template re-render.
|
||||
- **Effect-driven execution is optional** — the hub coordinator can choose not to use `effect()` and instead poll `preconditions.value` manually. The reactive layer provides the building blocks; the coordinator decides how to use them.
|
||||
- **Abort is immediate in signals, delayed in protocol** — setting `status.value = "aborted"` is instant, but `prm.abort(requestId)` takes time to propagate through the call protocol. The hub should invoke both.
|
||||
- **`skipped` is set by conditional evaluation, not by the call protocol** — a `Conditional` node whose test evaluates to `false` sets its child's status to `skipped`, which prevents the call from ever starting.
|
||||
- **`NodeStatus` and `CallStatus` share terminal states** — `running`, `completed`, `failed`, `aborted` map directly. `idle`, `waiting`, `ready`, `skipped` are workflow-specific additions.
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. **Should preconditions support OR logic?** Currently all predecessors must complete. An `anyOf` predicate would allow "start this node as soon as any predecessor completes." This would require an edge attribute or node-level configuration.
|
||||
|
||||
2. **How are retries handled at the signal level?** If an operation fails and should be retried, the status would go `running → failed → ready → running`. This requires resetting the status back to `ready`, which the current state machine doesn't support (failed is terminal). A `retried` status or a separate `retryCount` attribute may be needed.
|
||||
|
||||
3. **Should the reactive graph support partial re-rendering?** If a template changes mid-execution (e.g., a step is added), the ujsx reconciler could diff the old and new trees. But the ReactiveHost only supports mount rendering. Re-rendering would require reconciler support.
|
||||
|
||||
4. **How does `maxConcurrency` interact with preconditions?** A `Parallel` group with `maxConcurrency: 3` should only start 3 nodes at a time, even though all preconditions are met. This is a scheduling concern, not a structural one. The reactive layer could implement this as a semaphore signal, or it could be the coordinator's responsibility.
|
||||
|
||||
## References
|
||||
|
||||
- ujsx reactive layer: `@alkdev/ujsx/docs/architecture/reactive-layer.md`
|
||||
- ujsx reconciler: `@alkdev/ujsx/docs/architecture/reconciler.md`
|
||||
- Schema: [schema.md](schema.md) — `NodeStatus`, `CallStatus`
|
||||
- Host configs: [host-configs.md](host-configs.md)
|
||||
- Workflow templates: [workflow-templates.md](workflow-templates.md)
|
||||
- Call protocol: `@alkdev/alkhub_ts/docs/architecture/call-graph.md`
|
||||
Reference in New Issue
Block a user