feat(call): replace CompositeOperationEnv with peer-keyed PeerCompositeEnv (call/peer-composite-env)
This commit is contained in:
@@ -158,7 +158,7 @@ mod tests {
|
|||||||
use crate::protocol::wire::{
|
use crate::protocol::wire::{
|
||||||
CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED,
|
CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED,
|
||||||
};
|
};
|
||||||
use crate::registry::context::{AbortPolicy, ScopedOperationEnv};
|
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
|
||||||
use crate::registry::env::OperationEnv;
|
use crate::registry::env::OperationEnv;
|
||||||
use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance};
|
use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance};
|
||||||
use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
||||||
@@ -512,6 +512,128 @@ mod tests {
|
|||||||
assert!(context.env.contains("fs/readFile"));
|
assert!(context.env.contains("fs/readFile"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn compose_root_env_attaches_peer_when_connection_has_identity() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"fs/readFile",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
echo_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let mut conn = CallConnection::new(stub_connection());
|
||||||
|
let imported = HandlerRegistration::new(
|
||||||
|
OperationSpec::new(
|
||||||
|
"worker/exec",
|
||||||
|
OperationType::Query,
|
||||||
|
Visibility::Internal,
|
||||||
|
serde_json::json!({}),
|
||||||
|
serde_json::json!({}),
|
||||||
|
vec![],
|
||||||
|
AccessControl::default(),
|
||||||
|
),
|
||||||
|
echo_handler(),
|
||||||
|
OperationProvenance::FromCall,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
);
|
||||||
|
conn.register_imported(imported);
|
||||||
|
let peer_identity = Identity {
|
||||||
|
id: "worker-a".to_string(),
|
||||||
|
scopes: vec![],
|
||||||
|
resources: HashMap::new(),
|
||||||
|
};
|
||||||
|
conn.connection()
|
||||||
|
.set_identity(peer_identity)
|
||||||
|
.expect("identity not yet set");
|
||||||
|
|
||||||
|
let context = adapter.build_root_context("req-5".to_string(), "fs/readFile", None, &conn);
|
||||||
|
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let invoke_ctx = OperationContext {
|
||||||
|
request_id: "req-5".to_string(),
|
||||||
|
parent_request_id: None,
|
||||||
|
identity: None,
|
||||||
|
handler_identity: None,
|
||||||
|
capabilities: Capabilities::new(),
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
scoped_env: scoped,
|
||||||
|
env: context.env.clone(),
|
||||||
|
abort_policy: AbortPolicy::default(),
|
||||||
|
deadline: context.deadline,
|
||||||
|
internal: false,
|
||||||
|
};
|
||||||
|
let response = context
|
||||||
|
.env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({"v": 1}), &invoke_ctx)
|
||||||
|
.await;
|
||||||
|
assert!(
|
||||||
|
response.result.is_ok(),
|
||||||
|
"peer overlay dispatches the imported op when identity is attached"
|
||||||
|
);
|
||||||
|
assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn compose_root_env_does_not_attach_peer_when_connection_has_no_identity() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"fs/readFile",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
echo_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let mut conn = CallConnection::new(stub_connection());
|
||||||
|
let imported = HandlerRegistration::new(
|
||||||
|
OperationSpec::new(
|
||||||
|
"worker/exec",
|
||||||
|
OperationType::Query,
|
||||||
|
Visibility::Internal,
|
||||||
|
serde_json::json!({}),
|
||||||
|
serde_json::json!({}),
|
||||||
|
vec![],
|
||||||
|
AccessControl::default(),
|
||||||
|
),
|
||||||
|
echo_handler(),
|
||||||
|
OperationProvenance::FromCall,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
);
|
||||||
|
conn.register_imported(imported);
|
||||||
|
|
||||||
|
let context = adapter.build_root_context("req-6".to_string(), "fs/readFile", None, &conn);
|
||||||
|
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let invoke_ctx = OperationContext {
|
||||||
|
request_id: "req-6".to_string(),
|
||||||
|
parent_request_id: None,
|
||||||
|
identity: None,
|
||||||
|
handler_identity: None,
|
||||||
|
capabilities: Capabilities::new(),
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
scoped_env: scoped,
|
||||||
|
env: context.env.clone(),
|
||||||
|
abort_policy: AbortPolicy::default(),
|
||||||
|
deadline: context.deadline,
|
||||||
|
internal: false,
|
||||||
|
};
|
||||||
|
let response = context
|
||||||
|
.env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({}), &invoke_ctx)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(
|
||||||
|
e.code, "NOT_FOUND",
|
||||||
|
"no peer overlay attached: op falls through to base registry which has no worker/exec"
|
||||||
|
),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn dispatch_requested_round_trip_returns_responded() {
|
async fn dispatch_requested_round_trip_returns_responded() {
|
||||||
let registry = registry_with(
|
let registry = registry_with(
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ use super::wire::{
|
|||||||
};
|
};
|
||||||
use crate::protocol::adapter::SessionOverlaySource;
|
use crate::protocol::adapter::SessionOverlaySource;
|
||||||
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
|
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
|
||||||
use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv};
|
use crate::registry::env::{LocalOperationEnv, OperationEnv, PeerCompositeEnv};
|
||||||
use crate::registry::registration::OperationRegistry;
|
use crate::registry::registration::OperationRegistry;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
@@ -107,12 +107,19 @@ impl Dispatcher {
|
|||||||
.session_source
|
.session_source
|
||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|s| s.overlay_for(context));
|
.and_then(|s| s.overlay_for(context));
|
||||||
let connection_overlay = connection.overlay_env();
|
|
||||||
Arc::new(CompositeOperationEnv::new(
|
let mut env = PeerCompositeEnv::new(base);
|
||||||
base,
|
if let Some(session) = session {
|
||||||
Some(connection_overlay),
|
env = env.with_session(session);
|
||||||
session,
|
}
|
||||||
))
|
if let Some(peer_id) = connection
|
||||||
|
.connection()
|
||||||
|
.identity()
|
||||||
|
.map(|identity| identity.id.clone())
|
||||||
|
{
|
||||||
|
env.attach_peer(peer_id, connection.overlay_env());
|
||||||
|
}
|
||||||
|
Arc::new(env)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn build_root_context(
|
pub(crate) fn build_root_context(
|
||||||
|
|||||||
@@ -7,6 +7,12 @@ use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedO
|
|||||||
use super::registration::OperationRegistry;
|
use super::registration::OperationRegistry;
|
||||||
use crate::protocol::wire::ResponseEnvelope;
|
use crate::protocol::wire::ResponseEnvelope;
|
||||||
|
|
||||||
|
/// Logical peer identifier (ADR-029 §1, ADR-030 §4). The payload is
|
||||||
|
/// `Identity.id` from `IdentityProvider` resolution (= `PeerEntry.peer_id`),
|
||||||
|
/// stable across key rotation — NOT a connection-assigned UUID and NOT the
|
||||||
|
/// peer's cryptographic material.
|
||||||
|
pub type PeerId = String;
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait OperationEnv: Send + Sync {
|
pub trait OperationEnv: Send + Sync {
|
||||||
async fn invoke(
|
async fn invoke(
|
||||||
@@ -93,22 +99,51 @@ impl OperationEnv for LocalOperationEnv {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CompositeOperationEnv {
|
/// Per-call composite env (ADR-024 + ADR-029 §1). Built by the `Dispatcher`
|
||||||
session: Option<Arc<dyn OperationEnv + Send + Sync>>,
|
/// in `compose_root_env` from the active layers. The child inherits this by
|
||||||
connection: Option<Arc<dyn OperationEnv + Send + Sync>>,
|
/// `Arc::clone` through `invoke()`. The Layer 2 connection overlay is
|
||||||
base: Arc<dyn OperationEnv + Send + Sync>,
|
/// **peer-keyed** — a head node with N worker connections holds a
|
||||||
|
/// `HashMap<PeerId, connection_overlay>`, not one overlay. The singular-
|
||||||
|
/// connection case (one peer) is the degenerate case with a single-entry map.
|
||||||
|
pub struct PeerCompositeEnv {
|
||||||
|
pub base: Arc<dyn OperationEnv + Send + Sync>,
|
||||||
|
pub session: Option<Arc<dyn OperationEnv + Send + Sync>>,
|
||||||
|
pub connections: HashMap<PeerId, Arc<dyn OperationEnv + Send + Sync>>,
|
||||||
|
connection_order: Vec<PeerId>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CompositeOperationEnv {
|
impl PeerCompositeEnv {
|
||||||
pub fn new(
|
pub fn new(base: Arc<dyn OperationEnv + Send + Sync>) -> Self {
|
||||||
base: Arc<dyn OperationEnv + Send + Sync>,
|
|
||||||
connection: Option<Arc<dyn OperationEnv + Send + Sync>>,
|
|
||||||
session: Option<Arc<dyn OperationEnv + Send + Sync>>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
session,
|
|
||||||
connection,
|
|
||||||
base,
|
base,
|
||||||
|
session: None,
|
||||||
|
connections: HashMap::new(),
|
||||||
|
connection_order: Vec::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn with_session(mut self, session: Arc<dyn OperationEnv + Send + Sync>) -> Self {
|
||||||
|
self.session = Some(session);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Attach a peer's connection overlay. The `peer_id` comes from
|
||||||
|
/// `connection.identity().id` (IdentityProvider resolution). A connection
|
||||||
|
/// with no resolved identity has no `PeerId` and is NOT attached
|
||||||
|
/// (ADR-030 §5) — its ops are invoked through the `CallConnection` handle
|
||||||
|
/// directly, not via peer-keyed composition.
|
||||||
|
pub fn attach_peer(&mut self, peer_id: PeerId, overlay: Arc<dyn OperationEnv + Send + Sync>) {
|
||||||
|
if !self.connections.contains_key(&peer_id) {
|
||||||
|
self.connection_order.push(peer_id.clone());
|
||||||
|
}
|
||||||
|
self.connections.insert(peer_id, overlay);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Detach a peer's overlay (on disconnect). The peer's sub-overlay drops;
|
||||||
|
/// in-flight `PeerRef::Specific(that_peer)` gets `NOT_FOUND`.
|
||||||
|
pub fn detach_peer(&mut self, peer_id: &PeerId) {
|
||||||
|
if self.connections.remove(peer_id).is_some() {
|
||||||
|
self.connection_order.retain(|p| p != peer_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -116,17 +151,21 @@ impl CompositeOperationEnv {
|
|||||||
&self.base
|
&self.base
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn connection(&self) -> &Option<Arc<dyn OperationEnv + Send + Sync>> {
|
|
||||||
&self.connection
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn session(&self) -> &Option<Arc<dyn OperationEnv + Send + Sync>> {
|
pub fn session(&self) -> &Option<Arc<dyn OperationEnv + Send + Sync>> {
|
||||||
&self.session
|
&self.session
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn connections(&self) -> &HashMap<PeerId, Arc<dyn OperationEnv + Send + Sync>> {
|
||||||
|
&self.connections
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connection_order(&self) -> &[PeerId] {
|
||||||
|
&self.connection_order
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
impl OperationEnv for CompositeOperationEnv {
|
impl OperationEnv for PeerCompositeEnv {
|
||||||
async fn invoke_with_policy(
|
async fn invoke_with_policy(
|
||||||
&self,
|
&self,
|
||||||
namespace: &str,
|
namespace: &str,
|
||||||
@@ -148,11 +187,13 @@ impl OperationEnv for CompositeOperationEnv {
|
|||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if let Some(connection) = &self.connection {
|
for peer_id in &self.connection_order {
|
||||||
if connection.contains(&name) {
|
if let Some(conn_env) = self.connections.get(peer_id) {
|
||||||
return connection
|
if conn_env.contains(&name) {
|
||||||
.invoke_with_policy(namespace, operation, input, parent, policy)
|
return conn_env
|
||||||
.await;
|
.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.base
|
self.base
|
||||||
@@ -162,7 +203,7 @@ impl OperationEnv for CompositeOperationEnv {
|
|||||||
|
|
||||||
fn contains(&self, name: &str) -> bool {
|
fn contains(&self, name: &str) -> bool {
|
||||||
self.session.as_ref().is_some_and(|s| s.contains(name))
|
self.session.as_ref().is_some_and(|s| s.contains(name))
|
||||||
|| self.connection.as_ref().is_some_and(|c| c.contains(name))
|
|| self.connections.values().any(|c| c.contains(name))
|
||||||
|| self.base.contains(name)
|
|| self.base.contains(name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -395,14 +436,14 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn composite_env_dispatches_to_session_overlay_when_contains() {
|
async fn peer_composite_env_routes_to_session_when_it_contains_op() {
|
||||||
let base = Arc::new(NoopEnv { contains_op: true });
|
let base = Arc::new(NoopEnv { contains_op: true });
|
||||||
let session = Arc::new(ProbeEnv {
|
let session = Arc::new(ProbeEnv {
|
||||||
name: "session".to_string(),
|
name: "session".to_string(),
|
||||||
contains_set: vec!["agent/chat".to_string()],
|
contains_set: vec!["agent/chat".to_string()],
|
||||||
dispatched: std::sync::Mutex::new(None),
|
dispatched: std::sync::Mutex::new(None),
|
||||||
});
|
});
|
||||||
let composite = CompositeOperationEnv::new(base, None, Some(session.clone()));
|
let composite = PeerCompositeEnv::new(base).with_session(session.clone());
|
||||||
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
let scoped = ScopedOperationEnv::new(["agent/chat"]);
|
let scoped = ScopedOperationEnv::new(["agent/chat"]);
|
||||||
let ctx = root_context("root-6", None, None, scoped, env.clone());
|
let ctx = root_context("root-6", None, None, scoped, env.clone());
|
||||||
@@ -417,7 +458,41 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn composite_env_falls_through_to_base_when_no_overlay_contains() {
|
async fn peer_composite_env_routes_to_first_peer_in_insertion_order() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-7", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("worker-a".into()));
|
||||||
|
assert_eq!(
|
||||||
|
worker_a.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
assert!(worker_b.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_composite_env_falls_through_to_base_when_no_overlay_contains() {
|
||||||
let base = Arc::new(ProbeEnv {
|
let base = Arc::new(ProbeEnv {
|
||||||
name: "base".to_string(),
|
name: "base".to_string(),
|
||||||
contains_set: vec!["fs/readFile".to_string()],
|
contains_set: vec!["fs/readFile".to_string()],
|
||||||
@@ -433,10 +508,11 @@ mod tests {
|
|||||||
contains_set: vec!["worker/exec".to_string()],
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
dispatched: std::sync::Mutex::new(None),
|
dispatched: std::sync::Mutex::new(None),
|
||||||
});
|
});
|
||||||
let composite = CompositeOperationEnv::new(base.clone(), Some(connection), Some(session));
|
let mut composite = PeerCompositeEnv::new(base.clone()).with_session(session);
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection);
|
||||||
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
|
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
|
||||||
let ctx = root_context("root-7", None, None, scoped, env.clone());
|
let ctx = root_context("root-8", None, None, scoped, env.clone());
|
||||||
let response = env
|
let response = env
|
||||||
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
|
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
|
||||||
.await;
|
.await;
|
||||||
@@ -448,12 +524,12 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn composite_env_reachability_check_returns_not_found() {
|
async fn peer_composite_env_reachability_check_returns_not_found() {
|
||||||
let base = Arc::new(NoopEnv { contains_op: true });
|
let base = Arc::new(NoopEnv { contains_op: true });
|
||||||
let composite = CompositeOperationEnv::new(base, None, None);
|
let composite = PeerCompositeEnv::new(base);
|
||||||
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
let scoped = ScopedOperationEnv::empty();
|
let scoped = ScopedOperationEnv::empty();
|
||||||
let ctx = root_context("root-8", None, None, scoped, env.clone());
|
let ctx = root_context("root-9", None, None, scoped, env.clone());
|
||||||
let response = env
|
let response = env
|
||||||
.invoke("agent", "chat", serde_json::json!({}), &ctx)
|
.invoke("agent", "chat", serde_json::json!({}), &ctx)
|
||||||
.await;
|
.await;
|
||||||
@@ -463,8 +539,8 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[test]
|
||||||
async fn composite_env_contains_aggregates_layers() {
|
fn peer_composite_env_contains_aggregates_layers() {
|
||||||
let base = Arc::new(ProbeEnv {
|
let base = Arc::new(ProbeEnv {
|
||||||
name: "base".to_string(),
|
name: "base".to_string(),
|
||||||
contains_set: vec!["fs/readFile".to_string()],
|
contains_set: vec!["fs/readFile".to_string()],
|
||||||
@@ -480,19 +556,124 @@ mod tests {
|
|||||||
contains_set: vec!["worker/exec".to_string()],
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
dispatched: std::sync::Mutex::new(None),
|
dispatched: std::sync::Mutex::new(None),
|
||||||
});
|
});
|
||||||
let composite = CompositeOperationEnv::new(base, Some(connection), Some(session));
|
let mut composite = PeerCompositeEnv::new(base).with_session(session);
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection);
|
||||||
assert!(composite.contains("fs/readFile"));
|
assert!(composite.contains("fs/readFile"));
|
||||||
assert!(composite.contains("agent/chat"));
|
assert!(composite.contains("agent/chat"));
|
||||||
assert!(composite.contains("worker/exec"));
|
assert!(composite.contains("worker/exec"));
|
||||||
assert!(!composite.contains("unknown/op"));
|
assert!(!composite.contains("unknown/op"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_composite_env_detach_peer_drops_overlay_and_returns_not_found() {
|
||||||
|
let base: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
|
Arc::new(LocalOperationEnv::new(Arc::new(OperationRegistry::new())));
|
||||||
|
let connection = Arc::new(ProbeEnv {
|
||||||
|
name: "connection".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection.clone());
|
||||||
|
composite.detach_peer(&"worker-a".to_string());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-10", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND after detach, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(connection.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_composite_env_detach_peer_then_reattach_routes_again() {
|
||||||
|
let base: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
|
Arc::new(LocalOperationEnv::new(Arc::new(OperationRegistry::new())));
|
||||||
|
let connection = Arc::new(ProbeEnv {
|
||||||
|
name: "connection".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection.clone());
|
||||||
|
composite.detach_peer(&"worker-a".to_string());
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-10b", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("connection".into()));
|
||||||
|
assert_eq!(
|
||||||
|
connection.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_composite_env_attach_peer_preserves_insertion_order_on_re_attach() {
|
||||||
|
let base: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
||||||
|
let overlay_a: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
|
Arc::new(NoopEnv { contains_op: true });
|
||||||
|
let overlay_b: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
|
Arc::new(NoopEnv { contains_op: true });
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), overlay_a);
|
||||||
|
composite.attach_peer("worker-b".to_string(), overlay_b);
|
||||||
|
assert_eq!(composite.connection_order(), &["worker-a", "worker-b"]);
|
||||||
|
let overlay_a2: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
|
Arc::new(NoopEnv { contains_op: true });
|
||||||
|
composite.attach_peer("worker-a".to_string(), overlay_a2);
|
||||||
|
assert_eq!(
|
||||||
|
composite.connection_order(),
|
||||||
|
&["worker-a", "worker-b"],
|
||||||
|
"re-attach keeps original position"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn peer_composite_env_routes_to_connection_when_session_absent_or_missing() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec![],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let connection = Arc::new(ProbeEnv {
|
||||||
|
name: "connection".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let session = Arc::new(ProbeEnv {
|
||||||
|
name: "session".to_string(),
|
||||||
|
contains_set: vec!["agent/chat".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base).with_session(session);
|
||||||
|
composite.attach_peer("worker-a".to_string(), connection.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-11", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("connection".into()));
|
||||||
|
assert_eq!(
|
||||||
|
connection.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn local_env_unknown_op_after_reachability_pass_returns_not_found() {
|
async fn local_env_unknown_op_after_reachability_pass_returns_not_found() {
|
||||||
let registry = Arc::new(OperationRegistry::new());
|
let registry = Arc::new(OperationRegistry::new());
|
||||||
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
||||||
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
|
let scoped = ScopedOperationEnv::new(["fs/readFile"]);
|
||||||
let ctx = root_context("root-9", None, None, scoped, env.clone());
|
let ctx = root_context("root-12", None, None, scoped, env.clone());
|
||||||
let response = env
|
let response = env
|
||||||
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
|
.invoke("fs", "readFile", serde_json::json!({}), &ctx)
|
||||||
.await;
|
.await;
|
||||||
@@ -514,7 +695,7 @@ mod tests {
|
|||||||
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
||||||
let scoped = ScopedOperationEnv::new(["child/run"]);
|
let scoped = ScopedOperationEnv::new(["child/run"]);
|
||||||
let deadline = Instant::now() + Duration::from_secs(5);
|
let deadline = Instant::now() + Duration::from_secs(5);
|
||||||
let mut ctx = root_context("root-10", None, None, scoped, env.clone());
|
let mut ctx = root_context("root-13", None, None, scoped, env.clone());
|
||||||
ctx.deadline = Some(deadline);
|
ctx.deadline = Some(deadline);
|
||||||
let response = env
|
let response = env
|
||||||
.invoke("child", "run", serde_json::json!({}), &ctx)
|
.invoke("child", "run", serde_json::json!({}), &ctx)
|
||||||
@@ -550,49 +731,29 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn composite_env_accessors_return_refs() {
|
fn peer_composite_env_accessors_return_refs() {
|
||||||
let base: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
let base: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
||||||
let session: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
let session: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
||||||
let connection: Arc<dyn OperationEnv + Send + Sync> =
|
let connection: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
Arc::new(NoopEnv { contains_op: false });
|
Arc::new(NoopEnv { contains_op: false });
|
||||||
let composite = CompositeOperationEnv::new(
|
let mut composite =
|
||||||
Arc::clone(&base),
|
PeerCompositeEnv::new(Arc::clone(&base)).with_session(Arc::clone(&session));
|
||||||
Some(Arc::clone(&connection)),
|
composite.attach_peer("worker-a".to_string(), Arc::clone(&connection));
|
||||||
Some(Arc::clone(&session)),
|
|
||||||
);
|
|
||||||
assert!(Arc::ptr_eq(composite.base(), &base));
|
assert!(Arc::ptr_eq(composite.base(), &base));
|
||||||
assert!(composite.session().is_some());
|
assert!(composite.session().is_some());
|
||||||
assert!(composite.connection().is_some());
|
assert!(composite.connections().get("worker-a").is_some());
|
||||||
|
assert_eq!(composite.connection_order(), &["worker-a"]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[test]
|
||||||
async fn composite_env_dispatches_to_connection_when_session_absent_or_missing() {
|
fn peer_composite_env_singular_connection_is_degenerate_single_entry_map() {
|
||||||
let base = Arc::new(ProbeEnv {
|
let base: Arc<dyn OperationEnv + Send + Sync> = Arc::new(NoopEnv { contains_op: true });
|
||||||
name: "base".to_string(),
|
let connection: Arc<dyn OperationEnv + Send + Sync> =
|
||||||
contains_set: vec![],
|
Arc::new(NoopEnv { contains_op: true });
|
||||||
dispatched: std::sync::Mutex::new(None),
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
});
|
composite.attach_peer("worker-a".to_string(), connection);
|
||||||
let connection = Arc::new(ProbeEnv {
|
assert_eq!(composite.connections().len(), 1);
|
||||||
name: "connection".to_string(),
|
assert_eq!(composite.connection_order().len(), 1);
|
||||||
contains_set: vec!["worker/exec".to_string()],
|
assert!(composite.connections().contains_key("worker-a"));
|
||||||
dispatched: std::sync::Mutex::new(None),
|
|
||||||
});
|
|
||||||
let session = Arc::new(ProbeEnv {
|
|
||||||
name: "session".to_string(),
|
|
||||||
contains_set: vec!["agent/chat".to_string()],
|
|
||||||
dispatched: std::sync::Mutex::new(None),
|
|
||||||
});
|
|
||||||
let composite = CompositeOperationEnv::new(base, Some(connection.clone()), Some(session));
|
|
||||||
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
|
||||||
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
|
||||||
let ctx = root_context("root-11", None, None, scoped, env.clone());
|
|
||||||
let response = env
|
|
||||||
.invoke("worker", "exec", serde_json::json!({}), &ctx)
|
|
||||||
.await;
|
|
||||||
assert_eq!(response.result.unwrap(), Value::String("connection".into()));
|
|
||||||
assert_eq!(
|
|
||||||
connection.dispatched.lock().unwrap().as_deref(),
|
|
||||||
Some("worker/exec")
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user