2 Commits

4 changed files with 411 additions and 2 deletions

View File

@@ -8,6 +8,7 @@ use super::spec::{AccessControl, OperationSpec, OperationType, Visibility};
use crate::protocol::wire::{CallError, ResponseEnvelope};
const NAME_SERVICES_LIST: &str = "services/list";
const NAME_SERVICES_LIST_PEERS: &str = "services/list-peers";
const NAME_SERVICES_SCHEMA: &str = "services/schema";
pub fn services_list_spec() -> OperationSpec {
@@ -56,6 +57,45 @@ pub fn services_schema_spec() -> OperationSpec {
)
}
pub fn services_list_peers_spec() -> OperationSpec {
OperationSpec::new(
NAME_SERVICES_LIST_PEERS,
OperationType::Query,
Visibility::External,
json!({}),
json!({
"type": "object",
"properties": {
"peers": {
"type": "array",
"items": {
"type": "object",
"properties": {
"peer_id": { "type": "string" },
"operations": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": { "type": "string" },
"namespace": { "type": "string" },
"op_type": {
"type": "string",
"enum": ["query", "mutation", "subscription"]
}
}
}
}
}
}
}
}
}),
vec![],
AccessControl::default(),
)
}
fn operation_spec_schema() -> Value {
json!({
"type": "object",
@@ -177,9 +217,11 @@ pub fn services_list_handler(registry: Arc<OperationRegistry>) -> Handler {
let registry = Arc::clone(&registry);
Box::pin(async move {
let _ = input;
let calling_identity = ctx.identity.as_ref();
let ops: Vec<Value> = registry
.list_operations()
.into_iter()
.filter(|spec| spec.access_control.check(calling_identity).is_allowed())
.map(|s| {
json!({
"name": s.name,
@@ -193,6 +235,67 @@ pub fn services_list_handler(registry: Arc<OperationRegistry>) -> Handler {
})
}
pub fn services_list_peers_handler(registry: Arc<OperationRegistry>) -> Handler {
Arc::new(move |input: Value, ctx: OperationContext| {
let registry = Arc::clone(&registry);
Box::pin(async move {
let _ = input;
let calling_identity = ctx.identity.as_ref();
let local_ops: Vec<Value> = registry
.list_operations()
.into_iter()
.filter(|spec| spec.access_control.check(calling_identity).is_allowed())
.map(|s| {
json!({
"name": s.name,
"namespace": s.namespace,
"op_type": op_type_str(s.op_type),
})
})
.collect();
let mut peers: Vec<Value> = Vec::new();
if !local_ops.is_empty() {
peers.push(json!({ "peer_id": "local", "operations": local_ops }));
}
for peer_id in ctx.env.peer_ids() {
let peer_ops: Vec<Value> = ctx
.env
.peer_operations(&peer_id)
.into_iter()
.filter(|name| {
let spec = registry.registration(name);
match spec {
Some(reg) => {
reg.spec.access_control.check(calling_identity).is_allowed()
}
None => true,
}
})
.map(name_to_listing_json)
.collect();
if !peer_ops.is_empty() {
peers.push(json!({ "peer_id": peer_id, "operations": peer_ops }));
}
}
ResponseEnvelope::ok(ctx.request_id, json!({ "peers": peers }))
})
})
}
fn name_to_listing_json(name: String) -> Value {
let namespace = name
.split('/')
.next()
.filter(|s| !s.is_empty())
.unwrap_or("")
.to_string();
json!({
"name": name,
"namespace": namespace,
"op_type": "query",
})
}
pub fn services_schema_handler(registry: Arc<OperationRegistry>) -> Handler {
Arc::new(move |input: Value, ctx: OperationContext| {
let registry = Arc::clone(&registry);
@@ -293,6 +396,91 @@ mod tests {
}
}
fn root_context_with_identity(
request_id: &str,
identity: Option<alknet_core::auth::Identity>,
) -> OperationContext {
OperationContext {
request_id: request_id.to_string(),
parent_request_id: None,
identity,
handler_identity: None,
capabilities: Capabilities::new(),
metadata: HashMap::new(),
scoped_env: ScopedOperationEnv::empty(),
env: noop_env(),
abort_policy: crate::registry::context::AbortPolicy::default(),
deadline: Some(std::time::Instant::now() + Duration::from_secs(30)),
internal: false,
}
}
fn identity_with_scopes(id: &str, scopes: &[&str]) -> alknet_core::auth::Identity {
alknet_core::auth::Identity {
id: id.to_string(),
scopes: scopes.iter().map(|s| s.to_string()).collect(),
resources: HashMap::new(),
}
}
fn external_spec_with_acl(name: &str, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Query,
Visibility::External,
json!({}),
json!({}),
vec![],
acl,
)
}
fn registry_with_access_controlled_ops() -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
external_spec_with_acl("public/echo", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
registry.register(HandlerRegistration::new(
external_spec_with_acl(
"admin/secret",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
registry.register(HandlerRegistration::new(
internal_spec("internal/hidden"),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
Arc::new(registry)
}
fn op_names(response: ResponseEnvelope) -> Vec<String> {
let output = response.result.expect("ok response");
output
.get("operations")
.and_then(|v| v.as_array())
.expect("operations array")
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()).map(String::from))
.collect()
}
fn registry_with_ops() -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
@@ -554,4 +742,207 @@ mod tests {
let acl = json_val.get("access_control").expect("access_control");
assert_eq!(acl.get("required_scopes"), Some(&json!(["fs:read"])));
}
#[tokio::test]
async fn services_list_filters_by_access_control_authorized_peer() {
let registry = registry_with_access_controlled_ops();
let handler = services_list_handler(Arc::clone(&registry));
let ctx = root_context_with_identity(
"req-acl-1",
Some(identity_with_scopes("admin-peer", &["admin"])),
);
let names = op_names(handler(serde_json::json!({}), ctx).await);
assert!(names.contains(&"public/echo".to_string()));
assert!(names.contains(&"admin/secret".to_string()));
assert!(!names.contains(&"internal/hidden".to_string()));
}
#[tokio::test]
async fn services_list_filters_by_access_control_unauthorized_peer() {
let registry = registry_with_access_controlled_ops();
let handler = services_list_handler(Arc::clone(&registry));
let ctx = root_context_with_identity(
"req-acl-2",
Some(identity_with_scopes("regular-peer", &["user"])),
);
let names = op_names(handler(serde_json::json!({}), ctx).await);
assert!(names.contains(&"public/echo".to_string()));
assert!(
!names.contains(&"admin/secret".to_string()),
"unauthorized peer must not see admin/secret"
);
assert!(!names.contains(&"internal/hidden".to_string()));
}
#[tokio::test]
async fn services_list_op_with_default_acl_listed_to_any_peer() {
let registry = registry_with_access_controlled_ops();
let handler = services_list_handler(Arc::clone(&registry));
let ctx = root_context_with_identity("req-acl-3", None);
let names = op_names(handler(serde_json::json!({}), ctx).await);
assert!(
names.contains(&"public/echo".to_string()),
"default AccessControl op must be listed to unauthenticated peer"
);
assert!(!names.contains(&"admin/secret".to_string()));
}
#[tokio::test]
async fn services_list_peers_attributes_ops_by_peer_id() {
struct PeerEnv {
peers: HashMap<String, Vec<String>>,
}
#[async_trait::async_trait]
impl crate::registry::env::OperationEnv for PeerEnv {
async fn invoke_with_policy(
&self,
_ns: &str,
_op: &str,
_input: Value,
parent: &OperationContext,
_policy: crate::registry::context::AbortPolicy,
) -> ResponseEnvelope {
ResponseEnvelope::ok(parent.request_id.clone(), json!({}))
}
fn contains(&self, _name: &str) -> bool {
false
}
fn peer_ids(&self) -> Vec<crate::registry::env::PeerId> {
self.peers.keys().cloned().collect()
}
fn peer_operations(&self, peer: &crate::registry::env::PeerId) -> Vec<String> {
self.peers.get(peer).cloned().unwrap_or_default()
}
}
let mut peers = HashMap::new();
peers.insert(
"worker-a".to_string(),
vec!["container/exec".to_string(), "container/logs".to_string()],
);
peers.insert("worker-b".to_string(), vec!["container/exec".to_string()]);
let env: Arc<dyn crate::registry::env::OperationEnv + Send + Sync> =
Arc::new(PeerEnv { peers });
let registry = registry_with_access_controlled_ops();
let handler = services_list_peers_handler(Arc::clone(&registry));
let ctx = OperationContext {
request_id: "req-peers-1".to_string(),
parent_request_id: None,
identity: None,
handler_identity: None,
capabilities: Capabilities::new(),
metadata: HashMap::new(),
scoped_env: ScopedOperationEnv::empty(),
env,
abort_policy: crate::registry::context::AbortPolicy::default(),
deadline: Some(std::time::Instant::now() + Duration::from_secs(30)),
internal: false,
};
let response = handler(serde_json::json!({}), ctx).await;
let output = response.result.expect("ok response");
let peers_arr = output
.get("peers")
.and_then(|v| v.as_array())
.expect("peers array");
let peer_ids: Vec<&str> = peers_arr
.iter()
.filter_map(|p| p.get("peer_id").and_then(|v| v.as_str()))
.collect();
assert!(peer_ids.contains(&"local"));
assert!(peer_ids.contains(&"worker-a"));
assert!(peer_ids.contains(&"worker-b"));
let worker_a = peers_arr
.iter()
.find(|p| p.get("peer_id").and_then(|v| v.as_str()) == Some("worker-a"))
.expect("worker-a present");
let worker_a_ops = worker_a
.get("operations")
.and_then(|v| v.as_array())
.expect("worker-a operations");
let worker_a_names: Vec<&str> = worker_a_ops
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()))
.collect();
assert!(worker_a_names.contains(&"container/exec"));
assert!(worker_a_names.contains(&"container/logs"));
}
#[test]
fn services_list_peers_spec_has_correct_fields() {
let spec = services_list_peers_spec();
assert_eq!(spec.name, NAME_SERVICES_LIST_PEERS);
assert_eq!(spec.namespace, "services");
assert_eq!(spec.op_type, OperationType::Query);
assert_eq!(spec.visibility, Visibility::External);
assert!(spec.error_schemas.is_empty());
assert!(!spec.access_control.has_restrictions());
}
#[tokio::test]
async fn services_list_peers_filters_by_access_control() {
struct PeerEnv;
#[async_trait::async_trait]
impl crate::registry::env::OperationEnv for PeerEnv {
async fn invoke_with_policy(
&self,
_ns: &str,
_op: &str,
_input: Value,
parent: &OperationContext,
_policy: crate::registry::context::AbortPolicy,
) -> ResponseEnvelope {
ResponseEnvelope::ok(parent.request_id.clone(), json!({}))
}
fn contains(&self, _name: &str) -> bool {
false
}
fn peer_ids(&self) -> Vec<crate::registry::env::PeerId> {
vec!["restricted-peer".to_string()]
}
fn peer_operations(&self, _peer: &crate::registry::env::PeerId) -> Vec<String> {
vec!["admin/secret".to_string(), "public/echo".to_string()]
}
}
let registry = registry_with_access_controlled_ops();
let handler = services_list_peers_handler(Arc::clone(&registry));
let env: Arc<dyn crate::registry::env::OperationEnv + Send + Sync> = Arc::new(PeerEnv);
let ctx = OperationContext {
request_id: "req-peers-2".to_string(),
parent_request_id: None,
identity: Some(identity_with_scopes("regular-peer", &["user"])),
handler_identity: None,
capabilities: Capabilities::new(),
metadata: HashMap::new(),
scoped_env: ScopedOperationEnv::empty(),
env,
abort_policy: crate::registry::context::AbortPolicy::default(),
deadline: Some(std::time::Instant::now() + Duration::from_secs(30)),
internal: false,
};
let response = handler(serde_json::json!({}), ctx).await;
let output = response.result.expect("ok response");
let peers_arr = output
.get("peers")
.and_then(|v| v.as_array())
.expect("peers array");
let restricted = peers_arr
.iter()
.find(|p| p.get("peer_id").and_then(|v| v.as_str()) == Some("restricted-peer"))
.expect("restricted-peer present");
let ops = restricted
.get("operations")
.and_then(|v| v.as_array())
.expect("operations");
let names: Vec<&str> = ops
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"public/echo"));
assert!(
!names.contains(&"admin/secret"),
"unauthorized peer must not see admin op in list-peers"
);
}
}

View File

@@ -38,6 +38,18 @@ pub trait OperationEnv: Send + Sync {
fn contains(&self, _name: &str) -> bool {
true
}
fn peer_ids(&self) -> Vec<PeerId> {
Vec::new()
}
fn peer_contains(&self, _peer: &PeerId, name: &str) -> bool {
self.contains(name)
}
fn peer_operations(&self, _peer: &PeerId) -> Vec<String> {
Vec::new()
}
}
pub struct LocalOperationEnv {

View File

@@ -42,6 +42,12 @@ pub enum AccessResult {
Forbidden(String),
}
impl AccessResult {
pub fn is_allowed(&self) -> bool {
matches!(self, AccessResult::Allowed)
}
}
impl AccessControl {
pub fn has_restrictions(&self) -> bool {
!self.required_scopes.is_empty()

View File

@@ -1,7 +1,7 @@
---
id: call/services-list-accesscontrol-filtered
name: Filter services/list by AccessControl::check(peer_identity) and add services/list-peers opt-in (ADR-029 §6)
status: pending
status: completed
depends_on: [call/retire-remote-safe]
scope: narrow
risk: medium
@@ -119,4 +119,4 @@ with `peer_id` attribution, and filters by the calling peer's authorization.
## Summary
> To be filled on completion
Filtered services/list by AccessControl::check(ctx.identity) (op with default ACL listed to any peer; op with required_scopes hidden from unauthorized peers; Internal ops still excluded by list_operations). Added services/list-peers opt-in op that attributes ops by peer_id via context.env.peer_ids()/peer_operations() and filters by the calling peer's authorization. Added PeerId type + peer_ids/peer_contains/peer_operations default-impls to OperationEnv trait (work as stubs until PeerCompositeEnv is wired). Added AccessResult::is_allowed() helper. 6 new unit tests cover authorized/unauthorized/default-ACL filtering and list-peers peer_id attribution + ACL filtering. 205 unit + 2 integration tests pass, clippy clean, fmt clean.