add reconciler implementation plan: 6-phase spec with dependency graph and parallelism analysis
This commit is contained in:
391
docs/research/reconciler/05-flowgraph-host-configs.md
Normal file
391
docs/research/reconciler/05-flowgraph-host-configs.md
Normal file
@@ -0,0 +1,391 @@
|
||||
# Phase 5: Flowgraph HostConfig
|
||||
|
||||
## Status: Spec (Draft)
|
||||
|
||||
## Problem
|
||||
|
||||
Flowgraph is a separate project (`@alkdev/flowgraph`) that uses `@alkdev/ujsx` as a dependency. It renders workflow templates (which are `UNode` trees) to different targets using `HostConfig`. This requires two new host implementations:
|
||||
|
||||
1. **Graphology DAG host** — renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, topological sort
|
||||
2. **Reactive execution host** — renders a workflow template to an in-memory reactive execution engine for runtime workflow execution
|
||||
|
||||
## Conceptual Mapping
|
||||
|
||||
| Workflow concept | UJSX concept |
|
||||
|---|---|
|
||||
| Workflow template | `URoot` / `UElement` tree (`{ type, props, children }`) |
|
||||
| Operation step | `<Operation name="classify" ...>` → `UElement` |
|
||||
| Sequential flow | Parent → child relationship |
|
||||
| Parallel branches | Multiple children of a `Parallel` component |
|
||||
| Status propagation | `signal`/`computed` reactive layer |
|
||||
| Template validation | `TransformRegistry` rule pass |
|
||||
| Template → DAG | `HostConfig` rendering into graphology |
|
||||
| Template → execution engine | Different `HostConfig` for different runners |
|
||||
|
||||
## Host 1: Graphology DAG
|
||||
|
||||
### Purpose
|
||||
|
||||
Convert a declarative workflow template into a graphology DAG for:
|
||||
- Structural analysis (connected components, reachability)
|
||||
- Cycle detection (prevent invalid workflows)
|
||||
- Topological sort (execution ordering)
|
||||
- Critical path analysis
|
||||
- Visualization
|
||||
|
||||
### Type Parameters
|
||||
|
||||
```typescript
|
||||
HostConfig<WorkflowTag, GraphologyNode, GraphologyContext>
|
||||
```
|
||||
|
||||
Where:
|
||||
- `WorkflowTag` = string literal union: `"operation" | "sequential" | "parallel" | "conditional" | "workflow"`
|
||||
- `GraphologyNode` = graphology node instance (with attributes)
|
||||
- `GraphologyContext` = `{ graph: Graph; nodeIdCounter: number }`
|
||||
|
||||
### Implementation
|
||||
|
||||
```typescript
|
||||
import Graph from "graphology";
|
||||
import { HostConfig } from "@alkdev/ujsx/host";
|
||||
import { hasCycle } from "graphology-dag";
|
||||
|
||||
type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";
|
||||
|
||||
interface GraphNodeAttrs {
|
||||
label: string;
|
||||
type: WorkflowTag;
|
||||
status: "pending" | "running" | "completed" | "failed" | "aborted";
|
||||
props: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface GraphologyContext {
|
||||
graph: Graph;
|
||||
nodeIdCounter: number;
|
||||
parentStack: string[]; // stack of parent node IDs for edge creation
|
||||
}
|
||||
|
||||
export function createGraphologyHost(): HostConfig<WorkflowTag, string, GraphologyContext> {
|
||||
return {
|
||||
name: "graphology-dag",
|
||||
|
||||
createRootContext(container, options, context) {
|
||||
return {
|
||||
graph: new Graph({ type: "directed", multi: false }),
|
||||
nodeIdCounter: 0,
|
||||
parentStack: [],
|
||||
};
|
||||
},
|
||||
|
||||
finalizeRoot(ctx) {
|
||||
if (hasCycle(ctx.graph)) {
|
||||
throw new CycleError("Workflow contains a cycle");
|
||||
}
|
||||
},
|
||||
|
||||
createInstance(tag, props, ctx, parent) {
|
||||
const id = `node_${ctx.nodeIdCounter++}`;
|
||||
ctx.graph.addNode(id, {
|
||||
label: String(props.name ?? id),
|
||||
type: tag,
|
||||
status: "pending",
|
||||
props,
|
||||
});
|
||||
return id;
|
||||
},
|
||||
|
||||
createTextInstance(text, ctx, parent) {
|
||||
// Workflows don't have text nodes; skip
|
||||
return `text_${ctx.nodeIdCounter++}`;
|
||||
},
|
||||
|
||||
appendChild(parent, child, ctx) {
|
||||
ctx.graph.addEdge(parent, child, { type: "dependency" });
|
||||
},
|
||||
|
||||
insertBefore(parent, child, before, ctx) {
|
||||
ctx.graph.addEdge(parent, child, { type: "dependency" });
|
||||
// graphology doesn't have order, but we can add an ordinal attribute
|
||||
},
|
||||
|
||||
removeChild(parent, child, ctx) {
|
||||
ctx.graph.dropEdge(parent, child);
|
||||
ctx.graph.dropNode(child);
|
||||
},
|
||||
|
||||
prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
|
||||
const attrs = ctx.graph.getNodeAttributes(instance);
|
||||
const changed = {};
|
||||
for (const key of Object.keys(nextProps)) {
|
||||
if (nextProps[key] !== prevProps[key]) {
|
||||
changed[key] = nextProps[key];
|
||||
}
|
||||
}
|
||||
return Object.keys(changed).length > 0 ? changed : null;
|
||||
},
|
||||
|
||||
commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
|
||||
ctx.graph.updateNodeAttributes(instance, (attrs) => ({
|
||||
...attrs,
|
||||
props: { ...attrs.props, ...payload },
|
||||
}));
|
||||
},
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
### What the DAG Host Enables
|
||||
|
||||
```typescript
|
||||
import { h, createRoot } from "@alkdev/ujsx";
|
||||
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";
|
||||
|
||||
const workflow = h("workflow", { name: "document-pipeline" },
|
||||
h("sequential", { name: "ingest" },
|
||||
h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
|
||||
h("operation", { name: "parse", type: "json-parse" }),
|
||||
),
|
||||
h("parallel", { name: "analyze" },
|
||||
h("operation", { name: "classify", type: "llm-classify" }),
|
||||
h("operation", { name: "extract", type: "llm-extract" }),
|
||||
),
|
||||
h("conditional", { name: "route", condition: "classify.result" },
|
||||
h("operation", { name: "escalate", type: "notify" }),
|
||||
h("operation", { name: "archive", type: "store" }),
|
||||
),
|
||||
);
|
||||
|
||||
const host = createGraphologyHost();
|
||||
const root = createRoot(host, {});
|
||||
root.render(workflow);
|
||||
|
||||
// ctx.graph is now a graphology DAG
|
||||
const { graph } = root.ctx;
|
||||
console.log(graph.order); // number of nodes
|
||||
console.log(topologicalSort(graph)); // execution order
|
||||
```
|
||||
|
||||
## Host 2: Reactive Execution Engine
|
||||
|
||||
### Purpose
|
||||
|
||||
Render a workflow template to a reactive execution engine where:
|
||||
- Each operation node has a `signal` for its status
|
||||
- Downstream operations use `computed` signals that check upstream completion
|
||||
- Status propagation is automatic: when `operation_A.status` becomes `completed`, dependent operations automatically start (if preconditions are met)
|
||||
|
||||
### Type Parameters
|
||||
|
||||
```typescript
|
||||
HostConfig<WorkflowTag, OperationInstance, ExecutionContext>
|
||||
```
|
||||
|
||||
Where:
|
||||
- `OperationInstance` = `{ id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }`
|
||||
- `ExecutionContext` = `{ operations: Map<string, OperationInstance>; runner: OperationRunner }`
|
||||
|
||||
### Implementation Sketch
|
||||
|
||||
```typescript
|
||||
import { signal, computed, batch } from "@preact/signals-core";
|
||||
|
||||
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";
|
||||
|
||||
interface OperationInstance {
|
||||
id: string;
|
||||
status: Signal<OpStatus>;
|
||||
result: Signal<unknown>;
|
||||
definition: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface ExecutionContext {
|
||||
operations: Map<string, OperationInstance>;
|
||||
parentStack: string[];
|
||||
runner: OperationRunner;
|
||||
}
|
||||
|
||||
export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
|
||||
return {
|
||||
name: "reactive-execution",
|
||||
|
||||
createRootContext(container, options) {
|
||||
return {
|
||||
operations: new Map(),
|
||||
parentStack: [],
|
||||
runner,
|
||||
};
|
||||
},
|
||||
|
||||
createInstance(tag, props, ctx) {
|
||||
const instance: OperationInstance = {
|
||||
id: String(props.name ?? `op_${ctx.operations.size}`),
|
||||
status: signal<OpStatus>("pending"),
|
||||
result: signal<unknown>(null),
|
||||
definition: props,
|
||||
};
|
||||
ctx.operations.set(instance.id, instance);
|
||||
return instance;
|
||||
},
|
||||
|
||||
appendChild(parent, child, ctx) {
|
||||
// Establish dependency: child depends on parent
|
||||
// When parent completes, child's preconditions are re-evaluated
|
||||
const parentInstance = typeof parent === "string"
|
||||
? ctx.operations.get(parent)!
|
||||
: parent as OperationInstance;
|
||||
const childInstance = child as OperationInstance;
|
||||
|
||||
// Create a computed that watches parent status
|
||||
const preconditionsMet = computed(() => {
|
||||
// For sequential: parent must be completed
|
||||
if (parentInstance.status.value === "completed") return true;
|
||||
return false;
|
||||
});
|
||||
|
||||
// Effect: when preconditions are met, start the child operation
|
||||
effect(() => {
|
||||
if (preconditionsMet.value && childInstance.status.value === "pending") {
|
||||
batch(() => {
|
||||
childInstance.status.value = "running";
|
||||
ctx.runner.execute(childInstance).then(
|
||||
(result) => batch(() => {
|
||||
childInstance.result.value = result;
|
||||
childInstance.status.value = "completed";
|
||||
}),
|
||||
(error) => batch(() => {
|
||||
childInstance.result.value = error;
|
||||
childInstance.status.value = "failed";
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
});
|
||||
},
|
||||
|
||||
commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
|
||||
// Update the operation's definition
|
||||
instance.definition = { ...instance.definition, ...payload };
|
||||
},
|
||||
|
||||
// ... remaining methods
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
### Abort Cascade
|
||||
|
||||
The reactive host naturally supports abort cascading. When a parent operation fails, downstream operations can be automatically aborted:
|
||||
|
||||
```typescript
|
||||
// In appendChild, also watch for parent failure:
|
||||
const abortTrigger = computed(() => {
|
||||
return parentInstance.status.value === "failed" || parentInstance.status.value === "aborted";
|
||||
});
|
||||
|
||||
effect(() => {
|
||||
if (abortTrigger.value && childInstance.status.value === "pending") {
|
||||
batch(() => {
|
||||
childInstance.status.value = "aborted";
|
||||
});
|
||||
}
|
||||
});
|
||||
```
|
||||
|
||||
This is the key advantage of the signal-based approach: no manual event wiring for status propagation. The reactive graph handles it automatically.
|
||||
|
||||
## Integration with Flowgraph
|
||||
|
||||
### Flowgraph Package Structure
|
||||
|
||||
```
|
||||
@alkdev/flowgraph/
|
||||
src/
|
||||
component/ # ujsx components for workflow definition
|
||||
operation.ts # <Operation name="..." /> component
|
||||
sequential.ts # <Sequential> — sequential execution
|
||||
parallel.ts # <Parallel> — concurrent execution
|
||||
conditional.ts # <Conditional> — branching
|
||||
index.ts
|
||||
host/
|
||||
graphology.ts # HostConfig: ujsx template → graphology DAG
|
||||
reactive.ts # HostConfig: ujsx template → reactive execution engine
|
||||
schema/
|
||||
enums.ts # CallStatus, NodeStatus, EdgeType
|
||||
node.ts # WorkflowNode schema (OperationNode, CallNode)
|
||||
edge.ts # WorkflowEdge schema
|
||||
graph.ts # SerializedGraph factory + flowgraph schemas
|
||||
graph/
|
||||
construction.ts # FlowGraph class (wraps graphology)
|
||||
validation.ts # Cycle detection, type-compat checking
|
||||
queries.ts # topological sort, ancestors/descendants, critical path
|
||||
mutation.ts # addNode, addEdge, updateStatus
|
||||
reactive/
|
||||
workflow.ts # ReactiveRoot for workflow state
|
||||
node.ts # Per-node signal management
|
||||
analysis/
|
||||
type-compat.ts # Output→Input schema compatibility checking
|
||||
workflow.ts # Execution ordering, precondition validation
|
||||
defaults.ts # Enum defaults
|
||||
error/
|
||||
index.ts # FlowgraphError, CycleError, TypeCompatError
|
||||
index.ts # Public API
|
||||
```
|
||||
|
||||
### Dependencies
|
||||
|
||||
```
|
||||
@alkdev/typebox # Schema definitions
|
||||
@alkdev/pubsub # Event transport (peer dep, optional)
|
||||
@alkdev/operations # OperationSpec types for type-compat edges (peer dep)
|
||||
@alkdev/ujsx # Template definition, reactive layer, transform
|
||||
@preact/signals-core # Transitive via ujsx, reactive state
|
||||
graphology # DAG structural analysis
|
||||
graphology-dag # Cycle detection, topological sort
|
||||
```
|
||||
|
||||
### Key Design Decision
|
||||
|
||||
`@alkdev/ujsx` should be a **direct dependency** of `@alkdev/flowgraph`. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple — one package, clear dependency chain.
|
||||
|
||||
The `from_ujsx` adapter for operations stays in the operations package as a separate export path (consistent with how it already handles `from-mcp` and `from-openapi`).
|
||||
|
||||
## Changes to UJSX
|
||||
|
||||
| File | Change |
|
||||
|------|--------|
|
||||
| None specific | Flowgraph is a consumer, not a modification of ujsx. |
|
||||
|
||||
However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the Phase 1-3 reconciler, the reconciler design needs adjustment. Build both hosts early as integration tests.
|
||||
|
||||
## Dependencies
|
||||
|
||||
- Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs `removeChild`)
|
||||
- Phase 4 is optional but highly beneficial (graphology host benefits from `Value.Equal` bail-out on unchanged workflows)
|
||||
- `graphology` and `graphology-dag` — new external dependencies for flowgraph (not ujsx)
|
||||
|
||||
## Open Questions
|
||||
|
||||
1. **Should flowgraph components be in `@alkdev/flowgraph` or `@alkdev/ujsx`?** The workflow components (`<Operation>`, `<Sequential>`, `<Parallel>`) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like `@alkdev/ujsx/workflow`) makes them reusable. Recommendation: keep them in flowgraph — they're domain-specific.
|
||||
|
||||
2. **How does the reactive host handle async operations?** `effect()` is synchronous. The `runner.execute()` call is async. Need to ensure the `effect` that starts execution doesn't re-run on every signal read inside the async chain.
|
||||
|
||||
3. **Should the graphology host support incremental updates?** If a workflow template changes (add/remove operations), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.
|
||||
|
||||
## Test Cases
|
||||
|
||||
For graphology host:
|
||||
1. Render simple sequential workflow → correct DAG structure
|
||||
2. Render parallel branches → correct multi-child edges
|
||||
3. Cycle detection on invalid workflow
|
||||
4. Topological sort order matches declaration order
|
||||
5. Update an operation's props → graphology node attributes update
|
||||
6. Remove an operation → graphology node and edges removed
|
||||
|
||||
For reactive host:
|
||||
1. Execute sequential workflow → operations run in order
|
||||
2. Execute parallel branches → operations start concurrently
|
||||
3. Operation failure → downstream operations aborted
|
||||
4. Status propagation is automatic (no manual wiring)
|
||||
5. Disposing a workflow stops all pending operations
|
||||
6. Signal subscriptions are cleaned up on unmount
|
||||
Reference in New Issue
Block a user