diff --git a/crates/alknet-call/src/client/from_call.rs b/crates/alknet-call/src/client/from_call.rs index 6df137d..5d4fcb1 100644 --- a/crates/alknet-call/src/client/from_call.rs +++ b/crates/alknet-call/src/client/from_call.rs @@ -581,7 +581,7 @@ mod tests { } fn test_context(identity: Option) -> OperationContext { - use crate::registry::context::{AbortPolicy, ScopedOperationEnv}; + use crate::registry::context::{AbortPolicy, ScopedPeerEnv}; use std::collections::HashMap; use std::time::{Duration, Instant}; OperationContext { @@ -592,7 +592,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env: Arc::new(NoopEnv), abort_policy: AbortPolicy::default(), deadline: Some(Instant::now() + Duration::from_secs(30)), diff --git a/crates/alknet-call/src/client/from_jsonschema.rs b/crates/alknet-call/src/client/from_jsonschema.rs index d77e0cd..90eb86a 100644 --- a/crates/alknet-call/src/client/from_jsonschema.rs +++ b/crates/alknet-call/src/client/from_jsonschema.rs @@ -73,7 +73,7 @@ impl OperationAdapter for FromJsonSchema { mod tests { use super::*; use crate::client::from_jsonschema as from_jsonschema_fn; - use crate::registry::context::{AbortPolicy, ScopedOperationEnv}; + use crate::registry::context::{AbortPolicy, ScopedPeerEnv}; use crate::registry::env::OperationEnv; use crate::registry::spec::{AccessControl, OperationType, Visibility}; use std::collections::HashMap; @@ -117,7 +117,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env: Arc::new(NoopEnv), abort_policy: AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index c77d4ff..9f0a12c 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -164,7 +164,7 @@ mod tests { use crate::protocol::wire::{ CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED, }; - use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv}; + use crate::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv}; use crate::registry::env::OperationEnv; use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; @@ -419,7 +419,7 @@ mod tests { #[tokio::test] async fn build_root_context_carries_capabilities_and_scoped_env() { let mut registry = OperationRegistry::new(); - let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let scoped = ScopedPeerEnv::new(["fs/readFile"]); let caps = Capabilities::new().with_api_key("google", "k".to_string()); registry.register(HandlerRegistration::new( external_spec("agent/run", AccessControl::default()), @@ -562,7 +562,7 @@ mod tests { let context = adapter.build_root_context("req-5".to_string(), "fs/readFile", None, None, &conn); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let invoke_ctx = OperationContext { request_id: "req-5".to_string(), parent_request_id: None, @@ -620,7 +620,7 @@ mod tests { let context = adapter.build_root_context("req-6".to_string(), "fs/readFile", None, None, &conn); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let invoke_ctx = OperationContext { request_id: "req-6".to_string(), parent_request_id: None, diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index f90b0e9..dd2f3c7 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -23,9 +23,7 @@ use super::wire::{ EVENT_ERROR, EVENT_RESPONDED, }; use crate::protocol::wire::ResponseEnvelope; -use crate::registry::context::{ - generate_request_id, AbortPolicy, OperationContext, ScopedOperationEnv, -}; +use crate::registry::context::{generate_request_id, AbortPolicy, OperationContext, ScopedPeerEnv}; use crate::registry::env::OperationEnv; use crate::registry::registration::{Handler, HandlerRegistration}; @@ -280,7 +278,7 @@ impl OperationEnv for OverlayOperationEnv { scoped_env = registration .scoped_env .clone() - .unwrap_or_else(ScopedOperationEnv::empty); + .unwrap_or_else(ScopedPeerEnv::empty); } let context = OperationContext { @@ -432,7 +430,7 @@ mod tests { fn root_context( request_id: &str, - scoped_env: ScopedOperationEnv, + scoped_env: ScopedPeerEnv, env: Arc, ) -> OperationContext { OperationContext { @@ -487,7 +485,7 @@ mod tests { conn.register_imported(imported_registration("worker/exec")); let env = conn.overlay_env(); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-1", scoped, env.clone()); let response = env @@ -506,7 +504,7 @@ mod tests { assert!(!env.contains("worker/missing")); - let scoped = ScopedOperationEnv::new(["worker/missing"]); + let scoped = ScopedPeerEnv::new(["worker/missing"]); let ctx = root_context("root-2", scoped, env.clone()); let response = env @@ -525,7 +523,7 @@ mod tests { conn.register_imported(imported_registration("worker/exec")); let env = conn.overlay_env(); - let scoped = ScopedOperationEnv::empty(); + let scoped = ScopedPeerEnv::empty(); let ctx = root_context("root-3", scoped, env.clone()); let response = env @@ -562,7 +560,7 @@ mod tests { )); let env = conn.overlay_env(); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-4", scoped, env.clone()); let response = env diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index 8183720..7f99a8d 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -28,7 +28,7 @@ use super::wire::{ EVENT_ABORTED, EVENT_REQUESTED, }; use crate::protocol::adapter::SessionOverlaySource; -use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv}; +use crate::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv}; use crate::registry::env::{LocalOperationEnv, OperationEnv, PeerCompositeEnv}; use crate::registry::registration::OperationRegistry; @@ -135,14 +135,12 @@ impl Dispatcher { Some(r) => ( r.composition_authority.clone(), r.capabilities.clone(), - r.scoped_env - .clone() - .unwrap_or_else(ScopedOperationEnv::empty), + r.scoped_env.clone().unwrap_or_else(ScopedPeerEnv::empty), ), None => ( None, alknet_core::types::Capabilities::new(), - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ), }; diff --git a/crates/alknet-call/src/registry/context.rs b/crates/alknet-call/src/registry/context.rs index b1d7beb..9fe1816 100644 --- a/crates/alknet-call/src/registry/context.rs +++ b/crates/alknet-call/src/registry/context.rs @@ -6,7 +6,7 @@ use alknet_core::auth::Identity; use alknet_core::types::Capabilities; use serde_json::Value; -use super::env::OperationEnv; +use super::env::{OperationEnv, PeerId, PeerRef}; pub struct OperationContext { pub request_id: String, @@ -25,7 +25,7 @@ pub struct OperationContext { pub forwarded_for: Option, pub capabilities: Capabilities, pub metadata: HashMap, - pub scoped_env: ScopedOperationEnv, + pub scoped_env: ScopedPeerEnv, pub env: Arc, pub abort_policy: AbortPolicy, pub deadline: Option, @@ -75,29 +75,65 @@ impl CompositionAuthority { } #[derive(Debug, Clone)] -pub struct ScopedOperationEnv { - allowed: HashSet, +pub struct ScopedPeerEnv { + /// Peer-agnostic reachability — reachable via `PeerRef::Any` or + /// `PeerRef::Specific(any)`. The common case (peer-agnostic composition). + pub allowed_ops: HashSet, + /// Peer-pinned reachability — `"peer-id/op-name"`, reachable only via + /// `PeerRef::Specific(that peer)`. Additive to `allowed_ops`; opt-in for + /// the disambiguation case (ADR-029 §4). + pub peer_pinned: HashSet, } -impl ScopedOperationEnv { +impl ScopedPeerEnv { pub fn empty() -> Self { Self { - allowed: HashSet::new(), + allowed_ops: HashSet::new(), + peer_pinned: HashSet::new(), } } pub fn new(ops: impl IntoIterator>) -> Self { Self { - allowed: ops.into_iter().map(|s| s.into()).collect(), + allowed_ops: ops.into_iter().map(|s| s.into()).collect(), + peer_pinned: HashSet::new(), } } + /// Peer-pinned reachability: `"peer-id/op-name"`. Reachable only via + /// `PeerRef::Specific(that peer)`. Additive to `new` — call `new` for the + /// peer-agnostic set, then `with_pinned` for the pinned set. + pub fn with_pinned(mut self, pinned: impl IntoIterator>) -> Self { + self.peer_pinned = pinned.into_iter().map(|s| s.into()).collect(); + self + } + + /// Peer-agnostic reachability — unchanged from `ScopedOperationEnv::allows`. + /// A name here is reachable via any routing path (`PeerRef::Any` or + /// `Specific`). pub fn allows(&self, name: &str) -> bool { - self.allowed.contains(name) + self.allowed_ops.contains(name) + } + + /// Peer-pinned reachability — reachable only via `PeerRef::Specific(peer)`. + /// The entry shape is `"peer-id/op-name"` (ADR-029 §4, OQ-33). + pub fn allows_pinned(&self, peer: &PeerId, name: &str) -> bool { + self.peer_pinned.contains(&format!("{peer}/{name}")) + } + + /// Does this scoped env permit `name` via `peer`? Used by the reachability + /// gate in `invoke_peer` / `invoke_with_policy`. + /// - `PeerRef::Any` → `allows(name)` + /// - `PeerRef::Specific(peer)` → `allows(name) || allows_pinned(peer, name)` + pub fn allows_via(&self, peer: &PeerRef, name: &str) -> bool { + match peer { + PeerRef::Any => self.allows(name), + PeerRef::Specific(p) => self.allows(name) || self.allows_pinned(p, name), + } } } -impl Default for ScopedOperationEnv { +impl Default for ScopedPeerEnv { fn default() -> Self { Self::empty() } @@ -114,24 +150,108 @@ mod tests { #[test] fn scoped_env_allows_in_set() { - let env = ScopedOperationEnv::new(["fs/readFile", "agent/chat"]); + let env = ScopedPeerEnv::new(["fs/readFile", "agent/chat"]); assert!(env.allows("fs/readFile")); assert!(env.allows("agent/chat")); } #[test] fn scoped_env_disallows_not_in_set() { - let env = ScopedOperationEnv::new(["fs/readFile"]); + let env = ScopedPeerEnv::new(["fs/readFile"]); assert!(!env.allows("agent/chat")); assert!(!env.allows("")); } #[test] fn scoped_env_empty_allows_nothing() { - let env = ScopedOperationEnv::empty(); + let env = ScopedPeerEnv::empty(); assert!(!env.allows("fs/readFile")); } + #[test] + fn scoped_peer_env_new_with_pinned_populates_both_fields() { + let env = ScopedPeerEnv::new(["fs/readFile"]).with_pinned(["worker-a/container/exec"]); + assert!(env.allowed_ops.contains("fs/readFile")); + assert!(env.peer_pinned.contains("worker-a/container/exec")); + assert!(!env.allowed_ops.contains("worker-a/container/exec")); + assert!(!env.peer_pinned.contains("fs/readFile")); + } + + #[test] + fn scoped_peer_env_allows_checks_allowed_ops_only() { + let env = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + assert!( + !env.allows("container/exec"), + "pinned-only op not in allowed_ops" + ); + let env2 = ScopedPeerEnv::new(["container/exec"]).with_pinned(["worker-a/container/exec"]); + assert!( + env2.allows("container/exec"), + "op in allowed_ops is allowed" + ); + } + + #[test] + fn scoped_peer_env_allows_pinned_checks_peer_pinned_shape() { + let env = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + assert!(env.allows_pinned(&"worker-a".to_string(), "container/exec")); + assert!( + !env.allows_pinned(&"worker-b".to_string(), "container/exec"), + "wrong peer" + ); + assert!( + !env.allows_pinned(&"worker-a".to_string(), "other/op"), + "wrong op" + ); + } + + #[test] + fn scoped_peer_env_allows_via_any_uses_allowed_ops_only() { + let env = ScopedPeerEnv::new(["fs/readFile"]).with_pinned(["worker-a/container/exec"]); + assert!( + env.allows_via(&PeerRef::Any, "fs/readFile"), + "allowed op via Any" + ); + assert!( + !env.allows_via(&PeerRef::Any, "container/exec"), + "pinned-only op NOT reachable via Any" + ); + } + + #[test] + fn scoped_peer_env_allows_via_specific_uses_allowed_ops_or_peer_pinned() { + let env = ScopedPeerEnv::new(["fs/readFile"]).with_pinned(["worker-a/container/exec"]); + assert!( + env.allows_via(&PeerRef::Specific("worker-a".to_string()), "container/exec"), + "pinned-only op reachable via Specific(pinned peer)" + ); + assert!( + env.allows_via(&PeerRef::Specific("worker-a".to_string()), "fs/readFile"), + "allowed op reachable via Specific(any peer)" + ); + assert!( + !env.allows_via(&PeerRef::Specific("worker-b".to_string()), "container/exec"), + "pinned-only op NOT reachable via Specific(wrong peer)" + ); + } + + #[test] + fn scoped_peer_env_op_in_both_sets_reachable_via_both_any_and_specific() { + let env = ScopedPeerEnv::new(["container/exec"]).with_pinned(["worker-a/container/exec"]); + assert!( + env.allows_via(&PeerRef::Any, "container/exec"), + "op in allowed_ops reachable via Any" + ); + assert!( + env.allows_via(&PeerRef::Specific("worker-a".to_string()), "container/exec"), + "op in both sets reachable via Specific(peer)" + ); + assert!( + env.allows_via(&PeerRef::Specific("worker-b".to_string()), "container/exec"), + "op in allowed_ops reachable via Specific(other peer) too" + ); + } + #[test] fn composition_authority_as_identity_correct() { let mut resources = HashMap::new(); diff --git a/crates/alknet-call/src/registry/discovery.rs b/crates/alknet-call/src/registry/discovery.rs index 4067f84..72cb13f 100644 --- a/crates/alknet-call/src/registry/discovery.rs +++ b/crates/alknet-call/src/registry/discovery.rs @@ -323,7 +323,7 @@ pub fn services_schema_handler(registry: Arc) -> Handler { #[cfg(test)] mod tests { use super::*; - use crate::registry::context::{CompositionAuthority, ScopedOperationEnv}; + use crate::registry::context::{CompositionAuthority, ScopedPeerEnv}; use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; use alknet_core::types::Capabilities; use std::collections::HashMap; @@ -389,7 +389,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env: noop_env(), abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), @@ -409,7 +409,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env: noop_env(), abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), @@ -674,7 +674,7 @@ mod tests { list_handler, OperationProvenance::Local, CompositionAuthority::none(), - ScopedOperationEnv::empty().into(), + ScopedPeerEnv::empty().into(), Capabilities::new(), )); discovery_registry.register(HandlerRegistration::new( @@ -682,7 +682,7 @@ mod tests { schema_handler, OperationProvenance::Local, CompositionAuthority::none(), - ScopedOperationEnv::empty().into(), + ScopedPeerEnv::empty().into(), Capabilities::new(), )); let discovery = Arc::new(discovery_registry); @@ -836,7 +836,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env, abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), @@ -919,7 +919,7 @@ mod tests { forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), - scoped_env: ScopedOperationEnv::empty(), + scoped_env: ScopedPeerEnv::empty(), env, abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 10a0873..3b08f82 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use serde_json::Value; -use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedOperationEnv}; +use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedPeerEnv}; use super::registration::OperationRegistry; use crate::protocol::wire::ResponseEnvelope; @@ -136,7 +136,7 @@ impl OperationEnv for LocalOperationEnv { scoped_env: registration .scoped_env .clone() - .unwrap_or_else(ScopedOperationEnv::empty), + .unwrap_or_else(ScopedPeerEnv::empty), env: parent.env.clone(), internal: true, }; @@ -263,19 +263,27 @@ impl OperationEnv for PeerCompositeEnv { policy: AbortPolicy, ) -> ResponseEnvelope { let name = format!("{namespace}/{operation}"); - if !parent.scoped_env.allows(&name) { - return ResponseEnvelope::not_found(parent.request_id.clone(), &name); - } match peer { - PeerRef::Specific(peer_id) => match self.connections.get(peer_id) { - Some(conn_env) if conn_env.contains(&name) => { - conn_env - .invoke_with_policy(namespace, operation, input, parent, policy) - .await + PeerRef::Specific(peer_id) => { + if !parent + .scoped_env + .allows_via(&PeerRef::Specific(peer_id.clone()), &name) + { + return ResponseEnvelope::not_found(parent.request_id.clone(), &name); } - _ => ResponseEnvelope::not_found(parent.request_id.clone(), &name), - }, + match self.connections.get(peer_id) { + Some(conn_env) if conn_env.contains(&name) => { + conn_env + .invoke_with_policy(namespace, operation, input, parent, policy) + .await + } + _ => ResponseEnvelope::not_found(parent.request_id.clone(), &name), + } + } PeerRef::Any => { + if !parent.scoped_env.allows(&name) { + return ResponseEnvelope::not_found(parent.request_id.clone(), &name); + } self.invoke_with_policy(namespace, operation, input, parent, policy) .await } @@ -353,7 +361,7 @@ mod tests { request_id: &str, identity: Option, handler_identity: Option, - scoped_env: ScopedOperationEnv, + scoped_env: ScopedPeerEnv, env: Arc, ) -> OperationContext { root_context_with_forwarded_for( @@ -371,7 +379,7 @@ mod tests { identity: Option, handler_identity: Option, forwarded_for: Option, - scoped_env: ScopedOperationEnv, + scoped_env: ScopedPeerEnv, env: Arc, ) -> OperationContext { OperationContext { @@ -395,7 +403,7 @@ mod tests { spec_visibility: Visibility, handler: crate::registry::registration::Handler, composition_authority: Option, - scoped_env: Option, + scoped_env: Option, ) -> Arc { let mut registry = OperationRegistry::new(); registry.register(HandlerRegistration::new( @@ -421,7 +429,7 @@ mod tests { async fn local_env_invoke_allowed_op_dispatches() { let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["echo/run"]); + let scoped = ScopedPeerEnv::new(["echo/run"]); let ctx = root_context("root-1", None, None, scoped, env.clone()); let response = env .invoke("echo", "run", serde_json::json!({"hi": 1}), &ctx) @@ -434,7 +442,7 @@ mod tests { async fn local_env_invoke_disallowed_op_returns_not_found() { let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["other/op"]); + let scoped = ScopedPeerEnv::new(["other/op"]); let ctx = root_context("root-2", None, None, scoped, env.clone()); let response = env.invoke("echo", "run", serde_json::json!({}), &ctx).await; match response.result { @@ -453,7 +461,7 @@ mod tests { None, ); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["secret/op"]); + let scoped = ScopedPeerEnv::new(["secret/op"]); let ctx = root_context("root-3", None, None, scoped, env.clone()); let response = env .invoke("secret", "op", serde_json::json!({}), &ctx) @@ -474,7 +482,7 @@ mod tests { None, ); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["child/run"]); + let scoped = ScopedPeerEnv::new(["child/run"]); let ctx = root_context( "root-4", Some(Identity { @@ -503,7 +511,7 @@ mod tests { None, ); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["child/run"]); + let scoped = ScopedPeerEnv::new(["child/run"]); let mut ctx = root_context("root-5", None, None, scoped, env.clone()); ctx.metadata .insert("secret".to_string(), Value::String("leak".into())); @@ -524,7 +532,7 @@ mod tests { None, ); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["child/run"]); + let scoped = ScopedPeerEnv::new(["child/run"]); let forwarded = Identity { id: "alice".to_string(), scopes: vec![], @@ -584,7 +592,7 @@ mod tests { }); let composite = PeerCompositeEnv::new(base).with_session(session.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["agent/chat"]); + let scoped = ScopedPeerEnv::new(["agent/chat"]); let ctx = root_context("root-6", None, None, scoped, env.clone()); let response = env .invoke("agent", "chat", serde_json::json!({}), &ctx) @@ -617,7 +625,7 @@ mod tests { composite.attach_peer("worker-a".to_string(), worker_a.clone()); composite.attach_peer("worker-b".to_string(), worker_b.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-7", None, None, scoped, env.clone()); let response = env .invoke("worker", "exec", serde_json::json!({}), &ctx) @@ -650,7 +658,7 @@ mod tests { let mut composite = PeerCompositeEnv::new(base.clone()).with_session(session); composite.attach_peer("worker-a".to_string(), connection); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let scoped = ScopedPeerEnv::new(["fs/readFile"]); let ctx = root_context("root-8", None, None, scoped, env.clone()); let response = env .invoke("fs", "readFile", serde_json::json!({}), &ctx) @@ -667,7 +675,7 @@ mod tests { let base = Arc::new(NoopEnv { contains_op: true }); let composite = PeerCompositeEnv::new(base); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::empty(); + let scoped = ScopedPeerEnv::empty(); let ctx = root_context("root-9", None, None, scoped, env.clone()); let response = env .invoke("agent", "chat", serde_json::json!({}), &ctx) @@ -716,7 +724,7 @@ mod tests { composite.attach_peer("worker-a".to_string(), connection.clone()); composite.detach_peer(&"worker-a".to_string()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-10", None, None, scoped, env.clone()); let response = env .invoke("worker", "exec", serde_json::json!({}), &ctx) @@ -742,7 +750,7 @@ mod tests { composite.detach_peer(&"worker-a".to_string()); composite.attach_peer("worker-a".to_string(), connection.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-10b", None, None, scoped, env.clone()); let response = env .invoke("worker", "exec", serde_json::json!({}), &ctx) @@ -795,7 +803,7 @@ mod tests { let mut composite = PeerCompositeEnv::new(base).with_session(session); composite.attach_peer("worker-a".to_string(), connection.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-11", None, None, scoped, env.clone()); let response = env .invoke("worker", "exec", serde_json::json!({}), &ctx) @@ -811,7 +819,7 @@ mod tests { async fn local_env_unknown_op_after_reachability_pass_returns_not_found() { let registry = Arc::new(OperationRegistry::new()); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let scoped = ScopedPeerEnv::new(["fs/readFile"]); let ctx = root_context("root-12", None, None, scoped, env.clone()); let response = env .invoke("fs", "readFile", serde_json::json!({}), &ctx) @@ -832,7 +840,7 @@ mod tests { None, ); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["child/run"]); + let scoped = ScopedPeerEnv::new(["child/run"]); let deadline = Instant::now() + Duration::from_secs(5); let mut ctx = root_context("root-13", None, None, scoped, env.clone()); ctx.deadline = Some(deadline); @@ -917,7 +925,7 @@ mod tests { composite.attach_peer("worker-a".to_string(), worker_a.clone()); composite.attach_peer("worker-b".to_string(), worker_b.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-pr-1", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -952,7 +960,7 @@ mod tests { let mut composite = PeerCompositeEnv::new(base.clone()); composite.attach_peer("worker-a".to_string(), worker_a.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-pr-2", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -992,7 +1000,7 @@ mod tests { }), ); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-pr-3", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -1032,7 +1040,7 @@ mod tests { composite.attach_peer("worker-a".to_string(), worker_a.clone()); composite.attach_peer("worker-b".to_string(), worker_b.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::new(["worker/exec"]); + let scoped = ScopedPeerEnv::new(["worker/exec"]); let ctx = root_context("root-pr-4", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -1063,7 +1071,7 @@ mod tests { let mut composite = PeerCompositeEnv::new(base); composite.attach_peer("worker-a".to_string(), worker_a.clone()); let env: Arc = Arc::new(composite); - let scoped = ScopedOperationEnv::empty(); + let scoped = ScopedPeerEnv::empty(); let ctx = root_context("root-pr-5", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -1111,7 +1119,7 @@ mod tests { async fn default_invoke_peer_delegates_to_invoke_with_policy() { let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); - let scoped = ScopedOperationEnv::new(["echo/run"]); + let scoped = ScopedPeerEnv::new(["echo/run"]); let ctx = root_context("root-pr-6", None, None, scoped, env.clone()); let response = env .invoke_peer( @@ -1133,4 +1141,178 @@ mod tests { let env = LocalOperationEnv::new(registry); assert!(env.peer_contains(&"any-peer".to_string(), "anything")); } + + // --- ADR-029 §4: peer-pinned reachability gate ------------------------- + + #[tokio::test] + async fn invoke_peer_specific_pinned_only_op_reaches_pinned_peer() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + let ctx = root_context("root-pin-1", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("worker-a".to_string()), + "container", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert_eq!(response.result.unwrap(), Value::String("worker-a".into())); + assert_eq!( + worker_a.dispatched.lock().unwrap().as_deref(), + Some("container/exec") + ); + } + + #[tokio::test] + async fn invoke_peer_any_pinned_only_op_returns_not_found() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + let ctx = root_context("root-pin-2", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Any, + "container", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND", "pinned-only op NOT reachable via Any"), + other => panic!("expected NOT_FOUND via Any, got {other:?}"), + } + assert!(worker_a.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_with_policy_does_not_pick_up_pinned_only_ops() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + let ctx = root_context("root-pin-3", None, None, scoped, env.clone()); + let response = env + .invoke("container", "exec", serde_json::json!({}), &ctx) + .await; + match response.result { + Err(e) => assert_eq!( + e.code, "NOT_FOUND", + "invoke_with_policy (Any path) must NOT pick up pinned-only ops" + ), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + assert!(worker_a.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_peer_specific_wrong_peer_for_pinned_only_op_returns_not_found() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let worker_b = Arc::new(ProbeEnv { + name: "worker-b".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + composite.attach_peer("worker-b".to_string(), worker_b.clone()); + let env: Arc = Arc::new(composite); + let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]); + let ctx = root_context("root-pin-4", None, None, scoped, env.clone()); + let response = env + .invoke_peer( + &PeerRef::Specific("worker-b".to_string()), + "container", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + match response.result { + Err(e) => assert_eq!( + e.code, "NOT_FOUND", + "pinned to worker-a, routed to worker-b → NOT_FOUND" + ), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + assert!(worker_a.dispatched.lock().unwrap().is_none()); + assert!(worker_b.dispatched.lock().unwrap().is_none()); + } + + #[tokio::test] + async fn invoke_peer_op_in_both_sets_reachable_via_both_any_and_specific() { + let base = Arc::new(NoopEnv { contains_op: true }); + let worker_a = Arc::new(ProbeEnv { + name: "worker-a".to_string(), + contains_set: vec!["container/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let mut composite = PeerCompositeEnv::new(base); + composite.attach_peer("worker-a".to_string(), worker_a.clone()); + let env: Arc = Arc::new(composite); + let scoped = + ScopedPeerEnv::new(["container/exec"]).with_pinned(["worker-a/container/exec"]); + let ctx = root_context("root-pin-5", None, None, scoped, env.clone()); + + let response_any = env + .invoke_peer( + &PeerRef::Any, + "container", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert!( + response_any.result.is_ok(), + "op in allowed_ops reachable via Any" + ); + + let response_specific = env + .invoke_peer( + &PeerRef::Specific("worker-a".to_string()), + "container", + "exec", + serde_json::json!({}), + &ctx, + AbortPolicy::default(), + ) + .await; + assert!( + response_specific.result.is_ok(), + "op in both sets reachable via Specific(peer)" + ); + } } diff --git a/crates/alknet-call/src/registry/registration.rs b/crates/alknet-call/src/registry/registration.rs index 77837d1..8bae54b 100644 --- a/crates/alknet-call/src/registry/registration.rs +++ b/crates/alknet-call/src/registry/registration.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use alknet_core::types::Capabilities; use serde_json::Value; -use super::context::{CompositionAuthority, OperationContext, ScopedOperationEnv}; +use super::context::{CompositionAuthority, OperationContext, ScopedPeerEnv}; use super::spec::{AccessResult, OperationSpec, Visibility}; use crate::protocol::wire::ResponseEnvelope; @@ -31,7 +31,7 @@ pub struct HandlerRegistration { pub handler: Handler, pub provenance: OperationProvenance, pub composition_authority: Option, - pub scoped_env: Option, + pub scoped_env: Option, pub capabilities: Capabilities, } @@ -41,7 +41,7 @@ impl HandlerRegistration { handler: Handler, provenance: OperationProvenance, composition_authority: Option, - scoped_env: Option, + scoped_env: Option, capabilities: Capabilities, ) -> Self { Self { @@ -146,7 +146,7 @@ impl OperationRegistryBuilder { spec: OperationSpec, handler: Handler, composition_authority: Option, - scoped_env: Option, + scoped_env: Option, capabilities: Capabilities, ) -> Self { let registration = HandlerRegistration::new( @@ -247,7 +247,7 @@ mod tests { identity: Option, handler_identity: Option, internal: bool, - scoped_env: ScopedOperationEnv, + scoped_env: ScopedPeerEnv, ) -> OperationContext { OperationContext { request_id: request_id.to_string(), @@ -320,7 +320,7 @@ mod tests { None, Capabilities::new(), )); - let ctx = root_context("req-1", None, None, false, ScopedOperationEnv::empty()); + let ctx = root_context("req-1", None, None, false, ScopedPeerEnv::empty()); let response = registry .invoke("echo", serde_json::json!({"hi": 1}), ctx) .await; @@ -339,7 +339,7 @@ mod tests { None, Capabilities::new(), )); - let ctx = root_context("req-2", None, None, false, ScopedOperationEnv::empty()); + let ctx = root_context("req-2", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("secret", serde_json::json!({}), ctx).await; match response.result { Err(e) => { @@ -361,7 +361,7 @@ mod tests { None, Capabilities::new(), )); - let ctx = root_context("req-3", None, None, true, ScopedOperationEnv::empty()); + let ctx = root_context("req-3", None, None, true, ScopedPeerEnv::empty()); let response = registry .invoke("secret", serde_json::json!({"x": 2}), ctx) .await; @@ -372,7 +372,7 @@ mod tests { #[tokio::test] async fn unknown_op_returns_not_found() { let registry = OperationRegistry::new(); - let ctx = root_context("req-4", None, None, false, ScopedOperationEnv::empty()); + let ctx = root_context("req-4", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("missing", serde_json::json!({}), ctx).await; match response.result { Err(e) => assert_eq!(e.code, "NOT_FOUND"), @@ -402,7 +402,7 @@ mod tests { Some(identity_with_scopes("caller", &["admin"])), None, false, - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ); let response = registry.invoke("admin", serde_json::json!({}), ctx).await; assert!(response.result.is_ok()); @@ -430,7 +430,7 @@ mod tests { Some(identity_with_scopes("caller", &["user"])), None, false, - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ); let response = registry.invoke("admin", serde_json::json!({}), ctx).await; match response.result { @@ -459,7 +459,7 @@ mod tests { None, Capabilities::new(), )); - let ctx = root_context("req-7", None, None, false, ScopedOperationEnv::empty()); + let ctx = root_context("req-7", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("admin", serde_json::json!({}), ctx).await; match response.result { Err(e) => { @@ -493,7 +493,7 @@ mod tests { Some(identity_with_scopes("user", &["user"])), Some(composing_authority), true, - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ); let response = registry.invoke("secret", serde_json::json!({}), ctx).await; assert!( @@ -525,7 +525,7 @@ mod tests { Some(identity_with_scopes("user", &["admin"])), Some(weak_authority), true, - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ); let response = registry.invoke("secret", serde_json::json!({}), ctx).await; match response.result { @@ -560,7 +560,7 @@ mod tests { Some(identity_with_scopes("user", &["user"])), Some(CompositionAuthority::new("agent", ["admin".to_string()])), false, - ScopedOperationEnv::empty(), + ScopedPeerEnv::empty(), ); let response = registry.invoke("gate", serde_json::json!({}), ctx).await; match response.result { @@ -604,7 +604,7 @@ mod tests { None, Capabilities::new(), )); - let ctx = root_context("req-11", None, None, false, ScopedOperationEnv::empty()); + let ctx = root_context("req-11", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("boom", serde_json::json!({}), ctx).await; match response.result { Err(e) => assert_eq!(e.code, "INTERNAL"), @@ -619,7 +619,7 @@ mod tests { external_spec("echo", AccessControl::default()), echo_handler(), CompositionAuthority::none(), - ScopedOperationEnv::empty().into(), + ScopedPeerEnv::empty().into(), Capabilities::new(), ) .build(); @@ -636,7 +636,7 @@ mod tests { external_spec("agent", AccessControl::default()), echo_handler(), Some(CompositionAuthority::new("agent", ["fs:read".to_string()])), - Some(ScopedOperationEnv::new(["fs/readFile"])), + Some(ScopedPeerEnv::new(["fs/readFile"])), Capabilities::new(), ) .build(); @@ -687,7 +687,7 @@ mod tests { echo_handler(), OperationProvenance::Session, Some(CompositionAuthority::new("sandbox", [])), - Some(ScopedOperationEnv::new(["fs/readFile"])), + Some(ScopedPeerEnv::new(["fs/readFile"])), Capabilities::new(), ); let registry = OperationRegistryBuilder::new().with(registration).build(); @@ -715,7 +715,7 @@ mod tests { fn registration_lookup_returns_bundle_fields() { let mut registry = OperationRegistry::new(); let authority = CompositionAuthority::new("agent", ["fs:read".to_string()]); - let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let scoped = ScopedPeerEnv::new(["fs/readFile"]); let caps = Capabilities::new().with_api_key("google", "k".to_string()); registry.register(HandlerRegistration::new( external_spec("agent", AccessControl::default()), diff --git a/crates/alknet-call/tests/two_node_call.rs b/crates/alknet-call/tests/two_node_call.rs index 0278e9b..827f1cd 100644 --- a/crates/alknet-call/tests/two_node_call.rs +++ b/crates/alknet-call/tests/two_node_call.rs @@ -233,7 +233,7 @@ async fn two_node_call_round_trip() { #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn from_call_discovers_and_forwards_over_quic_loopback() { use alknet_call::client::{from_call, FromCallConfig}; - use alknet_call::registry::context::ScopedOperationEnv; + use alknet_call::registry::context::ScopedPeerEnv; let server_registry = build_server_registry(); let (server_addr, server_fingerprint, _server_join) = @@ -284,7 +284,7 @@ async fn from_call_discovers_and_forwards_over_quic_loopback() { // Build a minimal parent context to invoke the overlay env (mirrors how a // composing handler dispatches a child). - let scoped = ScopedOperationEnv::new(["server/echo"]); + let scoped = ScopedPeerEnv::new(["server/echo"]); let parent = alknet_call::registry::context::OperationContext { request_id: "parent-1".to_string(), parent_request_id: None,