feat(call): replace CompositeOperationEnv with peer-keyed PeerCompositeEnv (call/peer-composite-env)

This commit is contained in:
2026-06-28 22:02:03 +00:00
parent 1aeb634a2d
commit e8219fa550
3 changed files with 370 additions and 80 deletions

View File

@@ -158,7 +158,7 @@ mod tests {
use crate::protocol::wire::{
CallError, EventEnvelope, EVENT_COMPLETED, EVENT_ERROR, EVENT_RESPONDED,
};
use crate::registry::context::{AbortPolicy, ScopedOperationEnv};
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
use crate::registry::env::OperationEnv;
use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance};
use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
@@ -512,6 +512,128 @@ mod tests {
assert!(context.env.contains("fs/readFile"));
}
#[tokio::test]
async fn compose_root_env_attaches_peer_when_connection_has_identity() {
let registry = registry_with(
"fs/readFile",
Visibility::External,
AccessControl::default(),
echo_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let mut conn = CallConnection::new(stub_connection());
let imported = HandlerRegistration::new(
OperationSpec::new(
"worker/exec",
OperationType::Query,
Visibility::Internal,
serde_json::json!({}),
serde_json::json!({}),
vec![],
AccessControl::default(),
),
echo_handler(),
OperationProvenance::FromCall,
None,
None,
Capabilities::new(),
);
conn.register_imported(imported);
let peer_identity = Identity {
id: "worker-a".to_string(),
scopes: vec![],
resources: HashMap::new(),
};
conn.connection()
.set_identity(peer_identity)
.expect("identity not yet set");
let context = adapter.build_root_context("req-5".to_string(), "fs/readFile", None, &conn);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let invoke_ctx = OperationContext {
request_id: "req-5".to_string(),
parent_request_id: None,
identity: None,
handler_identity: None,
capabilities: Capabilities::new(),
metadata: HashMap::new(),
scoped_env: scoped,
env: context.env.clone(),
abort_policy: AbortPolicy::default(),
deadline: context.deadline,
internal: false,
};
let response = context
.env
.invoke("worker", "exec", serde_json::json!({"v": 1}), &invoke_ctx)
.await;
assert!(
response.result.is_ok(),
"peer overlay dispatches the imported op when identity is attached"
);
assert_eq!(response.result.unwrap(), serde_json::json!({"v": 1}));
}
#[tokio::test]
async fn compose_root_env_does_not_attach_peer_when_connection_has_no_identity() {
let registry = registry_with(
"fs/readFile",
Visibility::External,
AccessControl::default(),
echo_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let mut conn = CallConnection::new(stub_connection());
let imported = HandlerRegistration::new(
OperationSpec::new(
"worker/exec",
OperationType::Query,
Visibility::Internal,
serde_json::json!({}),
serde_json::json!({}),
vec![],
AccessControl::default(),
),
echo_handler(),
OperationProvenance::FromCall,
None,
None,
Capabilities::new(),
);
conn.register_imported(imported);
let context = adapter.build_root_context("req-6".to_string(), "fs/readFile", None, &conn);
let scoped = ScopedOperationEnv::new(["worker/exec"]);
let invoke_ctx = OperationContext {
request_id: "req-6".to_string(),
parent_request_id: None,
identity: None,
handler_identity: None,
capabilities: Capabilities::new(),
metadata: HashMap::new(),
scoped_env: scoped,
env: context.env.clone(),
abort_policy: AbortPolicy::default(),
deadline: context.deadline,
internal: false,
};
let response = context
.env
.invoke("worker", "exec", serde_json::json!({}), &invoke_ctx)
.await;
match response.result {
Err(e) => assert_eq!(
e.code, "NOT_FOUND",
"no peer overlay attached: op falls through to base registry which has no worker/exec"
),
other => panic!("expected NOT_FOUND, got {other:?}"),
}
}
#[tokio::test]
async fn dispatch_requested_round_trip_returns_responded() {
let registry = registry_with(

View File

@@ -29,7 +29,7 @@ use super::wire::{
};
use crate::protocol::adapter::SessionOverlaySource;
use crate::registry::context::{AbortPolicy, OperationContext, ScopedOperationEnv};
use crate::registry::env::{CompositeOperationEnv, LocalOperationEnv, OperationEnv};
use crate::registry::env::{LocalOperationEnv, OperationEnv, PeerCompositeEnv};
use crate::registry::registration::OperationRegistry;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
@@ -107,12 +107,19 @@ impl Dispatcher {
.session_source
.as_ref()
.and_then(|s| s.overlay_for(context));
let connection_overlay = connection.overlay_env();
Arc::new(CompositeOperationEnv::new(
base,
Some(connection_overlay),
session,
))
let mut env = PeerCompositeEnv::new(base);
if let Some(session) = session {
env = env.with_session(session);
}
if let Some(peer_id) = connection
.connection()
.identity()
.map(|identity| identity.id.clone())
{
env.attach_peer(peer_id, connection.overlay_env());
}
Arc::new(env)
}
pub(crate) fn build_root_context(