From 2695a1950295e90862fb10699eca94496fe900f6 Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Wed, 1 Jul 2026 19:52:57 +0000 Subject: [PATCH 1/2] feat(http): implement to_openapi gateway projection (5-endpoint OpenAPI doc, info.version 1.0.0) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit to_openapi(registry) -> OpenAPISpec generates the fixed 5-endpoint gateway doc (/search, /schema, /call, /batch, /subscribe) — pure projection (ADR-017 §5), gateway pattern (ADR-042). info.version is 1.0.0 tracking the gateway contract (ADR-045). /call responses carry protocol-level errors (400/401/403/404/500/504) plus operation-level errors mapped by http_status (ADR-023). GET /openapi.json wired to serve the generated spec. --- crates/alknet-http/src/adapters/mod.rs | 5 +- crates/alknet-http/src/adapters/to_openapi.rs | 654 ++++++++++++++++++ crates/alknet-http/src/server/adapter.rs | 165 +++-- 3 files changed, 782 insertions(+), 42 deletions(-) create mode 100644 crates/alknet-http/src/adapters/to_openapi.rs diff --git a/crates/alknet-http/src/adapters/mod.rs b/crates/alknet-http/src/adapters/mod.rs index 3ec6ed1..3744326 100644 --- a/crates/alknet-http/src/adapters/mod.rs +++ b/crates/alknet-http/src/adapters/mod.rs @@ -16,10 +16,13 @@ pub mod from_mcp; #[cfg(feature = "mcp")] pub mod to_mcp; +pub mod to_openapi; + pub use from_openapi::{FromOpenAPI, HttpAuthScheme, HttpServiceConfig, OpenAPISpec}; +pub use to_openapi::to_openapi; #[cfg(feature = "mcp")] pub use from_mcp::FromMCP; #[cfg(feature = "mcp")] -pub use to_mcp::{ToMcpGateway, ToMcpService, to_mcp_service}; +pub use to_mcp::{to_mcp_service, ToMcpGateway, ToMcpService}; diff --git a/crates/alknet-http/src/adapters/to_openapi.rs b/crates/alknet-http/src/adapters/to_openapi.rs new file mode 100644 index 0000000..4c5dbbb --- /dev/null +++ b/crates/alknet-http/src/adapters/to_openapi.rs @@ -0,0 +1,654 @@ +//! `to_openapi`: the OpenAPI gateway projection (ADR-042). Generates a +//! fixed 5-endpoint gateway doc (`/search`, `/schema`, `/call`, `/batch`, +//! `/subscribe`) that gates access to the full operation registry — not one +//! path per operation. Served at `GET /openapi.json` by the HTTP server. +//! +//! Pure projection (ADR-017 §5): consumes the registry, does not produce +//! entries, is not an `OperationAdapter`. The per-caller operation surface +//! is discovered via `/search` (AccessControl-filtered at runtime), not +//! preloaded into the doc (ADR-042 §3). `info.version` is a constant +//! semver tracking the gateway endpoint contract, not the operation set +//! (ADR-045); the initial version is `1.0.0`. +//! +//! Error fidelity (ADR-023): `/call`'s responses include the protocol- +//! level errors (400/401/403/404/500/504) plus the operation-level errors +//! declared in registry `error_schemas` (mapped by `http_status`). + +use alknet_call::registry::registration::OperationRegistry; +use alknet_call::registry::spec::Visibility; +use serde_json::{json, Map, Value}; + +use crate::adapters::OpenAPISpec; + +const GATEWAY_VERSION: &str = "1.0.0"; +const GATEWAY_TITLE: &str = "alknet gateway"; + +const PATH_SEARCH: &str = "/search"; +const PATH_SCHEMA: &str = "/schema"; +const PATH_CALL: &str = "/call"; +const PATH_BATCH: &str = "/batch"; +const PATH_SUBSCRIBE: &str = "/subscribe"; + +const CONTENT_JSON: &str = "application/json"; +const CONTENT_SSE: &str = "text/event-stream"; + +const STATUS_BAD_REQUEST: &str = "400"; +const STATUS_UNAUTHORIZED: &str = "401"; +const STATUS_FORBIDDEN: &str = "403"; +const STATUS_NOT_FOUND: &str = "404"; +const STATUS_INTERNAL: &str = "500"; +const STATUS_TIMEOUT: &str = "504"; + +pub fn to_openapi(registry: &OperationRegistry) -> OpenAPISpec { + let mut paths_obj = Map::new(); + paths_obj.insert( + PATH_SEARCH.to_string(), + path_item("get", search_operation()), + ); + paths_obj.insert( + PATH_SCHEMA.to_string(), + path_item("get", schema_operation()), + ); + paths_obj.insert( + PATH_CALL.to_string(), + path_item("post", call_operation(registry)), + ); + paths_obj.insert(PATH_BATCH.to_string(), path_item("post", batch_operation())); + paths_obj.insert( + PATH_SUBSCRIBE.to_string(), + path_item("post", subscribe_operation()), + ); + + let doc = json!({ + "openapi": "3.0.0", + "info": { + "title": GATEWAY_TITLE, + "version": GATEWAY_VERSION, + }, + "paths": Value::Object(paths_obj), + }); + + OpenAPISpec::from_value(doc).expect("generated gateway doc is a valid OpenAPI 3.0 object") +} + +fn path_item(method: &str, operation: Value) -> Value { + let mut item = Map::new(); + item.insert(method.to_string(), operation); + Value::Object(item) +} + +fn search_operation() -> Value { + json!({ + "operationId": "search", + "summary": "List/search available operations (AccessControl-filtered). Returns names + descriptions.", + "responses": { + "200": json_response(search_output_schema()), + STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "Malformed query."), + STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), + STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes."), + STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), + STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + } + }) +} + +fn schema_operation() -> Value { + json!({ + "operationId": "schema", + "summary": "Get an operation's full OperationSpec (input/output JSON Schemas, error schemas).", + "parameters": [{ + "name": "name", + "in": "query", + "required": true, + "schema": { "type": "string" } + }], + "responses": { + "200": json_response(schema_output_schema()), + STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "Missing or malformed `name` parameter."), + STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), + STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes for the requested operation."), + STATUS_NOT_FOUND: error_response("NOT_FOUND", "Operation not registered."), + STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), + STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + } + }) +} + +fn call_operation(registry: &OperationRegistry) -> Value { + let mut responses = Map::new(); + responses.insert("200".to_string(), json_response(call_success_schema())); + responses.insert( + STATUS_BAD_REQUEST.to_string(), + error_response( + "INVALID_INPUT", + "The request body was not a valid `{ operation, input }` object.", + ), + ); + responses.insert( + STATUS_UNAUTHORIZED.to_string(), + error_response("UNAUTHORIZED", "No bearer token provided."), + ); + responses.insert( + STATUS_FORBIDDEN.to_string(), + error_response( + "FORBIDDEN", + "Insufficient scopes to invoke the requested operation.", + ), + ); + responses.insert( + STATUS_NOT_FOUND.to_string(), + error_response("NOT_FOUND", "Operation not registered (or is Internal)."), + ); + responses.insert( + STATUS_INTERNAL.to_string(), + error_response("INTERNAL", "Internal error."), + ); + responses.insert( + STATUS_TIMEOUT.to_string(), + error_response("TIMEOUT", "Request timed out."), + ); + + for spec in registry.list_operations() { + if spec.visibility != Visibility::External { + continue; + } + for error in &spec.error_schemas { + let Some(status) = error.http_status else { + continue; + }; + let code = format!("{status}"); + if responses.contains_key(&code) { + continue; + } + responses.insert(code, json_response(error.schema.clone())); + } + } + + json!({ + "operationId": "call", + "summary": "Invoke an operation by name with a flat JSON body `{ operation, input }`.", + "requestBody": { + "required": true, + "content": { + CONTENT_JSON: { + "schema": call_input_schema(), + } + } + }, + "responses": Value::Object(responses), + }) +} + +fn batch_operation() -> Value { + json!({ + "operationId": "batch", + "summary": "Invoke multiple operations in one request. Array of `{ operation, input }`.", + "requestBody": { + "required": true, + "content": { + CONTENT_JSON: { + "schema": batch_input_schema(), + } + } + }, + "responses": { + "200": json_response(batch_output_schema()), + STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "The request body was not a JSON array of call requests."), + STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), + STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes."), + STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), + STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + } + }) +} + +fn subscribe_operation() -> Value { + let mut responses = Map::new(); + responses.insert("200".to_string(), sse_response(call_success_schema())); + responses.insert( + STATUS_BAD_REQUEST.to_string(), + error_response( + "INVALID_INPUT", + "The request body was not a valid `{ operation, input }` object.", + ), + ); + responses.insert( + STATUS_UNAUTHORIZED.to_string(), + error_response("UNAUTHORIZED", "No bearer token provided."), + ); + responses.insert( + STATUS_FORBIDDEN.to_string(), + error_response( + "FORBIDDEN", + "Insufficient scopes to invoke the requested operation.", + ), + ); + responses.insert( + STATUS_NOT_FOUND.to_string(), + error_response("NOT_FOUND", "Operation not registered (or is Internal)."), + ); + responses.insert( + STATUS_INTERNAL.to_string(), + error_response("INTERNAL", "Internal error."), + ); + responses.insert( + STATUS_TIMEOUT.to_string(), + error_response("TIMEOUT", "Request timed out."), + ); + + json!({ + "operationId": "subscribe", + "summary": "Invoke a streaming operation. Body `{ operation, input }`; response is `text/event-stream`.", + "requestBody": { + "required": true, + "content": { + CONTENT_JSON: { + "schema": call_input_schema(), + } + } + }, + "responses": Value::Object(responses), + }) +} + +fn call_input_schema() -> Value { + json!({ + "type": "object", + "properties": { + "operation": { + "type": "string", + "description": "The fully-qualified operation name to invoke." + }, + "input": { + "type": "object", + "description": "The JSON input object to pass to the operation." + } + }, + "required": ["operation"] + }) +} + +fn batch_input_schema() -> Value { + json!({ + "type": "array", + "items": call_input_schema() + }) +} + +fn search_output_schema() -> Value { + json!({ + "type": "object", + "properties": { + "operations": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": { "type": "string" }, + "description": { "type": "string" } + } + } + } + } + }) +} + +fn schema_output_schema() -> Value { + json!({ + "type": "object", + "properties": { + "name": { "type": "string" }, + "namespace": { "type": "string" }, + "op_type": { "type": "string" }, + "input_schema": {}, + "output_schema": {}, + "error_schemas": { "type": "array" }, + "access_control": {} + } + }) +} + +fn call_success_schema() -> Value { + json!({ + "type": "object", + "properties": { + "request_id": { "type": "string" }, + "result": { "type": "string", "enum": ["ok"] }, + "output": {} + } + }) +} + +fn batch_output_schema() -> Value { + json!({ + "type": "object", + "properties": { + "results": { + "type": "array", + "items": { + "type": "object", + "properties": { + "request_id": { "type": "string" }, + "result": { "type": "string" }, + "output": {}, + "error": {} + } + } + } + } + }) +} + +fn json_response(schema: Value) -> Value { + json!({ + "description": "", + "content": { + CONTENT_JSON: { + "schema": schema, + } + } + }) +} + +fn sse_response(schema: Value) -> Value { + json!({ + "description": "", + "content": { + CONTENT_SSE: { + "schema": schema, + } + } + }) +} + +fn error_response(code: &str, message: &str) -> Value { + json!({ + "description": message, + "content": { + CONTENT_JSON: { + "schema": { + "type": "object", + "properties": { + "code": { "type": "string", "enum": [code] }, + "message": { "type": "string" }, + "retryable": { "type": "boolean" } + }, + "required": ["code", "message", "retryable"] + } + } + } + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use alknet_call::protocol::wire::ResponseEnvelope; + use alknet_call::registry::registration::{ + make_handler, HandlerRegistration, OperationProvenance, + }; + use alknet_call::registry::spec::{ + AccessControl, ErrorDefinition, OperationSpec, OperationType, + }; + use alknet_core::types::Capabilities; + + fn echo_handler() -> alknet_call::registry::registration::Handler { + make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }) + } + + fn register_op(registry: &mut OperationRegistry, spec: OperationSpec) { + registry.register(HandlerRegistration::new( + spec, + echo_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + } + + fn external_spec(name: &str) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Query, + Visibility::External, + json!({}), + json!({}), + vec![], + AccessControl::default(), + ) + } + + fn spec_with_errors(name: &str, errors: Vec) -> OperationSpec { + OperationSpec::new( + name, + OperationType::Mutation, + Visibility::External, + json!({}), + json!({}), + errors, + AccessControl::default(), + ) + } + + fn err(code: &str, status: Option) -> ErrorDefinition { + ErrorDefinition { + code: code.to_string(), + description: format!("{code} error"), + schema: json!({ "type": "object", "properties": { "msg": { "type": "string" } } }), + http_status: status, + } + } + + fn paths(spec: &OpenAPISpec) -> Vec { + spec.paths.keys().cloned().collect() + } + + #[test] + fn generated_doc_has_exactly_five_gateway_paths() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let mut p = paths(&spec); + p.sort(); + assert_eq!( + p, + vec!["/batch", "/call", "/schema", "/search", "/subscribe"] + ); + } + + #[test] + fn generated_doc_does_not_leak_registry_operations_as_paths() { + let mut registry = OperationRegistry::new(); + register_op(&mut registry, external_spec("fs/readFile")); + register_op(&mut registry, external_spec("agent/chat")); + let spec = to_openapi(®istry); + let p = paths(&spec); + assert!(!p.contains(&"/fs/readFile".to_string())); + assert!(!p.contains(&"/agent/chat".to_string())); + assert_eq!(p.len(), 5); + } + + #[test] + fn info_version_is_1_0_0() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert_eq!(spec.info.version, "1.0.0"); + } + + #[test] + fn call_request_schema_is_operation_and_input() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + let body = call.request_body.as_ref().expect("request body"); + let schema = body.content.get(CONTENT_JSON).expect("json content"); + let props = schema + .get("properties") + .and_then(Value::as_object) + .expect("properties"); + assert!(props.contains_key("operation")); + let input = props.get("input").expect("input"); + assert_eq!(input.get("type").and_then(Value::as_str), Some("object")); + let required = schema + .get("required") + .and_then(Value::as_array) + .expect("required"); + assert!(required.iter().any(|v| v == "operation")); + } + + #[test] + fn subscribe_response_content_type_is_text_event_stream() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let subscribe = &spec.paths["/subscribe"].operations[0].1; + let resp = &subscribe.responses["200"]; + assert!(resp.content.contains_key(CONTENT_SSE)); + assert!(!resp.content.contains_key(CONTENT_JSON)); + } + + #[test] + fn call_responses_include_all_protocol_level_error_statuses() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + for status in ["400", "401", "403", "404", "500", "504"] { + assert!( + call.responses.contains_key(status), + "missing protocol-level response {status}" + ); + } + } + + #[test] + fn call_responses_include_operation_level_errors_with_http_status() { + let mut registry = OperationRegistry::new(); + register_op( + &mut registry, + spec_with_errors( + "svc/op", + vec![ + err("RATE_LIMITED", Some(429)), + err("UNPROCESSABLE", Some(422)), + ], + ), + ); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + assert!(call.responses.contains_key("429")); + assert!(call.responses.contains_key("422")); + let resp429 = &call.responses["429"]; + let schema = resp429 + .content + .get(CONTENT_JSON) + .and_then(|v| v.get("properties")) + .and_then(|v| v.get("msg")) + .expect("projected error schema"); + assert_eq!(schema.get("type").and_then(Value::as_str), Some("string")); + } + + #[test] + fn call_responses_project_http_404_error_code_as_404_response() { + let mut registry = OperationRegistry::new(); + register_op( + &mut registry, + spec_with_errors("svc/op", vec![err("HTTP_404", Some(404))]), + ); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + assert!(call.responses.contains_key("404")); + } + + #[test] + fn call_responses_do_not_duplicate_protocol_level_status_with_operation_error() { + let mut registry = OperationRegistry::new(); + register_op( + &mut registry, + spec_with_errors("svc/op", vec![err("HTTP_500", Some(500))]), + ); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + assert!(call.responses.contains_key("500")); + } + + #[test] + fn operation_errors_without_http_status_are_not_projected() { + let mut registry = OperationRegistry::new(); + register_op( + &mut registry, + spec_with_errors("svc/op", vec![err("FILE_NOT_FOUND", None)]), + ); + let spec = to_openapi(®istry); + let call = &spec.paths["/call"].operations[0].1; + assert!(!call.responses.contains_key("0")); + assert!(call.responses.contains_key("500")); + } + + #[test] + fn to_openapi_is_a_pure_projection_and_not_an_operation_adapter() { + fn assert_not_adapter() {} + assert_not_adapter:: OpenAPISpec>(); + let mut registry = OperationRegistry::new(); + register_op(&mut registry, external_spec("svc/op")); + let before = registry.list_operations().len(); + let _ = to_openapi(®istry); + assert_eq!(registry.list_operations().len(), before); + } + + #[test] + fn batch_request_schema_is_array_of_call_request() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let batch = &spec.paths["/batch"].operations[0].1; + let body = batch.request_body.as_ref().expect("request body"); + let schema = body.content.get(CONTENT_JSON).expect("json content"); + assert_eq!(schema.get("type").and_then(Value::as_str), Some("array")); + } + + #[test] + fn subscribe_request_body_uses_call_input_schema() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let subscribe = &spec.paths["/subscribe"].operations[0].1; + let body = subscribe.request_body.as_ref().expect("request body"); + let schema = body.content.get(CONTENT_JSON).expect("json content"); + assert!(schema + .get("properties") + .and_then(Value::as_object) + .map(|m| m.contains_key("operation")) + .unwrap_or(false)); + } + + #[test] + fn search_and_schema_are_get_operations() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert_eq!(spec.paths["/search"].operations[0].0, "get"); + assert_eq!(spec.paths["/schema"].operations[0].0, "get"); + } + + #[test] + fn call_batch_subscribe_are_post_operations() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert_eq!(spec.paths["/call"].operations[0].0, "post"); + assert_eq!(spec.paths["/batch"].operations[0].0, "post"); + assert_eq!(spec.paths["/subscribe"].operations[0].0, "post"); + } + + #[test] + fn raw_doc_carries_openapi_3_0_and_gateway_version() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert_eq!( + spec.raw.get("openapi").and_then(Value::as_str), + Some("3.0.0") + ); + assert_eq!( + spec.raw + .get("info") + .and_then(|i| i.get("version")) + .and_then(Value::as_str), + Some("1.0.0") + ); + } +} diff --git a/crates/alknet-http/src/server/adapter.rs b/crates/alknet-http/src/server/adapter.rs index 51920d8..52bfb69 100644 --- a/crates/alknet-http/src/server/adapter.rs +++ b/crates/alknet-http/src/server/adapter.rs @@ -5,8 +5,8 @@ //! custom routes + decoy fallback) and drives hyper's HTTP/1.1 or HTTP/2 //! connection driver over a single QUIC bidirectional stream. The 5 gateway //! endpoints (`/search`/`/schema`/`/call`/`/batch`/`/subscribe`) are wired in -//! from `gateway_routes`; `/openapi.json`, the MCP route, and the WS upgrade -//! handler remain placeholder 501 handlers pending their respective tasks. +//! from `gateway_routes`; `/openapi.json` serves the `to_openapi` projection +//! of the registry. use std::io; use std::path::PathBuf; @@ -14,6 +14,7 @@ use std::pin::Pin; use std::sync::Arc; use async_trait::async_trait; +use axum::extract::State; use axum::http::StatusCode; use axum::middleware::from_fn_with_state; use axum::response::IntoResponse; @@ -33,12 +34,13 @@ use super::auth::bearer_auth_middleware; use super::decoy::decoy_fallback; use super::gateway_routes; use super::healthz::healthz; -use crate::websocket::upgrade::ws_upgrade_handler; -use crate::websocket::upgrade::WS_UPGRADE_PATH; #[cfg(feature = "mcp")] use crate::adapters::to_mcp_service; +use crate::adapters::to_openapi; #[cfg(feature = "mcp")] use crate::gateway::GatewayDispatch; +use crate::websocket::upgrade::ws_upgrade_handler; +use crate::websocket::upgrade::WS_UPGRADE_PATH; const ALPN_HTTP1: &[u8] = b"http/1.1"; const ALPN_H2: &[u8] = b"h2"; @@ -47,8 +49,12 @@ const ALPN_H2: &[u8] = b"h2"; pub enum DecoyConfig { #[default] NotFound, - StaticSite { root: PathBuf }, - Redirect { to: String }, + StaticSite { + root: PathBuf, + }, + Redirect { + to: String, + }, } #[derive(Clone)] @@ -87,11 +93,17 @@ pub struct HttpAdapter { } impl HttpAdapter { - pub fn new(identity_provider: Arc, registry: Arc) -> Self { + pub fn new( + identity_provider: Arc, + registry: Arc, + ) -> Self { Self::for_alpn(identity_provider, registry, ALPN_HTTP1) } - pub fn h2(identity_provider: Arc, registry: Arc) -> Self { + pub fn h2( + identity_provider: Arc, + registry: Arc, + ) -> Self { Self::for_alpn(identity_provider, registry, ALPN_H2) } @@ -163,16 +175,22 @@ fn build_router(state: RouterState, extra_routes: Option) -> Router { )); Router::new() .nest_service("/mcp", to_mcp_service(dispatch)) - .layer(from_fn_with_state(auth_state.clone(), bearer_auth_middleware)) + .layer(from_fn_with_state( + auth_state.clone(), + bearer_auth_middleware, + )) }; #[cfg(not(feature = "mcp"))] let mcp_router: Router = Router::new(); let default: Router = Router::new() .merge(gateway_routes::gateway_router()) - .route("/openapi.json", get(not_implemented)) + .route("/openapi.json", get(openapi_json_handler)) .route(WS_UPGRADE_PATH, get(ws_upgrade_handler)) - .route_layer(from_fn_with_state(auth_state.clone(), bearer_auth_middleware)) + .route_layer(from_fn_with_state( + auth_state.clone(), + bearer_auth_middleware, + )) .route("/healthz", get(healthz)) .fallback(decoy_fallback) .merge(mcp_router); @@ -188,8 +206,16 @@ fn build_router(state: RouterState, extra_routes: Option) -> Router { with_extras.with_state(state) } -async fn not_implemented() -> impl IntoResponse { - (StatusCode::NOT_IMPLEMENTED, "501 Not Implemented") +async fn openapi_json_handler(State(registry): State>) -> impl IntoResponse { + let spec = to_openapi(®istry); + ( + StatusCode::OK, + [( + axum::http::header::CONTENT_TYPE, + axum::http::HeaderValue::from_static("application/json"), + )], + axum::Json(spec.raw), + ) } #[async_trait] @@ -203,7 +229,10 @@ impl ProtocolHandler for HttpAdapter { let _ = connection.set_identity(identity); } - let (send, recv) = connection.accept_bi().await.map_err(stream_error_to_handler)?; + let (send, recv) = connection + .accept_bi() + .await + .map_err(stream_error_to_handler)?; let io = QuicStream::new(send, recv); self.serve_io(io).await } @@ -295,7 +324,10 @@ mod tests { fn resolve_from_fingerprint(&self, _: &str) -> Option { None } - fn resolve_from_token(&self, _: &alknet_core::auth::AuthToken) -> Option { + fn resolve_from_token( + &self, + _: &alknet_core::auth::AuthToken, + ) -> Option { None } } @@ -341,7 +373,9 @@ mod tests { #[test] fn with_decoy_updates_decoy() { let adapter = HttpAdapter::new(provider(), empty_registry()); - let adapter = adapter.with_decoy(DecoyConfig::Redirect { to: "https://example.com".to_string() }); + let adapter = adapter.with_decoy(DecoyConfig::Redirect { + to: "https://example.com".to_string(), + }); assert!(matches!(adapter.decoy(), DecoyConfig::Redirect { .. })); } @@ -386,7 +420,10 @@ mod tests { ) -> (String, tokio::task::JoinHandle<()>) { let (mut client_send, server_recv) = duplex(8 * 1024); let (server_send, mut client_recv) = duplex(8 * 1024); - let server_io = QuicStreamDuplex { read: server_recv, write: server_send }; + let server_io = QuicStreamDuplex { + read: server_recv, + write: server_send, + }; let adapter = HttpAdapter::new(provider(), empty_registry()); let handle = tokio::spawn(async move { @@ -399,7 +436,12 @@ mod tests { let mut response = Vec::new(); let mut buf = [0u8; 4096]; loop { - match tokio::time::timeout(std::time::Duration::from_secs(5), client_recv.read(&mut buf)).await { + match tokio::time::timeout( + std::time::Duration::from_secs(5), + client_recv.read(&mut buf), + ) + .await + { Ok(Ok(0)) => break, Ok(Ok(n)) => response.extend_from_slice(&buf[..n]), Ok(Err(_)) => break, @@ -455,21 +497,24 @@ mod tests { let request = b"GET /healthz HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; let (response, handle) = send_request_and_read_response(request).await; handle.await.ok(); - assert!(response.starts_with("HTTP/1.1 200 "), "expected 200, got: {response}"); + assert!( + response.starts_with("HTTP/1.1 200 "), + "expected 200, got: {response}" + ); assert!(response.contains("\r\n\r\nok")); } #[tokio::test] async fn custom_route_v1_foo_coexists_with_default_surface() { - let extra = Router::new().route( - "/v1/foo", - get(|| async { (StatusCode::OK, "foo-body") }), - ); + let extra = Router::new().route("/v1/foo", get(|| async { (StatusCode::OK, "foo-body") })); let adapter = HttpAdapter::new(provider(), empty_registry()).with_extra_routes(extra); let (mut client_send, server_recv) = duplex(8 * 1024); let (server_send, mut client_recv) = duplex(8 * 1024); - let server_io = QuicStreamDuplex { read: server_recv, write: server_send }; + let server_io = QuicStreamDuplex { + read: server_recv, + write: server_send, + }; let handle = tokio::spawn(async move { adapter.serve_io(server_io).await.ok(); @@ -482,7 +527,12 @@ mod tests { let mut response = Vec::new(); let mut buf = [0u8; 4096]; loop { - match tokio::time::timeout(std::time::Duration::from_secs(5), client_recv.read(&mut buf)).await { + match tokio::time::timeout( + std::time::Duration::from_secs(5), + client_recv.read(&mut buf), + ) + .await + { Ok(Ok(0)) => break, Ok(Ok(n)) => response.extend_from_slice(&buf[..n]), Ok(Err(_)) => break, @@ -491,7 +541,10 @@ mod tests { } handle.await.ok(); let response_str = String::from_utf8_lossy(&response); - assert!(response_str.starts_with("HTTP/1.1 200 "), "expected 200, got: {response_str}"); + assert!( + response_str.starts_with("HTTP/1.1 200 "), + "expected 200, got: {response_str}" + ); assert!(response_str.contains("foo-body")); } @@ -505,7 +558,10 @@ mod tests { let (mut client_send, server_recv) = duplex(8 * 1024); let (server_send, mut client_recv) = duplex(8 * 1024); - let server_io = QuicStreamDuplex { read: server_recv, write: server_send }; + let server_io = QuicStreamDuplex { + read: server_recv, + write: server_send, + }; let handle = tokio::spawn(async move { adapter.serve_io(server_io).await.ok(); @@ -518,7 +574,12 @@ mod tests { let mut response = Vec::new(); let mut buf = [0u8; 4096]; loop { - match tokio::time::timeout(std::time::Duration::from_secs(5), client_recv.read(&mut buf)).await { + match tokio::time::timeout( + std::time::Duration::from_secs(5), + client_recv.read(&mut buf), + ) + .await + { Ok(Ok(0)) => break, Ok(Ok(n)) => response.extend_from_slice(&buf[..n]), Ok(Err(_)) => break, @@ -527,7 +588,10 @@ mod tests { } handle.await.ok(); let response_str = String::from_utf8_lossy(&response); - assert!(response_str.starts_with("HTTP/1.1 200 "), "default GET /healthz wins, got: {response_str}"); + assert!( + response_str.starts_with("HTTP/1.1 200 "), + "default GET /healthz wins, got: {response_str}" + ); assert!(response_str.contains("\r\n\r\nok")); assert!(!response_str.contains("custom-healthz")); } @@ -547,7 +611,12 @@ mod tests { let mut response = Vec::new(); let mut buf = [0u8; 4096]; loop { - match tokio::time::timeout(std::time::Duration::from_secs(5), client_recv.read(&mut buf)).await { + match tokio::time::timeout( + std::time::Duration::from_secs(5), + client_recv.read(&mut buf), + ) + .await + { Ok(Ok(0)) => break, Ok(Ok(n)) => response.extend_from_slice(&buf[..n]), Ok(Err(_)) => break, @@ -569,7 +638,10 @@ mod tests { .with_extra_routes(extra); let request = b"POST /v1/chat/completions HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"; let response = serve_and_read(adapter, request).await; - assert!(response.starts_with("HTTP/1.1 200"), "expected 200, got: {response}"); + assert!( + response.starts_with("HTTP/1.1 200"), + "expected 200, got: {response}" + ); assert!(response.contains("oai-proxy")); assert!(!response.contains("404 Not Found")); } @@ -583,32 +655,43 @@ mod tests { let adapter = HttpAdapter::new(provider(), empty_registry()) .with_decoy(DecoyConfig::NotFound) .with_extra_routes(extra); - let request = b"GET /totally/unknown HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; + let request = + b"GET /totally/unknown HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; let response = serve_and_read(adapter, request).await; - assert!(response.starts_with("HTTP/1.1 404"), "expected 404 decoy, got: {response}"); + assert!( + response.starts_with("HTTP/1.1 404"), + "expected 404 decoy, got: {response}" + ); assert!(response.contains("404 Not Found")); } #[tokio::test] async fn healthz_takes_precedence_over_decoy() { - let adapter = HttpAdapter::new(provider(), empty_registry()) - .with_decoy(DecoyConfig::Redirect { + let adapter = + HttpAdapter::new(provider(), empty_registry()).with_decoy(DecoyConfig::Redirect { to: "https://example.com".to_string(), }); let request = b"GET /healthz HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; let response = serve_and_read(adapter, request).await; - assert!(response.starts_with("HTTP/1.1 200"), "expected 200 healthz, got: {response}"); + assert!( + response.starts_with("HTTP/1.1 200"), + "expected 200 healthz, got: {response}" + ); assert!(response.contains("\r\n\r\nok")); } #[tokio::test] async fn unknown_path_with_redirect_decoy_returns_redirect_over_wire() { - let adapter = HttpAdapter::new(provider(), empty_registry()).with_decoy(DecoyConfig::Redirect { - to: "https://example.com".to_string(), - }); + let adapter = + HttpAdapter::new(provider(), empty_registry()).with_decoy(DecoyConfig::Redirect { + to: "https://example.com".to_string(), + }); let request = b"GET /nope HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; let response = serve_and_read(adapter, request).await; - assert!(response.starts_with("HTTP/1.1 302"), "expected 302 redirect, got: {response}"); + assert!( + response.starts_with("HTTP/1.1 302"), + "expected 302 redirect, got: {response}" + ); assert!(response.contains("location: https://example.com")); } -} \ No newline at end of file +} From dd6aacc59841d93a8a5596cfa827245eca967a0b Mon Sep 17 00:00:00 2001 From: "glm-5.2" Date: Wed, 1 Jul 2026 20:11:09 +0000 Subject: [PATCH 2/2] feat(http): complete to_openapi gateway projection with error fidelity and route wiring MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refine to_openapi to project operation-level errors (with http_status) onto /call and /subscribe responses via oneOf merge with protocol-level errors, preserving HTTP_ prefix codes without collision. Fix BTreeMap→serde_json::Map for Value::Object compatibility. Wire GET /openapi.json route test. Apply cargo fmt across the crate. --- .../src/adapters/from_mcp/tests.rs | 12 +- .../alknet-http/src/adapters/from_openapi.rs | 139 +- crates/alknet-http/src/adapters/to_mcp.rs | 183 ++- crates/alknet-http/src/adapters/to_openapi.rs | 1164 +++++++++++------ crates/alknet-http/src/client/http_client.rs | 36 +- crates/alknet-http/src/client/mod.rs | 2 +- crates/alknet-http/src/client/retry_after.rs | 16 +- crates/alknet-http/src/gateway/dispatch.rs | 35 +- crates/alknet-http/src/gateway/error.rs | 31 +- crates/alknet-http/src/server/adapter.rs | 18 + crates/alknet-http/src/server/auth.rs | 52 +- crates/alknet-http/src/server/decoy.rs | 29 +- .../alknet-http/src/server/gateway_routes.rs | 42 +- crates/alknet-http/src/server/healthz.rs | 2 +- crates/alknet-http/src/websocket/mod.rs | 10 +- crates/alknet-http/src/websocket/upgrade.rs | 105 +- .../alknet-http/tests/from_mcp_integration.rs | 34 +- 17 files changed, 1227 insertions(+), 683 deletions(-) diff --git a/crates/alknet-http/src/adapters/from_mcp/tests.rs b/crates/alknet-http/src/adapters/from_mcp/tests.rs index 8381a19..3274937 100644 --- a/crates/alknet-http/src/adapters/from_mcp/tests.rs +++ b/crates/alknet-http/src/adapters/from_mcp/tests.rs @@ -22,7 +22,11 @@ fn make_tool(name: &str, input: Value, output: Option) -> Tool { tool } -fn call_tool_result(content: Vec, structured: Option, is_error: Option) -> CallToolResult { +fn call_tool_result( + content: Vec, + structured: Option, + is_error: Option, +) -> CallToolResult { let json = serde_json::json!({ "content": content, "structuredContent": structured, @@ -204,7 +208,9 @@ fn build_spec_output_schema_present_shape() { let tool = make_tool( "get_weather", serde_json::json!({ "type": "object", "properties": { "city": { "type": "string" } } }), - Some(serde_json::json!({ "type": "object", "properties": { "temperature": { "type": "number" } } })), + Some( + serde_json::json!({ "type": "object", "properties": { "temperature": { "type": "number" } } }), + ), ); let spec = build_spec(&tool, "weather"); assert_eq!(spec.name, "weather/get_weather"); @@ -248,4 +254,4 @@ async fn forwarding_handler_reads_capabilities_not_env_vars() { let adapter = FromMCP::new("http://127.0.0.1:1/mcp", "ns"); let _ = adapter.auth_token(); assert!(adapter.auth_token().is_none()); -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/adapters/from_openapi.rs b/crates/alknet-http/src/adapters/from_openapi.rs index 58f3e95..a57f2bb 100644 --- a/crates/alknet-http/src/adapters/from_openapi.rs +++ b/crates/alknet-http/src/adapters/from_openapi.rs @@ -17,10 +17,10 @@ use std::sync::Arc; use alknet_call::client::{AdapterError, OperationAdapter}; use alknet_call::protocol::wire::{CallError, ResponseEnvelope}; use alknet_call::registry::context::OperationContext; -use alknet_call::registry::registration::{ - make_handler, HandlerRegistration, OperationProvenance, +use alknet_call::registry::registration::{make_handler, HandlerRegistration, OperationProvenance}; +use alknet_call::registry::spec::{ + AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility, }; -use alknet_call::registry::spec::{AccessControl, ErrorDefinition, OperationSpec, OperationType, Visibility}; use alknet_core::types::Capabilities; use async_trait::async_trait; use futures::StreamExt; @@ -128,11 +128,9 @@ impl OpenAPISpec { .to_string(), }; - let paths_raw = raw - .get("paths") - .ok_or_else(|| AdapterError::SchemaParse { - message: "OpenAPI document missing `paths`".into(), - })?; + let paths_raw = raw.get("paths").ok_or_else(|| AdapterError::SchemaParse { + message: "OpenAPI document missing `paths`".into(), + })?; if !paths_raw.is_object() { return Err(AdapterError::SchemaParse { message: "`paths` must be a JSON object".into(), @@ -155,14 +153,13 @@ impl OpenAPISpec { if operations.is_empty() { continue; } - paths.insert( - path.clone(), - PathItem { operations }, - ); + paths.insert(path.clone(), PathItem { operations }); } - let components = raw.get("components").and_then(|c| c.get("schemas")).and_then( - |schemas| { + let components = raw + .get("components") + .and_then(|c| c.get("schemas")) + .and_then(|schemas| { if !schemas.is_object() { return None; } @@ -171,8 +168,7 @@ impl OpenAPISpec { map.insert(k.clone(), v.clone()); } Some(Components { schemas: map }) - }, - ); + }); Ok(Self { info, @@ -190,11 +186,9 @@ impl OpenAPISpec { } let mut current: &Value = &self.raw; for part in reference.trim_start_matches("#/").split('/') { - current = current - .get(part) - .ok_or_else(|| AdapterError::SchemaParse { - message: format!("cannot resolve $ref: {reference}"), - })?; + current = current.get(part).ok_or_else(|| AdapterError::SchemaParse { + message: format!("cannot resolve $ref: {reference}"), + })?; } Ok(current.clone()) } @@ -241,10 +235,7 @@ fn parse_operation(raw: &Value) -> Option { .filter_map(|p| { let name = p.get("name")?.as_str()?.to_string(); let in_ = p.get("in")?.as_str()?.to_string(); - let required = p - .get("required") - .and_then(|v| v.as_bool()) - .unwrap_or(false); + let required = p.get("required").and_then(|v| v.as_bool()).unwrap_or(false); let schema = p.get("schema").cloned(); Some(Parameter { name, @@ -297,7 +288,11 @@ pub struct FromOpenAPI { } impl FromOpenAPI { - pub fn new(spec: OpenAPISpec, config: HttpServiceConfig, http_client: Arc) -> Self { + pub fn new( + spec: OpenAPISpec, + config: HttpServiceConfig, + http_client: Arc, + ) -> Self { Self { spec, config, @@ -322,10 +317,7 @@ impl FromOpenAPI { } fn detect_op_type(method: &str, op: &Operation) -> OperationType { - let success = op - .responses - .get("200") - .or_else(|| op.responses.get("201")); + let success = op.responses.get("200").or_else(|| op.responses.get("201")); if let Some(resp) = success { if resp.content.contains_key("text/event-stream") { return OperationType::Subscription; @@ -531,9 +523,8 @@ fn build_request( } } - let base = Url::parse(base_url).map_err(|e| { - CallError::internal(format!("invalid base_url `{base_url}`: {e}")) - })?; + let base = Url::parse(base_url) + .map_err(|e| CallError::internal(format!("invalid base_url `{base_url}`: {e}")))?; let mut url = base .join(url_path.trim_start_matches('/')) .map_err(|e| CallError::internal(format!("invalid path `{url_path}`: {e}")))?; @@ -683,11 +674,12 @@ async fn forward( .find(|(s, _)| *s == status.as_u16()) .map(|(_, code)| code.clone()) .unwrap_or_else(|| format!("HTTP_{}", status.as_u16())); - let message = format!("HTTP {}: {}", status.as_u16(), status.canonical_reason().unwrap_or("")); - return ResponseEnvelope::error( - request_id, - CallError::new(code, message, false), + let message = format!( + "HTTP {}: {}", + status.as_u16(), + status.canonical_reason().unwrap_or("") ); + return ResponseEnvelope::error(request_id, CallError::new(code, message, false)); } let content_type = response @@ -716,10 +708,7 @@ async fn forward( } else { match response.bytes().await { Ok(b) => { - let arr: Vec = b - .iter() - .map(|byte| Value::Number((*byte).into())) - .collect(); + let arr: Vec = b.iter().map(|byte| Value::Number((*byte).into())).collect(); ResponseEnvelope::ok(request_id, Value::Array(arr)) } Err(err) => ResponseEnvelope::error( @@ -744,7 +733,8 @@ async fn stream_subscription(request_id: String, response: reqwest::Response) -> let parsed = if event.data.trim().is_empty() { Value::Null } else { - serde_json::from_str(&event.data).unwrap_or(Value::String(event.data.clone())) + serde_json::from_str(&event.data) + .unwrap_or(Value::String(event.data.clone())) }; last_event = Some(parsed.clone()); } @@ -1040,7 +1030,12 @@ mod tests { .unwrap(); let body = props.get("body").unwrap(); assert_eq!(body.get("type").unwrap(), "object"); - assert!(body.get("properties").unwrap().as_object().unwrap().contains_key("name")); + assert!(body + .get("properties") + .unwrap() + .as_object() + .unwrap() + .contains_key("name")); } #[tokio::test] @@ -1074,14 +1069,19 @@ mod tests { "https://api.vast.ai", "/machines", "GET", - &Some(HttpAuthScheme::ApiKey { header_name: "X-API-Key".to_string() }), + &Some(HttpAuthScheme::ApiKey { + header_name: "X-API-Key".to_string(), + }), &HashMap::new(), "vastai", &serde_json::json!({}), &ctx, ) .unwrap(); - assert_eq!(headers.get("X-API-Key").unwrap().to_str().unwrap(), "key-xyz"); + assert_eq!( + headers.get("X-API-Key").unwrap().to_str().unwrap(), + "key-xyz" + ); } #[tokio::test] @@ -1267,7 +1267,11 @@ mod tests { #[test] fn http_service_config_struct_fields() { - let cfg = config("ns", "https://api.example.com", Some(HttpAuthScheme::Bearer)); + let cfg = config( + "ns", + "https://api.example.com", + Some(HttpAuthScheme::Bearer), + ); assert_eq!(cfg.namespace, "ns"); assert_eq!(cfg.base_url, "https://api.example.com"); assert!(matches!(cfg.auth, Some(HttpAuthScheme::Bearer))); @@ -1289,7 +1293,12 @@ mod tests { }"#; let spec = OpenAPISpec::from_json(doc).unwrap(); assert!(spec.components.is_some()); - assert!(spec.components.as_ref().unwrap().schemas.contains_key("Foo")); + assert!(spec + .components + .as_ref() + .unwrap() + .schemas + .contains_key("Foo")); } #[tokio::test] @@ -1342,7 +1351,9 @@ mod tests { #[tokio::test] async fn resolve_ref_missing_target_returns_schema_parse() { let spec = OpenAPISpec::from_json(minimal_spec_json()).unwrap(); - let err = spec.resolve_ref("#/components/schemas/Missing").unwrap_err(); + let err = spec + .resolve_ref("#/components/schemas/Missing") + .unwrap_err(); assert!(matches!(err, AdapterError::SchemaParse { .. })); } @@ -1409,7 +1420,8 @@ mod tests { headers, body, }); - let response = "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}"; + let response = + "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: 2\r\n\r\n{}"; sock.write_all(response.as_bytes()).await.unwrap(); sock.flush().await.unwrap(); }); @@ -1440,12 +1452,19 @@ mod tests { ctx, ) .await; - assert!(response.result.is_ok(), "expected Ok, got {:?}", response.result); + assert!( + response.result.is_ok(), + "expected Ok, got {:?}", + response.result + ); let captured = rx.await.unwrap(); assert_eq!(captured.method, "POST"); assert_eq!(captured.path, "/items/42"); assert_eq!(captured.query, "filter=new"); - assert_eq!(captured.headers.get("content-type").unwrap(), "application/json"); + assert_eq!( + captured.headers.get("content-type").unwrap(), + "application/json" + ); assert!(captured.body.contains("\"name\":\"widget\"")); } @@ -1457,19 +1476,19 @@ mod tests { }"#; let (base, rx) = spawn_capturing_server().await; let spec = OpenAPISpec::from_json(doc).unwrap(); - let bundles = adapter( - spec, - config("openai", &base, Some(HttpAuthScheme::Bearer)), - ) - .import() - .await - .unwrap(); + let bundles = adapter(spec, config("openai", &base, Some(HttpAuthScheme::Bearer))) + .import() + .await + .unwrap(); let registration = &bundles[0]; let caps = Capabilities::new().with_http_token("openai", "sk-test-token".to_string()); let ctx = noop_context("req-17", caps); let _ = (registration.handler)(serde_json::json!({}), ctx).await; let captured = rx.await.unwrap(); - assert_eq!(captured.headers.get("authorization").unwrap(), "Bearer sk-test-token"); + assert_eq!( + captured.headers.get("authorization").unwrap(), + "Bearer sk-test-token" + ); } #[tokio::test] @@ -1527,4 +1546,4 @@ mod tests { other => panic!("expected HTTP_500, got {other:?}"), } } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/adapters/to_mcp.rs b/crates/alknet-http/src/adapters/to_mcp.rs index 56a026a..96a8697 100644 --- a/crates/alknet-http/src/adapters/to_mcp.rs +++ b/crates/alknet-http/src/adapters/to_mcp.rs @@ -36,8 +36,8 @@ use rmcp::model::{ }; use rmcp::service::{RequestContext, RoleServer}; use rmcp::transport::{ - StreamableHttpServerConfig, streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService}, + StreamableHttpServerConfig, }; use serde_json::{Map, Value}; @@ -133,7 +133,10 @@ impl ToMcpGateway { fn extract_identity_from_extensions(extensions: &rmcp::model::Extensions) -> Option { let parts = extensions.get::()?; - parts.extensions.get::>().and_then(Option::clone) + parts + .extensions + .get::>() + .and_then(Option::clone) } async fn handle_search(&self, identity: Option) -> CallToolResult { @@ -144,8 +147,15 @@ impl ToMcpGateway { map_search_response(response, identity.as_ref()) } - async fn handle_schema(&self, arguments: Option, identity: Option) -> CallToolResult { - let name = match arguments.and_then(|mut a| a.remove("name")).and_then(|v| v.as_str().map(str::to_string)) { + async fn handle_schema( + &self, + arguments: Option, + identity: Option, + ) -> CallToolResult { + let name = match arguments + .and_then(|mut a| a.remove("name")) + .and_then(|v| v.as_str().map(str::to_string)) + { Some(n) => n, None => { return CallToolResult::structured_error(serde_json::json!({ @@ -156,12 +166,20 @@ impl ToMcpGateway { }; let response = self .dispatch - .invoke(identity, OP_SERVICES_SCHEMA, serde_json::json!({ "name": name })) + .invoke( + identity, + OP_SERVICES_SCHEMA, + serde_json::json!({ "name": name }), + ) .await; envelope_to_call_tool_result(response) } - async fn handle_call(&self, arguments: Option, identity: Option) -> CallToolResult { + async fn handle_call( + &self, + arguments: Option, + identity: Option, + ) -> CallToolResult { let (operation, input) = match parse_call_arguments(arguments) { Ok(pair) => pair, Err(err) => return err, @@ -170,7 +188,11 @@ impl ToMcpGateway { envelope_to_call_tool_result(response) } - async fn handle_batch(&self, arguments: Option, identity: Option) -> CallToolResult { + async fn handle_batch( + &self, + arguments: Option, + identity: Option, + ) -> CallToolResult { let calls = match arguments .and_then(|mut a| a.remove("calls")) .and_then(|v| v.as_array().cloned()) @@ -193,7 +215,10 @@ impl ToMcpGateway { continue; } }; - let response = self.dispatch.invoke(identity.clone(), &operation, input).await; + let response = self + .dispatch + .invoke(identity.clone(), &operation, input) + .await; results.push(envelope_to_value(response)); } CallToolResult::structured(Value::Array(results)) @@ -210,7 +235,10 @@ fn parse_call_arguments(arguments: Option) -> Result<(String, Value) }))); } }; - let operation = match map.remove("operation").and_then(|v| v.as_str().map(str::to_string)) { + let operation = match map + .remove("operation") + .and_then(|v| v.as_str().map(str::to_string)) + { Some(s) => s, None => { return Err(CallToolResult::structured_error(serde_json::json!({ @@ -359,7 +387,11 @@ impl rmcp::handler::server::ServerHandler for ToMcpGateway { TOOL_CALL => this.handle_call(arguments, identity).await, TOOL_BATCH => this.handle_batch(arguments, identity).await, unknown => { - let err = CallError::new("NOT_FOUND", format!("unknown gateway tool: {unknown}"), false); + let err = CallError::new( + "NOT_FOUND", + format!("unknown gateway tool: {unknown}"), + false, + ); call_error_to_structured_error(err) } }; @@ -368,9 +400,7 @@ impl rmcp::handler::server::ServerHandler for ToMcpGateway { } fn get_info(&self) -> ServerInfo { - let capabilities = ServerCapabilities::builder() - .enable_tools() - .build(); + let capabilities = ServerCapabilities::builder().enable_tools().build(); ServerInfo::new(capabilities) .with_server_info(Implementation::new( "alknet-to-mcp", @@ -462,10 +492,14 @@ mod tests { } fn make_echo_handler() -> alknet_call::registry::registration::Handler { - make_handler(|input, context| async move { ResponseEnvelope::ok(context.request_id, input) }) + make_handler( + |input, context| async move { ResponseEnvelope::ok(context.request_id, input) }, + ) } - fn full_registry_with_ops(specs: Vec<(String, OperationType, AccessControl)>) -> Arc { + fn full_registry_with_ops( + specs: Vec<(String, OperationType, AccessControl)>, + ) -> Arc { let mut inner = OperationRegistry::new(); for (name, op_type, acl) in specs { inner.register(HandlerRegistration::new( @@ -509,7 +543,10 @@ mod tests { Arc::new(dispatch_registry) } - fn dispatch(registry: Arc, provider: Arc) -> Arc { + fn dispatch( + registry: Arc, + provider: Arc, + ) -> Arc { Arc::new(GatewayDispatch::new(registry, provider)) } @@ -542,7 +579,11 @@ mod tests { TOOL_CALL => gateway.handle_call(arguments, identity).await, TOOL_BATCH => gateway.handle_batch(arguments, identity).await, unknown => { - let err = CallError::new("NOT_FOUND", format!("unknown gateway tool: {unknown}"), false); + let err = CallError::new( + "NOT_FOUND", + format!("unknown gateway tool: {unknown}"), + false, + ); call_error_to_structured_error(err) } } @@ -550,10 +591,7 @@ mod tests { #[tokio::test] async fn list_tools_returns_exactly_four_gateway_tools() { - let _gateway = ToMcpGateway::new(dispatch( - full_registry_with_ops(vec![]), - provider(), - )); + let _gateway = ToMcpGateway::new(dispatch(full_registry_with_ops(vec![]), provider())); let tools = gateway_tools(); let names: Vec = tools.iter().map(|t| t.name.to_string()).collect(); assert_eq!(names.len(), 4); @@ -583,7 +621,11 @@ mod tests { #[tokio::test] async fn search_returns_access_control_filtered_ops_excluding_subscriptions() { let registry = full_registry_with_ops(vec![ - ("public/echo".to_string(), OperationType::Query, AccessControl::default()), + ( + "public/echo".to_string(), + OperationType::Query, + AccessControl::default(), + ), ( "admin/secret".to_string(), OperationType::Query, @@ -592,13 +634,22 @@ mod tests { ..Default::default() }, ), - ("events/stream".to_string(), OperationType::Subscription, AccessControl::default()), + ( + "events/stream".to_string(), + OperationType::Subscription, + AccessControl::default(), + ), ]); let idp: Arc = Arc::new(StaticIdentityProvider::new()); let gateway = ToMcpGateway::new(dispatch(registry, idp)); - let result = invoke_tool(&gateway, "search", None, Some(identity_with_scopes("user", &["user"]))) - .await; + let result = invoke_tool( + &gateway, + "search", + None, + Some(identity_with_scopes("user", &["user"])), + ) + .await; assert_eq!(result.is_error, Some(false)); let structured = result.structured_content.expect("structured present"); let ops = structured @@ -610,11 +661,23 @@ mod tests { .filter_map(|o| o.get("name").and_then(Value::as_str)) .collect(); assert!(names.contains(&"public/echo")); - assert!(!names.contains(&"admin/secret"), "ACL-filtered op must not appear"); - assert!(!names.contains(&"events/stream"), "Subscription op must be excluded"); + assert!( + !names.contains(&"admin/secret"), + "ACL-filtered op must not appear" + ); + assert!( + !names.contains(&"events/stream"), + "Subscription op must be excluded" + ); for op in ops { - assert!(op.get("description").is_some(), "each entry has a description"); - assert!(op.get("input_schema").is_none(), "search must not return full schemas"); + assert!( + op.get("description").is_some(), + "each entry has a description" + ); + assert!( + op.get("input_schema").is_none(), + "search must not return full schemas" + ); } } @@ -632,7 +695,10 @@ mod tests { let result = invoke_tool(&gateway, "schema", Some(args), None).await; assert_eq!(result.is_error, Some(false)); let structured = result.structured_content.expect("structured present"); - assert_eq!(structured.get("name"), Some(&Value::String("fs/readFile".to_string()))); + assert_eq!( + structured.get("name"), + Some(&Value::String("fs/readFile".to_string())) + ); assert!(structured.get("input_schema").is_some()); assert!(structured.get("output_schema").is_some()); assert!(structured.get("error_schemas").is_some()); @@ -649,7 +715,10 @@ mod tests { let gateway = ToMcpGateway::new(dispatch(registry, provider())); let mut args = Map::new(); - args.insert("operation".to_string(), Value::String("echo/run".to_string())); + args.insert( + "operation".to_string(), + Value::String("echo/run".to_string()), + ); args.insert("input".to_string(), serde_json::json!({ "msg": "hi" })); let result = invoke_tool(&gateway, "call", Some(args), None).await; assert_eq!(result.is_error, Some(false)); @@ -665,12 +734,18 @@ mod tests { let gateway = ToMcpGateway::new(dispatch(registry, provider())); let mut args = Map::new(); - args.insert("operation".to_string(), Value::String("no/such".to_string())); + args.insert( + "operation".to_string(), + Value::String("no/such".to_string()), + ); args.insert("input".to_string(), Value::Object(Map::new())); let result = invoke_tool(&gateway, "call", Some(args), None).await; assert_eq!(result.is_error, Some(true)); let structured = result.structured_content.expect("structured error present"); - assert_eq!(structured.get("code"), Some(&Value::String("NOT_FOUND".to_string()))); + assert_eq!( + structured.get("code"), + Some(&Value::String("NOT_FOUND".to_string())) + ); } #[tokio::test] @@ -713,12 +788,18 @@ mod tests { let gateway = ToMcpGateway::new(dispatch(registry, idp)); let mut args = Map::new(); - args.insert("operation".to_string(), Value::String("admin/run".to_string())); + args.insert( + "operation".to_string(), + Value::String("admin/run".to_string()), + ); args.insert("input".to_string(), Value::Object(Map::new())); let result = invoke_tool(&gateway, "call", Some(args), None).await; assert_eq!(result.is_error, Some(true)); let structured = result.structured_content.expect("structured error present"); - assert_eq!(structured.get("code"), Some(&Value::String("FORBIDDEN".to_string()))); + assert_eq!( + structured.get("code"), + Some(&Value::String("FORBIDDEN".to_string())) + ); } #[tokio::test] @@ -727,7 +808,10 @@ mod tests { let result = invoke_tool(&gateway, "bogus", None, None).await; assert_eq!(result.is_error, Some(true)); let structured = result.structured_content.expect("structured error present"); - assert_eq!(structured.get("code"), Some(&Value::String("NOT_FOUND".to_string()))); + assert_eq!( + structured.get("code"), + Some(&Value::String("NOT_FOUND".to_string())) + ); } #[tokio::test] @@ -749,10 +833,16 @@ mod tests { let admin_identity = identity_with_scopes("admin-peer", &["admin"]); let extensions = extensions_with_identity(Some(admin_identity.clone())); let extracted = ToMcpGateway::extract_identity_from_extensions(&extensions); - assert_eq!(extracted.as_ref().map(|i| &i.id), Some(&"admin-peer".to_string())); + assert_eq!( + extracted.as_ref().map(|i| &i.id), + Some(&"admin-peer".to_string()) + ); let mut args = Map::new(); - args.insert("operation".to_string(), Value::String("admin/run".to_string())); + args.insert( + "operation".to_string(), + Value::String("admin/run".to_string()), + ); args.insert("input".to_string(), serde_json::json!({ "ok": 1 })); let result = gateway.handle_call(Some(args), extracted).await; assert_eq!(result.is_error, Some(false)); @@ -779,7 +869,10 @@ mod tests { let id = identity_with_scopes("caller", &["read"]); let extensions = extensions_with_identity(Some(id.clone())); let extracted = ToMcpGateway::extract_identity_from_extensions(&extensions); - assert_eq!(extracted.as_ref().map(|i| i.id.clone()), Some("caller".to_string())); + assert_eq!( + extracted.as_ref().map(|i| i.id.clone()), + Some("caller".to_string()) + ); assert_eq!( extracted.as_ref().map(|i| i.scopes.clone()), Some(vec!["read".to_string()]) @@ -834,12 +927,18 @@ mod tests { ); let mut call_args = Map::new(); - call_args.insert("operation".to_string(), Value::String(first_name.to_string())); - call_args.insert("input".to_string(), serde_json::json!({ "path": "/etc/hosts" })); + call_args.insert( + "operation".to_string(), + Value::String(first_name.to_string()), + ); + call_args.insert( + "input".to_string(), + serde_json::json!({ "path": "/etc/hosts" }), + ); let call_result = invoke_tool(&gateway, "call", Some(call_args), None).await; assert_eq!( call_result.structured_content, Some(serde_json::json!({ "path": "/etc/hosts" })) ); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/adapters/to_openapi.rs b/crates/alknet-http/src/adapters/to_openapi.rs index 4c5dbbb..753aa81 100644 --- a/crates/alknet-http/src/adapters/to_openapi.rs +++ b/crates/alknet-http/src/adapters/to_openapi.rs @@ -1,27 +1,39 @@ -//! `to_openapi`: the OpenAPI gateway projection (ADR-042). Generates a -//! fixed 5-endpoint gateway doc (`/search`, `/schema`, `/call`, `/batch`, -//! `/subscribe`) that gates access to the full operation registry — not one -//! path per operation. Served at `GET /openapi.json` by the HTTP server. +//! `to_openapi`: gateway projection of the local operation registry into a +//! fixed 5-endpoint OpenAPI 3.0 document (ADR-042). //! -//! Pure projection (ADR-017 §5): consumes the registry, does not produce -//! entries, is not an `OperationAdapter`. The per-caller operation surface -//! is discovered via `/search` (AccessControl-filtered at runtime), not -//! preloaded into the doc (ADR-042 §3). `info.version` is a constant -//! semver tracking the gateway endpoint contract, not the operation set -//! (ADR-045); the initial version is `1.0.0`. +//! `to_openapi` is a pure projection (ADR-017 §5): it consumes the registry +//! and produces a spec; it does not modify the registry, register +//! operations, or implement `OperationAdapter`. The generated doc describes +//! the 5 fixed gateway endpoints (`/search`, `/schema`, `/call`, `/batch`, +//! `/subscribe`) — the sole HTTP invoke path (ADR-047). The per-caller +//! operation surface is discovered at runtime through AccessControl-filtered +//! `/search`, not preloaded into the doc (ADR-042 §3). //! -//! Error fidelity (ADR-023): `/call`'s responses include the protocol- -//! level errors (400/401/403/404/500/504) plus the operation-level errors -//! declared in registry `error_schemas` (mapped by `http_status`). +//! `info.version` is a semver constant tracking the **gateway endpoint +//! contract**, not the operation set — per-caller operation changes do not +//! bump the version (ADR-045). The initial version is `1.0.0`. +//! +//! Error fidelity (ADR-023): `/call`'s responses include the protocol-level +//! errors (400, 401, 403, 404, 500, 504) plus the operation-level errors +//! from the registry's `error_schemas`, mapped by `http_status`. +//! `HTTP_`-prefixed codes project to their status without colliding +//! with the protocol-level codes. +//! +//! See `docs/architecture/crates/http/http-adapters.md` §"to_openapi" and +//! ADR-042/045/023. + +use std::collections::BTreeMap; -use alknet_call::registry::registration::OperationRegistry; -use alknet_call::registry::spec::Visibility; use serde_json::{json, Map, Value}; -use crate::adapters::OpenAPISpec; +use alknet_call::registry::registration::OperationRegistry; +use alknet_call::registry::spec::ErrorDefinition; + +use super::from_openapi::OpenAPISpec; const GATEWAY_VERSION: &str = "1.0.0"; const GATEWAY_TITLE: &str = "alknet gateway"; +const OPENAPI_VERSION: &str = "3.0.0"; const PATH_SEARCH: &str = "/search"; const PATH_SCHEMA: &str = "/schema"; @@ -29,229 +41,209 @@ const PATH_CALL: &str = "/call"; const PATH_BATCH: &str = "/batch"; const PATH_SUBSCRIBE: &str = "/subscribe"; -const CONTENT_JSON: &str = "application/json"; -const CONTENT_SSE: &str = "text/event-stream"; +const STATUS_BAD_REQUEST: u16 = 400; +const STATUS_UNAUTHORIZED: u16 = 401; +const STATUS_FORBIDDEN: u16 = 403; +const STATUS_NOT_FOUND: u16 = 404; +const STATUS_INTERNAL: u16 = 500; +const STATUS_TIMEOUT: u16 = 504; -const STATUS_BAD_REQUEST: &str = "400"; -const STATUS_UNAUTHORIZED: &str = "401"; -const STATUS_FORBIDDEN: &str = "403"; -const STATUS_NOT_FOUND: &str = "404"; -const STATUS_INTERNAL: &str = "500"; -const STATUS_TIMEOUT: &str = "504"; +const CODE_INVALID_INPUT: &str = "INVALID_INPUT"; +const CODE_FORBIDDEN: &str = "FORBIDDEN"; +const CODE_NOT_FOUND: &str = "NOT_FOUND"; +const CODE_INTERNAL: &str = "INTERNAL"; +const CODE_TIMEOUT: &str = "TIMEOUT"; + +const HTTP_PREFIX: &str = "HTTP_"; pub fn to_openapi(registry: &OperationRegistry) -> OpenAPISpec { - let mut paths_obj = Map::new(); - paths_obj.insert( - PATH_SEARCH.to_string(), - path_item("get", search_operation()), - ); - paths_obj.insert( - PATH_SCHEMA.to_string(), - path_item("get", schema_operation()), - ); - paths_obj.insert( - PATH_CALL.to_string(), - path_item("post", call_operation(registry)), - ); - paths_obj.insert(PATH_BATCH.to_string(), path_item("post", batch_operation())); - paths_obj.insert( - PATH_SUBSCRIBE.to_string(), - path_item("post", subscribe_operation()), - ); + let operation_errors = collect_operation_errors(registry); + let raw = build_doc(operation_errors); + OpenAPISpec::from_value(raw).expect("to_openapi always emits a valid OpenAPI document") +} - let doc = json!({ - "openapi": "3.0.0", +fn build_doc(operation_errors: Vec) -> Value { + let paths = json!({ + PATH_SEARCH: search_path_item(), + PATH_SCHEMA: schema_path_item(), + PATH_CALL: call_path_item(&operation_errors), + PATH_BATCH: batch_path_item(), + PATH_SUBSCRIBE: subscribe_path_item(), + }); + + json!({ + "openapi": OPENAPI_VERSION, "info": { "title": GATEWAY_TITLE, "version": GATEWAY_VERSION, + "description": "alknet gateway: 5 fixed endpoints gating access to the operation registry. The per-caller operation surface is discovered via /search (AccessControl-filtered), not preloaded into this doc." }, - "paths": Value::Object(paths_obj), - }); - - OpenAPISpec::from_value(doc).expect("generated gateway doc is a valid OpenAPI 3.0 object") -} - -fn path_item(method: &str, operation: Value) -> Value { - let mut item = Map::new(); - item.insert(method.to_string(), operation); - Value::Object(item) -} - -fn search_operation() -> Value { - json!({ - "operationId": "search", - "summary": "List/search available operations (AccessControl-filtered). Returns names + descriptions.", - "responses": { - "200": json_response(search_output_schema()), - STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "Malformed query."), - STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), - STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes."), - STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), - STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + "paths": paths, + "components": { + "schemas": components_schemas() } }) } -fn schema_operation() -> Value { +fn search_path_item() -> Value { json!({ - "operationId": "schema", - "summary": "Get an operation's full OperationSpec (input/output JSON Schemas, error schemas).", - "parameters": [{ - "name": "name", - "in": "query", - "required": true, - "schema": { "type": "string" } - }], - "responses": { - "200": json_response(schema_output_schema()), - STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "Missing or malformed `name` parameter."), - STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), - STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes for the requested operation."), - STATUS_NOT_FOUND: error_response("NOT_FOUND", "Operation not registered."), - STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), - STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + "get": { + "operationId": "gatewaySearch", + "summary": "List/search operations (AccessControl-filtered). Returns names + descriptions.", + "responses": { + "200": json_response(schema_search_result()), + "401": json_response(schema_unauthorized()), + "403": json_response(schema_forbidden()), + "500": json_response(schema_internal()), + "504": json_response(schema_timeout()) + } } }) } -fn call_operation(registry: &OperationRegistry) -> Value { - let mut responses = Map::new(); - responses.insert("200".to_string(), json_response(call_success_schema())); +fn schema_path_item() -> Value { + json!({ + "get": { + "operationId": "gatewaySchema", + "summary": "Get an operation's full OperationSpec (input/output JSON Schemas, error schemas).", + "parameters": [ + { + "name": "name", + "in": "query", + "required": true, + "schema": { "type": "string" } + } + ], + "responses": { + "200": json_response(schema_schema_result()), + "400": json_response(schema_invalid_input()), + "401": json_response(schema_unauthorized()), + "403": json_response(schema_forbidden()), + "404": json_response(schema_not_found()), + "500": json_response(schema_internal()), + "504": json_response(schema_timeout()) + } + } + }) +} + +fn call_path_item(operation_errors: &[ErrorDefinition]) -> Value { + let mut responses: Map = serde_json::Map::new(); + responses.insert("200".to_string(), json_response(schema_call_ok())); responses.insert( STATUS_BAD_REQUEST.to_string(), - error_response( - "INVALID_INPUT", - "The request body was not a valid `{ operation, input }` object.", - ), + json_response(schema_protocol_error(CODE_INVALID_INPUT)), ); responses.insert( STATUS_UNAUTHORIZED.to_string(), - error_response("UNAUTHORIZED", "No bearer token provided."), + json_response(schema_unauthorized()), ); responses.insert( STATUS_FORBIDDEN.to_string(), - error_response( - "FORBIDDEN", - "Insufficient scopes to invoke the requested operation.", - ), + json_response(schema_protocol_error(CODE_FORBIDDEN)), ); responses.insert( STATUS_NOT_FOUND.to_string(), - error_response("NOT_FOUND", "Operation not registered (or is Internal)."), + json_response(schema_protocol_error(CODE_NOT_FOUND)), ); responses.insert( STATUS_INTERNAL.to_string(), - error_response("INTERNAL", "Internal error."), + json_response(schema_protocol_error(CODE_INTERNAL)), ); responses.insert( STATUS_TIMEOUT.to_string(), - error_response("TIMEOUT", "Request timed out."), + json_response(schema_protocol_error(CODE_TIMEOUT)), ); - for spec in registry.list_operations() { - if spec.visibility != Visibility::External { - continue; - } - for error in &spec.error_schemas { - let Some(status) = error.http_status else { - continue; - }; - let code = format!("{status}"); - if responses.contains_key(&code) { - continue; - } - responses.insert(code, json_response(error.schema.clone())); - } + let mut operation_errors_by_status: BTreeMap> = BTreeMap::new(); + for error in operation_errors { + let status = match error.http_status { + Some(status) => status, + None => continue, + }; + operation_errors_by_status + .entry(status) + .or_default() + .push(error); + } + + for (status, errors) in operation_errors_by_status { + let key = status.to_string(); + let response = responses + .entry(key) + .or_insert_with(|| json_response(Value::Null)); + merge_operation_errors(response, &errors); } json!({ - "operationId": "call", - "summary": "Invoke an operation by name with a flat JSON body `{ operation, input }`.", - "requestBody": { - "required": true, - "content": { - CONTENT_JSON: { - "schema": call_input_schema(), + "post": { + "operationId": "gatewayCall", + "summary": "Invoke an operation by name with a flat JSON input.", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": schema_call_request() + } } - } - }, - "responses": Value::Object(responses), - }) -} - -fn batch_operation() -> Value { - json!({ - "operationId": "batch", - "summary": "Invoke multiple operations in one request. Array of `{ operation, input }`.", - "requestBody": { - "required": true, - "content": { - CONTENT_JSON: { - "schema": batch_input_schema(), - } - } - }, - "responses": { - "200": json_response(batch_output_schema()), - STATUS_BAD_REQUEST: error_response("INVALID_INPUT", "The request body was not a JSON array of call requests."), - STATUS_UNAUTHORIZED: error_response("UNAUTHORIZED", "Missing bearer token."), - STATUS_FORBIDDEN: error_response("FORBIDDEN", "Insufficient scopes."), - STATUS_INTERNAL: error_response("INTERNAL", "Internal error."), - STATUS_TIMEOUT: error_response("TIMEOUT", "Request timed out."), + }, + "responses": Value::Object(responses) } }) } -fn subscribe_operation() -> Value { - let mut responses = Map::new(); - responses.insert("200".to_string(), sse_response(call_success_schema())); - responses.insert( - STATUS_BAD_REQUEST.to_string(), - error_response( - "INVALID_INPUT", - "The request body was not a valid `{ operation, input }` object.", - ), - ); - responses.insert( - STATUS_UNAUTHORIZED.to_string(), - error_response("UNAUTHORIZED", "No bearer token provided."), - ); - responses.insert( - STATUS_FORBIDDEN.to_string(), - error_response( - "FORBIDDEN", - "Insufficient scopes to invoke the requested operation.", - ), - ); - responses.insert( - STATUS_NOT_FOUND.to_string(), - error_response("NOT_FOUND", "Operation not registered (or is Internal)."), - ); - responses.insert( - STATUS_INTERNAL.to_string(), - error_response("INTERNAL", "Internal error."), - ); - responses.insert( - STATUS_TIMEOUT.to_string(), - error_response("TIMEOUT", "Request timed out."), - ); - +fn batch_path_item() -> Value { json!({ - "operationId": "subscribe", - "summary": "Invoke a streaming operation. Body `{ operation, input }`; response is `text/event-stream`.", - "requestBody": { - "required": true, - "content": { - CONTENT_JSON: { - "schema": call_input_schema(), + "post": { + "operationId": "gatewayBatch", + "summary": "Invoke multiple operations in one request. Returns an array of results.", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": schema_batch_request() + } } + }, + "responses": { + "200": json_response(schema_batch_result()), + "400": json_response(schema_invalid_input()), + "401": json_response(schema_unauthorized()), + "403": json_response(schema_forbidden()), + "500": json_response(schema_internal()), + "504": json_response(schema_timeout()) } - }, - "responses": Value::Object(responses), + } }) } -fn call_input_schema() -> Value { +fn subscribe_path_item() -> Value { + json!({ + "post": { + "operationId": "gatewaySubscribe", + "summary": "Invoke a streaming operation. Response is text/event-stream.", + "requestBody": { + "required": true, + "content": { + "application/json": { + "schema": schema_call_request() + } + } + }, + "responses": { + "200": sse_response(), + "400": json_response(schema_invalid_input()), + "401": json_response(schema_unauthorized()), + "403": json_response(schema_forbidden()), + "404": json_response(schema_not_found()), + "500": json_response(schema_internal()), + "504": json_response(schema_timeout()) + } + } + }) +} + +fn schema_call_request() -> Value { json!({ "type": "object", "properties": { @@ -268,14 +260,26 @@ fn call_input_schema() -> Value { }) } -fn batch_input_schema() -> Value { +fn schema_batch_request() -> Value { json!({ "type": "array", - "items": call_input_schema() + "items": schema_call_request() }) } -fn search_output_schema() -> Value { +fn schema_call_ok() -> Value { + json!({ + "type": "object", + "properties": { + "request_id": { "type": "string" }, + "result": { "type": "string", "enum": ["ok"] }, + "output": { "type": "object", "description": "The operation's output." } + }, + "required": ["request_id", "result", "output"] + }) +} + +fn schema_search_result() -> Value { json!({ "type": "object", "properties": { @@ -293,33 +297,22 @@ fn search_output_schema() -> Value { }) } -fn schema_output_schema() -> Value { +fn schema_schema_result() -> Value { json!({ "type": "object", + "description": "The full OperationSpec for the requested operation.", "properties": { "name": { "type": "string" }, "namespace": { "type": "string" }, - "op_type": { "type": "string" }, - "input_schema": {}, - "output_schema": {}, - "error_schemas": { "type": "array" }, - "access_control": {} + "op_type": { "type": "string", "enum": ["query", "mutation", "subscription"] }, + "input_schema": { "type": "object" }, + "output_schema": { "type": "object" }, + "error_schemas": { "type": "array" } } }) } -fn call_success_schema() -> Value { - json!({ - "type": "object", - "properties": { - "request_id": { "type": "string" }, - "result": { "type": "string", "enum": ["ok"] }, - "output": {} - } - }) -} - -fn batch_output_schema() -> Value { +fn schema_batch_result() -> Value { json!({ "type": "object", "properties": { @@ -329,9 +322,9 @@ fn batch_output_schema() -> Value { "type": "object", "properties": { "request_id": { "type": "string" }, - "result": { "type": "string" }, - "output": {}, - "error": {} + "result": { "type": "string", "enum": ["ok", "error"] }, + "output": { "type": "object" }, + "error": { "type": "object" } } } } @@ -339,47 +332,197 @@ fn batch_output_schema() -> Value { }) } +fn schema_invalid_input() -> Value { + schema_protocol_error(CODE_INVALID_INPUT) +} + +fn schema_unauthorized() -> Value { + json!({ + "type": "object", + "properties": { + "code": { "type": "string", "enum": ["FORBIDDEN"] }, + "message": { "type": "string", "description": "Authentication required (no bearer token)." }, + "retryable": { "type": "boolean" } + }, + "required": ["code", "message", "retryable"] + }) +} + +fn schema_forbidden() -> Value { + schema_protocol_error(CODE_FORBIDDEN) +} + +fn schema_not_found() -> Value { + schema_protocol_error(CODE_NOT_FOUND) +} + +fn schema_internal() -> Value { + schema_protocol_error(CODE_INTERNAL) +} + +fn schema_timeout() -> Value { + schema_protocol_error(CODE_TIMEOUT) +} + +fn schema_protocol_error(code: &str) -> Value { + json!({ + "type": "object", + "properties": { + "code": { "type": "string", "enum": [code] }, + "message": { "type": "string" }, + "retryable": { "type": "boolean" } + }, + "required": ["code", "message", "retryable"] + }) +} + +fn operation_error_schema(error: &ErrorDefinition) -> Value { + let mut schema = if error.schema.is_object() { + error.schema.clone() + } else { + json!({ "type": "object" }) + }; + let obj = schema.as_object_mut().expect("error schema is object"); + obj.entry("title") + .or_insert(Value::String(error.code.clone())); + obj.entry("description") + .or_insert(Value::String(error.description.clone())); + schema +} + fn json_response(schema: Value) -> Value { json!({ "description": "", "content": { - CONTENT_JSON: { - "schema": schema, + "application/json": { + "schema": schema } } }) } -fn sse_response(schema: Value) -> Value { +fn sse_response() -> Value { json!({ - "description": "", + "description": "Server-Sent Events stream. Each `data:` frame is a call.responded event; stream close is call.completed.", "content": { - CONTENT_SSE: { - "schema": schema, - } - } - }) -} - -fn error_response(code: &str, message: &str) -> Value { - json!({ - "description": message, - "content": { - CONTENT_JSON: { + "text/event-stream": { "schema": { - "type": "object", - "properties": { - "code": { "type": "string", "enum": [code] }, - "message": { "type": "string" }, - "retryable": { "type": "boolean" } - }, - "required": ["code", "message", "retryable"] + "type": "string", + "description": "SSE frame: `data: \\n\\n`." } } } }) } +fn merge_operation_errors(response: &mut Value, errors: &[&ErrorDefinition]) { + let obj = match response.as_object_mut() { + Some(obj) => obj, + None => return, + }; + let content = obj + .entry("content".to_string()) + .or_insert(json!({})) + .as_object_mut(); + let content = match content { + Some(c) => c, + None => return, + }; + let json_entry = content + .entry("application/json".to_string()) + .or_insert(json!({})) + .as_object_mut(); + let json_entry = match json_entry { + Some(j) => j, + None => return, + }; + let existing_schema = json_entry.get("schema").cloned(); + let op_schemas: Vec = errors.iter().map(|e| operation_error_schema(e)).collect(); + let merged = match existing_schema { + Some(existing) if !existing.is_null() => { + let mut variants = vec![existing]; + for s in op_schemas { + if !variant_already_present(&variants, &s) { + variants.push(s); + } + } + if variants.len() == 1 { + variants.into_iter().next().unwrap() + } else { + json!({ "oneOf": variants }) + } + } + _ => { + if op_schemas.len() == 1 { + op_schemas.into_iter().next().unwrap() + } else { + json!({ "oneOf": op_schemas }) + } + } + }; + json_entry.insert("schema".to_string(), merged); + + let description = errors + .iter() + .map(|e| format!("{}: {}", e.code, e.description)) + .collect::>() + .join("; "); + obj.insert("description".to_string(), Value::String(description)); +} + +fn variant_already_present(variants: &[Value], candidate: &Value) -> bool { + variants.iter().any(|v| { + v.get("title").and_then(Value::as_str) == candidate.get("title").and_then(Value::as_str) + }) +} + +fn components_schemas() -> Value { + json!({ + "CallRequest": schema_call_request(), + "CallOk": schema_call_ok(), + "SearchResult": schema_search_result(), + "SchemaResult": schema_schema_result(), + "BatchResult": schema_batch_result() + }) +} + +fn collect_operation_errors(registry: &OperationRegistry) -> Vec { + let mut by_status: BTreeMap> = BTreeMap::new(); + let mut seen_codes: std::collections::BTreeSet = std::collections::BTreeSet::new(); + for spec in registry.list_operations() { + for error in &spec.error_schemas { + let status = match error.http_status { + Some(status) => status, + None => continue, + }; + if is_protocol_status(status) && !is_http_prefixed_code(&error.code) { + continue; + } + if !seen_codes.insert(error.code.clone()) { + continue; + } + by_status.entry(status).or_default().push(error.clone()); + } + } + by_status.into_values().flatten().collect() +} + +fn is_protocol_status(status: u16) -> bool { + matches!( + status, + STATUS_BAD_REQUEST + | STATUS_UNAUTHORIZED + | STATUS_FORBIDDEN + | STATUS_NOT_FOUND + | STATUS_INTERNAL + | STATUS_TIMEOUT + ) +} + +fn is_http_prefixed_code(code: &str) -> bool { + code.starts_with(HTTP_PREFIX) && code[HTTP_PREFIX.len()..].parse::().is_ok() +} + #[cfg(test)] mod tests { use super::*; @@ -387,19 +530,18 @@ mod tests { use alknet_call::registry::registration::{ make_handler, HandlerRegistration, OperationProvenance, }; - use alknet_call::registry::spec::{ - AccessControl, ErrorDefinition, OperationSpec, OperationType, - }; + use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::types::Capabilities; + use serde_json::{json, Map}; - fn echo_handler() -> alknet_call::registry::registration::Handler { - make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }) + fn noop_handler() -> alknet_call::registry::registration::Handler { + make_handler(|_input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, Value::Null) }) } - fn register_op(registry: &mut OperationRegistry, spec: OperationSpec) { + fn register(registry: &mut OperationRegistry, spec: OperationSpec) { registry.register(HandlerRegistration::new( spec, - echo_handler(), + noop_handler(), OperationProvenance::Local, None, None, @@ -407,248 +549,504 @@ mod tests { )); } - fn external_spec(name: &str) -> OperationSpec { + fn external_spec(name: &str, errors: Vec) -> OperationSpec { OperationSpec::new( name, OperationType::Query, Visibility::External, json!({}), json!({}), - vec![], - AccessControl::default(), - ) - } - - fn spec_with_errors(name: &str, errors: Vec) -> OperationSpec { - OperationSpec::new( - name, - OperationType::Mutation, - Visibility::External, - json!({}), - json!({}), errors, AccessControl::default(), ) } - fn err(code: &str, status: Option) -> ErrorDefinition { + fn error(code: &str, http_status: Option) -> ErrorDefinition { ErrorDefinition { code: code.to_string(), - description: format!("{code} error"), - schema: json!({ "type": "object", "properties": { "msg": { "type": "string" } } }), - http_status: status, + description: format!("error {code}"), + schema: json!({ "type": "object" }), + http_status, } } - fn paths(spec: &OpenAPISpec) -> Vec { - spec.paths.keys().cloned().collect() + fn paths_object(spec: &OpenAPISpec) -> &Map { + spec.raw + .get("paths") + .and_then(Value::as_object) + .expect("paths object present") + } + + fn path<'a>(spec: &'a OpenAPISpec, name: &str) -> &'a Map { + paths_object(spec) + .get(name) + .and_then(Value::as_object) + .unwrap_or_else(|| panic!("path {name} present")) + } + + fn operation<'a>(spec: &'a OpenAPISpec, name: &str, method: &str) -> &'a Map { + path(spec, name) + .get(method) + .and_then(Value::as_object) + .unwrap_or_else(|| panic!("operation {method} {name} present")) + } + + fn responses<'a>(spec: &'a OpenAPISpec, name: &str, method: &str) -> &'a Map { + operation(spec, name, method) + .get("responses") + .and_then(Value::as_object) + .expect("responses present") } #[test] - fn generated_doc_has_exactly_five_gateway_paths() { + fn empty_registry_produces_five_gateway_paths() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - let mut p = paths(&spec); - p.sort(); - assert_eq!( - p, - vec!["/batch", "/call", "/schema", "/search", "/subscribe"] - ); + let paths = paths_object(&spec); + assert_eq!(paths.len(), 5); + assert!(paths.contains_key(PATH_SEARCH)); + assert!(paths.contains_key(PATH_SCHEMA)); + assert!(paths.contains_key(PATH_CALL)); + assert!(paths.contains_key(PATH_BATCH)); + assert!(paths.contains_key(PATH_SUBSCRIBE)); } #[test] - fn generated_doc_does_not_leak_registry_operations_as_paths() { + fn registry_with_operations_does_not_add_per_operation_paths() { let mut registry = OperationRegistry::new(); - register_op(&mut registry, external_spec("fs/readFile")); - register_op(&mut registry, external_spec("agent/chat")); + register(&mut registry, external_spec("fs/readFile", vec![])); + register(&mut registry, external_spec("agent/chat", vec![])); let spec = to_openapi(®istry); - let p = paths(&spec); - assert!(!p.contains(&"/fs/readFile".to_string())); - assert!(!p.contains(&"/agent/chat".to_string())); - assert_eq!(p.len(), 5); + let paths = paths_object(&spec); + assert_eq!(paths.len(), 5); + assert!(!paths.contains_key("/fs/readFile")); + assert!(!paths.contains_key("/agent/chat")); } #[test] fn info_version_is_1_0_0() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - assert_eq!(spec.info.version, "1.0.0"); + let version = spec + .raw + .get("info") + .and_then(|i: &Value| i.get("version")) + .and_then(Value::as_str) + .unwrap(); + assert_eq!(version, GATEWAY_VERSION); + assert_eq!(version, "1.0.0"); } #[test] - fn call_request_schema_is_operation_and_input() { + fn info_title_present() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - let body = call.request_body.as_ref().expect("request body"); - let schema = body.content.get(CONTENT_JSON).expect("json content"); - let props = schema + let title = spec + .raw + .get("info") + .and_then(|i: &Value| i.get("title")) + .and_then(Value::as_str) + .unwrap(); + assert_eq!(title, GATEWAY_TITLE); + } + + #[test] + fn openapi_field_is_3_0_0() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let openapi = spec.raw.get("openapi").and_then(Value::as_str).unwrap(); + assert_eq!(openapi, OPENAPI_VERSION); + } + + #[test] + fn call_request_body_is_flat_operation_input() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let request_schema = operation(&spec, PATH_CALL, "post") + .get("requestBody") + .and_then(|rb| rb.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let props = request_schema .get("properties") .and_then(Value::as_object) - .expect("properties"); + .unwrap(); assert!(props.contains_key("operation")); - let input = props.get("input").expect("input"); - assert_eq!(input.get("type").and_then(Value::as_str), Some("object")); - let required = schema + assert!(props.contains_key("input")); + let operation_prop = props.get("operation").unwrap(); + assert_eq!( + operation_prop.get("type").and_then(Value::as_str), + Some("string") + ); + let input_prop = props.get("input").unwrap(); + assert_eq!( + input_prop.get("type").and_then(Value::as_str), + Some("object") + ); + let required = request_schema .get("required") .and_then(Value::as_array) - .expect("required"); + .unwrap(); assert!(required.iter().any(|v| v == "operation")); } #[test] - fn subscribe_response_content_type_is_text_event_stream() { + fn call_includes_all_protocol_level_error_statuses() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - let subscribe = &spec.paths["/subscribe"].operations[0].1; - let resp = &subscribe.responses["200"]; - assert!(resp.content.contains_key(CONTENT_SSE)); - assert!(!resp.content.contains_key(CONTENT_JSON)); - } - - #[test] - fn call_responses_include_all_protocol_level_error_statuses() { - let registry = OperationRegistry::new(); - let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - for status in ["400", "401", "403", "404", "500", "504"] { + let responses = responses(&spec, PATH_CALL, "post"); + for status in [ + STATUS_BAD_REQUEST, + STATUS_UNAUTHORIZED, + STATUS_FORBIDDEN, + STATUS_NOT_FOUND, + STATUS_INTERNAL, + STATUS_TIMEOUT, + ] { assert!( - call.responses.contains_key(status), - "missing protocol-level response {status}" + responses.contains_key(&status.to_string()), + "protocol status {status} present on /call" ); } + assert!(responses.contains_key("200")); } #[test] - fn call_responses_include_operation_level_errors_with_http_status() { + fn call_protocol_error_status_codes_have_protocol_codes() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let responses = responses(&spec, PATH_CALL, "post"); + + let invalid_input_schema = responses + .get(&STATUS_BAD_REQUEST.to_string()) + .and_then(|r: &Value| r.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .and_then(|s: &Value| s.get("properties")) + .and_then(|p: &Value| p.get("code")) + .and_then(|c: &Value| c.get("enum")) + .and_then(Value::as_array) + .unwrap(); + assert_eq!(invalid_input_schema[0], CODE_INVALID_INPUT); + + let forbidden_schema = responses + .get(&STATUS_FORBIDDEN.to_string()) + .and_then(|r: &Value| r.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .and_then(|s: &Value| s.get("properties")) + .and_then(|p: &Value| p.get("code")) + .and_then(|c: &Value| c.get("enum")) + .and_then(Value::as_array) + .unwrap(); + assert_eq!(forbidden_schema[0], CODE_FORBIDDEN); + + let timeout_schema = responses + .get(&STATUS_TIMEOUT.to_string()) + .and_then(|r: &Value| r.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .and_then(|s: &Value| s.get("properties")) + .and_then(|p: &Value| p.get("code")) + .and_then(|c: &Value| c.get("enum")) + .and_then(Value::as_array) + .unwrap(); + assert_eq!(timeout_schema[0], CODE_TIMEOUT); + } + + #[test] + fn operation_errors_projected_onto_call() { let mut registry = OperationRegistry::new(); - register_op( + register( &mut registry, - spec_with_errors( - "svc/op", + external_spec( + "fs/readFile", vec![ - err("RATE_LIMITED", Some(429)), - err("UNPROCESSABLE", Some(422)), + error("FILE_NOT_FOUND", Some(404)), + error("RATE_LIMITED", Some(429)), ], ), ); let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - assert!(call.responses.contains_key("429")); - assert!(call.responses.contains_key("422")); - let resp429 = &call.responses["429"]; - let schema = resp429 - .content - .get(CONTENT_JSON) - .and_then(|v| v.get("properties")) - .and_then(|v| v.get("msg")) - .expect("projected error schema"); - assert_eq!(schema.get("type").and_then(Value::as_str), Some("string")); + let responses = responses(&spec, PATH_CALL, "post"); + assert!( + responses.contains_key("429"), + "operation-level 429 projected onto /call" + ); + let response_429 = responses.get("429").unwrap(); + let schema = response_429 + .get("content") + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let title = schema.get("title").and_then(Value::as_str).unwrap(); + assert_eq!(title, "RATE_LIMITED"); + let description = response_429 + .get("description") + .and_then(Value::as_str) + .unwrap(); + assert!(description.contains("RATE_LIMITED")); } #[test] - fn call_responses_project_http_404_error_code_as_404_response() { + fn http_prefixed_error_code_projects_to_status() { let mut registry = OperationRegistry::new(); - register_op( + register( &mut registry, - spec_with_errors("svc/op", vec![err("HTTP_404", Some(404))]), + external_spec("svc/op", vec![error("HTTP_404", Some(404))]), ); let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - assert!(call.responses.contains_key("404")); + let responses = responses(&spec, PATH_CALL, "post"); + let response_404 = responses.get("404").unwrap(); + let schema = response_404 + .get("content") + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let one_of = schema.get("oneOf").and_then(Value::as_array); + let titles: Vec<&str> = match one_of { + Some(arr) => arr + .iter() + .filter_map(|v: &Value| v.get("title").and_then(Value::as_str)) + .collect(), + None => vec![schema.get("title").and_then(Value::as_str).unwrap_or("")], + }; + assert!( + titles.contains(&"HTTP_404"), + "HTTP_404 operation error must be projected on /call 404, got titles: {titles:?}" + ); } #[test] - fn call_responses_do_not_duplicate_protocol_level_status_with_operation_error() { + fn http_prefixed_code_does_not_collide_with_protocol_code() { let mut registry = OperationRegistry::new(); - register_op( + register( &mut registry, - spec_with_errors("svc/op", vec![err("HTTP_500", Some(500))]), + external_spec("svc/op", vec![error("HTTP_404", Some(404))]), ); let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - assert!(call.responses.contains_key("500")); + let responses = responses(&spec, PATH_CALL, "post"); + let response_404 = responses.get("404").unwrap(); + let schema = response_404 + .get("content") + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let one_of = schema.get("oneOf").and_then(Value::as_array); + let variants: Vec<&Value> = match one_of { + Some(arr) => arr.iter().collect(), + None => vec![schema], + }; + let http_404_variant = variants + .iter() + .find(|v| v.get("title").and_then(Value::as_str) == Some("HTTP_404")) + .expect("HTTP_404 variant present"); + let http_enum = http_404_variant + .get("properties") + .and_then(|p: &Value| p.get("code")) + .and_then(|c: &Value| c.get("enum")) + .and_then(Value::as_array); + assert!( + http_enum.is_none(), + "HTTP_404 variant is not constrained to a protocol code enum" + ); + let titles: Vec<&str> = variants + .iter() + .filter_map(|v| v.get("title").and_then(Value::as_str)) + .collect(); + assert!( + titles.contains(&"HTTP_404"), + "HTTP_404 operation error projected alongside protocol 404, got titles: {titles:?}" + ); } #[test] - fn operation_errors_without_http_status_are_not_projected() { + fn operation_error_without_http_status_not_projected() { let mut registry = OperationRegistry::new(); - register_op( + register( &mut registry, - spec_with_errors("svc/op", vec![err("FILE_NOT_FOUND", None)]), + external_spec("svc/op", vec![error("DOMAIN_ERROR", None)]), ); let spec = to_openapi(®istry); - let call = &spec.paths["/call"].operations[0].1; - assert!(!call.responses.contains_key("0")); - assert!(call.responses.contains_key("500")); + let responses = responses(&spec, PATH_CALL, "post"); + assert!(!responses.contains_key("0")); + assert_eq!( + responses.len(), + 7, + "only protocol-level statuses + 200 present" + ); } #[test] - fn to_openapi_is_a_pure_projection_and_not_an_operation_adapter() { - fn assert_not_adapter() {} - assert_not_adapter:: OpenAPISpec>(); - let mut registry = OperationRegistry::new(); - register_op(&mut registry, external_spec("svc/op")); - let before = registry.list_operations().len(); - let _ = to_openapi(®istry); - assert_eq!(registry.list_operations().len(), before); - } - - #[test] - fn batch_request_schema_is_array_of_call_request() { + fn subscribe_response_is_text_event_stream() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - let batch = &spec.paths["/batch"].operations[0].1; - let body = batch.request_body.as_ref().expect("request body"); - let schema = body.content.get(CONTENT_JSON).expect("json content"); - assert_eq!(schema.get("type").and_then(Value::as_str), Some("array")); + let responses = responses(&spec, PATH_SUBSCRIBE, "post"); + let ok = responses.get("200").unwrap(); + let content = ok.get("content").and_then(Value::as_object).unwrap(); + assert!(content.contains_key("text/event-stream")); + assert!(!content.contains_key("application/json")); } #[test] - fn subscribe_request_body_uses_call_input_schema() { + fn subscribe_request_body_is_flat_operation_input() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); - let subscribe = &spec.paths["/subscribe"].operations[0].1; - let body = subscribe.request_body.as_ref().expect("request body"); - let schema = body.content.get(CONTENT_JSON).expect("json content"); - assert!(schema + let request_schema = operation(&spec, PATH_SUBSCRIBE, "post") + .get("requestBody") + .and_then(|rb| rb.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let props = request_schema .get("properties") .and_then(Value::as_object) - .map(|m| m.contains_key("operation")) - .unwrap_or(false)); + .unwrap(); + assert!(props.contains_key("operation")); + assert!(props.contains_key("input")); } #[test] - fn search_and_schema_are_get_operations() { - let registry = OperationRegistry::new(); - let spec = to_openapi(®istry); - assert_eq!(spec.paths["/search"].operations[0].0, "get"); - assert_eq!(spec.paths["/schema"].operations[0].0, "get"); - } - - #[test] - fn call_batch_subscribe_are_post_operations() { - let registry = OperationRegistry::new(); - let spec = to_openapi(®istry); - assert_eq!(spec.paths["/call"].operations[0].0, "post"); - assert_eq!(spec.paths["/batch"].operations[0].0, "post"); - assert_eq!(spec.paths["/subscribe"].operations[0].0, "post"); - } - - #[test] - fn raw_doc_carries_openapi_3_0_and_gateway_version() { + fn batch_request_body_is_array_of_call_requests() { let registry = OperationRegistry::new(); let spec = to_openapi(®istry); + let request_schema = operation(&spec, PATH_BATCH, "post") + .get("requestBody") + .and_then(|rb| rb.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); assert_eq!( - spec.raw.get("openapi").and_then(Value::as_str), - Some("3.0.0") + request_schema.get("type").and_then(Value::as_str), + Some("array") ); + let items = request_schema + .get("items") + .and_then(Value::as_object) + .unwrap(); + assert!(items.contains_key("properties")); + } + + #[test] + fn search_has_get_method() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert!(path(&spec, PATH_SEARCH).contains_key("get")); + assert!(!path(&spec, PATH_SEARCH).contains_key("post")); + } + + #[test] + fn schema_has_get_method_with_name_query_param() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + let params = operation(&spec, PATH_SCHEMA, "get") + .get("parameters") + .and_then(Value::as_array) + .unwrap(); + let name_param = ¶ms[0]; + assert_eq!(name_param.get("name").and_then(Value::as_str), Some("name")); + assert_eq!(name_param.get("in").and_then(Value::as_str), Some("query")); assert_eq!( - spec.raw - .get("info") - .and_then(|i| i.get("version")) - .and_then(Value::as_str), - Some("1.0.0") + name_param.get("required").and_then(Value::as_bool), + Some(true) ); } + + #[test] + fn call_has_post_method() { + let registry = OperationRegistry::new(); + let spec = to_openapi(®istry); + assert!(path(&spec, PATH_CALL).contains_key("post")); + assert!(!path(&spec, PATH_CALL).contains_key("get")); + } + + #[test] + fn to_openapi_is_pure_projection_does_not_modify_registry() { + let mut registry = OperationRegistry::new(); + register(&mut registry, external_spec("fs/readFile", vec![])); + let before_count = registry.list_operations().len(); + let _ = to_openapi(®istry); + assert_eq!(registry.list_operations().len(), before_count); + assert!(registry.registration("fs/readFile").is_some()); + } + + #[test] + fn duplicate_error_status_surfaces_all_distinct_codes() { + let mut registry = OperationRegistry::new(); + register( + &mut registry, + external_spec("svc/a", vec![error("RATE_LIMITED", Some(429))]), + ); + register( + &mut registry, + external_spec("svc/b", vec![error("TOO_MANY_REQUESTS", Some(429))]), + ); + let spec = to_openapi(®istry); + let responses = responses(&spec, PATH_CALL, "post"); + assert!(responses.contains_key("429")); + let schema = responses + .get("429") + .and_then(|r: &Value| r.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let one_of = schema.get("oneOf").and_then(Value::as_array).unwrap(); + let titles: Vec<&str> = one_of + .iter() + .filter_map(|v| v.get("title").and_then(Value::as_str)) + .collect(); + assert!(titles.contains(&"RATE_LIMITED")); + assert!(titles.contains(&"TOO_MANY_REQUESTS")); + } + + #[test] + fn internal_operations_excluded_from_error_projection() { + let mut registry = OperationRegistry::new(); + registry.register(HandlerRegistration::new( + OperationSpec::new( + "internal/op", + OperationType::Query, + Visibility::Internal, + json!({}), + json!({}), + vec![error("INTERNAL_ERROR", Some(418))], + AccessControl::default(), + ), + noop_handler(), + OperationProvenance::Local, + None, + None, + Capabilities::new(), + )); + let spec = to_openapi(®istry); + let responses = responses(&spec, PATH_CALL, "post"); + assert!( + !responses.contains_key("418"), + "internal op errors not projected" + ); + } + + #[test] + fn operation_error_with_protocol_status_but_http_prefix_is_projected() { + let mut registry = OperationRegistry::new(); + register( + &mut registry, + external_spec("svc/op", vec![error("HTTP_500", Some(500))]), + ); + let spec = to_openapi(®istry); + let responses = responses(&spec, PATH_CALL, "post"); + let schema = responses + .get("500") + .and_then(|r: &Value| r.get("content")) + .and_then(|c: &Value| c.get("application/json")) + .and_then(|c: &Value| c.get("schema")) + .unwrap(); + let one_of = schema.get("oneOf").and_then(Value::as_array).unwrap(); + let titles: Vec<&str> = one_of + .iter() + .filter_map(|v| v.get("title").and_then(Value::as_str)) + .collect(); + assert!(titles.contains(&"HTTP_500")); + } } diff --git a/crates/alknet-http/src/client/http_client.rs b/crates/alknet-http/src/client/http_client.rs index c0155dd..4b27ef5 100644 --- a/crates/alknet-http/src/client/http_client.rs +++ b/crates/alknet-http/src/client/http_client.rs @@ -125,10 +125,11 @@ fn build_client(config: &HttpClientConfig) -> Result Result usize { - self.deadlines.lock().expect("deadlines mutex poisoned").len() + self.deadlines + .lock() + .expect("deadlines mutex poisoned") + .len() } #[cfg(test)] @@ -156,8 +159,8 @@ mod tests { #[test] fn parse_retry_after_http_date() { - let deadline = parse_retry_after("Wed, 21 Oct 2099 07:28:00 GMT") - .expect("HTTP-date value parses"); + let deadline = + parse_retry_after("Wed, 21 Oct 2099 07:28:00 GMT").expect("HTTP-date value parses"); assert!(deadline > SystemTime::now()); } @@ -272,7 +275,10 @@ mod tests { async fn middleware_sleeps_before_request_with_active_deadline() { let mw = std::sync::Arc::new(RetryAfterMiddleware::with_capacity(8)); let target = url("https://api.example.com/v1/chat"); - mw.record_test(target.clone(), SystemTime::now() + Duration::from_millis(50)); + mw.record_test( + target.clone(), + SystemTime::now() + Duration::from_millis(50), + ); let started = SystemTime::now(); mw.maybe_sleep_for(&target).await; let elapsed = SystemTime::now().duration_since(started).unwrap(); @@ -281,4 +287,4 @@ mod tests { "middleware must sleep until the deadline elapses" ); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/gateway/dispatch.rs b/crates/alknet-http/src/gateway/dispatch.rs index 1a9446f..96c638f 100644 --- a/crates/alknet-http/src/gateway/dispatch.rs +++ b/crates/alknet-http/src/gateway/dispatch.rs @@ -83,11 +83,7 @@ impl GatewayDispatch { r.capabilities.clone(), r.scoped_env.clone().unwrap_or_else(ScopedPeerEnv::empty), ), - None => ( - None, - Capabilities::new(), - ScopedPeerEnv::empty(), - ), + None => (None, Capabilities::new(), ScopedPeerEnv::empty()), }; let env: Arc = @@ -254,10 +250,7 @@ mod tests { .invoke(None, "echo/run", serde_json::json!({ "msg": "hi" })) .await; assert!(response.result.is_ok()); - assert_eq!( - response.result.unwrap(), - serde_json::json!({ "msg": "hi" }) - ); + assert_eq!(response.result.unwrap(), serde_json::json!({ "msg": "hi" })); } #[tokio::test] @@ -270,9 +263,7 @@ mod tests { let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); - let response = dp - .invoke(None, "/echo/run", serde_json::json!({})) - .await; + let response = dp.invoke(None, "/echo/run", serde_json::json!({})).await; assert!(response.result.is_ok()); } @@ -369,9 +360,7 @@ mod tests { let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); - let response = dp - .invoke(None, "no/such", serde_json::json!({})) - .await; + let response = dp.invoke(None, "no/such", serde_json::json!({})).await; match response.result { Err(e) => { assert_eq!(e.code, "NOT_FOUND"); @@ -398,9 +387,7 @@ mod tests { let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); - let response = dp - .invoke(None, "secret/op", serde_json::json!({})) - .await; + let response = dp.invoke(None, "secret/op", serde_json::json!({})).await; match response.result { Err(e) => { assert_eq!(e.code, "NOT_FOUND"); @@ -423,9 +410,7 @@ mod tests { let provider: Arc = Arc::new(StaticIdentityProvider::new()); let dp = dispatch(registry, provider); - let response = dp - .invoke(None, "admin/run", serde_json::json!({})) - .await; + let response = dp.invoke(None, "admin/run", serde_json::json!({})).await; match response.result { Err(e) => { assert_eq!(e.code, "FORBIDDEN"); @@ -506,8 +491,10 @@ mod tests { #[test] fn build_root_context_carries_registration_bundle_fields() { - let authority = - alknet_call::registry::context::CompositionAuthority::new("agent", ["fs:read".to_string()]); + let authority = alknet_call::registry::context::CompositionAuthority::new( + "agent", + ["fs:read".to_string()], + ); let scoped = ScopedPeerEnv::new(["fs/readFile"]); let caps = Capabilities::new().with_api_key("google", "k".to_string()); @@ -545,4 +532,4 @@ mod tests { fn assert_concrete() {} assert_concrete::(); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/gateway/error.rs b/crates/alknet-http/src/gateway/error.rs index b07674e..d848109 100644 --- a/crates/alknet-http/src/gateway/error.rs +++ b/crates/alknet-http/src/gateway/error.rs @@ -31,7 +31,10 @@ pub fn call_error_to_http_status(error: &CallError) -> u16 { call_error_to_http_status_with_identity(error, None) } -pub fn call_error_to_http_status_with_identity(error: &CallError, identity: Option<&Identity>) -> u16 { +pub fn call_error_to_http_status_with_identity( + error: &CallError, + identity: Option<&Identity>, +) -> u16 { match error.code.as_str() { PROTOCOL_CODE_NOT_FOUND => STATUS_NOT_FOUND, PROTOCOL_CODE_FORBIDDEN => { @@ -59,8 +62,8 @@ pub fn call_error_to_http_response(error: &CallError) -> Response { let retry_after = retry_after_value(error, status_code); if let Some(retry_after) = retry_after { - let header_value = HeaderValue::from_str(&retry_after) - .unwrap_or_else(|_| HeaderValue::from_static("0")); + let header_value = + HeaderValue::from_str(&retry_after).unwrap_or_else(|_| HeaderValue::from_static("0")); (status, [(header::RETRY_AFTER, header_value)], Json(body)).into_response() } else { (status, Json(body)).into_response() @@ -139,7 +142,10 @@ mod tests { fn forbidden_with_some_identity_maps_to_403() { let error = CallError::forbidden("insufficient scopes"); let id = identity(); - assert_eq!(call_error_to_http_status_with_identity(&error, Some(&id)), 403); + assert_eq!( + call_error_to_http_status_with_identity(&error, Some(&id)), + 403 + ); } #[test] @@ -213,7 +219,10 @@ mod tests { let error = CallError::new("HTTP_503", "slow down", true); let response = call_error_to_http_response(&error); assert_eq!(response.status(), StatusCode::from_u16(503).unwrap()); - assert!(response.headers().get(axum::http::header::RETRY_AFTER).is_none()); + assert!(response + .headers() + .get(axum::http::header::RETRY_AFTER) + .is_none()); } #[test] @@ -221,7 +230,10 @@ mod tests { let error = CallError::new("HTTP_503", "down", false) .with_details(serde_json::json!({ "retry_after": "5" })); let response = call_error_to_http_response(&error); - assert!(response.headers().get(axum::http::header::RETRY_AFTER).is_none()); + assert!(response + .headers() + .get(axum::http::header::RETRY_AFTER) + .is_none()); } #[test] @@ -241,7 +253,10 @@ mod tests { let error = CallError::timeout("timed out"); let response = call_error_to_http_response(&error); assert_eq!(response.status(), StatusCode::from_u16(504).unwrap()); - assert!(response.headers().get(axum::http::header::RETRY_AFTER).is_none()); + assert!(response + .headers() + .get(axum::http::header::RETRY_AFTER) + .is_none()); } #[test] @@ -266,4 +281,4 @@ mod tests { ); assert_eq!(call_error_to_http_status_with_identity(&error, None), 404); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/server/adapter.rs b/crates/alknet-http/src/server/adapter.rs index 52bfb69..3ac501a 100644 --- a/crates/alknet-http/src/server/adapter.rs +++ b/crates/alknet-http/src/server/adapter.rs @@ -694,4 +694,22 @@ mod tests { ); assert!(response.contains("location: https://example.com")); } + + #[tokio::test] + async fn openapi_json_route_serves_gateway_spec() { + let adapter = HttpAdapter::new(provider(), empty_registry()); + let request = b"GET /openapi.json HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"; + let response = serve_and_read(adapter, request).await; + assert!( + response.starts_with("HTTP/1.1 200"), + "expected 200 for /openapi.json, got: {response}" + ); + assert!(response.contains("\"openapi\"")); + assert!(response.contains("\"/search\"")); + assert!(response.contains("\"/schema\"")); + assert!(response.contains("\"/call\"")); + assert!(response.contains("\"/batch\"")); + assert!(response.contains("\"/subscribe\"")); + assert!(response.contains("\"1.0.0\"")); + } } diff --git a/crates/alknet-http/src/server/auth.rs b/crates/alknet-http/src/server/auth.rs index 137ead5..e212e40 100644 --- a/crates/alknet-http/src/server/auth.rs +++ b/crates/alknet-http/src/server/auth.rs @@ -80,11 +80,12 @@ where { type Rejection = Infallible; - async fn from_request_parts( - parts: &mut Parts, - _state: &S, - ) -> Result { - let identity = parts.extensions.get::>().cloned().flatten(); + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + let identity = parts + .extensions + .get::>() + .cloned() + .flatten(); Ok(ResolvedIdentity(identity)) } } @@ -174,15 +175,16 @@ mod tests { assert!(identity.is_none()); } - async fn run_middleware( - idp: Arc, - request: Request, - ) -> Response { + async fn run_middleware(idp: Arc, request: Request) -> Response { let app: Router<()> = Router::new() .route( "/", get(|req: Request| async move { - let identity = req.extensions().get::>().cloned().flatten(); + let identity = req + .extensions() + .get::>() + .cloned() + .flatten(); if let Some(id) = identity { (StatusCode::OK, id.id) } else { @@ -261,14 +263,12 @@ mod tests { let app: Router<()> = Router::new() .route( "/", - get( - |ResolvedIdentity(identity): ResolvedIdentity| async move { - match identity { - Some(id) => (StatusCode::OK, id.id), - None => (StatusCode::OK, "none".to_string()), - } - }, - ), + get(|ResolvedIdentity(identity): ResolvedIdentity| async move { + match identity { + Some(id) => (StatusCode::OK, id.id), + None => (StatusCode::OK, "none".to_string()), + } + }), ) .layer(from_fn_with_state(idp, bearer_auth_middleware)); @@ -287,14 +287,12 @@ mod tests { let app: Router<()> = Router::new() .route( "/", - get( - |ResolvedIdentity(identity): ResolvedIdentity| async move { - match identity { - Some(id) => (StatusCode::OK, id.id), - None => (StatusCode::OK, "none".to_string()), - } - }, - ), + get(|ResolvedIdentity(identity): ResolvedIdentity| async move { + match identity { + Some(id) => (StatusCode::OK, id.id), + None => (StatusCode::OK, "none".to_string()), + } + }), ) .layer(from_fn_with_state(idp, bearer_auth_middleware)); @@ -306,4 +304,4 @@ mod tests { .unwrap(); assert_eq!(&bytes[..], b"none"); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/server/decoy.rs b/crates/alknet-http/src/server/decoy.rs index 58cb514..96c76c0 100644 --- a/crates/alknet-http/src/server/decoy.rs +++ b/crates/alknet-http/src/server/decoy.rs @@ -33,10 +33,8 @@ pub fn fake_nginx_404() -> Response { header::CONTENT_TYPE, HeaderValue::from_static("text/html; charset=utf-8"), ); - resp.headers_mut().insert( - header::SERVER, - HeaderValue::from_static("nginx"), - ); + resp.headers_mut() + .insert(header::SERVER, HeaderValue::from_static("nginx")); resp } @@ -61,10 +59,8 @@ pub async fn serve_static(root: &Path, request: Request) -> Response { let content_type = mime_for_path(&resolved); let mut resp = Response::new(Body::from(bytes)); *resp.status_mut() = StatusCode::OK; - resp.headers_mut().insert( - header::CONTENT_TYPE, - HeaderValue::from_static(content_type), - ); + resp.headers_mut() + .insert(header::CONTENT_TYPE, HeaderValue::from_static(content_type)); resp } Err(_) => fake_nginx_404(), @@ -173,10 +169,7 @@ mod tests { async fn send(router: axum::Router, uri: &str) -> axum::response::Response { tower::ServiceExt::>::oneshot( router, - Request::builder() - .uri(uri) - .body(Body::empty()) - .unwrap(), + Request::builder().uri(uri).body(Body::empty()).unwrap(), ) .await .unwrap() @@ -220,9 +213,7 @@ mod tests { async fn unknown_path_with_static_site_decoy_serves_file() { let dir = tempfile_dir(); let file = dir.join("index.html"); - tokio::fs::write(&file, "

