3 Commits

Author SHA1 Message Date
9c959cd863 docs(http): mark http/gateway/gateway-dispatch-spine completed 2026-07-01 17:07:53 +00:00
117085de4e Merge feat/http-gateway-dispatch-spine: GatewayDispatch shared dispatch spine
Concrete struct (not a trait) holding Arc<OperationRegistry> + Arc<dyn IdentityProvider>
with resolve_bearer() and invoke() returning ResponseEnvelope. Builds root
OperationContext for wire-ingress (internal:false, forwarded_for:None, fresh
request_id, 30s deadline). 14 unit tests covering dispatch, AccessControl
filtering, NOT_FOUND for unregistered/Internal ops, FORBIDDEN for unauthorized.
2026-07-01 17:06:22 +00:00
a4ce2c8173 feat(http): implement GatewayDispatch shared dispatch spine
Thin concrete struct (not a trait) holding Arc<OperationRegistry> +
Arc<dyn IdentityProvider>. Exposes resolve_bearer() (delegates to
identity_provider.resolve_from_token) and invoke() which builds a root
OperationContext for wire-ingress (internal: false, forwarded_for: None,
fresh UUID v4 request_id, deadline now+30s) carrying the registration
bundle's composition_authority/capabilities/scoped_env, then calls
OperationRegistry::invoke. Dispatches services/list and services/schema
unchanged (registered ops); AccessControl filtering in services/list
sees the caller's resolved identity. Re-exported from lib.rs.

Duplicates Dispatcher::build_root_context construction (the alknet-call
version is pub(crate) and tangled with CallConnection peer/session
overlays); the invariants (internal: false, forwarded_for: None) are
the load-bearing part and identical to the wire-ingress path.
2026-07-01 17:05:10 +00:00
4 changed files with 564 additions and 3 deletions

View File

