14 KiB
Phase 5: Flowgraph HostConfig
Status: Spec (Draft)
Problem
Flowgraph is a separate project (@alkdev/flowgraph) that uses @alkdev/ujsx as a dependency. It renders workflow templates (which are UNode trees) to different targets using HostConfig. This requires two new host implementations:
- Graphology DAG host — renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, topological sort
- Reactive execution host — renders a workflow template to an in-memory reactive execution engine for runtime workflow execution
Conceptual Mapping
| Workflow concept | UJSX concept |
|---|---|
| Workflow template | URoot / UElement tree ({ type, props, children }) |
| Operation step | <Operation name="classify" ...> → UElement |
| Sequential flow | Parent → child relationship |
| Parallel branches | Multiple children of a Parallel component |
| Status propagation | signal/computed reactive layer |
| Template validation | TransformRegistry rule pass |
| Template → DAG | HostConfig rendering into graphology |
| Template → execution engine | Different HostConfig for different runners |
Host 1: Graphology DAG
Purpose
Convert a declarative workflow template into a graphology DAG for:
- Structural analysis (connected components, reachability)
- Cycle detection (prevent invalid workflows)
- Topological sort (execution ordering)
- Critical path analysis
- Visualization
Type Parameters
HostConfig<WorkflowTag, GraphologyNode, GraphologyContext>
Where:
WorkflowTag= string literal union:"operation" | "sequential" | "parallel" | "conditional" | "workflow"GraphologyNode= graphology node instance (with attributes)GraphologyContext={ graph: Graph; nodeIdCounter: number }
Implementation
import Graph from "graphology";
import { HostConfig } from "@alkdev/ujsx/host";
import { hasCycle } from "graphology-dag";
type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";
interface GraphNodeAttrs {
label: string;
type: WorkflowTag;
status: "pending" | "running" | "completed" | "failed" | "aborted";
props: Record<string, unknown>;
}
interface GraphologyContext {
graph: Graph;
nodeIdCounter: number;
parentStack: string[]; // stack of parent node IDs for edge creation
}
export function createGraphologyHost(): HostConfig<WorkflowTag, string, GraphologyContext> {
return {
name: "graphology-dag",
createRootContext(container, options, context) {
return {
graph: new Graph({ type: "directed", multi: false }),
nodeIdCounter: 0,
parentStack: [],
};
},
finalizeRoot(ctx) {
if (hasCycle(ctx.graph)) {
throw new CycleError("Workflow contains a cycle");
}
},
createInstance(tag, props, ctx, parent) {
const id = `node_${ctx.nodeIdCounter++}`;
ctx.graph.addNode(id, {
label: String(props.name ?? id),
type: tag,
status: "pending",
props,
});
return id;
},
createTextInstance(text, ctx, parent) {
// Workflows don't have text nodes; skip
return `text_${ctx.nodeIdCounter++}`;
},
appendChild(parent, child, ctx) {
ctx.graph.addEdge(parent, child, { type: "dependency" });
},
insertBefore(parent, child, before, ctx) {
ctx.graph.addEdge(parent, child, { type: "dependency" });
// graphology doesn't have order, but we can add an ordinal attribute
},
removeChild(parent, child, ctx) {
ctx.graph.dropEdge(parent, child);
ctx.graph.dropNode(child);
},
prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
const attrs = ctx.graph.getNodeAttributes(instance);
const changed = {};
for (const key of Object.keys(nextProps)) {
if (nextProps[key] !== prevProps[key]) {
changed[key] = nextProps[key];
}
}
return Object.keys(changed).length > 0 ? changed : null;
},
commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
ctx.graph.updateNodeAttributes(instance, (attrs) => ({
...attrs,
props: { ...attrs.props, ...payload },
}));
},
};
}
What the DAG Host Enables
import { h, createRoot } from "@alkdev/ujsx";
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";
const workflow = h("workflow", { name: "document-pipeline" },
h("sequential", { name: "ingest" },
h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
h("operation", { name: "parse", type: "json-parse" }),
),
h("parallel", { name: "analyze" },
h("operation", { name: "classify", type: "llm-classify" }),
h("operation", { name: "extract", type: "llm-extract" }),
),
h("conditional", { name: "route", condition: "classify.result" },
h("operation", { name: "escalate", type: "notify" }),
h("operation", { name: "archive", type: "store" }),
),
);
const host = createGraphologyHost();
const root = createRoot(host, {});
root.render(workflow);
// ctx.graph is now a graphology DAG
const { graph } = root.ctx;
console.log(graph.order); // number of nodes
console.log(topologicalSort(graph)); // execution order
Host 2: Reactive Execution Engine
Purpose
Render a workflow template to a reactive execution engine where:
- Each operation node has a
signalfor its status - Downstream operations use
computedsignals that check upstream completion - Status propagation is automatic: when
operation_A.statusbecomescompleted, dependent operations automatically start (if preconditions are met)
Type Parameters
HostConfig<WorkflowTag, OperationInstance, ExecutionContext>
Where:
OperationInstance={ id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }ExecutionContext={ operations: Map<string, OperationInstance>; runner: OperationRunner }
Implementation Sketch
import { signal, computed, batch } from "@preact/signals-core";
type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";
interface OperationInstance {
id: string;
status: Signal<OpStatus>;
result: Signal<unknown>;
definition: Record<string, unknown>;
}
interface ExecutionContext {
operations: Map<string, OperationInstance>;
parentStack: string[];
runner: OperationRunner;
}
export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
return {
name: "reactive-execution",
createRootContext(container, options) {
return {
operations: new Map(),
parentStack: [],
runner,
};
},
createInstance(tag, props, ctx) {
const instance: OperationInstance = {
id: String(props.name ?? `op_${ctx.operations.size}`),
status: signal<OpStatus>("pending"),
result: signal<unknown>(null),
definition: props,
};
ctx.operations.set(instance.id, instance);
return instance;
},
appendChild(parent, child, ctx) {
// Establish dependency: child depends on parent
// When parent completes, child's preconditions are re-evaluated
const parentInstance = typeof parent === "string"
? ctx.operations.get(parent)!
: parent as OperationInstance;
const childInstance = child as OperationInstance;
// Create a computed that watches parent status
const preconditionsMet = computed(() => {
// For sequential: parent must be completed
if (parentInstance.status.value === "completed") return true;
return false;
});
// Effect: when preconditions are met, start the child operation
effect(() => {
if (preconditionsMet.value && childInstance.status.value === "pending") {
batch(() => {
childInstance.status.value = "running";
ctx.runner.execute(childInstance).then(
(result) => batch(() => {
childInstance.result.value = result;
childInstance.status.value = "completed";
}),
(error) => batch(() => {
childInstance.result.value = error;
childInstance.status.value = "failed";
}),
);
});
}
});
},
commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
// Update the operation's definition
instance.definition = { ...instance.definition, ...payload };
},
// ... remaining methods
};
}
Abort Cascade
The reactive host naturally supports abort cascading. When a parent operation fails, downstream operations can be automatically aborted:
// In appendChild, also watch for parent failure:
const abortTrigger = computed(() => {
return parentInstance.status.value === "failed" || parentInstance.status.value === "aborted";
});
effect(() => {
if (abortTrigger.value && childInstance.status.value === "pending") {
batch(() => {
childInstance.status.value = "aborted";
});
}
});
This is the key advantage of the signal-based approach: no manual event wiring for status propagation. The reactive graph handles it automatically.
Integration with Flowgraph
Flowgraph Package Structure
@alkdev/flowgraph/
src/
component/ # ujsx components for workflow definition
operation.ts # <Operation name="..." /> component
sequential.ts # <Sequential> — sequential execution
parallel.ts # <Parallel> — concurrent execution
conditional.ts # <Conditional> — branching
index.ts
host/
graphology.ts # HostConfig: ujsx template → graphology DAG
reactive.ts # HostConfig: ujsx template → reactive execution engine
schema/
enums.ts # CallStatus, NodeStatus, EdgeType
node.ts # WorkflowNode schema (OperationNode, CallNode)
edge.ts # WorkflowEdge schema
graph.ts # SerializedGraph factory + flowgraph schemas
graph/
construction.ts # FlowGraph class (wraps graphology)
validation.ts # Cycle detection, type-compat checking
queries.ts # topological sort, ancestors/descendants, critical path
mutation.ts # addNode, addEdge, updateStatus
reactive/
workflow.ts # ReactiveRoot for workflow state
node.ts # Per-node signal management
analysis/
type-compat.ts # Output→Input schema compatibility checking
workflow.ts # Execution ordering, precondition validation
defaults.ts # Enum defaults
error/
index.ts # FlowgraphError, CycleError, TypeCompatError
index.ts # Public API
Dependencies
@alkdev/typebox # Schema definitions
@alkdev/pubsub # Event transport (peer dep, optional)
@alkdev/operations # OperationSpec types for type-compat edges (peer dep)
@alkdev/ujsx # Template definition, reactive layer, transform
@preact/signals-core # Transitive via ujsx, reactive state
graphology # DAG structural analysis
graphology-dag # Cycle detection, topological sort
Key Design Decision
@alkdev/ujsx should be a direct dependency of @alkdev/flowgraph. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple — one package, clear dependency chain.
The from_ujsx adapter for operations stays in the operations package as a separate export path (consistent with how it already handles from-mcp and from-openapi).
Changes to UJSX
| File | Change |
|---|---|
| None specific | Flowgraph is a consumer, not a modification of ujsx. |
However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the Phase 1-3 reconciler, the reconciler design needs adjustment. Build both hosts early as integration tests.
Dependencies
- Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs
removeChild) - Phase 4 is optional but highly beneficial (graphology host benefits from
Value.Equalbail-out on unchanged workflows) graphologyandgraphology-dag— new external dependencies for flowgraph (not ujsx)
Open Questions
-
Should flowgraph components be in
@alkdev/flowgraphor@alkdev/ujsx? The workflow components (<Operation>,<Sequential>,<Parallel>) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like@alkdev/ujsx/workflow) makes them reusable. Recommendation: keep them in flowgraph — they're domain-specific. -
How does the reactive host handle async operations?
effect()is synchronous. Therunner.execute()call is async. Need to ensure theeffectthat starts execution doesn't re-run on every signal read inside the async chain. -
Should the graphology host support incremental updates? If a workflow template changes (add/remove operations), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.
Test Cases
For graphology host:
- Render simple sequential workflow → correct DAG structure
- Render parallel branches → correct multi-child edges
- Cycle detection on invalid workflow
- Topological sort order matches declaration order
- Update an operation's props → graphology node attributes update
- Remove an operation → graphology node and edges removed
For reactive host:
- Execute sequential workflow → operations run in order
- Execute parallel branches → operations start concurrently
- Operation failure → downstream operations aborted
- Status propagation is automatic (no manual wiring)
- Disposing a workflow stops all pending operations
- Signal subscriptions are cleaned up on unmount