From e8219fa550db5d69d5768e72629a05dc50991e10 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Sun, 28 Jun 2026 22:02:03 +0000 Subject: [PATCH] feat(call): replace CompositeOperationEnv with peer-keyed PeerCompositeEnv (call/peer-composite-env) --- crates/alknet-call/src/protocol/adapter.rs | 124 +++++++- crates/alknet-call/src/protocol/dispatch.rs | 21 +- crates/alknet-call/src/registry/env.rs | 305 +++++++++++++++----- 3 files changed, 370 insertions(+), 80 deletions(-) diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index d57c6c3..be9e4e3 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -158,7 +158,7 @@ mod tests { use crate::protocol::wire::{ CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED, }; - use crate::registry::context::{AbortPolicy, ScopedOperationEnv}; + use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv}; use crate::registry::env::OperationEnv; use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; @@ -512,6 +512,128 @@ mod tests { assert!(context.env.contains("fs/readFile")); } + #[tokio::test] + async fn compose_root_env_attaches_peer_when_connection_has_identity() { + let registry = registry_with( + "fs/readFile", + Visibility::External, + AccessControl::default(), + echo_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let mut conn = CallConnection::new(stub_connection()); + let imported = HandlerRegistration::new( + OperationSpec::new( + "worker/exec", + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + echo_handler(), + OperationProvenance::FromCall, + None, + None, + Capabilities::new(), + ); + conn.register_imported(imported); + let peer_identity = Identity { + id: "worker-a".to_string(), + scopes: vec![], + resources: HashMap::new(), + }; + conn.connection() + .set_identity(peer_identity) + .expect("identity not yet set"); + + let context = adapter.build_root_context("req-5".to_string(), "fs/readFile", None, &conn); + + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let invoke_ctx = OperationContext { + request_id: "req-5".to_string(), + parent_request_id: None, + identity: None, + handler_identity: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: scoped, + env: context.env.clone(), + abort_policy: AbortPolicy::default(), + deadline: context.deadline, + internal: false, + }; + let response = context + .env + .invoke("worker", "exec", serde_json::json!({"v": 1}), &invoke_ctx) + .await; + assert!( + response.result.is_ok(), + "peer overlay dispatches the imported op when identity is attached" + ); + assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1})); + } + + #[tokio::test] + async fn compose_root_env_does_not_attach_peer_when_connection_has_no_identity() { + let registry = registry_with( + "fs/readFile", + Visibility::External, + AccessControl::default(), + echo_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let mut conn = CallConnection::new(stub_connection()); + let imported = HandlerRegistration::new( + OperationSpec::new( + "worker/exec", + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + echo_handler(), + OperationProvenance::FromCall, + None, + None, + Capabilities::new(), + ); + conn.register_imported(imported); + + let context = adapter.build_root_context("req-6".to_string(), "fs/readFile", None, &conn); + + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let invoke_ctx = OperationContext { + request_id: "req-6".to_string(), + parent_request_id: None, + identity: None, + handler_identity: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: scoped, + env: context.env.clone(), + abort_policy: AbortPolicy::default(), + deadline: context.deadline, + internal: false, + }; + let response = context + .env + .invoke("worker", "exec", serde_json::json!({}), &invoke_ctx) + .await; + match response.result { + Err(e) => assert_eq!( + e.code, "NOT_FOUND", + "no peer overlay attached: op falls through to base registry which has no worker/exec" + ), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + #[tokio::test] async fn dispatch_requested_round_trip_returns_responded() { let registry = registry_with( diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index ecea0cb..be65bf9 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -29,7 +29,7 @@ use super::wire::{ }; use crate::protocol::adapter::SessionOverlaySource; use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv}; -use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv}; +use crate::registry::env::{LocalOperationEnv, OperationEnv, PeerCompositeEnv}; use crate::registry::registration::OperationRegistry; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); @@ -107,12 +107,19 @@ impl Dispatcher { .session_source .as_ref() .and_then(|s| s.overlay_for(context)); - let connection_overlay = connection.overlay_env(); - Arc::new(CompositeOperationEnv::new( - base, - Some(connection_overlay), - session, - )) + + let mut env = PeerCompositeEnv::new(base); + if let Some(session) = session { + env = env.with_session(session); + } + if let Some(peer_id) = connection + .connection() + .identity() + .map(|identity| identity.id.clone()) + { + env.attach_peer(peer_id, connection.overlay_env()); + } + Arc::new(env) } pub(crate) fn build_root_context( diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 54899f2..bbacc8a 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -7,6 +7,12 @@ use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedO use super::registration::OperationRegistry; use crate::protocol::wire::ResponseEnvelope; +/// Logical peer identifier (ADR-029 §1, ADR-030 §4). The payload is +/// `Identity.id` from `IdentityProvider` resolution (= `PeerEntry.peer_id`), +/// stable across key rotation — NOT a connection-assigned UUID and NOT the +/// peer's cryptographic material. +pub type PeerId = String; + #[async_trait::async_trait] pub trait OperationEnv: Send + Sync { async fn invoke( @@ -93,22 +99,51 @@ impl OperationEnv for LocalOperationEnv { } } -pub struct CompositeOperationEnv { - session: Option>, - connection: Option>, - base: Arc, +/// Per-call composite env (ADR-024 + ADR-029 §1). Built by the `Dispatcher` +/// in `compose_root_env` from the active layers. The child inherits this by +/// `Arc::clone` through `invoke()`. The Layer 2 connection overlay is +/// **peer-keyed** — a head node with N worker connections holds a +/// `HashMap`, not one overlay. The singular- +/// connection case (one peer) is the degenerate case with a single-entry map. +pub struct PeerCompositeEnv { + pub base: Arc, + pub session: Option>, + pub connections: HashMap>, + connection_order: Vec, } -impl CompositeOperationEnv { - pub fn new( - base: Arc, - connection: Option>, - session: Option>, - ) -> Self { +impl PeerCompositeEnv { + pub fn new(base: Arc) -> Self { Self { - session, - connection, base, + session: None, + connections: HashMap::new(), + connection_order: Vec::new(), + } + } + + pub fn with_session(mut self, session: Arc) -> Self { + self.session = Some(session); + self + } + + /// Attach a peer's connection overlay. The `peer_id` comes from + /// `connection.identity().id` (IdentityProvider resolution). A connection + /// with no resolved identity has no `PeerId` and is NOT attached + /// (ADR-030 §5) — its ops are invoked through the `CallConnection` handle + /// directly, not via peer-keyed composition. + pub fn attach_peer(&mut self, peer_id: PeerId, overlay: Arc) { + if !self.connections.contains_key(&peer_id) { + self.connection_order.push(peer_id.clone()); + } + self.connections.insert(peer_id, overlay); + } + + /// Detach a peer's overlay (on disconnect). The peer's sub-overlay drops; + /// in-flight `PeerRef::Specific(that_peer)` gets `NOT_FOUND`. + pub fn detach_peer(&mut self, peer_id: &PeerId) { + if self.connections.remove(peer_id).is_some() { + self.connection_order.retain(|p| p != peer_id); } } @@ -116,17 +151,21 @@ impl CompositeOperationEnv { &self.base } - pub fn connection(&self) -> &Option> { - &self.connection - } - pub fn session(&self) -> &Option> { &self.session } + + pub fn connections(&self) -> &HashMap> { + &self.connections + } + + pub fn connection_order(&self) -> &[PeerId] { + &self.connection_order + } } #[async_trait::async_trait] -impl OperationEnv for CompositeOperationEnv { +impl OperationEnv for PeerCompositeEnv { async fn invoke_with_policy( &self, namespace: &str, @@ -148,11 +187,13 @@ impl OperationEnv for CompositeOperationEnv { .await; } } - if let Some(connection) = &self.connection { - if connection.contains(&name) { - return connection - .invoke_with_policy(namespace, operation, input, parent, policy) - .await; + for peer_id in &self.connection_order { + if let Some(conn_env) = self.connections.get(peer_id) { + if conn_env.contains(&name) { + return conn_env + .invoke_with_policy(namespace, operation, input, parent, policy) + .await; + } } } self.base @@ -162,7 +203,7 @@ impl OperationEnv for CompositeOperationEnv { fn contains(&self, name: &str) -> bool { self.session.as_ref().is_some_and(|s| s.contains(name)) - || self.connection.as_ref().is_some_and(|c| c.contains(name)) + || self.connections.values().any(|c| c.contains(name)) || self.base.contains(name) } } @@ -395,14 +436,14 @@ mod tests { } #[tokio::test] - async fn composite_env_dispatches_to_session_overlay_when_contains() { + async fn peer_composite_env_routes_to_session_when_it_contains_op() { let base = Arc::new(NoopEnv { contains_op: true }); let session = Arc::new(ProbeEnv { name: "session".to_string(), contains_set: vec!["agent/chat".to_string()], dispatched: std::sync::Mutex::new(None), }); - let composite = CompositeOperationEnv::new(base, None, Some(session.clone())); + let composite = PeerCompositeEnv::new(base).with_session(session.clone()); let env: Arc = Arc::new(composite); let scoped = ScopedOperationEnv::new(["agent/chat"]); let ctx = root_context("root-6", None, None, scoped, env.clone()); @@ -417,7 +458,41 @@ mod tests { } #[tokio::test] - async fn composite_env_falls_through_to_base_when_no_overlay_contains() { + async fn peer_composite_env_routes_to_first_peer_in_insertion_order() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_b = Arc::new(ProbeEnv { + name: "worker-b".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + composite.attach_peer("worker-b".to_string(), worker_b.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-7", None, None, scoped, env.clone()); + let response = env + .invoke("worker", "exec", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("worker-a".into())); + assert_eq!( + worker_a.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + assert!(worker_b.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn peer_composite_env_falls_through_to_base_when_no_overlay_contains() { let base = Arc::new(ProbeEnv { name: "base".to_string(), contains_set: vec!["fs/readFile".to_string()], @@ -433,10 +508,11 @@ mod tests { contains_set: vec!["worker/exec".to_string()], dispatched: std::sync::Mutex::new(None), }); - let composite = CompositeOperationEnv::new(base.clone(), Some(connection), Some(session)); + let mut composite = PeerCompositeEnv::new(base.clone()).with_session(session); + composite.attach_peer("worker-a".to_string(), connection); let env: Arc = Arc::new(composite); let scoped = ScopedOperationEnv::new(["fs/readFile"]); - let ctx = root_context("root-7", None, None, scoped, env.clone()); + let ctx = root_context("root-8", None, None, scoped, env.clone()); let response = env .invoke("fs", "readFile", serde_json::json!({}), &ctx) .await; @@ -448,12 +524,12 @@ mod tests { } #[tokio::test] - async fn composite_env_reachability_check_returns_not_found() { + async fn peer_composite_env_reachability_check_returns_not_found() { let base = Arc::new(NoopEnv { contains_op: true }); - let composite = CompositeOperationEnv::new(base, None, None); + let composite = PeerCompositeEnv::new(base); let env: Arc = Arc::new(composite); let scoped = ScopedOperationEnv::empty(); - let ctx = root_context("root-8", None, None, scoped, env.clone()); + let ctx = root_context("root-9", None, None, scoped, env.clone()); let response = env .invoke("agent", "chat", serde_json::json!({}), &ctx) .await; @@ -463,8 +539,8 @@ mod tests { } } - #[tokio::test] - async fn composite_env_contains_aggregates_layers() { + #[test] + fn peer_composite_env_contains_aggregates_layers() { let base = Arc::new(ProbeEnv { name: "base".to_string(), contains_set: vec!["fs/readFile".to_string()], @@ -480,19 +556,124 @@ mod tests { contains_set: vec!["worker/exec".to_string()], dispatched: std::sync::Mutex::new(None), }); - let composite = CompositeOperationEnv::new(base, Some(connection), Some(session)); + let mut composite = PeerCompositeEnv::new(base).with_session(session); + composite.attach_peer("worker-a".to_string(), connection); assert!(composite.contains("fs/readFile")); assert!(composite.contains("agent/chat")); assert!(composite.contains("worker/exec")); assert!(!composite.contains("unknown/op")); } + #[tokio::test] + async fn peer_composite_env_detach_peer_drops_overlay_and_returns_not_found() { + let base: Arc = + Arc::new(LocalOperationEnv::new(Arc::new(OperationRegistry::new()))); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), connection.clone()); + composite.detach_peer(&"worker-a".to_string()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-10", None, None, scoped, env.clone()); + let response = env + .invoke("worker", "exec", serde_json::json!({}), &ctx) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND after detach, got {other:?}"), + } + assert!(connection.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn peer_composite_env_detach_peer_then_reattach_routes_again() { + let base: Arc = + Arc::new(LocalOperationEnv::new(Arc::new(OperationRegistry::new()))); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), connection.clone()); + composite.detach_peer(&"worker-a".to_string()); + composite.attach_peer("worker-a".to_string(), connection.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-10b", None, None, scoped, env.clone()); + let response = env + .invoke("worker", "exec", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("connection".into())); + assert_eq!( + connection.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + } + + #[test] + fn peer_composite_env_attach_peer_preserves_insertion_order_on_re_attach() { + let base: Arc = Arc::new(NoopEnv { contains_op: true }); + let overlay_a: Arc = + Arc::new(NoopEnv { contains_op: true }); + let overlay_b: Arc = + Arc::new(NoopEnv { contains_op: true }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), overlay_a); + composite.attach_peer("worker-b".to_string(), overlay_b); + assert_eq!(composite.connection_order(), &["worker-a", "worker-b"]); + let overlay_a2: Arc = + Arc::new(NoopEnv { contains_op: true }); + composite.attach_peer("worker-a".to_string(), overlay_a2); + assert_eq!( + composite.connection_order(), + &["worker-a", "worker-b"], + "re-attach keeps original position" + ); + } + + #[tokio::test] + async fn peer_composite_env_routes_to_connection_when_session_absent_or_missing() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec![], + dispatched: std::sync::Mutex::new(None), + }); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let session = Arc::new(ProbeEnv { + name: "session".to_string(), + contains_set: vec!["agent/chat".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base).with_session(session); + composite.attach_peer("worker-a".to_string(), connection.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-11", None, None, scoped, env.clone()); + let response = env + .invoke("worker", "exec", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("connection".into())); + assert_eq!( + connection.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + } + #[tokio::test] async fn local_env_unknown_op_after_reachability_pass_returns_not_found() { let registry = Arc::new(OperationRegistry::new()); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); let scoped = ScopedOperationEnv::new(["fs/readFile"]); - let ctx = root_context("root-9", None, None, scoped, env.clone()); + let ctx = root_context("root-12", None, None, scoped, env.clone()); let response = env .invoke("fs", "readFile", serde_json::json!({}), &ctx) .await; @@ -514,7 +695,7 @@ mod tests { let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); let scoped = ScopedOperationEnv::new(["child/run"]); let deadline = Instant::now() + Duration::from_secs(5); - let mut ctx = root_context("root-10", None, None, scoped, env.clone()); + let mut ctx = root_context("root-13", None, None, scoped, env.clone()); ctx.deadline = Some(deadline); let response = env .invoke("child", "run", serde_json::json!({}), &ctx) @@ -550,49 +731,29 @@ mod tests { } #[test] - fn composite_env_accessors_return_refs() { + fn peer_composite_env_accessors_return_refs() { let base: Arc = Arc::new(NoopEnv { contains_op: true }); let session: Arc = Arc::new(NoopEnv { contains_op: true }); let connection: Arc = Arc::new(NoopEnv { contains_op: false }); - let composite = CompositeOperationEnv::new( - Arc::clone(&base), - Some(Arc::clone(&connection)), - Some(Arc::clone(&session)), - ); + let mut composite = + PeerCompositeEnv::new(Arc::clone(&base)).with_session(Arc::clone(&session)); + composite.attach_peer("worker-a".to_string(), Arc::clone(&connection)); assert!(Arc::ptr_eq(composite.base(), &base)); assert!(composite.session().is_some()); - assert!(composite.connection().is_some()); + assert!(composite.connections().get("worker-a").is_some()); + assert_eq!(composite.connection_order(), &["worker-a"]); } - #[tokio::test] - async fn composite_env_dispatches_to_connection_when_session_absent_or_missing() { - let base = Arc::new(ProbeEnv { - name: "base".to_string(), - contains_set: vec![], - dispatched: std::sync::Mutex::new(None), - }); - let connection = Arc::new(ProbeEnv { - name: "connection".to_string(), - contains_set: vec!["worker/exec".to_string()], - dispatched: std::sync::Mutex::new(None), - }); - let session = Arc::new(ProbeEnv { - name: "session".to_string(), - contains_set: vec!["agent/chat".to_string()], - dispatched: std::sync::Mutex::new(None), - }); - let composite = CompositeOperationEnv::new(base, Some(connection.clone()), Some(session)); - let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); - let ctx = root_context("root-11", None, None, scoped, env.clone()); - let response = env - .invoke("worker", "exec", serde_json::json!({}), &ctx) - .await; - assert_eq!(response.result.unwrap(), Value::String("connection".into())); - assert_eq!( - connection.dispatched.lock().unwrap().as_deref(), - Some("worker/exec") - ); + #[test] + fn peer_composite_env_singular_connection_is_degenerate_single_entry_map() { + let base: Arc = Arc::new(NoopEnv { contains_op: true }); + let connection: Arc = + Arc::new(NoopEnv { contains_op: true }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), connection); + assert_eq!(composite.connections().len(), 1); + assert_eq!(composite.connection_order().len(), 1); + assert!(composite.connections().contains_key("worker-a")); } }