feat(call): add invoke_peer/peer_contains/PeerRef to OperationEnv for peer-keyed routing (call/operation-env-invoke-peer)
This commit is contained in:
@@ -13,6 +13,19 @@ use crate::protocol::wire::ResponseEnvelope;
|
|||||||
/// peer's cryptographic material.
|
/// peer's cryptographic material.
|
||||||
pub type PeerId = String;
|
pub type PeerId = String;
|
||||||
|
|
||||||
|
/// Peer-routing selector (ADR-029 §2). Selects a specific peer's sub-overlay
|
||||||
|
/// (`Specific`) or the first peer (insertion order) that serves the op
|
||||||
|
/// (`Any`).
|
||||||
|
///
|
||||||
|
/// `PeerRef::Specific(PeerId)` routes to the named peer's overlay only — no
|
||||||
|
/// fallthrough (explicit routing must be honored or fail loudly, ADR-029 §2).
|
||||||
|
/// `PeerRef::Any` reuses `invoke_with_policy` (the insertion-order fan-out
|
||||||
|
/// built in `PeerCompositeEnv`).
|
||||||
|
pub enum PeerRef {
|
||||||
|
Specific(PeerId),
|
||||||
|
Any,
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait OperationEnv: Send + Sync {
|
pub trait OperationEnv: Send + Sync {
|
||||||
async fn invoke(
|
async fn invoke(
|
||||||
@@ -50,6 +63,26 @@ pub trait OperationEnv: Send + Sync {
|
|||||||
fn peer_operations(&self, _peer: &PeerId) -> Vec<String> {
|
fn peer_operations(&self, _peer: &PeerId) -> Vec<String> {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Peer-routing composition (ADR-029 §2). Routes to a specific peer
|
||||||
|
/// (`PeerRef::Specific`) or to the first peer that serves the op
|
||||||
|
/// (`PeerRef::Any`). The default impl ignores the peer selector and
|
||||||
|
/// delegates to `invoke_with_policy`, preserving back-compat for
|
||||||
|
/// single-layer envs that don't route by peer. `PeerCompositeEnv`
|
||||||
|
/// overrides with real peer-keyed routing.
|
||||||
|
async fn invoke_peer(
|
||||||
|
&self,
|
||||||
|
peer: &PeerRef,
|
||||||
|
namespace: &str,
|
||||||
|
operation: &str,
|
||||||
|
input: Value,
|
||||||
|
parent: &OperationContext,
|
||||||
|
policy: AbortPolicy,
|
||||||
|
) -> ResponseEnvelope {
|
||||||
|
let _ = peer;
|
||||||
|
self.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct LocalOperationEnv {
|
pub struct LocalOperationEnv {
|
||||||
@@ -219,6 +252,41 @@ impl OperationEnv for PeerCompositeEnv {
|
|||||||
|| self.connections.values().any(|c| c.contains(name))
|
|| self.connections.values().any(|c| c.contains(name))
|
||||||
|| self.base.contains(name)
|
|| self.base.contains(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn invoke_peer(
|
||||||
|
&self,
|
||||||
|
peer: &PeerRef,
|
||||||
|
namespace: &str,
|
||||||
|
operation: &str,
|
||||||
|
input: Value,
|
||||||
|
parent: &OperationContext,
|
||||||
|
policy: AbortPolicy,
|
||||||
|
) -> ResponseEnvelope {
|
||||||
|
let name = format!("{namespace}/{operation}");
|
||||||
|
if !parent.scoped_env.allows(&name) {
|
||||||
|
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
|
||||||
|
}
|
||||||
|
match peer {
|
||||||
|
PeerRef::Specific(peer_id) => match self.connections.get(peer_id) {
|
||||||
|
Some(conn_env) if conn_env.contains(&name) => {
|
||||||
|
conn_env
|
||||||
|
.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
_ => ResponseEnvelope::not_found(parent.request_id.clone(), &name),
|
||||||
|
},
|
||||||
|
PeerRef::Any => {
|
||||||
|
self.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn peer_contains(&self, peer: &PeerId, name: &str) -> bool {
|
||||||
|
self.connections
|
||||||
|
.get(peer)
|
||||||
|
.is_some_and(|c| c.contains(name))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -825,4 +893,242 @@ mod tests {
|
|||||||
assert_eq!(composite.connection_order().len(), 1);
|
assert_eq!(composite.connection_order().len(), 1);
|
||||||
assert!(composite.connections().contains_key("worker-a"));
|
assert!(composite.connections().contains_key("worker-a"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_routes_to_named_peer() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-1", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-b".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("worker-b".into()));
|
||||||
|
assert_eq!(
|
||||||
|
worker_b.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_returns_not_found_when_peer_does_not_serve_op() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["other/op".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base.clone());
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-2", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-a".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
assert!(
|
||||||
|
base.dispatched.lock().unwrap().is_none(),
|
||||||
|
"no fallthrough to base"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_returns_not_found_when_peer_unknown() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base.clone());
|
||||||
|
composite.attach_peer(
|
||||||
|
"worker-a".to_string(),
|
||||||
|
Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-3", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("ghost".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(base.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_any_routes_to_first_peer_in_insertion_order() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-4", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Any,
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("worker-a".into()));
|
||||||
|
assert_eq!(
|
||||||
|
worker_a.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
assert!(worker_b.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_reachability_check_gates_before_routing() {
|
||||||
|
let base = Arc::new(NoopEnv { contains_op: true });
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::empty();
|
||||||
|
let ctx = root_context("root-pr-5", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-a".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_contains_checks_specific_peer_overlay() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["other/op".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a);
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b);
|
||||||
|
assert!(composite.peer_contains(&"worker-a".to_string(), "worker/exec"));
|
||||||
|
assert!(!composite.peer_contains(&"worker-b".to_string(), "worker/exec"));
|
||||||
|
assert!(!composite.peer_contains(&"ghost".to_string(), "worker/exec"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn default_invoke_peer_delegates_to_invoke_with_policy() {
|
||||||
|
let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None);
|
||||||
|
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
||||||
|
let scoped = ScopedOperationEnv::new(["echo/run"]);
|
||||||
|
let ctx = root_context("root-pr-6", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("any-peer".to_string()),
|
||||||
|
"echo",
|
||||||
|
"run",
|
||||||
|
serde_json::json!({"hi": 1}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(response.result.is_ok());
|
||||||
|
assert_eq!(response.result.unwrap(), serde_json::json!({"hi": 1}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_peer_contains_delegates_to_contains() {
|
||||||
|
let registry = Arc::new(OperationRegistry::new());
|
||||||
|
let env = LocalOperationEnv::new(registry);
|
||||||
|
assert!(env.peer_contains(&"any-peer".to_string(), "anything"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user