Merge branch 'feat/host/reactive'
This commit is contained in:
@@ -1,2 +1,4 @@
|
||||
export { GraphologyHostConfig } from "./graphology.js";
|
||||
export type { WorkflowTag, GraphNode, GraphContext, OperationRegistry } from "./graphology.js";
|
||||
export type { WorkflowTag, GraphNode, GraphContext, OperationRegistry as GraphOperationRegistry } from "./graphology.js";
|
||||
export { ReactiveHostConfig } from "./reactive.js";
|
||||
export type { WorkflowNode, ReactiveContext, OperationRegistry } from "./reactive.js";
|
||||
@@ -1 +1,250 @@
|
||||
export {};
|
||||
import { signal, computed } from "@preact/signals-core";
|
||||
import type { Signal, ReadonlySignal } from "@preact/signals-core";
|
||||
import type { HostConfig } from "@alkdev/ujsx";
|
||||
import type { NodeStatus } from "../schema/enums.js";
|
||||
import type { CallResult } from "../schema/edge.js";
|
||||
import type { EventLogProjection } from "../reactive/workflow.js";
|
||||
import type { WorkflowTag } from "./graphology.js";
|
||||
|
||||
export interface OperationRegistry {
|
||||
resolve(name: string): unknown;
|
||||
}
|
||||
|
||||
export interface WorkflowNode {
|
||||
key: string;
|
||||
type: WorkflowTag;
|
||||
status: Signal<NodeStatus>;
|
||||
preconditions: ReadonlySignal<boolean>;
|
||||
blockedByFailure: ReadonlySignal<boolean>;
|
||||
operationId?: string;
|
||||
output?: Signal<unknown>;
|
||||
children: WorkflowNode[];
|
||||
}
|
||||
|
||||
export interface ReactiveContext {
|
||||
operationRegistry: OperationRegistry;
|
||||
nodes: Map<string, WorkflowNode>;
|
||||
statusSignals: Map<string, Signal<NodeStatus>>;
|
||||
preconditions: Map<string, ReadonlySignal<boolean>>;
|
||||
blockedByFailure: Map<string, ReadonlySignal<boolean>>;
|
||||
resultProjection: EventLogProjection;
|
||||
parentMap: Map<string, string>;
|
||||
siblingMap: Map<string, string[]>;
|
||||
results: Map<string, ReadonlySignal<CallResult | undefined>>;
|
||||
_containerCounter: number;
|
||||
}
|
||||
|
||||
function collectLeafPredecessors(
|
||||
nodeKey: string,
|
||||
ctx: ReactiveContext,
|
||||
): string[] {
|
||||
const parentKey = ctx.parentMap.get(nodeKey);
|
||||
if (!parentKey) return [];
|
||||
|
||||
const parentNode = ctx.nodes.get(parentKey);
|
||||
if (!parentNode) return [];
|
||||
|
||||
const siblings = ctx.siblingMap.get(parentKey);
|
||||
if (!siblings) return [];
|
||||
|
||||
const idx = siblings.indexOf(nodeKey);
|
||||
|
||||
switch (parentNode.type) {
|
||||
case "sequential": {
|
||||
if (idx > 0) {
|
||||
const prevKey = siblings[idx - 1]!;
|
||||
const prevNode = ctx.nodes.get(prevKey);
|
||||
if (prevNode && prevNode.type === "operation") {
|
||||
return [prevKey];
|
||||
}
|
||||
return collectLeafPredecessors(prevKey, ctx);
|
||||
}
|
||||
return collectLeafPredecessors(parentKey, ctx);
|
||||
}
|
||||
case "parallel":
|
||||
case "map":
|
||||
case "conditional": {
|
||||
return collectLeafPredecessors(parentKey, ctx);
|
||||
}
|
||||
default:
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
function computePreconditions(
|
||||
node: WorkflowNode,
|
||||
ctx: ReactiveContext,
|
||||
): boolean {
|
||||
const predecessors = collectLeafPredecessors(node.key, ctx);
|
||||
if (predecessors.length === 0) return true;
|
||||
return predecessors.every((predKey) => {
|
||||
const predStatus = ctx.statusSignals.get(predKey);
|
||||
if (!predStatus) return true;
|
||||
return predStatus.value === "completed" || predStatus.value === "skipped";
|
||||
});
|
||||
}
|
||||
|
||||
function computeBlockedByFailure(
|
||||
node: WorkflowNode,
|
||||
ctx: ReactiveContext,
|
||||
): boolean {
|
||||
const predecessors = collectLeafPredecessors(node.key, ctx);
|
||||
return predecessors.some((predKey) => {
|
||||
const predStatus = ctx.statusSignals.get(predKey);
|
||||
if (!predStatus) return false;
|
||||
return predStatus.value === "failed" || predStatus.value === "aborted";
|
||||
});
|
||||
}
|
||||
|
||||
export const ReactiveHostConfig: HostConfig<WorkflowTag, WorkflowNode, ReactiveContext> = {
|
||||
name: "reactive",
|
||||
|
||||
createRootContext(_container, options) {
|
||||
const ctx: ReactiveContext = {
|
||||
operationRegistry: options?.registry as OperationRegistry ?? { resolve: () => undefined },
|
||||
nodes: new Map(),
|
||||
statusSignals: new Map(),
|
||||
preconditions: new Map(),
|
||||
blockedByFailure: new Map(),
|
||||
resultProjection: options?.resultProjection as EventLogProjection ?? {
|
||||
append() {},
|
||||
getStatus: () => "idle" as NodeStatus,
|
||||
getResult: () => undefined,
|
||||
getEvents: () => [],
|
||||
},
|
||||
parentMap: new Map(),
|
||||
siblingMap: new Map(),
|
||||
results: new Map(),
|
||||
_containerCounter: 0,
|
||||
};
|
||||
return ctx;
|
||||
},
|
||||
|
||||
createInstance(tag, props, ctx, parent) {
|
||||
if (tag === "operation") {
|
||||
const key = props.name as string;
|
||||
const status = signal<NodeStatus>("idle");
|
||||
|
||||
const node: WorkflowNode = {
|
||||
key,
|
||||
type: "operation",
|
||||
status,
|
||||
preconditions: computed(() => computePreconditions(node, ctx)),
|
||||
blockedByFailure: computed(() => computeBlockedByFailure(node, ctx)),
|
||||
operationId: key,
|
||||
output: signal<unknown>(undefined),
|
||||
children: [],
|
||||
};
|
||||
|
||||
ctx.nodes.set(key, node);
|
||||
ctx.statusSignals.set(key, status);
|
||||
ctx.preconditions.set(key, node.preconditions);
|
||||
ctx.blockedByFailure.set(key, node.blockedByFailure);
|
||||
|
||||
if (parent) {
|
||||
ctx.parentMap.set(key, parent.key);
|
||||
const siblings = ctx.siblingMap.get(parent.key);
|
||||
if (siblings) {
|
||||
siblings.push(key);
|
||||
} else {
|
||||
ctx.siblingMap.set(parent.key, [key]);
|
||||
}
|
||||
}
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
const counter = ctx._containerCounter++;
|
||||
const key = `__${tag}_${counter}`;
|
||||
const status = signal<NodeStatus>("idle");
|
||||
|
||||
const node: WorkflowNode = {
|
||||
key,
|
||||
type: tag,
|
||||
status,
|
||||
preconditions: computed(() => computePreconditions(node, ctx)),
|
||||
blockedByFailure: computed(() => computeBlockedByFailure(node, ctx)),
|
||||
children: [],
|
||||
};
|
||||
|
||||
ctx.nodes.set(key, node);
|
||||
ctx.statusSignals.set(key, status);
|
||||
ctx.preconditions.set(key, node.preconditions);
|
||||
ctx.blockedByFailure.set(key, node.blockedByFailure);
|
||||
|
||||
if (parent) {
|
||||
ctx.parentMap.set(key, parent.key);
|
||||
const siblings = ctx.siblingMap.get(parent.key);
|
||||
if (siblings) {
|
||||
siblings.push(key);
|
||||
} else {
|
||||
ctx.siblingMap.set(parent.key, [key]);
|
||||
}
|
||||
}
|
||||
|
||||
if (!ctx.siblingMap.has(key)) {
|
||||
ctx.siblingMap.set(key, []);
|
||||
}
|
||||
|
||||
return node;
|
||||
},
|
||||
|
||||
createTextInstance(_text, ctx, _parent) {
|
||||
const counter = ctx._containerCounter++;
|
||||
const key = `__text_${counter}`;
|
||||
const status = signal<NodeStatus>("idle");
|
||||
const node: WorkflowNode = {
|
||||
key,
|
||||
type: "sequential" as WorkflowTag,
|
||||
status,
|
||||
preconditions: computed(() => true),
|
||||
blockedByFailure: computed(() => false),
|
||||
children: [],
|
||||
};
|
||||
ctx.nodes.set(key, node);
|
||||
ctx.statusSignals.set(key, status);
|
||||
return node;
|
||||
},
|
||||
|
||||
appendChild(parent, child, _ctx) {
|
||||
if (!parent.children.includes(child)) {
|
||||
parent.children.push(child);
|
||||
}
|
||||
},
|
||||
|
||||
insertBefore(parent, child, before, _ctx) {
|
||||
const idx = parent.children.indexOf(before);
|
||||
if (idx === -1) {
|
||||
parent.children.push(child);
|
||||
} else {
|
||||
if (!parent.children.includes(child)) {
|
||||
parent.children.splice(idx, 0, child);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
removeChild(parent, child, ctx) {
|
||||
parent.children = parent.children.filter((c) => c.key !== child.key);
|
||||
|
||||
const siblings = ctx.siblingMap.get(parent.key);
|
||||
if (siblings) {
|
||||
const idx = siblings.indexOf(child.key);
|
||||
if (idx !== -1) {
|
||||
siblings.splice(idx, 1);
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
prepareUpdate() {
|
||||
return null;
|
||||
},
|
||||
|
||||
commitUpdate() {
|
||||
},
|
||||
|
||||
emit() {
|
||||
},
|
||||
|
||||
finalizeInstance(_instance, _ctx) {
|
||||
},
|
||||
};
|
||||
Reference in New Issue
Block a user