@@ -0,0 +1,548 @@
//! Shared dispatch spine for the `to_openapi` / `to_mcp` gateway projections.
//!
//! Thin concrete struct (not a trait — research §5.2 rules out a trait with an
//! associated output type). Holds `Arc<OperationRegistry>` +
//! `Arc<dyn IdentityProvider>` and exposes a `resolve_bearer()` + `invoke()`
//! method pair returning the neutral `ResponseEnvelope`. Each gateway maps the
//! envelope to its own wire shape (`to_openapi` → HTTP `Response`, `to_mcp` →
//! `CallToolResult`).
//!
//! # Security invariants
//!
//! - `internal: false` — ACL runs against the caller's `identity`, not a
//! handler's composition authority (ADR-015).
//! - `forwarded_for: None` — wire-ingress only (ADR-032).
//! - The root `OperationContext` is constructed identically for both
//! gateways, making them provably identical on the security axis (auth,
//! authority, ACL); they diverge only on wire-framing.
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use alknet_call::protocol::wire::ResponseEnvelope;
use alknet_call::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv};
use alknet_call::registry::env::LocalOperationEnv;
use alknet_call::registry::registration::OperationRegistry;
use alknet_core::auth::{AuthToken, Identity, IdentityProvider};
use alknet_core::types::Capabilities;
use serde_json::Value;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
pub struct GatewayDispatch {
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
}
impl GatewayDispatch {
pub fn new(
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
) -> Self {
Self {
registry,
identity_provider,
}
}
pub fn registry(&self) -> &Arc<OperationRegistry> {
&self.registry
}
pub fn identity_provider(&self) -> &Arc<dyn IdentityProvider> {
&self.identity_provider
}
pub fn resolve_bearer(&self, token: &AuthToken) -> Option<Identity> {
self.identity_provider.resolve_from_token(token)
}
pub async fn invoke(
&self,
identity: Option<Identity>,
op: &str,
input: Value,
) -> ResponseEnvelope {
let operation_name = strip_leading_slash(op).to_string();
let request_id = uuid::Uuid::new_v4().to_string();
let context = self.build_root_context(&request_id, &operation_name, identity);
self.registry.invoke(&operation_name, input, context).await
}
fn build_root_context(
&self,
request_id: &str,
operation_name: &str,
identity: Option<Identity>,
) -> OperationContext {
let registration = self.registry.registration(operation_name);
let (composition_authority, capabilities, scoped_env) = match registration {
Some(r) => (
r.composition_authority.clone(),
r.capabilities.clone(),
r.scoped_env.clone().unwrap_or_else(ScopedPeerEnv::empty),
),
None => (
None,
Capabilities::new(),
ScopedPeerEnv::empty(),
),
};
let env: Arc<dyn alknet_call::registry::env::OperationEnv + Send + Sync> =
Arc::new(LocalOperationEnv::new(Arc::clone(&self.registry)));
OperationContext {
request_id: request_id.to_string(),
parent_request_id: None,
identity,
handler_identity: composition_authority,
forwarded_for: None,
capabilities,
metadata: HashMap::new(),
deadline: Some(Instant::now() + DEFAULT_TIMEOUT),
scoped_env,
env,
abort_policy: AbortPolicy::default(),
internal: false,
}
}
}
fn strip_leading_slash(operation_id: &str) -> &str {
operation_id.strip_prefix('/').unwrap_or(operation_id)
}
#[cfg(test)]
mod tests {
use super::*;
use alknet_call::registry::discovery::{
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
};
use alknet_call::registry::registration::{
make_handler, HandlerRegistration, OperationProvenance,
};
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility};
use alknet_core::auth::AuthToken;
use std::sync::Mutex as StdMutex;
struct StaticIdentityProvider {
tokens: StdMutex<HashMap<String, Identity>>,
}
impl StaticIdentityProvider {
fn new() -> Self {
Self {
tokens: StdMutex::new(HashMap::new()),
}
}
fn with_token(self, token: &str, identity: Identity) -> Self {
self.tokens
.lock()
.unwrap()
.insert(token.to_string(), identity);
self
}
}
impl IdentityProvider for StaticIdentityProvider {
fn resolve_from_fingerprint(&self, _fp: &str) -> Option<Identity> {
None
}
fn resolve_from_token(&self, token: &AuthToken) -> Option<Identity> {
let token_str = String::from_utf8_lossy(&token.raw);
self.tokens.lock().unwrap().get(token_str.as_ref()).cloned()
}
}
fn identity_with_scopes(id: &str, scopes: &[&str]) -> Identity {
Identity {
id: id.to_string(),
scopes: scopes.iter().map(|s| s.to_string()).collect(),
resources: HashMap::new(),
}
}
fn external_spec(name: &str, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Query,
Visibility::External,
serde_json::json!({}),
serde_json::json!({}),
vec![],
acl,
)
}
fn internal_spec(name: &str, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Query,
Visibility::Internal,
serde_json::json!({}),
serde_json::json!({}),
vec![],
acl,
)
}
fn registry_with(name: &str, visibility: Visibility, acl: AccessControl) -> OperationRegistry {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
OperationSpec::new(
name,
OperationType::Query,
visibility,
serde_json::json!({}),
serde_json::json!({}),
vec![],
acl,
),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
registry
}
fn registry_with_discovery(inner: Arc<OperationRegistry>) -> OperationRegistry {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
services_list_spec(),
services_list_handler(Arc::clone(&inner)),
OperationProvenance::Local,
None,
ScopedPeerEnv::empty().into(),
Capabilities::new(),
));
registry.register(HandlerRegistration::new(
services_schema_spec(),
services_schema_handler(Arc::clone(&inner)),
OperationProvenance::Local,
None,
ScopedPeerEnv::empty().into(),
Capabilities::new(),
));
registry
}
fn dispatch(
registry: Arc<OperationRegistry>,
provider: Arc<dyn IdentityProvider>,
) -> GatewayDispatch {
GatewayDispatch::new(registry, provider)
}
#[tokio::test]
async fn invoke_dispatches_registered_op_and_returns_response_envelope() {
let registry = Arc::new(registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "echo/run", serde_json::json!({ "msg": "hi" }))
.await;
assert!(response.result.is_ok());
assert_eq!(
response.result.unwrap(),
serde_json::json!({ "msg": "hi" })
);
}
#[tokio::test]
async fn invoke_strips_leading_slash_from_operation_name() {
let registry = Arc::new(registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "/echo/run", serde_json::json!({}))
.await;
assert!(response.result.is_ok());
}
#[tokio::test]
async fn invoke_for_services_list_returns_access_control_filtered_list() {
let mut inner = OperationRegistry::new();
inner.register(HandlerRegistration::new(
external_spec("public/echo", AccessControl::default()),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
inner.register(HandlerRegistration::new(
external_spec(
"admin/secret",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
let inner = Arc::new(inner);
let discovery = Arc::new(registry_with_discovery(Arc::clone(&inner)));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(discovery, provider);
let response = dp
.invoke(
Some(identity_with_scopes("regular-peer", &["user"])),
"services/list",
serde_json::json!({}),
)
.await;
let output = response.result.expect("list ok");
let ops = output
.get("operations")
.and_then(|v| v.as_array())
.expect("operations array");
let names: Vec<&str> = ops
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"public/echo"));
assert!(
!names.contains(&"admin/secret"),
"unauthorized peer must not see admin op via services/list"
);
}
#[tokio::test]
async fn invoke_for_services_schema_returns_spec_for_known_op() {
let mut inner = OperationRegistry::new();
inner.register(HandlerRegistration::new(
external_spec("fs/readFile", AccessControl::default()),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
let inner = Arc::new(inner);
let discovery = Arc::new(registry_with_discovery(Arc::clone(&inner)));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(discovery, provider);
let response = dp
.invoke(
None,
"services/schema",
serde_json::json!({ "name": "fs/readFile" }),
)
.await;
let spec = response.result.expect("schema ok");
assert_eq!(spec.get("name"), Some(&serde_json::json!("fs/readFile")));
assert_eq!(spec.get("namespace"), Some(&serde_json::json!("fs")));
}
#[tokio::test]
async fn invoke_for_unregistered_op_returns_not_found() {
let registry = Arc::new(OperationRegistry::new());
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "no/such", serde_json::json!({}))
.await;
match response.result {
Err(e) => {
assert_eq!(e.code, "NOT_FOUND");
assert!(e.message.contains("no/such"));
}
other => panic!("expected NOT_FOUND, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_for_internal_op_returns_not_found_not_leaked() {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
internal_spec("secret/op", AccessControl::default()),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
let registry = Arc::new(registry);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "secret/op", serde_json::json!({}))
.await;
match response.result {
Err(e) => {
assert_eq!(e.code, "NOT_FOUND");
assert!(e.message.contains("secret/op"));
}
other => panic!("expected NOT_FOUND, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_with_none_identity_and_restricted_op_returns_forbidden() {
let registry = Arc::new(registry_with(
"admin/run",
Visibility::External,
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let response = dp
.invoke(None, "admin/run", serde_json::json!({}))
.await;
match response.result {
Err(e) => {
assert_eq!(e.code, "FORBIDDEN");
assert_eq!(e.message, "authentication required");
}
other => panic!("expected FORBIDDEN, got {other:?}"),
}
}
#[tokio::test]
async fn invoke_with_authorized_identity_dispatches_restricted_op() {
let registry = Arc::new(registry_with(
"admin/run",
Visibility::External,
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
));
let provider: Arc<dyn IdentityProvider> = Arc::new(
StaticIdentityProvider::new()
.with_token("alk_admin", identity_with_scopes("admin-peer", &["admin"])),
);
let dp = dispatch(registry, provider);
let token = AuthToken {
raw: b"alk_admin".to_vec(),
};
let identity = dp.resolve_bearer(&token).expect("token resolves");
let response = dp
.invoke(Some(identity), "admin/run", serde_json::json!({ "ok": 1 }))
.await;
assert!(response.result.is_ok());
assert_eq!(response.result.unwrap(), serde_json::json!({ "ok": 1 }));
}
#[tokio::test]
async fn resolve_bearer_returns_none_for_unknown_token() {
let registry = Arc::new(OperationRegistry::new());
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let token = AuthToken {
raw: b"alk_unknown".to_vec(),
};
assert!(dp.resolve_bearer(&token).is_none());
}
#[test]
fn build_root_context_sets_internal_false_and_forwarded_for_none() {
let registry = Arc::new(registry_with(
"echo/run",
Visibility::External,
AccessControl::default(),
));
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let ctx = dp.build_root_context("req-ctx-1", "echo/run", None);
assert!(!ctx.internal, "internal must be false for wire-ingress");
assert!(ctx.forwarded_for.is_none(), "forwarded_for must be None");
assert!(ctx.parent_request_id.is_none(), "root has no parent");
assert!(ctx.deadline.is_some(), "deadline is set");
assert!(ctx.request_id != "req-ctx-1" || uuid::Uuid::parse_str("req-ctx-1").is_err());
}
#[test]
fn build_root_context_for_unknown_op_uses_empty_capabilities_and_scoped_env() {
let registry = Arc::new(OperationRegistry::new());
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let ctx = dp.build_root_context("req-ctx-2", "no/such", None);
assert!(ctx.handler_identity.is_none());
assert!(ctx.scoped_env.allowed_ops.is_empty());
assert!(ctx.scoped_env.peer_pinned.is_empty());
}
#[test]
fn build_root_context_carries_registration_bundle_fields() {
let authority =
alknet_call::registry::context::CompositionAuthority::new("agent", ["fs:read".to_string()]);
let scoped = ScopedPeerEnv::new(["fs/readFile"]);
let caps = Capabilities::new().with_api_key("google", "k".to_string());
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
external_spec("agent/run", AccessControl::default()),
make_handler(|input, context| async move {
ResponseEnvelope::ok(context.request_id, input)
}),
OperationProvenance::Local,
Some(authority),
Some(scoped.clone()),
caps,
));
let registry = Arc::new(registry);
let provider: Arc<dyn IdentityProvider> = Arc::new(StaticIdentityProvider::new());
let dp = dispatch(registry, provider);
let ctx = dp.build_root_context("req-ctx-3", "agent/run", None);
assert!(ctx.handler_identity.is_some());
assert_eq!(ctx.handler_identity.as_ref().unwrap().label, "agent");
assert!(ctx.scoped_env.allows("fs/readFile"));
assert!(ctx.capabilities.get("google").is_some());
}
#[test]
fn strip_leading_slash_helper_works() {
assert_eq!(strip_leading_slash("/fs/readFile"), "fs/readFile");
assert_eq!(strip_leading_slash("fs/readFile"), "fs/readFile");
assert_eq!(strip_leading_slash(""), "");
}
#[test]
fn gateway_dispatch_is_concrete_struct_not_trait() {
fn assert_concrete<T: Sized>() {}
assert_concrete::<GatewayDispatch>();
}
}

View File

@@ -4,4 +4,6 @@
//! `/subscribe`) are the sole HTTP invoke path (ADR-042/047). See
//! `docs/architecture/crates/http/http-server.md`.
// TODO: implement
pub mod dispatch;
pub use dispatch::GatewayDispatch;

View File

@@ -11,3 +11,5 @@ pub mod client;
pub mod gateway;
pub mod server;
pub mod websocket;
pub use gateway::GatewayDispatch;

View File

@@ -1,7 +1,7 @@
---
id: http/gateway/gateway-dispatch-spine
name: Implement GatewayDispatch shared dispatch spine (thin concrete struct, not a trait)
status: pending
status: completed
depends_on: [http/crate-init]
scope: narrow
risk: medium
@@ -180,4 +180,13 @@ research §6 open question #4.
## Summary
> To be filled on completion
> GatewayDispatch concrete struct implemented in src/gateway/dispatch.rs. Holds
> Arc<OperationRegistry> + Arc<dyn IdentityProvider>. resolve_bearer() delegates
> to identity_provider.resolve_from_token. invoke() builds root OperationContext
> (internal:false, forwarded_for:None, fresh UUID v4 request_id, deadline now+30s,
> registration bundle's composition_authority/capabilities/scoped_env) and calls
> OperationRegistry::invoke. Duplicated build_root_context construction (alknet-call
> version is pub(crate) + tangled with CallConnection overlays; invariants identical).
> 14 unit tests covering dispatch, services/list AccessControl filtering, NOT_FOUND
> for unregistered/Internal ops, FORBIDDEN for unauthorized, concrete-struct-not-trait.
> Re-exported from lib.rs. Build/clippy/test all pass.