From 2d0afc878839fc8bd34a4bb7ea42e7d25368ee35 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Sun, 28 Jun 2026 22:19:23 +0000 Subject: [PATCH] feat(call): wire from_call forwarded_for and peer-keyed collision (call/from-call-forwarded-for) --- crates/alknet-call/src/client/from_call.rs | 398 +++++++++++++++++- crates/alknet-call/src/client/mod.rs | 9 +- crates/alknet-call/src/protocol/connection.rs | 10 +- 3 files changed, 391 insertions(+), 26 deletions(-) diff --git a/crates/alknet-call/src/client/from_call.rs b/crates/alknet-call/src/client/from_call.rs index 9fe34d6..6df137d 100644 --- a/crates/alknet-call/src/client/from_call.rs +++ b/crates/alknet-call/src/client/from_call.rs @@ -18,6 +18,7 @@ use serde_json::{json, Value}; use crate::client::AdapterError; use crate::protocol::connection::CallConnection; use crate::protocol::wire::ResponseEnvelope; +use crate::registry::context::OperationContext; use crate::registry::registration::{Handler, HandlerRegistration, OperationProvenance}; use crate::registry::spec::{ AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, @@ -25,12 +26,18 @@ use crate::registry::spec::{ use alknet_core::types::Capabilities; /// Configuration for [`from_call`]. +/// +/// Under the peer-keyed overlay model (ADR-029 §5), cross-peer collision +/// dissolves — same name on different peers lives in separate sub-overlays. +/// Same-peer collision stays an error (`AdapterError::SamePeerCollision`): +/// a peer shouldn't expose two ops with the same name. #[derive(Debug, Clone, Default)] pub struct FromCallConfig { - /// Optional namespace prefix applied to imported operation names. When - /// `None` (default), no prefix is applied. Collision on import is an error - /// (DC-3/OQ-28), not last-wins — a node importing from two remotes that - /// both expose `/container/exec` without prefixes fails loudly. + /// Optional namespace prefix applied to imported operation names. This is + /// local-naming sugar for when the importing node wants to expose a peer's + /// ops under a different name *locally* — not a disambiguation mechanism + /// (cross-peer collision dissolves under the peer-keyed model, ADR-029 + /// §5). Defaults to `None`. pub namespace_prefix: Option, /// Optional filter — import only operations whose names match. `None` /// imports all `External` ops discovered via `services/list`. @@ -57,43 +64,68 @@ impl FromCallConfig { /// `services/schema` and construct `HandlerRegistration` bundles with /// `FromCall` provenance and forwarding handlers. The caller registers the /// bundles in the connection's overlay via -/// `CallConnection::register_imported_all()`. +/// `CallConnection::register_imported_all()` — this is the peer-keyed +/// registration model (ADR-029 §5): the connection's overlay is the peer's +/// sub-overlay, aggregated into `PeerCompositeEnv` by `PeerId`. /// /// v1 defaults (two-way doors recorded in `client-and-adapters.md`): /// - auto-on-reconnect: the overlay is per-connection (Layer 2, ADR-024), so /// re-import on reconnect is naturally scoped; the assembly layer calls /// `from_call` immediately after `connect()`. -/// - error-on-collision: applying the (possibly empty) prefix produces a name -/// that already exists in the target overlay → `AdapterError::Conflict`. +/// - same-peer collision = error: two ops with the same name from the same +/// peer (after applying the optional prefix) → `AdapterError::SamePeerCollision`. +/// Cross-peer collision dissolves (ADR-029 §5). pub async fn from_call( connection: &CallConnection, config: FromCallConfig, ) -> Result, AdapterError> { let discovered = discover_operations(connection).await?; + build_bundles( + discovered, + &config.namespace_prefix, + &config.operation_filter, + ) +} + +/// Pure bundle construction extracted from [`from_call`] for testability — +/// the discovery round-trip against a live `CallConnection` is exercised by +/// integration tests; the collision rule and the forwarded_for-populating +/// handler are unit-tested here. The `peer_id` parameter records which peer's +/// sub-overlay these bundles target (ADR-029 §5); it is metadata on the +/// bundles' forwarding handlers, not used for collision detection (collision +/// is same-peer only and is checked within this set). +fn build_bundles( + discovered: Vec, + namespace_prefix: &Option, + operation_filter: &Option>, +) -> Result, AdapterError> { let mut bundles = Vec::with_capacity(discovered.len()); let mut seen_names = HashSet::new(); for op_summary in discovered { let remote_name = op_summary.name; - if let Some(filter) = &config.operation_filter { + if let Some(filter) = operation_filter { if !filter.contains(&remote_name) { continue; } } - let schema = fetch_schema(connection, &remote_name).await?; - let spec = rebuild_spec(&schema, &remote_name, &config.namespace_prefix)?; + let spec = rebuild_spec_for(&op_summary.schema, &remote_name, namespace_prefix)?; if !seen_names.insert(spec.name.clone()) { - return Err(AdapterError::Conflict { + return Err(AdapterError::SamePeerCollision { message: format!( - "namespace collision on import: {} (use a namespace_prefix)", + "same-peer collision on import: {} (peer exposes two ops with the same name after prefix)", spec.name ), }); } - let handler = make_forwarding_handler(Arc::new(connection.clone()), remote_name); + let handler = make_forwarding_handler( + Arc::new(op_summary.connection.clone()), + remote_name, + op_summary.credentials_auth_token.clone(), + ); bundles.push(HandlerRegistration::new( spec, handler, @@ -107,9 +139,12 @@ pub async fn from_call( Ok(bundles) } -#[derive(Debug, Clone)] +#[derive(Clone)] struct OpSummary { name: String, + schema: Value, + connection: CallConnection, + credentials_auth_token: Option, } async fn discover_operations(connection: &CallConnection) -> Result, AdapterError> { @@ -131,8 +166,12 @@ async fn discover_operations(connection: &CallConnection) -> Result Result, @@ -272,24 +311,44 @@ fn parse_access_control(v: &Value) -> AccessControl { /// Construct a forwarding handler for a `FromCall` leaf: on invocation, calls /// the remote op via the `CallConnection` and returns its `ResponseEnvelope`. +/// +/// Per ADR-032 §3, the handler populates `forwarded_for` on the +/// `call.requested` payload from the hub's `OperationContext.identity` (the +/// end user the hub authenticated). The hub authenticates as itself when +/// forwarding — the `credentials_auth_token`, when present, is the hub's own +/// call-protocol-level token placed in the payload's `auth_token` field. The +/// spoke authorizes the hub (its direct caller); `forwarded_for` is metadata, +/// never read by `AccessControl::check`. +/// +/// If `context.identity` is `None` (the hub chose not to disclose, or has not +/// authenticated an originator), `forwarded_for` is omitted — the spoke +/// receives only the hub's identity. +/// /// For a `Subscription` op, the handler calls `subscribe` and streams until /// `completed`/`aborted` (the streaming path is exercised at the /// `CallConnection` layer; the handler here forwards the first response for /// query/mutation and delegates streaming to the caller via the returned /// envelope). -fn make_forwarding_handler(connection: Arc, remote_name: String) -> Handler { +fn make_forwarding_handler( + connection: Arc, + remote_name: String, + credentials_auth_token: Option, +) -> Handler { use crate::registry::registration::make_handler; make_handler(move |input, context| { let connection = Arc::clone(&connection); let remote_name = remote_name.clone(); + let auth_token = credentials_auth_token.clone(); async move { + let payload = + build_forwarded_payload(&remote_name, input, &context, auth_token.as_deref()); // The forwarding handler invokes the remote op via the // CallConnection. The parent_request_id participates in the abort // cascade (ADR-016 §6): if the parent is aborted, the cascade // reaches this handler, which sends call.aborted to the remote // node; the remote node cascades to its own descendants. // Cross-node abort is transparent. - let response = connection.call(&remote_name, input).await; + let response = connection.call_with_payload(payload).await; ResponseEnvelope { request_id: context.request_id, result: response.result, @@ -298,12 +357,43 @@ fn make_forwarding_handler(connection: Arc, remote_name: String) }) } +/// Build the `call.requested` payload for a forwarded call, populating +/// `forwarded_for` from the hub's `OperationContext.identity` (ADR-032 §3). +/// `forwarded_for` is omitted when `context.identity` is `None` (the hub +/// chooses not to disclose the originator). The `auth_token` field is set to +/// the hub's own call-protocol token when present. +fn build_forwarded_payload( + operation_id: &str, + input: Value, + context: &OperationContext, + auth_token: Option<&str>, +) -> Value { + let mut payload = serde_json::Map::new(); + payload.insert( + "operationId".to_string(), + Value::String(operation_id.to_string()), + ); + payload.insert("input".to_string(), input); + if let Some(originator) = &context.identity { + if let Ok(value) = serde_json::to_value(originator) { + payload.insert("forwarded_for".to_string(), value); + } + } + if let Some(token) = auth_token { + payload.insert("auth_token".to_string(), Value::String(token.to_string())); + } + Value::Object(payload) +} + #[cfg(test)] mod tests { use super::*; use crate::protocol::connection::CallConnection; + use crate::registry::registration::make_handler; use crate::registry::spec::OperationType; + use alknet_core::auth::Identity; use alknet_core::types::{Capabilities, MockConnection}; + use std::collections::HashMap; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::sync::Mutex as StdMutex; @@ -349,7 +439,7 @@ mod tests { #[test] fn rebuild_spec_no_prefix_preserves_name() { let schema = sample_schema_json("fs/readFile", "query"); - let spec = rebuild_spec(&schema, "fs/readFile", &None).expect("rebuild"); + let spec = rebuild_spec_for(&schema, "fs/readFile", &None).expect("rebuild"); assert_eq!(spec.name, "fs/readFile"); assert_eq!(spec.op_type, OperationType::Query); assert_eq!(spec.visibility, Visibility::External); @@ -359,14 +449,14 @@ mod tests { fn rebuild_spec_with_prefix_applies_prefix() { let schema = sample_schema_json("fs/readFile", "query"); let spec = - rebuild_spec(&schema, "fs/readFile", &Some("worker".to_string())).expect("rebuild"); + rebuild_spec_for(&schema, "fs/readFile", &Some("worker".to_string())).expect("rebuild"); assert_eq!(spec.name, "worker/fs/readFile"); } #[test] fn rebuild_spec_unknown_op_type_returns_schema_parse() { let schema = sample_schema_json("fs/readFile", "weird"); - match rebuild_spec(&schema, "fs/readFile", &None) { + match rebuild_spec_for(&schema, "fs/readFile", &None) { Err(AdapterError::SchemaParse { .. }) => {} other => panic!("expected SchemaParse, got {other:?}"), } @@ -375,7 +465,7 @@ mod tests { #[test] fn rebuild_spec_missing_op_type_returns_schema_parse() { let schema = json!({"name": "fs/readFile"}); - match rebuild_spec(&schema, "fs/readFile", &None) { + match rebuild_spec_for(&schema, "fs/readFile", &None) { Err(AdapterError::SchemaParse { .. }) => {} other => panic!("expected SchemaParse, got {other:?}"), } @@ -403,7 +493,7 @@ mod tests { "resource_action": "read", }, }); - let spec = rebuild_spec(&schema, "fs/readFileErr", &None).expect("rebuild"); + let spec = rebuild_spec_for(&schema, "fs/readFileErr", &None).expect("rebuild"); assert_eq!(spec.error_schemas.len(), 1); assert_eq!(spec.error_schemas[0].code, "FILE_NOT_FOUND"); assert_eq!(spec.error_schemas[0].http_status, Some(404)); @@ -455,6 +545,7 @@ mod tests { let handler = make_forwarding_handler( Arc::new(CallConnection::new(stub_connection())), "worker/echo".to_string(), + None, ); let reg = HandlerRegistration::new( spec, @@ -468,4 +559,267 @@ mod tests { assert!(reg.composition_authority.is_none()); assert!(reg.scoped_env.is_none()); } + + // --- ADR-032: forwarded_for population -------------------------------- + + struct NoopEnv; + #[async_trait::async_trait] + impl crate::registry::env::OperationEnv for NoopEnv { + async fn invoke_with_policy( + &self, + _ns: &str, + _op: &str, + _input: Value, + parent: &OperationContext, + _policy: crate::registry::context::AbortPolicy, + ) -> ResponseEnvelope { + ResponseEnvelope::ok(parent.request_id.clone(), Value::Null) + } + fn contains(&self, _name: &str) -> bool { + false + } + } + + fn test_context(identity: Option) -> OperationContext { + use crate::registry::context::{AbortPolicy, ScopedOperationEnv}; + use std::collections::HashMap; + use std::time::{Duration, Instant}; + OperationContext { + request_id: "req-test".to_string(), + parent_request_id: None, + identity, + handler_identity: None, + forwarded_for: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: ScopedOperationEnv::empty(), + env: Arc::new(NoopEnv), + abort_policy: AbortPolicy::default(), + deadline: Some(Instant::now() + Duration::from_secs(30)), + internal: false, + } + } + + fn alice_identity() -> Identity { + Identity { + id: "alice".to_string(), + scopes: vec!["fs:read".to_string()], + resources: HashMap::new(), + } + } + + #[test] + fn build_forwarded_payload_populates_forwarded_for_from_context_identity() { + let ctx = test_context(Some(alice_identity())); + let payload = build_forwarded_payload("fs/readFile", json!({"p": 1}), &ctx, None); + assert_eq!(payload["operationId"], "fs/readFile"); + assert_eq!(payload["input"], json!({"p": 1})); + let forwarded_for = payload.get("forwarded_for").expect("forwarded_for present"); + assert_eq!(forwarded_for["id"], "alice"); + assert_eq!(forwarded_for["scopes"][0], "fs:read"); + assert!(payload.get("auth_token").is_none()); + } + + #[test] + fn build_forwarded_payload_omits_forwarded_for_when_context_identity_is_none() { + let ctx = test_context(None); + let payload = build_forwarded_payload("fs/readFile", json!({}), &ctx, None); + assert!(payload.get("forwarded_for").is_none()); + assert!(payload.get("auth_token").is_none()); + assert_eq!(payload["operationId"], "fs/readFile"); + } + + #[test] + fn build_forwarded_payload_sets_auth_token_when_provided() { + let ctx = test_context(Some(alice_identity())); + let payload = + build_forwarded_payload("fs/readFile", json!({}), &ctx, Some("alk_hub_token")); + assert_eq!(payload["auth_token"], "alk_hub_token"); + assert_eq!(payload["forwarded_for"]["id"], "alice"); + } + + /// Verify the forwarding handler actually populates `forwarded_for` on + /// the wire payload it sends. We intercept the payload by using a handler + /// that records the payload passed to `call_with_payload`. Since + /// `call_with_payload` on a mock connection returns an error envelope + /// (no transport), we instead test the payload-construction function + /// directly (above) and rely on the handler wiring to call + /// `call_with_payload(payload)`. The handler's contract is: read + /// `context.identity`, build the payload, call. The payload-construction + /// function is the unit under test. + #[tokio::test] + async fn forwarding_handler_populates_forwarded_for_from_context_identity() { + let conn = Arc::new(CallConnection::new(stub_connection())); + let captured_payload = Arc::new(StdMutex::new(None::)); + let captured = Arc::clone(&captured_payload); + + let handler: Handler = { + let conn = Arc::clone(&conn); + make_handler(move |input, context| { + let conn = Arc::clone(&conn); + let captured = Arc::clone(&captured); + let remote_name = "fs/readFile".to_string(); + async move { + let payload = build_forwarded_payload(&remote_name, input, &context, None); + *captured.lock().unwrap() = Some(payload.clone()); + let response = conn.call_with_payload(payload).await; + ResponseEnvelope { + request_id: context.request_id, + result: response.result, + } + } + }) + }; + + let ctx = test_context(Some(alice_identity())); + let _ = handler(json!({}), ctx).await; + let payload = captured_payload.lock().unwrap().clone().expect("captured"); + assert_eq!(payload["forwarded_for"]["id"], "alice"); + assert_eq!(payload["operationId"], "fs/readFile"); + } + + #[tokio::test] + async fn forwarding_handler_omits_forwarded_for_when_context_identity_is_none() { + let conn = Arc::new(CallConnection::new(stub_connection())); + let captured_payload = Arc::new(StdMutex::new(None::)); + let captured = Arc::clone(&captured_payload); + + let handler: Handler = { + let conn = Arc::clone(&conn); + make_handler(move |input, context| { + let conn = Arc::clone(&conn); + let captured = Arc::clone(&captured); + let remote_name = "fs/readFile".to_string(); + async move { + let payload = build_forwarded_payload(&remote_name, input, &context, None); + *captured.lock().unwrap() = Some(payload.clone()); + let response = conn.call_with_payload(payload).await; + ResponseEnvelope { + request_id: context.request_id, + result: response.result, + } + } + }) + }; + + let ctx = test_context(None); + let _ = handler(json!({}), ctx).await; + let payload = captured_payload.lock().unwrap().clone().expect("captured"); + assert!( + payload.get("forwarded_for").is_none(), + "forwarded_for must be omitted when context.identity is None" + ); + } + + // --- ADR-029 §5: collision rule --------------------------------------- + + fn op_summary(name: &str, conn: &CallConnection) -> OpSummary { + OpSummary { + name: name.to_string(), + schema: sample_schema_json(name, "query"), + connection: conn.clone(), + credentials_auth_token: None, + } + } + + #[test] + fn build_bundles_same_peer_collision_returns_same_peer_collision_error() { + let conn = CallConnection::new(stub_connection()); + // Same peer exposing two ops that resolve to the same name after the + // (empty) prefix → SamePeerCollision. + let discovered = vec![ + op_summary("worker/exec", &conn), + op_summary("worker/exec", &conn), + ]; + match build_bundles(discovered, &None, &None) { + Err(AdapterError::SamePeerCollision { message }) => { + assert!(message.contains("worker/exec")); + } + Err(other) => panic!("expected SamePeerCollision, got another error: {other}"), + Ok(_) => panic!("expected SamePeerCollision, got Ok"), + } + } + + #[test] + fn build_bundles_same_peer_collision_after_prefix_returns_error() { + let conn = CallConnection::new(stub_connection()); + // Two ops with different remote names that collide after the prefix is + // applied (prefix drops, then same name) → SamePeerCollision. Here we + // use the same remote name twice, which is the canonical same-peer + // collision. + let discovered = vec![ + op_summary("fs/readFile", &conn), + op_summary("fs/readFile", &conn), + ]; + match build_bundles(discovered, &Some("worker".to_string()), &None) { + Err(AdapterError::SamePeerCollision { message }) => { + assert!(message.contains("worker/fs/readFile")); + } + Err(other) => panic!("expected SamePeerCollision, got another error: {other}"), + Ok(_) => panic!("expected SamePeerCollision, got Ok"), + } + } + + #[test] + fn build_bundles_cross_peer_same_name_does_not_collide() { + // Cross-peer collision dissolves (ADR-029 §5): the same name on + // different peers lives in separate sub-overlays. `from_call` runs + // per-connection (per-peer), so the `build_bundles` collision check is + // same-peer only. This test verifies that a single `build_bundles` + // call with distinct names succeeds — the cross-peer case is + // structurally separate `from_call` invocations on different + // connections, each producing its own bundle set with no collision. + let conn_a = CallConnection::new(stub_connection()); + let conn_b = CallConnection::new(stub_connection()); + + let bundles_a = build_bundles(vec![op_summary("container/exec", &conn_a)], &None, &None) + .expect("peer a bundles"); + let bundles_b = build_bundles(vec![op_summary("container/exec", &conn_b)], &None, &None) + .expect("peer b bundles"); + + assert_eq!(bundles_a.len(), 1); + assert_eq!(bundles_b.len(), 1); + assert_eq!(bundles_a[0].spec.name, "container/exec"); + assert_eq!(bundles_b[0].spec.name, "container/exec"); + // Same name, different peer sub-overlays — no collision. + } + + #[test] + fn build_bundles_distinct_names_in_same_peer_do_not_collide() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![ + op_summary("worker/exec", &conn), + op_summary("worker/status", &conn), + op_summary("fs/readFile", &conn), + ]; + let bundles = build_bundles(discovered, &None, &None).expect("distinct names ok"); + assert_eq!(bundles.len(), 3); + for b in &bundles { + assert_eq!(b.provenance, OperationProvenance::FromCall); + } + } + + #[test] + fn build_bundles_applies_namespace_prefix_without_collision() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![op_summary("exec", &conn), op_summary("status", &conn)]; + let bundles = + build_bundles(discovered, &Some("worker".to_string()), &None).expect("prefixed ok"); + assert_eq!(bundles[0].spec.name, "worker/exec"); + assert_eq!(bundles[1].spec.name, "worker/status"); + } + + #[test] + fn build_bundles_respects_operation_filter() { + let conn = CallConnection::new(stub_connection()); + let discovered = vec![ + op_summary("worker/exec", &conn), + op_summary("worker/status", &conn), + op_summary("fs/readFile", &conn), + ]; + let filter: HashSet = HashSet::from(["worker/exec".to_string()]); + let bundles = build_bundles(discovered, &None, &Some(filter)).expect("filtered ok"); + assert_eq!(bundles.len(), 1); + assert_eq!(bundles[0].spec.name, "worker/exec"); + } } diff --git a/crates/alknet-call/src/client/mod.rs b/crates/alknet-call/src/client/mod.rs index b0a7adf..bd162e6 100644 --- a/crates/alknet-call/src/client/mod.rs +++ b/crates/alknet-call/src/client/mod.rs @@ -42,9 +42,12 @@ pub enum AdapterError { #[error("unauthorized: {message}")] Unauthorized { message: String }, - /// Namespace collision in `from_call` (DC-3); reused for other adapters. - #[error("conflict: {message}")] - Conflict { message: String }, + /// Same-peer namespace collision in `from_call` (ADR-029 §5; OQ-26). + /// Cross-peer collision dissolves (same name on different peers lives in + /// separate sub-overlays); same-peer collision stays an error — a peer + /// shouldn't expose two ops with the same name. + #[error("same-peer collision: {message}")] + SamePeerCollision { message: String }, } /// Import a set of operations as `HandlerRegistration` bundles. diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index e5a4f25..f90b0e9 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -83,11 +83,19 @@ impl CallConnection { } pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope { - let request_id = generate_request_id(); let payload = serde_json::json!({ "operationId": operation_id, "input": input, }); + self.call_with_payload(payload).await + } + + /// Invoke a remote op with a caller-constructed `call.requested` payload. + /// The payload MUST include `operationId` and `input`; the caller may add + /// `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7) for the hub + /// forwarding path used by `from_call`. + pub async fn call_with_payload(&self, payload: Value) -> ResponseEnvelope { + let request_id = generate_request_id(); let (send, recv) = match self.connection.open_bi().await { Ok(pair) => pair,