diff --git a/crates/alknet-call/src/client/call_client.rs b/crates/alknet-call/src/client/call_client.rs index 26e96ba..f213bd5 100644 --- a/crates/alknet-call/src/client/call_client.rs +++ b/crates/alknet-call/src/client/call_client.rs @@ -572,7 +572,7 @@ mod tests { use crate::protocol::connection::CallConnection; use crate::protocol::wire::ResponseEnvelope; use crate::registry::registration::{ - make_handler, Handler, HandlerRegistration, OperationProvenance, + make_handler, Handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::Identity; @@ -640,14 +640,16 @@ mod tests { fn registry_with_caps() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("pub/run"), - caps_inspect_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new().with_api_key("google", "pub-key".to_string()), - )); + registry + .register(HandlerRegistration::new( + external_spec("pub/run"), + HandlerKind::Once(caps_inspect_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new().with_api_key("google", "pub-key".to_string()), + )) + .unwrap(); Arc::new(registry) } @@ -709,7 +711,9 @@ mod tests { let client = CallClient::new(Arc::clone(®istry), Arc::new(NoopIdentityProvider)); let conn = client.spawn_dispatch(stub_connection()); assert_eq!( - conn.connection().expect("quic connection present").remote_alpn(), + conn.connection() + .expect("quic connection present") + .remote_alpn(), b"alknet/call" ); std::mem::drop(conn); diff --git a/crates/alknet-call/src/client/from_call.rs b/crates/alknet-call/src/client/from_call.rs index 5d4fcb1..cfd83eb 100644 --- a/crates/alknet-call/src/client/from_call.rs +++ b/crates/alknet-call/src/client/from_call.rs @@ -19,7 +19,9 @@ use crate::client::AdapterError; use crate::protocol::connection::CallConnection; use crate::protocol::wire::ResponseEnvelope; use crate::registry::context::OperationContext; -use crate::registry::registration::{Handler, HandlerRegistration, OperationProvenance}; +use crate::registry::registration::{ + Handler, HandlerKind, HandlerRegistration, OperationProvenance, +}; use crate::registry::spec::{ AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, }; @@ -128,7 +130,7 @@ fn build_bundles( ); bundles.push(HandlerRegistration::new( spec, - handler, + HandlerKind::Once(handler), OperationProvenance::FromCall, None, None, @@ -549,7 +551,7 @@ mod tests { ); let reg = HandlerRegistration::new( spec, - handler, + HandlerKind::Once(handler), OperationProvenance::FromCall, None, None, diff --git a/crates/alknet-call/src/client/from_jsonschema.rs b/crates/alknet-call/src/client/from_jsonschema.rs index 90eb86a..fda7723 100644 --- a/crates/alknet-call/src/client/from_jsonschema.rs +++ b/crates/alknet-call/src/client/from_jsonschema.rs @@ -11,7 +11,9 @@ use serde_json::Value; use crate::client::{AdapterError, OperationAdapter}; use crate::protocol::wire::{CallError, ResponseEnvelope}; use crate::registry::context::OperationContext; -use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; +use crate::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, +}; use crate::registry::spec::OperationSpec; /// Build a [`HandlerRegistration`] from a JSON Schema-described operation. @@ -30,7 +32,7 @@ pub fn from_jsonschema(spec: OperationSpec, _schema: Value) -> HandlerRegistrati }); HandlerRegistration::new( spec, - handler, + HandlerKind::Once(handler), OperationProvenance::FromJsonSchema, None, None, @@ -138,7 +140,10 @@ mod tests { async fn placeholder_handler_returns_error_when_invoked() { let bundle = from_jsonschema_fn::from_jsonschema(test_spec("ns/op"), serde_json::json!({})); let ctx = test_context("req-1"); - let response = (bundle.handler)(serde_json::json!({}), ctx).await; + let response = match &bundle.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; match response.result { Err(e) => { assert_eq!(e.code, "NOT_FOUND"); diff --git a/crates/alknet-call/src/protocol/adapter.rs b/crates/alknet-call/src/protocol/adapter.rs index f79c154..aa6c617 100644 --- a/crates/alknet-call/src/protocol/adapter.rs +++ b/crates/alknet-call/src/protocol/adapter.rs @@ -166,7 +166,9 @@ mod tests { }; use crate::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv}; use crate::registry::env::OperationEnv; - use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, + }; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::AuthToken; use alknet_core::types::Capabilities; @@ -245,22 +247,24 @@ mod tests { handler: crate::registry::registration::Handler, ) -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - name, - OperationType::Query, - visibility, - serde_json::json!({}), - serde_json::json!({}), - vec![], - acl, - ), - handler, - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -421,14 +425,16 @@ mod tests { let mut registry = OperationRegistry::new(); let scoped = ScopedPeerEnv::new(["fs/readFile"]); let caps = Capabilities::new().with_api_key("google", "k".to_string()); - registry.register(HandlerRegistration::new( - external_spec("agent/run", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - Some(scoped.clone()), - caps.clone(), - )); + registry + .register(HandlerRegistration::new( + external_spec("agent/run", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + Some(scoped.clone()), + caps.clone(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let adapter = CallAdapter::new(registry, provider); @@ -543,7 +549,7 @@ mod tests { vec![], AccessControl::default(), ), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::FromCall, None, None, @@ -610,7 +616,7 @@ mod tests { vec![], AccessControl::default(), ), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::FromCall, None, None, diff --git a/crates/alknet-call/src/protocol/connection.rs b/crates/alknet-call/src/protocol/connection.rs index 7a3f799..f49b71e 100644 --- a/crates/alknet-call/src/protocol/connection.rs +++ b/crates/alknet-call/src/protocol/connection.rs @@ -26,7 +26,7 @@ use super::wire::{ use crate::protocol::wire::ResponseEnvelope; use crate::registry::context::{generate_request_id, AbortPolicy, OperationContext, ScopedPeerEnv}; use crate::registry::env::OperationEnv; -use crate::registry::registration::{Handler, HandlerRegistration}; +use crate::registry::registration::{HandlerKind, HandlerRegistration}; use crate::registry::spec::AccessResult; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30); @@ -307,7 +307,7 @@ impl OperationEnv for OverlayOperationEnv { return ResponseEnvelope::not_found(parent.request_id.clone(), &name); } - let handler: Handler; + let handler: HandlerKind; let composition_authority; let scoped_env; let access_control; @@ -316,7 +316,7 @@ impl OperationEnv for OverlayOperationEnv { let Some(registration) = overlay.get(&name) else { return ResponseEnvelope::not_found(parent.request_id.clone(), &name); }; - handler = Arc::clone(®istration.handler); + handler = registration.handler.clone(); composition_authority = registration.composition_authority.clone(); scoped_env = registration .scoped_env @@ -355,7 +355,15 @@ impl OperationEnv for OverlayOperationEnv { internal: true, }; - handler(input, context).await + match handler { + HandlerKind::Once(h) => h(input, context).await, + HandlerKind::Stream(_) => ResponseEnvelope::error( + parent.request_id.clone(), + CallError::invalid_operation_type( + "OperationEnv::invoke() called on a Subscription op; composition is request/response-only", + ), + ), + } } fn contains(&self, name: &str) -> bool { @@ -421,7 +429,7 @@ impl Stream for SubscriptionStream { mod tests { use super::*; use crate::registry::context::CompositionAuthority; - use crate::registry::registration::{make_handler, OperationProvenance}; + use crate::registry::registration::{make_handler, Handler, HandlerKind, OperationProvenance}; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::types::{Capabilities, MockConnection}; use std::collections::HashMap; @@ -476,7 +484,7 @@ mod tests { fn imported_registration(name: &str) -> HandlerRegistration { HandlerRegistration::new( external_spec(name), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::FromCall, None, None, @@ -608,7 +616,7 @@ mod tests { }); conn.register_imported(HandlerRegistration::new( external_spec("worker/exec"), - inspect_handler, + HandlerKind::Once(inspect_handler), OperationProvenance::FromCall, None, None, @@ -631,7 +639,9 @@ mod tests { fn connection_accessor_returns_underlying_connection() { let conn = CallConnection::new(stub_connection()); assert_eq!( - conn.connection().expect("quic connection present").remote_alpn(), + conn.connection() + .expect("quic connection present") + .remote_alpn(), b"alknet/call" ); } @@ -960,4 +970,39 @@ mod tests { assert!(conn.connection().is_some(), "QUIC connection present"); assert!(conn.identity().is_none(), "no identity set yet"); } + + #[tokio::test] + async fn overlay_env_invoke_on_stream_kind_returns_invalid_operation_type() { + use crate::registry::registration::make_streaming_handler; + let conn = CallConnection::new(stub_connection()); + let streaming_handler = make_streaming_handler(|input, ctx| { + futures::stream::iter(vec![ResponseEnvelope::ok(ctx.request_id, input)]) + }); + conn.register_imported(HandlerRegistration::new( + OperationSpec::new( + "events/stream", + OperationType::Subscription, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + HandlerKind::Stream(streaming_handler), + OperationProvenance::FromCall, + None, + None, + Capabilities::new(), + )); + let env = conn.overlay_env(); + let scoped = ScopedPeerEnv::new(["events/stream"]); + let ctx = root_context("root-stream", scoped, env.clone()); + let response = env + .invoke("events", "stream", serde_json::json!({}), &ctx) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"), + other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"), + } + } } diff --git a/crates/alknet-call/src/protocol/dispatch.rs b/crates/alknet-call/src/protocol/dispatch.rs index 172722b..46c6a63 100644 --- a/crates/alknet-call/src/protocol/dispatch.rs +++ b/crates/alknet-call/src/protocol/dispatch.rs @@ -326,7 +326,9 @@ impl Clone for Dispatcher { mod tests { use super::*; use crate::protocol::wire::EVENT_RESPONDED; - use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, + }; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{AuthToken, Identity, IdentityProvider}; use alknet_core::types::{Capabilities, MockConnection}; @@ -412,24 +414,26 @@ mod tests { fn registry_with(name: &str, visibility: Visibility, acl: AccessControl) -> OperationRegistry { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - name, - OperationType::Query, - visibility, - serde_json::json!({}), - serde_json::json!({}), - vec![], - acl, - ), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); registry } @@ -451,14 +455,16 @@ mod tests { serde_json::json!({ "has_google": has_google }), ) }); - registry.register(HandlerRegistration::new( - external_spec("admin/run", AccessControl::default()), - handler, - OperationProvenance::Local, - None, - None, - caps, - )); + registry + .register(HandlerRegistration::new( + external_spec("admin/run", AccessControl::default()), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + caps, + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = Dispatcher::new(registry, provider); @@ -486,20 +492,22 @@ mod tests { serde_json::json!({ "has_google": has_google }), ) }); - registry.register(HandlerRegistration::new( - external_spec( - "admin/run", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - handler, - OperationProvenance::Local, - None, - None, - caps, - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin/run", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + caps, + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new( StaticIdentityProvider::new() @@ -609,14 +617,16 @@ mod tests { serde_json::json!({ "forwarded_for_id": forwarded_id }), ) }); - registry.register(HandlerRegistration::new( - external_spec("fs/readFile", AccessControl::default()), - handler, - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = Dispatcher::new(registry, provider); @@ -648,14 +658,16 @@ mod tests { serde_json::json!({ "present": present }), ) }); - registry.register(HandlerRegistration::new( - external_spec("fs/readFile", AccessControl::default()), - handler, - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = Dispatcher::new(registry, provider); @@ -736,14 +748,16 @@ mod tests { serde_json::json!({ "peer_ids": peer_ids }), ) }); - registry.register(HandlerRegistration::new( - external_spec("fs/readFile", AccessControl::default()), - handler, - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + HandlerKind::Once(handler), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = Dispatcher::new(registry, provider); @@ -795,7 +809,11 @@ mod tests { let child_id = "ws-abort-child".to_string(); { let mut pending = conn.pending().lock(); - pending.register_call(parent_id.clone(), Instant::now() + Duration::from_secs(30), None); + pending.register_call( + parent_id.clone(), + Instant::now() + Duration::from_secs(30), + None, + ); pending.register_call( child_id.clone(), Instant::now() + Duration::from_secs(30), @@ -844,11 +862,16 @@ mod tests { "input": { "v": 42 }, }); let request_id = "ws-roundtrip-1".to_string(); - let response = dp.dispatch_requested(&conn, request_id.clone(), payload).await; + let response = dp + .dispatch_requested(&conn, request_id.clone(), payload) + .await; assert!(response.result.is_ok()); let envelope: EventEnvelope = response.into(); assert_eq!(envelope.r#type, EVENT_RESPONDED); assert_eq!(envelope.id, "ws-roundtrip-1"); - assert_eq!(envelope.payload.get("output"), Some(&serde_json::json!({ "v": 42 }))); + assert_eq!( + envelope.payload.get("output"), + Some(&serde_json::json!({ "v": 42 })) + ); } } diff --git a/crates/alknet-call/src/protocol/wire.rs b/crates/alknet-call/src/protocol/wire.rs index 7a0d8ab..dec1ab3 100644 --- a/crates/alknet-call/src/protocol/wire.rs +++ b/crates/alknet-call/src/protocol/wire.rs @@ -105,6 +105,10 @@ impl CallError { pub fn timeout(message: impl Into) -> Self { Self::new("TIMEOUT", message, true) } + + pub fn invalid_operation_type(message: impl Into) -> Self { + Self::new("INVALID_OPERATION_TYPE", message, false) + } } impl Eq for CallError {} diff --git a/crates/alknet-call/src/registry/discovery.rs b/crates/alknet-call/src/registry/discovery.rs index 72cb13f..1ef9029 100644 --- a/crates/alknet-call/src/registry/discovery.rs +++ b/crates/alknet-call/src/registry/discovery.rs @@ -324,7 +324,10 @@ pub fn services_schema_handler(registry: Arc) -> Handler { mod tests { use super::*; use crate::registry::context::{CompositionAuthority, ScopedPeerEnv}; - use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::registration::{ + make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, + OperationProvenance, StreamingHandler, + }; use alknet_core::types::Capabilities; use std::collections::HashMap; use std::time::Duration; @@ -359,6 +362,12 @@ mod tests { ) } + fn echo_streaming_handler() -> StreamingHandler { + make_streaming_handler(|input, context| { + futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)]) + }) + } + fn noop_env() -> Arc { struct NoopEnv; #[async_trait::async_trait] @@ -439,36 +448,42 @@ mod tests { fn registry_with_access_controlled_ops() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec_with_acl("public/echo", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - external_spec_with_acl( - "admin/secret", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - internal_spec("internal/hidden"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec_with_acl("public/echo", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + external_spec_with_acl( + "admin/secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + internal_spec("internal/hidden"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -485,59 +500,67 @@ mod tests { fn registry_with_ops() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("fs/readFile"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - internal_spec("secret/internal"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - OperationSpec::new( - "events/subscribe", - OperationType::Subscription, - Visibility::External, - json!({}), - json!({}), - vec![], - AccessControl::default(), - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - OperationSpec::new( - "fs/readFileErr", - OperationType::Query, - Visibility::External, - json!({}), - json!({}), - vec![super::super::spec::ErrorDefinition { - code: "FILE_NOT_FOUND".to_string(), - description: "file not found".to_string(), - schema: json!({ "type": "object" }), - http_status: None, - }], - AccessControl::default(), - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("fs/readFile"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + internal_spec("secret/internal"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + "events/subscribe", + OperationType::Subscription, + Visibility::External, + json!({}), + json!({}), + vec![], + AccessControl::default(), + ), + HandlerKind::Stream(echo_streaming_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + "fs/readFileErr", + OperationType::Query, + Visibility::External, + json!({}), + json!({}), + vec![super::super::spec::ErrorDefinition { + code: "FILE_NOT_FOUND".to_string(), + description: "file not found".to_string(), + schema: json!({ "type": "object" }), + http_status: None, + }], + AccessControl::default(), + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -669,22 +692,26 @@ mod tests { let schema_handler = services_schema_handler(Arc::clone(®istry)); let mut discovery_registry = OperationRegistry::new(); - discovery_registry.register(HandlerRegistration::new( - services_list_spec(), - list_handler, - OperationProvenance::Local, - CompositionAuthority::none(), - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); - discovery_registry.register(HandlerRegistration::new( - services_schema_spec(), - schema_handler, - OperationProvenance::Local, - CompositionAuthority::none(), - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); + discovery_registry + .register(HandlerRegistration::new( + services_list_spec(), + HandlerKind::Once(list_handler), + OperationProvenance::Local, + CompositionAuthority::none(), + ScopedPeerEnv::empty().into(), + Capabilities::new(), + )) + .unwrap(); + discovery_registry + .register(HandlerRegistration::new( + services_schema_spec(), + HandlerKind::Once(schema_handler), + OperationProvenance::Local, + CompositionAuthority::none(), + ScopedPeerEnv::empty().into(), + Capabilities::new(), + )) + .unwrap(); let discovery = Arc::new(discovery_registry); let ctx = root_context("req-6"); diff --git a/crates/alknet-call/src/registry/env.rs b/crates/alknet-call/src/registry/env.rs index 3b08f82..e660a38 100644 --- a/crates/alknet-call/src/registry/env.rs +++ b/crates/alknet-call/src/registry/env.rs @@ -303,7 +303,9 @@ impl OperationEnv for PeerCompositeEnv { mod tests { use super::*; use crate::registry::context::CompositionAuthority; - use crate::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; + use crate::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, + }; use crate::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::Identity; use alknet_core::types::Capabilities; @@ -406,22 +408,24 @@ mod tests { scoped_env: Option, ) -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - name, - OperationType::Query, - spec_visibility, - serde_json::json!({}), - serde_json::json!({}), - vec![], - AccessControl::default(), - ), - handler, - OperationProvenance::Local, - composition_authority, - scoped_env, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + spec_visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + HandlerKind::Once(handler), + OperationProvenance::Local, + composition_authority, + scoped_env, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } diff --git a/crates/alknet-call/src/registry/registration.rs b/crates/alknet-call/src/registry/registration.rs index 8bae54b..f2700ed 100644 --- a/crates/alknet-call/src/registry/registration.rs +++ b/crates/alknet-call/src/registry/registration.rs @@ -4,11 +4,12 @@ use std::pin::Pin; use std::sync::Arc; use alknet_core::types::Capabilities; +use futures::stream::Stream; use serde_json::Value; use super::context::{CompositionAuthority, OperationContext, ScopedPeerEnv}; -use super::spec::{AccessResult, OperationSpec, Visibility}; -use crate::protocol::wire::ResponseEnvelope; +use super::spec::{AccessResult, OperationSpec, OperationType, Visibility}; +use crate::protocol::wire::{CallError, ResponseEnvelope}; pub type Handler = Arc< dyn Fn(Value, OperationContext) -> Pin + Send>> @@ -16,6 +17,20 @@ pub type Handler = Arc< + Sync, >; +pub type StreamingHandler = Arc< + dyn Fn(Value, OperationContext) -> Pin + Send>> + + Send + + Sync, +>; + +pub type ResponseStream = Pin + Send>>; + +#[derive(Clone)] +pub enum HandlerKind { + Once(Handler), + Stream(StreamingHandler), +} + #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum OperationProvenance { Local, @@ -28,7 +43,7 @@ pub enum OperationProvenance { pub struct HandlerRegistration { pub spec: OperationSpec, - pub handler: Handler, + pub handler: HandlerKind, pub provenance: OperationProvenance, pub composition_authority: Option, pub scoped_env: Option, @@ -38,7 +53,7 @@ pub struct HandlerRegistration { impl HandlerRegistration { pub fn new( spec: OperationSpec, - handler: Handler, + handler: HandlerKind, provenance: OperationProvenance, composition_authority: Option, scoped_env: Option, @@ -66,9 +81,24 @@ impl OperationRegistry { } } - pub fn register(&mut self, registration: HandlerRegistration) { + pub fn register(&mut self, registration: HandlerRegistration) -> Result<(), String> { + let expected = match registration.spec.op_type { + OperationType::Query | OperationType::Mutation => "Once", + OperationType::Subscription => "Stream", + }; + let actual = match registration.handler { + HandlerKind::Once(_) => "Once", + HandlerKind::Stream(_) => "Stream", + }; + if expected != actual { + return Err(format!( + "handler kind mismatch: {:?} requires HandlerKind::{} (got HandlerKind::{})", + registration.spec.op_type, expected, actual + )); + } self.operations .insert(registration.spec.name.clone(), registration); + Ok(()) } pub fn registration(&self, name: &str) -> Option<&HandlerRegistration> { @@ -113,8 +143,18 @@ impl OperationRegistry { return ResponseEnvelope::forbidden(request_id, message); } - let handler = Arc::clone(®istration.handler); - (handler)(input, context).await + match ®istration.handler { + HandlerKind::Once(handler) => { + let handler = Arc::clone(handler); + (handler)(input, context).await + } + HandlerKind::Stream(_) => ResponseEnvelope::error( + request_id, + CallError::invalid_operation_type( + "invoke() called on a Subscription op; use invoke_streaming()", + ), + ), + } } } @@ -135,10 +175,30 @@ impl OperationRegistryBuilder { } } - fn store(mut self, registration: HandlerRegistration) -> Self { - self.operations - .insert(registration.spec.name.clone(), registration); - self + fn store(mut self, registration: HandlerRegistration) -> Result { + let name = registration.spec.name.clone(); + self.operations.insert(name, registration); + Ok(self) + } + + fn wrap_once(spec: &OperationSpec, handler: Handler) -> Result { + match spec.op_type { + OperationType::Query | OperationType::Mutation => Ok(HandlerKind::Once(handler)), + OperationType::Subscription => Err(format!( + "handler kind mismatch: {:?} requires HandlerKind::Stream (got Handler)", + spec.op_type + )), + } + } + + fn wrap_stream(spec: &OperationSpec, handler: StreamingHandler) -> Result { + match spec.op_type { + OperationType::Subscription => Ok(HandlerKind::Stream(handler)), + OperationType::Query | OperationType::Mutation => Err(format!( + "handler kind mismatch: {:?} requires HandlerKind::Once (got StreamingHandler)", + spec.op_type + )), + } } pub fn with_local( @@ -148,10 +208,31 @@ impl OperationRegistryBuilder { composition_authority: Option, scoped_env: Option, capabilities: Capabilities, - ) -> Self { + ) -> Result { + let kind = Self::wrap_once(&spec, handler)?; let registration = HandlerRegistration::new( spec, - handler, + kind, + OperationProvenance::Local, + composition_authority, + scoped_env, + capabilities, + ); + self.store(registration) + } + + pub fn with_local_streaming( + self, + spec: OperationSpec, + handler: StreamingHandler, + composition_authority: Option, + scoped_env: Option, + capabilities: Capabilities, + ) -> Result { + let kind = Self::wrap_stream(&spec, handler)?; + let registration = HandlerRegistration::new( + spec, + kind, OperationProvenance::Local, composition_authority, scoped_env, @@ -165,7 +246,7 @@ impl OperationRegistryBuilder { spec: OperationSpec, handler: Handler, capabilities: Capabilities, - ) -> Self { + ) -> Result { self.with_leaf_provenance( spec, handler, @@ -180,13 +261,41 @@ impl OperationRegistryBuilder { handler: Handler, provenance: OperationProvenance, capabilities: Capabilities, - ) -> Self { + ) -> Result { + let kind = Self::wrap_once(&spec, handler)?; let registration = - HandlerRegistration::new(spec, handler, provenance, None, None, capabilities); + HandlerRegistration::new(spec, kind, provenance, None, None, capabilities); self.store(registration) } - pub fn with(self, registration: HandlerRegistration) -> Self { + pub fn with_leaf_streaming( + self, + spec: OperationSpec, + handler: StreamingHandler, + capabilities: Capabilities, + ) -> Result { + self.with_leaf_streaming_provenance( + spec, + handler, + OperationProvenance::FromOpenAPI, + capabilities, + ) + } + + pub fn with_leaf_streaming_provenance( + self, + spec: OperationSpec, + handler: StreamingHandler, + provenance: OperationProvenance, + capabilities: Capabilities, + ) -> Result { + let kind = Self::wrap_stream(&spec, handler)?; + let registration = + HandlerRegistration::new(spec, kind, provenance, None, None, capabilities); + self.store(registration) + } + + pub fn with(self, registration: HandlerRegistration) -> Result { self.store(registration) } @@ -211,6 +320,14 @@ where Arc::new(move |input, context| Box::pin(f(input, context))) } +pub fn make_streaming_handler(f: S) -> StreamingHandler +where + S: Fn(Value, OperationContext) -> St + Send + Sync + 'static, + St: Stream + Send + 'static, +{ + Arc::new(move |input, context| Box::pin(f(input, context))) +} + #[cfg(test)] mod tests { use super::*; @@ -312,14 +429,16 @@ mod tests { #[tokio::test] async fn register_and_invoke_simple_operation() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context("req-1", None, None, false, ScopedPeerEnv::empty()); let response = registry .invoke("echo", serde_json::json!({"hi": 1}), ctx) @@ -331,14 +450,16 @@ mod tests { #[tokio::test] async fn internal_op_from_external_call_returns_not_found() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context("req-2", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("secret", serde_json::json!({}), ctx).await; match response.result { @@ -353,14 +474,16 @@ mod tests { #[tokio::test] async fn internal_op_from_internal_call_invokes_handler() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context("req-3", None, None, true, ScopedPeerEnv::empty()); let response = registry .invoke("secret", serde_json::json!({"x": 2}), ctx) @@ -383,20 +506,22 @@ mod tests { #[tokio::test] async fn acl_sufficient_scopes_allowed() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec( - "admin", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context( "req-5", Some(identity_with_scopes("caller", &["admin"])), @@ -411,20 +536,22 @@ mod tests { #[tokio::test] async fn acl_insufficient_scopes_forbidden() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec( - "admin", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context( "req-6", Some(identity_with_scopes("caller", &["user"])), @@ -445,20 +572,22 @@ mod tests { #[tokio::test] async fn acl_restricted_op_no_identity_forbidden() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec( - "admin", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context("req-7", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("admin", serde_json::json!({}), ctx).await; match response.result { @@ -474,20 +603,22 @@ mod tests { async fn internal_call_acl_uses_handler_identity() { let mut registry = OperationRegistry::new(); let composing_authority = CompositionAuthority::new("agent-chat", ["admin".to_string()]); - registry.register(HandlerRegistration::new( - internal_spec( - "secret", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec( + "secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context( "req-8", Some(identity_with_scopes("user", &["user"])), @@ -506,20 +637,22 @@ mod tests { async fn internal_call_acl_insufficient_handler_identity_forbidden() { let mut registry = OperationRegistry::new(); let weak_authority = CompositionAuthority::new("weak", ["user".to_string()]); - registry.register(HandlerRegistration::new( - internal_spec( - "secret", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec( + "secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context( "req-9", Some(identity_with_scopes("user", &["admin"])), @@ -541,20 +674,22 @@ mod tests { async fn external_call_acl_uses_caller_identity_not_handler_identity() { let mut registry = OperationRegistry::new(); let handler_authority = CompositionAuthority::new("agent", ["admin".to_string()]); - registry.register(HandlerRegistration::new( - external_spec( - "gate", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - Some(handler_authority), - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "gate", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + Some(handler_authority), + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context( "req-10", Some(identity_with_scopes("user", &["user"])), @@ -572,22 +707,26 @@ mod tests { #[tokio::test] async fn list_operations_returns_external_only() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - internal_spec("secret", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + internal_spec("secret", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ops = registry.list_operations(); assert_eq!(ops.len(), 1); assert_eq!(ops[0].name, "echo"); @@ -596,14 +735,16 @@ mod tests { #[tokio::test] async fn handler_returned_error_passes_through() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("boom", AccessControl::default()), - error_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("boom", AccessControl::default()), + HandlerKind::Once(error_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let ctx = root_context("req-11", None, None, false, ScopedPeerEnv::empty()); let response = registry.invoke("boom", serde_json::json!({}), ctx).await; match response.result { @@ -622,6 +763,7 @@ mod tests { ScopedPeerEnv::empty().into(), Capabilities::new(), ) + .unwrap() .build(); let reg = registry.registration("echo").expect("registered"); assert_eq!(reg.provenance, OperationProvenance::Local); @@ -639,6 +781,7 @@ mod tests { Some(ScopedPeerEnv::new(["fs/readFile"])), Capabilities::new(), ) + .unwrap() .build(); let reg = registry.registration("agent").expect("registered"); assert_eq!(reg.provenance, OperationProvenance::Local); @@ -657,6 +800,7 @@ mod tests { echo_handler(), Capabilities::new(), ) + .unwrap() .build(); let reg = registry.registration("vastai").expect("registered"); assert_eq!(reg.provenance, OperationProvenance::FromOpenAPI); @@ -673,6 +817,7 @@ mod tests { OperationProvenance::FromCall, Capabilities::new(), ) + .unwrap() .build(); let reg = registry.registration("remote").expect("registered"); assert_eq!(reg.provenance, OperationProvenance::FromCall); @@ -684,13 +829,16 @@ mod tests { fn builder_with_takes_full_bundle() { let registration = HandlerRegistration::new( external_spec("agent", AccessControl::default()), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Session, Some(CompositionAuthority::new("sandbox", [])), Some(ScopedPeerEnv::new(["fs/readFile"])), Capabilities::new(), ); - let registry = OperationRegistryBuilder::new().with(registration).build(); + let registry = OperationRegistryBuilder::new() + .with(registration) + .unwrap() + .build(); let reg = registry.registration("agent").expect("registered"); assert_eq!(reg.provenance, OperationProvenance::Session); assert!(reg.composition_authority.is_some()); @@ -717,18 +865,145 @@ mod tests { let authority = CompositionAuthority::new("agent", ["fs:read".to_string()]); let scoped = ScopedPeerEnv::new(["fs/readFile"]); let caps = Capabilities::new().with_api_key("google", "k".to_string()); - registry.register(HandlerRegistration::new( - external_spec("agent", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - Some(authority.clone()), - Some(scoped.clone()), - caps.clone(), - )); + registry + .register(HandlerRegistration::new( + external_spec("agent", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + Some(authority.clone()), + Some(scoped.clone()), + caps.clone(), + )) + .unwrap(); let reg = registry.registration("agent").expect("found"); assert_eq!(reg.spec.name, "agent"); assert_eq!(reg.provenance, OperationProvenance::Local); assert_eq!(reg.composition_authority.as_ref().unwrap().label, "agent"); assert!(reg.scoped_env.as_ref().unwrap().allows("fs/readFile")); } + + fn subscription_spec(name: &str) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Subscription, + Visibility::External, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ) + } + + fn echo_streaming_handler() -> StreamingHandler { + make_streaming_handler(|input, context| { + futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)]) + }) + } + + #[tokio::test] + async fn invoke_on_stream_kind_returns_invalid_operation_type() { + let mut registry = OperationRegistry::new(); + registry + .register(HandlerRegistration::new( + subscription_spec("events/stream"), + HandlerKind::Stream(echo_streaming_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + let ctx = root_context("req-iot", None, None, false, ScopedPeerEnv::empty()); + let response = registry + .invoke("events/stream", serde_json::json!({}), ctx) + .await; + match response.result { + Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"), + other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"), + } + } + + #[tokio::test] + async fn invoke_on_once_kind_dispatches_normally() { + let mut registry = OperationRegistry::new(); + registry + .register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + let ctx = root_context("req-once", None, None, false, ScopedPeerEnv::empty()); + let response = registry + .invoke("echo", serde_json::json!({"hi": 1}), ctx) + .await; + assert_eq!(response.result, Ok(serde_json::json!({"hi": 1}))); + } + + #[test] + fn register_rejects_once_for_subscription_spec() { + let mut registry = OperationRegistry::new(); + let result = registry.register(HandlerRegistration::new( + subscription_spec("events/stream"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + match result { + Err(msg) => assert!( + msg.contains("Subscription") + && msg.contains("HandlerKind::Stream") + && msg.contains("HandlerKind::Once"), + "unexpected message: {msg}" + ), + other => panic!("expected Err, got {other:?}"), + } + } + + #[test] + fn register_rejects_stream_for_query_spec() { + let mut registry = OperationRegistry::new(); + let result = registry.register(HandlerRegistration::new( + external_spec("echo", AccessControl::default()), + HandlerKind::Stream(echo_streaming_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + match result { + Err(msg) => assert!( + (msg.contains("Query") || msg.contains("Mutation")) + && msg.contains("HandlerKind::Once") + && msg.contains("HandlerKind::Stream"), + "unexpected message: {msg}" + ), + other => panic!("expected Err, got {other:?}"), + } + } + + #[tokio::test] + async fn make_streaming_handler_produces_working_stream() { + use futures::stream::StreamExt; + let handler = echo_streaming_handler(); + let ctx = root_context("req-st", None, None, false, ScopedPeerEnv::empty()); + let mut stream = handler(serde_json::json!({"v": 1}), ctx); + let first = stream.next().await.expect("one envelope"); + assert_eq!(first.result, Ok(serde_json::json!({"v": 1}))); + let second = stream.next().await; + assert!(second.is_none(), "stream ends after one value"); + } + + #[test] + fn call_error_invalid_operation_type_is_not_retryable() { + let err = CallError::invalid_operation_type("bad path"); + assert_eq!(err.code, "INVALID_OPERATION_TYPE"); + assert!(!err.retryable); + assert!(err.details.is_none()); + } } diff --git a/crates/alknet-call/tests/two_node_call.rs b/crates/alknet-call/tests/two_node_call.rs index 827f1cd..23cb0f2 100644 --- a/crates/alknet-call/tests/two_node_call.rs +++ b/crates/alknet-call/tests/two_node_call.rs @@ -15,7 +15,7 @@ use alknet_call::registry::discovery::{ services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; use alknet_call::registry::registration::{ - make_handler, Handler, HandlerRegistration, OperationProvenance, OperationRegistry, + make_handler, Handler, HandlerKind, HandlerRegistration, OperationProvenance, OperationRegistry, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{Identity, IdentityProvider}; @@ -124,58 +124,66 @@ async fn build_raw_quinn_server( /// services/list + services/schema discovery handlers. fn build_server_registry() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("server/echo"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - external_spec("server/secret"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new().with_api_key("google", "server-secret".to_string()), - )); + registry + .register(HandlerRegistration::new( + external_spec("server/echo"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + external_spec("server/secret"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new().with_api_key("google", "server-secret".to_string()), + )) + .unwrap(); let discovery_registry = Arc::new(registry); let list_handler = services_list_handler(Arc::clone(&discovery_registry)); let schema_handler = services_schema_handler(Arc::clone(&discovery_registry)); let mut full = OperationRegistry::new(); full.register(HandlerRegistration::new( external_spec("server/echo"), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, Capabilities::new(), - )); + )) + .unwrap(); full.register(HandlerRegistration::new( external_spec("server/secret"), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, Capabilities::new().with_api_key("google", "server-secret".to_string()), - )); + )) + .unwrap(); full.register(HandlerRegistration::new( services_list_spec(), - list_handler, + HandlerKind::Once(list_handler), OperationProvenance::Local, None, None, Capabilities::new(), - )); + )) + .unwrap(); full.register(HandlerRegistration::new( services_schema_spec(), - schema_handler, + HandlerKind::Once(schema_handler), OperationProvenance::Local, None, None, Capabilities::new(), - )); + )) + .unwrap(); Arc::new(full) } @@ -191,14 +199,16 @@ async fn two_node_call_round_trip() { // it as UnknownIssuer since the self-signed cert is not in the platform // root store. let mut client_registry = OperationRegistry::new(); - client_registry.register(HandlerRegistration::new( - external_spec("client/echo"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + client_registry + .register(HandlerRegistration::new( + external_spec("client/echo"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let client_registry = Arc::new(client_registry); let client = CallClient::new(Arc::clone(&client_registry), Arc::new(NoopIdentityProvider)); diff --git a/crates/alknet-http/src/adapters/from_mcp/mod.rs b/crates/alknet-http/src/adapters/from_mcp/mod.rs index 5a0e43e..2608821 100644 --- a/crates/alknet-http/src/adapters/from_mcp/mod.rs +++ b/crates/alknet-http/src/adapters/from_mcp/mod.rs @@ -12,7 +12,9 @@ use alknet_call::client::{AdapterError, OperationAdapter}; use alknet_call::protocol::wire::{CallError, ResponseEnvelope}; use alknet_call::registry::context::OperationContext; -use alknet_call::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; +use alknet_call::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, +}; use alknet_call::registry::spec::{ AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, }; @@ -156,7 +158,7 @@ fn build_registration( HandlerRegistration::new( spec, - handler, + HandlerKind::Once(handler), OperationProvenance::FromMCP, None, None, diff --git a/crates/alknet-http/src/adapters/from_openapi.rs b/crates/alknet-http/src/adapters/from_openapi.rs index a57f2bb..d41c0dd 100644 --- a/crates/alknet-http/src/adapters/from_openapi.rs +++ b/crates/alknet-http/src/adapters/from_openapi.rs @@ -17,7 +17,9 @@ use std::sync::Arc; use alknet_call::client::{AdapterError, OperationAdapter}; use alknet_call::protocol::wire::{CallError, ResponseEnvelope}; use alknet_call::registry::context::OperationContext; -use alknet_call::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; +use alknet_call::registry::registration::{ + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, +}; use alknet_call::registry::spec::{ AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, }; @@ -469,7 +471,7 @@ impl FromOpenAPI { let capabilities = Capabilities::new(); Ok(HandlerRegistration::new( spec, - handler, + HandlerKind::Once(handler), OperationProvenance::FromOpenAPI, None, None, @@ -1151,7 +1153,10 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-10", Capabilities::new()); - let response = (registration.handler)(serde_json::json!({}), ctx).await; + let response = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; assert_eq!(response.request_id, "req-10"); match response.result { Ok(v) => assert_eq!(v, serde_json::json!({"ok":true})), @@ -1176,7 +1181,10 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-11", Capabilities::new()); - let response = (registration.handler)(serde_json::json!({}), ctx).await; + let response = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; match response.result { Err(e) => { assert_eq!(e.code, "HTTP_404"); @@ -1201,7 +1209,10 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-12", Capabilities::new()); - let response = (registration.handler)(serde_json::json!({}), ctx).await; + let response = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; assert!(response.result.is_ok()); let last = response.result.unwrap(); assert_eq!(last, serde_json::json!({"n":2})); @@ -1447,11 +1458,16 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-16", Capabilities::new()); - let response = (registration.handler)( - serde_json::json!({"id":"42","filter":"new","body":{"name":"widget"}}), - ctx, - ) - .await; + let response = match ®istration.handler { + HandlerKind::Once(h) => { + h( + serde_json::json!({"id":"42","filter":"new","body":{"name":"widget"}}), + ctx, + ) + .await + } + _ => panic!("expected Once handler"), + }; assert!( response.result.is_ok(), "expected Ok, got {:?}", @@ -1483,7 +1499,10 @@ mod tests { let registration = &bundles[0]; let caps = Capabilities::new().with_http_token("openai", "sk-test-token".to_string()); let ctx = noop_context("req-17", caps); - let _ = (registration.handler)(serde_json::json!({}), ctx).await; + let _ = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; let captured = rx.await.unwrap(); assert_eq!( captured.headers.get("authorization").unwrap(), @@ -1519,7 +1538,10 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-18", Capabilities::new()); - let response = (registration.handler)(serde_json::json!({}), ctx).await; + let response = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; match response.result { Ok(Value::String(s)) => assert_eq!(s, "hello world"), other => panic!("expected String, got {other:?}"), @@ -1540,7 +1562,10 @@ mod tests { .unwrap(); let registration = &bundles[0]; let ctx = noop_context("req-19", Capabilities::new()); - let response = (registration.handler)(serde_json::json!({}), ctx).await; + let response = match ®istration.handler { + HandlerKind::Once(h) => h(serde_json::json!({}), ctx).await, + _ => panic!("expected Once handler"), + }; match response.result { Err(e) => assert_eq!(e.code, "HTTP_500"), other => panic!("expected HTTP_500, got {other:?}"), diff --git a/crates/alknet-http/src/adapters/to_mcp.rs b/crates/alknet-http/src/adapters/to_mcp.rs index 96a8697..17d26c1 100644 --- a/crates/alknet-http/src/adapters/to_mcp.rs +++ b/crates/alknet-http/src/adapters/to_mcp.rs @@ -432,7 +432,7 @@ mod tests { services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, OperationRegistry, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, OperationRegistry, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{AuthToken, Identity, IdentityProvider}; @@ -502,44 +502,52 @@ mod tests { ) -> Arc { let mut inner = OperationRegistry::new(); for (name, op_type, acl) in specs { - inner.register(HandlerRegistration::new( - external_spec(&name, op_type, acl), - make_echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + inner + .register(HandlerRegistration::new( + external_spec(&name, op_type, acl), + HandlerKind::Once(make_echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); } let inner = Arc::new(inner); let mut dispatch_registry = OperationRegistry::new(); for op in inner.list_operations() { - dispatch_registry.register(HandlerRegistration::new( - external_spec(&op.name, op.op_type, op.access_control.clone()), - make_echo_handler(), + dispatch_registry + .register(HandlerRegistration::new( + external_spec(&op.name, op.op_type, op.access_control.clone()), + HandlerKind::Once(make_echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + } + dispatch_registry + .register(HandlerRegistration::new( + services_list_spec(), + HandlerKind::Once(services_list_handler(Arc::clone(&inner))), OperationProvenance::Local, None, - None, + ScopedPeerEnv::empty().into(), Capabilities::new(), - )); - } - dispatch_registry.register(HandlerRegistration::new( - services_list_spec(), - services_list_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); - dispatch_registry.register(HandlerRegistration::new( - services_schema_spec(), - services_schema_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); + )) + .unwrap(); + dispatch_registry + .register(HandlerRegistration::new( + services_schema_spec(), + HandlerKind::Once(services_schema_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + ScopedPeerEnv::empty().into(), + Capabilities::new(), + )) + .unwrap(); Arc::new(dispatch_registry) } diff --git a/crates/alknet-http/src/adapters/to_openapi.rs b/crates/alknet-http/src/adapters/to_openapi.rs index 753aa81..6c72249 100644 --- a/crates/alknet-http/src/adapters/to_openapi.rs +++ b/crates/alknet-http/src/adapters/to_openapi.rs @@ -528,7 +528,7 @@ mod tests { use super::*; use alknet_call::protocol::wire::ResponseEnvelope; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::types::Capabilities; @@ -539,14 +539,16 @@ mod tests { } fn register(registry: &mut OperationRegistry, spec: OperationSpec) { - registry.register(HandlerRegistration::new( - spec, - noop_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + spec, + HandlerKind::Once(noop_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); } fn external_spec(name: &str, errors: Vec) -> OperationSpec { @@ -1003,22 +1005,24 @@ mod tests { #[test] fn internal_operations_excluded_from_error_projection() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - "internal/op", - OperationType::Query, - Visibility::Internal, - json!({}), - json!({}), - vec![error("INTERNAL_ERROR", Some(418))], - AccessControl::default(), - ), - noop_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + "internal/op", + OperationType::Query, + Visibility::Internal, + json!({}), + json!({}), + vec![error("INTERNAL_ERROR", Some(418))], + AccessControl::default(), + ), + HandlerKind::Once(noop_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let spec = to_openapi(®istry); let responses = responses(&spec, PATH_CALL, "post"); assert!( diff --git a/crates/alknet-http/src/gateway/dispatch.rs b/crates/alknet-http/src/gateway/dispatch.rs index 96c638f..6867db6 100644 --- a/crates/alknet-http/src/gateway/dispatch.rs +++ b/crates/alknet-http/src/gateway/dispatch.rs @@ -117,7 +117,7 @@ mod tests { services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::AuthToken; @@ -187,45 +187,51 @@ mod tests { fn registry_with(name: &str, visibility: Visibility, acl: AccessControl) -> OperationRegistry { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - name, - OperationType::Query, - visibility, - serde_json::json!({}), - serde_json::json!({}), - vec![], - acl, - ), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + name, + OperationType::Query, + visibility, + serde_json::json!({}), + serde_json::json!({}), + vec![], + acl, + ), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); registry } fn registry_with_discovery(inner: Arc) -> OperationRegistry { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - services_list_spec(), - services_list_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - services_schema_spec(), - services_schema_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - ScopedPeerEnv::empty().into(), - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + services_list_spec(), + HandlerKind::Once(services_list_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + ScopedPeerEnv::empty().into(), + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + services_schema_spec(), + HandlerKind::Once(services_schema_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + ScopedPeerEnv::empty().into(), + Capabilities::new(), + )) + .unwrap(); registry } @@ -270,32 +276,36 @@ mod tests { #[tokio::test] async fn invoke_for_services_list_returns_access_control_filtered_list() { let mut inner = OperationRegistry::new(); - inner.register(HandlerRegistration::new( - external_spec("public/echo", AccessControl::default()), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - inner.register(HandlerRegistration::new( - external_spec( - "admin/secret", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + inner + .register(HandlerRegistration::new( + external_spec("public/echo", AccessControl::default()), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + inner + .register(HandlerRegistration::new( + external_spec( + "admin/secret", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let inner = Arc::new(inner); let discovery = Arc::new(registry_with_discovery(Arc::clone(&inner))); let provider: Arc = Arc::new(StaticIdentityProvider::new()); @@ -327,16 +337,18 @@ mod tests { #[tokio::test] async fn invoke_for_services_schema_returns_spec_for_known_op() { let mut inner = OperationRegistry::new(); - inner.register(HandlerRegistration::new( - external_spec("fs/readFile", AccessControl::default()), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + inner + .register(HandlerRegistration::new( + external_spec("fs/readFile", AccessControl::default()), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let inner = Arc::new(inner); let discovery = Arc::new(registry_with_discovery(Arc::clone(&inner))); let provider: Arc = Arc::new(StaticIdentityProvider::new()); @@ -373,16 +385,18 @@ mod tests { #[tokio::test] async fn invoke_for_internal_op_returns_not_found_not_leaked() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret/op", AccessControl::default()), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret/op", AccessControl::default()), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); @@ -499,16 +513,18 @@ mod tests { let caps = Capabilities::new().with_api_key("google", "k".to_string()); let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("agent/run", AccessControl::default()), - make_handler(|input, context| async move { - ResponseEnvelope::ok(context.request_id, input) - }), - OperationProvenance::Local, - Some(authority), - Some(scoped.clone()), - caps, - )); + registry + .register(HandlerRegistration::new( + external_spec("agent/run", AccessControl::default()), + HandlerKind::Once(make_handler(|input, context| async move { + ResponseEnvelope::ok(context.request_id, input) + })), + OperationProvenance::Local, + Some(authority), + Some(scoped.clone()), + caps, + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); diff --git a/crates/alknet-http/src/server/gateway_routes.rs b/crates/alknet-http/src/server/gateway_routes.rs index 4b7b8bc..1fe675d 100644 --- a/crates/alknet-http/src/server/gateway_routes.rs +++ b/crates/alknet-http/src/server/gateway_routes.rs @@ -295,7 +295,7 @@ mod tests { services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType}; use alknet_core::auth::{AuthToken, Identity}; @@ -376,46 +376,52 @@ mod tests { fn registry_with_echo() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo/run", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo/run", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } fn registry_with_restricted_op() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec( - "admin/run", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin/run", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } fn registry_with_internal_op() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret/op"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret/op"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -424,37 +430,43 @@ mod tests { ) -> Arc { let mut inner = OperationRegistry::new(); for op in inner_ops { - inner.register(op); + inner.register(op).unwrap(); } let inner = Arc::new(inner); let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - services_list_spec(), - services_list_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - services_schema_spec(), - services_schema_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + services_list_spec(), + HandlerKind::Once(services_list_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + services_schema_spec(), + HandlerKind::Once(services_schema_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); for spec in inner.list_operations() { let name = spec.name.clone(); let reg = inner.registration(&name).unwrap(); - registry.register(HandlerRegistration::new( - reg.spec.clone(), - Arc::clone(®.handler), - reg.provenance, - reg.composition_authority.clone(), - reg.scoped_env.clone(), - reg.capabilities.clone(), - )); + registry + .register(HandlerRegistration::new( + reg.spec.clone(), + reg.handler.clone(), + reg.provenance, + reg.composition_authority.clone(), + reg.scoped_env.clone(), + reg.capabilities.clone(), + )) + .unwrap(); } Arc::new(registry) } @@ -572,7 +584,7 @@ mod tests { let ops = vec![ HandlerRegistration::new( external_spec("public/echo", AccessControl::default()), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -586,7 +598,7 @@ mod tests { ..Default::default() }, ), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -625,7 +637,7 @@ mod tests { async fn schema_returns_full_spec_for_authorized_op() { let ops = vec![HandlerRegistration::new( external_spec("echo/run", AccessControl::default()), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -657,7 +669,7 @@ mod tests { ..Default::default() }, ), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -709,22 +721,26 @@ mod tests { #[tokio::test] async fn batch_internal_op_returns_not_found_in_array() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret/op"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - external_spec("echo/run", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret/op"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + external_spec("echo/run", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let router = build_router(registry, unused_provider()); let req = Request::builder() @@ -823,14 +839,16 @@ mod tests { #[test] fn is_internal_op_detects_registered_internal_op() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - internal_spec("secret/op"), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + internal_spec("secret/op"), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); assert!(is_internal_op(®istry, "secret/op")); assert!(is_internal_op(®istry, "/secret/op")); } @@ -838,14 +856,16 @@ mod tests { #[test] fn is_internal_op_false_for_external_op() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo/run", AccessControl::default()), - echo_handler(), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo/run", AccessControl::default()), + HandlerKind::Once(echo_handler()), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); assert!(!is_internal_op(®istry, "echo/run")); } @@ -906,7 +926,7 @@ mod tests { let ops = vec![ HandlerRegistration::new( external_spec("public/echo", AccessControl::default()), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -920,7 +940,7 @@ mod tests { ..Default::default() }, ), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, @@ -953,7 +973,7 @@ mod tests { async fn schema_unknown_op_returns_404() { let ops = vec![HandlerRegistration::new( external_spec("echo/run", AccessControl::default()), - echo_handler(), + HandlerKind::Once(echo_handler()), OperationProvenance::Local, None, None, diff --git a/crates/alknet-http/src/websocket/mod.rs b/crates/alknet-http/src/websocket/mod.rs index d709147..90ea24b 100644 --- a/crates/alknet-http/src/websocket/mod.rs +++ b/crates/alknet-http/src/websocket/mod.rs @@ -18,7 +18,7 @@ mod tests { use alknet_call::protocol::wire::{EventEnvelope, ResponseEnvelope, EVENT_RESPONDED}; use alknet_call::registry::context::AbortPolicy; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{Identity, IdentityProvider}; @@ -77,14 +77,18 @@ mod tests { fn echo_registry() -> Arc { let mut registry = alknet_call::registry::registration::OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo/run"), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo/run"), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -174,7 +178,9 @@ mod tests { assert!(!env.contains("worker/exec")); conn.register_imported(HandlerRegistration::new( external_spec("worker/exec"), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), OperationProvenance::FromCall, None, None, diff --git a/crates/alknet-http/src/websocket/overlay.rs b/crates/alknet-http/src/websocket/overlay.rs index 354562d..cc86d6f 100644 --- a/crates/alknet-http/src/websocket/overlay.rs +++ b/crates/alknet-http/src/websocket/overlay.rs @@ -30,7 +30,7 @@ mod tests { }; use alknet_call::registry::env::{OperationEnv, PeerRef}; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, OperationRegistry, + make_handler, HandlerKind, HandlerRegistration, OperationProvenance, OperationRegistry, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{Identity, IdentityProvider}; @@ -113,7 +113,9 @@ mod tests { ) -> HandlerRegistration { HandlerRegistration::new( external_spec(name, acl), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), OperationProvenance::FromCall, composition_authority, None, @@ -123,14 +125,18 @@ mod tests { fn echo_registry() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo/run", AccessControl::default()), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo/run", AccessControl::default()), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -454,9 +460,9 @@ mod tests { conn.register_imported(HandlerRegistration::new( external_spec("ui/dragged", AccessControl::default()), - make_handler(|input, ctx| async move { + HandlerKind::Once(make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, serde_json::json!({ "echoed": input })) - }), + })), OperationProvenance::FromCall, None, None, @@ -654,7 +660,7 @@ mod tests { }; conn.register_imported(HandlerRegistration::new( subscription_spec("events/stream"), - handler, + HandlerKind::Once(handler), OperationProvenance::FromCall, None, None, diff --git a/crates/alknet-http/src/websocket/upgrade.rs b/crates/alknet-http/src/websocket/upgrade.rs index cef07bf..d8c6c97 100644 --- a/crates/alknet-http/src/websocket/upgrade.rs +++ b/crates/alknet-http/src/websocket/upgrade.rs @@ -249,7 +249,7 @@ mod tests { }; use alknet_call::registry::env::OperationEnv; use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, + make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{AuthToken, Identity}; @@ -330,77 +330,92 @@ mod tests { fn echo_registry() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec("echo/run", AccessControl::default()), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec("echo/run", AccessControl::default()), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } fn registry_with_restricted_op() -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - external_spec( - "admin/run", - AccessControl { - required_scopes: vec!["admin".to_string()], - ..Default::default() - }, - ), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + external_spec( + "admin/run", + AccessControl { + required_scopes: vec!["admin".to_string()], + ..Default::default() + }, + ), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } fn registry_with_subscription() -> Arc { let mut registry = OperationRegistry::new(); let count = Arc::new(StdMutex::new(0u32)); - let handler = make_handler(move |_input, ctx| { + let handler = make_streaming_handler(move |_input, ctx| { let counter = Arc::clone(&count); - async move { - let mut c = counter.lock().unwrap(); - *c += 1; - let value = *c; - ResponseEnvelope::ok(ctx.request_id, serde_json::json!({ "n": value })) - } + let mut c = counter.lock().unwrap(); + *c += 1; + let value = *c; + futures::stream::iter(vec![ResponseEnvelope::ok( + ctx.request_id, + serde_json::json!({ "n": value }), + )]) }); - registry.register(HandlerRegistration::new( - subscription_spec("events/stream"), - handler, - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + subscription_spec("events/stream"), + HandlerKind::Stream(handler), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } fn registry_with_discovery(inner: Arc) -> Arc { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - services_list_spec(), - services_list_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); - registry.register(HandlerRegistration::new( - services_schema_spec(), - services_schema_handler(Arc::clone(&inner)), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + services_list_spec(), + HandlerKind::Once(services_list_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); + registry + .register(HandlerRegistration::new( + services_schema_spec(), + HandlerKind::Once(services_schema_handler(Arc::clone(&inner))), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); Arc::new(registry) } @@ -543,22 +558,26 @@ mod tests { #[tokio::test] async fn handle_inbound_envelope_internal_op_yields_not_found() { let mut registry = OperationRegistry::new(); - registry.register(HandlerRegistration::new( - OperationSpec::new( - "secret/op", - OperationType::Query, - Visibility::Internal, - serde_json::json!({}), - serde_json::json!({}), - vec![], - AccessControl::default(), - ), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), - OperationProvenance::Local, - None, - None, - Capabilities::new(), - )); + registry + .register(HandlerRegistration::new( + OperationSpec::new( + "secret/op", + OperationType::Query, + Visibility::Internal, + serde_json::json!({}), + serde_json::json!({}), + vec![], + AccessControl::default(), + ), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )) + .unwrap(); let registry = Arc::new(registry); let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatcher(registry, provider); @@ -753,19 +772,18 @@ mod tests { let dp = dispatcher(registry, provider); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); - let mut received = Vec::new(); - for i in 0..3 { - let request = EventEnvelope::requested( - format!("sub-{i}"), - serde_json::json!({ "operationId": "/events/stream", "input": {} }), - ); - let out = handle_inbound_envelope(&dp, &conn, request) - .await - .expect("response"); - assert_eq!(out.r#type, EVENT_RESPONDED); - received.push(out.id); - } - assert_eq!(received.len(), 3); + let request = EventEnvelope::requested( + "sub-0", + serde_json::json!({ "operationId": "/events/stream", "input": {} }), + ); + let out = handle_inbound_envelope(&dp, &conn, request) + .await + .expect("response"); + assert_eq!(out.r#type, EVENT_ERROR); + assert_eq!( + out.payload.get("code"), + Some(&serde_json::json!("INVALID_OPERATION_TYPE")) + ); } #[tokio::test] @@ -868,7 +886,9 @@ mod tests { conn.register_imported(HandlerRegistration::new( external_spec("ui/dragged", AccessControl::default()), - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), + HandlerKind::Once(make_handler(|input, ctx| async move { + ResponseEnvelope::ok(ctx.request_id, input) + })), OperationProvenance::FromCall, None, None, @@ -1044,28 +1064,27 @@ mod tests { drive_ws_session(socket, &dp, &conn).await; }); - let mut got = Vec::new(); - for i in 0..3 { - let request = EventEnvelope::requested( - format!("sub-ws-{i}"), - serde_json::json!({ "operationId": "/events/stream", "input": {} }), - ); - client - .send_binary(serialize_envelope(&request).unwrap()) - .await; + let request = EventEnvelope::requested( + "sub-ws-0", + serde_json::json!({ "operationId": "/events/stream", "input": {} }), + ); + client + .send_binary(serialize_envelope(&request).unwrap()) + .await; - let msg = client.recv_timeout(Duration::from_secs(5)).await; - match msg { - MockMsg::Binary(bytes) => { - let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap(); - assert_eq!(env.id, format!("sub-ws-{i}")); - assert_eq!(env.r#type, EVENT_RESPONDED); - got.push(env.id); - } - other => panic!("expected binary, got {other:?}"), + let msg = client.recv_timeout(Duration::from_secs(5)).await; + match msg { + MockMsg::Binary(bytes) => { + let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap(); + assert_eq!(env.id, "sub-ws-0"); + assert_eq!(env.r#type, EVENT_ERROR); + assert_eq!( + env.payload.get("code"), + Some(&serde_json::json!("INVALID_OPERATION_TYPE")) + ); } + other => panic!("expected binary, got {other:?}"), } - assert_eq!(got.len(), 3); client.close().await; server_handle.await.ok();