Files
flowgraph/src/reactive/workflow.ts

349 lines
10 KiB
TypeScript

import { signal, computed } from "@preact/signals-core";
import type { Signal, ReadonlySignal } from "@preact/signals-core";
import type { DirectedGraph } from "graphology";
import type { NodeStatus } from "../schema/enums.js";
import type { CallResult } from "../schema/edge.js";
import type { CallEventMapValue } from "../graph/construction.js";
export type { CallEventMapValue } from "../graph/construction.js";
import {
computePreconditions,
computeBlockedByFailure,
registerStartEffect,
registerAbortEffect,
} from "./node-status.js";
import type { NodeStatusContext } from "./node-status.js";
export type FailurePolicy = "continue-running" | "abort-dependents";
export interface ParallelGroup {
siblings: string[];
maxConcurrency?: number;
}
export interface ParallelGroupConfig {
[groupKey: string]: ParallelGroup;
}
export interface EventLogProjection {
append(event: CallEventMapValue): void;
getStatus(nodeId: string): NodeStatus;
getResult(nodeId: string): CallResult | undefined;
getEvents(nodeId: string): CallEventMapValue[];
}
export interface AggregateStatus {
completed: number;
failed: number;
aborted: number;
skipped: number;
running: number;
waiting: number;
ready: number;
idle: number;
total: number;
}
const TERMINAL_STATUSES: Set<NodeStatus> = new Set([
"completed",
"failed",
"aborted",
"skipped",
]);
const EVENT_TO_STATUS: Record<string, NodeStatus> = {
"call.requested": "running",
"call.responded": "completed",
"call.error": "failed",
"call.aborted": "aborted",
"call.completed": "completed",
};
export class WorkflowReactiveRoot implements EventLogProjection {
statusMap: Map<string, Signal<NodeStatus>>;
preconditions: Map<string, ReadonlySignal<boolean>>;
canStart: Map<string, ReadonlySignal<boolean>>;
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
resultMap: Map<string, ReadonlySignal<CallResult | undefined>>;
nodeKeyToRequestId: Map<string, string>;
requestIdToNodeKey: Map<string, string>;
private graph: DirectedGraph;
private effectDisposers: (() => void)[];
private eventLog: CallEventMapValue[];
private _failurePolicy: FailurePolicy;
private _parallelGroups: ParallelGroupConfig;
constructor(
graph: DirectedGraph,
options?: { failurePolicy?: FailurePolicy; parallelGroups?: ParallelGroupConfig },
) {
this.graph = graph;
this.statusMap = new Map();
this.preconditions = new Map();
this.canStart = new Map();
this.blockedByFailure = new Map();
this.resultMap = new Map();
this.effectDisposers = [];
this.eventLog = [];
this.nodeKeyToRequestId = new Map();
this.requestIdToNodeKey = new Map();
this._failurePolicy = options?.failurePolicy ?? "continue-running";
this._parallelGroups = options?.parallelGroups ?? {};
this.initializeSignals();
}
setRequestId(nodeKey: string, requestId: string): void {
this.nodeKeyToRequestId.set(nodeKey, requestId);
this.requestIdToNodeKey.set(requestId, nodeKey);
}
private initializeSignals(): void {
const nodeToGroupKey = new Map<string, string>();
for (const [groupKey, group] of Object.entries(this._parallelGroups)) {
for (const sibling of group.siblings) {
nodeToGroupKey.set(sibling, groupKey);
}
}
for (const node of this.graph.nodes()) {
const predecessors: string[] = this.graph.inNeighbors(node) ?? [];
const status = signal<NodeStatus>("idle");
const ctx: NodeStatusContext = {
statusMap: this.statusMap,
predecessors,
};
const preconditionsComputed = computed(() => {
return computePreconditions(node, ctx);
});
const groupKey = nodeToGroupKey.get(node);
const parallelGroup = groupKey ? this._parallelGroups[groupKey] : undefined;
const maxConc = parallelGroup?.maxConcurrency;
const siblings = parallelGroup?.siblings ?? [];
let canStartComputed: ReadonlySignal<boolean>;
if (maxConc !== undefined && siblings.length > 0) {
const otherSiblings = siblings.filter((s) => s !== node);
canStartComputed = computed(() => {
if (!preconditionsComputed.value) return false;
const activeSiblingCount = otherSiblings.filter((sib) => {
const sibStatus = this.statusMap.get(sib);
return sibStatus && (sibStatus.value === "running" || sibStatus.value === "ready");
}).length;
return activeSiblingCount < maxConc;
});
} else {
canStartComputed = preconditionsComputed;
}
const blockedByFailureComputed = computed(() => {
return computeBlockedByFailure(node, ctx);
});
const resultComputed = computed(() => {
const requestId = this.nodeKeyToRequestId.get(node);
if (!requestId) return undefined;
let latestTerminalEvent: CallEventMapValue | undefined;
for (let i = this.eventLog.length - 1; i >= 0; i--) {
const e = this.eventLog[i]!;
if ("requestId" in e && e.requestId === requestId) {
if (
e.type === "call.responded" ||
e.type === "call.error" ||
e.type === "call.aborted"
) {
latestTerminalEvent = e;
break;
}
}
}
if (!latestTerminalEvent) return undefined;
if (latestTerminalEvent.type === "call.error") {
return {
status: "failed" as NodeStatus,
output: undefined,
error: latestTerminalEvent.error,
} as CallResult;
}
if (latestTerminalEvent.type === "call.responded") {
return {
status: "completed" as NodeStatus,
output: latestTerminalEvent.output,
} as CallResult;
}
if (latestTerminalEvent.type === "call.aborted") {
return {
status: "aborted" as NodeStatus,
output: undefined,
} as CallResult;
}
return undefined;
});
this.statusMap.set(node, status);
this.preconditions.set(node, preconditionsComputed);
this.canStart.set(node, canStartComputed);
this.blockedByFailure.set(node, blockedByFailureComputed);
this.resultMap.set(node, resultComputed);
}
for (const node of this.graph.nodes()) {
const status = this.statusMap.get(node)!;
const canStart = this.canStart.get(node)!;
const blocked = this.blockedByFailure.get(node)!;
registerStartEffect(status, canStart, this.effectDisposers);
registerAbortEffect(status, blocked, this.effectDisposers, {
abortDependents: this._failurePolicy === "abort-dependents",
});
}
}
append(event: CallEventMapValue): void {
this.eventLog.push(event);
if (!("requestId" in event)) return;
let nodeId = this.requestIdToNodeKey.get(event.requestId);
if (nodeId === undefined) {
for (const [nId, rid] of this.nodeKeyToRequestId) {
if (rid === event.requestId) {
nodeId = nId;
this.requestIdToNodeKey.set(event.requestId, nId);
break;
}
}
}
if (nodeId === undefined) return;
const currentRequestId = this.nodeKeyToRequestId.get(nodeId);
if (currentRequestId === event.requestId) {
const statusSignal = this.statusMap.get(nodeId);
if (!statusSignal) return;
const derived = EVENT_TO_STATUS[event.type];
if (derived !== undefined) {
statusSignal.value = derived;
}
}
}
getStatus(nodeId: string): NodeStatus {
const statusSignal = this.statusMap.get(nodeId);
if (!statusSignal) return "idle";
const requestId = this.nodeKeyToRequestId.get(nodeId);
if (requestId) {
let lastEventType: string | undefined;
for (let i = this.eventLog.length - 1; i >= 0; i--) {
const e = this.eventLog[i]!;
if ("requestId" in e && e.requestId === requestId) {
lastEventType = e.type;
break;
}
}
if (lastEventType && EVENT_TO_STATUS[lastEventType] !== undefined) {
return EVENT_TO_STATUS[lastEventType]!;
}
}
return statusSignal.value;
}
getResult(nodeId: string): CallResult | undefined {
const resultComputed = this.resultMap.get(nodeId);
if (!resultComputed) return undefined;
return resultComputed.value;
}
getEvents(nodeId: string): CallEventMapValue[] {
const requestIds = new Set<string>();
for (const [rid, nId] of this.requestIdToNodeKey) {
if (nId === nodeId) {
requestIds.add(rid);
}
}
if (requestIds.size === 0) return [];
const events: CallEventMapValue[] = [];
for (const e of this.eventLog) {
if ("requestId" in e && requestIds.has(e.requestId)) {
events.push(e);
}
}
return events;
}
abortAll(): void {
for (const [_nodeId, status] of this.statusMap) {
if (!TERMINAL_STATUSES.has(status.value)) {
status.value = "aborted";
}
}
}
abortNode(nodeId: string): void {
const status = this.statusMap.get(nodeId);
if (!status) return;
if (!TERMINAL_STATUSES.has(status.value)) {
status.value = "aborted";
}
}
isComplete(): boolean {
for (const [_nodeId, status] of this.statusMap) {
if (!TERMINAL_STATUSES.has(status.value)) {
return false;
}
}
return true;
}
getAggregateStatus(): AggregateStatus {
const counts: Record<string, number> = {
completed: 0,
failed: 0,
aborted: 0,
skipped: 0,
running: 0,
waiting: 0,
ready: 0,
idle: 0,
total: 0,
};
for (const [_nodeId, status] of this.statusMap) {
const s = status.value;
counts["total"]!++;
if (s in counts) {
counts[s]!++;
}
}
return counts as unknown as AggregateStatus;
}
dispose(): void {
for (const disposer of this.effectDisposers) {
disposer();
}
this.effectDisposers = [];
this.statusMap.clear();
this.preconditions.clear();
this.canStart.clear();
this.blockedByFailure.clear();
this.resultMap.clear();
this.nodeKeyToRequestId.clear();
this.requestIdToNodeKey.clear();
this.eventLog = [];
}
}