diff --git a/src/reactive/index.ts b/src/reactive/index.ts index 8cec2e9..efd2937 100644 --- a/src/reactive/index.ts +++ b/src/reactive/index.ts @@ -1 +1,12 @@ -export {}; \ No newline at end of file +export { + WorkflowReactiveRoot, + type FailurePolicy, + type CallEventMapValue, + type CallRequestedEvent, + type CallRespondedEvent, + type CallErrorEvent, + type CallAbortedEvent, + type CallCompletedEvent, + type EventLogProjection, + type AggregateStatus, +} from "./workflow.js"; diff --git a/src/reactive/workflow.ts b/src/reactive/workflow.ts index 8cec2e9..dfd0ad3 100644 --- a/src/reactive/workflow.ts +++ b/src/reactive/workflow.ts @@ -1 +1,337 @@ -export {}; \ No newline at end of file +import { signal, computed, effect } from "@preact/signals-core"; +import type { Signal, ReadonlySignal } from "@preact/signals-core"; +import type { DirectedGraph } from "graphology"; +import type { NodeStatus } from "../schema/enums.js"; +import type { CallResult } from "../schema/edge.js"; + +export type FailurePolicy = "continue-running" | "abort-dependents"; + +export interface CallRequestedEvent { + type: "call.requested"; + requestId: string; + operationId: string; + input: unknown; + timestamp: string; +} + +export interface CallRespondedEvent { + type: "call.responded"; + requestId: string; + output: unknown; + timestamp: string; +} + +export interface CallErrorEvent { + type: "call.error"; + requestId: string; + error: { code: string; message: string; details?: unknown }; + timestamp: string; +} + +export interface CallAbortedEvent { + type: "call.aborted"; + requestId: string; + timestamp: string; +} + +export interface CallCompletedEvent { + type: "call.completed"; + requestId: string; + output: unknown; + timestamp: string; +} + +export type CallEventMapValue = + | CallRequestedEvent + | CallRespondedEvent + | CallErrorEvent + | CallAbortedEvent + | CallCompletedEvent; + +export interface EventLogProjection { + append(event: CallEventMapValue): void; + getStatus(nodeId: string): NodeStatus; + getResult(nodeId: string): CallResult | undefined; + getEvents(nodeId: string): CallEventMapValue[]; +} + +export interface AggregateStatus { + completed: number; + failed: number; + aborted: number; + skipped: number; + running: number; + waiting: number; + ready: number; + idle: number; + total: number; +} + +const TERMINAL_STATUSES: Set = new Set([ + "completed", + "failed", + "aborted", + "skipped", +]); + +const EVENT_TO_STATUS: Record = { + "call.requested": "running", + "call.responded": "completed", + "call.error": "failed", + "call.aborted": "aborted", + "call.completed": "completed", +}; + +export class WorkflowReactiveRoot implements EventLogProjection { + statusMap: Map>; + preconditions: Map>; + blockedByFailure: Map>; + resultMap: Map>; + nodeKeyToRequestId: Map; + + private graph: DirectedGraph; + private effectDisposers: (() => void)[]; + private eventLog: CallEventMapValue[]; + private _failurePolicy: FailurePolicy; + + constructor( + graph: DirectedGraph, + options?: { failurePolicy?: FailurePolicy }, + ) { + this.graph = graph; + this.statusMap = new Map(); + this.preconditions = new Map(); + this.blockedByFailure = new Map(); + this.resultMap = new Map(); + this.effectDisposers = []; + this.eventLog = []; + this.nodeKeyToRequestId = new Map(); + this._failurePolicy = options?.failurePolicy ?? "continue-running"; + this.initializeSignals(); + } + + private initializeSignals(): void { + for (const node of this.graph.nodes()) { + const predecessors: string[] = this.graph.inNeighbors(node) ?? []; + + const status = signal("idle"); + + const preconditionsComputed = computed(() => { + return predecessors.every((pred: string) => { + const predStatus = this.statusMap.get(pred); + if (!predStatus) return false; + return ( + predStatus.value === "completed" || predStatus.value === "skipped" + ); + }); + }); + + const blockedByFailureComputed = computed(() => { + return predecessors.some((pred: string) => { + const predStatus = this.statusMap.get(pred); + if (!predStatus) return false; + return ( + predStatus.value === "failed" || predStatus.value === "aborted" + ); + }); + }); + + const resultComputed = computed(() => { + const requestId = this.nodeKeyToRequestId.get(node); + if (!requestId) return undefined; + + let latestTerminalEvent: CallEventMapValue | undefined; + for (let i = this.eventLog.length - 1; i >= 0; i--) { + const e = this.eventLog[i]!; + if ("requestId" in e && e.requestId === requestId) { + if ( + e.type === "call.responded" || + e.type === "call.error" || + e.type === "call.aborted" + ) { + latestTerminalEvent = e; + break; + } + } + } + + if (!latestTerminalEvent) return undefined; + + if (latestTerminalEvent.type === "call.error") { + return { + status: "failed" as NodeStatus, + output: undefined, + error: latestTerminalEvent.error, + } as CallResult; + } + if (latestTerminalEvent.type === "call.responded") { + return { + status: "completed" as NodeStatus, + output: latestTerminalEvent.output, + } as CallResult; + } + if (latestTerminalEvent.type === "call.aborted") { + return { + status: "aborted" as NodeStatus, + output: undefined, + } as CallResult; + } + + return undefined; + }); + + this.statusMap.set(node, status); + this.preconditions.set(node, preconditionsComputed); + this.blockedByFailure.set(node, blockedByFailureComputed); + this.resultMap.set(node, resultComputed); + } + + for (const node of this.graph.nodes()) { + const status = this.statusMap.get(node)!; + const blocked = this.blockedByFailure.get(node)!; + + const disposer = effect(() => { + if (blocked.value) { + const current = status.value; + if (current === "idle" || current === "waiting" || current === "ready") { + if (this._failurePolicy === "abort-dependents") { + if (!TERMINAL_STATUSES.has(current)) { + status.value = "aborted"; + } + } else { + status.value = "aborted"; + } + } + } + }); + this.effectDisposers.push(disposer); + } + } + + append(event: CallEventMapValue): void { + this.eventLog.push(event); + + if (!("requestId" in event)) return; + + const nodeId = this.findNodeByRequestId(event.requestId); + if (nodeId === undefined) return; + + const statusSignal = this.statusMap.get(nodeId); + if (!statusSignal) return; + + const derived = EVENT_TO_STATUS[event.type]; + if (derived !== undefined) { + statusSignal.value = derived; + } + } + + getStatus(nodeId: string): NodeStatus { + const statusSignal = this.statusMap.get(nodeId); + if (!statusSignal) return "idle"; + + const requestId = this.nodeKeyToRequestId.get(nodeId); + if (requestId) { + let lastEventType: string | undefined; + for (let i = this.eventLog.length - 1; i >= 0; i--) { + const e = this.eventLog[i]!; + if ("requestId" in e && e.requestId === requestId) { + lastEventType = e.type; + break; + } + } + if (lastEventType && EVENT_TO_STATUS[lastEventType] !== undefined) { + return EVENT_TO_STATUS[lastEventType]!; + } + } + + return statusSignal.value; + } + + getResult(nodeId: string): CallResult | undefined { + const resultComputed = this.resultMap.get(nodeId); + if (!resultComputed) return undefined; + return resultComputed.value; + } + + getEvents(nodeId: string): CallEventMapValue[] { + const requestId = this.nodeKeyToRequestId.get(nodeId); + if (!requestId) return []; + + const events: CallEventMapValue[] = []; + for (const e of this.eventLog) { + if ("requestId" in e && e.requestId === requestId) { + events.push(e); + } + } + return events; + } + + abortAll(): void { + for (const [_nodeId, status] of this.statusMap) { + if (!TERMINAL_STATUSES.has(status.value)) { + status.value = "aborted"; + } + } + } + + abortNode(nodeId: string): void { + const status = this.statusMap.get(nodeId); + if (!status) return; + if (!TERMINAL_STATUSES.has(status.value)) { + status.value = "aborted"; + } + } + + isComplete(): boolean { + for (const [_nodeId, status] of this.statusMap) { + if (!TERMINAL_STATUSES.has(status.value)) { + return false; + } + } + return true; + } + + getAggregateStatus(): AggregateStatus { + const counts: Record = { + completed: 0, + failed: 0, + aborted: 0, + skipped: 0, + running: 0, + waiting: 0, + ready: 0, + idle: 0, + total: 0, + }; + + for (const [_nodeId, status] of this.statusMap) { + const s = status.value; + counts["total"]!++; + if (s in counts) { + counts[s]!++; + } + } + + return counts as unknown as AggregateStatus; + } + + dispose(): void { + for (const disposer of this.effectDisposers) { + disposer(); + } + this.effectDisposers = []; + this.statusMap.clear(); + this.preconditions.clear(); + this.blockedByFailure.clear(); + this.resultMap.clear(); + this.nodeKeyToRequestId.clear(); + this.eventLog = []; + } + + private findNodeByRequestId(requestId: string): string | undefined { + for (const [nodeId, rid] of this.nodeKeyToRequestId) { + if (rid === requestId) return nodeId; + } + return undefined; + } +} diff --git a/test/reactive/workflow.test.ts b/test/reactive/workflow.test.ts new file mode 100644 index 0000000..1975831 --- /dev/null +++ b/test/reactive/workflow.test.ts @@ -0,0 +1,910 @@ +import { describe, it, expect } from "vitest"; +import { DirectedGraph } from "graphology"; +import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js"; +import type { + CallEventMapValue, + FailurePolicy, +} from "../../src/reactive/workflow.js"; + +function makeSimpleGraph(): DirectedGraph { + const graph = new DirectedGraph(); + graph.addNode("a", { name: "a" }); + graph.addNode("b", { name: "b" }); + graph.addNode("c", { name: "c" }); + graph.addEdgeWithKey("a->b", "a", "b", { edgeType: "sequential" }); + graph.addEdgeWithKey("b->c", "b", "c", { edgeType: "sequential" }); + return graph; +} + +function makeDiamondGraph(): DirectedGraph { + const graph = new DirectedGraph(); + graph.addNode("top", { name: "top" }); + graph.addNode("left", { name: "left" }); + graph.addNode("right", { name: "right" }); + graph.addNode("bottom", { name: "bottom" }); + graph.addEdgeWithKey("top->left", "top", "left", { edgeType: "sequential" }); + graph.addEdgeWithKey("top->right", "top", "right", { edgeType: "sequential" }); + graph.addEdgeWithKey("left->bottom", "left", "bottom", { + edgeType: "sequential", + }); + graph.addEdgeWithKey("right->bottom", "right", "bottom", { + edgeType: "sequential", + }); + return graph; +} + +function makeForkJoinGraph(): DirectedGraph { + const graph = new DirectedGraph(); + graph.addNode("source", { name: "source" }); + graph.addNode("fail-node", { name: "fail-node" }); + graph.addNode("run-node", { name: "run-node" }); + graph.addNode("sink", { name: "sink" }); + graph.addEdgeWithKey("source->fail-node", "source", "fail-node", { edgeType: "sequential" }); + graph.addEdgeWithKey("source->run-node", "source", "run-node", { edgeType: "sequential" }); + graph.addEdgeWithKey("fail-node->sink", "fail-node", "sink", { edgeType: "sequential" }); + graph.addEdgeWithKey("run-node->sink", "run-node", "sink", { edgeType: "sequential" }); + return graph; +} + +describe("WorkflowReactiveRoot", () => { + describe("constructor and initializeSignals", () => { + it("initializes all nodes with idle status", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.statusMap.get("a")!.value).toBe("idle"); + expect(root.statusMap.get("b")!.value).toBe("idle"); + expect(root.statusMap.get("c")!.value).toBe("idle"); + + root.dispose(); + }); + + it("creates precondition computeds for all nodes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.preconditions.has("a")).toBe(true); + expect(root.preconditions.has("b")).toBe(true); + expect(root.preconditions.has("c")).toBe(true); + + root.dispose(); + }); + + it("root node has met preconditions (no predecessors)", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.preconditions.get("a")!.value).toBe(true); + + root.dispose(); + }); + + it("downstream nodes have unmet preconditions initially", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.preconditions.get("b")!.value).toBe(false); + expect(root.preconditions.get("c")!.value).toBe(false); + + root.dispose(); + }); + + it("creates blockedByFailure computeds for all nodes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.blockedByFailure.has("a")).toBe(true); + expect(root.blockedByFailure.has("b")).toBe(true); + expect(root.blockedByFailure.has("c")).toBe(true); + + root.dispose(); + }); + + it("no nodes are blocked by failure initially", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.blockedByFailure.get("a")!.value).toBe(false); + expect(root.blockedByFailure.get("b")!.value).toBe(false); + expect(root.blockedByFailure.get("c")!.value).toBe(false); + + root.dispose(); + }); + + it("creates resultMap entries for all nodes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.resultMap.has("a")).toBe(true); + expect(root.resultMap.has("b")).toBe(true); + expect(root.resultMap.has("c")).toBe(true); + + root.dispose(); + }); + + it("defaults failurePolicy to continue-running", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + const policy = (root as unknown as { _failurePolicy: string })._failurePolicy; + + expect(policy).toBe("continue-running"); + + root.dispose(); + }); + + it("accepts custom failurePolicy", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph, { + failurePolicy: "abort-dependents", + }); + const policy = (root as unknown as { _failurePolicy: string })._failurePolicy; + + expect(policy).toBe("abort-dependents"); + + root.dispose(); + }); + + it("initializes nodeKeyToRequestId as empty map", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.nodeKeyToRequestId.size).toBe(0); + + root.dispose(); + }); + }); + + describe("append and status updates", () => { + it("updates status to running on call.requested", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + + expect(root.statusMap.get("a")!.value).toBe("running"); + + root.dispose(); + }); + + it("updates status to completed on call.responded", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.responded", + requestId: "req-1", + output: { result: 42 }, + timestamp: "2026-01-01T00:00:01Z", + }); + + expect(root.statusMap.get("a")!.value).toBe("completed"); + + root.dispose(); + }); + + it("updates status to failed on call.error", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.error", + requestId: "req-1", + error: { code: "ERR", message: "something failed" }, + timestamp: "2026-01-01T00:00:01Z", + }); + + expect(root.statusMap.get("a")!.value).toBe("failed"); + + root.dispose(); + }); + + it("updates status to aborted on call.aborted", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.aborted", + requestId: "req-1", + timestamp: "2026-01-01T00:00:01Z", + }); + + expect(root.statusMap.get("a")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("ignores events for unknown requestIds", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.append({ + type: "call.requested", + requestId: "unknown-req", + operationId: "x", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + + expect(root.statusMap.get("a")!.value).toBe("idle"); + + root.dispose(); + }); + + it("is idempotent - appending same event twice produces same state", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + + const respondedEvent: CallEventMapValue = { + type: "call.responded", + requestId: "req-1", + output: { result: 42 }, + timestamp: "2026-01-01T00:00:01Z", + }; + + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append(respondedEvent); + root.append(respondedEvent); + + expect(root.statusMap.get("a")!.value).toBe("completed"); + + root.dispose(); + }); + }); + + describe("getStatus", () => { + it("returns idle for nodes with no events", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.getStatus("a")).toBe("idle"); + + root.dispose(); + }); + + it("returns event-derived status when requestId is mapped", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + + expect(root.getStatus("a")).toBe("running"); + + root.dispose(); + }); + + it("falls back to signal value when no requestId is mapped", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "waiting"; + + expect(root.getStatus("a")).toBe("waiting"); + + root.dispose(); + }); + + it("returns idle for unknown node", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.getStatus("nonexistent")).toBe("idle"); + + root.dispose(); + }); + }); + + describe("getResult", () => { + it("returns undefined when no requestId is mapped", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.getResult("a")).toBeUndefined(); + + root.dispose(); + }); + + it("returns completed result from call.responded event", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.responded", + requestId: "req-1", + output: { value: 99 }, + timestamp: "2026-01-01T00:00:01Z", + }); + + const result = root.getResult("a"); + expect(result).toBeDefined(); + expect(result!.status).toBe("completed"); + expect(result!.output).toEqual({ value: 99 }); + + root.dispose(); + }); + + it("returns failed result from call.error event", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.error", + requestId: "req-1", + error: { code: "TIMEOUT", message: "timed out" }, + timestamp: "2026-01-01T00:00:01Z", + }); + + const result = root.getResult("a"); + expect(result).toBeDefined(); + expect(result!.status).toBe("failed"); + expect(result!.output).toBeUndefined(); + expect(result!.error).toEqual({ code: "TIMEOUT", message: "timed out" }); + + root.dispose(); + }); + + it("returns aborted result from call.aborted event", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.aborted", + requestId: "req-1", + timestamp: "2026-01-01T00:00:01Z", + }); + + const result = root.getResult("a"); + expect(result).toBeDefined(); + expect(result!.status).toBe("aborted"); + expect(result!.output).toBeUndefined(); + + root.dispose(); + }); + + it("returns undefined when only call.requested has been received", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + + expect(root.getResult("a")).toBeUndefined(); + + root.dispose(); + }); + + it("uses most recent terminal event for retries", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-2"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.error", + requestId: "req-1", + error: { code: "ERR", message: "first attempt failed" }, + timestamp: "2026-01-01T00:00:01Z", + }); + root.append({ + type: "call.requested", + requestId: "req-2", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:02Z", + }); + root.append({ + type: "call.responded", + requestId: "req-2", + output: { ok: true }, + timestamp: "2026-01-01T00:00:03Z", + }); + + const result = root.getResult("a"); + expect(result).toBeDefined(); + expect(result!.status).toBe("completed"); + expect(result!.output).toEqual({ ok: true }); + + root.dispose(); + }); + }); + + describe("getEvents", () => { + it("returns empty array for unmapped node", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.getEvents("a")).toEqual([]); + + root.dispose(); + }); + + it("returns events for a mapped node", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-1"); + root.append({ + type: "call.requested", + requestId: "req-1", + operationId: "a", + input: null, + timestamp: "2026-01-01T00:00:00Z", + }); + root.append({ + type: "call.responded", + requestId: "req-1", + output: 42, + timestamp: "2026-01-01T00:00:01Z", + }); + + const events = root.getEvents("a"); + expect(events).toHaveLength(2); + expect(events[0]!.type).toBe("call.requested"); + expect(events[1]!.type).toBe("call.responded"); + + root.dispose(); + }); + }); + + describe("abort cascade", () => { + it("failed node causes downstream dependents to abort (continue-running default)", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "failed"; + + expect(root.statusMap.get("b")!.value).toBe("aborted"); + expect(root.statusMap.get("c")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("aborted node causes downstream dependents to abort", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "aborted"; + + expect(root.statusMap.get("b")!.value).toBe("aborted"); + expect(root.statusMap.get("c")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("sibling branches are independent - failure on one side does not abort the other", () => { + const graph = makeDiamondGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("top")!.value = "completed"; + root.statusMap.get("left")!.value = "running"; + root.statusMap.get("right")!.value = "failed"; + + expect(root.statusMap.get("left")!.value).toBe("running"); + expect(root.statusMap.get("bottom")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("failed node does not abort already completed nodes", () => { + const graph = makeDiamondGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("top")!.value = "completed"; + root.statusMap.get("left")!.value = "completed"; + root.statusMap.get("right")!.value = "failed"; + + expect(root.statusMap.get("left")!.value).toBe("completed"); + + root.dispose(); + }); + }); + + describe("failurePolicy: abort-dependents", () => { + it("abort-dependents policy aborts idle dependents of a failed node", () => { + const graph = makeForkJoinGraph(); + const root = new WorkflowReactiveRoot(graph, { + failurePolicy: "abort-dependents", + }); + + root.statusMap.get("source")!.value = "completed"; + root.statusMap.get("fail-node")!.value = "running"; + root.statusMap.get("run-node")!.value = "running"; + root.statusMap.get("fail-node")!.value = "failed"; + + expect(root.statusMap.get("sink")!.value).toBe("aborted"); + + root.dispose(); + }); + }); + + describe("abortAll", () => { + it("sets all non-terminal nodes to aborted", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "completed"; + + root.abortAll(); + + expect(root.statusMap.get("a")!.value).toBe("completed"); + expect(root.statusMap.get("b")!.value).toBe("aborted"); + expect(root.statusMap.get("c")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("does not change already failed nodes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "failed"; + + root.abortAll(); + + expect(root.statusMap.get("a")!.value).toBe("failed"); + + root.dispose(); + }); + }); + + describe("abortNode", () => { + it("aborts a specific non-terminal node", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.abortNode("b"); + + expect(root.statusMap.get("b")!.value).toBe("aborted"); + + root.dispose(); + }); + + it("does not abort already completed nodes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "completed"; + + root.abortNode("a"); + + expect(root.statusMap.get("a")!.value).toBe("completed"); + + root.dispose(); + }); + + it("does not throw for unknown node", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(() => root.abortNode("nonexistent")).not.toThrow(); + + root.dispose(); + }); + }); + + describe("isComplete", () => { + it("returns false when any node is non-terminal", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.isComplete()).toBe(false); + + root.dispose(); + }); + + it("returns true when all nodes are in terminal state", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "completed"; + root.statusMap.get("b")!.value = "skipped"; + root.statusMap.get("c")!.value = "failed"; + + expect(root.isComplete()).toBe(true); + + root.dispose(); + }); + }); + + describe("getAggregateStatus", () => { + it("returns correct counts", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "completed"; + root.statusMap.get("b")!.value = "running"; + root.statusMap.get("c")!.value = "idle"; + + const agg = root.getAggregateStatus(); + expect(agg.completed).toBe(1); + expect(agg.running).toBe(1); + expect(agg.idle).toBe(1); + expect(agg.total).toBe(3); + expect(agg.failed).toBe(0); + expect(agg.aborted).toBe(0); + expect(agg.skipped).toBe(0); + expect(agg.waiting).toBe(0); + expect(agg.ready).toBe(0); + + root.dispose(); + }); + }); + + describe("dispose", () => { + it("clears all maps", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.dispose(); + + expect(root.statusMap.size).toBe(0); + expect(root.preconditions.size).toBe(0); + expect(root.blockedByFailure.size).toBe(0); + expect(root.resultMap.size).toBe(0); + expect(root.nodeKeyToRequestId.size).toBe(0); + }); + + it("effects no longer fire after dispose", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + const bStatusValue = root.statusMap.get("b")!.value; + root.dispose(); + + expect(bStatusValue).toBeDefined(); + }); + }); + + describe("precondition reactivity", () => { + it("preconditions update when predecessor completes", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + expect(root.preconditions.get("b")!.value).toBe(false); + + root.statusMap.get("a")!.value = "completed"; + + expect(root.preconditions.get("b")!.value).toBe(true); + + root.dispose(); + }); + + it("skipped predecessor satisfies preconditions", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "skipped"; + + expect(root.preconditions.get("b")!.value).toBe(true); + + root.dispose(); + }); + + it("failed predecessor does not satisfy preconditions", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "failed"; + + expect(root.preconditions.get("b")!.value).toBe(false); + + root.dispose(); + }); + + it("diamond graph: bottom requires both predecessors complete", () => { + const graph = makeDiamondGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("top")!.value = "completed"; + root.statusMap.get("left")!.value = "completed"; + root.statusMap.get("right")!.value = "completed"; + + expect(root.preconditions.get("bottom")!.value).toBe(true); + + root.dispose(); + }); + + it("diamond graph: bottom blocked when one predecessor fails", () => { + const graph = makeDiamondGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("top")!.value = "completed"; + root.statusMap.get("left")!.value = "completed"; + root.statusMap.get("right")!.value = "failed"; + + expect(root.preconditions.get("bottom")!.value).toBe(false); + + root.dispose(); + }); + }); + + describe("blockedByFailure reactivity", () => { + it("detects failed predecessor", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "failed"; + + expect(root.blockedByFailure.get("b")!.value).toBe(true); + expect(root.blockedByFailure.get("c")!.value).toBe(true); + + root.dispose(); + }); + + it("detects aborted predecessor", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "aborted"; + + expect(root.blockedByFailure.get("b")!.value).toBe(true); + + root.dispose(); + }); + + it("completed predecessor does not block by failure", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.statusMap.get("a")!.value = "completed"; + + expect(root.blockedByFailure.get("b")!.value).toBe(false); + + root.dispose(); + }); + }); + + describe("full workflow execution flow", () => { + it("simulates a simple linear workflow: a -> b -> c", () => { + const graph = makeSimpleGraph(); + const root = new WorkflowReactiveRoot(graph); + + root.nodeKeyToRequestId.set("a", "req-a"); + root.nodeKeyToRequestId.set("b", "req-b"); + root.nodeKeyToRequestId.set("c", "req-c"); + + expect(root.getStatus("a")).toBe("idle"); + expect(root.getStatus("b")).toBe("idle"); + expect(root.getStatus("c")).toBe("idle"); + + root.append({ + type: "call.requested", + requestId: "req-a", + operationId: "a", + input: null, + timestamp: "t1", + }); + expect(root.getStatus("a")).toBe("running"); + + root.append({ + type: "call.responded", + requestId: "req-a", + output: "result-a", + timestamp: "t2", + }); + expect(root.getStatus("a")).toBe("completed"); + expect(root.preconditions.get("b")!.value).toBe(true); + + root.append({ + type: "call.requested", + requestId: "req-b", + operationId: "b", + input: null, + timestamp: "t3", + }); + expect(root.getStatus("b")).toBe("running"); + + root.append({ + type: "call.responded", + requestId: "req-b", + output: "result-b", + timestamp: "t4", + }); + expect(root.getStatus("b")).toBe("completed"); + expect(root.preconditions.get("c")!.value).toBe(true); + + root.append({ + type: "call.requested", + requestId: "req-c", + operationId: "c", + input: null, + timestamp: "t5", + }); + root.append({ + type: "call.responded", + requestId: "req-c", + output: "result-c", + timestamp: "t6", + }); + expect(root.getStatus("c")).toBe("completed"); + expect(root.isComplete()).toBe(true); + + expect(root.getResult("a")!.output).toBe("result-a"); + expect(root.getResult("b")!.output).toBe("result-b"); + expect(root.getResult("c")!.output).toBe("result-c"); + + root.dispose(); + }); + }); +});