diff --git a/crates/alknet-http/src/gateway/dispatch.rs b/crates/alknet-http/src/gateway/dispatch.rs index 6867db6..53688a7 100644 --- a/crates/alknet-http/src/gateway/dispatch.rs +++ b/crates/alknet-http/src/gateway/dispatch.rs @@ -26,6 +26,7 @@ use alknet_call::registry::env::LocalOperationEnv; use alknet_call::registry::registration::OperationRegistry; use alknet_core::auth::{AuthToken, Identity, IdentityProvider}; use alknet_core::types::Capabilities; +use futures::stream::BoxStream; use serde_json::Value; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); @@ -70,11 +71,43 @@ impl GatewayDispatch { self.registry.invoke(&operation_name, input, context).await } + pub fn invoke_streaming( + &self, + identity: Option, + op: &str, + input: Value, + ) -> BoxStream<'static, ResponseEnvelope> { + let operation_name = strip_leading_slash(op).to_string(); + let request_id = uuid::Uuid::new_v4().to_string(); + let context = self.build_root_context_streaming(&request_id, &operation_name, identity); + self.registry + .invoke_streaming(&operation_name, input, context) + } + fn build_root_context( &self, request_id: &str, operation_name: &str, identity: Option, + ) -> OperationContext { + self.build_root_context_inner(request_id, operation_name, identity, true) + } + + fn build_root_context_streaming( + &self, + request_id: &str, + operation_name: &str, + identity: Option, + ) -> OperationContext { + self.build_root_context_inner(request_id, operation_name, identity, false) + } + + fn build_root_context_inner( + &self, + request_id: &str, + operation_name: &str, + identity: Option, + bounded: bool, ) -> OperationContext { let registration = self.registry.registration(operation_name); let (composition_authority, capabilities, scoped_env) = match registration { @@ -97,7 +130,7 @@ impl GatewayDispatch { forwarded_for: None, capabilities, metadata: HashMap::new(), - deadline: Some(Instant::now() + DEFAULT_TIMEOUT), + deadline: bounded.then(|| Instant::now() + DEFAULT_TIMEOUT), scoped_env, env, abort_policy: AbortPolicy::default(), @@ -117,10 +150,11 @@ mod tests { services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; use alknet_call::registry::registration::{ - make_handler, HandlerKind, HandlerRegistration, OperationProvenance, + make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::AuthToken; + use futures::stream::StreamExt; use std::sync::Mutex as StdMutex; struct StaticIdentityProvider { @@ -235,6 +269,53 @@ mod tests { registry } + fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Subscription, + visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ) + } + + fn echo_streaming_handler() -> HandlerKind { + HandlerKind::Stream(make_streaming_handler(|input, context| { + futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)]) + })) + } + + fn registry_with_subscription( + name: &str, + visibility: Visibility, + acl: AccessControl, + ) -> OperationRegistry { + let mut registry = OperationRegistry::new(); + registry + .register(HandlerRegistration::new( + subscription_spec(name, visibility, acl), + echo_streaming_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + } + + async fn collect_stream( + mut stream: BoxStream<'static, ResponseEnvelope>, + ) -> Vec { + let mut out = Vec::new(); + while let Some(env) = stream.next().await { + out.push(env); + } + out + } + fn dispatch( registry: Arc, provider: Arc, @@ -548,4 +629,195 @@ mod tests { fn assert_concrete() {} assert_concrete::(); } + + #[tokio::test] + async fn invoke_streaming_on_subscription_returns_handler_stream() { + let registry = Arc::new(registry_with_subscription( + "events/stream", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "events/stream", serde_json::json!({ "v": 7 })); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + assert_eq!(items[0].result, Ok(serde_json::json!({ "v": 7 }))); + } + + #[tokio::test] + async fn invoke_streaming_strips_leading_slash_from_operation_name() { + let registry = Arc::new(registry_with_subscription( + "events/stream", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "/events/stream", serde_json::json!({})); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + assert!(items[0].result.is_ok()); + } + + #[tokio::test] + async fn invoke_streaming_on_unknown_op_yields_single_not_found() { + let registry = Arc::new(OperationRegistry::new()); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "no/such", serde_json::json!({})); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + match &items[0].result { + Err(e) => { + assert_eq!(e.code, "NOT_FOUND"); + assert!(e.message.contains("no/such")); + } + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn invoke_streaming_on_internal_op_from_external_yields_not_found() { + let registry = Arc::new(registry_with_subscription( + "secret/stream", + Visibility::Internal, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "secret/stream", serde_json::json!({})); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + match &items[0].result { + Err(e) => { + assert_eq!(e.code, "NOT_FOUND"); + assert!(e.message.contains("secret/stream")); + } + other => panic!("expected NOT_FOUND, got {other:?}"), + } + } + + #[tokio::test] + async fn invoke_streaming_with_none_identity_and_restricted_op_yields_forbidden() { + let registry = Arc::new(registry_with_subscription( + "admin/stream", + Visibility::External, + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "admin/stream", serde_json::json!({})); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + match &items[0].result { + Err(e) => { + assert_eq!(e.code, "FORBIDDEN"); + assert_eq!(e.message, "authentication required"); + } + other => panic!("expected FORBIDDEN, got {other:?}"), + } + } + + #[tokio::test] + async fn invoke_streaming_on_query_op_yields_invalid_operation_type() { + let registry = Arc::new(registry_with( + "echo/run", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let stream = dp.invoke_streaming(None, "echo/run", serde_json::json!({})); + let items = collect_stream(stream).await; + assert_eq!(items.len(), 1); + match &items[0].result { + Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"), + other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"), + } + } + + #[tokio::test] + async fn invoke_on_subscription_op_returns_invalid_operation_type() { + let registry = Arc::new(registry_with_subscription( + "events/stream", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let response = dp + .invoke(None, "events/stream", serde_json::json!({})) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"), + other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"), + } + } + + #[test] + fn build_root_context_streaming_sets_deadline_none() { + let registry = Arc::new(registry_with_subscription( + "events/stream", + Visibility::External, + AccessControl::default(), + )); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let ctx = dp.build_root_context_streaming("req-st-1", "events/stream", None); + assert!(!ctx.internal, "internal must be false for wire-ingress"); + assert!(ctx.forwarded_for.is_none(), "forwarded_for must be None"); + assert!(ctx.parent_request_id.is_none(), "root has no parent"); + assert!( + ctx.deadline.is_none(), + "deadline must be None for streaming" + ); + } + + #[test] + fn build_root_context_streaming_carries_registration_bundle_fields() { + let authority = alknet_call::registry::context::CompositionAuthority::new( + "agent", + ["fs:read".to_string()], + ); + let scoped = ScopedPeerEnv::new(["fs/readFile"]); + let caps = Capabilities::new().with_api_key("google", "k".to_string()); + + let mut registry = OperationRegistry::new(); + registry + .register(HandlerRegistration::new( + subscription_spec( + "agent/stream", + Visibility::External, + AccessControl::default(), + ), + echo_streaming_handler(), + OperationProvenance::Local, + Some(authority), + Some(scoped.clone()), + caps, + )) + .unwrap(); + let registry = Arc::new(registry); + let provider: Arc = Arc::new(StaticIdentityProvider::new()); + let dp = dispatch(registry, provider); + + let ctx = dp.build_root_context_streaming("req-st-2", "agent/stream", None); + assert!(ctx.handler_identity.is_some()); + assert_eq!(ctx.handler_identity.as_ref().unwrap().label, "agent"); + assert!(ctx.scoped_env.allows("fs/readFile")); + assert!(ctx.capabilities.get("google").is_some()); + assert!(ctx.deadline.is_none()); + } }