feat: implement node status signal management with computed preconditions and blockedByFailure
- Add computePreconditions and computeBlockedByFailure functions to node-status.ts - Add registerStartEffect and registerAbortEffect for automatic state transitions - Start effect: idle/waiting -> ready when preconditions met - Abort effect: idle/waiting -> aborted when blockedByFailure true - Refactor WorkflowReactiveRoot to use node-status.ts functions - Root nodes auto-transition from idle to ready (no predecessors = preconditions true) - Add AbortEffectOptions with abortDependents policy support - Add comprehensive unit tests for all precondition and failure isolation scenarios
This commit is contained in:
@@ -1,8 +1,15 @@
|
||||
import { signal, computed, effect } from "@preact/signals-core";
|
||||
import { signal, computed } from "@preact/signals-core";
|
||||
import type { Signal, ReadonlySignal } from "@preact/signals-core";
|
||||
import type { DirectedGraph } from "graphology";
|
||||
import type { NodeStatus } from "../schema/enums.js";
|
||||
import type { CallResult } from "../schema/edge.js";
|
||||
import {
|
||||
computePreconditions,
|
||||
computeBlockedByFailure,
|
||||
registerStartEffect,
|
||||
registerAbortEffect,
|
||||
} from "./node-status.js";
|
||||
import type { NodeStatusContext } from "./node-status.js";
|
||||
|
||||
export type FailurePolicy = "continue-running" | "abort-dependents";
|
||||
|
||||
@@ -123,24 +130,17 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
||||
|
||||
const status = signal<NodeStatus>("idle");
|
||||
|
||||
const ctx: NodeStatusContext = {
|
||||
statusMap: this.statusMap,
|
||||
predecessors,
|
||||
};
|
||||
|
||||
const preconditionsComputed = computed(() => {
|
||||
return predecessors.every((pred: string) => {
|
||||
const predStatus = this.statusMap.get(pred);
|
||||
if (!predStatus) return false;
|
||||
return (
|
||||
predStatus.value === "completed" || predStatus.value === "skipped"
|
||||
);
|
||||
});
|
||||
return computePreconditions(node, ctx);
|
||||
});
|
||||
|
||||
const blockedByFailureComputed = computed(() => {
|
||||
return predecessors.some((pred: string) => {
|
||||
const predStatus = this.statusMap.get(pred);
|
||||
if (!predStatus) return false;
|
||||
return (
|
||||
predStatus.value === "failed" || predStatus.value === "aborted"
|
||||
);
|
||||
});
|
||||
return computeBlockedByFailure(node, ctx);
|
||||
});
|
||||
|
||||
const resultComputed = computed(() => {
|
||||
@@ -195,23 +195,13 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
||||
|
||||
for (const node of this.graph.nodes()) {
|
||||
const status = this.statusMap.get(node)!;
|
||||
const preconditions = this.preconditions.get(node)!;
|
||||
const blocked = this.blockedByFailure.get(node)!;
|
||||
|
||||
const disposer = effect(() => {
|
||||
if (blocked.value) {
|
||||
const current = status.value;
|
||||
if (current === "idle" || current === "waiting" || current === "ready") {
|
||||
if (this._failurePolicy === "abort-dependents") {
|
||||
if (!TERMINAL_STATUSES.has(current)) {
|
||||
status.value = "aborted";
|
||||
}
|
||||
} else {
|
||||
status.value = "aborted";
|
||||
}
|
||||
}
|
||||
}
|
||||
registerStartEffect(status, preconditions, this.effectDisposers);
|
||||
registerAbortEffect(status, blocked, this.effectDisposers, {
|
||||
abortDependents: this._failurePolicy === "abort-dependents",
|
||||
});
|
||||
this.effectDisposers.push(disposer);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -354,4 +344,4 @@ export class WorkflowReactiveRoot implements EventLogProjection {
|
||||
this.requestIdToNodeKey.clear();
|
||||
this.eventLog = [];
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user