Compare commits
2 Commits
37e430b09d
...
e27b77f3ae
| Author | SHA1 | Date | |
|---|---|---|---|
| e27b77f3ae | |||
| e72a296089 |
@@ -114,6 +114,7 @@ mod tests {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity: None,
|
identity: None,
|
||||||
handler_identity: None,
|
handler_identity: None,
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
scoped_env: ScopedOperationEnv::empty(),
|
scoped_env: ScopedOperationEnv::empty(),
|
||||||
|
|||||||
@@ -106,10 +106,16 @@ impl CallAdapter {
|
|||||||
request_id: String,
|
request_id: String,
|
||||||
operation_name: &str,
|
operation_name: &str,
|
||||||
identity: Option<Identity>,
|
identity: Option<Identity>,
|
||||||
|
forwarded_for: Option<Identity>,
|
||||||
connection: &CallConnection,
|
connection: &CallConnection,
|
||||||
) -> OperationContext {
|
) -> OperationContext {
|
||||||
self.dispatcher
|
self.dispatcher.build_root_context(
|
||||||
.build_root_context(request_id, operation_name, identity, connection)
|
request_id,
|
||||||
|
operation_name,
|
||||||
|
identity,
|
||||||
|
forwarded_for,
|
||||||
|
connection,
|
||||||
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -402,7 +408,8 @@ mod tests {
|
|||||||
let adapter = CallAdapter::new(registry, provider);
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
let conn = CallConnection::new(stub_connection());
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
let context = adapter.build_root_context("req-1".to_string(), "echo/run", None, &conn);
|
let context =
|
||||||
|
adapter.build_root_context("req-1".to_string(), "echo/run", None, None, &conn);
|
||||||
|
|
||||||
assert!(!context.is_internal());
|
assert!(!context.is_internal());
|
||||||
assert!(context.parent_request_id.is_none());
|
assert!(context.parent_request_id.is_none());
|
||||||
@@ -427,7 +434,8 @@ mod tests {
|
|||||||
let adapter = CallAdapter::new(registry, provider);
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
let conn = CallConnection::new(stub_connection());
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
let context = adapter.build_root_context("req-2".to_string(), "agent/run", None, &conn);
|
let context =
|
||||||
|
adapter.build_root_context("req-2".to_string(), "agent/run", None, None, &conn);
|
||||||
|
|
||||||
assert!(context.scoped_env.allows("fs/readFile"));
|
assert!(context.scoped_env.allows("fs/readFile"));
|
||||||
assert!(!context.scoped_env.allows("other/op"));
|
assert!(!context.scoped_env.allows("other/op"));
|
||||||
@@ -445,7 +453,8 @@ mod tests {
|
|||||||
let adapter = CallAdapter::new(registry.clone(), provider);
|
let adapter = CallAdapter::new(registry.clone(), provider);
|
||||||
let conn = CallConnection::new(stub_connection());
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
let context = adapter.build_root_context("req-3".to_string(), "fs/readFile", None, &conn);
|
let context =
|
||||||
|
adapter.build_root_context("req-3".to_string(), "fs/readFile", None, None, &conn);
|
||||||
|
|
||||||
assert!(context.env.contains("fs/readFile"));
|
assert!(context.env.contains("fs/readFile"));
|
||||||
}
|
}
|
||||||
@@ -506,7 +515,8 @@ mod tests {
|
|||||||
let adapter = CallAdapter::new(registry, provider).with_session_source(session_source);
|
let adapter = CallAdapter::new(registry, provider).with_session_source(session_source);
|
||||||
let conn = CallConnection::new(stub_connection());
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
let context = adapter.build_root_context("req-4".to_string(), "fs/readFile", None, &conn);
|
let context =
|
||||||
|
adapter.build_root_context("req-4".to_string(), "fs/readFile", None, None, &conn);
|
||||||
|
|
||||||
assert!(context.env.contains("agent/chat"));
|
assert!(context.env.contains("agent/chat"));
|
||||||
assert!(context.env.contains("fs/readFile"));
|
assert!(context.env.contains("fs/readFile"));
|
||||||
@@ -891,7 +901,8 @@ mod tests {
|
|||||||
let adapter = CallAdapter::new(registry, provider);
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
let conn = CallConnection::new(stub_connection());
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
let context = adapter.build_root_context("req-7".to_string(), "missing/op", None, &conn);
|
let context =
|
||||||
|
adapter.build_root_context("req-7".to_string(), "missing/op", None, None, &conn);
|
||||||
|
|
||||||
assert!(!context.scoped_env.allows("missing/op"));
|
assert!(!context.scoped_env.allows("missing/op"));
|
||||||
assert!(context.handler_identity.is_none());
|
assert!(context.handler_identity.is_none());
|
||||||
@@ -925,6 +936,220 @@ mod tests {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn inspect_forwarded_for_handler() -> crate::registry::registration::Handler {
|
||||||
|
make_handler(|_input, context| async move {
|
||||||
|
let identity_id = context.identity.as_ref().map(|i| i.id.clone());
|
||||||
|
let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone());
|
||||||
|
ResponseEnvelope::ok(
|
||||||
|
context.request_id,
|
||||||
|
serde_json::json!({
|
||||||
|
"identity_id": identity_id,
|
||||||
|
"forwarded_for_id": forwarded_for_id,
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_populates_forwarded_for_from_argument() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"echo/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
echo_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
|
let forwarded = identity_with_scopes("alice", &["fs:read"]);
|
||||||
|
let context = adapter.build_root_context(
|
||||||
|
"req-ff-1".to_string(),
|
||||||
|
"echo/run",
|
||||||
|
None,
|
||||||
|
Some(forwarded.clone()),
|
||||||
|
&conn,
|
||||||
|
);
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
context.forwarded_for.as_ref().map(|i| &i.id),
|
||||||
|
Some(&"alice".to_string())
|
||||||
|
);
|
||||||
|
assert_eq!(
|
||||||
|
context.forwarded_for.as_ref().map(|i| i.scopes.clone()),
|
||||||
|
Some(forwarded.scopes.clone())
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_missing_forwarded_for_is_none() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"echo/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
echo_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
|
let context =
|
||||||
|
adapter.build_root_context("req-ff-2".to_string(), "echo/run", None, None, &conn);
|
||||||
|
|
||||||
|
assert!(context.forwarded_for.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatch_requested_populates_forwarded_for_from_payload() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"inspect/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
inspect_forwarded_for_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"operationId": "/inspect/run",
|
||||||
|
"input": {},
|
||||||
|
"forwarded_for": {
|
||||||
|
"id": "alice",
|
||||||
|
"scopes": ["fs:read", "docker:start"],
|
||||||
|
"resources": {},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
let response = adapter
|
||||||
|
.dispatch_requested(&conn, "req-ff-3".to_string(), payload)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let out = response.result.expect("ok");
|
||||||
|
assert_eq!(out["forwarded_for_id"], Value::String("alice".into()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatch_requested_missing_forwarded_for_yields_none() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"inspect/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
inspect_forwarded_for_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"operationId": "/inspect/run",
|
||||||
|
"input": {},
|
||||||
|
});
|
||||||
|
let response = adapter
|
||||||
|
.dispatch_requested(&conn, "req-ff-4".to_string(), payload)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let out = response.result.expect("ok");
|
||||||
|
assert!(out["forwarded_for_id"].is_null());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatch_requested_malformed_forwarded_for_yields_none() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"inspect/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
inspect_forwarded_for_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"operationId": "/inspect/run",
|
||||||
|
"input": {},
|
||||||
|
"forwarded_for": "not-an-object",
|
||||||
|
});
|
||||||
|
let response = adapter
|
||||||
|
.dispatch_requested(&conn, "req-ff-5".to_string(), payload)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let out = response.result.expect("ok");
|
||||||
|
assert!(out["forwarded_for_id"].is_null());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatch_requested_forwarded_for_does_not_satisfy_acl() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"admin/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl {
|
||||||
|
required_scopes: vec!["admin".to_string()],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
inspect_forwarded_for_handler(),
|
||||||
|
);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"operationId": "/admin/run",
|
||||||
|
"input": {},
|
||||||
|
"forwarded_for": {
|
||||||
|
"id": "alice",
|
||||||
|
"scopes": ["admin"],
|
||||||
|
"resources": {},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
let response = adapter
|
||||||
|
.dispatch_requested(&conn, "req-ff-6".to_string(), payload)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match response.result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "FORBIDDEN");
|
||||||
|
assert_eq!(e.message, "authentication required");
|
||||||
|
}
|
||||||
|
other => panic!("expected FORBIDDEN (forwarded_for must not authorize), got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn dispatch_requested_forwarded_for_present_with_satisfied_acl() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"admin/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl {
|
||||||
|
required_scopes: vec!["admin".to_string()],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
inspect_forwarded_for_handler(),
|
||||||
|
);
|
||||||
|
let token_identity = identity_with_scopes("hub", &["admin"]);
|
||||||
|
let provider: Arc<dyn IdentityProvider> =
|
||||||
|
Arc::new(StaticIdentityProvider::new().with_token("alk_hub", token_identity));
|
||||||
|
let adapter = CallAdapter::new(registry, provider);
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
|
||||||
|
let payload = serde_json::json!({
|
||||||
|
"operationId": "/admin/run",
|
||||||
|
"input": {},
|
||||||
|
"auth_token": "alk_hub",
|
||||||
|
"forwarded_for": {
|
||||||
|
"id": "alice",
|
||||||
|
"scopes": ["fs:read"],
|
||||||
|
"resources": {},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
let response = adapter
|
||||||
|
.dispatch_requested(&conn, "req-ff-7".to_string(), payload)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let out = response.result.expect("ok");
|
||||||
|
assert_eq!(out["identity_id"], Value::String("hub".into()));
|
||||||
|
assert_eq!(out["forwarded_for_id"], Value::String("alice".into()));
|
||||||
|
}
|
||||||
|
|
||||||
fn encode_frame(envelope: &EventEnvelope) -> Vec<u8> {
|
fn encode_frame(envelope: &EventEnvelope) -> Vec<u8> {
|
||||||
let body = serde_json::to_vec(envelope).unwrap();
|
let body = serde_json::to_vec(envelope).unwrap();
|
||||||
let mut buf = (body.len() as u32).to_be_bytes().to_vec();
|
let mut buf = (body.len() as u32).to_be_bytes().to_vec();
|
||||||
|
|||||||
@@ -283,6 +283,7 @@ impl OperationEnv for OverlayOperationEnv {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|ca| ca.as_identity()),
|
.and_then(|ca| ca.as_identity()),
|
||||||
handler_identity: composition_authority,
|
handler_identity: composition_authority,
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: parent.capabilities.clone(),
|
capabilities: parent.capabilities.clone(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
abort_policy: policy,
|
abort_policy: policy,
|
||||||
@@ -431,6 +432,7 @@ mod tests {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity: None,
|
identity: None,
|
||||||
handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])),
|
handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])),
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
scoped_env,
|
scoped_env,
|
||||||
|
|||||||
@@ -127,6 +127,7 @@ impl Dispatcher {
|
|||||||
request_id: String,
|
request_id: String,
|
||||||
operation_name: &str,
|
operation_name: &str,
|
||||||
identity: Option<Identity>,
|
identity: Option<Identity>,
|
||||||
|
forwarded_for: Option<Identity>,
|
||||||
connection: &CallConnection,
|
connection: &CallConnection,
|
||||||
) -> OperationContext {
|
) -> OperationContext {
|
||||||
let registration = self.registry.registration(operation_name);
|
let registration = self.registry.registration(operation_name);
|
||||||
@@ -152,6 +153,7 @@ impl Dispatcher {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity: identity.clone(),
|
identity: identity.clone(),
|
||||||
handler_identity: composition_authority,
|
handler_identity: composition_authority,
|
||||||
|
forwarded_for,
|
||||||
capabilities,
|
capabilities,
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
deadline: Some(Instant::now() + self.default_timeout),
|
deadline: Some(Instant::now() + self.default_timeout),
|
||||||
@@ -179,10 +181,19 @@ impl Dispatcher {
|
|||||||
let connection_identity = connection.connection().identity().cloned();
|
let connection_identity = connection.connection().identity().cloned();
|
||||||
let identity = self.resolve_identity(connection_identity, &payload);
|
let identity = self.resolve_identity(connection_identity, &payload);
|
||||||
|
|
||||||
|
let forwarded_for = payload
|
||||||
|
.get("forwarded_for")
|
||||||
|
.and_then(|v| serde_json::from_value::<Identity>(v.clone()).ok());
|
||||||
|
|
||||||
let input = payload.get("input").cloned().unwrap_or(Value::Null);
|
let input = payload.get("input").cloned().unwrap_or(Value::Null);
|
||||||
|
|
||||||
let context =
|
let context = self.build_root_context(
|
||||||
self.build_root_context(request_id.clone(), &operation_name, identity, connection);
|
request_id.clone(),
|
||||||
|
&operation_name,
|
||||||
|
identity,
|
||||||
|
forwarded_for,
|
||||||
|
connection,
|
||||||
|
);
|
||||||
|
|
||||||
self.registry.invoke(&operation_name, input, context).await
|
self.registry.invoke(&operation_name, input, context).await
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,6 +13,16 @@ pub struct OperationContext {
|
|||||||
pub parent_request_id: Option<String>,
|
pub parent_request_id: Option<String>,
|
||||||
pub identity: Option<Identity>,
|
pub identity: Option<Identity>,
|
||||||
pub handler_identity: Option<CompositionAuthority>,
|
pub handler_identity: Option<CompositionAuthority>,
|
||||||
|
/// The original caller when this call was forwarded by a `from_call`
|
||||||
|
/// handler (ADR-032). **Metadata only** — `AccessControl::check` never
|
||||||
|
/// reads it; the ACL always authorizes `identity` (the direct caller).
|
||||||
|
/// Handlers may read it for logging, auditing, per-user rate limiting,
|
||||||
|
/// or application context. Populated from
|
||||||
|
/// `call.requested.forwarded_for` by the dispatch path; set to `None`
|
||||||
|
/// for composed children (wire-ingress only, not composition-ingress).
|
||||||
|
/// The forwarder's claim, not a verified identity — a malicious hub can
|
||||||
|
/// lie (same property as HTTP `X-Forwarded-For`). See ADR-032.
|
||||||
|
pub forwarded_for: Option<Identity>,
|
||||||
pub capabilities: Capabilities,
|
pub capabilities: Capabilities,
|
||||||
pub metadata: HashMap<String, Value>,
|
pub metadata: HashMap<String, Value>,
|
||||||
pub scoped_env: ScopedOperationEnv,
|
pub scoped_env: ScopedOperationEnv,
|
||||||
|
|||||||
@@ -386,6 +386,7 @@ mod tests {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity: None,
|
identity: None,
|
||||||
handler_identity: None,
|
handler_identity: None,
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
scoped_env: ScopedOperationEnv::empty(),
|
scoped_env: ScopedOperationEnv::empty(),
|
||||||
|
|||||||
@@ -13,6 +13,19 @@ use crate::protocol::wire::ResponseEnvelope;
|
|||||||
/// peer's cryptographic material.
|
/// peer's cryptographic material.
|
||||||
pub type PeerId = String;
|
pub type PeerId = String;
|
||||||
|
|
||||||
|
/// Peer-routing selector (ADR-029 §2). Selects a specific peer's sub-overlay
|
||||||
|
/// (`Specific`) or the first peer (insertion order) that serves the op
|
||||||
|
/// (`Any`).
|
||||||
|
///
|
||||||
|
/// `PeerRef::Specific(PeerId)` routes to the named peer's overlay only — no
|
||||||
|
/// fallthrough (explicit routing must be honored or fail loudly, ADR-029 §2).
|
||||||
|
/// `PeerRef::Any` reuses `invoke_with_policy` (the insertion-order fan-out
|
||||||
|
/// built in `PeerCompositeEnv`).
|
||||||
|
pub enum PeerRef {
|
||||||
|
Specific(PeerId),
|
||||||
|
Any,
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait]
|
#[async_trait::async_trait]
|
||||||
pub trait OperationEnv: Send + Sync {
|
pub trait OperationEnv: Send + Sync {
|
||||||
async fn invoke(
|
async fn invoke(
|
||||||
@@ -50,6 +63,26 @@ pub trait OperationEnv: Send + Sync {
|
|||||||
fn peer_operations(&self, _peer: &PeerId) -> Vec<String> {
|
fn peer_operations(&self, _peer: &PeerId) -> Vec<String> {
|
||||||
Vec::new()
|
Vec::new()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Peer-routing composition (ADR-029 §2). Routes to a specific peer
|
||||||
|
/// (`PeerRef::Specific`) or to the first peer that serves the op
|
||||||
|
/// (`PeerRef::Any`). The default impl ignores the peer selector and
|
||||||
|
/// delegates to `invoke_with_policy`, preserving back-compat for
|
||||||
|
/// single-layer envs that don't route by peer. `PeerCompositeEnv`
|
||||||
|
/// overrides with real peer-keyed routing.
|
||||||
|
async fn invoke_peer(
|
||||||
|
&self,
|
||||||
|
peer: &PeerRef,
|
||||||
|
namespace: &str,
|
||||||
|
operation: &str,
|
||||||
|
input: Value,
|
||||||
|
parent: &OperationContext,
|
||||||
|
policy: AbortPolicy,
|
||||||
|
) -> ResponseEnvelope {
|
||||||
|
let _ = peer;
|
||||||
|
self.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct LocalOperationEnv {
|
pub struct LocalOperationEnv {
|
||||||
@@ -95,6 +128,7 @@ impl OperationEnv for LocalOperationEnv {
|
|||||||
.as_ref()
|
.as_ref()
|
||||||
.and_then(|ca| ca.as_identity()),
|
.and_then(|ca| ca.as_identity()),
|
||||||
handler_identity: registration.composition_authority.clone(),
|
handler_identity: registration.composition_authority.clone(),
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: parent.capabilities.clone(),
|
capabilities: parent.capabilities.clone(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
abort_policy: policy,
|
abort_policy: policy,
|
||||||
@@ -218,6 +252,41 @@ impl OperationEnv for PeerCompositeEnv {
|
|||||||
|| self.connections.values().any(|c| c.contains(name))
|
|| self.connections.values().any(|c| c.contains(name))
|
||||||
|| self.base.contains(name)
|
|| self.base.contains(name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn invoke_peer(
|
||||||
|
&self,
|
||||||
|
peer: &PeerRef,
|
||||||
|
namespace: &str,
|
||||||
|
operation: &str,
|
||||||
|
input: Value,
|
||||||
|
parent: &OperationContext,
|
||||||
|
policy: AbortPolicy,
|
||||||
|
) -> ResponseEnvelope {
|
||||||
|
let name = format!("{namespace}/{operation}");
|
||||||
|
if !parent.scoped_env.allows(&name) {
|
||||||
|
return ResponseEnvelope::not_found(parent.request_id.clone(), &name);
|
||||||
|
}
|
||||||
|
match peer {
|
||||||
|
PeerRef::Specific(peer_id) => match self.connections.get(peer_id) {
|
||||||
|
Some(conn_env) if conn_env.contains(&name) => {
|
||||||
|
conn_env
|
||||||
|
.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
_ => ResponseEnvelope::not_found(parent.request_id.clone(), &name),
|
||||||
|
},
|
||||||
|
PeerRef::Any => {
|
||||||
|
self.invoke_with_policy(namespace, operation, input, parent, policy)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn peer_contains(&self, peer: &PeerId, name: &str) -> bool {
|
||||||
|
self.connections
|
||||||
|
.get(peer)
|
||||||
|
.is_some_and(|c| c.contains(name))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -262,6 +331,7 @@ mod tests {
|
|||||||
make_handler(|_input, context| async move {
|
make_handler(|_input, context| async move {
|
||||||
let internal = context.is_internal();
|
let internal = context.is_internal();
|
||||||
let id = context.identity.as_ref().map(|i| i.id.clone());
|
let id = context.identity.as_ref().map(|i| i.id.clone());
|
||||||
|
let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone());
|
||||||
let metadata_empty = context.metadata.is_empty();
|
let metadata_empty = context.metadata.is_empty();
|
||||||
let parent_set = context.parent_request_id.is_some();
|
let parent_set = context.parent_request_id.is_some();
|
||||||
ResponseEnvelope::ok(
|
ResponseEnvelope::ok(
|
||||||
@@ -269,6 +339,7 @@ mod tests {
|
|||||||
serde_json::json!({
|
serde_json::json!({
|
||||||
"internal": internal,
|
"internal": internal,
|
||||||
"identity_id": id,
|
"identity_id": id,
|
||||||
|
"forwarded_for_id": forwarded_for_id,
|
||||||
"metadata_empty": metadata_empty,
|
"metadata_empty": metadata_empty,
|
||||||
"parent_set": parent_set,
|
"parent_set": parent_set,
|
||||||
}),
|
}),
|
||||||
@@ -282,12 +353,31 @@ mod tests {
|
|||||||
handler_identity: Option<CompositionAuthority>,
|
handler_identity: Option<CompositionAuthority>,
|
||||||
scoped_env: ScopedOperationEnv,
|
scoped_env: ScopedOperationEnv,
|
||||||
env: Arc<dyn OperationEnv + Send + Sync>,
|
env: Arc<dyn OperationEnv + Send + Sync>,
|
||||||
|
) -> OperationContext {
|
||||||
|
root_context_with_forwarded_for(
|
||||||
|
request_id,
|
||||||
|
identity,
|
||||||
|
handler_identity,
|
||||||
|
None,
|
||||||
|
scoped_env,
|
||||||
|
env,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn root_context_with_forwarded_for(
|
||||||
|
request_id: &str,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
handler_identity: Option<CompositionAuthority>,
|
||||||
|
forwarded_for: Option<Identity>,
|
||||||
|
scoped_env: ScopedOperationEnv,
|
||||||
|
env: Arc<dyn OperationEnv + Send + Sync>,
|
||||||
) -> OperationContext {
|
) -> OperationContext {
|
||||||
OperationContext {
|
OperationContext {
|
||||||
request_id: request_id.to_string(),
|
request_id: request_id.to_string(),
|
||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity,
|
identity,
|
||||||
handler_identity,
|
handler_identity,
|
||||||
|
forwarded_for,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
scoped_env,
|
scoped_env,
|
||||||
@@ -422,6 +512,41 @@ mod tests {
|
|||||||
assert_eq!(out["metadata_empty"], Value::Bool(true));
|
assert_eq!(out["metadata_empty"], Value::Bool(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn local_env_child_does_not_inherit_forwarded_for() {
|
||||||
|
let registry = registry_with(
|
||||||
|
"child/run",
|
||||||
|
Visibility::External,
|
||||||
|
inspect_handler(),
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
);
|
||||||
|
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
||||||
|
let scoped = ScopedOperationEnv::new(["child/run"]);
|
||||||
|
let forwarded = Identity {
|
||||||
|
id: "alice".to_string(),
|
||||||
|
scopes: vec![],
|
||||||
|
resources: HashMap::new(),
|
||||||
|
};
|
||||||
|
let ctx = root_context_with_forwarded_for(
|
||||||
|
"root-ff",
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Some(forwarded),
|
||||||
|
scoped,
|
||||||
|
env.clone(),
|
||||||
|
);
|
||||||
|
assert!(ctx.forwarded_for.is_some());
|
||||||
|
let response = env
|
||||||
|
.invoke("child", "run", serde_json::json!({}), &ctx)
|
||||||
|
.await;
|
||||||
|
let out = response.result.expect("ok");
|
||||||
|
assert!(
|
||||||
|
out["forwarded_for_id"].is_null(),
|
||||||
|
"composed child must NOT inherit forwarded_for (wire-ingress only, ADR-032)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
struct ProbeEnv {
|
struct ProbeEnv {
|
||||||
name: String,
|
name: String,
|
||||||
contains_set: Vec<String>,
|
contains_set: Vec<String>,
|
||||||
@@ -768,4 +893,242 @@ mod tests {
|
|||||||
assert_eq!(composite.connection_order().len(), 1);
|
assert_eq!(composite.connection_order().len(), 1);
|
||||||
assert!(composite.connections().contains_key("worker-a"));
|
assert!(composite.connections().contains_key("worker-a"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_routes_to_named_peer() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-1", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-b".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("worker-b".into()));
|
||||||
|
assert_eq!(
|
||||||
|
worker_b.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_returns_not_found_when_peer_does_not_serve_op() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["other/op".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base.clone());
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-2", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-a".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
assert!(
|
||||||
|
base.dispatched.lock().unwrap().is_none(),
|
||||||
|
"no fallthrough to base"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_specific_returns_not_found_when_peer_unknown() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base.clone());
|
||||||
|
composite.attach_peer(
|
||||||
|
"worker-a".to_string(),
|
||||||
|
Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-3", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("ghost".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(base.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_any_routes_to_first_peer_in_insertion_order() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::new(["worker/exec"]);
|
||||||
|
let ctx = root_context("root-pr-4", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Any,
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert_eq!(response.result.unwrap(), Value::String("worker-a".into()));
|
||||||
|
assert_eq!(
|
||||||
|
worker_a.dispatched.lock().unwrap().as_deref(),
|
||||||
|
Some("worker/exec")
|
||||||
|
);
|
||||||
|
assert!(worker_b.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_peer_reachability_check_gates_before_routing() {
|
||||||
|
let base = Arc::new(NoopEnv { contains_op: true });
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a.clone());
|
||||||
|
let env: Arc<dyn OperationEnv + Send + Sync> = Arc::new(composite);
|
||||||
|
let scoped = ScopedOperationEnv::empty();
|
||||||
|
let ctx = root_context("root-pr-5", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("worker-a".to_string()),
|
||||||
|
"worker",
|
||||||
|
"exec",
|
||||||
|
serde_json::json!({}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "NOT_FOUND"),
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
assert!(worker_a.dispatched.lock().unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn peer_contains_checks_specific_peer_overlay() {
|
||||||
|
let base = Arc::new(ProbeEnv {
|
||||||
|
name: "base".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_a = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-a".to_string(),
|
||||||
|
contains_set: vec!["worker/exec".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let worker_b = Arc::new(ProbeEnv {
|
||||||
|
name: "worker-b".to_string(),
|
||||||
|
contains_set: vec!["other/op".to_string()],
|
||||||
|
dispatched: std::sync::Mutex::new(None),
|
||||||
|
});
|
||||||
|
let mut composite = PeerCompositeEnv::new(base);
|
||||||
|
composite.attach_peer("worker-a".to_string(), worker_a);
|
||||||
|
composite.attach_peer("worker-b".to_string(), worker_b);
|
||||||
|
assert!(composite.peer_contains(&"worker-a".to_string(), "worker/exec"));
|
||||||
|
assert!(!composite.peer_contains(&"worker-b".to_string(), "worker/exec"));
|
||||||
|
assert!(!composite.peer_contains(&"ghost".to_string(), "worker/exec"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn default_invoke_peer_delegates_to_invoke_with_policy() {
|
||||||
|
let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None);
|
||||||
|
let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry)));
|
||||||
|
let scoped = ScopedOperationEnv::new(["echo/run"]);
|
||||||
|
let ctx = root_context("root-pr-6", None, None, scoped, env.clone());
|
||||||
|
let response = env
|
||||||
|
.invoke_peer(
|
||||||
|
&PeerRef::Specific("any-peer".to_string()),
|
||||||
|
"echo",
|
||||||
|
"run",
|
||||||
|
serde_json::json!({"hi": 1}),
|
||||||
|
&ctx,
|
||||||
|
AbortPolicy::default(),
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
assert!(response.result.is_ok());
|
||||||
|
assert_eq!(response.result.unwrap(), serde_json::json!({"hi": 1}));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_peer_contains_delegates_to_contains() {
|
||||||
|
let registry = Arc::new(OperationRegistry::new());
|
||||||
|
let env = LocalOperationEnv::new(registry);
|
||||||
|
assert!(env.peer_contains(&"any-peer".to_string(), "anything"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -254,6 +254,7 @@ mod tests {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity,
|
identity,
|
||||||
handler_identity,
|
handler_identity,
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
scoped_env,
|
scoped_env,
|
||||||
|
|||||||
@@ -272,6 +272,7 @@ async fn from_call_discovers_and_forwards_over_quic_loopback() {
|
|||||||
parent_request_id: None,
|
parent_request_id: None,
|
||||||
identity: None,
|
identity: None,
|
||||||
handler_identity: None,
|
handler_identity: None,
|
||||||
|
forwarded_for: None,
|
||||||
capabilities: Capabilities::new(),
|
capabilities: Capabilities::new(),
|
||||||
metadata: Default::default(),
|
metadata: Default::default(),
|
||||||
scoped_env: scoped,
|
scoped_env: scoped,
|
||||||
|
|||||||
@@ -55,11 +55,12 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::config::{DynamicConfig, PeerEntry};
|
use crate::config::{DynamicConfig, PeerEntry};
|
||||||
use crate::store::StoreError;
|
use crate::store::StoreError;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq)]
|
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
|
||||||
pub struct Identity {
|
pub struct Identity {
|
||||||
pub id: String,
|
pub id: String,
|
||||||
pub scopes: Vec<String>,
|
pub scopes: Vec<String>,
|
||||||
|
|||||||
Reference in New Issue
Block a user