docs(architecture): resolve review #003 — type/API surface completeness
Review #003 found 11 critical, 14 warning, and 6 suggestion findings after reviews #001 (governance/security) and #002 (cross-document consistency/two-way-door audit) were resolved. The theme: types and APIs that were *referenced* but never *defined*, and stale ADR sketches that didn't match the now-updated spec docs. Critical fixes (11): - C1: DerivedKey #[derive(Deserialize)] contradicted the custom Deserialize that rejects "[REDACTED]" — dropped the derive, added explicit manual Serialize/Deserialize impls (protocol.md). - C2: encrypt prose said "derived at PATHS::ENCRYPTION" but the signature takes key_version — updated to encryption_path_for_version (service.md). - C3: derive_encryption_key returned DerivedKey, derive_encryption_key _for_version returned EncryptionKey (same cache) — unified on DerivedKey, defined CachedKey (service.md). - C4: tokio vs std::sync::RwLock contradiction — specified std::sync::RwLock, dropped tokio from vault deps (ADR-018, ADR-025, service.md). - C5: Missing drift rows in vault README — added #9 (key_version ignored) and #10 (rotate not implemented). - C6: ADR-022 build_root_context and invoke() sketches omitted abort_policy (9 fields vs 10) — added the field to both sketches. - C7: Capabilities type referenced 20+ times, never defined — added struct definition to core-types.md with Clone+Send+Sync, Zeroize, sealed builder API, immutability guard. - C8: SessionOverlaySource on CallAdapter but never defined, crate violation (alknet-call can't depend on alknet-agent) — defined the trait in alknet-call (call-protocol.md), matching the IdentityProvider pattern. - C9: CompositeOperationEnv dispatch fall-through was "a two-way door" — added contains() to OperationEnv trait, made the composite probe before dispatching, eliminating the sentinel ambiguity. - C10: No API for Layer 2 (connection overlay) registration, CallConnection undefined — defined CallConnection struct + register_imported() API (call-protocol.md). - C11: with_local signature diverged between two examples (4 args vs 5) — added capabilities as the 5th arg, made both examples consistent. Warning fixes (14): - W1: invoke_with_policy restructured as required method, invoke gets a default impl delegating to it — eliminates duplication across impls. - W2: CachedKey defined (service.md). - W3: EncryptionKey constructor/glue specified, added to re-export list. - W4: Secp256k1ExtendedPrivKey defined, derive_ethereum_key glue shown. - W5: encryption_path_for_version rejects version < 2 (v1 is TS PBKDF2). - W6: Wire payload schemas for all event types + ResponseEnvelope → EventEnvelope conversion table (call-protocol.md). - W7: Timeout section — deadline on OperationContext, composed calls inherit parent's deadline, CallAdapter::with_timeout(). - W8: Request ID generation spec — UUID v4 for composed calls, wire ID vs internal ID relationship for abort cascade. - W9: unlock_new already-unlocked behavior specified (returns AlreadyUnlocked). - W10: KeyType Serialize/Deserialize justification corrected (stale irpc reference removed). - W11: OperationProvenance and CompositionAuthority defined inline in operation-registry.md (were only in ADR-022). - W12: encrypt/decrypt free functions marked pub(crate), relationship to VaultServiceHandle methods stated. - W13: rotate signature removed from encryption.md (it's a VaultServiceHandle method, not a free function). - W14: CallAdapter::new() + with_session_source() + with_timeout() constructors shown. Suggestion fixes (6): Seed: Clone note, VaultServiceInner invariant, ExtendedPrivKey accessor signatures, CURRENT_KEY_VERSION location, ADR-018 stale actor text, derivation helpers re-export note.
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
---
|
||||
status: draft
|
||||
last_updated: 2026-06-22-22
|
||||
last_updated: 2026-06-23
|
||||
---
|
||||
|
||||
# Call Protocol
|
||||
@@ -39,13 +39,54 @@ pub struct CallAdapter {
|
||||
/// Layer 1 — optional session-overlay source (agent crate supplies this;
|
||||
/// None for non-agent deployments). See ADR-024, OQ-19.
|
||||
session_source: Option<Arc<dyn SessionOverlaySource + Send + Sync>>,
|
||||
/// Default timeout for wire calls (30s). Composed calls inherit the
|
||||
/// parent's remaining deadline via `OperationContext.deadline`.
|
||||
default_timeout: Duration,
|
||||
}
|
||||
|
||||
// The connection's imported-ops overlay (Layer 2) is built per CallConnection
|
||||
// as from_call discovery completes — it's not a field on CallAdapter but
|
||||
// rather state held by the CallConnection / dispatch context for incoming
|
||||
// calls on that connection. See ADR-024.
|
||||
```
|
||||
impl CallAdapter {
|
||||
/// Non-agent deployment: no session overlay, default timeout.
|
||||
pub fn new(
|
||||
registry: Arc<OperationRegistry>,
|
||||
identity_provider: Arc<dyn IdentityProvider>,
|
||||
) -> Self {
|
||||
Self { registry, identity_provider, session_source: None,
|
||||
default_timeout: Duration::from_secs(30) }
|
||||
}
|
||||
|
||||
/// Agent deployment: supply a session-overlay source. The agent crate
|
||||
/// implements `SessionOverlaySource`; alknet-call defines the trait.
|
||||
pub fn with_session_source(mut self, source: Arc<dyn SessionOverlaySource + Send + Sync>) -> Self {
|
||||
self.session_source = Some(source);
|
||||
self
|
||||
}
|
||||
|
||||
/// Override the default timeout.
|
||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.default_timeout = timeout;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
/// Session overlay integration point (ADR-024). Defined in alknet-call
|
||||
/// because `CallAdapter` must name the type — alknet-call cannot depend on
|
||||
/// alknet-agent (agent depends on call, not reverse). The agent crate
|
||||
/// implements this trait; alknet-call defines it. This is the same pattern
|
||||
/// as `IdentityProvider` (ADR-004: core defines the trait, handlers impl it).
|
||||
///
|
||||
/// The session overlay is an `OperationEnv` impl that wraps the curated base
|
||||
/// (Layer 0). The `CallAdapter` composes it into the root
|
||||
/// `OperationContext.env` per incoming call when a session is active. The
|
||||
/// lookup mechanism (session ID in metadata, payload field, connection-bound
|
||||
/// session state) belongs to the agent crate — this trait is the integration
|
||||
/// point, not the lookup policy.
|
||||
pub trait SessionOverlaySource: Send + Sync {
|
||||
/// Returns the session overlay env for the given call, if a session is
|
||||
/// active. `None` means no session is active for this call — the root
|
||||
/// env is `curated base + connection overlay` (no session layer).
|
||||
/// The agent crate determines how to map a call to its session.
|
||||
fn overlay_for(&self, context: &OperationContext) -> Option<Arc<dyn OperationEnv + Send + Sync>>;
|
||||
}
|
||||
|
||||
The `CallAdapter` holds the static curated registry and an optional
|
||||
session-overlay source. Per-connection imported-ops overlays (Layer 2,
|
||||
@@ -53,6 +94,68 @@ ADR-024) are held with the connection and composed into the root
|
||||
`OperationContext.env` per incoming call. See ADR-024 for the layering
|
||||
model and `compose_root_env` below.
|
||||
|
||||
### CallConnection
|
||||
|
||||
A `CallConnection` represents an established `alknet/call` connection,
|
||||
regardless of which side opened it (ADR-017). It holds the connection's
|
||||
imported-ops overlay (Layer 2, ADR-024) — the set of `from_call`-imported
|
||||
operations discovered when the connection was established.
|
||||
|
||||
```rust
|
||||
/// An established alknet/call connection (either direction — accepted or
|
||||
/// opened). Holds the connection's Layer 2 overlay (imported ops).
|
||||
pub struct CallConnection {
|
||||
/// The underlying QUIC connection (from endpoint.accept or CallClient.connect).
|
||||
connection: Connection,
|
||||
/// Layer 2 — this connection's imported-ops overlay. Populated by
|
||||
/// `from_call` discovery when the connection is established. Each
|
||||
/// imported op is a `HandlerRegistration` with `provenance: FromCall`.
|
||||
/// This overlay is an `OperationEnv` impl that the `CallAdapter`
|
||||
/// composes into the root `OperationContext.env` per incoming call.
|
||||
imported_operations: Arc<RwLock<HashMap<String, HandlerRegistration>>>,
|
||||
}
|
||||
|
||||
impl CallConnection {
|
||||
/// Register an imported operation into this connection's overlay
|
||||
/// (Layer 2, ADR-024). Called by `from_call` after discovery.
|
||||
pub fn register_imported(&self, registration: HandlerRegistration) {
|
||||
let name = registration.spec.name.clone();
|
||||
self.imported_operations.write().insert(name, registration);
|
||||
}
|
||||
|
||||
/// Register multiple imported operations (bulk variant for `from_call`).
|
||||
pub fn register_imported_all(&self, registrations: Vec<HandlerRegistration>) {
|
||||
let mut overlay = self.imported_operations.write();
|
||||
for reg in registrations {
|
||||
overlay.insert(reg.spec.name.clone(), reg);
|
||||
}
|
||||
}
|
||||
|
||||
/// Build an `OperationEnv` impl for this connection's overlay. Used by
|
||||
/// the `CallAdapter` when composing the root `OperationContext.env`.
|
||||
/// Returns an `OperationEnv` that dispatches to this connection's
|
||||
/// imported ops (and reports `contains` only for ops in the overlay).
|
||||
pub fn overlay_env(&self) -> Arc<dyn OperationEnv + Send + Sync>;
|
||||
|
||||
/// Call an operation on the remote peer (sends `call.requested`).
|
||||
pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope;
|
||||
|
||||
/// Subscribe to a streaming operation on the remote peer.
|
||||
pub async fn subscribe(&self, operation_id: &str, input: Value) -> impl Stream<Item = ResponseEnvelope>;
|
||||
|
||||
/// Abort an in-flight request (sends `call.aborted`, cascades per ADR-016).
|
||||
pub async fn abort(&self, request_id: &str);
|
||||
}
|
||||
```
|
||||
|
||||
**Layer 0 vs Layer 2 registration API** (ADR-024): `OperationRegistryBuilder`
|
||||
builds Layer 0 (curated, immutable after startup) via `.with_local()` /
|
||||
`.with_leaf()` / `.with()`. Layer 2 (per-connection) registration uses
|
||||
`CallConnection::register_imported()` at runtime — the builder is
|
||||
Layer-0-only; runtime overlay registration uses `CallConnection` methods.
|
||||
When the connection drops, the overlay (and all imported ops) is dropped —
|
||||
no explicit deregistration needed.
|
||||
|
||||
The adapter:
|
||||
1. Accepts bidirectional streams on the connection
|
||||
2. Reads length-prefixed JSON `EventEnvelope` frames from each stream
|
||||
@@ -162,6 +265,29 @@ Fields:
|
||||
|
||||
New error codes may be added in future versions. Clients should treat unknown error codes as `INTERNAL` with `retryable: false`.
|
||||
|
||||
### Wire Payload Schemas
|
||||
|
||||
The `payload` field of `EventEnvelope` has a different shape per event type:
|
||||
|
||||
| Event | `payload` shape |
|
||||
|-------|----------------|
|
||||
| `call.requested` | `{ "operationId": "/fs/readFile", "input": {...}, "auth_token": "alk_..." (optional) }` |
|
||||
| `call.responded` | `{ "output": <Value> }` — the operation's output, matching `output_schema` |
|
||||
| `call.completed` | `{}` — empty object (subscription stream end signal) |
|
||||
| `call.aborted` | `{}` — empty object (cancellation signal; the `id` identifies which request) |
|
||||
| `call.error` | `{ "code": "...", "message": "...", "retryable": bool, "details": {...} (optional) }` |
|
||||
|
||||
### `ResponseEnvelope` → `EventEnvelope` Conversion
|
||||
|
||||
Local dispatch produces `ResponseEnvelope { request_id, result: Result<Value, CallError> }`. The `CallAdapter` converts it to `EventEnvelope` for the wire:
|
||||
|
||||
| `ResponseEnvelope` | `EventEnvelope` |
|
||||
|--------------------|-----------------|
|
||||
| `Ok(value)` | `{ type: "call.responded", id: request_id, payload: { output: value } }` |
|
||||
| `Err(call_error)` | `{ type: "call.error", id: request_id, payload: <serialized CallError> }` |
|
||||
|
||||
The `request_id` becomes the `id` field. For subscriptions, each `call.responded` is a separate `EventEnvelope` with the same `id`; `call.completed` is `{ type: "call.completed", id, payload: {} }`.
|
||||
|
||||
### Protocol Operations
|
||||
|
||||
The call protocol defines four top-level operations, expressed through event types and operation names:
|
||||
@@ -304,6 +430,7 @@ fn build_root_context(
|
||||
handler_identity: registration.composition_authority.clone(),
|
||||
capabilities: registration.capabilities.clone(), // from the registration bundle
|
||||
metadata: HashMap::new(), // fresh per request
|
||||
deadline: Some(Instant::now() + self.default_timeout), // root deadline (W7)
|
||||
scoped_env: registration.scoped_env.clone()
|
||||
.unwrap_or_else(ScopedOperationEnv::empty), // from the bundle, empty for leaves
|
||||
// Per-call env composition (ADR-024): the root env is a composite
|
||||
@@ -349,7 +476,17 @@ Local dispatch produces `ResponseEnvelope` with no serialization overhead. The `
|
||||
|
||||
**Stream reset**: When a QUIC stream is reset mid-operation, the `FrameFramedReader` returns an error. If the stream was carrying a subscription, the `PendingRequestMap` entry is removed and the mpsc channel is closed. If the stream was carrying a call, the oneshot is resolved with an error. No `call.aborted` is sent — the stream is gone.
|
||||
|
||||
**Timeouts**: Default timeout for calls is 30 seconds. Default timeout for subscriptions is optional (the client can specify a timeout in the `call.requested` payload, or leave it open-ended). The `PendingRequestMap` sweeper runs every 10 seconds and removes expired entries. Timeouts are configurable at the `CallAdapter` level, not per-operation.
|
||||
**Timeouts**: Default timeout for wire calls is 30 seconds, configurable via
|
||||
`CallAdapter::with_timeout()`. The `build_root_context` sets
|
||||
`OperationContext.deadline` to `now + default_timeout`. Composed calls
|
||||
inherit the parent's deadline (children do **not** get a fresh 30s — the
|
||||
root call's deadline bounds the entire call tree, preventing a depth-5
|
||||
composition from running 150s). A composed call that exceeds the deadline
|
||||
is cancelled (future dropped, `Drop` guards release resources) and returns
|
||||
`CallError { code: "TIMEOUT", retryable: true }`. Subscriptions default to
|
||||
no deadline (`deadline: None` — unbounded); the client can specify a
|
||||
timeout in the `call.requested` payload. The `PendingRequestMap` sweeper
|
||||
runs every 10 seconds and removes expired wire entries.
|
||||
|
||||
**Error handling in `CallAdapter::handle()`**: If a handler panics, the stream is closed and the `PendingRequestMap` entry (if any) is cleaned up by the next sweeper pass. Other streams and the connection are unaffected.
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
---
|
||||
status: draft
|
||||
last_updated: 2026-06-22-22
|
||||
last_updated: 2026-06-23
|
||||
---
|
||||
|
||||
# Operation Registry
|
||||
@@ -136,6 +136,14 @@ pub struct OperationContext {
|
||||
/// composing handler via `OperationEnv::invoke()` (or
|
||||
/// `invoke_with_policy()`), not by the wire caller.
|
||||
pub abort_policy: AbortPolicy,
|
||||
/// Deadline for this call and all descendants. Set by `build_root_context`
|
||||
/// to `now + CallAdapter.default_timeout` (default 30s). Composed calls
|
||||
/// inherit the parent's deadline (children do not get a fresh 30s — the
|
||||
/// root call's deadline bounds the entire call tree). A composed call
|
||||
/// that exceeds the deadline is cancelled (future dropped, `Drop` guards
|
||||
/// release resources). `None` means no deadline (unbounded — used for
|
||||
/// long-running subscriptions). See call-protocol.md → Timeouts.
|
||||
pub deadline: Option<Instant>,
|
||||
/// Composition-origin flag. Set by `OperationEnv::invoke()` (true) or the
|
||||
/// `CallAdapter` dispatch path (false) — never by handlers. Module-private
|
||||
/// for writes; read via `is_internal()`. See ADR-015.
|
||||
@@ -191,6 +199,27 @@ The registry maps operation names to `HandlerRegistration` bundles. The curated
|
||||
- `invoke(name, input, context)`: Look up, check ACL, invoke handler, return result
|
||||
- `list_operations()`: Return all registered specs (for `/services/list` — returns curated + active overlay ops)
|
||||
|
||||
### Request ID Generation
|
||||
|
||||
Request IDs correlate `call.requested`/`call.responded` events and index the
|
||||
abort-cascade tree (`PendingRequestMap` is keyed by request ID, ADR-016).
|
||||
|
||||
- **Wire calls**: the root `OperationContext.request_id` is the `id` field
|
||||
from the wire `call.requested` event (generated by the client).
|
||||
- **Composed calls**: `OperationEnv::invoke()` generates a new `request_id`
|
||||
for each child via `generate_request_id()` — a UUID v4 (or
|
||||
`parent_id + "-" + counter`). Deterministic IDs (e.g.
|
||||
`format!("env-{name}")`) **must not** be used — they collide across
|
||||
concurrent invocations of the same operation, corrupting
|
||||
`PendingRequestMap` correlation and the abort-cascade tree.
|
||||
- **Wire visibility**: composed child `request_id`s are **internal** — they
|
||||
appear in `PendingRequestMap` for abort-cascade indexing but are not sent
|
||||
as `call.requested` to any peer. The client only sees `call.aborted` for
|
||||
the root ID it sent; the server cascades internally to descendants. The
|
||||
exception is `from_call` ops, which generate their own wire ID when
|
||||
forwarding to the remote node (the remote node's `PendingRequestMap`
|
||||
indexes it).
|
||||
|
||||
### HandlerRegistration
|
||||
|
||||
The registration bundle carries everything the dispatch path needs to construct an `OperationContext`. See ADR-022 for the full rationale.
|
||||
@@ -206,25 +235,74 @@ pub struct HandlerRegistration {
|
||||
}
|
||||
```
|
||||
|
||||
- `provenance`: Where the op came from (`Local`, `FromOpenAPI`, `FromMCP`, `FromCall`, `FromJsonSchema`, `Session`). Determines composition capability, default visibility, and trust model. Only `Local` and `Session` ops can compose; leaves get `composition_authority: None` and `scoped_env: None`.
|
||||
- `composition_authority`: The declared authority (label + scopes + resources) the handler operates under when composing children. `None` for leaves. This replaces ADR-015's `handler_identity: Identity` — it's not a peer identity, it's a declared authority bundle. See ADR-022.
|
||||
#### OperationProvenance
|
||||
|
||||
Where the op came from. Determines composition capability, default
|
||||
visibility, and trust model. See ADR-022 for rationale.
|
||||
|
||||
```rust
|
||||
pub enum OperationProvenance {
|
||||
Local, // Assembly-written, trusted, can compose
|
||||
FromOpenAPI, // HTTP forwarding stub (from_openapi), leaf
|
||||
FromMCP, // MCP forwarding stub (from_mcp), leaf
|
||||
FromCall, // QUIC forwarding stub (from_call), leaf locally
|
||||
FromJsonSchema, // JSON Schema definition, no handler — schema only
|
||||
Session, // Agent-written, sandboxed, can compose within sandbox
|
||||
}
|
||||
```
|
||||
|
||||
| Provenance | Can compose? | Has composition authority? | Default visibility |
|
||||
|-----------|-------------|---------------------------|-------------------|
|
||||
| `Local` | Yes | Yes — scopes set by assembly layer | External or Internal (assembly declares) |
|
||||
| `FromOpenAPI` | No (leaf) | No | Internal |
|
||||
| `FromMCP` | No (leaf) | No | Internal |
|
||||
| `FromCall` | No (leaf in local registry) | No | Internal |
|
||||
| `FromJsonSchema` | N/A (no handler) | No | N/A |
|
||||
| `Session` | Yes (within sandbox) | Yes — scopes set at sandbox creation | Internal always |
|
||||
|
||||
#### CompositionAuthority
|
||||
|
||||
The declared authority (label + scopes + resources) the handler operates
|
||||
under when composing children. `None` for leaves. This replaces ADR-015's
|
||||
`handler_identity: Identity` — it's not a peer identity, it's a declared
|
||||
authority bundle. See ADR-022.
|
||||
|
||||
```rust
|
||||
pub struct CompositionAuthority {
|
||||
pub label: String, // e.g., "agent-chat" — not a peer id
|
||||
pub scopes: Vec<String>, // e.g., ["llm:call", "fs:read"]
|
||||
pub resources: HashMap<String, Vec<String>>, // e.g., {"service": ["vastai"]}
|
||||
}
|
||||
|
||||
impl CompositionAuthority {
|
||||
pub fn none() -> Option<Self> { None } // Convenience for leaves
|
||||
pub fn new(label: &str, scopes: impl IntoIterator<Item = String>) -> Self { ... }
|
||||
pub fn as_identity(&self) -> Option<Identity> { ... } // Synthetic Identity for ACL
|
||||
}
|
||||
```
|
||||
|
||||
- `provenance`: Determines composition capability. Only `Local` and `Session` ops can compose; leaves get `composition_authority: None` and `scoped_env: None`.
|
||||
- `composition_authority`: The declared authority the handler operates under when composing children. `None` for leaves. See ADR-022.
|
||||
- `scoped_env`: The set of operations this handler may reach via `env.invoke()`. `None` for leaves (empty env). The reachability control from ADR-015.
|
||||
- `capabilities`: Outbound credentials (decrypted API keys, signing keys). Populated by the assembly layer from the vault at registration time. See [Capability Injection](#capability-injection).
|
||||
|
||||
The `OperationRegistryBuilder` provides a fluent API with convenience methods for common cases:
|
||||
|
||||
```rust
|
||||
// with_local: Local provenance, full bundle — all 5 args required.
|
||||
// with_local(spec, handler, composition_authority, scoped_env, capabilities)
|
||||
let registry = OperationRegistryBuilder::new()
|
||||
// Built-in service discovery (Local, no composition)
|
||||
// Built-in service discovery (Local, no composition — empty authority, empty env, empty caps)
|
||||
.with_local(services_list_spec(), Arc::new(services_list_handler),
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty())
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new())
|
||||
.with_local(services_schema_spec(), Arc::new(schema_handler),
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty())
|
||||
// Agent handler (Local, composes — has authority + scoped env)
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new())
|
||||
// Agent handler (Local, composes — authority + scoped env + capabilities)
|
||||
.with_local(agent_chat_spec(), Arc::new(agent_chat_handler),
|
||||
CompositionAuthority::new("agent-chat", ["llm:call", "fs:read", "vastai:query"]),
|
||||
ScopedOperationEnv::new(["fs/readFile", "vastai/listMachines", "llm/generate"]))
|
||||
// Imported ops (leaves — no authority, no scoped env)
|
||||
ScopedOperationEnv::new(["fs/readFile", "vastai/listMachines", "llm/generate"]),
|
||||
Capabilities::new().with_api_key("google", google_api_key))
|
||||
// Imported ops (leaves — no authority, no scoped env; capabilities for outbound HTTP)
|
||||
.with_leaf(vastai_listMachines_spec(), Arc::new(vastai_handler), vastai_credentials)
|
||||
.build();
|
||||
```
|
||||
@@ -249,19 +327,25 @@ pub trait OperationEnv: Send + Sync {
|
||||
/// Compose a child operation. The child's `OperationContext` is
|
||||
/// constructed with `internal: true`, inheriting the parent's
|
||||
/// composition authority as the child's caller identity. The abort
|
||||
/// policy defaults to the parent's (ADR-016 Decision 6).
|
||||
/// policy defaults to the parent's (ADR-016 Decision 6, W19).
|
||||
///
|
||||
/// Default impl: delegates to `invoke_with_policy` with
|
||||
/// `parent.abort_policy.clone()`. Impls only need to implement
|
||||
/// `invoke_with_policy` — `invoke` is provided.
|
||||
async fn invoke(
|
||||
&self,
|
||||
namespace: &str,
|
||||
operation: &str,
|
||||
input: Value,
|
||||
parent: &OperationContext,
|
||||
) -> ResponseEnvelope;
|
||||
) -> ResponseEnvelope {
|
||||
self.invoke_with_policy(namespace, operation, input, parent, parent.abort_policy.clone()).await
|
||||
}
|
||||
|
||||
/// Compose a child with an explicit abort policy (ADR-016 Decision 6).
|
||||
/// Use `AbortPolicy::ContinueRunning` for long-running work that
|
||||
/// should survive a parent's abort. The default `invoke()` inherits
|
||||
/// the parent's policy; this method overrides it for this child.
|
||||
/// should survive a parent's abort. This is the required method —
|
||||
/// `invoke()` delegates to it with the parent's policy.
|
||||
async fn invoke_with_policy(
|
||||
&self,
|
||||
namespace: &str,
|
||||
@@ -270,6 +354,14 @@ pub trait OperationEnv: Send + Sync {
|
||||
parent: &OperationContext,
|
||||
policy: AbortPolicy,
|
||||
) -> ResponseEnvelope;
|
||||
|
||||
/// Does this env contain the named operation? Used by
|
||||
/// `CompositeOperationEnv` to probe overlays before dispatching
|
||||
/// (ADR-024). The composite checks `session.contains()` →
|
||||
/// `connection.contains()` → base, dispatching to the first overlay
|
||||
/// that contains the op. Default impl returns `true` (a single-layer
|
||||
/// env like `LocalOperationEnv` contains everything it can dispatch).
|
||||
fn contains(&self, name: &str) -> bool { true }
|
||||
}
|
||||
```
|
||||
|
||||
@@ -292,7 +384,10 @@ pub struct LocalOperationEnv {
|
||||
|
||||
#[async_trait]
|
||||
impl OperationEnv for LocalOperationEnv {
|
||||
async fn invoke(&self, namespace: &str, operation: &str, input: Value, parent: &OperationContext) -> ResponseEnvelope {
|
||||
// `invoke` uses the default impl (delegates to `invoke_with_policy`
|
||||
// with `parent.abort_policy.clone()`).
|
||||
|
||||
async fn invoke_with_policy(&self, namespace: &str, operation: &str, input: Value, parent: &OperationContext, policy: AbortPolicy) -> ResponseEnvelope {
|
||||
let name = format!("{namespace}/{operation}");
|
||||
|
||||
// Reachability check (ADR-015, ADR-022): is this op in the parent's
|
||||
@@ -307,7 +402,7 @@ impl OperationEnv for LocalOperationEnv {
|
||||
|
||||
let registration = self.registry.registration(&name);
|
||||
let context = OperationContext {
|
||||
// Unique per invocation — a counter, UUID, or parent_id + suffix.
|
||||
// Unique per invocation — a UUID v4 or parent_id + counter.
|
||||
// A deterministic ID (e.g. format!("env-{name}")) collides across
|
||||
// concurrent invocations of the same operation, which corrupts
|
||||
// PendingRequestMap correlation and the abort-cascade tree
|
||||
@@ -324,21 +419,21 @@ impl OperationEnv for LocalOperationEnv {
|
||||
handler_identity: registration.composition_authority.clone(),
|
||||
capabilities: parent.capabilities.clone(), // Inherit caller's capabilities
|
||||
metadata: HashMap::new(), // Fresh — does NOT propagate parent metadata (ADR-014)
|
||||
abort_policy: policy, // Explicit policy (from invoke() default or invoke_with_policy)
|
||||
deadline: parent.deadline, // Inherit parent's deadline (children don't get a fresh 30s)
|
||||
scoped_env: registration.scoped_env.clone()
|
||||
.unwrap_or_else(ScopedOperationEnv::empty), // Child's own scoped env (empty for leaves)
|
||||
// Dispatch trait: the child inherits the parent's env (the same
|
||||
// composite of curated base + active overlays). See ADR-024.
|
||||
env: parent.env.clone(),
|
||||
// Abort policy: inherit the parent's policy by default (ADR-016).
|
||||
// The parent handler can override via `invoke_with_policy()`.
|
||||
abort_policy: parent.abort_policy.clone(),
|
||||
internal: true, // Nested calls use handler authority
|
||||
};
|
||||
self.registry.invoke(&name, input, context).await
|
||||
}
|
||||
|
||||
// invoke_with_policy() delegates to invoke() with the policy set on the
|
||||
// child context (ADR-016 Decision 6). See the trait definition above.
|
||||
// `contains` uses the default impl (returns true — the curated registry
|
||||
// contains everything it can dispatch). For a single-layer env, the
|
||||
// reachability check in `invoke_with_policy` is the real gate.
|
||||
}
|
||||
```
|
||||
|
||||
@@ -357,34 +452,48 @@ pub struct CompositeOperationEnv {
|
||||
|
||||
#[async_trait]
|
||||
impl OperationEnv for CompositeOperationEnv {
|
||||
async fn invoke(&self, namespace: &str, operation: &str, input: Value, parent: &OperationContext) -> ResponseEnvelope {
|
||||
// `invoke` uses the default impl (delegates to `invoke_with_policy`
|
||||
// with `parent.abort_policy.clone()`).
|
||||
|
||||
async fn invoke_with_policy(&self, namespace: &str, operation: &str, input: Value, parent: &OperationContext, policy: AbortPolicy) -> ResponseEnvelope {
|
||||
let name = format!("{namespace}/{operation}");
|
||||
// Reachability check against parent.scoped_env (same as LocalOperationEnv).
|
||||
if !parent.scoped_env.allows(&name) {
|
||||
return ResponseEnvelope::not_found(name);
|
||||
}
|
||||
// Dispatch in overlay order: session → connection → curated base.
|
||||
// First match wins. Each overlay is an OperationEnv impl that knows
|
||||
// its own registry; the composite routes to the right one.
|
||||
// First overlay that *contains* the op wins. `contains()` (ADR-024)
|
||||
// is the probe — it avoids the sentinel-return ambiguity and ensures
|
||||
// cross-impl interop: any OperationEnv impl that correctly reports
|
||||
// `contains` works with this composite.
|
||||
if let Some(session) = &self.session {
|
||||
// session impl checks its own registry; if not found, falls
|
||||
// through (returns a sentinel or the composite continues).
|
||||
// Implementation detail: the session impl's `invoke` either
|
||||
// dispatches or returns a "not in this overlay" signal.
|
||||
if session.contains(&name) {
|
||||
return session.invoke_with_policy(namespace, operation, input, parent, policy).await;
|
||||
}
|
||||
}
|
||||
if let Some(connection) = &self.connection {
|
||||
// same pattern
|
||||
if connection.contains(&name) {
|
||||
return connection.invoke_with_policy(namespace, operation, input, parent, policy).await;
|
||||
}
|
||||
}
|
||||
self.base.invoke(namespace, operation, input, parent).await
|
||||
self.base.invoke_with_policy(namespace, operation, input, parent, policy).await
|
||||
}
|
||||
|
||||
fn contains(&self, name: &str) -> bool {
|
||||
// The composite contains the op if any layer does.
|
||||
self.session.as_ref().map_or(false, |s| s.contains(name))
|
||||
|| self.connection.as_ref().map_or(false, |c| c.contains(name))
|
||||
|| self.base.contains(name)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
The exact "first match wins" mechanism (sentinel return, a separate
|
||||
`contains` check, or a try/else pattern) is a two-way door for
|
||||
implementation — the structural decision (composite trait object, overlay
|
||||
order, `Arc::clone` inheritance) is what ADR-024 locks.
|
||||
```
|
||||
The `contains()` method (review #003 C9) is the overlay-dispatch contract.
|
||||
It replaces the previous "sentinel or contains check — two-way door" framing,
|
||||
which was ambiguous enough to produce non-interoperable `OperationEnv` impls.
|
||||
The structural decision (composite trait object, overlay order, `Arc::clone`
|
||||
inheritance) is locked by ADR-024; the dispatch contract (`contains` probe
|
||||
before `invoke_with_policy`) is now locked too.
|
||||
|
||||
Two things happen in `invoke()`:
|
||||
|
||||
@@ -456,12 +565,12 @@ let vastai_credentials = Capabilities::new().with_http_token("vastai", vastai_to
|
||||
|
||||
// Register operations — vault operations are NOT registered here
|
||||
let registry = OperationRegistryBuilder::new()
|
||||
// Built-in service discovery (Local, no composition)
|
||||
// Built-in service discovery (Local, no composition — empty caps)
|
||||
.with_local(services_list_spec(), Arc::new(services_list_handler),
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty())
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new())
|
||||
.with_local(services_schema_spec(), Arc::new(schema_handler),
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty())
|
||||
// Agent handler (Local, composes — has authority + scoped env + capabilities)
|
||||
CompositionAuthority::none(), ScopedOperationEnv::empty(), Capabilities::new())
|
||||
// Agent handler (Local, composes — full bundle via .with())
|
||||
.with(HandlerRegistration {
|
||||
spec: agent_chat_spec(),
|
||||
handler: Arc::new(agent_chat_handler),
|
||||
@@ -478,6 +587,7 @@ let registry = OperationRegistryBuilder::new()
|
||||
.build();
|
||||
|
||||
let call_adapter = CallAdapter::new(Arc::new(registry), identity_provider);
|
||||
// Agent deployment: let call_adapter = CallAdapter::new(...).with_session_source(source);
|
||||
```
|
||||
|
||||
The vault is used at construction time to populate `capabilities` in the registration bundle, not registered as call protocol operations. The curated layer (Layer 0) is immutable after construction — adding a `Local` op requires restarting the process. Session and imported overlays are dynamic at their respective scopes (ADR-024). This is consistent with OQ-04 (scoped to the `HandlerRegistry` by ADR-024), ADR-008, ADR-014, and ADR-022.
|
||||
|
||||
Reference in New Issue
Block a user