feat(call): add ScopedPeerEnv peer-pinned reachability (ADR-029 §4, call/scoped-peer-env)

This commit is contained in:
2026-06-30 11:07:41 +00:00
parent 5c4feff468
commit bfd1621b9b
10 changed files with 397 additions and 99 deletions

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use serde_json::Value;
use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedOperationEnv};
use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedPeerEnv};
use super::registration::OperationRegistry;
use crate::protocol::wire::ResponseEnvelope;
@@ -136,7 +136,7 @@ impl OperationEnv for LocalOperationEnv {
scoped_env: registration
.scoped_env
.clone()
.unwrap_or_else(ScopedOperationEnv::empty),
.unwrap_or_else(ScopedPeerEnv::empty),
env: parent.env.clone(),
internal: true,
};
@@ -263,19 +263,27 @@ impl OperationEnv for PeerCompositeEnv {
policy: AbortPolicy,
) -> ResponseEnvelope {
let name = format!("{namespace}/{operation}");
if !parent.scoped_env.allows(&name) {
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
}
match peer {
PeerRef::Specific(peer_id) => match self.connections.get(peer_id) {
Some(conn_env) if conn_env.contains(&name) => {
conn_env
.invoke_with_policy(namespace, operation, input, parent, policy)
.await
PeerRef::Specific(peer_id) => {
if !parent
.scoped_env
.allows_via(&PeerRef::Specific(peer_id.clone()), &name)
{
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
}
_ => ResponseEnvelope::not_found(parent.request_id.clone(), &name),
},
match self.connections.get(peer_id) {
Some(conn_env) if conn_env.contains(&name) => {
conn_env
.invoke_with_policy(namespace, operation, input, parent, policy)
.await
}
_ => ResponseEnvelope::not_found(parent.request_id.clone(), &name),
}
}
PeerRef::Any => {
if !parent.scoped_env.allows(&name) {
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
}
self.invoke_with_policy(namespace, operation, input, parent, policy)
.await
}
@@ -353,7 +361,7 @@ mod tests {
request_id: &str,
identity: Option<Identity>,
handler_identity: Option<CompositionAuthority>,
scoped_env: ScopedOperationEnv,
scoped_env: ScopedPeerEnv,
env: Arc<dyn OperationEnv + Send + Sync>,
) -> OperationContext {
root_context_with_forwarded_for(
@@ -371,7 +379,7 @@ mod tests {
identity: Option<Identity>,
handler_identity: Option<CompositionAuthority>,
forwarded_for: Option<Identity>,
scoped_env: ScopedOperationEnv,
scoped_env: ScopedPeerEnv,
env: Arc<dyn OperationEnv + Send + Sync>,
) -> OperationContext {
OperationContext {
@@ -395,7 +403,7 @@ mod tests {
spec_visibility: Visibility,
handler: crate::registry::registration::Handler,
composition_authority: Option<CompositionAuthority>,
scoped_env: Option<ScopedOperationEnv>,
scoped_env: Option<ScopedPeerEnv>,
) -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
@@ -421,7 +429,7 @@ mod tests {
async fn local_env_invoke_allowed_op_dispatches() {
let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["echo/run"]);
let scoped = ScopedPeerEnv::new(["echo/run"]);
let ctx = root_context("root-1", None, None, scoped, env.clone());
let response = env
.invoke("echo", "run", serde_json::json!({"hi": 1}), &ctx)
@@ -434,7 +442,7 @@ mod tests {
async fn local_env_invoke_disallowed_op_returns_not_found() {
let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["other/op"]);
let scoped = ScopedPeerEnv::new(["other/op"]);
let ctx = root_context("root-2", None, None, scoped, env.clone());
let response = env.invoke("echo", "run", serde_json::json!({}), &ctx).await;
match response.result {
@@ -453,7 +461,7 @@ mod tests {
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["secret/op"]);
let scoped = ScopedPeerEnv::new(["secret/op"]);
let ctx = root_context("root-3", None, None, scoped, env.clone());
let response = env
.invoke("secret", "op", serde_json::json!({}), &ctx)
@@ -474,7 +482,7 @@ mod tests {
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["child/run"]);
let scoped = ScopedPeerEnv::new(["child/run"]);
let ctx = root_context(
"root-4",
Some(Identity {
@@ -503,7 +511,7 @@ mod tests {
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["child/run"]);
let scoped = ScopedPeerEnv::new(["child/run"]);
let mut ctx = root_context("root-5", None, None, scoped, env.clone());
ctx.metadata
.insert("secret".to_string(), Value::String("leak".into()));
@@ -524,7 +532,7 @@ mod tests {
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["child/run"]);
let scoped = ScopedPeerEnv::new(["child/run"]);
let forwarded = Identity {
id: "alice".to_string(),
scopes: vec![],
@@ -584,7 +592,7 @@ mod tests {
});
let composite = PeerCompositeEnv::new(base).with_session(session.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["agent/chat"]);
let scoped = ScopedPeerEnv::new(["agent/chat"]);
let ctx = root_context("root-6", None, None, scoped, env.clone());
let response = env
.invoke("agent", "chat", serde_json::json!({}), &ctx)
@@ -617,7 +625,7 @@ mod tests {
composite.attach_peer("worker-a".to_string(), worker_a.clone());
composite.attach_peer("worker-b".to_string(), worker_b.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-7", None, None, scoped, env.clone());
let response = env
.invoke("worker", "exec", serde_json::json!({}), &ctx)
@@ -650,7 +658,7 @@ mod tests {
let mut composite = PeerCompositeEnv::new(base.clone()).with_session(session);
composite.attach_peer("worker-a".to_string(), connection);
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
let ctx = root_context("root-8", None, None, scoped, env.clone());
let response = env
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
@@ -667,7 +675,7 @@ mod tests {
let base = Arc::new(NoopEnv { contains_op: true });
let composite = PeerCompositeEnv::new(base);
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::empty();
let scoped = ScopedPeerEnv::empty();
let ctx = root_context("root-9", None, None, scoped, env.clone());
let response = env
.invoke("agent", "chat", serde_json::json!({}), &ctx)
@@ -716,7 +724,7 @@ mod tests {
composite.attach_peer("worker-a".to_string(), connection.clone());
composite.detach_peer(&"worker-a".to_string());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-10", None, None, scoped, env.clone());
let response = env
.invoke("worker", "exec", serde_json::json!({}), &ctx)
@@ -742,7 +750,7 @@ mod tests {
composite.detach_peer(&"worker-a".to_string());
composite.attach_peer("worker-a".to_string(), connection.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-10b", None, None, scoped, env.clone());
let response = env
.invoke("worker", "exec", serde_json::json!({}), &ctx)
@@ -795,7 +803,7 @@ mod tests {
let mut composite = PeerCompositeEnv::new(base).with_session(session);
composite.attach_peer("worker-a".to_string(), connection.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-11", None, None, scoped, env.clone());
let response = env
.invoke("worker", "exec", serde_json::json!({}), &ctx)
@@ -811,7 +819,7 @@ mod tests {
async fn local_env_unknown_op_after_reachability_pass_returns_not_found() {
let registry = Arc::new(OperationRegistry::new());
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
let ctx = root_context("root-12", None, None, scoped, env.clone());
let response = env
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
@@ -832,7 +840,7 @@ mod tests {
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["child/run"]);
let scoped = ScopedPeerEnv::new(["child/run"]);
let deadline = Instant::now() + Duration::from_secs(5);
let mut ctx = root_context("root-13", None, None, scoped, env.clone());
ctx.deadline = Some(deadline);
@@ -917,7 +925,7 @@ mod tests {
composite.attach_peer("worker-a".to_string(), worker_a.clone());
composite.attach_peer("worker-b".to_string(), worker_b.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-pr-1", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -952,7 +960,7 @@ mod tests {
let mut composite = PeerCompositeEnv::new(base.clone());
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-pr-2", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -992,7 +1000,7 @@ mod tests {
}),
);
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-pr-3", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -1032,7 +1040,7 @@ mod tests {
composite.attach_peer("worker-a".to_string(), worker_a.clone());
composite.attach_peer("worker-b".to_string(), worker_b.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let scoped = ScopedPeerEnv::new(["worker/exec"]);
let ctx = root_context("root-pr-4", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -1063,7 +1071,7 @@ mod tests {
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedOperationEnv::empty();
let scoped = ScopedPeerEnv::empty();
let ctx = root_context("root-pr-5", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -1111,7 +1119,7 @@ mod tests {
async fn default_invoke_peer_delegates_to_invoke_with_policy() {
let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["echo/run"]);
let scoped = ScopedPeerEnv::new(["echo/run"]);
let ctx = root_context("root-pr-6", None, None, scoped, env.clone());
let response = env
.invoke_peer(
@@ -1133,4 +1141,178 @@ mod tests {
let env = LocalOperationEnv::new(registry);
assert!(env.peer_contains(&"any-peer".to_string(), "anything"));
}
// --- ADR-029 §4: peer-pinned reachability gate -------------------------
#[tokio::test]
async fn invoke_peer_specific_pinned_only_op_reaches_pinned_peer() {
let base = Arc::new(NoopEnv { contains_op: true });
let worker_a = Arc::new(ProbeEnv {
name: "worker-a".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]);
let ctx = root_context("root-pin-1", None, None, scoped, env.clone());
let response = env
.invoke_peer(
&PeerRef::Specific("worker-a".to_string()),
"container",
"exec",
serde_json::json!({}),
&ctx,
AbortPolicy::default(),
)
.await;
assert_eq!(response.result.unwrap(), Value::String("worker-a".into()));
assert_eq!(
worker_a.dispatched.lock().unwrap().as_deref(),
Some("container/exec")
);
}
#[tokio::test]
async fn invoke_peer_any_pinned_only_op_returns_not_found() {
let base = Arc::new(NoopEnv { contains_op: true });
let worker_a = Arc::new(ProbeEnv {
name: "worker-a".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]);
let ctx = root_context("root-pin-2", None, None, scoped, env.clone());
let response = env
.invoke_peer(
&PeerRef::Any,
"container",
"exec",
serde_json::json!({}),
&ctx,
AbortPolicy::default(),
)
.await;
match response.result {
Err(e) => assert_eq!(e.code, "NOT_FOUND", "pinned-only op NOT reachable via Any"),
other => panic!("expected NOT_FOUND via Any, got {other:?}"),
}
assert!(worker_a.dispatched.lock().unwrap().is_none());
}
#[tokio::test]
async fn invoke_with_policy_does_not_pick_up_pinned_only_ops() {
let base = Arc::new(NoopEnv { contains_op: true });
let worker_a = Arc::new(ProbeEnv {
name: "worker-a".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]);
let ctx = root_context("root-pin-3", None, None, scoped, env.clone());
let response = env
.invoke("container", "exec", serde_json::json!({}), &ctx)
.await;
match response.result {
Err(e) => assert_eq!(
e.code, "NOT_FOUND",
"invoke_with_policy (Any path) must NOT pick up pinned-only ops"
),
other => panic!("expected NOT_FOUND, got {other:?}"),
}
assert!(worker_a.dispatched.lock().unwrap().is_none());
}
#[tokio::test]
async fn invoke_peer_specific_wrong_peer_for_pinned_only_op_returns_not_found() {
let base = Arc::new(NoopEnv { contains_op: true });
let worker_a = Arc::new(ProbeEnv {
name: "worker-a".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let worker_b = Arc::new(ProbeEnv {
name: "worker-b".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
composite.attach_peer("worker-b".to_string(), worker_b.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped = ScopedPeerEnv::empty().with_pinned(["worker-a/container/exec"]);
let ctx = root_context("root-pin-4", None, None, scoped, env.clone());
let response = env
.invoke_peer(
&PeerRef::Specific("worker-b".to_string()),
"container",
"exec",
serde_json::json!({}),
&ctx,
AbortPolicy::default(),
)
.await;
match response.result {
Err(e) => assert_eq!(
e.code, "NOT_FOUND",
"pinned to worker-a, routed to worker-b → NOT_FOUND"
),
other => panic!("expected NOT_FOUND, got {other:?}"),
}
assert!(worker_a.dispatched.lock().unwrap().is_none());
assert!(worker_b.dispatched.lock().unwrap().is_none());
}
#[tokio::test]
async fn invoke_peer_op_in_both_sets_reachable_via_both_any_and_specific() {
let base = Arc::new(NoopEnv { contains_op: true });
let worker_a = Arc::new(ProbeEnv {
name: "worker-a".to_string(),
contains_set: vec!["container/exec".to_string()],
dispatched: std::sync::Mutex::new(None),
});
let mut composite = PeerCompositeEnv::new(base);
composite.attach_peer("worker-a".to_string(), worker_a.clone());
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
let scoped =
ScopedPeerEnv::new(["container/exec"]).with_pinned(["worker-a/container/exec"]);
let ctx = root_context("root-pin-5", None, None, scoped, env.clone());
let response_any = env
.invoke_peer(
&PeerRef::Any,
"container",
"exec",
serde_json::json!({}),
&ctx,
AbortPolicy::default(),
)
.await;
assert!(
response_any.result.is_ok(),
"op in allowed_ops reachable via Any"
);
let response_specific = env
.invoke_peer(
&PeerRef::Specific("worker-a".to_string()),
"container",
"exec",
serde_json::json!({}),
&ctx,
AbortPolicy::default(),
)
.await;
assert!(
response_specific.result.is_ok(),
"op in both sets reachable via Specific(peer)"
);
}
}