Files
ujsx/docs/research/reconciler/05-flowgraph-host-configs.md

14 KiB

Phase 5: Flowgraph HostConfig

Status: Spec (Draft)

Problem

Flowgraph is a separate project (@alkdev/flowgraph) that uses @alkdev/ujsx as a dependency. It renders workflow templates (which are UNode trees) to different targets using HostConfig. This requires two new host implementations:

  1. Graphology DAG host — renders a workflow template to a graphology directed acyclic graph for structural analysis, cycle detection, topological sort
  2. Reactive execution host — renders a workflow template to an in-memory reactive execution engine for runtime workflow execution

Conceptual Mapping

Workflow concept UJSX concept
Workflow template URoot / UElement tree ({ type, props, children })
Operation step <Operation name="classify" ...>UElement
Sequential flow Parent → child relationship
Parallel branches Multiple children of a Parallel component
Status propagation signal/computed reactive layer
Template validation TransformRegistry rule pass
Template → DAG HostConfig rendering into graphology
Template → execution engine Different HostConfig for different runners

Host 1: Graphology DAG

Purpose

Convert a declarative workflow template into a graphology DAG for:

  • Structural analysis (connected components, reachability)
  • Cycle detection (prevent invalid workflows)
  • Topological sort (execution ordering)
  • Critical path analysis
  • Visualization

Type Parameters

HostConfig<WorkflowTag, GraphologyNode, GraphologyContext>

Where:

  • WorkflowTag = string literal union: "operation" | "sequential" | "parallel" | "conditional" | "workflow"
  • GraphologyNode = graphology node instance (with attributes)
  • GraphologyContext = { graph: Graph; nodeIdCounter: number }

Implementation

import Graph from "graphology";
import { HostConfig } from "@alkdev/ujsx/host";
import { hasCycle } from "graphology-dag";

type WorkflowTag = "operation" | "sequential" | "parallel" | "conditional" | "workflow";

interface GraphNodeAttrs {
  label: string;
  type: WorkflowTag;
  status: "pending" | "running" | "completed" | "failed" | "aborted";
  props: Record<string, unknown>;
}

interface GraphologyContext {
  graph: Graph;
  nodeIdCounter: number;
  parentStack: string[];  // stack of parent node IDs for edge creation
}

export function createGraphologyHost(): HostConfig<WorkflowTag, string, GraphologyContext> {
  return {
    name: "graphology-dag",

    createRootContext(container, options, context) {
      return {
        graph: new Graph({ type: "directed", multi: false }),
        nodeIdCounter: 0,
        parentStack: [],
      };
    },

    finalizeRoot(ctx) {
      if (hasCycle(ctx.graph)) {
        throw new CycleError("Workflow contains a cycle");
      }
    },

    createInstance(tag, props, ctx, parent) {
      const id = `node_${ctx.nodeIdCounter++}`;
      ctx.graph.addNode(id, {
        label: String(props.name ?? id),
        type: tag,
        status: "pending",
        props,
      });
      return id;
    },

    createTextInstance(text, ctx, parent) {
      // Workflows don't have text nodes; skip
      return `text_${ctx.nodeIdCounter++}`;
    },

    appendChild(parent, child, ctx) {
      ctx.graph.addEdge(parent, child, { type: "dependency" });
    },

    insertBefore(parent, child, before, ctx) {
      ctx.graph.addEdge(parent, child, { type: "dependency" });
      // graphology doesn't have order, but we can add an ordinal attribute
    },

    removeChild(parent, child, ctx) {
      ctx.graph.dropEdge(parent, child);
      ctx.graph.dropNode(child);
    },

    prepareUpdate(instance, tag, prevProps, nextProps, ctx) {
      const attrs = ctx.graph.getNodeAttributes(instance);
      const changed = {};
      for (const key of Object.keys(nextProps)) {
        if (nextProps[key] !== prevProps[key]) {
          changed[key] = nextProps[key];
        }
      }
      return Object.keys(changed).length > 0 ? changed : null;
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      ctx.graph.updateNodeAttributes(instance, (attrs) => ({
        ...attrs,
        props: { ...attrs.props, ...payload },
      }));
    },
  };
}

What the DAG Host Enables

import { h, createRoot } from "@alkdev/ujsx";
import { createGraphologyHost } from "@alkdev/flowgraph/host/graphology";

