feat(reactive): implement WorkflowReactiveRoot with signal graph, EventLogProjection, lifecycle, abort, and dispose
This commit is contained in:
@@ -1 +1,337 @@
|
||||
export {};
|
||||
import { signal, computed, effect } from "@preact/signals-core";
|
||||
import type { Signal, ReadonlySignal } from "@preact/signals-core";
|
||||
import type { DirectedGraph } from "graphology";
|
||||
import type { NodeStatus } from "../schema/enums.js";
|
||||
import type { CallResult } from "../schema/edge.js";
|
||||
|
||||
export type FailurePolicy = "continue-running" | "abort-dependents";
|
||||
|
||||
export interface CallRequestedEvent {
|
||||
type: "call.requested";
|
||||
requestId: string;
|
||||
operationId: string;
|
||||
input: unknown;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export interface CallRespondedEvent {
|
||||
type: "call.responded";
|
||||
requestId: string;
|
||||
output: unknown;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export interface CallErrorEvent {
|
||||
type: "call.error";
|
||||
requestId: string;
|
||||
error: { code: string; message: string; details?: unknown };
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export interface CallAbortedEvent {
|
||||
type: "call.aborted";
|
||||
requestId: string;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export interface CallCompletedEvent {
|
||||
type: "call.completed";
|
||||
requestId: string;
|
||||
output: unknown;
|
||||
timestamp: string;
|
||||
}
|
||||
|
||||
export type CallEventMapValue =
|
||||
| CallRequestedEvent
|
||||
| CallRespondedEvent
|
||||
| CallErrorEvent
|
||||
| CallAbortedEvent
|
||||
| CallCompletedEvent;
|
||||
|
||||
export interface EventLogProjection {
|
||||
append(event: CallEventMapValue): void;
|
||||
getStatus(nodeId: string): NodeStatus;
|
||||
getResult(nodeId: string): CallResult | undefined;
|
||||
getEvents(nodeId: string): CallEventMapValue[];
|
||||
}
|
||||
|
||||
export interface AggregateStatus {
|
||||
completed: number;
|
||||
failed: number;
|
||||
aborted: number;
|
||||
skipped: number;
|
||||
running: number;
|
||||
waiting: number;
|
||||
ready: number;
|
||||
idle: number;
|
||||
total: number;
|
||||
}
|
||||
|
||||
const TERMINAL_STATUSES: Set<NodeStatus> = new Set([
|
||||
"completed",
|
||||
"failed",
|
||||
"aborted",
|
||||
"skipped",
|
||||
]);
|
||||
|
||||
const EVENT_TO_STATUS: Record<string, NodeStatus> = {
|
||||
"call.requested": "running",
|
||||
"call.responded": "completed",
|
||||
"call.error": "failed",
|
||||
"call.aborted": "aborted",
|
||||
"call.completed": "completed",
|
||||
};
|
||||
|
||||
export class WorkflowReactiveRoot implements EventLogProjection {
|
||||
statusMap: Map<string, Signal<NodeStatus>>;
|
||||
preconditions: Map<string, ReadonlySignal<boolean>>;
|
||||
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
|
||||
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
|
||||
nodeKeyToRequestId: Map<string, string>;
|
||||
|
||||
private graph: DirectedGraph;
|
||||
private effectDisposers: (() => void)[];
|
||||
private eventLog: CallEventMapValue[];
|
||||
private _failurePolicy: FailurePolicy;
|
||||
|
||||
constructor(
|
||||
graph: DirectedGraph,
|
||||
options?: { failurePolicy?: FailurePolicy },
|
||||
) {
|
||||
this.graph = graph;
|
||||
this.statusMap = new Map();
|
||||
this.preconditions = new Map();
|
||||
this.blockedByFailure = new Map();
|
||||
this.resultMap = new Map();
|
||||
this.effectDisposers = [];
|
||||
this.eventLog = [];
|
||||
this.nodeKeyToRequestId = new Map();
|
||||
this._failurePolicy = options?.failurePolicy ?? "continue-running";
|
||||
this.initializeSignals();
|
||||
}
|
||||
|
||||
private initializeSignals(): void {
|
||||
for (const node of this.graph.nodes()) {
|
||||
const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
|
||||
|
||||
const status = signal<NodeStatus>("idle");
|
||||
|
||||
const preconditionsComputed = computed(() => {
|
||||
return predecessors.every((pred: string) => {
|
||||
const predStatus = this.statusMap.get(pred);
|
||||
if (!predStatus) return false;
|
||||
return (
|
||||
predStatus.value === "completed" || predStatus.value === "skipped"
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const blockedByFailureComputed = computed(() => {
|
||||
return predecessors.some((pred: string) => {
|
||||
const predStatus = this.statusMap.get(pred);
|
||||
if (!predStatus) return false;
|
||||
return (
|
||||
predStatus.value === "failed" || predStatus.value === "aborted"
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
const resultComputed = computed(() => {
|
||||
const requestId = this.nodeKeyToRequestId.get(node);
|
||||
if (!requestId) return undefined;
|
||||
|
||||
let latestTerminalEvent: CallEventMapValue | undefined;
|
||||
for (let i = this.eventLog.length - 1; i >= 0; i--) {
|
||||
const e = this.eventLog[i]!;
|
||||
if ("requestId" in e && e.requestId === requestId) {
|
||||
if (
|
||||
e.type === "call.responded" ||
|
||||
e.type === "call.error" ||
|
||||
e.type === "call.aborted"
|
||||
) {
|
||||
latestTerminalEvent = e;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!latestTerminalEvent) return undefined;
|
||||
|
||||
if (latestTerminalEvent.type === "call.error") {
|
||||
return {
|
||||
status: "failed" as NodeStatus,
|
||||
output: undefined,
|
||||
error: latestTerminalEvent.error,
|
||||
} as CallResult;
|
||||
}
|
||||
if (latestTerminalEvent.type === "call.responded") {
|
||||
return {
|
||||
status: "completed" as NodeStatus,
|
||||
output: latestTerminalEvent.output,
|
||||
} as CallResult;
|
||||
}
|
||||
if (latestTerminalEvent.type === "call.aborted") {
|
||||
return {
|
||||
status: "aborted" as NodeStatus,
|
||||
output: undefined,
|
||||
} as CallResult;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
});
|
||||
|
||||
this.statusMap.set(node, status);
|
||||
this.preconditions.set(node, preconditionsComputed);
|
||||
this.blockedByFailure.set(node, blockedByFailureComputed);
|
||||
this.resultMap.set(node, resultComputed);
|
||||
}
|
||||
|
||||
for (const node of this.graph.nodes()) {
|
||||
const status = this.statusMap.get(node)!;
|
||||
const blocked = this.blockedByFailure.get(node)!;
|
||||
|
||||
const disposer = effect(() => {
|
||||
if (blocked.value) {
|
||||
const current = status.value;
|
||||
if (current === "idle" || current === "waiting" || current === "ready") {
|
||||
if (this._failurePolicy === "abort-dependents") {
|
||||
if (!TERMINAL_STATUSES.has(current)) {
|
||||
status.value = "aborted";
|
||||
}
|
||||
} else {
|
||||
status.value = "aborted";
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
this.effectDisposers.push(disposer);
|
||||
}
|
||||
}
|
||||
|
||||
append(event: CallEventMapValue): void {
|
||||
this.eventLog.push(event);
|
||||
|
||||
if (!("requestId" in event)) return;
|
||||
|
||||
const nodeId = this.findNodeByRequestId(event.requestId);
|
||||
if (nodeId === undefined) return;
|
||||
|
||||
const statusSignal = this.statusMap.get(nodeId);
|
||||
if (!statusSignal) return;
|
||||
|
||||
const derived = EVENT_TO_STATUS[event.type];
|
||||
if (derived !== undefined) {
|
||||
statusSignal.value = derived;
|
||||
}
|
||||
}
|
||||
|
||||
getStatus(nodeId: string): NodeStatus {
|
||||
const statusSignal = this.statusMap.get(nodeId);
|
||||
if (!statusSignal) return "idle";
|
||||
|
||||
const requestId = this.nodeKeyToRequestId.get(nodeId);
|
||||
if (requestId) {
|
||||
let lastEventType: string | undefined;
|
||||
for (let i = this.eventLog.length - 1; i >= 0; i--) {
|
||||
const e = this.eventLog[i]!;
|
||||
if ("requestId" in e && e.requestId === requestId) {
|
||||
lastEventType = e.type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (lastEventType && EVENT_TO_STATUS[lastEventType] !== undefined) {
|
||||
return EVENT_TO_STATUS[lastEventType]!;
|
||||
}
|
||||
}
|
||||
|
||||
return statusSignal.value;
|
||||
}
|
||||
|
||||
getResult(nodeId: string): CallResult | undefined {
|
||||
const resultComputed = this.resultMap.get(nodeId);
|
||||
if (!resultComputed) return undefined;
|
||||
return resultComputed.value;
|
||||
}
|
||||
|
||||
getEvents(nodeId: string): CallEventMapValue[] {
|
||||
const requestId = this.nodeKeyToRequestId.get(nodeId);
|
||||
if (!requestId) return [];
|
||||
|
||||
const events: CallEventMapValue[] = [];
|
||||
for (const e of this.eventLog) {
|
||||
if ("requestId" in e && e.requestId === requestId) {
|
||||
events.push(e);
|
||||
}
|
||||
}
|
||||
return events;
|
||||
}
|
||||
|
||||
abortAll(): void {
|
||||
for (const [_nodeId, status] of this.statusMap) {
|
||||
if (!TERMINAL_STATUSES.has(status.value)) {
|
||||
status.value = "aborted";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
abortNode(nodeId: string): void {
|
||||
const status = this.statusMap.get(nodeId);
|
||||
if (!status) return;
|
||||
if (!TERMINAL_STATUSES.has(status.value)) {
|
||||
status.value = "aborted";
|
||||
}
|
||||
}
|
||||
|
||||
isComplete(): boolean {
|
||||
for (const [_nodeId, status] of this.statusMap) {
|
||||
if (!TERMINAL_STATUSES.has(status.value)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
getAggregateStatus(): AggregateStatus {
|
||||
const counts: Record<string, number> = {
|
||||
completed: 0,
|
||||
failed: 0,
|
||||
aborted: 0,
|
||||
skipped: 0,
|
||||
running: 0,
|
||||
waiting: 0,
|
||||
ready: 0,
|
||||
idle: 0,
|
||||
total: 0,
|
||||
};
|
||||
|
||||
for (const [_nodeId, status] of this.statusMap) {
|
||||
const s = status.value;
|
||||
counts["total"]!++;
|
||||
if (s in counts) {
|
||||
counts[s]!++;
|
||||
}
|
||||
}
|
||||
|
||||
return counts as unknown as AggregateStatus;
|
||||
}
|
||||
|
||||
dispose(): void {
|
||||
for (const disposer of this.effectDisposers) {
|
||||
disposer();
|
||||
}
|
||||
this.effectDisposers = [];
|
||||
this.statusMap.clear();
|
||||
this.preconditions.clear();
|
||||
this.blockedByFailure.clear();
|
||||
this.resultMap.clear();
|
||||
this.nodeKeyToRequestId.clear();
|
||||
this.eventLog = [];
|
||||
}
|
||||
|
||||
private findNodeByRequestId(requestId: string): string | undefined {
|
||||
for (const [nodeId, rid] of this.nodeKeyToRequestId) {
|
||||
if (rid === requestId) return nodeId;
|
||||
}
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user