482 lines
16 KiB
TypeScript
482 lines
16 KiB
TypeScript
import { describe, it, expect } from "vitest";
|
|
import { DirectedGraph } from "graphology";
|
|
import { WorkflowReactiveRoot } from "../../src/reactive/workflow.js";
|
|
import type { ParallelGroupConfig } from "../../src/reactive/workflow.js";
|
|
|
|
/**
 * Builds the fan-out fixture graph used by most tests in this file:
 * a single "entry" node with one "sequential" edge to each of the
 * nodes "a", "b", "c" and "d".
 */
function makeParallelGroupGraph(): DirectedGraph {
  const fanOut = new DirectedGraph();
  const targets = ["a", "b", "c", "d"];

  // Register every node (entry first) before wiring any edge.
  for (const id of ["entry", ...targets]) {
    fanOut.addNode(id, { name: id });
  }

  // One keyed edge per target, named "entry-><target>".
  for (const id of targets) {
    fanOut.addEdgeWithKey(`entry->${id}`, "entry", id, { edgeType: "sequential" });
  }

  return fanOut;
}
|
|
|
|
/**
 * Builds a fixture graph containing only the nodes "a", "b" and "c",
 * with no edges and no "entry" node.
 */
function makeParallelGroupNoEntry(): DirectedGraph {
  const isolated = new DirectedGraph();

  ["a", "b", "c"].forEach((id) => {
    isolated.addNode(id, { name: id });
  });

  return isolated;
}
|
|
|
|
describe("maxConcurrency semaphore", () => {
|
|
describe("parallel group with maxConcurrency: 2", () => {
  // Four siblings share one group that declares maxConcurrency: 2.
  const parallelGroups: ParallelGroupConfig = {
    group1: {
      siblings: ["a", "b", "c", "d"],
      maxConcurrency: 2,
    },
  };

  /** Creates a fresh reactive root over the entry -> {a, b, c, d} fixture graph. */
  const makeRoot = () => new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

  /** Status signal of a node; the fixture guarantees the node exists. */
  const statusOf = (root: WorkflowReactiveRoot, id: string) => root.statusMap.get(id)!;

  /** canStart signal of a node; the fixture guarantees the node exists. */
  const canStartOf = (root: WorkflowReactiveRoot, id: string) => root.canStart.get(id)!;

  /** Completes "entry", then writes "running" to "a" and "b" (in that order). */
  const completeEntryAndRunAB = (root: WorkflowReactiveRoot) => {
    statusOf(root, "entry").value = "completed";
    statusOf(root, "a").value = "running";
    statusOf(root, "b").value = "running";
  };

  it("limits running siblings to maxConcurrency", () => {
    const root = makeRoot();
    const siblings = ["a", "b", "c", "d"];

    // Every sibling has a canStart entry.
    for (const id of siblings) {
      expect(root.canStart.has(id)).toBe(true);
    }

    const entry = statusOf(root, "entry");
    expect(entry.value).toBe("ready");
    entry.value = "completed";

    for (const id of siblings) {
      expect(root.preconditions.get(id)!.value).toBe(true);
    }

    expect(canStartOf(root, "a").value).toBe(true);
    expect(canStartOf(root, "b").value).toBe(true);

    // The limit itself: with both slots taken by "a" and "b", the remaining
    // siblings must not be startable even though their preconditions hold.
    // (Previously this test never asserted that anything was limited.)
    expect(canStartOf(root, "c").value).toBe(false);
    expect(canStartOf(root, "d").value).toBe(false);

    root.dispose();
  });

  it("canStart becomes false when maxConcurrency siblings are running", () => {
    const root = makeRoot();

    completeEntryAndRunAB(root);

    // The two running siblings keep canStart; the other two are blocked.
    expect(canStartOf(root, "a").value).toBe(true);
    expect(canStartOf(root, "b").value).toBe(true);
    expect(canStartOf(root, "c").value).toBe(false);
    expect(canStartOf(root, "d").value).toBe(false);

    root.dispose();
  });

  it("blocked nodes by semaphore stay idle (not ready)", () => {
    const root = makeRoot();

    completeEntryAndRunAB(root);

    for (const id of ["c", "d"]) {
      expect(statusOf(root, id).value).toBe("idle");
    }

    root.dispose();
  });

  it("slot opens when a running sibling completes", () => {
    const root = makeRoot();
    const canStartC = canStartOf(root, "c");

    completeEntryAndRunAB(root);
    expect(canStartC.value).toBe(false);

    // Finishing "a" releases exactly one slot: "c" gets it, "d" still waits.
    statusOf(root, "a").value = "completed";

    expect(canStartC.value).toBe(true);
    expect(canStartOf(root, "d").value).toBe(false);
    expect(statusOf(root, "c").value).toBe("ready");

    root.dispose();
  });

  it("all slots eventually open as siblings complete", () => {
    const root = makeRoot();

    completeEntryAndRunAB(root);

    statusOf(root, "a").value = "completed";
    expect(canStartOf(root, "c").value).toBe(true);

    statusOf(root, "b").value = "completed";
    expect(canStartOf(root, "d").value).toBe(true);

    root.dispose();
  });

  it("canStart correctly limits when sibling transitions to ready", () => {
    const root = makeRoot();
    const blocked = ["c", "d"];

    statusOf(root, "entry").value = "completed";

    // "a" and "b" are checked in both the "ready" and the "running" state;
    // "c" and "d" must stay blocked in each.
    for (const phase of ["ready", "running"] as const) {
      statusOf(root, "a").value = phase;
      statusOf(root, "b").value = phase;

      for (const id of blocked) {
        expect(canStartOf(root, id).value).toBe(false);
      }
    }

    root.dispose();
  });
});
|
|
|
|
describe("parallel group without maxConcurrency", () => {
  // Same four siblings, but the group declares no maxConcurrency.
  const parallelGroups: ParallelGroupConfig = {
    group1: { siblings: ["a", "b", "c", "d"] },
  };
  const siblingIds = ["a", "b", "c", "d"];

  it("all siblings start immediately when preconditions are met", () => {
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

    root.statusMap.get("entry")!.value = "completed";

    siblingIds.forEach((id) => {
      expect(root.canStart.get(id)!.value).toBe(true);
    });
    siblingIds.forEach((id) => {
      expect(root.statusMap.get(id)!.value).toBe("ready");
    });

    root.dispose();
  });

  it("no semaphore — running siblings do not block others", () => {
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

    root.statusMap.get("entry")!.value = "completed";
    for (const id of ["a", "b"]) {
      root.statusMap.get(id)!.value = "running";
    }

    // With no cap, the remaining siblings are still allowed to start.
    for (const id of ["c", "d"]) {
      expect(root.canStart.get(id)!.value).toBe(true);
    }

    root.dispose();
  });
});
|
|
|
|
describe("no parallelGroups config", () => {
  it("all nodes start normally without semaphore", () => {
    // Root is built without any options object at all.
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph());
    const successors = ["a", "b", "c", "d"];

    root.statusMap.get("entry")!.value = "completed";

    for (const id of successors) {
      expect(root.canStart.get(id)!.value).toBe(true);
    }
    for (const id of successors) {
      expect(root.statusMap.get(id)!.value).toBe("ready");
    }

    root.dispose();
  });
});
|
|
|
|
describe("maxConcurrency: 1 (serial within parallel)", () => {
  // Only "a", "b" and "c" belong to the group; the fixture's "d" node is outside it.
  const parallelGroups: ParallelGroupConfig = {
    group1: { siblings: ["a", "b", "c"], maxConcurrency: 1 },
  };

  /** Builds a root over the fan-out fixture and marks "entry" as completed. */
  const rootAfterEntry = () => {
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });
    root.statusMap.get("entry")!.value = "completed";
    return root;
  };

  it("only one sibling can run at a time", () => {
    const root = rootAfterEntry();

    const [canStartA, canStartB, canStartC] = ["a", "b", "c"].map((id) => root.canStart.get(id)!);

    /** "a" holds the single slot; "b" and "c" are blocked. */
    const expectOnlyAStartable = () => {
      expect(canStartA.value).toBe(true);
      expect(canStartB.value).toBe(false);
      expect(canStartC.value).toBe(false);
    };

    expectOnlyAStartable();

    root.statusMap.get("a")!.value = "running";

    expectOnlyAStartable();

    root.dispose();
  });

  it("next sibling can start when current finishes", () => {
    const root = rootAfterEntry();

    const statusA = root.statusMap.get("a")!;
    statusA.value = "running";
    statusA.value = "completed";

    // The freed slot goes to "b"; "c" keeps waiting.
    expect(root.canStart.get("b")!.value).toBe(true);
    expect(root.canStart.get("c")!.value).toBe(false);

    root.dispose();
  });
});
|
|
|
|
describe("root-level parallel group with maxConcurrency", () => {
  // Three predecessor-less nodes in one group that admits two at a time.
  const parallelGroups: ParallelGroupConfig = {
    root: { siblings: ["a", "b", "c"], maxConcurrency: 2 },
  };

  /** Fresh root over the edge-less {a, b, c} fixture graph. */
  const makeRoot = () => new WorkflowReactiveRoot(makeParallelGroupNoEntry(), { parallelGroups });

  it("limits concurrent start for root nodes with no predecessors", () => {
    const root = makeRoot();

    const expected: Array<[string, boolean]> = [
      ["a", true],
      ["b", true],
      ["c", false],
    ];
    for (const [id, startable] of expected) {
      expect(root.canStart.get(id)!.value).toBe(startable);
    }

    root.dispose();
  });

  it("third node becomes ready when a slot opens", () => {
    const root = makeRoot();
    const canStartC = root.canStart.get("c")!;
    const statusC = root.statusMap.get("c")!;

    root.statusMap.get("a")!.value = "running";
    root.statusMap.get("b")!.value = "running";

    expect(canStartC.value).toBe(false);
    expect(statusC.value).toBe("idle");

    // Completing "a" frees a slot for "c".
    root.statusMap.get("a")!.value = "completed";

    expect(canStartC.value).toBe(true);
    expect(statusC.value).toBe("ready");

    root.dispose();
  });
});
|
|
|
|
describe("preconditions vs canStart", () => {
  // Shared config: four siblings, two concurrent slots.
  const parallelGroups: ParallelGroupConfig = {
    group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2 },
  };

  /** Fresh root over the entry -> {a, b, c, d} fixture graph. */
  const makeRoot = () => new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

  it("canStart is false when preconditions are not met even if semaphore has slots", () => {
    const root = makeRoot();

    // "entry" has not completed, so "a" fails its preconditions.
    expect(root.preconditions.get("a")!.value).toBe(false);
    expect(root.canStart.get("a")!.value).toBe(false);

    root.dispose();
  });

  it("canStart requires both preconditions AND semaphore slot", () => {
    const root = makeRoot();

    root.statusMap.get("entry")!.value = "completed";

    // Preconditions met and a slot available: "a" may start.
    expect(root.preconditions.get("a")!.value).toBe(true);
    expect(root.canStart.get("a")!.value).toBe(true);

    ["a", "b"].forEach((id) => {
      root.statusMap.get(id)!.value = "running";
    });

    // Preconditions met but no slot left: "c" may not start.
    expect(root.preconditions.get("c")!.value).toBe(true);
    expect(root.canStart.get("c")!.value).toBe(false);

    root.dispose();
  });
});
|
|
|
|
describe("failed sibling does not count as running", () => {
  // Shared config: four siblings, two concurrent slots.
  const parallelGroups: ParallelGroupConfig = {
    group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2 },
  };

  /**
   * Builds a root over the fan-out fixture, completes "entry" and then
   * writes "running" to "a" and "b", in that order.
   */
  const rootWithABRunning = () => {
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });
    root.statusMap.get("entry")!.value = "completed";
    root.statusMap.get("a")!.value = "running";
    root.statusMap.get("b")!.value = "running";
    return root;
  };

  it("failed sibling frees a slot for semaphore", () => {
    const root = rootWithABRunning();
    const canStartC = root.canStart.get("c")!;

    expect(canStartC.value).toBe(false);

    root.statusMap.get("b")!.value = "failed";

    expect(canStartC.value).toBe(true);

    root.dispose();
  });

  it("completed sibling frees a slot for semaphore", () => {
    const root = rootWithABRunning();

    root.statusMap.get("b")!.value = "completed";

    // One slot freed: "c" takes it, "d" remains blocked.
    expect(root.canStart.get("c")!.value).toBe(true);
    expect(root.canStart.get("d")!.value).toBe(false);

    root.dispose();
  });

  it("aborted sibling frees a slot for semaphore", () => {
    const root = rootWithABRunning();

    root.statusMap.get("b")!.value = "aborted";

    expect(root.canStart.get("c")!.value).toBe(true);

    root.dispose();
  });
});
|
|
|
|
describe("dispose clears canStart", () => {
  it("canStart map is cleared on dispose", () => {
    const parallelGroups: ParallelGroupConfig = {
      group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2 },
    };
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

    // One canStart entry per fixture node: entry, a, b, c, d.
    const sizeBefore = root.canStart.size;
    expect(sizeBefore).toBe(5);

    root.dispose();

    const sizeAfter = root.canStart.size;
    expect(sizeAfter).toBe(0);
  });
});
|
|
|
|
describe("full execution flow with maxConcurrency", () => {
  it("simulates parallel execution with maxConcurrency: 2 — only 2 run at a time", () => {
    const parallelGroups: ParallelGroupConfig = {
      group1: { siblings: ["a", "b", "c", "d"], maxConcurrency: 2 },
    };
    const root = new WorkflowReactiveRoot(makeParallelGroupGraph(), { parallelGroups });

    /** Status signal of a node; every id used below exists in the fixture. */
    const status = (id: string) => root.statusMap.get(id)!;

    // Assign request ids "req-<node>" to every node, entry first.
    for (const id of ["entry", "a", "b", "c", "d"]) {
      root.setRequestId(id, `req-${id}`);
    }

    // Step 1: entry completes; only the first two siblings become ready.
    status("entry").value = "completed";

    expect(status("a").value).toBe("ready");
    expect(status("b").value).toBe("ready");
    expect(status("c").value).toBe("idle");
    expect(status("d").value).toBe("idle");

    // Step 2: both ready siblings run; the others remain idle.
    status("a").value = "running";
    status("b").value = "running";

    expect(status("c").value).toBe("idle");
    expect(status("d").value).toBe("idle");

    // Step 3: "a" completes, which lets "c" become ready while "d" waits.
    status("a").value = "completed";

    expect(status("c").value).toBe("ready");
    expect(status("d").value).toBe("idle");

    // Step 4: "c" runs, then "b" completes, which lets "d" become ready.
    status("c").value = "running";
    status("b").value = "completed";

    expect(status("d").value).toBe("ready");

    // Step 5: drain the remaining work.
    status("d").value = "running";
    status("c").value = "completed";
    status("d").value = "completed";

    expect(root.isComplete()).toBe(true);

    root.dispose();
  });
});
|
|
}); |