const workflow = h("workflow", { name: "document-pipeline" },
  h("sequential", { name: "ingest" },
    h("operation", { name: "fetch", type: "http-request", url: "https://..." }),
    h("operation", { name: "parse", type: "json-parse" }),
  ),
  h("parallel", { name: "analyze" },
    h("operation", { name: "classify", type: "llm-classify" }),
    h("operation", { name: "extract", type: "llm-extract" }),
  ),
  h("conditional", { name: "route", condition: "classify.result" },
    h("operation", { name: "escalate", type: "notify" }),
    h("operation", { name: "archive", type: "store" }),
  ),
);

const host = createGraphologyHost();
const root = createRoot(host, {});
root.render(workflow);

// ctx.graph is now a graphology DAG
const { graph } = root.ctx;
console.log(graph.order);          // number of nodes
console.log(topologicalSort(graph)); // execution order

Host 2: Reactive Execution Engine

Purpose

Render a workflow template to a reactive execution engine where:

  • Each operation node has a signal for its status
  • Downstream operations use computed signals that check upstream completion
  • Status propagation is automatic: when operation_A.status becomes completed, dependent operations automatically start (if preconditions are met)

Type Parameters

HostConfig<WorkflowTag, OperationInstance, ExecutionContext>

Where:

  • OperationInstance = { id: string; status: Signal<OpStatus>; result: Signal<unknown>; definition: Record<string, unknown> }
  • ExecutionContext = { operations: Map<string, OperationInstance>; runner: OperationRunner }

Implementation Sketch

import { signal, computed, batch } from "@preact/signals-core";

type OpStatus = "pending" | "running" | "completed" | "failed" | "aborted";

interface OperationInstance {
  id: string;
  status: Signal<OpStatus>;
  result: Signal<unknown>;
  definition: Record<string, unknown>;
}

interface ExecutionContext {
  operations: Map<string, OperationInstance>;
  parentStack: string[];
  runner: OperationRunner;
}

export function createReactiveHost(runner: OperationRunner): HostConfig<WorkflowTag, OperationInstance, ExecutionContext> {
  return {
    name: "reactive-execution",

    createRootContext(container, options) {
      return {
        operations: new Map(),
        parentStack: [],
        runner,
      };
    },

    createInstance(tag, props, ctx) {
      const instance: OperationInstance = {
        id: String(props.name ?? `op_${ctx.operations.size}`),
        status: signal<OpStatus>("pending"),
        result: signal<unknown>(null),
        definition: props,
      };
      ctx.operations.set(instance.id, instance);
      return instance;
    },

    appendChild(parent, child, ctx) {
      // Establish dependency: child depends on parent
      // When parent completes, child's preconditions are re-evaluated
      const parentInstance = typeof parent === "string"
        ? ctx.operations.get(parent)!
        : parent as OperationInstance;
      const childInstance = child as OperationInstance;

      // Create a computed that watches parent status
      const preconditionsMet = computed(() => {
        // For sequential: parent must be completed
        if (parentInstance.status.value === "completed") return true;
        return false;
      });

      // Effect: when preconditions are met, start the child operation
      effect(() => {
        if (preconditionsMet.value && childInstance.status.value === "pending") {
          batch(() => {
            childInstance.status.value = "running";
            ctx.runner.execute(childInstance).then(
              (result) => batch(() => {
                childInstance.result.value = result;
                childInstance.status.value = "completed";
              }),
              (error) => batch(() => {
                childInstance.result.value = error;
                childInstance.status.value = "failed";
              }),
            );
          });
        }
      });
    },

    commitUpdate(instance, payload, tag, prevProps, nextProps, ctx) {
      // Update the operation's definition
      instance.definition = { ...instance.definition, ...payload };
    },

    // ... remaining methods
  };
}

Abort Cascade

The reactive host naturally supports abort cascading. When a parent operation fails, downstream operations can be automatically aborted:

// In appendChild, also watch for parent failure:
const abortTrigger = computed(() => {
  return parentInstance.status.value === "failed" || parentInstance.status.value === "aborted";
});

effect(() => {
  if (abortTrigger.value && childInstance.status.value === "pending") {
    batch(() => {
      childInstance.status.value = "aborted";
    });
  }
});

