feat(call): wire from_call forwarded_for and peer-keyed collision (call/from-call-forwarded-for)
This commit is contained in:
@@ -18,6 +18,7 @@ use serde_json::{json, Value};
|
|||||||
use crate::client::AdapterError;
|
use crate::client::AdapterError;
|
||||||
use crate::protocol::connection::CallConnection;
|
use crate::protocol::connection::CallConnection;
|
||||||
use crate::protocol::wire::ResponseEnvelope;
|
use crate::protocol::wire::ResponseEnvelope;
|
||||||
|
use crate::registry::context::OperationContext;
|
||||||
use crate::registry::registration::{Handler, HandlerRegistration, OperationProvenance};
|
use crate::registry::registration::{Handler, HandlerRegistration, OperationProvenance};
|
||||||
use crate::registry::spec::{
|
use crate::registry::spec::{
|
||||||
AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility,
|
AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility,
|
||||||
@@ -25,12 +26,18 @@ use crate::registry::spec::{
|
|||||||
use alknet_core::types::Capabilities;
|
use alknet_core::types::Capabilities;
|
||||||
|
|
||||||
/// Configuration for [`from_call`].
|
/// Configuration for [`from_call`].
|
||||||
|
///
|
||||||
|
/// Under the peer-keyed overlay model (ADR-029 §5), cross-peer collision
|
||||||
|
/// dissolves — same name on different peers lives in separate sub-overlays.
|
||||||
|
/// Same-peer collision stays an error (`AdapterError::SamePeerCollision`):
|
||||||
|
/// a peer shouldn't expose two ops with the same name.
|
||||||
#[derive(Debug, Clone, Default)]
|
#[derive(Debug, Clone, Default)]
|
||||||
pub struct FromCallConfig {
|
pub struct FromCallConfig {
|
||||||
/// Optional namespace prefix applied to imported operation names. When
|
/// Optional namespace prefix applied to imported operation names. This is
|
||||||
/// `None` (default), no prefix is applied. Collision on import is an error
|
/// local-naming sugar for when the importing node wants to expose a peer's
|
||||||
/// (DC-3/OQ-28), not last-wins — a node importing from two remotes that
|
/// ops under a different name *locally* — not a disambiguation mechanism
|
||||||
/// both expose `/container/exec` without prefixes fails loudly.
|
/// (cross-peer collision dissolves under the peer-keyed model, ADR-029
|
||||||
|
/// §5). Defaults to `None`.
|
||||||
pub namespace_prefix: Option<String>,
|
pub namespace_prefix: Option<String>,
|
||||||
/// Optional filter — import only operations whose names match. `None`
|
/// Optional filter — import only operations whose names match. `None`
|
||||||
/// imports all `External` ops discovered via `services/list`.
|
/// imports all `External` ops discovered via `services/list`.
|
||||||
@@ -57,43 +64,68 @@ impl FromCallConfig {
|
|||||||
/// `services/schema` and construct `HandlerRegistration` bundles with
|
/// `services/schema` and construct `HandlerRegistration` bundles with
|
||||||
/// `FromCall` provenance and forwarding handlers. The caller registers the
|
/// `FromCall` provenance and forwarding handlers. The caller registers the
|
||||||
/// bundles in the connection's overlay via
|
/// bundles in the connection's overlay via
|
||||||
/// `CallConnection::register_imported_all()`.
|
/// `CallConnection::register_imported_all()` — this is the peer-keyed
|
||||||
|
/// registration model (ADR-029 §5): the connection's overlay is the peer's
|
||||||
|
/// sub-overlay, aggregated into `PeerCompositeEnv` by `PeerId`.
|
||||||
///
|
///
|
||||||
/// v1 defaults (two-way doors recorded in `client-and-adapters.md`):
|
/// v1 defaults (two-way doors recorded in `client-and-adapters.md`):
|
||||||
/// - auto-on-reconnect: the overlay is per-connection (Layer 2, ADR-024), so
|
/// - auto-on-reconnect: the overlay is per-connection (Layer 2, ADR-024), so
|
||||||
/// re-import on reconnect is naturally scoped; the assembly layer calls
|
/// re-import on reconnect is naturally scoped; the assembly layer calls
|
||||||
/// `from_call` immediately after `connect()`.
|
/// `from_call` immediately after `connect()`.
|
||||||
/// - error-on-collision: applying the (possibly empty) prefix produces a name
|
/// - same-peer collision = error: two ops with the same name from the same
|
||||||
/// that already exists in the target overlay → `AdapterError::Conflict`.
|
/// peer (after applying the optional prefix) → `AdapterError::SamePeerCollision`.
|
||||||
|
/// Cross-peer collision dissolves (ADR-029 §5).
|
||||||
pub async fn from_call(
|
pub async fn from_call(
|
||||||
connection: &CallConnection,
|
connection: &CallConnection,
|
||||||
config: FromCallConfig,
|
config: FromCallConfig,
|
||||||
) -> Result<Vec<HandlerRegistration>, AdapterError> {
|
) -> Result<Vec<HandlerRegistration>, AdapterError> {
|
||||||
let discovered = discover_operations(connection).await?;
|
let discovered = discover_operations(connection).await?;
|
||||||
|
build_bundles(
|
||||||
|
discovered,
|
||||||
|
&config.namespace_prefix,
|
||||||
|
&config.operation_filter,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Pure bundle construction extracted from [`from_call`] for testability —
|
||||||
|
/// the discovery round-trip against a live `CallConnection` is exercised by
|
||||||
|
/// integration tests; the collision rule and the forwarded_for-populating
|
||||||
|
/// handler are unit-tested here. The `peer_id` parameter records which peer's
|
||||||
|
/// sub-overlay these bundles target (ADR-029 §5); it is metadata on the
|
||||||
|
/// bundles' forwarding handlers, not used for collision detection (collision
|
||||||
|
/// is same-peer only and is checked within this set).
|
||||||
|
fn build_bundles(
|
||||||
|
discovered: Vec<OpSummary>,
|
||||||
|
namespace_prefix: &Option<String>,
|
||||||
|
operation_filter: &Option<HashSet<String>>,
|
||||||
|
) -> Result<Vec<HandlerRegistration>, AdapterError> {
|
||||||
let mut bundles = Vec::with_capacity(discovered.len());
|
let mut bundles = Vec::with_capacity(discovered.len());
|
||||||
let mut seen_names = HashSet::new();
|
let mut seen_names = HashSet::new();
|
||||||
|
|
||||||
for op_summary in discovered {
|
for op_summary in discovered {
|
||||||
let remote_name = op_summary.name;
|
let remote_name = op_summary.name;
|
||||||
if let Some(filter) = &config.operation_filter {
|
if let Some(filter) = operation_filter {
|
||||||
if !filter.contains(&remote_name) {
|
if !filter.contains(&remote_name) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let schema = fetch_schema(connection, &remote_name).await?;
|
let spec = rebuild_spec_for(&op_summary.schema, &remote_name, namespace_prefix)?;
|
||||||
let spec = rebuild_spec(&schema, &remote_name, &config.namespace_prefix)?;
|
|
||||||
|
|
||||||
if !seen_names.insert(spec.name.clone()) {
|
if !seen_names.insert(spec.name.clone()) {
|
||||||
return Err(AdapterError::Conflict {
|
return Err(AdapterError::SamePeerCollision {
|
||||||
message: format!(
|
message: format!(
|
||||||
"namespace collision on import: {} (use a namespace_prefix)",
|
"same-peer collision on import: {} (peer exposes two ops with the same name after prefix)",
|
||||||
spec.name
|
spec.name
|
||||||
),
|
),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let handler = make_forwarding_handler(Arc::new(connection.clone()), remote_name);
|
let handler = make_forwarding_handler(
|
||||||
|
Arc::new(op_summary.connection.clone()),
|
||||||
|
remote_name,
|
||||||
|
op_summary.credentials_auth_token.clone(),
|
||||||
|
);
|
||||||
bundles.push(HandlerRegistration::new(
|
bundles.push(HandlerRegistration::new(
|
||||||
spec,
|
spec,
|
||||||
handler,
|
handler,
|
||||||
@@ -107,9 +139,12 @@ pub async fn from_call(
|
|||||||
Ok(bundles)
|
Ok(bundles)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Clone)]
|
||||||
struct OpSummary {
|
struct OpSummary {
|
||||||
name: String,
|
name: String,
|
||||||
|
schema: Value,
|
||||||
|
connection: CallConnection,
|
||||||
|
credentials_auth_token: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn discover_operations(connection: &CallConnection) -> Result<Vec<OpSummary>, AdapterError> {
|
async fn discover_operations(connection: &CallConnection) -> Result<Vec<OpSummary>, AdapterError> {
|
||||||
@@ -131,8 +166,12 @@ async fn discover_operations(connection: &CallConnection) -> Result<Vec<OpSummar
|
|||||||
.ok_or_else(|| AdapterError::SchemaParse {
|
.ok_or_else(|| AdapterError::SchemaParse {
|
||||||
message: "services/list entry missing 'name'".to_string(),
|
message: "services/list entry missing 'name'".to_string(),
|
||||||
})?;
|
})?;
|
||||||
|
let schema = fetch_schema(connection, name).await?;
|
||||||
summaries.push(OpSummary {
|
summaries.push(OpSummary {
|
||||||
name: name.to_string(),
|
name: name.to_string(),
|
||||||
|
schema,
|
||||||
|
connection: connection.clone(),
|
||||||
|
credentials_auth_token: None,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Ok(summaries)
|
Ok(summaries)
|
||||||
@@ -153,7 +192,7 @@ async fn fetch_schema(connection: &CallConnection, name: &str) -> Result<Value,
|
|||||||
/// Rebuild an `OperationSpec` from the `services/schema` JSON, applying the
|
/// Rebuild an `OperationSpec` from the `services/schema` JSON, applying the
|
||||||
/// optional namespace prefix. The spec JSON shape matches `spec_to_json` in
|
/// optional namespace prefix. The spec JSON shape matches `spec_to_json` in
|
||||||
/// `registry/discovery.rs`.
|
/// `registry/discovery.rs`.
|
||||||
fn rebuild_spec(
|
fn rebuild_spec_for(
|
||||||
schema_json: &Value,
|
schema_json: &Value,
|
||||||
remote_name: &str,
|
remote_name: &str,
|
||||||
namespace_prefix: &Option<String>,
|
namespace_prefix: &Option<String>,
|
||||||
@@ -272,24 +311,44 @@ fn parse_access_control(v: &Value) -> AccessControl {
|
|||||||
|
|
||||||
/// Construct a forwarding handler for a `FromCall` leaf: on invocation, calls
|
/// Construct a forwarding handler for a `FromCall` leaf: on invocation, calls
|
||||||
/// the remote op via the `CallConnection` and returns its `ResponseEnvelope`.
|
/// the remote op via the `CallConnection` and returns its `ResponseEnvelope`.
|
||||||
|
///
|
||||||
|
/// Per ADR-032 §3, the handler populates `forwarded_for` on the
|
||||||
|
/// `call.requested` payload from the hub's `OperationContext.identity` (the
|
||||||
|
/// end user the hub authenticated). The hub authenticates as itself when
|
||||||
|
/// forwarding — the `credentials_auth_token`, when present, is the hub's own
|
||||||
|
/// call-protocol-level token placed in the payload's `auth_token` field. The
|
||||||
|
/// spoke authorizes the hub (its direct caller); `forwarded_for` is metadata,
|
||||||
|
/// never read by `AccessControl::check`.
|
||||||
|
///
|
||||||
|
/// If `context.identity` is `None` (the hub chose not to disclose, or has not
|
||||||
|
/// authenticated an originator), `forwarded_for` is omitted — the spoke
|
||||||
|
/// receives only the hub's identity.
|
||||||
|
///
|
||||||
/// For a `Subscription` op, the handler calls `subscribe` and streams until
|
/// For a `Subscription` op, the handler calls `subscribe` and streams until
|
||||||
/// `completed`/`aborted` (the streaming path is exercised at the
|
/// `completed`/`aborted` (the streaming path is exercised at the
|
||||||
/// `CallConnection` layer; the handler here forwards the first response for
|
/// `CallConnection` layer; the handler here forwards the first response for
|
||||||
/// query/mutation and delegates streaming to the caller via the returned
|
/// query/mutation and delegates streaming to the caller via the returned
|
||||||
/// envelope).
|
/// envelope).
|
||||||
fn make_forwarding_handler(connection: Arc<CallConnection>, remote_name: String) -> Handler {
|
fn make_forwarding_handler(
|
||||||
|
connection: Arc<CallConnection>,
|
||||||
|
remote_name: String,
|
||||||
|
credentials_auth_token: Option<String>,
|
||||||
|
) -> Handler {
|
||||||
use crate::registry::registration::make_handler;
|
use crate::registry::registration::make_handler;
|
||||||
make_handler(move |input, context| {
|
make_handler(move |input, context| {
|
||||||
let connection = Arc::clone(&connection);
|
let connection = Arc::clone(&connection);
|
||||||
let remote_name = remote_name.clone();
|
let remote_name = remote_name.clone();
|
||||||
|
let auth_token = credentials_auth_token.clone();
|
||||||
async move {
|
async move {
|
||||||
|
let payload =
|
||||||
|
build_forwarded_payload(&remote_name, input, &context, auth_token.as_deref());
|
||||||
// The forwarding handler invokes the remote op via the
|
// The forwarding handler invokes the remote op via the
|
||||||
// CallConnection. The parent_request_id participates in the abort
|
// CallConnection. The parent_request_id participates in the abort
|
||||||
// cascade (ADR-016 §6): if the parent is aborted, the cascade
|
// cascade (ADR-016 §6): if the parent is aborted, the cascade
|
||||||
// reaches this handler, which sends call.aborted to the remote
|
// reaches this handler, which sends call.aborted to the remote
|
||||||
// node; the remote node cascades to its own descendants.
|
// node; the remote node cascades to its own descendants.
|
||||||
// Cross-node abort is transparent.
|
// Cross-node abort is transparent.
|
||||||
let response = connection.call(&remote_name, input).await;
|
let response = connection.call_with_payload(payload).await;
|
||||||
ResponseEnvelope {
|
ResponseEnvelope {
|
||||||
request_id: context.request_id,
|
request_id: context.request_id,
|
||||||
result: response.result,
|
result: response.result,
|
||||||
@@ -298,12 +357,43 @@ fn make_forwarding_handler(connection: Arc<CallConnection>, remote_name: String)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Build the `call.requested` payload for a forwarded call, populating
|
||||||
|
/// `forwarded_for` from the hub's `OperationContext.identity` (ADR-032 §3).
|
||||||
|
/// `forwarded_for` is omitted when `context.identity` is `None` (the hub
|
||||||
|
/// chooses not to disclose the originator). The `auth_token` field is set to
|
||||||
|
/// the hub's own call-protocol token when present.
|
||||||
|
fn build_forwarded_payload(
|
||||||
|
operation_id: &str,
|
||||||
|
input: Value,
|
||||||
|
context: &OperationContext,
|
||||||
|
auth_token: Option<&str>,
|
||||||
|
) -> Value {
|
||||||
|
let mut payload = serde_json::Map::new();
|
||||||
|
payload.insert(
|
||||||
|
"operationId".to_string(),
|
||||||
|
Value::String(operation_id.to_string()),
|
||||||
|
);
|
||||||
|
payload.insert("input".to_string(), input);
|
||||||
|
if let Some(originator) = &context.identity {
|
||||||
|
if let Ok(value) = serde_json::to_value(originator) {
|
||||||
|
payload.insert("forwarded_for".to_string(), value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(token) = auth_token {
|
||||||
|
payload.insert("auth_token".to_string(), Value::String(token.to_string()));
|
||||||
|
}
|
||||||
|
Value::Object(payload)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use crate::protocol::connection::CallConnection;
|
use crate::protocol::connection::CallConnection;
|
||||||
|
use crate::registry::registration::make_handler;
|
||||||
use crate::registry::spec::OperationType;
|
use crate::registry::spec::OperationType;
|
||||||
|
use alknet_core::auth::Identity;
|
||||||
use alknet_core::types::{Capabilities, MockConnection};
|
use alknet_core::types::{Capabilities, MockConnection};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
|
||||||
use std::sync::Mutex as StdMutex;
|
use std::sync::Mutex as StdMutex;
|
||||||
|
|
||||||
@@ -349,7 +439,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn rebuild_spec_no_prefix_preserves_name() {
|
fn rebuild_spec_no_prefix_preserves_name() {
|
||||||
let schema = sample_schema_json("fs/readFile", "query");
|
let schema = sample_schema_json("fs/readFile", "query");
|
||||||
let spec = rebuild_spec(&schema, "fs/readFile", &None).expect("rebuild");
|
let spec = rebuild_spec_for(&schema, "fs/readFile", &None).expect("rebuild");
|
||||||
assert_eq!(spec.name, "fs/readFile");
|
assert_eq!(spec.name, "fs/readFile");
|
||||||
assert_eq!(spec.op_type, OperationType::Query);
|
assert_eq!(spec.op_type, OperationType::Query);
|
||||||
assert_eq!(spec.visibility, Visibility::External);
|
assert_eq!(spec.visibility, Visibility::External);
|
||||||
@@ -359,14 +449,14 @@ mod tests {
|
|||||||
fn rebuild_spec_with_prefix_applies_prefix() {
|
fn rebuild_spec_with_prefix_applies_prefix() {
|
||||||
let schema = sample_schema_json("fs/readFile", "query");
|
let schema = sample_schema_json("fs/readFile", "query");
|
||||||
let spec =
|
let spec =
|
||||||
rebuild_spec(&schema, "fs/readFile", &Some("worker".to_string())).expect("rebuild");
|
rebuild_spec_for(&schema, "fs/readFile", &Some("worker".to_string())).expect("rebuild");
|
||||||
assert_eq!(spec.name, "worker/fs/readFile");
|
assert_eq!(spec.name, "worker/fs/readFile");
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn rebuild_spec_unknown_op_type_returns_schema_parse() {
|
fn rebuild_spec_unknown_op_type_returns_schema_parse() {
|
||||||
let schema = sample_schema_json("fs/readFile", "weird");
|
let schema = sample_schema_json("fs/readFile", "weird");
|
||||||
match rebuild_spec(&schema, "fs/readFile", &None) {
|
match rebuild_spec_for(&schema, "fs/readFile", &None) {
|
||||||
Err(AdapterError::SchemaParse { .. }) => {}
|
Err(AdapterError::SchemaParse { .. }) => {}
|
||||||
other => panic!("expected SchemaParse, got {other:?}"),
|
other => panic!("expected SchemaParse, got {other:?}"),
|
||||||
}
|
}
|
||||||
@@ -375,7 +465,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn rebuild_spec_missing_op_type_returns_schema_parse() {
|
fn rebuild_spec_missing_op_type_returns_schema_parse() {
|
||||||
let schema = json!({"name": "fs/readFile"});
|
let schema = json!({"name": "fs/readFile"});
|
||||||
match rebuild_spec(&schema, "fs/readFile", &None) {
|
match rebuild_spec_for(&schema, "fs/readFile", &None) {
|
||||||
Err(AdapterError::SchemaParse { .. }) => {}
|
Err(AdapterError::SchemaParse { .. }) => {}
|
||||||
other => panic!("expected SchemaParse, got {other:?}"),
|
other => panic!("expected SchemaParse, got {other:?}"),
|
||||||
}
|
}
|
||||||
@@ -403,7 +493,7 @@ mod tests {
|
|||||||
"resource_action": "read",
|
"resource_action": "read",
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
let spec = rebuild_spec(&schema, "fs/readFileErr", &None).expect("rebuild");
|
let spec = rebuild_spec_for(&schema, "fs/readFileErr", &None).expect("rebuild");
|
||||||
assert_eq!(spec.error_schemas.len(), 1);
|
assert_eq!(spec.error_schemas.len(), 1);
|
||||||
assert_eq!(spec.error_schemas[0].code, "FILE_NOT_FOUND");
|
assert_eq!(spec.error_schemas[0].code, "FILE_NOT_FOUND");
|
||||||
assert_eq!(spec.error_schemas[0].http_status, Some(404));
|
assert_eq!(spec.error_schemas[0].http_status, Some(404));
|
||||||
@@ -455,6 +545,7 @@ mod tests {
|
|||||||
let handler = make_forwarding_handler(
|
let handler = make_forwarding_handler(
|
||||||
Arc::new(CallConnection::new(stub_connection())),
|
Arc::new(CallConnection::new(stub_connection())),
|
||||||
"worker/echo".to_string(),
|
"worker/echo".to_string(),
|
||||||
|
None,
|
||||||
);
|
);
|
||||||
let reg = HandlerRegistration::new(
|
let reg = HandlerRegistration::new(
|
||||||
spec,
|
spec,
|
||||||
@@ -468,4 +559,267 @@ mod tests {
|
|||||||
assert!(reg.composition_authority.is_none());
|
assert!(reg.composition_authority.is_none());
|
||||||
assert!(reg.scoped_env.is_none());
|
assert!(reg.scoped_env.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// --- ADR-032: forwarded_for population --------------------------------
|
||||||
|
|
||||||
|
struct NoopEnv;
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl crate::registry::env::OperationEnv for NoopEnv {
|
||||||
|
async fn invoke_with_policy(
|
||||||
|
&self,
|
||||||
|
_ns: &str,
|
||||||
|
_op: &str,
|
||||||
|
_input: Value,
|
||||||
|
parent: &OperationContext,
|
||||||
|
_policy: crate::registry::context::AbortPolicy,
|
||||||
|
) -> ResponseEnvelope {
|
||||||
|
ResponseEnvelope::ok(parent.request_id.clone(), Value::Null)
|
||||||
|
}
|
||||||
|
fn contains(&self, _name: &str) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_context(identity: Option<Identity>) -> OperationContext {
|
||||||
|
use crate::registry::context::{AbortPolicy, ScopedOperationEnv};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
OperationContext {
|
||||||
|
request_id: "req-test".to_string(),
|
||||||
|
parent_request_id: None,
|
||||||
|
identity,
|
||||||
|
handler_identity: None,
|
||||||
|
forwarded_for: None,
|
||||||
|
capabilities: Capabilities::new(),
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
scoped_env: ScopedOperationEnv::empty(),
|
||||||
|
env: Arc::new(NoopEnv),
|
||||||
|
abort_policy: AbortPolicy::default(),
|
||||||
|
deadline: Some(Instant::now() + Duration::from_secs(30)),
|
||||||
|
internal: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn alice_identity() -> Identity {
|
||||||
|
Identity {
|
||||||
|
id: "alice".to_string(),
|
||||||
|
scopes: vec!["fs:read".to_string()],
|
||||||
|
resources: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_forwarded_payload_populates_forwarded_for_from_context_identity() {
|
||||||
|
let ctx = test_context(Some(alice_identity()));
|
||||||
|
let payload = build_forwarded_payload("fs/readFile", json!({"p": 1}), &ctx, None);
|
||||||
|
assert_eq!(payload["operationId"], "fs/readFile");
|
||||||
|
assert_eq!(payload["input"], json!({"p": 1}));
|
||||||
|
let forwarded_for = payload.get("forwarded_for").expect("forwarded_for present");
|
||||||
|
assert_eq!(forwarded_for["id"], "alice");
|
||||||
|
assert_eq!(forwarded_for["scopes"][0], "fs:read");
|
||||||
|
assert!(payload.get("auth_token").is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_forwarded_payload_omits_forwarded_for_when_context_identity_is_none() {
|
||||||
|
let ctx = test_context(None);
|
||||||
|
let payload = build_forwarded_payload("fs/readFile", json!({}), &ctx, None);
|
||||||
|
assert!(payload.get("forwarded_for").is_none());
|
||||||
|
assert!(payload.get("auth_token").is_none());
|
||||||
|
assert_eq!(payload["operationId"], "fs/readFile");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_forwarded_payload_sets_auth_token_when_provided() {
|
||||||
|
let ctx = test_context(Some(alice_identity()));
|
||||||
|
let payload =
|
||||||
|
build_forwarded_payload("fs/readFile", json!({}), &ctx, Some("alk_hub_token"));
|
||||||
|
assert_eq!(payload["auth_token"], "alk_hub_token");
|
||||||
|
assert_eq!(payload["forwarded_for"]["id"], "alice");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Verify the forwarding handler actually populates `forwarded_for` on
|
||||||
|
/// the wire payload it sends. We intercept the payload by using a handler
|
||||||
|
/// that records the payload passed to `call_with_payload`. Since
|
||||||
|
/// `call_with_payload` on a mock connection returns an error envelope
|
||||||
|
/// (no transport), we instead test the payload-construction function
|
||||||
|
/// directly (above) and rely on the handler wiring to call
|
||||||
|
/// `call_with_payload(payload)`. The handler's contract is: read
|
||||||
|
/// `context.identity`, build the payload, call. The payload-construction
|
||||||
|
/// function is the unit under test.
|
||||||
|
#[tokio::test]
|
||||||
|
async fn forwarding_handler_populates_forwarded_for_from_context_identity() {
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
let captured_payload = Arc::new(StdMutex::new(None::<Value>));
|
||||||
|
let captured = Arc::clone(&captured_payload);
|
||||||
|
|
||||||
|
let handler: Handler = {
|
||||||
|
let conn = Arc::clone(&conn);
|
||||||
|
make_handler(move |input, context| {
|
||||||
|
let conn = Arc::clone(&conn);
|
||||||
|
let captured = Arc::clone(&captured);
|
||||||
|
let remote_name = "fs/readFile".to_string();
|
||||||
|
async move {
|
||||||
|
let payload = build_forwarded_payload(&remote_name, input, &context, None);
|
||||||
|
*captured.lock().unwrap() = Some(payload.clone());
|
||||||
|
let response = conn.call_with_payload(payload).await;
|
||||||
|
ResponseEnvelope {
|
||||||
|
request_id: context.request_id,
|
||||||
|
result: response.result,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let ctx = test_context(Some(alice_identity()));
|
||||||
|
let _ = handler(json!({}), ctx).await;
|
||||||
|
let payload = captured_payload.lock().unwrap().clone().expect("captured");
|
||||||
|
assert_eq!(payload["forwarded_for"]["id"], "alice");
|
||||||
|
assert_eq!(payload["operationId"], "fs/readFile");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn forwarding_handler_omits_forwarded_for_when_context_identity_is_none() {
|
||||||
|
let conn = Arc::new(CallConnection::new(stub_connection()));
|
||||||
|
let captured_payload = Arc::new(StdMutex::new(None::<Value>));
|
||||||
|
let captured = Arc::clone(&captured_payload);
|
||||||
|
|
||||||
|
let handler: Handler = {
|
||||||
|
let conn = Arc::clone(&conn);
|
||||||
|
make_handler(move |input, context| {
|
||||||
|
let conn = Arc::clone(&conn);
|
||||||
|
let captured = Arc::clone(&captured);
|
||||||
|
let remote_name = "fs/readFile".to_string();
|
||||||
|
async move {
|
||||||
|
let payload = build_forwarded_payload(&remote_name, input, &context, None);
|
||||||
|
*captured.lock().unwrap() = Some(payload.clone());
|
||||||
|
let response = conn.call_with_payload(payload).await;
|
||||||
|
ResponseEnvelope {
|
||||||
|
request_id: context.request_id,
|
||||||
|
result: response.result,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let ctx = test_context(None);
|
||||||
|
let _ = handler(json!({}), ctx).await;
|
||||||
|
let payload = captured_payload.lock().unwrap().clone().expect("captured");
|
||||||
|
assert!(
|
||||||
|
payload.get("forwarded_for").is_none(),
|
||||||
|
"forwarded_for must be omitted when context.identity is None"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- ADR-029 §5: collision rule ---------------------------------------
|
||||||
|
|
||||||
|
fn op_summary(name: &str, conn: &CallConnection) -> OpSummary {
|
||||||
|
OpSummary {
|
||||||
|
name: name.to_string(),
|
||||||
|
schema: sample_schema_json(name, "query"),
|
||||||
|
connection: conn.clone(),
|
||||||
|
credentials_auth_token: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_same_peer_collision_returns_same_peer_collision_error() {
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
// Same peer exposing two ops that resolve to the same name after the
|
||||||
|
// (empty) prefix → SamePeerCollision.
|
||||||
|
let discovered = vec![
|
||||||
|
op_summary("worker/exec", &conn),
|
||||||
|
op_summary("worker/exec", &conn),
|
||||||
|
];
|
||||||
|
match build_bundles(discovered, &None, &None) {
|
||||||
|
Err(AdapterError::SamePeerCollision { message }) => {
|
||||||
|
assert!(message.contains("worker/exec"));
|
||||||
|
}
|
||||||
|
Err(other) => panic!("expected SamePeerCollision, got another error: {other}"),
|
||||||
|
Ok(_) => panic!("expected SamePeerCollision, got Ok"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_same_peer_collision_after_prefix_returns_error() {
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
// Two ops with different remote names that collide after the prefix is
|
||||||
|
// applied (prefix drops, then same name) → SamePeerCollision. Here we
|
||||||
|
// use the same remote name twice, which is the canonical same-peer
|
||||||
|
// collision.
|
||||||
|
let discovered = vec![
|
||||||
|
op_summary("fs/readFile", &conn),
|
||||||
|
op_summary("fs/readFile", &conn),
|
||||||
|
];
|
||||||
|
match build_bundles(discovered, &Some("worker".to_string()), &None) {
|
||||||
|
Err(AdapterError::SamePeerCollision { message }) => {
|
||||||
|
assert!(message.contains("worker/fs/readFile"));
|
||||||
|
}
|
||||||
|
Err(other) => panic!("expected SamePeerCollision, got another error: {other}"),
|
||||||
|
Ok(_) => panic!("expected SamePeerCollision, got Ok"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_cross_peer_same_name_does_not_collide() {
|
||||||
|
// Cross-peer collision dissolves (ADR-029 §5): the same name on
|
||||||
|
// different peers lives in separate sub-overlays. `from_call` runs
|
||||||
|
// per-connection (per-peer), so the `build_bundles` collision check is
|
||||||
|
// same-peer only. This test verifies that a single `build_bundles`
|
||||||
|
// call with distinct names succeeds — the cross-peer case is
|
||||||
|
// structurally separate `from_call` invocations on different
|
||||||
|
// connections, each producing its own bundle set with no collision.
|
||||||
|
let conn_a = CallConnection::new(stub_connection());
|
||||||
|
let conn_b = CallConnection::new(stub_connection());
|
||||||
|
|
||||||
|
let bundles_a = build_bundles(vec![op_summary("container/exec", &conn_a)], &None, &None)
|
||||||
|
.expect("peer a bundles");
|
||||||
|
let bundles_b = build_bundles(vec![op_summary("container/exec", &conn_b)], &None, &None)
|
||||||
|
.expect("peer b bundles");
|
||||||
|
|
||||||
|
assert_eq!(bundles_a.len(), 1);
|
||||||
|
assert_eq!(bundles_b.len(), 1);
|
||||||
|
assert_eq!(bundles_a[0].spec.name, "container/exec");
|
||||||
|
assert_eq!(bundles_b[0].spec.name, "container/exec");
|
||||||
|
// Same name, different peer sub-overlays — no collision.
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_distinct_names_in_same_peer_do_not_collide() {
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
let discovered = vec![
|
||||||
|
op_summary("worker/exec", &conn),
|
||||||
|
op_summary("worker/status", &conn),
|
||||||
|
op_summary("fs/readFile", &conn),
|
||||||
|
];
|
||||||
|
let bundles = build_bundles(discovered, &None, &None).expect("distinct names ok");
|
||||||
|
assert_eq!(bundles.len(), 3);
|
||||||
|
for b in &bundles {
|
||||||
|
assert_eq!(b.provenance, OperationProvenance::FromCall);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_applies_namespace_prefix_without_collision() {
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
let discovered = vec![op_summary("exec", &conn), op_summary("status", &conn)];
|
||||||
|
let bundles =
|
||||||
|
build_bundles(discovered, &Some("worker".to_string()), &None).expect("prefixed ok");
|
||||||
|
assert_eq!(bundles[0].spec.name, "worker/exec");
|
||||||
|
assert_eq!(bundles[1].spec.name, "worker/status");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_bundles_respects_operation_filter() {
|
||||||
|
let conn = CallConnection::new(stub_connection());
|
||||||
|
let discovered = vec![
|
||||||
|
op_summary("worker/exec", &conn),
|
||||||
|
op_summary("worker/status", &conn),
|
||||||
|
op_summary("fs/readFile", &conn),
|
||||||
|
];
|
||||||
|
let filter: HashSet<String> = HashSet::from(["worker/exec".to_string()]);
|
||||||
|
let bundles = build_bundles(discovered, &None, &Some(filter)).expect("filtered ok");
|
||||||
|
assert_eq!(bundles.len(), 1);
|
||||||
|
assert_eq!(bundles[0].spec.name, "worker/exec");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -42,9 +42,12 @@ pub enum AdapterError {
|
|||||||
#[error("unauthorized: {message}")]
|
#[error("unauthorized: {message}")]
|
||||||
Unauthorized { message: String },
|
Unauthorized { message: String },
|
||||||
|
|
||||||
/// Namespace collision in `from_call` (DC-3); reused for other adapters.
|
/// Same-peer namespace collision in `from_call` (ADR-029 §5; OQ-26).
|
||||||
#[error("conflict: {message}")]
|
/// Cross-peer collision dissolves (same name on different peers lives in
|
||||||
Conflict { message: String },
|
/// separate sub-overlays); same-peer collision stays an error — a peer
|
||||||
|
/// shouldn't expose two ops with the same name.
|
||||||
|
#[error("same-peer collision: {message}")]
|
||||||
|
SamePeerCollision { message: String },
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Import a set of operations as `HandlerRegistration` bundles.
|
/// Import a set of operations as `HandlerRegistration` bundles.
|
||||||
|
|||||||
@@ -83,11 +83,19 @@ impl CallConnection {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope {
|
pub async fn call(&self, operation_id: &str, input: Value) -> ResponseEnvelope {
|
||||||
let request_id = generate_request_id();
|
|
||||||
let payload = serde_json::json!({
|
let payload = serde_json::json!({
|
||||||
"operationId": operation_id,
|
"operationId": operation_id,
|
||||||
"input": input,
|
"input": input,
|
||||||
});
|
});
|
||||||
|
self.call_with_payload(payload).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Invoke a remote op with a caller-constructed `call.requested` payload.
|
||||||
|
/// The payload MUST include `operationId` and `input`; the caller may add
|
||||||
|
/// `forwarded_for` (ADR-032) and `auth_token` (ADR-017 §7) for the hub
|
||||||
|
/// forwarding path used by `from_call`.
|
||||||
|
pub async fn call_with_payload(&self, payload: Value) -> ResponseEnvelope {
|
||||||
|
let request_id = generate_request_id();
|
||||||
|
|
||||||
let (send, recv) = match self.connection.open_bi().await {
|
let (send, recv) = match self.connection.open_bi().await {
|
||||||
Ok(pair) => pair,
|
Ok(pair) => pair,
|
||||||
|
|||||||
Reference in New Issue
Block a user