# Phase 5: Flowgraph HostConfig
## Status: Spec (Draft)
## Problem
Flowgraph is a separate project (`@alkdev/flowgraph`) that uses `@alkdev/ujsx` as a dependency. It renders workflow templates (which are `UNode` trees) to different targets using `HostConfig`. This requires two new host implementations:
1. **Graphology DAG host** — renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, topological sort
2. **Reactive execution host** — renders a workflow template to an in-memory reactive execution engine for runtime workflow execution
## Conceptual Mapping
| Workflow concept | UJSX concept |
|---|---|
| Workflow template | `URoot` / `UElement` tree (`{ type, props, children }`) |
| Operation step | `<Operation name="classify" ...>` → `UElement` |
| Sequential flow | Parent → child relationship |
| Parallel branches | Multiple children of a `Parallel` component |
| Status propagation | `signal`/`computed` reactive layer |
| Template validation | `TransformRegistry` rule pass |
| Template → DAG | `HostConfig` rendering into graphology |
| Template → execution engine | Different `HostConfig` for different runners |
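To make the mapping concrete, the following dependency-free sketch shows how a `{ type, props, children }` tree yields one parent → child pair per nesting level. `TemplateNode`, `el`, and `collectEdges` are hypothetical names used only for illustration; they are not part of the ujsx API.

```typescript
// Hypothetical stand-in for a UElement: { type, props, children }.
interface TemplateNode {
  type: string;
  props: Record<string, unknown>;
  children: TemplateNode[];
}

// Hypothetical helper that mirrors the call shape h(type, props, ...children).
function el(
  type: string,
  props: Record<string, unknown>,
  ...children: TemplateNode[]
): TemplateNode {
  return { type, props, children };
}

// Walk the tree depth-first and emit one [parent, child] pair per link.
function collectEdges(
  node: TemplateNode,
  edges: Array<[string, string]> = [],
): Array<[string, string]> {
  for (const child of node.children) {
    edges.push([String(node.props.name), String(child.props.name)]);
    collectEdges(child, edges);
  }
  return edges;
}

const template = el("workflow", { name: "pipeline" },
  el("parallel", { name: "analyze" },
    el("operation", { name: "classify" }),
    el("operation", { name: "extract" }),
  ),
);

// One pair per parent-child link in the template.
const edges = collectEdges(template);
```

A host turns each of these pairs into whatever its target needs: a graph edge in the DAG host, a status dependency in the reactive host.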
## Host 1: Graphology DAG
### Purpose
Convert a declarative workflow template into a graphology DAG for:
- Structural analysis (connected components, reachability)
- Cycle detection (prevent invalid workflows)
- Topological sort (execution ordering)
- Critical path analysis
- Visualization
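Cycle detection and topological sort are closely related: Kahn's algorithm produces an execution order and fails to consume every node exactly when a cycle exists. The sketch below illustrates that idea on a plain adjacency map. It is a stand-in for what `graphology-dag` provides, not its implementation, and `Adjacency` and `topoOrder` are hypothetical names.

```typescript
// Hypothetical adjacency map: node ID -> IDs of nodes that depend on it.
type Adjacency = Map<string, string[]>;

// Kahn's algorithm: returns a topological order, or null if a cycle exists.
function topoOrder(adj: Adjacency): string[] | null {
  const inDegree = new Map<string, number>();
  adj.forEach((targets, node) => {
    if (!inDegree.has(node)) inDegree.set(node, 0);
    targets.forEach((t) => inDegree.set(t, (inDegree.get(t) ?? 0) + 1));
  });

  // Start from nodes that have no incoming edges.
  const queue: string[] = [];
  inDegree.forEach((degree, node) => {
    if (degree === 0) queue.push(node);
  });

  const order: string[] = [];
  while (queue.length > 0) {
    const node = queue.shift() as string;
    order.push(node);
    for (const t of adj.get(node) ?? []) {
      const remaining = (inDegree.get(t) ?? 0) - 1;
      inDegree.set(t, remaining);
      if (remaining === 0) queue.push(t);
    }
  }

  // Nodes on a cycle never reach in-degree 0, so they are never emitted.
  return order.length === inDegree.size ? order : null;
}

const pipeline: Adjacency = new Map([
  ["fetch", ["parse"]],
  ["parse", ["classify", "extract"]],
]);
const cyclic: Adjacency = new Map([
  ["a", ["b"]],
  ["b", ["a"]],
]);

const order = topoOrder(pipeline);
const cyclicOrder = topoOrder(cyclic);
```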
### Type Parameters
```typescript
HostConfig<WorkflowTag, GraphologyNode, GraphologyContext>
```
Where:
- `WorkflowTag` = string literal union: `"operation" | "sequential" | "parallel" | "conditional" | "workflow"`
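- `GraphologyNode` = `string`: the graphology node key returned by `createInstance` (graphology addresses nodes by key and stores their attributes in the graph)
- `GraphologyContext` = `{ graph: Graph; nodeIdCounter: number; parentStack: string[] }`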
### Implementation
```typescript
import Graph from "graphology";
import { hasCycle } from "graphology-dag";
import type { HostConfig } from "@alkdev/ujsx/host";
// CycleError is listed under error/index.ts in the package structure;
// the relative import path here is an assumption.
import { CycleError } from "../error/index";

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";

interface GraphNodeAttrs {
  label: string;
  type: WorkflowTag;
  status: "pending" | "running" | "completed" | "failed" | "aborted";
  props: Record<string, unknown>;
}

interface GraphologyContext {
  graph: Graph;
  nodeIdCounter: number;
  parentStack: string[]; // stack of parent node IDs for edge creation
}

export function createGraphologyHost(): HostConfig<WorkflowTag, string, GraphologyContext> {
  return {
    name: "graphology-dag",
    createRootContext(container, options, context) {
      return {
        graph: new Graph({ type: "directed", multi: false }),
        nodeIdCounter: 0,
        parentStack: [],
      };
    },
    finalizeRoot(ctx) {
      if (hasCycle(ctx.graph)) {
        throw new CycleError("Workflow contains a cycle");
      }
    },
    createInstance(tag, props, ctx, parent) {
      const id = `node_${ctx.nodeIdCounter++}`;
      ctx.graph.addNode(id, {
        label: String(props.name ?? id),
        type: tag,
        status: "pending",
        props,
      });
      return id;
    },
    createTextInstance(text, ctx, parent) {
      // Workflows have no text nodes: return a placeholder ID that is never added to the graph
      return `text_${ctx.nodeIdCounter++}`;
    },
    appendChild(parent, child, ctx) {
      // Text placeholders are not graph nodes, and addEdge throws on unknown nodes
      if (!ctx.graph.hasNode(child)) return;
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },
    insertBefore(parent, child, before, ctx) {
      // graphology edges are unordered; sibling order would need an ordinal attribute.
      // Skip the edge if it already exists (a reorder), since the graph is not a multigraph.
      if (!ctx.graph.hasNode(child) || ctx.graph.hasEdge(parent, child)) return;
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },
    removeChild(parent, child, ctx) {
      // dropNode also drops every edge attached to the node
      if (ctx.graph.hasNode(child)) ctx.graph.dropNode(child);
    },
    prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
      // Shallow diff: collect props whose value changed or was added
      const changed: Record<string, unknown> = {};
      for (const key of Object.keys(nextProps)) {
        if (nextProps[key] !== prevProps[key]) {
          changed[key] = nextProps[key];
        }
      }
      return Object.keys(changed).length > 0 ? changed : null;
    },
    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      ctx.graph.updateNodeAttributes(instance, (attrs) => ({
        ...attrs,
        props: { ...attrs.props, ...payload },
      }));
    },
  };
}
```
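The `prepareUpdate` diff above only iterates over the keys of `nextProps`, so a prop that is removed between renders produces no payload. The sketch below is one possible shallow diff that also reports removed keys. `diffProps` is a hypothetical helper, and whether the reconciler expects removed props to appear in the payload (here as `undefined`) is an assumption.

```typescript
type Props = Record<string, unknown>;

// Hypothetical shallow diff: returns changed, added, and removed keys,
// or null when nothing differs. Removed keys are reported as undefined.
function diffProps(prev: Props, next: Props): Props | null {
  const changed: Props = {};

  // Added or changed values. Object.is treats NaN as equal to NaN,
  // unlike the !== comparison.
  for (const key of Object.keys(next)) {
    if (!Object.is(prev[key], next[key])) changed[key] = next[key];
  }

  // Keys present before but absent now.
  for (const key of Object.keys(prev)) {
    if (!(key in next)) changed[key] = undefined;
  }

  return Object.keys(changed).length > 0 ? changed : null;
}

const unchanged = diffProps({ name: "fetch" }, { name: "fetch" });
const updated = diffProps({ name: "fetch", retries: 1 }, { name: "fetch", retries: 3 });
const removed = diffProps({ name: "fetch", url: "x" }, { name: "fetch" });
```

With this payload shape, `commitUpdate` would also need to delete keys whose value is `undefined` instead of spreading them into `props`.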
### What the DAG Host Enables
```typescript
import { h, createRoot } from "@alkdev/ujsx";
import { topologicalSort } from "graphology-dag";
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";

const workflow = h("workflow", { name: "document-pipeline" },
  h("sequential", { name: "ingest" },
    h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
    h("operation", { name: "parse", type: "json-parse" }),
  ),
  h("parallel", { name: "analyze" },
    h("operation", { name: "classify", type: "llm-classify" }),
    h("operation", { name: "extract", type: "llm-extract" }),
  ),
  h("conditional", { name: "route", condition: "classify.result" },
    h("operation", { name: "escalate", type: "notify" }),
    h("operation", { name: "archive", type: "store" }),
  ),
);

const host = createGraphologyHost();
const root = createRoot(host, {});
root.render(workflow);

// ctx.graph is now a graphology DAG
const { graph } = root.ctx;
console.log(graph.order); // number of nodes
console.log(topologicalSort(graph)); // execution order
```
## Host 2: Reactive Execution Engine
### Purpose
Render a workflow template to a reactive execution engine where:
- Each operation node has a `signal` for its status
- Downstream operations use `computed` signals that check upstream completion
- Status propagation is automatic: when `operation_A.status` becomes `completed`, dependent operations automatically start (if preconditions are met)
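The propagation idea can be illustrated without any library. The sketch below uses a hand-rolled observable cell as a stand-in for a `signal`; `StatusCell` and `startWhenCompleted` are hypothetical names, and the real host is expected to use `signal`, `computed`, and `effect` instead.

```typescript
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Hypothetical minimal observable cell; a stand-in for a signal.
class StatusCell {
  private listeners: Array<(s: OpStatus) => void> = [];
  constructor(private current: OpStatus = "pending") {}

  get value(): OpStatus {
    return this.current;
  }

  set value(next: OpStatus) {
    if (next === this.current) return; // no notification when nothing changed
    this.current = next;
    for (const fn of this.listeners.slice()) fn(next);
  }

  // Returns an unsubscribe function, analogous to disposing an effect.
  subscribe(fn: (s: OpStatus) => void): () => void {
    this.listeners.push(fn);
    return () => {
      this.listeners = this.listeners.filter((l) => l !== fn);
    };
  }
}

// Move `child` from pending to running as soon as `parent` completes.
function startWhenCompleted(parent: StatusCell, child: StatusCell): () => void {
  return parent.subscribe((s) => {
    if (s === "completed" && child.value === "pending") child.value = "running";
  });
}

const fetchStatus = new StatusCell();
const parseStatus = new StatusCell();
const classifyStatus = new StatusCell();
startWhenCompleted(fetchStatus, parseStatus);
startWhenCompleted(parseStatus, classifyStatus);

// Completing fetch starts parse; classify stays pending until parse completes.
fetchStatus.value = "completed";
```

The unsubscribe function returned by `startWhenCompleted` corresponds to the disposal that the reactive host must perform on unmount.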
### Type Parameters
```typescript
HostConfig<WorkflowTag, OperationInstance, ExecutionContext>
```
Where:
- `OperationInstance` = `{ id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }`
- `ExecutionContext` = `{ operations: Map<string, OperationInstance>; parentStack: string[]; runner: OperationRunner }`
### Implementation Sketch
```typescript
import { signal, computed, effect, batch } from "@preact/signals-core";
import type { Signal } from "@preact/signals-core";
import type { HostConfig } from "@alkdev/ujsx/host";

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

interface OperationInstance {
  id: string;
  status: Signal<OpStatus>;
  result: Signal<unknown>;
  definition: Record<string, unknown>;
}

// Assumed shape, inferred from the runner.execute(...) call below
interface OperationRunner {
  execute(op: OperationInstance): Promise<unknown>;
}

interface ExecutionContext {
  operations: Map<string, OperationInstance>;
  parentStack: string[];
  runner: OperationRunner;
}

export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
  return {
    name: "reactive-execution",
    createRootContext(container, options) {
      return {
        operations: new Map(),
        parentStack: [],
        runner,
      };
    },
    createInstance(tag, props, ctx) {
      const instance: OperationInstance = {
        id: String(props.name ?? `op_${ctx.operations.size}`),
        status: signal<OpStatus>("pending"),
        result: signal<unknown>(null),
        definition: props,
      };
      ctx.operations.set(instance.id, instance);
      return instance;
    },
    appendChild(parent, child, ctx) {
      // Establish dependency: child depends on parent.
      // When parent completes, child's preconditions are re-evaluated.
      const parentInstance = typeof parent === "string"
        ? ctx.operations.get(parent)!
        : parent as OperationInstance;
      const childInstance = child as OperationInstance;

      // Computed that watches parent status.
      // For sequential: parent must be completed.
      const preconditionsMet = computed(() => parentInstance.status.value === "completed");

      // Effect: when preconditions are met, start the child operation.
      // The dispose function returned by effect() is discarded in this sketch;
      // a full implementation has to keep it for cleanup on unmount.
      effect(() => {
        if (preconditionsMet.value && childInstance.status.value === "pending") {
          batch(() => {
            childInstance.status.value = "running";
            ctx.runner.execute(childInstance).then(
              (result) => batch(() => {
                childInstance.result.value = result;
                childInstance.status.value = "completed";
              }),
              (error) => batch(() => {
                childInstance.result.value = error;
                childInstance.status.value = "failed";
              }),
            );
          });
        }
      });
    },
    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Update the operation's definition
      instance.definition = { ...instance.definition, ...payload };
    },
    // ... remaining methods
  };
}
```
### Abort Cascade
The reactive host naturally supports abort cascading. When a parent operation fails, downstream operations can be automatically aborted:
```typescript
// In appendChild, also watch for parent failure:
const abortTrigger = computed(() => {
  return parentInstance.status.value === "failed" || parentInstance.status.value === "aborted";
});

effect(() => {
  if (abortTrigger.value && childInstance.status.value === "pending") {
    batch(() => {
      childInstance.status.value = "aborted";
    });
  }
});
```
This is the key advantage of the signal-based approach: no manual event wiring for status propagation. The reactive graph handles it automatically.
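The end state that the cascade converges to can be sketched as a plain traversal, which may be useful as an oracle in tests. `cascadeAbort` and the `dependents` map are hypothetical; the sketch follows the rule in the snippet above that only `pending` operations are aborted and that an `aborted` operation propagates further downstream.

```typescript
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Hypothetical helper: marks every pending operation downstream of `failedId`
// as aborted and returns the IDs it changed.
function cascadeAbort(
  statuses: Map<string, OpStatus>,
  dependents: Map<string, string[]>, // operation ID -> IDs that depend on it
  failedId: string,
): string[] {
  const aborted: string[] = [];
  const stack = [...(dependents.get(failedId) ?? [])];

  while (stack.length > 0) {
    const id = stack.pop() as string;
    // Only pending operations are aborted; running or finished ones are left alone
    // and do not propagate the abort.
    if (statuses.get(id) !== "pending") continue;
    statuses.set(id, "aborted");
    aborted.push(id);
    stack.push(...(dependents.get(id) ?? []));
  }
  return aborted;
}

const statuses = new Map<string, OpStatus>([
  ["parse", "failed"],
  ["classify", "pending"],
  ["extract", "running"],
  ["route", "pending"],
]);
const dependents = new Map<string, string[]>([
  ["parse", ["classify", "extract"]],
  ["classify", ["route"]],
]);

const abortedIds = cascadeAbort(statuses, dependents, "parse");
```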
## Integration with Flowgraph
### Flowgraph Package Structure
```
@alkdev/flowgraph/
  src/
    component/          # ujsx components for workflow definition
      operation.ts      # <Operation name="..." /> component
      sequential.ts     # <Sequential>: sequential execution
      parallel.ts       # <Parallel>: concurrent execution
      conditional.ts    # <Conditional>: branching
      index.ts
    host/
      graphology.ts     # HostConfig: ujsx template → graphology DAG
      reactive.ts       # HostConfig: ujsx template → reactive execution engine
    schema/
      enums.ts          # CallStatus, NodeStatus, EdgeType
      node.ts           # WorkflowNode schema (OperationNode, CallNode)
      edge.ts           # WorkflowEdge schema
      graph.ts          # SerializedGraph factory + flowgraph schemas
    graph/
      construction.ts   # FlowGraph class (wraps graphology)
      validation.ts     # Cycle detection, type-compat checking
      queries.ts        # topological sort, ancestors/descendants, critical path
      mutation.ts       # addNode, addEdge, updateStatus
    reactive/
      workflow.ts       # ReactiveRoot for workflow state
      node.ts           # Per-node signal management
    analysis/
      type-compat.ts    # Output→Input schema compatibility checking
      workflow.ts       # Execution ordering, precondition validation
      defaults.ts       # Enum defaults
    error/
      index.ts          # FlowgraphError, CycleError, TypeCompatError
    index.ts            # Public API
```
### Dependencies
```
@alkdev/typebox # Schema definitions
@alkdev/pubsub # Event transport (peer dep, optional)
@alkdev/operations # OperationSpec types for type-compat edges (peer dep)
@alkdev/ujsx # Template definition, reactive layer, transform
@preact/signals-core # Transitive via ujsx, reactive state
graphology # DAG structural analysis
graphology-dag # Cycle detection, topological sort
```
### Key Design Decision
`@alkdev/ujsx` should be a **direct dependency** of `@alkdev/flowgraph`. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple — one package, clear dependency chain.
The `from_ujsx` adapter for operations stays in the operations package as a separate export path (consistent with how it already handles `from-mcp` and `from-openapi`).
## Changes to UJSX
| File | Change |
|------|--------|
| None specific | Flowgraph is a consumer, not a modification of ujsx. |
However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the Phase 1-3 reconciler, the reconciler design needs adjustment. Build both hosts early as integration tests.
## Dependencies
- Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs `removeChild`)
- Phase 4 is optional but highly beneficial (graphology host benefits from `Value.Equal` bail-out on unchanged workflows)
- `graphology` and `graphology-dag` — new external dependencies for flowgraph (not ujsx)
## Open Questions
1. **Should flowgraph components be in `@alkdev/flowgraph` or `@alkdev/ujsx`?** The workflow components (`<Operation>`, `<Sequential>`, `<Parallel>`) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like `@alkdev/ujsx/workflow`) makes them reusable. Recommendation: keep them in flowgraph — they're domain-specific.
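2. **How does the reactive host handle async operations?** The `effect()` callback runs synchronously, while `runner.execute()` is async. Signals only track reads made during the synchronous part of the callback, so reads inside the promise chain do not subscribe the effect. What still needs to be ensured is that the effect starts each operation exactly once (the synchronous `pending` → `running` transition acts as the guard) and that in-flight work is handled when the effect is disposed.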
3. **Should the graphology host support incremental updates?** If a workflow template changes (add/remove operations), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.
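For question 2 above, one possible guard is to flip the status synchronously before the async call, so that any re-run of the starting logic sees a non-`pending` status and does nothing. The sketch below shows the pattern on a plain object instead of signals; `Op`, `Runner`, and `startOnce` are hypothetical names.

```typescript
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

interface Op {
  id: string;
  status: OpStatus;
  result: unknown;
}

type Runner = (op: Op) => Promise<unknown>;

// Starts the operation only if it is still pending. Returns true when it started.
function startOnce(op: Op, run: Runner): boolean {
  if (op.status !== "pending") return false;

  // Synchronous transition: any later call sees "running" and bails out.
  op.status = "running";

  run(op).then(
    (result) => {
      op.result = result;
      op.status = "completed";
    },
    (error) => {
      op.result = error;
      op.status = "failed";
    },
  );
  return true;
}

let runnerCalls = 0;
const fakeRunner: Runner = () => {
  runnerCalls += 1;
  return Promise.resolve("ok");
};

const op: Op = { id: "fetch", status: "pending", result: null };
const firstStart = startOnce(op, fakeRunner);
const secondStart = startOnce(op, fakeRunner); // simulates the effect re-running
```

In the signal-based host the same check is the `childInstance.status.value === "pending"` condition inside the effect.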
## Test Cases
For graphology host:
1. Render simple sequential workflow → correct DAG structure
2. Render parallel branches → correct multi-child edges
3. Cycle detection on invalid workflow
4. Topological sort order matches declaration order
5. Update an operation's props → graphology node attributes update
6. Remove an operation → graphology node and edges removed
For reactive host:
1. Execute sequential workflow → operations run in order
2. Execute parallel branches → operations start concurrently
3. Operation failure → downstream operations aborted
4. Status propagation is automatic (no manual wiring)
5. Disposing a workflow stops all pending operations
6. Signal subscriptions are cleaned up on unmount