This is the key advantage of the signal-based approach: no manual event wiring for status propagation. The reactive graph handles it automatically.

Integration with Flowgraph

Flowgraph Package Structure

@alkdev/flowgraph/
  src/
    component/           # ujsx components for workflow definition
      operation.ts       # <Operation name="..." /> component
      sequential.ts       # <Sequential> — sequential execution
      parallel.ts         # <Parallel> — concurrent execution
      conditional.ts      # <Conditional> — branching
      index.ts
    host/
      graphology.ts       # HostConfig: ujsx template → graphology DAG
      reactive.ts         # HostConfig: ujsx template → reactive execution engine
    schema/
      enums.ts            # CallStatus, NodeStatus, EdgeType
      node.ts             # WorkflowNode schema (OperationNode, CallNode)
      edge.ts             # WorkflowEdge schema
      graph.ts            # SerializedGraph factory + flowgraph schemas
    graph/
      construction.ts     # FlowGraph class (wraps graphology)
      validation.ts       # Cycle detection, type-compat checking
      queries.ts          # topological sort, ancestors/descendants, critical path
      mutation.ts         # addNode, addEdge, updateStatus
    reactive/
      workflow.ts         # ReactiveRoot for workflow state
      node.ts             # Per-node signal management
    analysis/
      type-compat.ts      # Output→Input schema compatibility checking
      workflow.ts         # Execution ordering, precondition validation
      defaults.ts         # Enum defaults
    error/
      index.ts            # FlowgraphError, CycleError, TypeCompatError
    index.ts              # Public API

Dependencies

@alkdev/typebox        # Schema definitions
@alkdev/pubsub         # Event transport (peer dep, optional)
@alkdev/operations     # OperationSpec types for type-compat edges (peer dep)
@alkdev/ujsx           # Template definition, reactive layer, transform
@preact/signals-core   # Transitive via ujsx, reactive state
graphology             # DAG structural analysis
graphology-dag         # Cycle detection, topological sort

Key Design Decision

@alkdev/ujsx should be a direct dependency of @alkdev/flowgraph. The workflow template IS the canonical representation, and the graphology DAG is one rendering target. This keeps things simple — one package, clear dependency chain.

The from_ujsx adapter for operations stays in the operations package as a separate export path (consistent with how it already handles from-mcp and from-openapi).

Changes to UJSX

File Change
None specific Flowgraph is a consumer, not a modification of ujsx.

However, this phase validates the reconciler design: if the flowgraph host configs cannot be built cleanly on top of the Phase 1-3 reconciler, the reconciler design needs adjustment. Build both hosts early as integration tests.

Dependencies

  • Phases 1-3 must be complete (the reactive host needs proper signal disposal, the graphology host needs removeChild)
  • Phase 4 is optional but highly beneficial (graphology host benefits from Value.Equal bail-out on unchanged workflows)
  • graphology and graphology-dag — new external dependencies for flowgraph (not ujsx)

Open Questions

  1. Should flowgraph components be in @alkdev/flowgraph or @alkdev/ujsx? The workflow components (<Operation>, <Sequential>, <Parallel>) could go in either package. Putting them in flowgraph keeps ujsx generic. Putting them in ujsx (under a separate export like @alkdev/ujsx/workflow) makes them reusable. Recommendation: keep them in flowgraph — they're domain-specific.

  2. How does the reactive host handle async operations? effect() is synchronous. The runner.execute() call is async. Need to ensure the effect that starts execution doesn't re-run on every signal read inside the async chain.

  3. Should the graphology host support incremental updates? If a workflow template changes (add/remove operations), the reconciler should update the graph in place rather than rebuilding it. This requires Phase 2 (key-based reconciliation) to be working.

Test Cases

For graphology host:

  1. Render simple sequential workflow → correct DAG structure
  2. Render parallel branches → correct multi-child edges
  3. Cycle detection on invalid workflow
  4. Topological sort order matches declaration order
  5. Update an operation's props → graphology node attributes update
  6. Remove an operation → graphology node and edges removed

For reactive host:

  1. Execute sequential workflow → operations run in order
  2. Execute parallel branches → operations start concurrently
  3. Operation failure → downstream operations aborted
  4. Status propagation is automatic (no manual wiring)
  5. Disposing a workflow stops all pending operations
  6. Signal subscriptions are cleaned up on unmount