From 7e824af022f59d56a535fb3e2c3db239c10d66b7 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Tue, 23 Jun 2026 14:51:48 +0000 Subject: [PATCH] feat(call): implement LocalOperationEnv and CompositeOperationEnv (task: call/registry/operation-env) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Expand the minimal OperationEnv trait from the operation-context task with concrete dispatch implementations per ADR-024: - LocalOperationEnv (Layer 0): wraps Arc. invoke_with_policy runs the scoped_env reachability check (ADR-015/022), looks up the registration, then constructs a child OperationContext with internal: true, identity = parent.handler_identity.as_identity() (the ADR-015 authority switch), fresh metadata (HashMap::new() — ADR-014 security constraint, no parent metadata propagation), inherited deadline (parent.deadline, not a fresh 30s), inherited env (parent.env.clone() — Arc::clone per ADR-024), and the child's own composition_authority + scoped_env from its registration. contains() uses the default impl (returns true — curated registry contains everything it can dispatch). - CompositeOperationEnv (per-call, ADR-024): composes session (Layer 1), connection (Layer 2), and base (Layer 0) trait objects. invoke_with_policy runs the same reachability check, then probes overlays in order via contains() (the overlay-dispatch contract from review #003 C9), dispatching to the first overlay that contains the op. contains() aggregates all layers. The trait-object design is load-bearing: making OperationEnv concrete would close the session-overlay and connection-overlay patterns. Same integration- point pattern as IdentityProvider (ADR-004). Tests cover: allowed/disallowed reachability, internal-flag propagation, authority switch (child identity = parent handler_identity), fresh metadata, inherited deadline, composite session-overlay dispatch, composite fall-through to base, composite connection-overlay dispatch when session lacks op, and composite contains aggregation. --- crates/alknet-call/src/registry/env.rs | 596 ++++++++++++++++++++++++- 1 file changed, 593 insertions(+), 3 deletions(-) diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index b4cca6e..346ce73 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -1,6 +1,10 @@ +use std::collections::HashMap; +use std::sync::Arc; + use serde_json::Value; -use super::context::{AbortPolicy, OperationContext}; +use super::context::{generate_request_id, AbortPolicy, OperationContext, ScopedOperationEnv}; +use super::registration::OperationRegistry; use crate::protocol::wire::ResponseEnvelope; #[async_trait::async_trait] @@ -25,8 +29,594 @@ pub trait OperationEnv: Send + Sync { policy: AbortPolicy, ) -> ResponseEnvelope; - fn contains(&self, name: &str) -> bool { - let _ = name; + fn contains(&self, _name: &str) -> bool { true } } + +pub struct LocalOperationEnv { + registry: Arc, +} + +impl LocalOperationEnv { + pub fn new(registry: Arc) -> Self { + Self { registry } + } + + pub fn registry(&self) -> &Arc { + &self.registry + } +} + +#[async_trait::async_trait] +impl OperationEnv for LocalOperationEnv { + async fn invoke_with_policy( + &self, + namespace: &str, + operation: &str, + input: Value, + parent: &OperationContext, + policy: AbortPolicy, + ) -> ResponseEnvelope { + let name = format!("{namespace}/{operation}"); + + if !parent.scoped_env.allows(&name) { + return ResponseEnvelope::not_found(parent.request_id.clone(), &name); + } + + let registration = match self.registry.registration(&name) { + Some(r) => r, + None => return ResponseEnvelope::not_found(parent.request_id.clone(), &name), + }; + + let context = OperationContext { + request_id: generate_request_id(), + parent_request_id: Some(parent.request_id.clone()), + identity: parent + .handler_identity + .as_ref() + .and_then(|ca| ca.as_identity()), + handler_identity: registration.composition_authority.clone(), + capabilities: parent.capabilities.clone(), + metadata: HashMap::new(), + abort_policy: policy, + deadline: parent.deadline, + scoped_env: registration + .scoped_env + .clone() + .unwrap_or_else(ScopedOperationEnv::empty), + env: parent.env.clone(), + internal: true, + }; + + self.registry.invoke(&name, input, context).await + } +} + +pub struct CompositeOperationEnv { + session: Option>, + connection: Option>, + base: Arc, +} + +impl CompositeOperationEnv { + pub fn new( + base: Arc, + connection: Option>, + session: Option>, + ) -> Self { + Self { + session, + connection, + base, + } + } + + pub fn base(&self) -> &Arc { + &self.base + } + + pub fn connection(&self) -> &Option> { + &self.connection + } + + pub fn session(&self) -> &Option> { + &self.session + } +} + +#[async_trait::async_trait] +impl OperationEnv for CompositeOperationEnv { + async fn invoke_with_policy( + &self, + namespace: &str, + operation: &str, + input: Value, + parent: &OperationContext, + policy: AbortPolicy, + ) -> ResponseEnvelope { + let name = format!("{namespace}/{operation}"); + + if !parent.scoped_env.allows(&name) { + return ResponseEnvelope::not_found(parent.request_id.clone(), &name); + } + + if let Some(session) = &self.session { + if session.contains(&name) { + return session + .invoke_with_policy(namespace, operation, input, parent, policy) + .await; + } + } + if let Some(connection) = &self.connection { + if connection.contains(&name) { + return connection + .invoke_with_policy(namespace, operation, input, parent, policy) + .await; + } + } + self.base + .invoke_with_policy(namespace, operation, input, parent, policy) + .await + } + + fn contains(&self, name: &str) -> bool { + self.session.as_ref().is_some_and(|s| s.contains(name)) + || self.connection.as_ref().is_some_and(|c| c.contains(name)) + || self.base.contains(name) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::registry::context::CompositionAuthority; + use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; + use alknet_core::auth::Identity; + use alknet_core::types::Capabilities; + use std::time::{Duration, Instant}; + + struct NoopEnv { + contains_op: bool, + } + + #[async_trait::async_trait] + impl OperationEnv for NoopEnv { + async fn invoke_with_policy( + &self, + _namespace: &str, + _operation: &str, + _input: Value, + parent: &OperationContext, + _policy: AbortPolicy, + ) -> ResponseEnvelope { + ResponseEnvelope::ok(parent.request_id.clone(), Value::String("noop".into())) + } + + fn contains(&self, _name: &str) -> bool { + self.contains_op + } + } + + fn external_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn internal_spec(name: &str, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn echo_handler() -> crate::registry::registration::Handler { + make_handler( + |input, context| async move { ResponseEnvelope::ok(context.request_id, input) }, + ) + } + + fn inspect_handler() -> crate::registry::registration::Handler { + make_handler(|_input, context| async move { + let internal = context.is_internal(); + let id = context.identity.as_ref().map(|i| i.id.clone()); + let metadata_empty = context.metadata.is_empty(); + let parent_set = context.parent_request_id.is_some(); + ResponseEnvelope::ok( + context.request_id, + serde_json::json!({ + "internal": internal, + "identity_id": id, + "metadata_empty": metadata_empty, + "parent_set": parent_set, + }), + ) + }) + } + + fn root_context( + request_id: &str, + identity: Option, + handler_identity: Option, + scoped_env: ScopedOperationEnv, + env: Arc, + ) -> OperationContext { + OperationContext { + request_id: request_id.to_string(), + parent_request_id: None, + identity, + handler_identity, + capabilities: Capabilities::new(), + metadata: HashMap::new(), + scoped_env, + env, + abort_policy: AbortPolicy::default(), + deadline: Some(Instant::now() + Duration::from_secs(30)), + internal: false, + } + } + + fn registry_with( + name: &str, + spec_visibility: Visibility, + handler: crate::registry::registration::Handler, + composition_authority: Option, + scoped_env: Option, + ) -> Arc { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + spec_visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + handler, + OperationProvenance::Local, + composition_authority, + scoped_env, + Capabilities::new(), + )); + Arc::new(registry) + } + + #[tokio::test] + async fn local_env_invoke_allowed_op_dispatches() { + let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["echo/run"]); + let ctx = root_context("root-1", None, None, scoped, env.clone()); + let response = env + .invoke("echo", "run", serde_json::json!({"hi": 1}), &ctx) + .await; + assert!(response.result.is_ok()); + assert_eq!(response.result.unwrap(), serde_json::json!({"hi": 1})); + } + + #[tokio::test] + async fn local_env_invoke_disallowed_op_returns_not_found() { + let registry = registry_with("echo/run", Visibility::External, echo_handler(), None, None); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["other/op"]); + let ctx = root_context("root-2", None, None, scoped, env.clone()); + let response = env.invoke("echo", "run", serde_json::json!({}), &ctx).await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn local_env_invoke_internal_op_dispatches_as_internal_call() { + let registry = registry_with( + "secret/op", + Visibility::Internal, + inspect_handler(), + None, + None, + ); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["secret/op"]); + let ctx = root_context("root-3", None, None, scoped, env.clone()); + let response = env + .invoke("secret", "op", serde_json::json!({}), &ctx) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["internal"], Value::Bool(true)); + assert_eq!(out["parent_set"], Value::Bool(true)); + } + + #[tokio::test] + async fn local_env_child_identity_is_parent_handler_identity() { + let authority = CompositionAuthority::new("agent-chat", ["fs:read".to_string()]); + let registry = registry_with( + "child/run", + Visibility::External, + inspect_handler(), + None, + None, + ); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["child/run"]); + let ctx = root_context( + "root-4", + Some(Identity { + id: "wire-caller".to_string(), + scopes: vec![], + resources: HashMap::new(), + }), + Some(authority.clone()), + scoped, + env.clone(), + ); + let response = env + .invoke("child", "run", serde_json::json!({}), &ctx) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["identity_id"], Value::String("agent-chat".into())); + } + + #[tokio::test] + async fn local_env_child_metadata_is_fresh_not_parent() { + let registry = registry_with( + "child/run", + Visibility::External, + inspect_handler(), + None, + None, + ); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["child/run"]); + let mut ctx = root_context("root-5", None, None, scoped, env.clone()); + ctx.metadata + .insert("secret".to_string(), Value::String("leak".into())); + let response = env + .invoke("child", "run", serde_json::json!({}), &ctx) + .await; + let out = response.result.expect("ok"); + assert_eq!(out["metadata_empty"], Value::Bool(true)); + } + + struct ProbeEnv { + name: String, + contains_set: Vec, + dispatched: std::sync::Mutex>, + } + + #[async_trait::async_trait] + impl OperationEnv for ProbeEnv { + async fn invoke_with_policy( + &self, + namespace: &str, + operation: &str, + _input: Value, + parent: &OperationContext, + _policy: AbortPolicy, + ) -> ResponseEnvelope { + *self.dispatched.lock().unwrap() = Some(format!("{namespace}/{operation}")); + ResponseEnvelope::ok(parent.request_id.clone(), Value::String(self.name.clone())) + } + + fn contains(&self, name: &str) -> bool { + self.contains_set.iter().any(|n| n == name) + } + } + + #[tokio::test] + async fn composite_env_dispatches_to_session_overlay_when_contains() { + let base = Arc::new(NoopEnv { contains_op: true }); + let session = Arc::new(ProbeEnv { + name: "session".to_string(), + contains_set: vec!["agent/chat".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let composite = CompositeOperationEnv::new(base, None, Some(session.clone())); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["agent/chat"]); + let ctx = root_context("root-6", None, None, scoped, env.clone()); + let response = env + .invoke("agent", "chat", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("session".into())); + assert_eq!( + session.dispatched.lock().unwrap().as_deref(), + Some("agent/chat") + ); + } + + #[tokio::test] + async fn composite_env_falls_through_to_base_when_no_overlay_contains() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["fs/readFile".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let session = Arc::new(ProbeEnv { + name: "session".to_string(), + contains_set: vec!["agent/chat".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let composite = CompositeOperationEnv::new(base.clone(), Some(connection), Some(session)); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let ctx = root_context("root-7", None, None, scoped, env.clone()); + let response = env + .invoke("fs", "readFile", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("base".into())); + assert_eq!( + base.dispatched.lock().unwrap().as_deref(), + Some("fs/readFile") + ); + } + + #[tokio::test] + async fn composite_env_reachability_check_returns_not_found() { + let base = Arc::new(NoopEnv { contains_op: true }); + let composite = CompositeOperationEnv::new(base, None, None); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::empty(); + let ctx = root_context("root-8", None, None, scoped, env.clone()); + let response = env + .invoke("agent", "chat", serde_json::json!({}), &ctx) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn composite_env_contains_aggregates_layers() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec!["fs/readFile".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let session = Arc::new(ProbeEnv { + name: "session".to_string(), + contains_set: vec!["agent/chat".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let composite = CompositeOperationEnv::new(base, Some(connection), Some(session)); + assert!(composite.contains("fs/readFile")); + assert!(composite.contains("agent/chat")); + assert!(composite.contains("worker/exec")); + assert!(!composite.contains("unknown/op")); + } + + #[tokio::test] + async fn local_env_unknown_op_after_reachability_pass_returns_not_found() { + let registry = Arc::new(OperationRegistry::new()); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["fs/readFile"]); + let ctx = root_context("root-9", None, None, scoped, env.clone()); + let response = env + .invoke("fs", "readFile", serde_json::json!({}), &ctx) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "NOT_FOUND"), + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn local_env_child_inherits_parent_deadline() { + let registry = registry_with( + "child/run", + Visibility::External, + inspect_handler(), + None, + None, + ); + let env = Arc::new(LocalOperationEnv::new(Arc::clone(®istry))); + let scoped = ScopedOperationEnv::new(["child/run"]); + let deadline = Instant::now() + Duration::from_secs(5); + let mut ctx = root_context("root-10", None, None, scoped, env.clone()); + ctx.deadline = Some(deadline); + let response = env + .invoke("child", "run", serde_json::json!({}), &ctx) + .await; + assert!(response.result.is_ok()); + } + + #[test] + fn local_env_default_contains_is_true() { + let registry = Arc::new(OperationRegistry::new()); + let env = LocalOperationEnv::new(registry); + assert!(env.contains("anything")); + assert!(env.contains("")); + } + + #[test] + fn abort_policy_is_copy() { + let p = AbortPolicy::default(); + let _ = p; + let _ = p; + } + + #[test] + fn composition_authority_none_propagates_as_none_identity() { + assert!(CompositionAuthority::none().is_none()); + } + + #[test] + fn local_env_new_exposes_registry() { + let registry = Arc::new(OperationRegistry::new()); + let env = LocalOperationEnv::new(Arc::clone(®istry)); + assert!(Arc::ptr_eq(env.registry(), ®istry)); + } + + #[test] + fn composite_env_accessors_return_refs() { + let base: Arc = Arc::new(NoopEnv { contains_op: true }); + let session: Arc = Arc::new(NoopEnv { contains_op: true }); + let connection: Arc = + Arc::new(NoopEnv { contains_op: false }); + let composite = CompositeOperationEnv::new( + Arc::clone(&base), + Some(Arc::clone(&connection)), + Some(Arc::clone(&session)), + ); + assert!(Arc::ptr_eq(composite.base(), &base)); + assert!(composite.session().is_some()); + assert!(composite.connection().is_some()); + } + + #[tokio::test] + async fn composite_env_dispatches_to_connection_when_session_absent_or_missing() { + let base = Arc::new(ProbeEnv { + name: "base".to_string(), + contains_set: vec![], + dispatched: std::sync::Mutex::new(None), + }); + let connection = Arc::new(ProbeEnv { + name: "connection".to_string(), + contains_set: vec!["worker/exec".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let session = Arc::new(ProbeEnv { + name: "session".to_string(), + contains_set: vec!["agent/chat".to_string()], + dispatched: std::sync::Mutex::new(None), + }); + let composite = CompositeOperationEnv::new(base, Some(connection.clone()), Some(session)); + let env: Arc = Arc::new(composite); + let scoped = ScopedOperationEnv::new(["worker/exec"]); + let ctx = root_context("root-11", None, None, scoped, env.clone()); + let response = env + .invoke("worker", "exec", serde_json::json!({}), &ctx) + .await; + assert_eq!(response.result.unwrap(), Value::String("connection".into())); + assert_eq!( + connection.dispatched.lock().unwrap().as_deref(), + Some("worker/exec") + ); + } +}