From 877c9232449b5aafaa0d416e6d0bffb0344fd64b Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Sun, 28 Jun 2026 22:02:09 +0000 Subject: [PATCH] feat(call): filter services/list by AccessControl and add services/list-peers opt-in (call/services-list-accesscontrol-filtered) --- crates/alknet-call/src/registry/discovery.rs | 391 +++++++++++++++++++ crates/alknet-call/src/registry/env.rs | 12 + crates/alknet-call/src/registry/spec.rs | 6 + 3 files changed, 409 insertions(+) diff --git a/crates/alknet-call/src/registry/discovery.rs b/crates/alknet-call/src/registry/discovery.rs index e8b25da..8182608 100644 --- a/crates/alknet-call/src/registry/discovery.rs +++ b/crates/alknet-call/src/registry/discovery.rs @@ -8,6 +8,7 @@ use super::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use crate::protocol::wire::{CallError, ResponseEnvelope}; const NAME_SERVICES_LIST: &str = "services/list"; +const NAME_SERVICES_LIST_PEERS: &str = "services/list-peers"; const NAME_SERVICES_SCHEMA: &str = "services/schema"; pub fn services_list_spec() -> OperationSpec { @@ -56,6 +57,45 @@ pub fn services_schema_spec() -> OperationSpec { ) } +pub fn services_list_peers_spec() -> OperationSpec { + OperationSpec::new( + NAME_SERVICES_LIST_PEERS, + OperationType::Query, + Visibility::External, + json!({}), + json!({ + "type": "object", + "properties": { + "peers": { + "type": "array", + "items": { + "type": "object", + "properties": { + "peer_id": { "type": "string" }, + "operations": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "namespace": { "type": "string" }, + "op_type": { + "type": "string", + "enum": ["query", "mutation", "subscription"] + } + } + } + } + } + } + } + } + }), + vec![], + AccessControl::default(), + ) +} + fn operation_spec_schema() -> Value { json!({ "type": "object", @@ -177,9 +217,11 @@ pub fn services_list_handler(registry: Arc) -> Handler { let registry = Arc::clone(®istry); Box::pin(async move { let _ = input; + let calling_identity = ctx.identity.as_ref(); let ops: Vec = registry .list_operations() .into_iter() + .filter(|spec| spec.access_control.check(calling_identity).is_allowed()) .map(|s| { json!({ "name": s.name, @@ -193,6 +235,67 @@ pub fn services_list_handler(registry: Arc) -> Handler { }) } +pub fn services_list_peers_handler(registry: Arc) -> Handler { + Arc::new(move |input: Value, ctx: OperationContext| { + let registry = Arc::clone(®istry); + Box::pin(async move { + let _ = input; + let calling_identity = ctx.identity.as_ref(); + let local_ops: Vec = registry + .list_operations() + .into_iter() + .filter(|spec| spec.access_control.check(calling_identity).is_allowed()) + .map(|s| { + json!({ + "name": s.name, + "namespace": s.namespace, + "op_type": op_type_str(s.op_type), + }) + }) + .collect(); + let mut peers: Vec = Vec::new(); + if !local_ops.is_empty() { + peers.push(json!({ "peer_id": "local", "operations": local_ops })); + } + for peer_id in ctx.env.peer_ids() { + let peer_ops: Vec = ctx + .env + .peer_operations(&peer_id) + .into_iter() + .filter(|name| { + let spec = registry.registration(name); + match spec { + Some(reg) => { + reg.spec.access_control.check(calling_identity).is_allowed() + } + None => true, + } + }) + .map(name_to_listing_json) + .collect(); + if !peer_ops.is_empty() { + peers.push(json!({ "peer_id": peer_id, "operations": peer_ops })); + } + } + ResponseEnvelope::ok(ctx.request_id, json!({ "peers": peers })) + }) + }) +} + +fn name_to_listing_json(name: String) -> Value { + let namespace = name + .split('/') + .next() + .filter(|s| !s.is_empty()) + .unwrap_or("") + .to_string(); + json!({ + "name": name, + "namespace": namespace, + "op_type": "query", + }) +} + pub fn services_schema_handler(registry: Arc) -> Handler { Arc::new(move |input: Value, ctx: OperationContext| { let registry = Arc::clone(®istry); @@ -293,6 +396,91 @@ mod tests { } } + fn root_context_with_identity( + request_id: &str, + identity: Option, + ) -> OperationContext { + OperationContext { + request_id: request_id.to_string(), + parent_request_id: None, + identity, + handler_identity: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: ScopedOperationEnv::empty(), + env: noop_env(), + abort_policy: crate::registry::context::AbortPolicy::default(), + deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), + internal: false, + } + } + + fn identity_with_scopes(id: &str, scopes: &[&str]) -> alknet_core::auth::Identity { + alknet_core::auth::Identity { + id: id.to_string(), + scopes: scopes.iter().map(|s| s.to_string()).collect(), + resources: HashMap::new(), + } + } + + fn external_spec_with_acl(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + json!({}), + json!({}), + vec![], + acl, + ) + } + + fn registry_with_access_controlled_ops() -> Arc { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec_with_acl("public/echo", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + registry.register(HandlerRegistration::new( + external_spec_with_acl( + "admin/secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + registry.register(HandlerRegistration::new( + internal_spec("internal/hidden"), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + Arc::new(registry) + } + + fn op_names(response: ResponseEnvelope) -> Vec { + let output = response.result.expect("ok response"); + output + .get("operations") + .and_then(|v| v.as_array()) + .expect("operations array") + .iter() + .filter_map(|o| o.get("name").and_then(|n| n.as_str()).map(String::from)) + .collect() + } + fn registry_with_ops() -> Arc { let mut registry = OperationRegistry::new(); registry.register(HandlerRegistration::new( @@ -554,4 +742,207 @@ mod tests { let acl = json_val.get("access_control").expect("access_control"); assert_eq!(acl.get("required_scopes"), Some(&json!(["fs:read"]))); } + + #[tokio::test] + async fn services_list_filters_by_access_control_authorized_peer() { + let registry = registry_with_access_controlled_ops(); + let handler = services_list_handler(Arc::clone(®istry)); + let ctx = root_context_with_identity( + "req-acl-1", + Some(identity_with_scopes("admin-peer", &["admin"])), + ); + let names = op_names(handler(serde_json::json!({}), ctx).await); + assert!(names.contains(&"public/echo".to_string())); + assert!(names.contains(&"admin/secret".to_string())); + assert!(!names.contains(&"internal/hidden".to_string())); + } + + #[tokio::test] + async fn services_list_filters_by_access_control_unauthorized_peer() { + let registry = registry_with_access_controlled_ops(); + let handler = services_list_handler(Arc::clone(®istry)); + let ctx = root_context_with_identity( + "req-acl-2", + Some(identity_with_scopes("regular-peer", &["user"])), + ); + let names = op_names(handler(serde_json::json!({}), ctx).await); + assert!(names.contains(&"public/echo".to_string())); + assert!( + !names.contains(&"admin/secret".to_string()), + "unauthorized peer must not see admin/secret" + ); + assert!(!names.contains(&"internal/hidden".to_string())); + } + + #[tokio::test] + async fn services_list_op_with_default_acl_listed_to_any_peer() { + let registry = registry_with_access_controlled_ops(); + let handler = services_list_handler(Arc::clone(®istry)); + let ctx = root_context_with_identity("req-acl-3", None); + let names = op_names(handler(serde_json::json!({}), ctx).await); + assert!( + names.contains(&"public/echo".to_string()), + "default AccessControl op must be listed to unauthenticated peer" + ); + assert!(!names.contains(&"admin/secret".to_string())); + } + + #[tokio::test] + async fn services_list_peers_attributes_ops_by_peer_id() { + struct PeerEnv { + peers: HashMap>, + } + #[async_trait::async_trait] + impl crate::registry::env::OperationEnv for PeerEnv { + async fn invoke_with_policy( + &self, + _ns: &str, + _op: &str, + _input: Value, + parent: &OperationContext, + _policy: crate::registry::context::AbortPolicy, + ) -> ResponseEnvelope { + ResponseEnvelope::ok(parent.request_id.clone(), json!({})) + } + fn contains(&self, _name: &str) -> bool { + false + } + fn peer_ids(&self) -> Vec { + self.peers.keys().cloned().collect() + } + fn peer_operations(&self, peer: &crate::registry::env::PeerId) -> Vec { + self.peers.get(peer).cloned().unwrap_or_default() + } + } + + let mut peers = HashMap::new(); + peers.insert( + "worker-a".to_string(), + vec!["container/exec".to_string(), "container/logs".to_string()], + ); + peers.insert("worker-b".to_string(), vec!["container/exec".to_string()]); + let env: Arc = + Arc::new(PeerEnv { peers }); + + let registry = registry_with_access_controlled_ops(); + let handler = services_list_peers_handler(Arc::clone(®istry)); + let ctx = OperationContext { + request_id: "req-peers-1".to_string(), + parent_request_id: None, + identity: None, + handler_identity: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: ScopedOperationEnv::empty(), + env, + abort_policy: crate::registry::context::AbortPolicy::default(), + deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), + internal: false, + }; + let response = handler(serde_json::json!({}), ctx).await; + let output = response.result.expect("ok response"); + let peers_arr = output + .get("peers") + .and_then(|v| v.as_array()) + .expect("peers array"); + let peer_ids: Vec<&str> = peers_arr + .iter() + .filter_map(|p| p.get("peer_id").and_then(|v| v.as_str())) + .collect(); + assert!(peer_ids.contains(&"local")); + assert!(peer_ids.contains(&"worker-a")); + assert!(peer_ids.contains(&"worker-b")); + let worker_a = peers_arr + .iter() + .find(|p| p.get("peer_id").and_then(|v| v.as_str()) == Some("worker-a")) + .expect("worker-a present"); + let worker_a_ops = worker_a + .get("operations") + .and_then(|v| v.as_array()) + .expect("worker-a operations"); + let worker_a_names: Vec<&str> = worker_a_ops + .iter() + .filter_map(|o| o.get("name").and_then(|n| n.as_str())) + .collect(); + assert!(worker_a_names.contains(&"container/exec")); + assert!(worker_a_names.contains(&"container/logs")); + } + + #[test] + fn services_list_peers_spec_has_correct_fields() { + let spec = services_list_peers_spec(); + assert_eq!(spec.name, NAME_SERVICES_LIST_PEERS); + assert_eq!(spec.namespace, "services"); + assert_eq!(spec.op_type, OperationType::Query); + assert_eq!(spec.visibility, Visibility::External); + assert!(spec.error_schemas.is_empty()); + assert!(!spec.access_control.has_restrictions()); + } + + #[tokio::test] + async fn services_list_peers_filters_by_access_control() { + struct PeerEnv; + #[async_trait::async_trait] + impl crate::registry::env::OperationEnv for PeerEnv { + async fn invoke_with_policy( + &self, + _ns: &str, + _op: &str, + _input: Value, + parent: &OperationContext, + _policy: crate::registry::context::AbortPolicy, + ) -> ResponseEnvelope { + ResponseEnvelope::ok(parent.request_id.clone(), json!({})) + } + fn contains(&self, _name: &str) -> bool { + false + } + fn peer_ids(&self) -> Vec { + vec!["restricted-peer".to_string()] + } + fn peer_operations(&self, _peer: &crate::registry::env::PeerId) -> Vec { + vec!["admin/secret".to_string(), "public/echo".to_string()] + } + } + + let registry = registry_with_access_controlled_ops(); + let handler = services_list_peers_handler(Arc::clone(®istry)); + let env: Arc = Arc::new(PeerEnv); + let ctx = OperationContext { + request_id: "req-peers-2".to_string(), + parent_request_id: None, + identity: Some(identity_with_scopes("regular-peer", &["user"])), + handler_identity: None, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env: ScopedOperationEnv::empty(), + env, + abort_policy: crate::registry::context::AbortPolicy::default(), + deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), + internal: false, + }; + let response = handler(serde_json::json!({}), ctx).await; + let output = response.result.expect("ok response"); + let peers_arr = output + .get("peers") + .and_then(|v| v.as_array()) + .expect("peers array"); + let restricted = peers_arr + .iter() + .find(|p| p.get("peer_id").and_then(|v| v.as_str()) == Some("restricted-peer")) + .expect("restricted-peer present"); + let ops = restricted + .get("operations") + .and_then(|v| v.as_array()) + .expect("operations"); + let names: Vec<&str> = ops + .iter() + .filter_map(|o| o.get("name").and_then(|n| n.as_str())) + .collect(); + assert!(names.contains(&"public/echo")); + assert!( + !names.contains(&"admin/secret"), + "unauthorized peer must not see admin op in list-peers" + ); + } } diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index bbacc8a..2b9ca4a 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -38,6 +38,18 @@ pub trait OperationEnv: Send + Sync { fn contains(&self, _name: &str) -> bool { true } + + fn peer_ids(&self) -> Vec { + Vec::new() + } + + fn peer_contains(&self, _peer: &PeerId, name: &str) -> bool { + self.contains(name) + } + + fn peer_operations(&self, _peer: &PeerId) -> Vec { + Vec::new() + } } pub struct LocalOperationEnv { diff --git a/crates/alknet-call/src/registry/spec.rs b/crates/alknet-call/src/registry/spec.rs index ca11d96..316c1b6 100644 --- a/crates/alknet-call/src/registry/spec.rs +++ b/crates/alknet-call/src/registry/spec.rs @@ -42,6 +42,12 @@ pub enum AccessResult { Forbidden(String), } +impl AccessResult { + pub fn is_allowed(&self) -> bool { + matches!(self, AccessResult::Allowed) + } +} + impl AccessControl { pub fn has_restrictions(&self) -> bool { !self.required_scopes.is_empty()