# Phase 5: Flowgraph HostConfig

## Status: Spec (Draft)

## Problem

Flowgraph is a separate project (`@alkdev/flowgraph`) that uses `@alkdev/ujsx` as a dependency. It renders workflow templates (which are `UNode` trees) to different targets using `HostConfig`. This requires two new host implementations:

1. **Graphology DAG host**: renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, and topological sort.
2. **Reactive execution host**: renders a workflow template to an in-memory reactive execution engine for runtime workflow execution.

## Conceptual Mapping

| Workflow concept | UJSX concept |
|---|---|
| Workflow template | `URoot` / `UElement` tree (`{ type, props, children }`) |
| Operation step | `<Operation name="classify" ...>` → `UElement` |
| Sequential flow | Parent → child relationship |
| Parallel branches | Multiple children of a `Parallel` component |
| Status propagation | `signal`/`computed` reactive layer |
| Template validation | `TransformRegistry` rule pass |
| Template → DAG | `HostConfig` rendering into graphology |
| Template → execution engine | Different `HostConfig` for different runners |

## Host 1: Graphology DAG

### Purpose

Convert a declarative workflow template into a graphology DAG for:

- Structural analysis (connected components, reachability)
- Cycle detection (prevent invalid workflows)
- Topological sort (execution ordering)
- Critical path analysis
- Visualization
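
Two items in that list, cycle detection and topological sort, are what `graphology-dag` supplies to the real host. The sketch below shows what they compute without any dependencies, using Kahn's algorithm over a plain adjacency list. It is a stand-in for explanation only, not graphology's implementation. The `kahn` function and the `Adjacency` type are hypothetical names.

```typescript
// Hypothetical stand-in for graphology-dag's hasCycle/topologicalSort,
// using Kahn's algorithm over a plain adjacency list.
type Adjacency = Record<string, string[]>;

interface KahnResult {
  order: string[];   // nodes in a valid execution order
  hasCycle: boolean; // true if some nodes could never be scheduled
}

function kahn(adj: Adjacency): KahnResult {
  // Collect every node, including ones that only appear as edge targets.
  const nodes = new Set<string>(Object.keys(adj));
  for (const targets of Object.values(adj)) targets.forEach((t) => nodes.add(t));

  // Count incoming edges per node.
  const inDegree = new Map<string, number>();
  nodes.forEach((n) => inDegree.set(n, 0));
  for (const targets of Object.values(adj)) {
    for (const t of targets) inDegree.set(t, inDegree.get(t)! + 1);
  }

  // Start from nodes with no dependencies, in insertion order.
  const queue = Array.from(nodes).filter((n) => inDegree.get(n) === 0);
  const order: string[] = [];
  while (queue.length > 0) {
    const n = queue.shift()!;
    order.push(n);
    for (const t of adj[n] ?? []) {
      const d = inDegree.get(t)! - 1;
      inDegree.set(t, d);
      if (d === 0) queue.push(t);
    }
  }

  // Any node left unscheduled sits on (or behind) a cycle.
  return { order, hasCycle: order.length < nodes.size };
}

console.log(kahn({ fetch: ["parse"], parse: ["classify", "extract"] }).order.join(","));
// fetch,parse,classify,extract
```

A graph where `a` depends on `b` and `b` depends on `a` leaves both nodes with a nonzero in-degree. The order comes back empty and `hasCycle` is `true`. The real host's `finalizeRoot` reacts to that condition by throwing `CycleError`.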

### Type Parameters

```typescript
HostConfig<WorkflowTag, string, GraphologyContext>
```

Where:

- `WorkflowTag` = string literal union: `"operation" | "sequential" | "parallel" | "conditional" | "workflow"`
- The instance type is `string`. Each rendered element is identified by its graphology node key, and its data (`GraphNodeAttrs`) is stored as node attributes on the graph rather than in a separate object.
- `GraphologyContext` = `{ graph: Graph; nodeIdCounter: number; parentStack: string[] }`

### Implementation

```typescript
import Graph from "graphology";
import { hasCycle } from "graphology-dag";
import type { HostConfig } from "@alkdev/ujsx/host";
import { CycleError } from "../error"; // src/error/index.ts in the package layout

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";

// Shape of the attributes stored on each graph node
interface GraphNodeAttrs {
  label: string;
  type: WorkflowTag;
  status: "pending" | "running" | "completed" | "failed" | "aborted";
  props: Record<string, unknown>;
}

interface GraphologyContext {
  graph: Graph;
  nodeIdCounter: number;
  parentStack: string[]; // stack of parent node IDs for edge creation
}

export function createGraphologyHost(): HostConfig<WorkflowTag, string, GraphologyContext> {
  return {
    name: "graphology-dag",

    createRootContext(container, options, context) {
      return {
        graph: new Graph({ type: "directed", multi: false }),
        nodeIdCounter: 0,
        parentStack: [],
      };
    },

    finalizeRoot(ctx) {
      if (hasCycle(ctx.graph)) {
        throw new CycleError("Workflow contains a cycle");
      }
    },

    createInstance(tag, props, ctx, parent) {
      // One graph node per element; the node key is the host instance.
      const id = `node_${ctx.nodeIdCounter++}`;
      ctx.graph.addNode(id, {
        label: String(props.name ?? id),
        type: tag,
        status: "pending",
        props,
      });
      return id;
    },

    createTextInstance(text, ctx, parent) {
      // Workflows have no text nodes: return a placeholder key that is never added to the graph.
      return `text_${ctx.nodeIdCounter++}`;
    },

    appendChild(parent, child, ctx) {
      // Skip text placeholders; addEdge would throw for a node that does not exist.
      if (!ctx.graph.hasNode(child)) return;
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },

    insertBefore(parent, child, before, ctx) {
      if (!ctx.graph.hasNode(child)) return;
      // graphology edges are unordered; sibling order would need an ordinal attribute.
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },

    removeChild(parent, child, ctx) {
      if (!ctx.graph.hasNode(child)) return;
      // dropNode also drops every edge attached to the node, including parent → child.
      ctx.graph.dropNode(child);
    },

    prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
      // Shallow diff over both key sets, so removed props also count as changes.
      const changed: Record<string, unknown> = {};
      for (const key of [...Object.keys(prevProps), ...Object.keys(nextProps)]) {
        if (nextProps[key] !== prevProps[key]) {
          changed[key] = nextProps[key];
        }
      }
      return Object.keys(changed).length > 0 ? changed : null;
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Replace props wholesale so removed keys disappear, and keep the label in sync.
      ctx.graph.updateNodeAttributes(instance, (attrs) => ({
        ...attrs,
        label: String(nextProps.name ?? instance),
        props: nextProps,
      }));
    },
  };
}
```

### What the DAG Host Enables

```typescript
import { h, createRoot } from "@alkdev/ujsx";
import { topologicalSort } from "graphology-dag";
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";

const workflow = h("workflow", { name: "document-pipeline" },
  h("sequential", { name: "ingest" },
    h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
    h("operation", { name: "parse", type: "json-parse" }),
  ),
  h("parallel", { name: "analyze" },
    h("operation", { name: "classify", type: "llm-classify" }),
    h("operation", { name: "extract", type: "llm-extract" }),
  ),
  h("conditional", { name: "route", condition: "classify.result" },
    h("operation", { name: "escalate", type: "notify" }),
    h("operation", { name: "archive", type: "store" }),
  ),
);

const host = createGraphologyHost();
const root = createRoot(host, {});
root.render(workflow);

// root.ctx.graph is now a graphology DAG
const { graph } = root.ctx;
console.log(graph.order);            // number of nodes
console.log(topologicalSort(graph)); // node keys in execution order
```
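
The sketch below makes the tree-to-graph mapping concrete without graphology or ujsx. It walks a plain `{ type, props, children }` tree and records what the host's `createInstance` and `appendChild` do: one node per element, and one `dependency` edge from each parent to each child. The `El` type and the `el` and `toEdges` functions are hypothetical stand-ins, not ujsx's own types or reconciler, and the walk order need not match the reconciler's call order.

```typescript
// Hypothetical stand-in for a UElement: { type, props, children }.
interface El {
  type: string;
  props: { name?: string; [key: string]: unknown };
  children: El[];
}

// Minimal helper mirroring h(type, props, ...children).
const el = (type: string, props: El["props"], ...children: El[]): El => ({ type, props, children });

interface DagSketch {
  nodes: string[];           // node labels in creation order
  edges: [string, string][]; // parent → child "dependency" edges
}

// Depth-first walk doing what createInstance + appendChild do in the host.
function toEdges(root: El): DagSketch {
  const out: DagSketch = { nodes: [], edges: [] };
  const visit = (node: El): string => {
    const label = String(node.props.name ?? `node_${out.nodes.length}`);
    out.nodes.push(label); // createInstance: one graph node per element
    for (const child of node.children) {
      out.edges.push([label, visit(child)]); // appendChild: parent → child edge
    }
    return label;
  };
  visit(root);
  return out;
}

const workflow = el("workflow", { name: "document-pipeline" },
  el("sequential", { name: "ingest" }, el("operation", { name: "fetch" }), el("operation", { name: "parse" })),
  el("parallel", { name: "analyze" }, el("operation", { name: "classify" }), el("operation", { name: "extract" })),
  el("conditional", { name: "route" }, el("operation", { name: "escalate" }), el("operation", { name: "archive" })),
);

const dag = toEdges(workflow);
console.log(dag.nodes.length, dag.edges.length); // 10 9
```

As written, the host adds edges only from a parent to its children. Siblings such as `fetch` and `parse` therefore share no edge in the graph.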

## Host 2: Reactive Execution Engine

### Purpose

Render a workflow template to a reactive execution engine where:

- Each operation node has a `signal` for its status
- Downstream operations use `computed` signals that check upstream completion
- Status propagation is automatic: when `operation_A.status` becomes `completed`, dependent operations start without further wiring (if their preconditions are met)
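
The propagation rule in the last bullet can be illustrated without `@preact/signals-core`. The sketch below makes several assumptions:

- A hypothetical, deliberately tiny `Cell` class (a value plus change listeners, with no automatic dependency tracking) stands in for `signal` and `effect`.
- A hypothetical `dependsOn` helper starts a child once its parent's status becomes `completed`.
- The "runner" is synchronous, only to keep the example deterministic; the real `runner.execute()` is async.

```typescript
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Hypothetical minimal stand-in for a signal: a value plus change listeners.
class Cell<T> {
  private current: T;
  private listeners: Array<() => void> = [];
  constructor(initial: T) {
    this.current = initial;
  }
  get value(): T {
    return this.current;
  }
  set value(next: T) {
    if (next === this.current) return;
    this.current = next;
    this.listeners.forEach((fn) => fn());
  }
  // Run fn now and again after every change, like a single-dependency effect().
  watch(fn: () => void): void {
    this.listeners.push(fn);
    fn();
  }
}

interface Op {
  name: string;
  status: Cell<OpStatus>;
}

const log: string[] = [];
const op = (name: string): Op => ({ name, status: new Cell<OpStatus>("pending") });

// Start `child` once `parent` completes (synchronous stand-in for runner.execute()).
function dependsOn(parent: Op, child: Op): void {
  parent.status.watch(() => {
    if (parent.status.value === "completed" && child.status.value === "pending") {
      child.status.value = "running";
      log.push(child.name);
      child.status.value = "completed";
    }
  });
}

const fetchOp = op("fetch");
const parseOp = op("parse");
const storeOp = op("store");
dependsOn(fetchOp, parseOp);
dependsOn(parseOp, storeOp);

fetchOp.status.value = "completed"; // one write drives the whole chain
console.log(log.join(",")); // parse,store
```

The `status === "pending"` guard is what keeps each child from starting twice. Writing `running` notifies the listeners again, and they see a non-pending status and do nothing.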

### Type Parameters

```typescript
HostConfig<WorkflowTag, OperationInstance, ExecutionContext>
```

Where:

- `OperationInstance` = `{ id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }`
- `ExecutionContext` = `{ operations: Map<string, OperationInstance>; parentStack: string[]; runner: OperationRunner }`

### Implementation Sketch

```typescript
import { signal, computed, effect, batch, type Signal } from "@preact/signals-core";
import type { HostConfig } from "@alkdev/ujsx/host";

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

interface OperationInstance {
  id: string;
  status: Signal<OpStatus>;
  result: Signal<unknown>;
  definition: Record<string, unknown>;
}

// Assumed runner shape, inferred from how execute() is used below.
interface OperationRunner {
  execute(op: OperationInstance): Promise<unknown>;
}

interface ExecutionContext {
  operations: Map<string, OperationInstance>;
  parentStack: string[];
  runner: OperationRunner;
}

export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
  return {
    name: "reactive-execution",

    createRootContext(container, options) {
      return {
        operations: new Map(),
        parentStack: [],
        runner,
      };
    },

    createInstance(tag, props, ctx) {
      const instance: OperationInstance = {
        id: String(props.name ?? `op_${ctx.operations.size}`),
        status: signal<OpStatus>("pending"),
        result: signal<unknown>(null),
        definition: props,
      };
      ctx.operations.set(instance.id, instance);
      return instance;
    },

    appendChild(parent, child, ctx) {
      // Establish dependency: child depends on parent.
      // When the parent completes, the child's preconditions are re-evaluated.
      const parentInstance = typeof parent === "string"
        ? ctx.operations.get(parent)!
        : parent as OperationInstance;
      const childInstance = child as OperationInstance;

      // Computed that watches the parent's status
      const preconditionsMet = computed(() => {
        // For sequential: parent must be completed
        return parentInstance.status.value === "completed";
      });

      // Effect: when preconditions are met, start the child operation.
      // effect() returns a dispose function, which must be kept for cleanup on unmount.
      effect(() => {
        if (preconditionsMet.value && childInstance.status.value === "pending") {
          batch(() => {
            childInstance.status.value = "running";
            ctx.runner.execute(childInstance).then(
              (result) => batch(() => {
                childInstance.result.value = result;
                childInstance.status.value = "completed";
              }),
              (error) => batch(() => {
                childInstance.result.value = error;
                childInstance.status.value = "failed";
              }),
            );
          });
        }
      });
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Update the operation's definition
      instance.definition = { ...instance.definition, ...payload };
    },

    // ... remaining methods
  };
}
```

### Abort Cascade

The reactive host naturally supports abort cascading. When a parent operation fails or is aborted, its pending downstream operations can be aborted automatically:

```typescript
// Inside appendChild, alongside the start effect: also watch for parent failure.
const abortTrigger = computed(() => {
  const status = parentInstance.status.value;
  return status === "failed" || status === "aborted";
});

effect(() => {
  if (abortTrigger.value && childInstance.status.value === "pending") {
    childInstance.status.value = "aborted";
  }
});
```

Because `aborted` is itself an abort trigger, the cascade is transitive: an aborted child aborts its own pending children in turn, down the whole subtree. This is the key advantage of the signal-based approach: status propagation needs no manual event wiring, because the reactive graph handles it.
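
The transitive behavior can be checked with a dependency-free stand-in. A hypothetical `Cell` class replaces `signal`/`effect`, and a hypothetical `abortOnFailure` helper mirrors the `abortTrigger` effect above. Failing the head of a three-step chain should abort both pending descendants, while a completed parent should leave its child untouched.

```typescript
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

// Hypothetical minimal stand-in for a signal with change listeners.
class Cell<T> {
  private current: T;
  private listeners: Array<() => void> = [];
  constructor(initial: T) {
    this.current = initial;
  }
  get value(): T {
    return this.current;
  }
  set value(next: T) {
    if (next === this.current) return;
    this.current = next;
    this.listeners.forEach((fn) => fn());
  }
  // Run fn now and again after every change, like a single-dependency effect().
  watch(fn: () => void): void {
    this.listeners.push(fn);
    fn();
  }
}

// Mirror of the abortTrigger effect: a failed or aborted parent aborts a pending child.
function abortOnFailure(parent: Cell<OpStatus>, child: Cell<OpStatus>): void {
  parent.watch(() => {
    const s = parent.value;
    if ((s === "failed" || s === "aborted") && child.value === "pending") {
      child.value = "aborted";
    }
  });
}

const a = new Cell<OpStatus>("pending");
const b = new Cell<OpStatus>("pending");
const c = new Cell<OpStatus>("pending");
abortOnFailure(a, b);
abortOnFailure(b, c);

a.value = "failed";
console.log(a.value, b.value, c.value); // failed aborted aborted
```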

## Integration with Flowgraph

### Flowgraph Package Structure

```
@alkdev/flowgraph/
  src/
    component/          # ujsx components for workflow definition
      operation.ts      # <Operation name="..." /> component
      sequential.ts     # <Sequential>: sequential execution
      parallel.ts       # <Parallel>: concurrent execution
      conditional.ts    # <Conditional>: branching
      index.ts
    host/
      graphology.ts     # HostConfig: ujsx template → graphology DAG
      reactive.ts       # HostConfig: ujsx template → reactive execution engine
    schema/
      enums.ts          # CallStatus, NodeStatus, EdgeType
      node.ts           # WorkflowNode schema (OperationNode, CallNode)
      edge.ts           # WorkflowEdge schema
      graph.ts          # SerializedGraph factory + flowgraph schemas
    graph/
      construction.ts   # FlowGraph class (wraps graphology)
      validation.ts     # Cycle detection, type-compat checking
      queries.ts        # topological sort, ancestors/descendants, critical path
      mutation.ts       # addNode, addEdge, updateStatus
    reactive/
      workflow.ts       # ReactiveRoot for workflow state
      node.ts           # Per-node signal management
    analysis/
      type-compat.ts    # Output→Input schema compatibility checking
      workflow.ts       # Execution ordering, precondition validation
      defaults.ts       # Enum defaults
    error/
      index.ts          # FlowgraphError, CycleError, TypeCompatError
    index.ts            # Public API
```

### Dependencies

```
@alkdev/typebox        # Schema definitions
@alkdev/pubsub         # Event transport (peer dep, optional)
@alkdev/operations     # OperationSpec types for type-compat edges (peer dep)
@alkdev/ujsx           # Template definition, reactive layer, transform
@preact/signals-core   # Transitive via ujsx, reactive state
graphology             # DAG structural analysis
graphology-dag         # Cycle detection, topological sort
```

### Key Design Decision

`@alkdev/ujsx` should be a **direct dependency** of `@alkdev/flowgraph`. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple: one package, clear dependency chain.

The `from_ujsx` adapter for operations stays in the operations package as a separate export path (consistent with how it already handles `from-mcp` and `from-openapi`).

## Changes to UJSX

| File | Change |
|------|--------|
| None specific | Flowgraph is a consumer, not a modification of ujsx. |

However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the reconciler from Phases 1-3, the reconciler design needs adjustment. Build both hosts early as integration tests.

## Dependencies

- Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs `removeChild`)
- Phase 4 is optional but highly beneficial (the graphology host benefits from the `Value.Equal` bail-out on unchanged workflows)
- `graphology` and `graphology-dag`: new external dependencies for flowgraph (not ujsx)

## Open Questions

1. **Should flowgraph components be in `@alkdev/flowgraph` or `@alkdev/ujsx`?** The workflow components (`<Operation>`, `<Sequential>`, `<Parallel>`) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like `@alkdev/ujsx/workflow`) makes them reusable. Recommendation: keep them in flowgraph, since they're domain-specific.

2. **How does the reactive host handle async operations?** `effect()` is synchronous, but the `runner.execute()` call is async. The design needs to ensure that the `effect` that starts execution doesn't re-run on every signal read inside the async chain.

3. **Should the graphology host support incremental updates?** If a workflow template changes (operations are added or removed), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.
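
For question 3, the core of an in-place update is a keyed diff between the previous and next operation lists. Keys present only in the old template correspond to `removeChild` (`dropNode`). Keys present only in the new one correspond to `createInstance` plus `appendChild`. Shared keys with changed props correspond to `prepareUpdate`/`commitUpdate`.

The sketch below assumes operations are keyed by their `name` prop. The `OpDef` type and the `diffByKey` and `propsChanged` helpers are hypothetical and are not part of ujsx's Phase 2 reconciler.

```typescript
interface OpDef {
  name: string; // assumed reconciliation key
  props: Record<string, unknown>;
}

interface KeyedDiff {
  added: string[];   // → createInstance + appendChild
  removed: string[]; // → removeChild (graph.dropNode)
  updated: string[]; // → prepareUpdate returned a payload, then commitUpdate
}

// Shallow prop comparison over both key sets, like the graphology host's prepareUpdate.
function propsChanged(prev: Record<string, unknown>, next: Record<string, unknown>): boolean {
  return [...Object.keys(prev), ...Object.keys(next)].some((k) => prev[k] !== next[k]);
}

// Hypothetical helper: classify operations between two template versions by key.
function diffByKey(prev: OpDef[], next: OpDef[]): KeyedDiff {
  const before = new Map(prev.map((op) => [op.name, op] as const));
  const after = new Map(next.map((op) => [op.name, op] as const));
  const diff: KeyedDiff = { added: [], removed: [], updated: [] };
  for (const op of next) {
    const old = before.get(op.name);
    if (!old) diff.added.push(op.name);
    else if (propsChanged(old.props, op.props)) diff.updated.push(op.name);
  }
  for (const op of prev) {
    if (!after.has(op.name)) diff.removed.push(op.name);
  }
  return diff;
}

const v1: OpDef[] = [
  { name: "fetch", props: { type: "http-request" } },
  { name: "parse", props: { type: "json-parse" } },
  { name: "archive", props: { type: "store" } },
];
const v2: OpDef[] = [
  { name: "fetch", props: { type: "http-request" } },
  { name: "parse", props: { type: "yaml-parse" } },
  { name: "classify", props: { type: "llm-classify" } },
];
console.log(JSON.stringify(diffByKey(v1, v2)));
// {"added":["classify"],"removed":["archive"],"updated":["parse"]}
```

Keying by `name` relies on names being unique within a workflow. The reactive host's `createInstance` already makes the same assumption when it uses `name` as the key of `ctx.operations`.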

## Test Cases

For the graphology host:

1. Render simple sequential workflow → correct DAG structure
2. Render parallel branches → correct multi-child edges
3. Cycle detection on invalid workflow
4. Topological sort order matches declaration order
5. Update an operation's props → graphology node attributes update
6. Remove an operation → graphology node and edges removed

For the reactive host:

1. Execute sequential workflow → operations run in order
2. Execute parallel branches → operations start concurrently
3. Operation failure → downstream operations aborted
4. Status propagation is automatic (no manual wiring)
5. Disposing a workflow stops all pending operations
6. Signal subscriptions are cleaned up on unmount