From 7345ef5442fb85e4826a9f6b322ecb5d64650f67 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Tue, 23 Jun 2026 14:40:13 +0000 Subject: [PATCH] Implement handler registration and operation registry Implements the dispatch core for the call protocol per ADR-022 and ADR-024: - Handler async closure type alias returning ResponseEnvelope - HandlerRegistration bundle: spec, handler, provenance, composition authority, scoped env, capabilities - OperationProvenance enum with all 6 variants (Local, FromOpenAPI, FromMCP, FromCall, FromJsonSchema, Session) - OperationRegistry with register/registration/invoke/list_operations - invoke flow: visibility check (Internal from wire -> NOT_FOUND), ACL with authority switch (internal: true -> handler_identity, internal: false -> caller identity), handler dispatch - OperationRegistryBuilder with new/with_local/with_leaf/with_leaf_provenance/with/build - make_handler helper for boxing async handlers - 21 unit tests covering invoke, visibility, ACL authority switch, builder provenance, and lookup behavior --- .../alknet-call/src/registry/registration.rs | 740 +++++++++++++++++- 1 file changed, 733 insertions(+), 7 deletions(-) diff --git a/crates/alknet-call/src/registry/registration.rs b/crates/alknet-call/src/registry/registration.rs index c7cc84d..c6e00d5 100644 --- a/crates/alknet-call/src/registry/registration.rs +++ b/crates/alknet-call/src/registry/registration.rs @@ -1,8 +1,734 @@ -//! Handler registration: `Handler`, `HandlerRegistration`, -//! `OperationProvenance`, `OperationRegistry`, and -//! `OperationRegistryBuilder`. -//! -//! See `docs/architecture/crates/call/operation-registry.md` for the full -//! specification. +use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; -// TODO: implement +use alknet_core::types::Capabilities; +use serde_json::Value; + +use super::context::{CompositionAuthority, OperationContext, ScopedOperationEnv}; +use super::spec::{AccessResult, OperationSpec, Visibility}; +use crate::protocol::wire::ResponseEnvelope; + +pub type Handler = Arc< + dyn Fn(Value, OperationContext) -> Pin + Send>> + + Send + + Sync, +>; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum OperationProvenance { + Local, + FromOpenAPI, + FromMCP, + FromCall, + FromJsonSchema, + Session, +} + +pub struct HandlerRegistration { + pub spec: OperationSpec, + pub handler: Handler, + pub provenance: OperationProvenance, + pub composition_authority: Option, + pub scoped_env: Option, + pub capabilities: Capabilities, +} + +impl HandlerRegistration { + pub fn new( + spec: OperationSpec, + handler: Handler, + provenance: OperationProvenance, + composition_authority: Option, + scoped_env: Option, + capabilities: Capabilities, + ) -> Self { + Self { + spec, + handler, + provenance, + composition_authority, + scoped_env, + capabilities, + } + } +} + +pub struct OperationRegistry { + operations: HashMap, +} + +impl OperationRegistry { + pub fn new() -> Self { + Self { + operations: HashMap::new(), + } + } + + pub fn register(&mut self, registration: HandlerRegistration) { + self.operations + .insert(registration.spec.name.clone(), registration); + } + + pub fn registration(&self, name: &str) -> Option<&HandlerRegistration> { + self.operations.get(name) + } + + pub fn list_operations(&self) -> Vec<&OperationSpec> { + self.operations + .values() + .filter(|r| r.spec.visibility == Visibility::External) + .map(|r| &r.spec) + .collect() + } + + pub async fn invoke( + &self, + name: &str, + input: Value, + context: OperationContext, + ) -> ResponseEnvelope { + let request_id = context.request_id.clone(); + let registration = match self.operations.get(name) { + Some(r) => r, + None => return ResponseEnvelope::not_found(request_id, name), + }; + + if registration.spec.visibility == Visibility::Internal && !context.internal { + return ResponseEnvelope::not_found(request_id, name); + } + + let acl = ®istration.spec.access_control; + let identity = if context.internal { + context + .handler_identity + .as_ref() + .and_then(|ca| ca.as_identity()) + } else { + context.identity.clone() + }; + + if let AccessResult::Forbidden(message) = acl.check(identity.as_ref()) { + return ResponseEnvelope::forbidden(request_id, message); + } + + let handler = Arc::clone(®istration.handler); + (handler)(input, context).await + } +} + +impl Default for OperationRegistry { + fn default() -> Self { + Self::new() + } +} + +pub struct OperationRegistryBuilder { + operations: HashMap, +} + +impl OperationRegistryBuilder { + pub fn new() -> Self { + Self { + operations: HashMap::new(), + } + } + + pub fn with_local( + mut self, + spec: OperationSpec, + handler: Handler, + composition_authority: Option, + scoped_env: Option, + capabilities: Capabilities, + ) -> Self { + let registration = HandlerRegistration::new( + spec, + handler, + OperationProvenance::Local, + composition_authority, + scoped_env, + capabilities, + ); + self.operations + .insert(registration.spec.name.clone(), registration); + self + } + + pub fn with_leaf( + self, + spec: OperationSpec, + handler: Handler, + capabilities: Capabilities, + ) -> Self { + self.with_leaf_provenance( + spec, + handler, + OperationProvenance::FromOpenAPI, + capabilities, + ) + } + + pub fn with_leaf_provenance( + self, + spec: OperationSpec, + handler: Handler, + provenance: OperationProvenance, + capabilities: Capabilities, + ) -> Self { + let registration = + HandlerRegistration::new(spec, handler, provenance, None, None, capabilities); + let mut this = self; + this.operations + .insert(registration.spec.name.clone(), registration); + this + } + + pub fn with(mut self, registration: HandlerRegistration) -> Self { + self.operations + .insert(registration.spec.name.clone(), registration); + self + } + + pub fn build(self) -> OperationRegistry { + OperationRegistry { + operations: self.operations, + } + } +} + +impl Default for OperationRegistryBuilder { + fn default() -> Self { + Self::new() + } +} + +pub fn make_handler(f: F) -> Handler +where + F: Fn(Value, OperationContext) -> Fut + Send + Sync + 'static, + Fut: Future + Send + 'static, +{ + Arc::new(move |input, context| Box::pin(f(input, context))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::protocol::wire::CallError; + use crate::registry::context::AbortPolicy; + use crate::registry::env::OperationEnv; + use crate::registry::spec::{AccessControl, OperationType}; + use alknet_core::auth::Identity; + use std::collections::HashMap; + use std::time::Duration; + + struct NoopEnv; + + #[async_trait::async_trait] + impl OperationEnv for NoopEnv { + async fn invoke_with_policy( + &self, + _namespace: &str, + _operation: &str, + _input: Value, + _parent: &OperationContext, + _policy: AbortPolicy, + ) -> ResponseEnvelope { + ResponseEnvelope::error("test", CallError::internal("noop env does not dispatch")) + } + + fn contains(&self, _name: &str) -> bool { + false + } + } + + fn root_context( + request_id: &str, + identity: Option, + handler_identity: Option, + internal: bool, + scoped_env: ScopedOperationEnv, + ) -> OperationContext { + OperationContext { + request_id: request_id.to_string(), + parent_request_id: None, + identity, + handler_identity, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env, + env: Arc::new(NoopEnv), + abort_policy: AbortPolicy::default(), + deadline: Some(std::time::Instant::now() + Duration::from_secs(30)), + internal, + } + } + + fn echo_handler() -> Handler { + make_handler( + |input, context| async move { ResponseEnvelope::ok(context.request_id, input) }, + ) + } + + fn error_handler() -> Handler { + make_handler(|_input, context| async move { + ResponseEnvelope::error(context.request_id, CallError::internal("handler failure")) + }) + } + + fn external_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn internal_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn identity_with_scopes(id: &str, scopes: &[&str]) -> Identity { + Identity { + id: id.to_string(), + scopes: scopes.iter().map(|s| s.to_string()).collect(), + resources: HashMap::new(), + } + } + + #[tokio::test] + async fn register_and_invoke_simple_operation() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context("req-1", None, None, false, ScopedOperationEnv::empty()); + let response = registry + .invoke("echo", serde_json::json!({"hi": 1}), ctx) + .await; + assert_eq!(response.request_id, "req-1"); + assert_eq!(response.result, Ok(serde_json::json!({"hi": 1}))); + } + + #[tokio::test] + async fn internal_op_from_external_call_returns_not_found() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context("req-2", None, None, false, ScopedOperationEnv::empty()); + let response = registry.invoke("secret", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => { + assert_eq!(e.code, "NOT_FOUND"); + assert!(e.message.contains("secret")); + } + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn internal_op_from_internal_call_invokes_handler() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context("req-3", None, None, true, ScopedOperationEnv::empty()); + let response = registry + .invoke("secret", serde_json::json!({"x": 2}), ctx) + .await; + assert_eq!(response.request_id, "req-3"); + assert_eq!(response.result, Ok(serde_json::json!({"x": 2}))); + } + + #[tokio::test] + async fn unknown_op_returns_not_found() { + let registry = OperationRegistry::new(); + let ctx = root_context("req-4", None, None, false, ScopedOperationEnv::empty()); + let response = registry.invoke("missing", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn acl_sufficient_scopes_allowed() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context( + "req-5", + Some(identity_with_scopes("caller", &["admin"])), + None, + false, + ScopedOperationEnv::empty(), + ); + let response = registry.invoke("admin", serde_json::json!({}), ctx).await; + assert!(response.result.is_ok()); + } + + #[tokio::test] + async fn acl_insufficient_scopes_forbidden() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context( + "req-6", + Some(identity_with_scopes("caller", &["user"])), + None, + false, + ScopedOperationEnv::empty(), + ); + let response = registry.invoke("admin", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert!(e.message.contains("admin")); + } + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn acl_restricted_op_no_identity_forbidden() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context("req-7", None, None, false, ScopedOperationEnv::empty()); + let response = registry.invoke("admin", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert_eq!(e.message, "authentication required"); + } + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn internal_call_acl_uses_handler_identity() { + let mut registry = OperationRegistry::new(); + let composing_authority = CompositionAuthority::new("agent-chat", ["admin".to_string()]); + registry.register(HandlerRegistration::new( + internal_spec( + "secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context( + "req-8", + Some(identity_with_scopes("user", &["user"])), + Some(composing_authority), + true, + ScopedOperationEnv::empty(), + ); + let response = registry.invoke("secret", serde_json::json!({}), ctx).await; + assert!( + response.result.is_ok(), + "internal call should use handler_identity (admin), not caller (user)" + ); + } + + #[tokio::test] + async fn internal_call_acl_insufficient_handler_identity_forbidden() { + let mut registry = OperationRegistry::new(); + let weak_authority = CompositionAuthority::new("weak", ["user".to_string()]); + registry.register(HandlerRegistration::new( + internal_spec( + "secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context( + "req-9", + Some(identity_with_scopes("user", &["admin"])), + Some(weak_authority), + true, + ScopedOperationEnv::empty(), + ); + let response = registry.invoke("secret", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert!(e.message.contains("admin")); + } + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn external_call_acl_uses_caller_identity_not_handler_identity() { + let mut registry = OperationRegistry::new(); + let handler_authority = CompositionAuthority::new("agent", ["admin".to_string()]); + registry.register(HandlerRegistration::new( + external_spec( + "gate", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + echo_handler(), + OperationProvenance::Local, + Some(handler_authority), + None, + Capabilities::new(), + )); + let ctx = root_context( + "req-10", + Some(identity_with_scopes("user", &["user"])), + Some(CompositionAuthority::new("agent", ["admin".to_string()])), + false, + ScopedOperationEnv::empty(), + ); + let response = registry.invoke("gate", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => assert_eq!(e.code, "FORBIDDEN"), + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn list_operations_returns_external_only() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + registry.register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ops = registry.list_operations(); + assert_eq!(ops.len(), 1); + assert_eq!(ops[0].name, "echo"); + } + + #[tokio::test] + async fn handler_returned_error_passes_through() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + external_spec("boom", AccessControl::default()), + error_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let ctx = root_context("req-11", None, None, false, ScopedOperationEnv::empty()); + let response = registry.invoke("boom", serde_json::json!({}), ctx).await; + match response.result { + Err(e) => assert_eq!(e.code, "INTERNAL"), + other => panic!("expected INTERNAL error, got {other:?}"), + } + } + + #[test] + fn builder_with_local_sets_provenance_local() { + let registry = OperationRegistryBuilder::new() + .with_local( + external_spec("echo", AccessControl::default()), + echo_handler(), + CompositionAuthority::none(), + ScopedOperationEnv::empty().into(), + Capabilities::new(), + ) + .build(); + let reg = registry.registration("echo").expect("registered"); + assert_eq!(reg.provenance, OperationProvenance::Local); + assert!(reg.composition_authority.is_none()); + assert!(reg.scoped_env.is_some()); + } + + #[test] + fn builder_with_local_carries_authority_and_scoped_env() { + let registry = OperationRegistryBuilder::new() + .with_local( + external_spec("agent", AccessControl::default()), + echo_handler(), + Some(CompositionAuthority::new("agent", ["fs:read".to_string()])), + Some(ScopedOperationEnv::new(["fs/readFile"])), + Capabilities::new(), + ) + .build(); + let reg = registry.registration("agent").expect("registered"); + assert_eq!(reg.provenance, OperationProvenance::Local); + let authority = reg.composition_authority.as_ref().expect("authority set"); + assert_eq!(authority.label, "agent"); + assert_eq!(authority.scopes, vec!["fs:read".to_string()]); + assert!(reg.scoped_env.is_some()); + assert!(reg.scoped_env.as_ref().unwrap().allows("fs/readFile")); + } + + #[test] + fn builder_with_leaf_sets_provenance_and_no_authority() { + let registry = OperationRegistryBuilder::new() + .with_leaf( + external_spec("vastai", AccessControl::default()), + echo_handler(), + Capabilities::new(), + ) + .build(); + let reg = registry.registration("vastai").expect("registered"); + assert_eq!(reg.provenance, OperationProvenance::FromOpenAPI); + assert!(reg.composition_authority.is_none()); + assert!(reg.scoped_env.is_none()); + } + + #[test] + fn builder_with_leaf_provenance_overrides_provenance() { + let registry = OperationRegistryBuilder::new() + .with_leaf_provenance( + external_spec("remote", AccessControl::default()), + echo_handler(), + OperationProvenance::FromCall, + Capabilities::new(), + ) + .build(); + let reg = registry.registration("remote").expect("registered"); + assert_eq!(reg.provenance, OperationProvenance::FromCall); + assert!(reg.composition_authority.is_none()); + assert!(reg.scoped_env.is_none()); + } + + #[test] + fn builder_with_takes_full_bundle() { + let registration = HandlerRegistration::new( + external_spec("agent", AccessControl::default()), + echo_handler(), + OperationProvenance::Session, + Some(CompositionAuthority::new("sandbox", [])), + Some(ScopedOperationEnv::new(["fs/readFile"])), + Capabilities::new(), + ); + let registry = OperationRegistryBuilder::new().with(registration).build(); + let reg = registry.registration("agent").expect("registered"); + assert_eq!(reg.provenance, OperationProvenance::Session); + assert!(reg.composition_authority.is_some()); + assert!(reg.scoped_env.is_some()); + } + + #[test] + fn builder_default_is_new() { + let builder = OperationRegistryBuilder::default(); + let registry = builder.build(); + assert!(registry.list_operations().is_empty()); + } + + #[test] + fn registry_default_is_new() { + let registry = OperationRegistry::default(); + assert!(registry.list_operations().is_empty()); + assert!(registry.registration("anything").is_none()); + } + + #[test] + fn registration_lookup_returns_bundle_fields() { + let mut registry = OperationRegistry::new(); + let authority = CompositionAuthority::new("agent", ["fs:read".to_string()]); + let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let caps = Capabilities::new().with_api_key("google", "k".to_string()); + registry.register(HandlerRegistration::new( + external_spec("agent", AccessControl::default()), + echo_handler(), + OperationProvenance::Local, + Some(authority.clone()), + Some(scoped.clone()), + caps.clone(), + )); + let reg = registry.registration("agent").expect("found"); + assert_eq!(reg.spec.name, "agent"); + assert_eq!(reg.provenance, OperationProvenance::Local); + assert_eq!(reg.composition_authority.as_ref().unwrap().label, "agent"); + assert!(reg.scoped_env.as_ref().unwrap().allows("fs/readFile")); + } +}