Compare commits
1 Commits
62bebe5122
...
feat/http/
| Author | SHA1 | Date | |
|---|---|---|---|
| 4ac8d308e6 |
@@ -26,6 +26,7 @@ use alknet_call::registry::env::LocalOperationEnv;
|
|||||||
use alknet_call::registry::registration::OperationRegistry;
|
use alknet_call::registry::registration::OperationRegistry;
|
||||||
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
|
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
|
||||||
use alknet_core::types::Capabilities;
|
use alknet_core::types::Capabilities;
|
||||||
|
use futures::stream::BoxStream;
|
||||||
use serde_json::Value;
|
use serde_json::Value;
|
||||||
|
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||||
@@ -70,11 +71,43 @@ impl GatewayDispatch {
|
|||||||
self.registry.invoke(&operation_name, input, context).await
|
self.registry.invoke(&operation_name, input, context).await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn invoke_streaming(
|
||||||
|
&self,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
op: &str,
|
||||||
|
input: Value,
|
||||||
|
) -> BoxStream<'static, ResponseEnvelope> {
|
||||||
|
let operation_name = strip_leading_slash(op).to_string();
|
||||||
|
let request_id = uuid::Uuid::new_v4().to_string();
|
||||||
|
let context = self.build_root_context_streaming(&request_id, &operation_name, identity);
|
||||||
|
self.registry
|
||||||
|
.invoke_streaming(&operation_name, input, context)
|
||||||
|
}
|
||||||
|
|
||||||
fn build_root_context(
|
fn build_root_context(
|
||||||
&self,
|
&self,
|
||||||
request_id: &str,
|
request_id: &str,
|
||||||
operation_name: &str,
|
operation_name: &str,
|
||||||
identity: Option<Identity>,
|
identity: Option<Identity>,
|
||||||
|
) -> OperationContext {
|
||||||
|
self.build_root_context_inner(request_id, operation_name, identity, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_root_context_streaming(
|
||||||
|
&self,
|
||||||
|
request_id: &str,
|
||||||
|
operation_name: &str,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
) -> OperationContext {
|
||||||
|
self.build_root_context_inner(request_id, operation_name, identity, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_root_context_inner(
|
||||||
|
&self,
|
||||||
|
request_id: &str,
|
||||||
|
operation_name: &str,
|
||||||
|
identity: Option<Identity>,
|
||||||
|
bounded: bool,
|
||||||
) -> OperationContext {
|
) -> OperationContext {
|
||||||
let registration = self.registry.registration(operation_name);
|
let registration = self.registry.registration(operation_name);
|
||||||
let (composition_authority, capabilities, scoped_env) = match registration {
|
let (composition_authority, capabilities, scoped_env) = match registration {
|
||||||
@@ -97,7 +130,7 @@ impl GatewayDispatch {
|
|||||||
forwarded_for: None,
|
forwarded_for: None,
|
||||||
capabilities,
|
capabilities,
|
||||||
metadata: HashMap::new(),
|
metadata: HashMap::new(),
|
||||||
deadline: Some(Instant::now() + DEFAULT_TIMEOUT),
|
deadline: bounded.then(|| Instant::now() + DEFAULT_TIMEOUT),
|
||||||
scoped_env,
|
scoped_env,
|
||||||
env,
|
env,
|
||||||
abort_policy: AbortPolicy::default(),
|
abort_policy: AbortPolicy::default(),
|
||||||
@@ -117,10 +150,11 @@ mod tests {
|
|||||||
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::registration::{
|
use alknet_call::registry::registration::{
|
||||||
make_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
make_handler, make_streaming_handler, HandlerKind, HandlerRegistration, OperationProvenance,
|
||||||
};
|
};
|
||||||
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
|
||||||
use alknet_core::auth::AuthToken;
|
use alknet_core::auth::AuthToken;
|
||||||
|
use futures::stream::StreamExt;
|
||||||
use std::sync::Mutex as StdMutex;
|
use std::sync::Mutex as StdMutex;
|
||||||
|
|
||||||
struct StaticIdentityProvider {
|
struct StaticIdentityProvider {
|
||||||
@@ -235,6 +269,53 @@ mod tests {
|
|||||||
registry
|
registry
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn subscription_spec(name: &str, visibility: Visibility, acl: AccessControl) -> OperationSpec {
|
||||||
|
OperationSpec::new(
|
||||||
|
name,
|
||||||
|
OperationType::Subscription,
|
||||||
|
visibility,
|
||||||
|
serde_json::json!({}),
|
||||||
|
serde_json::json!({}),
|
||||||
|
vec![],
|
||||||
|
acl,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn echo_streaming_handler() -> HandlerKind {
|
||||||
|
HandlerKind::Stream(make_streaming_handler(|input, context| {
|
||||||
|
futures::stream::iter(vec![ResponseEnvelope::ok(context.request_id, input)])
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn registry_with_subscription(
|
||||||
|
name: &str,
|
||||||
|
visibility: Visibility,
|
||||||
|
acl: AccessControl,
|
||||||
|
) -> OperationRegistry {
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(name, visibility, acl),
|
||||||
|
echo_streaming_handler(),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
Capabilities::new(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
registry
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn collect_stream(
|
||||||
|
mut stream: BoxStream<'static, ResponseEnvelope>,
|
||||||
|
) -> Vec<ResponseEnvelope> {
|
||||||
|
let mut out = Vec::new();
|
||||||
|
while let Some(env) = stream.next().await {
|
||||||
|
out.push(env);
|
||||||
|
}
|
||||||
|
out
|
||||||
|
}
|
||||||
|
|
||||||
fn dispatch(
|
fn dispatch(
|
||||||
registry: Arc<OperationRegistry>,
|
registry: Arc<OperationRegistry>,
|
||||||
provider: Arc<dyn IdentityProvider>,
|
provider: Arc<dyn IdentityProvider>,
|
||||||
@@ -548,4 +629,195 @@ mod tests {
|
|||||||
fn assert_concrete<T: Sized>() {}
|
fn assert_concrete<T: Sized>() {}
|
||||||
assert_concrete::<GatewayDispatch>();
|
assert_concrete::<GatewayDispatch>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_subscription_returns_handler_stream() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "events/stream", serde_json::json!({ "v": 7 }));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
assert_eq!(items[0].result, Ok(serde_json::json!({ "v": 7 })));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_strips_leading_slash_from_operation_name() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "/events/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
assert!(items[0].result.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_unknown_op_yields_single_not_found() {
|
||||||
|
let registry = Arc::new(OperationRegistry::new());
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "no/such", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "NOT_FOUND");
|
||||||
|
assert!(e.message.contains("no/such"));
|
||||||
|
}
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_internal_op_from_external_yields_not_found() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"secret/stream",
|
||||||
|
Visibility::Internal,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "secret/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "NOT_FOUND");
|
||||||
|
assert!(e.message.contains("secret/stream"));
|
||||||
|
}
|
||||||
|
other => panic!("expected NOT_FOUND, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_with_none_identity_and_restricted_op_yields_forbidden() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"admin/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl {
|
||||||
|
required_scopes: vec!["admin".to_string()],
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "admin/stream", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => {
|
||||||
|
assert_eq!(e.code, "FORBIDDEN");
|
||||||
|
assert_eq!(e.message, "authentication required");
|
||||||
|
}
|
||||||
|
other => panic!("expected FORBIDDEN, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_streaming_on_query_op_yields_invalid_operation_type() {
|
||||||
|
let registry = Arc::new(registry_with(
|
||||||
|
"echo/run",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let stream = dp.invoke_streaming(None, "echo/run", serde_json::json!({}));
|
||||||
|
let items = collect_stream(stream).await;
|
||||||
|
assert_eq!(items.len(), 1);
|
||||||
|
match &items[0].result {
|
||||||
|
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
|
||||||
|
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn invoke_on_subscription_op_returns_invalid_operation_type() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let response = dp
|
||||||
|
.invoke(None, "events/stream", serde_json::json!({}))
|
||||||
|
.await;
|
||||||
|
match response.result {
|
||||||
|
Err(e) => assert_eq!(e.code, "INVALID_OPERATION_TYPE"),
|
||||||
|
other => panic!("expected INVALID_OPERATION_TYPE, got {other:?}"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_streaming_sets_deadline_none() {
|
||||||
|
let registry = Arc::new(registry_with_subscription(
|
||||||
|
"events/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
));
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let ctx = dp.build_root_context_streaming("req-st-1", "events/stream", None);
|
||||||
|
assert!(!ctx.internal, "internal must be false for wire-ingress");
|
||||||
|
assert!(ctx.forwarded_for.is_none(), "forwarded_for must be None");
|
||||||
|
assert!(ctx.parent_request_id.is_none(), "root has no parent");
|
||||||
|
assert!(
|
||||||
|
ctx.deadline.is_none(),
|
||||||
|
"deadline must be None for streaming"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn build_root_context_streaming_carries_registration_bundle_fields() {
|
||||||
|
let authority = alknet_call::registry::context::CompositionAuthority::new(
|
||||||
|
"agent",
|
||||||
|
["fs:read".to_string()],
|
||||||
|
);
|
||||||
|
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
|
||||||
|
let caps = Capabilities::new().with_api_key("google", "k".to_string());
|
||||||
|
|
||||||
|
let mut registry = OperationRegistry::new();
|
||||||
|
registry
|
||||||
|
.register(HandlerRegistration::new(
|
||||||
|
subscription_spec(
|
||||||
|
"agent/stream",
|
||||||
|
Visibility::External,
|
||||||
|
AccessControl::default(),
|
||||||
|
),
|
||||||
|
echo_streaming_handler(),
|
||||||
|
OperationProvenance::Local,
|
||||||
|
Some(authority),
|
||||||
|
Some(scoped.clone()),
|
||||||
|
caps,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
let registry = Arc::new(registry);
|
||||||
|
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
|
||||||
|
let dp = dispatch(registry, provider);
|
||||||
|
|
||||||
|
let ctx = dp.build_root_context_streaming("req-st-2", "agent/stream", None);
|
||||||
|
assert!(ctx.handler_identity.is_some());
|
||||||
|
assert_eq!(ctx.handler_identity.as_ref().unwrap().label, "agent");
|
||||||
|
assert!(ctx.scoped_env.allows("fs/readFile"));
|
||||||
|
assert!(ctx.capabilities.get("google").is_some());
|
||||||
|
assert!(ctx.deadline.is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user