diff --git a/crates/alknet-call/src/client/from_jsonschema.rs b/crates/alknet-call/src/client/from_jsonschema.rs index 2c5bae6..85da967 100644 --- a/crates/alknet-call/src/client/from_jsonschema.rs +++ b/crates/alknet-call/src/client/from_jsonschema.rs @@ -121,6 +121,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: true, + forwarded_for: None, } } diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index be9e4e3..e44f11f 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -109,7 +109,7 @@ impl CallAdapter { connection: &CallConnection, ) -> OperationContext { self.dispatcher - .build_root_context(request_id, operation_name, identity, connection) + .build_root_context(request_id, operation_name, identity, None, connection) } #[cfg(test)] @@ -564,6 +564,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: context.deadline, internal: false, + forwarded_for: None, }; let response = context .env @@ -620,6 +621,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: context.deadline, internal: false, + forwarded_for: None, }; let response = context .env diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index 7191a03..d59e2e5 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -290,6 +290,7 @@ impl OperationEnv for OverlayOperationEnv { scoped_env, env: parent.env.clone(), internal: true, + forwarded_for: None, }; handler(input, context).await @@ -438,6 +439,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: Some(Instant::now() + Duration::from_secs(30)), internal: true, + forwarded_for: None, } } diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index be65bf9..5b5493c 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -127,6 +127,7 @@ impl Dispatcher { request_id: String, operation_name: &str, identity: Option, + forwarded_for: Option, connection: &CallConnection, ) -> OperationContext { let registration = self.registry.registration(operation_name); @@ -159,6 +160,7 @@ impl Dispatcher { env: stub_env, abort_policy: AbortPolicy::default(), internal: false, + forwarded_for, }; context.env = self.compose_root_env(connection, &context); context @@ -178,11 +180,19 @@ impl Dispatcher { let connection_identity = connection.connection().identity().cloned(); let identity = self.resolve_identity(connection_identity, &payload); + let forwarded_for = payload + .get("forwarded_for") + .and_then(|v| serde_json::from_value::(v.clone()).ok()); let input = payload.get("input").cloned().unwrap_or(Value::Null); - let context = - self.build_root_context(request_id.clone(), &operation_name, identity, connection); + let context = self.build_root_context( + request_id.clone(), + &operation_name, + identity, + forwarded_for, + connection, + ); self.registry.invoke(&operation_name, input, context).await } @@ -305,3 +315,390 @@ impl Clone for Dispatcher { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; + use alknet_core::auth::{AuthToken, Identity, IdentityProvider}; + use alknet_core::types::{Capabilities, MockConnection}; + use std::collections::HashMap; + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::Mutex as StdMutex; + + struct StubConnection { + alpn: &'static [u8], + addr: Option, + closed: StdMutex>, + } + + impl MockConnection for StubConnection { + fn remote_alpn(&self) -> &[u8] { + self.alpn + } + fn remote_addr(&self) -> Option { + self.addr + } + fn close(&self, code: u32, reason: &str) { + *self.closed.lock().unwrap() = Some((code, reason.to_string())); + } + } + + fn stub_connection() -> alknet_core::types::Connection { + alknet_core::types::Connection::from_mock(Arc::new(StubConnection { + alpn: b"alknet/call", + addr: Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 4321)), + closed: StdMutex::new(None), + })) + } + + struct StaticIdentityProvider { + tokens: StdMutex>, + } + + impl StaticIdentityProvider { + fn new() -> Self { + Self { + tokens: StdMutex::new(HashMap::new()), + } + } + + fn with_token(self, token: &str, identity: Identity) -> Self { + self.tokens + .lock() + .unwrap() + .insert(token.to_string(), identity); + self + } + } + + impl IdentityProvider for StaticIdentityProvider { + fn resolve_from_fingerprint(&self, _fp: &str) -> Option { + None + } + fn resolve_from_token(&self, token: &AuthToken) -> Option { + let token_str = String::from_utf8_lossy(&token.raw); + self.tokens.lock().unwrap().get(token_str.as_ref()).cloned() + } + } + + fn identity_with_scopes(id: &str, scopes: &[&str]) -> Identity { + Identity { + id: id.to_string(), + scopes: scopes.iter().map(|s| s.to_string()).collect(), + resources: HashMap::new(), + } + } + + fn external_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn internal_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn registry_with(name: &str, visibility: Visibility, acl: AccessControl) -> OperationRegistry { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ), + make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + }), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + registry + } + + fn dispatcher() -> Dispatcher { + Dispatcher::new( + Arc::new(OperationRegistry::new()), + Arc::new(StaticIdentityProvider::new()), + ) + } + + #[tokio::test] + async fn dispatch_authorized_peer_dispatches_and_populates_capabilities() { + let caps = Capabilities::new().with_api_key("google", "k".to_string()); + let mut registry = OperationRegistry::new(); + let handler = make_handler(|_input, context| async move { + let has_google = context.capabilities.get("google").is_some(); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ "has_google": has_google }), + ) + }); + registry.register(HandlerRegistration::new( + external_spec("admin/run", AccessControl::default()), + handler, + OperationProvenance::Local, + None, + None, + caps, + )); + let registry = Arc::new(registry); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/admin/run", + "input": {}, + }); + let response = dp + .dispatch_requested(&conn, "req-1".to_string(), payload) + .await; + let out = response.result.expect("dispatch ok"); + assert_eq!(out["has_google"], Value::Bool(true)); + } + + #[tokio::test] + async fn dispatch_unauthorized_peer_returns_forbidden_capabilities_never_populated() { + let caps = Capabilities::new().with_api_key("google", "k".to_string()); + let mut registry = OperationRegistry::new(); + let handler = make_handler(|_input, context| async move { + let has_google = context.capabilities.get("google").is_some(); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ "has_google": has_google }), + ) + }); + registry.register(HandlerRegistration::new( + external_spec( + "admin/run", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + handler, + OperationProvenance::Local, + None, + None, + caps, + )); + let registry = Arc::new(registry); + let provider: Arc = Arc::new( + StaticIdentityProvider::new() + .with_token("alk_user", identity_with_scopes("regular-user", &["user"])), + ); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/admin/run", + "input": {}, + "auth_token": "alk_user", + }); + let response = dp + .dispatch_requested(&conn, "req-2".to_string(), payload) + .await; + match response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert!(e.message.contains("admin")); + } + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn dispatch_internal_op_from_wire_returns_not_found_before_acl() { + let registry = Arc::new(registry_with( + "secret/op", + Visibility::Internal, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/secret/op", + "input": {}, + }); + let response = dp + .dispatch_requested(&conn, "req-3".to_string(), payload) + .await; + match response.result { + Err(e) => { + assert_eq!(e.code, "NOT_FOUND"); + assert!(e.message.contains("secret/op")); + } + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn dispatch_connection_with_no_identity_produces_no_peer_id_in_env() { + let registry = Arc::new(registry_with( + "fs/readFile", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = CallConnection::new(stub_connection()); + + let context = dp.build_root_context("req-4".to_string(), "fs/readFile", None, None, &conn); + + assert!( + context.identity.is_none(), + "no connection identity → context.identity is None" + ); + assert!( + context.env.peer_ids().is_empty(), + "no peer overlay attached when connection has no identity" + ); + } + + #[tokio::test] + async fn dispatch_connection_with_identity_attaches_peer_overlay_keyed_by_identity_id() { + let registry = Arc::new(registry_with( + "fs/readFile", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = CallConnection::new(stub_connection()); + conn.connection() + .set_identity(identity_with_scopes("worker-a", &[])) + .expect("identity not yet set"); + + let context = dp.build_root_context("req-5".to_string(), "fs/readFile", None, None, &conn); + + assert_eq!( + context.env.peer_ids(), + vec!["worker-a".to_string()], + "PeerId for connection comes from connection.identity().id" + ); + } + + #[tokio::test] + async fn dispatch_extract_forwarded_for_from_payload_into_context() { + let mut registry = OperationRegistry::new(); + let handler = make_handler(|_input, context| async move { + let forwarded_id = context.forwarded_for.as_ref().map(|i| i.id.clone()); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ "forwarded_for_id": forwarded_id }), + ) + }); + registry.register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + handler, + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let registry = Arc::new(registry); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/fs/readFile", + "input": {}, + "forwarded_for": { + "id": "alice", + "scopes": ["fs:read"], + "resources": {} + }, + }); + let response = dp + .dispatch_requested(&conn, "req-6".to_string(), payload) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["forwarded_for_id"], Value::String("alice".into())); + } + + #[tokio::test] + async fn dispatch_without_forwarded_for_field_is_none() { + let mut registry = OperationRegistry::new(); + let handler = make_handler(|_input, context| async move { + let present = context.forwarded_for.is_some(); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ "present": present }), + ) + }); + registry.register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + handler, + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let registry = Arc::new(registry); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/fs/readFile", + "input": {}, + }); + let response = dp + .dispatch_requested(&conn, "req-7".to_string(), payload) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["present"], Value::Bool(false)); + } + + #[tokio::test] + async fn dispatch_default_access_control_dispatches_to_any_peer() { + let registry = Arc::new(registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = Dispatcher::new(registry, provider); + let conn = Arc::new(CallConnection::new(stub_connection())); + + let payload = serde_json::json!({ + "operationId": "/echo/run", + "input": { "msg": "hi" }, + }); + let response = dp + .dispatch_requested(&conn, "req-8".to_string(), payload) + .await; + assert_eq!(response.result, Ok(serde_json::json!({ "msg": "hi" }))); + } + + #[test] + fn dispatcher_helper_compiles_with_full_signature() { + let _dp = dispatcher(); + } +} diff --git a/crates/alknet-call/src/registry/context.rs b/crates/alknet-call/src/registry/context.rs index 8500172..fcd830d 100644 --- a/crates/alknet-call/src/registry/context.rs +++ b/crates/alknet-call/src/registry/context.rs @@ -13,6 +13,16 @@ pub struct OperationContext { pub parent_request_id: Option, pub identity: Option, pub handler_identity: Option, + /// The original caller when this call was forwarded (ADR-032). + /// Metadata only — NOT used by `AccessControl::check`. The dispatch + /// path populates it from the `call.requested.forwarded_for` field; + /// the `from_call` handler sets it when constructing the forwarded + /// payload. Handlers may read it for logging, auditing, per-user + /// rate limiting, or application context. The ACL check always runs + /// against `identity` (the direct caller), never against + /// `forwarded_for`. Not inherited by composed children (wire-ingress + /// only). + pub forwarded_for: Option, pub capabilities: Capabilities, pub metadata: HashMap, pub scoped_env: ScopedOperationEnv, diff --git a/crates/alknet-call/src/registry/discovery.rs b/crates/alknet-call/src/registry/discovery.rs index 8182608..900b21d 100644 --- a/crates/alknet-call/src/registry/discovery.rs +++ b/crates/alknet-call/src/registry/discovery.rs @@ -393,6 +393,7 @@ mod tests { abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: false, + forwarded_for: None, } } @@ -412,6 +413,7 @@ mod tests { abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: false, + forwarded_for: None, } } @@ -838,6 +840,7 @@ mod tests { abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: false, + forwarded_for: None, }; let response = handler(serde_json::json!({}), ctx).await; let output = response.result.expect("ok response"); @@ -920,6 +923,7 @@ mod tests { abort_policy: crate::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: false, + forwarded_for: None, }; let response = handler(serde_json::json!({}), ctx).await; let output = response.result.expect("ok response"); diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 2b9ca4a..1bc6ef6 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -105,6 +105,7 @@ impl OperationEnv for LocalOperationEnv { .unwrap_or_else(ScopedOperationEnv::empty), env: parent.env.clone(), internal: true, + forwarded_for: None, }; self.registry.invoke(&name, input, context).await @@ -218,6 +219,10 @@ impl OperationEnv for PeerCompositeEnv { || self.connections.values().any(|c| c.contains(name)) || self.base.contains(name) } + + fn peer_ids(&self) -> Vec { + self.connection_order.clone() + } } #[cfg(test)] @@ -295,6 +300,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: Some(Instant::now() + Duration::from_secs(30)), internal: false, + forwarded_for: None, } } diff --git a/crates/alknet-call/src/registry/registration.rs b/crates/alknet-call/src/registry/registration.rs index c045c1d..ad4c1d3 100644 --- a/crates/alknet-call/src/registry/registration.rs +++ b/crates/alknet-call/src/registry/registration.rs @@ -261,6 +261,7 @@ mod tests { abort_policy: AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal, + forwarded_for: None, } } diff --git a/crates/alknet-call/tests/two_node_call.rs b/crates/alknet-call/tests/two_node_call.rs index 1af99f5..dc9a62b 100644 --- a/crates/alknet-call/tests/two_node_call.rs +++ b/crates/alknet-call/tests/two_node_call.rs @@ -279,6 +279,7 @@ async fn from_call_discovers_and_forwards_over_quic_loopback() { abort_policy: alknet_call::registry::context::AbortPolicy::default(), deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), internal: true, + forwarded_for: None, }; let response = tokio::time::timeout( diff --git a/crates/alknet-core/src/auth.rs b/crates/alknet-core/src/auth.rs index 97ad002..79ba129 100644 --- a/crates/alknet-core/src/auth.rs +++ b/crates/alknet-core/src/auth.rs @@ -59,7 +59,7 @@ use async_trait::async_trait; use crate::config::{DynamicConfig, PeerEntry}; use crate::store::StoreError; -#[derive(Debug, Clone, PartialEq)] +#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)] pub struct Identity { pub id: String, pub scopes: Vec,