feat(http): implement 5 gateway endpoints (search/schema/call/batch/subscribe)

Implement the sole HTTP invoke path (ADR-042/047) in
src/server/gateway_routes.rs: POST /call reads { operation, input },
resolves identity via the shared ResolvedIdentity extractor, dispatches
via GatewayDispatch::invoke, and returns ResponseEnvelope as JSON with
errors mapped via call_error_to_http_response. GET /search dispatches
services/list (AccessControl-filtered); GET /schema dispatches
services/schema with an ACL pre-check (unauthorized -> 403). POST /batch
loops over invoke returning an array; POST /subscribe projects the
response as SSE (text/event-stream) with data frames for call.responded
and an error event for call.aborted. Internal ops return 404. Wire the
real handlers into adapter.rs router, replacing the placeholder 501s.
This commit is contained in:
2026-07-01 19:17:59 +00:00
parent 539a812c12
commit 5a629a48e5
3 changed files with 967 additions and 16 deletions

View File

@@ -3,10 +3,10 @@
//! See `docs/architecture/crates/http/http-server.md`. This module wires the
//! axum `Router` (gateway endpoints + `/healthz` + `/openapi.json` + MCP +
//! custom routes + decoy fallback) and drives hyper's HTTP/1.1 or HTTP/2
//! connection driver over a single QUIC bidirectional stream. Gateway route
//! handlers, healthz/decoy logic, openapi.json generation, the MCP route, and
//! the WS upgrade handler are implemented by their respective tasks; this task
//! wires the routes with placeholder handlers returning 501 Not Implemented.
//! connection driver over a single QUIC bidirectional stream. The 5 gateway
//! endpoints (`/search`/`/schema`/`/call`/`/batch`/`/subscribe`) are wired in
//! from `gateway_routes`; `/openapi.json`, the MCP route, and the WS upgrade
//! handler remain placeholder 501 handlers pending their respective tasks.
use std::io;
use std::path::PathBuf;
@@ -17,7 +17,7 @@ use async_trait::async_trait;
use axum::http::StatusCode;
use axum::middleware::from_fn_with_state;
use axum::response::IntoResponse;
use axum::routing::{any, get, post};
use axum::routing::{get, post};
use axum::Router;
use hyper_util::rt::{TokioExecutor, TokioIo};
use hyper_util::server::conn::auto::Builder as HyperBuilder;
@@ -30,8 +30,9 @@ use alknet_core::auth::{AuthContext, IdentityProvider};
use alknet_core::types::{Connection, HandlerError, ProtocolHandler, StreamError};
use super::auth::bearer_auth_middleware;
use crate::server::decoy::decoy_fallback;
use crate::server::healthz::healthz;
use super::decoy::decoy_fallback;
use super::gateway_routes;
use super::healthz::healthz;
const ALPN_HTTP1: &[u8] = b"http/1.1";
const ALPN_H2: &[u8] = b"h2";
@@ -46,10 +47,10 @@ pub enum DecoyConfig {
#[derive(Clone)]
#[allow(dead_code)]
struct RouterState {
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
decoy: DecoyConfig,
pub(crate) struct RouterState {
pub(crate) registry: Arc<OperationRegistry>,
pub(crate) identity_provider: Arc<dyn IdentityProvider>,
pub(crate) decoy: DecoyConfig,
}
impl axum::extract::FromRef<RouterState> for DecoyConfig {
@@ -136,11 +137,7 @@ impl HttpAdapter {
fn build_router(state: RouterState, extra_routes: Option<Router>) -> Router {
let auth_state = Arc::clone(&state.identity_provider);
let default: Router<RouterState> = Router::new()
.route("/search", any(not_implemented))
.route("/schema", any(not_implemented))
.route("/call", any(not_implemented))
.route("/batch", any(not_implemented))
.route("/subscribe", any(not_implemented))
.merge(gateway_routes::gateway_router())
.route("/openapi.json", get(not_implemented))
.route("/mcp", post(not_implemented))
.route_layer(from_fn_with_state(auth_state.clone(), bearer_auth_middleware))

View File

@@ -0,0 +1,953 @@
//! The 5 fixed gateway endpoints (`/search`, `/schema`, `/call`, `/batch`,
//! `/subscribe`) — the sole HTTP invoke path (ADR-042, ADR-047).
//!
//! See `docs/architecture/crates/http/http-server.md` §"HTTP-to-call dispatch"
//! and `docs/architecture/crates/http/http-adapters.md` §"The gateway endpoint
//! set". Each endpoint delegates to `GatewayDispatch::invoke()` (the shared
//! dispatch spine); auth is the shared `bearer_auth_middleware`; error mapping
//! is `call_error_to_http_response`. There is no per-operation
//! `POST /{service}/{op}` direct-call surface (ADR-047).
use std::convert::Infallible;
use std::sync::Arc;
use axum::extract::{FromRef, Query, State};
use axum::http::StatusCode;
use axum::response::sse::Event;
use axum::response::{IntoResponse, Json, Response, Sse};
use axum::routing::{get, post};
use axum::Router;
use futures::stream::{self, BoxStream, Stream};
use serde::Deserialize;
use serde_json::{json, Value};
use alknet_call::protocol::wire::{CallError, ResponseEnvelope};
use alknet_call::registry::registration::OperationRegistry;
use alknet_call::registry::spec::{AccessResult, Visibility};
use alknet_core::auth::{Identity, IdentityProvider};
use super::adapter::RouterState;
use super::auth::ResolvedIdentity;
use crate::gateway::dispatch::GatewayDispatch;
use crate::gateway::error::{call_error_to_http_response, call_error_to_http_status_with_identity};
const SERVICES_LIST: &str = "services/list";
const SERVICES_SCHEMA: &str = "services/schema";
#[derive(Clone)]
pub(crate) struct GatewayState {
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
}
impl GatewayState {
pub(crate) fn new(
registry: Arc<OperationRegistry>,
identity_provider: Arc<dyn IdentityProvider>,
) -> Self {
Self {
registry,
identity_provider,
}
}
fn dispatch(&self) -> GatewayDispatch {
GatewayDispatch::new(Arc::clone(&self.registry), Arc::clone(&self.identity_provider))
}
}
impl FromRef<RouterState> for GatewayState {
fn from_ref(state: &RouterState) -> Self {
GatewayState::new(Arc::clone(&state.registry), Arc::clone(&state.identity_provider))
}
}
pub(crate) fn gateway_router() -> Router<RouterState> {
Router::new()
.route("/search", get(search_handler))
.route("/schema", get(schema_handler))
.route("/call", post(call_handler))
.route("/batch", post(batch_handler))
.route("/subscribe", post(subscribe_handler))
}
#[derive(Debug, Deserialize)]
pub struct CallRequest {
pub operation: String,
#[serde(default = "Value::default")]
pub input: Value,
}
#[derive(Debug, Deserialize)]
pub struct SchemaQuery {
pub name: String,
}
pub(crate) async fn call_handler(
State(state): State<GatewayState>,
ResolvedIdentity(identity): ResolvedIdentity,
Json(request): Json<CallRequest>,
) -> Response {
if is_internal_op(&state.registry, &request.operation) {
return not_found_response(&request.operation);
}
let dispatch = state.dispatch();
let envelope = dispatch.invoke(identity.clone(), &request.operation, request.input).await;
envelope_to_response(envelope, identity.as_ref())
}
pub(crate) async fn search_handler(
State(state): State<GatewayState>,
ResolvedIdentity(identity): ResolvedIdentity,
) -> Response {
let dispatch = state.dispatch();
let envelope = dispatch.invoke(identity.clone(), SERVICES_LIST, json!({})).await;
envelope_to_response(envelope, identity.as_ref())
}
pub(crate) async fn schema_handler(
State(state): State<GatewayState>,
ResolvedIdentity(identity): ResolvedIdentity,
Query(query): Query<SchemaQuery>,
) -> Response {
if let Some(forbidden) = access_check_for_op(&state.registry, &query.name, identity.as_ref()) {
return forbidden_response(forbidden, identity.as_ref());
}
let dispatch = state.dispatch();
let envelope = dispatch
.invoke(identity.clone(), SERVICES_SCHEMA, json!({ "name": query.name }))
.await;
envelope_to_response(envelope, identity.as_ref())
}
pub(crate) async fn batch_handler(
State(state): State<GatewayState>,
ResolvedIdentity(identity): ResolvedIdentity,
Json(requests): Json<Vec<CallRequest>>,
) -> Response {
let dispatch = state.dispatch();
let mut results: Vec<Value> = Vec::with_capacity(requests.len());
for request in requests {
if is_internal_op(&state.registry, &request.operation) {
results.push(not_found_envelope_json(&request.operation));
continue;
}
let envelope = dispatch
.invoke(identity.clone(), &request.operation, request.input)
.await;
results.push(envelope_to_json(envelope));
}
Json(json!({ "results": results })).into_response()
}
pub(crate) async fn subscribe_handler(
State(state): State<GatewayState>,
ResolvedIdentity(identity): ResolvedIdentity,
Json(request): Json<CallRequest>,
) -> Sse<SubscribeStream> {
let stream = if is_internal_op(&state.registry, &request.operation) {
subscribe_stream_internal_error(request.operation)
} else {
let dispatch = state.dispatch();
let envelope = dispatch.invoke(identity, &request.operation, request.input).await;
subscribe_stream_from_envelope(envelope)
};
Sse::new(stream)
}
pub type SubscribeStream = BoxStream<'static, Result<Event, Infallible>>;
fn subscribe_stream_from_envelope(envelope: ResponseEnvelope) -> SubscribeStream {
Box::pin(envelope_to_sse_stream(envelope))
}
fn subscribe_stream_internal_error(operation: String) -> SubscribeStream {
Box::pin(stream::once(async move { error_event(&operation) }))
}
fn envelope_to_response(envelope: ResponseEnvelope, identity: Option<&Identity>) -> Response {
match envelope.result {
Ok(output) => {
let body = envelope_to_ok_json(&envelope.request_id, &output);
(StatusCode::OK, Json(body)).into_response()
}
Err(error) => {
let status_code = call_error_to_http_status_with_identity(&error, identity);
let status =
StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let body = serde_json::to_value(&error).unwrap_or(Value::Null);
(status, Json(body)).into_response()
}
}
}
fn envelope_to_json(envelope: ResponseEnvelope) -> Value {
match envelope.result {
Ok(output) => envelope_to_ok_json(&envelope.request_id, &output),
Err(error) => envelope_to_error_json(&envelope.request_id, &error),
}
}
fn envelope_to_ok_json(request_id: &str, output: &Value) -> Value {
json!({
"request_id": request_id,
"result": "ok",
"output": output,
})
}
fn envelope_to_error_json(request_id: &str, error: &CallError) -> Value {
json!({
"request_id": request_id,
"result": "error",
"error": serde_json::to_value(error).unwrap_or(Value::Null),
})
}
fn not_found_envelope_json(operation: &str) -> Value {
let error = CallError::not_found(operation);
json!({
"request_id": Value::Null,
"result": "error",
"error": serde_json::to_value(&error).unwrap_or(Value::Null),
})
}
fn not_found_response(operation: &str) -> Response {
let error = CallError::not_found(operation);
call_error_to_http_response(&error)
}
fn forbidden_response(message: String, identity: Option<&Identity>) -> Response {
let error = CallError::forbidden(message);
let status_code = call_error_to_http_status_with_identity(&error, identity);
let status =
StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
let body = serde_json::to_value(&error).unwrap_or(Value::Null);
(status, Json(body)).into_response()
}
fn access_check_for_op(
registry: &OperationRegistry,
operation: &str,
identity: Option<&Identity>,
) -> Option<String> {
let name = operation.strip_prefix('/').unwrap_or(operation);
let reg = registry.registration(name)?;
if let AccessResult::Forbidden(message) = reg.spec.access_control.check(identity) {
return Some(message);
}
None
}
fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool {
let name = operation.strip_prefix('/').unwrap_or(operation);
match registry.registration(name) {
Some(reg) => reg.spec.visibility == Visibility::Internal,
None => false,
}
}
fn envelope_to_sse_stream(envelope: ResponseEnvelope) -> impl Stream<Item = Result<Event, Infallible>> {
stream::once(async move {
match envelope.result {
Ok(output) => {
let data = serde_json::to_string(&output).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().data(data))
}
Err(error) => {
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().event("error").data(data))
}
}
})
}
fn error_event(operation: &str) -> Result<Event, Infallible> {
let error = CallError::not_found(operation);
let payload = serde_json::to_value(&error).unwrap_or(Value::Null);
let data = serde_json::to_string(&payload).unwrap_or_else(|_| "null".to_string());
Ok(Event::default().event("error").data(data))
}
#[cfg(test)]
mod tests {
use super::*;
use alknet_call::registry::discovery::{
services_list_handler, services_list_spec, services_schema_handler, services_schema_spec,
};
use alknet_call::registry::registration::{
make_handler, HandlerRegistration, OperationProvenance,
};
use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType};
use alknet_core::auth::{AuthToken, Identity};
use alknet_core::types::Capabilities;
use axum::body::Body;
use axum::http::Request;
use axum::middleware::from_fn_with_state;
use http_body_util::BodyExt;
use std::collections::HashMap;
use std::sync::Mutex as StdMutex;
use tower::ServiceExt;
struct StaticIdentityProvider {
tokens: StdMutex<HashMap<String, Identity>>,
}
impl StaticIdentityProvider {
fn new() -> Self {
Self {
tokens: StdMutex::new(HashMap::new()),
}
}
fn with_token(self, token: &str, identity: Identity) -> Self {
self.tokens
.lock()
.unwrap()
.insert(token.to_string(), identity);
self
}
}
impl IdentityProvider for StaticIdentityProvider {
fn resolve_from_fingerprint(&self, _fp: &str) -> Option<Identity> {
None
}
fn resolve_from_token(&self, token: &AuthToken) -> Option<Identity> {
let token_str = String::from_utf8_lossy(&token.raw);
self.tokens.lock().unwrap().get(token_str.as_ref()).cloned()
}
}
fn identity_with_scopes(id: &str, scopes: &[&str]) -> Identity {
Identity {
id: id.to_string(),
scopes: scopes.iter().map(|s| s.to_string()).collect(),
resources: HashMap::new(),
}
}
fn external_spec(name: &str, acl: AccessControl) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Query,
Visibility::External,
json!({}),
json!({}),
vec![],
acl,
)
}
fn internal_spec(name: &str) -> OperationSpec {
OperationSpec::new(
name,
OperationType::Query,
Visibility::Internal,
json!({}),
json!({}),
vec![],
AccessControl::default(),
)
}
fn echo_handler() -> alknet_call::registry::registration::Handler {
make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) })
}
fn registry_with_echo() -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
external_spec("echo/run", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
Arc::new(registry)
}
fn registry_with_restricted_op() -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
external_spec(
"admin/run",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
Arc::new(registry)
}
fn registry_with_internal_op() -> Arc<OperationRegistry> {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
internal_spec("secret/op"),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
Arc::new(registry)
}
fn registry_with_discovery_and_ops(
inner_ops: Vec<HandlerRegistration>,
) -> Arc<OperationRegistry> {
let mut inner = OperationRegistry::new();
for op in inner_ops {
inner.register(op);
}
let inner = Arc::new(inner);
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
services_list_spec(),
services_list_handler(Arc::clone(&inner)),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
registry.register(HandlerRegistration::new(
services_schema_spec(),
services_schema_handler(Arc::clone(&inner)),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
for spec in inner.list_operations() {
let name = spec.name.clone();
let reg = inner.registration(&name).unwrap();
registry.register(HandlerRegistration::new(
reg.spec.clone(),
Arc::clone(&reg.handler),
reg.provenance,
reg.composition_authority.clone(),
reg.scoped_env.clone(),
reg.capabilities.clone(),
));
}
Arc::new(registry)
}
fn unused_provider() -> Arc<dyn IdentityProvider> {
Arc::new(StaticIdentityProvider::new())
}
fn build_router(
registry: Arc<OperationRegistry>,
provider: Arc<dyn IdentityProvider>,
) -> Router {
let state = RouterState {
registry: Arc::clone(&registry),
identity_provider: Arc::clone(&provider),
decoy: crate::server::DecoyConfig::NotFound,
};
let auth_state = Arc::clone(&provider);
gateway_router()
.route_layer(from_fn_with_state(
auth_state,
super::super::auth::bearer_auth_middleware,
))
.with_state(state)
}
fn auth_header(token: &str) -> (&'static str, String) {
("authorization", format!("Bearer {token}"))
}
async fn send(router: Router, req: Request<Body>) -> (StatusCode, Value) {
let resp = router.oneshot(req).await.unwrap();
let status = resp.status();
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body: Value = if bytes.is_empty() {
Value::Null
} else {
serde_json::from_slice(&bytes).unwrap_or(Value::Null)
};
(status, body)
}
#[tokio::test]
async fn call_round_trip_external_op_returns_200_with_json_body() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "msg": "hi" } }))
.unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body.get("result"), Some(&json!("ok")));
assert_eq!(body.get("output"), Some(&json!({ "msg": "hi" })));
}
#[tokio::test]
async fn call_internal_op_returns_404() {
let router = build_router(registry_with_internal_op(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "secret/op", "input": {} })).unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(body.get("code"), Some(&json!("NOT_FOUND")));
}
#[tokio::test]
async fn call_unauthorized_restricted_op_returns_403() {
let provider: Arc<dyn IdentityProvider> = Arc::new(
StaticIdentityProvider::new()
.with_token("user-tok", identity_with_scopes("user", &["user"])),
);
let router = build_router(registry_with_restricted_op(), provider);
let (k, v) = auth_header("user-tok");
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.header(k, v)
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "admin/run", "input": {} })).unwrap(),
))
.unwrap();
let (status, _body) = send(router, req).await;
assert_eq!(status, StatusCode::FORBIDDEN);
}
#[tokio::test]
async fn call_unauthenticated_restricted_op_returns_401() {
let router = build_router(registry_with_restricted_op(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "admin/run", "input": {} })).unwrap(),
))
.unwrap();
let (status, _body) = send(router, req).await;
assert_eq!(status, StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn search_returns_only_access_control_allowed_ops() {
let ops = vec![
HandlerRegistration::new(
external_spec("public/echo", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
),
HandlerRegistration::new(
external_spec(
"admin/secret",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
),
];
let discovery = registry_with_discovery_and_ops(ops);
let provider: Arc<dyn IdentityProvider> = Arc::new(
StaticIdentityProvider::new()
.with_token("user-tok", identity_with_scopes("regular", &["user"])),
);
let router = build_router(discovery, provider);
let (k, v) = auth_header("user-tok");
let req = Request::builder()
.method("GET")
.uri("/search")
.header(k, v)
.body(Body::empty())
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
let ops = body
.get("output")
.and_then(|o| o.get("operations"))
.and_then(|o| o.as_array())
.expect("operations array");
let names: Vec<&str> = ops
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"public/echo"));
assert!(!names.contains(&"admin/secret"));
}
#[tokio::test]
async fn schema_returns_full_spec_for_authorized_op() {
let ops = vec![HandlerRegistration::new(
external_spec("echo/run", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
)];
let discovery = registry_with_discovery_and_ops(ops);
let router = build_router(discovery, unused_provider());
let req = Request::builder()
.method("GET")
.uri("/schema?name=echo%2Frun")
.body(Body::empty())
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
let output = body.get("output").expect("output");
assert_eq!(output.get("name"), Some(&json!("echo/run")));
assert_eq!(output.get("namespace"), Some(&json!("echo")));
assert!(output.get("input_schema").is_some());
assert!(output.get("output_schema").is_some());
}
#[tokio::test]
async fn schema_for_unauthorized_op_returns_403() {
let ops = vec![HandlerRegistration::new(
external_spec(
"admin/secret",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
)];
let discovery = registry_with_discovery_and_ops(ops);
let provider: Arc<dyn IdentityProvider> = Arc::new(
StaticIdentityProvider::new()
.with_token("user-tok", identity_with_scopes("regular", &["user"])),
);
let router = build_router(discovery, provider);
let (k, v) = auth_header("user-tok");
let req = Request::builder()
.method("GET")
.uri("/schema?name=admin%2Fsecret")
.header(k, v)
.body(Body::empty())
.unwrap();
let (status, _body) = send(router, req).await;
assert_eq!(status, StatusCode::FORBIDDEN);
}
#[tokio::test]
async fn batch_returns_array_of_results_in_order() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/batch")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!([
{ "operation": "echo/run", "input": { "n": 1 } },
{ "operation": "echo/run", "input": { "n": 2 } },
]))
.unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
let results = body
.get("results")
.and_then(|r| r.as_array())
.expect("results array");
assert_eq!(results.len(), 2);
assert_eq!(results[0].get("output"), Some(&json!({ "n": 1 })));
assert_eq!(results[1].get("output"), Some(&json!({ "n": 2 })));
}
#[tokio::test]
async fn batch_internal_op_returns_not_found_in_array() {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
internal_spec("secret/op"),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
registry.register(HandlerRegistration::new(
external_spec("echo/run", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
let registry = Arc::new(registry);
let router = build_router(registry, unused_provider());
let req = Request::builder()
.method("POST")
.uri("/batch")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!([
{ "operation": "echo/run", "input": {} },
{ "operation": "secret/op", "input": {} },
]))
.unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
let results = body
.get("results")
.and_then(|r| r.as_array())
.expect("results array");
assert_eq!(results.len(), 2);
assert_eq!(results[0].get("result"), Some(&json!("ok")));
assert_eq!(results[1].get("result"), Some(&json!("error")));
assert_eq!(
results[1].get("error").and_then(|e| e.get("code")),
Some(&json!("NOT_FOUND"))
);
}
#[tokio::test]
async fn subscribe_streams_sse_data_event_until_completed() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "echo/run", "input": { "v": 9 } }))
.unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let ctype = resp
.headers()
.get(axum::http::header::CONTENT_TYPE)
.map(|v| v.to_str().unwrap().to_string());
assert!(
ctype.as_deref().unwrap_or("").starts_with("text/event-stream"),
"expected text/event-stream, got {ctype:?}"
);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes);
assert!(body.contains("data:"), "expected a data frame, got: {body}");
assert!(
body.contains("\"v\":9"),
"expected output payload, got: {body}"
);
}
#[tokio::test]
async fn subscribe_internal_op_emits_error_event() {
let router = build_router(registry_with_internal_op(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/subscribe")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "secret/op", "input": {} })).unwrap(),
))
.unwrap();
let resp = router.oneshot(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::OK);
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
let body = String::from_utf8_lossy(&bytes);
assert!(
body.contains("event:error") || body.contains("event: error"),
"expected error event, got: {body}"
);
assert!(
body.contains("NOT_FOUND"),
"expected NOT_FOUND, got: {body}"
);
}
#[test]
fn is_internal_op_returns_false_for_unknown() {
let registry = OperationRegistry::new();
assert!(!is_internal_op(&registry, "no/such"));
assert!(!is_internal_op(&registry, "/no/such"));
}
#[test]
fn is_internal_op_detects_registered_internal_op() {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
internal_spec("secret/op"),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
assert!(is_internal_op(&registry, "secret/op"));
assert!(is_internal_op(&registry, "/secret/op"));
}
#[test]
fn is_internal_op_false_for_external_op() {
let mut registry = OperationRegistry::new();
registry.register(HandlerRegistration::new(
external_spec("echo/run", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
));
assert!(!is_internal_op(&registry, "echo/run"));
}
#[test]
fn envelope_to_ok_json_shape() {
let env = ResponseEnvelope::ok("req-1", json!({ "v": 1 }));
let v = envelope_to_json(env);
assert_eq!(v.get("request_id"), Some(&json!("req-1")));
assert_eq!(v.get("result"), Some(&json!("ok")));
assert_eq!(v.get("output"), Some(&json!({ "v": 1 })));
}
#[test]
fn envelope_to_error_json_shape() {
let env = ResponseEnvelope::not_found("req-2", "no/such");
let v = envelope_to_json(env);
assert_eq!(v.get("result"), Some(&json!("error")));
assert_eq!(
v.get("error").and_then(|e| e.get("code")),
Some(&json!("NOT_FOUND"))
);
}
#[tokio::test]
async fn call_with_leading_slash_in_operation_dispatches() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "/echo/run", "input": {} })).unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
assert_eq!(body.get("result"), Some(&json!("ok")));
}
#[tokio::test]
async fn call_unknown_op_returns_404() {
let router = build_router(registry_with_echo(), unused_provider());
let req = Request::builder()
.method("POST")
.uri("/call")
.header("content-type", "application/json")
.body(Body::from(
serde_json::to_vec(&json!({ "operation": "no/such", "input": {} })).unwrap(),
))
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(body.get("code"), Some(&json!("NOT_FOUND")));
}
#[tokio::test]
async fn search_unauthenticated_lists_default_acl_ops_only() {
let ops = vec![
HandlerRegistration::new(
external_spec("public/echo", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
),
HandlerRegistration::new(
external_spec(
"admin/secret",
AccessControl {
required_scopes: vec!["admin".to_string()],
..Default::default()
},
),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
),
];
let discovery = registry_with_discovery_and_ops(ops);
let router = build_router(discovery, unused_provider());
let req = Request::builder()
.method("GET")
.uri("/search")
.body(Body::empty())
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::OK);
let ops = body
.get("output")
.and_then(|o| o.get("operations"))
.and_then(|o| o.as_array())
.expect("operations array");
let names: Vec<&str> = ops
.iter()
.filter_map(|o| o.get("name").and_then(|n| n.as_str()))
.collect();
assert!(names.contains(&"public/echo"));
assert!(!names.contains(&"admin/secret"));
}
#[tokio::test]
async fn schema_unknown_op_returns_404() {
let ops = vec![HandlerRegistration::new(
external_spec("echo/run", AccessControl::default()),
echo_handler(),
OperationProvenance::Local,
None,
None,
Capabilities::new(),
)];
let discovery = registry_with_discovery_and_ops(ops);
let router = build_router(discovery, unused_provider());
let req = Request::builder()
.method("GET")
.uri("/schema?name=no%2Fsuch")
.body(Body::empty())
.unwrap();
let (status, body) = send(router, req).await;
assert_eq!(status, StatusCode::NOT_FOUND);
assert_eq!(body.get("code"), Some(&json!("NOT_FOUND")));
}
}

View File

@@ -9,6 +9,7 @@
pub mod adapter;
pub mod auth;
pub mod decoy;
pub mod gateway_routes;
pub mod healthz;
pub use adapter::{DecoyConfig, HttpAdapter};