feat(call): add forwarded_for field to OperationContext (call/operation-context-forwarded-for)

This commit is contained in:
2026-06-28 22:03:51 +00:00
parent 1aeb634a2d
commit f5be62d131
10 changed files with 320 additions and 10 deletions

View File

@@ -114,6 +114,7 @@ mod tests {
parent_request_id: None, parent_request_id: None,
identity: None, identity: None,
handler_identity: None, handler_identity: None,
forwarded_for: None,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: HashMap::new(), metadata: HashMap::new(),
scoped_env: ScopedOperationEnv::empty(), scoped_env: ScopedOperationEnv::empty(),

View File

@@ -106,10 +106,16 @@ impl CallAdapter {
request_id: String, request_id: String,
operation_name: &str, operation_name: &str,
identity: Option<Identity>, identity: Option<Identity>,
forwarded_for: Option<Identity>,
connection: &CallConnection, connection: &CallConnection,
) -> OperationContext { ) -> OperationContext {
self.dispatcher self.dispatcher.build_root_context(
.build_root_context(request_id, operation_name, identity, connection) request_id,
operation_name,
identity,
forwarded_for,
connection,
)
} }
#[cfg(test)] #[cfg(test)]
@@ -402,7 +408,8 @@ mod tests {
let adapter = CallAdapter::new(registry, provider); let adapter = CallAdapter::new(registry, provider);
let conn = CallConnection::new(stub_connection()); let conn = CallConnection::new(stub_connection());
let context = adapter.build_root_context("req-1".to_string(), "echo/run", None, &conn); let context =
adapter.build_root_context("req-1".to_string(), "echo/run", None, None, &conn);
assert!(!context.is_internal()); assert!(!context.is_internal());
assert!(context.parent_request_id.is_none()); assert!(context.parent_request_id.is_none());
@@ -427,7 +434,8 @@ mod tests {
let adapter = CallAdapter::new(registry, provider); let adapter = CallAdapter::new(registry, provider);
let conn = CallConnection::new(stub_connection()); let conn = CallConnection::new(stub_connection());
let context = adapter.build_root_context("req-2".to_string(), "agent/run", None, &conn); let context =
adapter.build_root_context("req-2".to_string(), "agent/run", None, None, &conn);
assert!(context.scoped_env.allows("fs/readFile")); assert!(context.scoped_env.allows("fs/readFile"));
assert!(!context.scoped_env.allows("other/op")); assert!(!context.scoped_env.allows("other/op"));
@@ -445,7 +453,8 @@ mod tests {
let adapter = CallAdapter::new(registry.clone(), provider); let adapter = CallAdapter::new(registry.clone(), provider);
let conn = CallConnection::new(stub_connection()); let conn = CallConnection::new(stub_connection());
let context = adapter.build_root_context("req-3".to_string(), "fs/readFile", None, &conn); let context =
adapter.build_root_context("req-3".to_string(), "fs/readFile", None, None, &conn);
assert!(context.env.contains("fs/readFile")); assert!(context.env.contains("fs/readFile"));
} }
@@ -506,7 +515,8 @@ mod tests {
let adapter = CallAdapter::new(registry, provider).with_session_source(session_source); let adapter = CallAdapter::new(registry, provider).with_session_source(session_source);
let conn = CallConnection::new(stub_connection()); let conn = CallConnection::new(stub_connection());
let context = adapter.build_root_context("req-4".to_string(), "fs/readFile", None, &conn); let context =
adapter.build_root_context("req-4".to_string(), "fs/readFile", None, None, &conn);
assert!(context.env.contains("agent/chat")); assert!(context.env.contains("agent/chat"));
assert!(context.env.contains("fs/readFile")); assert!(context.env.contains("fs/readFile"));
@@ -769,7 +779,8 @@ mod tests {
let adapter = CallAdapter::new(registry, provider); let adapter = CallAdapter::new(registry, provider);
let conn = CallConnection::new(stub_connection()); let conn = CallConnection::new(stub_connection());
let context = adapter.build_root_context("req-7".to_string(), "missing/op", None, &conn); let context =
adapter.build_root_context("req-7".to_string(), "missing/op", None, None, &conn);
assert!(!context.scoped_env.allows("missing/op")); assert!(!context.scoped_env.allows("missing/op"));
assert!(context.handler_identity.is_none()); assert!(context.handler_identity.is_none());
@@ -803,6 +814,220 @@ mod tests {
} }
} }
fn inspect_forwarded_for_handler() -> crate::registry::registration::Handler {
make_handler(|_input, context| async move {
let identity_id = context.identity.as_ref().map(|i| i.id.clone());
let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone());
ResponseEnvelope::ok(
context.request_id,
serde_json::json!({
"identity_id": identity_id,
"forwarded_for_id": forwarded_for_id,
}),
)
})
}
#[test]
fn build_root_context_populates_forwarded_for_from_argument() {
let registry = registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
echo_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = CallConnection::new(stub_connection());
let forwarded = identity_with_scopes("alice", &["fs:read"]);
let context = adapter.build_root_context(
"req-ff-1".to_string(),
"echo/run",
None,
Some(forwarded.clone()),
&conn,
);
assert_eq!(
context.forwarded_for.as_ref().map(|i| &i.id),
Some(&"alice".to_string())
);
assert_eq!(
context.forwarded_for.as_ref().map(|i| i.scopes.clone()),
Some(forwarded.scopes.clone())
);
}
#[test]
fn build_root_context_missing_forwarded_for_is_none() {
let registry = registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
echo_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = CallConnection::new(stub_connection());
let context =
adapter.build_root_context("req-ff-2".to_string(), "echo/run", None, None, &conn);
assert!(context.forwarded_for.is_none());
}
#[tokio::test]
async fn dispatch_requested_populates_forwarded_for_from_payload() {
let registry = registry_with(
"inspect/run",
Visibility::External,
AccessControl::default(),
inspect_forwarded_for_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = Arc::new(CallConnection::new(stub_connection()));
let payload = serde_json::json!({
"operationId": "/inspect/run",
"input": {},
"forwarded_for": {
"id": "alice",
"scopes": ["fs:read", "docker:start"],
"resources": {},
},
});
let response = adapter
.dispatch_requested(&conn, "req-ff-3".to_string(), payload)
.await;
let out = response.result.expect("ok");
assert_eq!(out["forwarded_for_id"], Value::String("alice".into()));
}
#[tokio::test]
async fn dispatch_requested_missing_forwarded_for_yields_none() {
let registry = registry_with(
"inspect/run",
Visibility::External,
AccessControl::default(),
inspect_forwarded_for_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = Arc::new(CallConnection::new(stub_connection()));
let payload = serde_json::json!({
"operationId": "/inspect/run",
"input": {},
});
let response = adapter
.dispatch_requested(&conn, "req-ff-4".to_string(), payload)
.await;
let out = response.result.expect("ok");
assert!(out["forwarded_for_id"].is_null());
}
#[tokio::test]
async fn dispatch_requested_malformed_forwarded_for_yields_none() {
let registry = registry_with(
"inspect/run",
Visibility::External,
AccessControl::default(),
inspect_forwarded_for_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = Arc::new(CallConnection::new(stub_connection()));
let payload = serde_json::json!({
"operationId": "/inspect/run",
"input": {},
"forwarded_for": "not-an-object",
});
let response = adapter
.dispatch_requested(&conn, "req-ff-5".to_string(), payload)
.await;
let out = response.result.expect("ok");
assert!(out["forwarded_for_id"].is_null());
}
#[tokio::test]
async fn dispatch_requested_forwarded_for_does_not_satisfy_acl() {
let registry = registry_with(
"admin/run",
Visibility::External,
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
inspect_forwarded_for_handler(),
);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let adapter = CallAdapter::new(registry, provider);
let conn = Arc::new(CallConnection::new(stub_connection()));
let payload = serde_json::json!({
"operationId": "/admin/run",
"input": {},
"forwarded_for": {
"id": "alice",
"scopes": ["admin"],
"resources": {},
},
});
let response = adapter
.dispatch_requested(&conn, "req-ff-6".to_string(), payload)
.await;
match response.result {
Err(e) => {
assert_eq!(e.code, "FORBIDDEN");
assert_eq!(e.message, "authentication required");
}
other => panic!("expected FORBIDDEN (forwarded_for must not authorize), got {other:?}"),
}
}
#[tokio::test]
async fn dispatch_requested_forwarded_for_present_with_satisfied_acl() {
let registry = registry_with(
"admin/run",
Visibility::External,
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
inspect_forwarded_for_handler(),
);
let token_identity = identity_with_scopes("hub", &["admin"]);
let provider: Arc<dyn IdentityProvider> =
Arc::new(StaticIdentityProvider::new().with_token("alk_hub", token_identity));
let adapter = CallAdapter::new(registry, provider);
let conn = Arc::new(CallConnection::new(stub_connection()));
let payload = serde_json::json!({
"operationId": "/admin/run",
"input": {},
"auth_token": "alk_hub",
"forwarded_for": {
"id": "alice",
"scopes": ["fs:read"],
"resources": {},
},
});
let response = adapter
.dispatch_requested(&conn, "req-ff-7".to_string(), payload)
.await;
let out = response.result.expect("ok");
assert_eq!(out["identity_id"], Value::String("hub".into()));
assert_eq!(out["forwarded_for_id"], Value::String("alice".into()));
}
fn encode_frame(envelope: &EventEnvelope) -> Vec<u8> { fn encode_frame(envelope: &EventEnvelope) -> Vec<u8> {
let body = serde_json::to_vec(envelope).unwrap(); let body = serde_json::to_vec(envelope).unwrap();
let mut buf = (body.len() as u32).to_be_bytes().to_vec(); let mut buf = (body.len() as u32).to_be_bytes().to_vec();

View File

@@ -283,6 +283,7 @@ impl OperationEnv for OverlayOperationEnv {
.as_ref() .as_ref()
.and_then(|ca| ca.as_identity()), .and_then(|ca| ca.as_identity()),
handler_identity: composition_authority, handler_identity: composition_authority,
forwarded_for: None,
capabilities: parent.capabilities.clone(), capabilities: parent.capabilities.clone(),
metadata: HashMap::new(), metadata: HashMap::new(),
abort_policy: policy, abort_policy: policy,
@@ -431,6 +432,7 @@ mod tests {
parent_request_id: None, parent_request_id: None,
identity: None, identity: None,
handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])), handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])),
forwarded_for: None,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: HashMap::new(), metadata: HashMap::new(),
scoped_env, scoped_env,

View File

@@ -120,6 +120,7 @@ impl Dispatcher {
request_id: String, request_id: String,
operation_name: &str, operation_name: &str,
identity: Option<Identity>, identity: Option<Identity>,
forwarded_for: Option<Identity>,
connection: &CallConnection, connection: &CallConnection,
) -> OperationContext { ) -> OperationContext {
let registration = self.registry.registration(operation_name); let registration = self.registry.registration(operation_name);
@@ -145,6 +146,7 @@ impl Dispatcher {
parent_request_id: None, parent_request_id: None,
identity: identity.clone(), identity: identity.clone(),
handler_identity: composition_authority, handler_identity: composition_authority,
forwarded_for,
capabilities, capabilities,
metadata: HashMap::new(), metadata: HashMap::new(),
deadline: Some(Instant::now() + self.default_timeout), deadline: Some(Instant::now() + self.default_timeout),
@@ -172,10 +174,19 @@ impl Dispatcher {
let connection_identity = connection.connection().identity().cloned(); let connection_identity = connection.connection().identity().cloned();
let identity = self.resolve_identity(connection_identity, &payload); let identity = self.resolve_identity(connection_identity, &payload);
let forwarded_for = payload
.get("forwarded_for")
.and_then(|v| serde_json::from_value::<Identity>(v.clone()).ok());
let input = payload.get("input").cloned().unwrap_or(Value::Null); let input = payload.get("input").cloned().unwrap_or(Value::Null);
let context = let context = self.build_root_context(
self.build_root_context(request_id.clone(), &operation_name, identity, connection); request_id.clone(),
&operation_name,
identity,
forwarded_for,
connection,
);
self.registry.invoke(&operation_name, input, context).await self.registry.invoke(&operation_name, input, context).await
} }

View File

@@ -13,6 +13,16 @@ pub struct OperationContext {
pub parent_request_id: Option<String>, pub parent_request_id: Option<String>,
pub identity: Option<Identity>, pub identity: Option<Identity>,
pub handler_identity: Option<CompositionAuthority>, pub handler_identity: Option<CompositionAuthority>,
/// The original caller when this call was forwarded by a `from_call`
/// handler (ADR-032). **Metadata only** — `AccessControl::check` never
/// reads it; the ACL always authorizes `identity` (the direct caller).
/// Handlers may read it for logging, auditing, per-user rate limiting,
/// or application context. Populated from
/// `call.requested.forwarded_for` by the dispatch path; set to `None`
/// for composed children (wire-ingress only, not composition-ingress).
/// The forwarder's claim, not a verified identity — a malicious hub can
/// lie (same property as HTTP `X-Forwarded-For`). See ADR-032.
pub forwarded_for: Option<Identity>,
pub capabilities: Capabilities, pub capabilities: Capabilities,
pub metadata: HashMap<String, Value>, pub metadata: HashMap<String, Value>,
pub scoped_env: ScopedOperationEnv, pub scoped_env: ScopedOperationEnv,

View File

@@ -283,6 +283,7 @@ mod tests {
parent_request_id: None, parent_request_id: None,
identity: None, identity: None,
handler_identity: None, handler_identity: None,
forwarded_for: None,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: HashMap::new(), metadata: HashMap::new(),
scoped_env: ScopedOperationEnv::empty(), scoped_env: ScopedOperationEnv::empty(),

View File

@@ -77,6 +77,7 @@ impl OperationEnv for LocalOperationEnv {
.as_ref() .as_ref()
.and_then(|ca| ca.as_identity()), .and_then(|ca| ca.as_identity()),
handler_identity: registration.composition_authority.clone(), handler_identity: registration.composition_authority.clone(),
forwarded_for: None,
capabilities: parent.capabilities.clone(), capabilities: parent.capabilities.clone(),
metadata: HashMap::new(), metadata: HashMap::new(),
abort_policy: policy, abort_policy: policy,
@@ -209,6 +210,7 @@ mod tests {
make_handler(|_input, context| async move { make_handler(|_input, context| async move {
let internal = context.is_internal(); let internal = context.is_internal();
let id = context.identity.as_ref().map(|i| i.id.clone()); let id = context.identity.as_ref().map(|i| i.id.clone());
let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone());
let metadata_empty = context.metadata.is_empty(); let metadata_empty = context.metadata.is_empty();
let parent_set = context.parent_request_id.is_some(); let parent_set = context.parent_request_id.is_some();
ResponseEnvelope::ok( ResponseEnvelope::ok(
@@ -216,6 +218,7 @@ mod tests {
serde_json::json!({ serde_json::json!({
"internal": internal, "internal": internal,
"identity_id": id, "identity_id": id,
"forwarded_for_id": forwarded_for_id,
"metadata_empty": metadata_empty, "metadata_empty": metadata_empty,
"parent_set": parent_set, "parent_set": parent_set,
}), }),
@@ -229,12 +232,31 @@ mod tests {
handler_identity: Option<CompositionAuthority>, handler_identity: Option<CompositionAuthority>,
scoped_env: ScopedOperationEnv, scoped_env: ScopedOperationEnv,
env: Arc<dyn OperationEnv + Send + Sync>, env: Arc<dyn OperationEnv + Send + Sync>,
) -> OperationContext {
root_context_with_forwarded_for(
request_id,
identity,
handler_identity,
None,
scoped_env,
env,
)
}
fn root_context_with_forwarded_for(
request_id: &str,
identity: Option<Identity>,
handler_identity: Option<CompositionAuthority>,
forwarded_for: Option<Identity>,
scoped_env: ScopedOperationEnv,
env: Arc<dyn OperationEnv + Send + Sync>,
) -> OperationContext { ) -> OperationContext {
OperationContext { OperationContext {
request_id: request_id.to_string(), request_id: request_id.to_string(),
parent_request_id: None, parent_request_id: None,
identity, identity,
handler_identity, handler_identity,
forwarded_for,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: HashMap::new(), metadata: HashMap::new(),
scoped_env, scoped_env,
@@ -369,6 +391,41 @@ mod tests {
assert_eq!(out["metadata_empty"], Value::Bool(true)); assert_eq!(out["metadata_empty"], Value::Bool(true));
} }
#[tokio::test]
async fn local_env_child_does_not_inherit_forwarded_for() {
let registry = registry_with(
"child/run",
Visibility::External,
inspect_handler(),
None,
None,
);
let env = Arc::new(LocalOperationEnv::new(Arc::clone(&registry)));
let scoped = ScopedOperationEnv::new(["child/run"]);
let forwarded = Identity {
id: "alice".to_string(),
scopes: vec![],
resources: HashMap::new(),
};
let ctx = root_context_with_forwarded_for(
"root-ff",
None,
None,
Some(forwarded),
scoped,
env.clone(),
);
assert!(ctx.forwarded_for.is_some());
let response = env
.invoke("child", "run", serde_json::json!({}), &ctx)
.await;
let out = response.result.expect("ok");
assert!(
out["forwarded_for_id"].is_null(),
"composed child must NOT inherit forwarded_for (wire-ingress only, ADR-032)"
);
}
struct ProbeEnv { struct ProbeEnv {
name: String, name: String,
contains_set: Vec<String>, contains_set: Vec<String>,

View File

@@ -254,6 +254,7 @@ mod tests {
parent_request_id: None, parent_request_id: None,
identity, identity,
handler_identity, handler_identity,
forwarded_for: None,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: HashMap::new(), metadata: HashMap::new(),
scoped_env, scoped_env,

View File

@@ -272,6 +272,7 @@ async fn from_call_discovers_and_forwards_over_quic_loopback() {
parent_request_id: None, parent_request_id: None,
identity: None, identity: None,
handler_identity: None, handler_identity: None,
forwarded_for: None,
capabilities: Capabilities::new(), capabilities: Capabilities::new(),
metadata: Default::default(), metadata: Default::default(),
scoped_env: scoped, scoped_env: scoped,

View File

@@ -55,11 +55,12 @@ use std::sync::Arc;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use crate::config::{DynamicConfig, PeerEntry}; use crate::config::{DynamicConfig, PeerEntry};
use crate::store::StoreError; use crate::store::StoreError;
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct Identity { pub struct Identity {
pub id: String, pub id: String,
pub scopes: Vec<String>, pub scopes: Vec<String>,