diff --git a/crates/alknet-call/src/client/from_jsonschema.rs b/crates/alknet-call/src/client/from_jsonschema.rs index 2c5bae6..d77e0cd 100644 --- a/crates/alknet-call/src/client/from_jsonschema.rs +++ b/crates/alknet-call/src/client/from_jsonschema.rs @@ -114,6 +114,7 @@ mod tests { parent_request_id: None, identity: None, handler_identity: None, + forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), scoped_env: ScopedOperationEnv::empty(), diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index d57c6c3..083ce4d 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -106,10 +106,16 @@ impl CallAdapter { request_id: String, operation_name: &str, identity: Option, + forwarded_for: Option, connection: &CallConnection, ) -> OperationContext { - self.dispatcher - .build_root_context(request_id, operation_name, identity, connection) + self.dispatcher.build_root_context( + request_id, + operation_name, + identity, + forwarded_for, + connection, + ) } #[cfg(test)] @@ -402,7 +408,8 @@ mod tests { let adapter = CallAdapter::new(registry, provider); let conn = CallConnection::new(stub_connection()); - let context = adapter.build_root_context("req-1".to_string(), "echo/run", None, &conn); + let context = + adapter.build_root_context("req-1".to_string(), "echo/run", None, None, &conn); assert!(!context.is_internal()); assert!(context.parent_request_id.is_none()); @@ -427,7 +434,8 @@ mod tests { let adapter = CallAdapter::new(registry, provider); let conn = CallConnection::new(stub_connection()); - let context = adapter.build_root_context("req-2".to_string(), "agent/run", None, &conn); + let context = + adapter.build_root_context("req-2".to_string(), "agent/run", None, None, &conn); assert!(context.scoped_env.allows("fs/readFile")); assert!(!context.scoped_env.allows("other/op")); @@ -445,7 +453,8 @@ mod tests { let adapter = CallAdapter::new(registry.clone(), provider); let conn = CallConnection::new(stub_connection()); - let context = adapter.build_root_context("req-3".to_string(), "fs/readFile", None, &conn); + let context = + adapter.build_root_context("req-3".to_string(), "fs/readFile", None, None, &conn); assert!(context.env.contains("fs/readFile")); } @@ -506,7 +515,8 @@ mod tests { let adapter = CallAdapter::new(registry, provider).with_session_source(session_source); let conn = CallConnection::new(stub_connection()); - let context = adapter.build_root_context("req-4".to_string(), "fs/readFile", None, &conn); + let context = + adapter.build_root_context("req-4".to_string(), "fs/readFile", None, None, &conn); assert!(context.env.contains("agent/chat")); assert!(context.env.contains("fs/readFile")); @@ -769,7 +779,8 @@ mod tests { let adapter = CallAdapter::new(registry, provider); let conn = CallConnection::new(stub_connection()); - let context = adapter.build_root_context("req-7".to_string(), "missing/op", None, &conn); + let context = + adapter.build_root_context("req-7".to_string(), "missing/op", None, None, &conn); assert!(!context.scoped_env.allows("missing/op")); assert!(context.handler_identity.is_none()); @@ -803,6 +814,220 @@ mod tests { } } + fn inspect_forwarded_for_handler() -> crate::registry::registration::Handler { + make_handler(|_input, context| async move { + let identity_id = context.identity.as_ref().map(|i| i.id.clone()); + let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone()); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ + "identity_id": identity_id, + "forwarded_for_id": forwarded_for_id, + }), + ) + }) + } + + #[test] + fn build_root_context_populates_forwarded_for_from_argument() { + let registry = registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + echo_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = CallConnection::new(stub_connection()); + + let forwarded = identity_with_scopes("alice", &["fs:read"]); + let context = adapter.build_root_context( + "req-ff-1".to_string(), + "echo/run", + None, + Some(forwarded.clone()), + &conn, + ); + + assert_eq!( + context.forwarded_for.as_ref().map(|i| &i.id), + Some(&"alice".to_string()) + ); + assert_eq!( + context.forwarded_for.as_ref().map(|i| i.scopes.clone()), + Some(forwarded.scopes.clone()) + ); + } + + #[test] + fn build_root_context_missing_forwarded_for_is_none() { + let registry = registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + echo_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = CallConnection::new(stub_connection()); + + let context = + adapter.build_root_context("req-ff-2".to_string(), "echo/run", None, None, &conn); + + assert!(context.forwarded_for.is_none()); + } + + #[tokio::test] + async fn dispatch_requested_populates_forwarded_for_from_payload() { + let registry = registry_with( + "inspect/run", + Visibility::External, + AccessControl::default(), + inspect_forwarded_for_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/inspect/run", + "input": {}, + "forwarded_for": { + "id": "alice", + "scopes": ["fs:read", "docker:start"], + "resources": {}, + }, + }); + let response = adapter + .dispatch_requested(&conn, "req-ff-3".to_string(), payload) + .await; + + let out = response.result.expect("ok"); + assert_eq!(out["forwarded_for_id"], Value::String("alice".into())); + } + + #[tokio::test] + async fn dispatch_requested_missing_forwarded_for_yields_none() { + let registry = registry_with( + "inspect/run", + Visibility::External, + AccessControl::default(), + inspect_forwarded_for_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/inspect/run", + "input": {}, + }); + let response = adapter + .dispatch_requested(&conn, "req-ff-4".to_string(), payload) + .await; + + let out = response.result.expect("ok"); + assert!(out["forwarded_for_id"].is_null()); + } + + #[tokio::test] + async fn dispatch_requested_malformed_forwarded_for_yields_none() { + let registry = registry_with( + "inspect/run", + Visibility::External, + AccessControl::default(), + inspect_forwarded_for_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/inspect/run", + "input": {}, + "forwarded_for": "not-an-object", + }); + let response = adapter + .dispatch_requested(&conn, "req-ff-5".to_string(), payload) + .await; + + let out = response.result.expect("ok"); + assert!(out["forwarded_for_id"].is_null()); + } + + #[tokio::test] + async fn dispatch_requested_forwarded_for_does_not_satisfy_acl() { + let registry = registry_with( + "admin/run", + Visibility::External, + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + inspect_forwarded_for_handler(), + ); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let adapter = CallAdapter::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/admin/run", + "input": {}, + "forwarded_for": { + "id": "alice", + "scopes": ["admin"], + "resources": {}, + }, + }); + let response = adapter + .dispatch_requested(&conn, "req-ff-6".to_string(), payload) + .await; + + match response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert_eq!(e.message, "authentication required"); + } + other => panic!("expected FORBIDDEN (forwarded_for must not authorize), got {other:?}"), + } + } + + #[tokio::test] + async fn dispatch_requested_forwarded_for_present_with_satisfied_acl() { + let registry = registry_with( + "admin/run", + Visibility::External, + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + inspect_forwarded_for_handler(), + ); + let token_identity = identity_with_scopes("hub", &["admin"]); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("alk_hub", token_identity)); + let adapter = CallAdapter::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/admin/run", + "input": {}, + "auth_token": "alk_hub", + "forwarded_for": { + "id": "alice", + "scopes": ["fs:read"], + "resources": {}, + }, + }); + let response = adapter + .dispatch_requested(&conn, "req-ff-7".to_string(), payload) + .await; + + let out = response.result.expect("ok"); + assert_eq!(out["identity_id"], Value::String("hub".into())); + assert_eq!(out["forwarded_for_id"], Value::String("alice".into())); + } + fn encode_frame(envelope: &EventEnvelope) -> Vec { let body = serde_json::to_vec(envelope).unwrap(); let mut buf = (body.len() as u32).to_be_bytes().to_vec(); diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index 7191a03..e5a4f25 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -283,6 +283,7 @@ impl OperationEnv for OverlayOperationEnv { .as_ref() .and_then(|ca| ca.as_identity()), handler_identity: composition_authority, + forwarded_for: None, capabilities: parent.capabilities.clone(), metadata: HashMap::new(), abort_policy: policy, @@ -431,6 +432,7 @@ mod tests { parent_request_id: None, identity: None, handler_identity: Some(CompositionAuthority::new("agent", ["fs:read".to_string()])), + forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), scoped_env, diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index ecea0cb..1f31cc7 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -120,6 +120,7 @@ impl Dispatcher { request_id: String, operation_name: &str, identity: Option, + forwarded_for: Option, connection: &CallConnection, ) -> OperationContext { let registration = self.registry.registration(operation_name); @@ -145,6 +146,7 @@ impl Dispatcher { parent_request_id: None, identity: identity.clone(), handler_identity: composition_authority, + forwarded_for, capabilities, metadata: HashMap::new(), deadline: Some(Instant::now() + self.default_timeout), @@ -172,10 +174,19 @@ impl Dispatcher { let connection_identity = connection.connection().identity().cloned(); let identity = self.resolve_identity(connection_identity, &payload); + let forwarded_for = payload + .get("forwarded_for") + .and_then(|v| serde_json::from_value::(v.clone()).ok()); + let input = payload.get("input").cloned().unwrap_or(Value::Null); - let context = - self.build_root_context(request_id.clone(), &operation_name, identity, connection); + let context = self.build_root_context( + request_id.clone(), + &operation_name, + identity, + forwarded_for, + connection, + ); self.registry.invoke(&operation_name, input, context).await } diff --git a/crates/alknet-call/src/registry/context.rs b/crates/alknet-call/src/registry/context.rs index 8500172..b1d7beb 100644 --- a/crates/alknet-call/src/registry/context.rs +++ b/crates/alknet-call/src/registry/context.rs @@ -13,6 +13,16 @@ pub struct OperationContext { pub parent_request_id: Option, pub identity: Option, pub handler_identity: Option, + /// The original caller when this call was forwarded by a `from_call` + /// handler (ADR-032). **Metadata only** — `AccessControl::check` never + /// reads it; the ACL always authorizes `identity` (the direct caller). + /// Handlers may read it for logging, auditing, per-user rate limiting, + /// or application context. Populated from + /// `call.requested.forwarded_for` by the dispatch path; set to `None` + /// for composed children (wire-ingress only, not composition-ingress). + /// The forwarder's claim, not a verified identity — a malicious hub can + /// lie (same property as HTTP `X-Forwarded-For`). See ADR-032. + pub forwarded_for: Option, pub capabilities: Capabilities, pub metadata: HashMap, pub scoped_env: ScopedOperationEnv, diff --git a/crates/alknet-call/src/registry/discovery.rs b/crates/alknet-call/src/registry/discovery.rs index e8b25da..6364437 100644 --- a/crates/alknet-call/src/registry/discovery.rs +++ b/crates/alknet-call/src/registry/discovery.rs @@ -283,6 +283,7 @@ mod tests { parent_request_id: None, identity: None, handler_identity: None, + forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), scoped_env: ScopedOperationEnv::empty(), diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 54899f2..fb40458 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -77,6 +77,7 @@ impl OperationEnv for LocalOperationEnv { .as_ref() .and_then(|ca| ca.as_identity()), handler_identity: registration.composition_authority.clone(), + forwarded_for: None, capabilities: parent.capabilities.clone(), metadata: HashMap::new(), abort_policy: policy, @@ -209,6 +210,7 @@ mod tests { make_handler(|_input, context| async move { let internal = context.is_internal(); let id = context.identity.as_ref().map(|i| i.id.clone()); + let forwarded_for_id = context.forwarded_for.as_ref().map(|i| i.id.clone()); let metadata_empty = context.metadata.is_empty(); let parent_set = context.parent_request_id.is_some(); ResponseEnvelope::ok( @@ -216,6 +218,7 @@ mod tests { serde_json::json!({ "internal": internal, "identity_id": id, + "forwarded_for_id": forwarded_for_id, "metadata_empty": metadata_empty, "parent_set": parent_set, }), @@ -229,12 +232,31 @@ mod tests { handler_identity: Option, scoped_env: ScopedOperationEnv, env: Arc, + ) -> OperationContext { + root_context_with_forwarded_for( + request_id, + identity, + handler_identity, + None, + scoped_env, + env, + ) + } + + fn root_context_with_forwarded_for( + request_id: &str, + identity: Option, + handler_identity: Option, + forwarded_for: Option, + scoped_env: ScopedOperationEnv, + env: Arc, ) -> OperationContext { OperationContext { request_id: request_id.to_string(), parent_request_id: None, identity, handler_identity, + forwarded_for, capabilities: Capabilities::new(), metadata: HashMap::new(), scoped_env, @@ -369,6 +391,41 @@ mod tests { assert_eq!(out["metadata_empty"], Value::Bool(true)); } + #[tokio::test] + async fn local_env_child_does_not_inherit_forwarded_for() { + let registry = registry_with( + "child/run", + Visibility::External, + inspect_handler(), + None, + None, + ); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["child/run"]); + let forwarded = Identity { + id: "alice".to_string(), + scopes: vec![], + resources: HashMap::new(), + }; + let ctx = root_context_with_forwarded_for( + "root-ff", + None, + None, + Some(forwarded), + scoped, + env.clone(), + ); + assert!(ctx.forwarded_for.is_some()); + let response = env + .invoke("child", "run", serde_json::json!({}), &ctx) + .await; + let out = response.result.expect("ok"); + assert!( + out["forwarded_for_id"].is_null(), + "composed child must NOT inherit forwarded_for (wire-ingress only, ADR-032)" + ); + } + struct ProbeEnv { name: String, contains_set: Vec, diff --git a/crates/alknet-call/src/registry/registration.rs b/crates/alknet-call/src/registry/registration.rs index c045c1d..77837d1 100644 --- a/crates/alknet-call/src/registry/registration.rs +++ b/crates/alknet-call/src/registry/registration.rs @@ -254,6 +254,7 @@ mod tests { parent_request_id: None, identity, handler_identity, + forwarded_for: None, capabilities: Capabilities::new(), metadata: HashMap::new(), scoped_env, diff --git a/crates/alknet-call/tests/two_node_call.rs b/crates/alknet-call/tests/two_node_call.rs index 1af99f5..e0de5e9 100644 --- a/crates/alknet-call/tests/two_node_call.rs +++ b/crates/alknet-call/tests/two_node_call.rs @@ -272,6 +272,7 @@ async fn from_call_discovers_and_forwards_over_quic_loopback() { parent_request_id: None, identity: None, handler_identity: None, + forwarded_for: None, capabilities: Capabilities::new(), metadata: Default::default(), scoped_env: scoped, diff --git a/crates/alknet-core/src/auth.rs b/crates/alknet-core/src/auth.rs index 97ad002..fb7a62d 100644 --- a/crates/alknet-core/src/auth.rs +++ b/crates/alknet-core/src/auth.rs @@ -55,11 +55,12 @@ use std::sync::Arc; use arc_swap::ArcSwap; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use crate::config::{DynamicConfig, PeerEntry}; use crate::store::StoreError; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] pub struct Identity { pub id: String, pub scopes: Vec,