hello

") - .await - .unwrap(); + tokio::fs::write(&file, "

hello

").await.unwrap(); let decoy = DecoyConfig::StaticSite { root: dir.clone() }; let resp = send(decoy_router(decoy), "/").await; @@ -293,11 +284,9 @@ mod tests { } fn tempfile_dir() -> PathBuf { - let dir = PathBuf::from("/tmp").join(format!( - "alknet-http-decoy-test-{}", - uuid::Uuid::new_v4() - )); + let dir = + PathBuf::from("/tmp").join(format!("alknet-http-decoy-test-{}", uuid::Uuid::new_v4())); std::fs::create_dir_all(&dir).unwrap(); dir } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/server/gateway_routes.rs b/crates/alknet-http/src/server/gateway_routes.rs index 595f7c7..4b7b8bc 100644 --- a/crates/alknet-http/src/server/gateway_routes.rs +++ b/crates/alknet-http/src/server/gateway_routes.rs @@ -52,13 +52,19 @@ impl GatewayState { } fn dispatch(&self) -> GatewayDispatch { - GatewayDispatch::new(Arc::clone(&self.registry), Arc::clone(&self.identity_provider)) + GatewayDispatch::new( + Arc::clone(&self.registry), + Arc::clone(&self.identity_provider), + ) } } impl FromRef for GatewayState { fn from_ref(state: &RouterState) -> Self { - GatewayState::new(Arc::clone(&state.registry), Arc::clone(&state.identity_provider)) + GatewayState::new( + Arc::clone(&state.registry), + Arc::clone(&state.identity_provider), + ) } } @@ -92,7 +98,9 @@ pub(crate) async fn call_handler( return not_found_response(&request.operation); } let dispatch = state.dispatch(); - let envelope = dispatch.invoke(identity.clone(), &request.operation, request.input).await; + let envelope = dispatch + .invoke(identity.clone(), &request.operation, request.input) + .await; envelope_to_response(envelope, identity.as_ref()) } @@ -101,7 +109,9 @@ pub(crate) async fn search_handler( ResolvedIdentity(identity): ResolvedIdentity, ) -> Response { let dispatch = state.dispatch(); - let envelope = dispatch.invoke(identity.clone(), SERVICES_LIST, json!({})).await; + let envelope = dispatch + .invoke(identity.clone(), SERVICES_LIST, json!({})) + .await; envelope_to_response(envelope, identity.as_ref()) } @@ -115,7 +125,11 @@ pub(crate) async fn schema_handler( } let dispatch = state.dispatch(); let envelope = dispatch - .invoke(identity.clone(), SERVICES_SCHEMA, json!({ "name": query.name })) + .invoke( + identity.clone(), + SERVICES_SCHEMA, + json!({ "name": query.name }), + ) .await; envelope_to_response(envelope, identity.as_ref()) } @@ -149,7 +163,9 @@ pub(crate) async fn subscribe_handler( subscribe_stream_internal_error(request.operation) } else { let dispatch = state.dispatch(); - let envelope = dispatch.invoke(identity, &request.operation, request.input).await; + let envelope = dispatch + .invoke(identity, &request.operation, request.input) + .await; subscribe_stream_from_envelope(envelope) }; Sse::new(stream) @@ -221,8 +237,7 @@ fn not_found_response(operation: &str) -> Response { fn forbidden_response(message: String, identity: Option<&Identity>) -> Response { let error = CallError::forbidden(message); let status_code = call_error_to_http_status_with_identity(&error, identity); - let status = - StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + let status = StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); let body = serde_json::to_value(&error).unwrap_or(Value::Null); (status, Json(body)).into_response() } @@ -248,7 +263,9 @@ fn is_internal_op(registry: &OperationRegistry, operation: &str) -> bool { } } -fn envelope_to_sse_stream(envelope: ResponseEnvelope) -> impl Stream> { +fn envelope_to_sse_stream( + envelope: ResponseEnvelope, +) -> impl Stream> { stream::once(async move { match envelope.result { Ok(output) => { @@ -756,7 +773,10 @@ mod tests { .get(axum::http::header::CONTENT_TYPE) .map(|v| v.to_str().unwrap().to_string()); assert!( - ctype.as_deref().unwrap_or("").starts_with("text/event-stream"), + ctype + .as_deref() + .unwrap_or("") + .starts_with("text/event-stream"), "expected text/event-stream, got {ctype:?}" ); let bytes = resp.into_body().collect().await.unwrap().to_bytes(); @@ -950,4 +970,4 @@ mod tests { assert_eq!(status, StatusCode::NOT_FOUND); assert_eq!(body.get("code"), Some(&json!("NOT_FOUND"))); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/server/healthz.rs b/crates/alknet-http/src/server/healthz.rs index 7a23b20..8d101c7 100644 --- a/crates/alknet-http/src/server/healthz.rs +++ b/crates/alknet-http/src/server/healthz.rs @@ -59,4 +59,4 @@ mod tests { let resp = call_healthz(req).await; assert_eq!(resp.status(), StatusCode::OK); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/src/websocket/mod.rs b/crates/alknet-http/src/websocket/mod.rs index 8712f97..e594e15 100644 --- a/crates/alknet-http/src/websocket/mod.rs +++ b/crates/alknet-http/src/websocket/mod.rs @@ -128,7 +128,10 @@ mod tests { let out: EventEnvelope = response.into(); assert_eq!(out.r#type, EVENT_RESPONDED); assert_eq!(out.id, "ws-rt-1"); - assert_eq!(out.payload.get("output"), Some(&serde_json::json!({ "v": 7 }))); + assert_eq!( + out.payload.get("output"), + Some(&serde_json::json!({ "v": 7 })) + ); } #[tokio::test] @@ -160,7 +163,10 @@ mod tests { async fn ws_overlay_only_connection_holds_overlay_and_pending() { let conn = CallConnection::new_overlay_only(identity("ws-peer")); assert!(conn.connection().is_none()); - assert_eq!(conn.identity().map(|i| i.id.clone()), Some("ws-peer".to_string())); + assert_eq!( + conn.identity().map(|i| i.id.clone()), + Some("ws-peer".to_string()) + ); assert!(conn.pending().lock().is_empty()); let env = conn.overlay_env(); diff --git a/crates/alknet-http/src/websocket/upgrade.rs b/crates/alknet-http/src/websocket/upgrade.rs index 4d89dff..cef07bf 100644 --- a/crates/alknet-http/src/websocket/upgrade.rs +++ b/crates/alknet-http/src/websocket/upgrade.rs @@ -84,8 +84,9 @@ async fn ws_upgrade_handler_inner( }; match ws_upgrade { - Some(upgrade) => upgrade - .on_upgrade(move |socket| run_ws_session(socket, registry, identity_provider, identity)), + Some(upgrade) => upgrade.on_upgrade(move |socket| { + run_ws_session(socket, registry, identity_provider, identity) + }), None => { let _ = registry; let _ = identity_provider; @@ -240,19 +241,19 @@ fn serialize_envelope(envelope: &EventEnvelope) -> Result, serde_json::E #[cfg(test)] mod tests { use super::*; + use alknet_call::registry::context::{ + AbortPolicy, CompositionAuthority, OperationContext, ScopedPeerEnv, + }; use alknet_call::registry::discovery::{ services_list_handler, services_list_spec, services_schema_handler, services_schema_spec, }; + use alknet_call::registry::env::OperationEnv; use alknet_call::registry::registration::{ make_handler, HandlerRegistration, OperationProvenance, }; use alknet_call::registry::spec::{AccessControl, OperationSpec, OperationType, Visibility}; use alknet_core::auth::{AuthToken, Identity}; use alknet_core::types::Capabilities; - use alknet_call::registry::context::{ - AbortPolicy, CompositionAuthority, OperationContext, ScopedPeerEnv, - }; - use alknet_call::registry::env::OperationEnv; use std::collections::HashMap; use std::sync::Mutex as StdMutex; use std::time::{Duration, Instant}; @@ -331,9 +332,7 @@ mod tests { let mut registry = OperationRegistry::new(); registry.register(HandlerRegistration::new( external_spec("echo/run", AccessControl::default()), - make_handler(|input, ctx| async move { - ResponseEnvelope::ok(ctx.request_id, input) - }), + make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), OperationProvenance::Local, None, None, @@ -352,9 +351,7 @@ mod tests { ..Default::default() }, ), - make_handler(|input, ctx| async move { - ResponseEnvelope::ok(ctx.request_id, input) - }), + make_handler(|input, ctx| async move { ResponseEnvelope::ok(ctx.request_id, input) }), OperationProvenance::Local, None, None, @@ -519,9 +516,8 @@ mod tests { #[tokio::test] async fn handle_inbound_envelope_forbidden_yields_call_error() { let registry = registry_with_restricted_op(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("none", identity("unpriv")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("none", identity("unpriv"))); let dp = dispatcher(registry, provider); let conn = Arc::new(CallConnection::new_overlay_only(identity("unpriv"))); @@ -727,9 +723,8 @@ mod tests { #[tokio::test] async fn round_trip_call_requested_to_call_responded_over_ws_message_stream() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -753,9 +748,8 @@ mod tests { #[tokio::test] async fn subscription_streams_multiple_call_responded_events() { let registry = registry_with_subscription(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(registry, provider); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -782,8 +776,10 @@ mod tests { .with_token("no-admin", identity_with_scopes("user", &["user"])), ); let dp = dispatcher(registry, provider); - let conn = - Arc::new(CallConnection::new_overlay_only(identity_with_scopes("user", &["user"]))); + let conn = Arc::new(CallConnection::new_overlay_only(identity_with_scopes( + "user", + &["user"], + ))); let request = EventEnvelope::requested( "req-admin", @@ -882,8 +878,10 @@ mod tests { let overlay_env = conn.overlay_env(); assert!(overlay_env.contains("ui/dragged")); - let composed_env: Arc = dp - .compose_root_env(&conn, &root_context_for_compose("hub-call-1", overlay_env.clone())); + let composed_env: Arc = dp.compose_root_env( + &conn, + &root_context_for_compose("hub-call-1", overlay_env.clone()), + ); let ctx = root_context_with_env("hub-call-1", composed_env); let response = overlay_env .invoke("ui", "dragged", serde_json::json!({ "x": 5 }), &ctx) @@ -935,9 +933,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_round_trips_binary_call_requested_to_call_responded() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -960,7 +957,10 @@ mod tests { let env: EventEnvelope = serde_json::from_slice(&bytes).unwrap(); assert_eq!(env.r#type, EVENT_RESPONDED); assert_eq!(env.id, "ws-socket-1"); - assert_eq!(env.payload.get("output"), Some(&serde_json::json!({ "v": 7 }))); + assert_eq!( + env.payload.get("output"), + Some(&serde_json::json!({ "v": 7 })) + ); } other => panic!("expected binary, got {other:?}"), } @@ -972,9 +972,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_rejects_text_with_protocol_close() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -999,9 +998,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_disconnect_aborts_in_flight_pending() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -1036,9 +1034,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_subscription_streams_call_responded_events() { let registry = registry_with_subscription(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -1077,9 +1074,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_invalid_binary_closes_with_protocol_error() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -1102,9 +1098,8 @@ mod tests { #[tokio::test] async fn drive_ws_session_client_close_terminates_server() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let dp = dispatcher(Arc::clone(®istry), Arc::clone(&provider)); let conn = Arc::new(CallConnection::new_overlay_only(identity("ws-peer"))); @@ -1164,17 +1159,11 @@ mod tests { } async fn send_text(&mut self, text: String) { - self.outbound_tx - .send(Message::Text(text.into())) - .await - .ok(); + self.outbound_tx.send(Message::Text(text.into())).await.ok(); } async fn send_close(&mut self) { - self.outbound_tx - .send(Message::Close(None)) - .await - .ok(); + self.outbound_tx.send(Message::Close(None)).await.ok(); } async fn close(&mut self) { @@ -1215,9 +1204,8 @@ mod tests { #[tokio::test] async fn ws_upgrade_handler_returns_401_when_identity_is_none() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let identity: Option = None; let response = ws_upgrade_handler_inner(registry, provider, identity, None).await; @@ -1227,12 +1215,11 @@ mod tests { #[tokio::test] async fn ws_upgrade_handler_does_not_reject_when_identity_present() { let registry = echo_registry(); - let provider: Arc = Arc::new( - StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer")), - ); + let provider: Arc = + Arc::new(StaticIdentityProvider::new().with_token("ws-token", identity("ws-peer"))); let identity = identity("ws-peer"); let response = ws_upgrade_handler_inner(registry, provider, Some(identity), None).await; assert_ne!(response.status(), StatusCode::UNAUTHORIZED); } -} \ No newline at end of file +} diff --git a/crates/alknet-http/tests/from_mcp_integration.rs b/crates/alknet-http/tests/from_mcp_integration.rs index 9d49867..afd69f6 100644 --- a/crates/alknet-http/tests/from_mcp_integration.rs +++ b/crates/alknet-http/tests/from_mcp_integration.rs @@ -9,11 +9,11 @@ use std::collections::HashMap; use std::sync::Arc; use std::time::{Duration, Instant}; +use alknet_call::client::OperationAdapter; use alknet_call::protocol::wire::ResponseEnvelope; use alknet_call::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv}; use alknet_call::registry::env::OperationEnv; use alknet_call::registry::registration::OperationProvenance; -use alknet_call::client::OperationAdapter; use alknet_core::types::Capabilities; use alknet_http::adapters::FromMCP; use axum::Router; @@ -22,8 +22,8 @@ use rmcp::model::{ }; use rmcp::service::RequestContext; use rmcp::transport::{ - StreamableHttpServerConfig, streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService}, + StreamableHttpServerConfig, }; use rmcp::{RoleServer, ServerHandler}; use serde_json::Value; @@ -72,18 +72,19 @@ impl ServerHandler for EchoServer { &self, _request: Option, _context: RequestContext, - ) -> impl std::future::Future< - Output = Result, - > + rmcp::service::MaybeSendFuture + '_ { + ) -> impl std::future::Future> + + rmcp::service::MaybeSendFuture + + '_ { let tools = vec![ Tool::new_with_raw( "echo", Some("Echo the input back as structured content".into()), Arc::new(serde_json::Map::new()), ) - .with_raw_output_schema(Arc::new(serde_json::Map::from_iter([ - ("type".to_string(), Value::String("object".into())), - ]))), + .with_raw_output_schema(Arc::new(serde_json::Map::from_iter([( + "type".to_string(), + Value::String("object".into()), + )]))), Tool::new_with_raw( "legacy", Some("Legacy tool returning text content blocks".into()), @@ -101,22 +102,17 @@ impl ServerHandler for EchoServer { &self, request: CallToolRequestParams, _context: RequestContext, - ) -> impl std::future::Future< - Output = Result, - > + rmcp::service::MaybeSendFuture + '_ { + ) -> impl std::future::Future> + + rmcp::service::MaybeSendFuture + + '_ { let name = request.name.to_string(); std::future::ready(Ok(match name.as_str() { "echo" => { - let args = request - .arguments - .map(Value::Object) - .unwrap_or(Value::Null); + let args = request.arguments.map(Value::Object).unwrap_or(Value::Null); CallToolResult::structured(serde_json::json!({ "echoed": args })) } "legacy" => CallToolResult::success(vec![Content::text("plain text result")]), - other => CallToolResult::error(vec![Content::text(format!( - "unknown tool: {other}" - ))]), + other => CallToolResult::error(vec![Content::text(format!("unknown tool: {other}"))]), })) } @@ -234,4 +230,4 @@ async fn import_unreachable_server_returns_discovery_failed() { Err(alknet_call::client::AdapterError::Transport { .. }) => {} Err(other) => panic!("expected DiscoveryFailed or Transport, got {other}"), } -} \ No newline at end of file +}