diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 59218e4..46e29ee 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -13,6 +13,19 @@ use crate::protocol::wire::ResponseEnvelope; /// peer's cryptographic material. pub type PeerId = String; +/// Peer-routing selector (ADR-029 §2). Selects a specific peer's sub-overlay +/// (`Specific`) or the first peer (insertion order) that serves the op +/// (`Any`). +/// +/// `PeerRef::Specific(PeerId)` routes to the named peer's overlay only — no +/// fallthrough (explicit routing must be honored or fail loudly, ADR-029 §2). +/// `PeerRef::Any` reuses `invoke_with_policy` (the insertion-order fan-out +/// built in `PeerCompositeEnv`). +pub enum PeerRef { + Specific(PeerId), + Any, +} + #[async_trait::async_trait] pub trait OperationEnv: Send + Sync { async fn invoke( @@ -50,6 +63,26 @@ pub trait OperationEnv: Send + Sync { fn peer_operations(&self, _peer: &PeerId) -> Vec { Vec::new() } + + /// Peer-routing composition (ADR-029 §2). Routes to a specific peer + /// (`PeerRef::Specific`) or to the first peer that serves the op + /// (`PeerRef::Any`). The default impl ignores the peer selector and + /// delegates to `invoke_with_policy`, preserving back-compat for + /// single-layer envs that don't route by peer. `PeerCompositeEnv` + /// overrides with real peer-keyed routing. + async fn invoke_peer( + &self, + peer: &PeerRef, + namespace: &str, + operation: &str, + input: Value, + parent: &OperationContext, + policy: AbortPolicy, + ) -> ResponseEnvelope { + let _ = peer; + self.invoke_with_policy(namespace, operation, input, parent, policy) + .await + } } pub struct LocalOperationEnv { @@ -219,6 +252,41 @@ impl OperationEnv for PeerCompositeEnv { || self.connections.values().any(|c| c.contains(name)) || self.base.contains(name) } + + async fn invoke_peer( + &self, + peer: &PeerRef, + namespace: &str, + operation: &str, + input: Value, + parent: &OperationContext, + policy: AbortPolicy, + ) -> ResponseEnvelope { + let name = format!("{namespace}/{operation}"); + if !parent.scoped_env.allows(&name) { + return ResponseEnvelope::not_found(parent.request_id.clone(), &name); + } + match peer { + PeerRef::Specific(peer_id) => match self.connections.get(peer_id) { + Some(conn_env) if conn_env.contains(&name) => { + conn_env + .invoke_with_policy(namespace, operation, input, parent, policy) + .await + } + _ => ResponseEnvelope::not_found(parent.request_id.clone(), &name), + }, + PeerRef::Any => { + self.invoke_with_policy(namespace, operation, input, parent, policy) + .await + } + } + } + + fn peer_contains(&self, peer: &PeerId, name: &str) -> bool { + self.connections + .get(peer) + .is_some_and(|c| c.contains(name)) + } } #[cfg(test)] @@ -825,4 +893,242 @@ mod tests { assert_eq!(composite.connection_order().len(), 1); assert!(composite.connections().contains_key("worker-a")); } + + #[tokio::test] + async fn invoke_peer_specific_routes_to_named_peer() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_b = Arc::new(ProbeEnv { + name: "worker-b".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + composite.attach_peer("worker-b".to_string(), worker_b.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-pr-1", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("worker-b".to_string()), + "worker", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert_eq!(response.result.unwrap(), Value::String("worker-b".into())); + assert_eq!( + worker_b.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + assert!(worker_a.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_peer_specific_returns_not_found_when_peer_does_not_serve_op() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["other/op".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base.clone()); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-pr-2", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("worker-a".to_string()), + "worker", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + assert!(worker_a.dispatched.lock().unwrap().is_none()); + assert!( + base.dispatched.lock().unwrap().is_none(), + "no fallthrough to base" + ); + } + + #[tokio::test] + async fn invoke_peer_specific_returns_not_found_when_peer_unknown() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base.clone()); + composite.attach_peer( + "worker-a".to_string(), + Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }), + ); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-pr-3", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("ghost".to_string()), + "worker", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + assert!(base.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_peer_any_routes_to_first_peer_in_insertion_order() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_b = Arc::new(ProbeEnv { + name: "worker-b".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + composite.attach_peer("worker-b".to_string(), worker_b.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-pr-4", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Any, + "worker", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert_eq!(response.result.unwrap(), Value::String("worker-a".into())); + assert_eq!( + worker_a.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + assert!(worker_b.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_peer_reachability_check_gates_before_routing() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::empty(); + let ctx = root_context("root-pr-5", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("worker-a".to_string()), + "worker", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + assert!(worker_a.dispatched.lock().unwrap().is_none()); + } + + #[test] + fn peer_contains_checks_specific_peer_overlay() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_b = Arc::new(ProbeEnv { + name: "worker-b".to_string(), + contains_set: vec!["other/op".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a); + composite.attach_peer("worker-b".to_string(), worker_b); + assert!(composite.peer_contains(&"worker-a".to_string(), "worker/exec")); + assert!(!composite.peer_contains(&"worker-b".to_string(), "worker/exec")); + assert!(!composite.peer_contains(&"ghost".to_string(), "worker/exec")); + } + + #[tokio::test] + async fn default_invoke_peer_delegates_to_invoke_with_policy() { + let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["echo/run"]); + let ctx = root_context("root-pr-6", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("any-peer".to_string()), + "echo", + "run", + serde_json::json!({"hi": 1}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert!(response.result.is_ok()); + assert_eq!(response.result.unwrap(), serde_json::json!({"hi": 1})); + } + + #[test] + fn default_peer_contains_delegates_to_contains() { + let registry = Arc::new(OperationRegistry::new()); + let env = LocalOperationEnv::new(registry); + assert!(env.peer_contains(&"any-peer".to_string(), "anything")); + } }