FromMCP (OperationAdapter, feature-gated behind mcp) discovers remote MCP tools over streamable HTTP via rmcp's StreamableHttpClientTransport, calls tools/list, and registers each as a HandlerRegistration bundle with a forwarding handler that calls the remote tool via tools/call. Output handling follows the structuredContent-preferred-over-content-blocks rule: declared outputSchema + structuredContent is the composable result; absent outputSchema falls back to the MCP ContentBlock union. isError:true maps to a CallError with the error content. No-env-vars invariant: the handler reads context.capabilities (injected at registration), never std::env::var (ADR-014). Streamable HTTP only — stdio is not built (ADR-037). Provenance is FromMCP (leaf: composition_authority None, scoped_env None, Internal by default, ADR-015/022). Includes unit tests for schema/mapping logic and an integration test that spins up a real rmcp streamable HTTP server and exercises the forwarding handler end-to-end.
237 lines
8.2 KiB
Rust
237 lines
8.2 KiB
Rust
//! Integration test for `FromMCP`: spins up a real rmcp streamable HTTP MCP
//! server, imports its tools via `FromMCP::import()`, and invokes a
//! forwarding handler end-to-end. Verifies the handler calls the remote MCP
//! tool via rmcp and reads `context.capabilities` (not `std::env::var`).
|
|
|
|
#![cfg(feature = "mcp")]
|
|
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use alknet_call::protocol::wire::ResponseEnvelope;
|
|
use alknet_call::registry::context::{AbortPolicy, OperationContext, ScopedPeerEnv};
|
|
use alknet_call::registry::env::OperationEnv;
|
|
use alknet_call::registry::registration::OperationProvenance;
|
|
use alknet_call::client::OperationAdapter;
|
|
use alknet_core::types::Capabilities;
|
|
use alknet_http::adapters::FromMCP;
|
|
use axum::Router;
|
|
use rmcp::model::{
|
|
CallToolRequestParams, CallToolResult, Content, ListToolsResult, PaginatedRequestParams, Tool,
|
|
};
|
|
use rmcp::service::RequestContext;
|
|
use rmcp::transport::{
|
|
StreamableHttpServerConfig,
|
|
streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService},
|
|
};
|
|
use rmcp::{RoleServer, ServerHandler};
|
|
use serde_json::Value;
|
|
|
|
struct NoopEnv;
|
|
|
|
#[async_trait::async_trait]
|
|
impl OperationEnv for NoopEnv {
|
|
async fn invoke_with_policy(
|
|
&self,
|
|
_ns: &str,
|
|
_op: &str,
|
|
_input: Value,
|
|
parent: &OperationContext,
|
|
_policy: AbortPolicy,
|
|
) -> ResponseEnvelope {
|
|
ResponseEnvelope::ok(parent.request_id.clone(), Value::Null)
|
|
}
|
|
|
|
fn contains(&self, _name: &str) -> bool {
|
|
false
|
|
}
|
|
}
|
|
|
|
fn test_context(request_id: &str, caps: Capabilities) -> OperationContext {
|
|
OperationContext {
|
|
request_id: request_id.to_string(),
|
|
parent_request_id: None,
|
|
identity: None,
|
|
handler_identity: None,
|
|
forwarded_for: None,
|
|
capabilities: caps,
|
|
metadata: HashMap::new(),
|
|
scoped_env: ScopedPeerEnv::empty(),
|
|
env: Arc::new(NoopEnv),
|
|
abort_policy: AbortPolicy::default(),
|
|
deadline: Some(Instant::now() + Duration::from_secs(30)),
|
|
internal: true,
|
|
}
|
|
}
|
|
|
|
struct EchoServer;
|
|
|
|
impl ServerHandler for EchoServer {
|
|
fn list_tools(
|
|
&self,
|
|
_request: Option<PaginatedRequestParams>,
|
|
_context: RequestContext<RoleServer>,
|
|
) -> impl std::future::Future<
|
|
Output = Result<ListToolsResult, rmcp::ErrorData>,
|
|
> + rmcp::service::MaybeSendFuture + '_ {
|
|
let tools = vec![
|
|
Tool::new_with_raw(
|
|
"echo",
|
|
Some("Echo the input back as structured content".into()),
|
|
Arc::new(serde_json::Map::new()),
|
|
)
|
|
.with_raw_output_schema(Arc::new(serde_json::Map::from_iter([
|
|
("type".to_string(), Value::String("object".into())),
|
|
]))),
|
|
Tool::new_with_raw(
|
|
"legacy",
|
|
Some("Legacy tool returning text content blocks".into()),
|
|
Arc::new(serde_json::Map::new()),
|
|
),
|
|
];
|
|
std::future::ready(Ok(ListToolsResult {
|
|
meta: None,
|
|
next_cursor: None,
|
|
tools,
|
|
}))
|
|
}
|
|
|
|
fn call_tool(
|
|
&self,
|
|
request: CallToolRequestParams,
|
|
_context: RequestContext<RoleServer>,
|
|
) -> impl std::future::Future<
|
|
Output = Result<CallToolResult, rmcp::ErrorData>,
|
|
> + rmcp::service::MaybeSendFuture + '_ {
|
|
let name = request.name.to_string();
|
|
std::future::ready(Ok(match name.as_str() {
|
|
"echo" => {
|
|
let args = request
|
|
.arguments
|
|
.map(Value::Object)
|
|
.unwrap_or(Value::Null);
|
|
CallToolResult::structured(serde_json::json!({ "echoed": args }))
|
|
}
|
|
"legacy" => CallToolResult::success(vec![Content::text("plain text result")]),
|
|
other => CallToolResult::error(vec![Content::text(format!(
|
|
"unknown tool: {other}"
|
|
))]),
|
|
}))
|
|
}
|
|
|
|
fn get_info(&self) -> rmcp::model::ServerInfo {
|
|
rmcp::model::ServerInfo::default()
|
|
}
|
|
}
|
|
|
|
async fn spawn_server() -> (String, tokio::task::JoinHandle<()>) {
|
|
let mcp_service: StreamableHttpService<EchoServer, LocalSessionManager> =
|
|
StreamableHttpService::new(
|
|
|| Ok(EchoServer),
|
|
LocalSessionManager::default().into(),
|
|
StreamableHttpServerConfig::default(),
|
|
);
|
|
let app = Router::new().nest_service("/mcp", mcp_service);
|
|
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
|
let addr = listener.local_addr().unwrap();
|
|
let handle = tokio::spawn(async move {
|
|
axum::serve(listener, app).await.unwrap();
|
|
});
|
|
(format!("http://{addr}/mcp"), handle)
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn import_discovers_tools_and_builds_registrations() {
|
|
let (endpoint, _handle) = spawn_server().await;
|
|
let adapter = FromMCP::new(endpoint, "echo");
|
|
let bundles = adapter
|
|
.import()
|
|
.await
|
|
.expect("import succeeds against running server");
|
|
assert_eq!(bundles.len(), 2);
|
|
let names: Vec<&str> = bundles.iter().map(|b| b.spec.name.as_str()).collect();
|
|
assert!(names.contains(&"echo/echo"));
|
|
assert!(names.contains(&"echo/legacy"));
|
|
for b in &bundles {
|
|
assert_eq!(b.provenance, OperationProvenance::FromMCP);
|
|
assert!(b.composition_authority.is_none());
|
|
assert!(b.scoped_env.is_none());
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn forwarding_handler_calls_echo_and_returns_structured_content() {
|
|
let (endpoint, _handle) = spawn_server().await;
|
|
let adapter = FromMCP::new(endpoint, "echo");
|
|
let bundles = adapter.import().await.expect("import succeeds");
|
|
let echo = bundles
|
|
.into_iter()
|
|
.find(|b| b.spec.name == "echo/echo")
|
|
.expect("echo tool present");
|
|
|
|
let caps = Capabilities::new().with_http_token("mcp", "unused-on-server".to_string());
|
|
let ctx = test_context("req-echo", caps);
|
|
let input = serde_json::json!({ "msg": "hello" });
|
|
let response = (echo.handler)(input, ctx).await;
|
|
|
|
assert_eq!(response.request_id, "req-echo");
|
|
match response.result {
|
|
Ok(v) => {
|
|
let obj = v.as_object().expect("structured object");
|
|
assert!(obj.contains_key("echoed"));
|
|
}
|
|
Err(e) => panic!("expected Ok, got Err: {e:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn forwarding_handler_calls_legacy_and_returns_content_blocks() {
|
|
let (endpoint, _handle) = spawn_server().await;
|
|
let adapter = FromMCP::new(endpoint, "echo");
|
|
let bundles = adapter.import().await.expect("import succeeds");
|
|
let legacy = bundles
|
|
.into_iter()
|
|
.find(|b| b.spec.name == "echo/legacy")
|
|
.expect("legacy tool present");
|
|
|
|
let ctx = test_context("req-legacy", Capabilities::new());
|
|
let response = (legacy.handler)(serde_json::json!({}), ctx).await;
|
|
|
|
match response.result {
|
|
Ok(Value::Array(blocks)) => {
|
|
assert_eq!(blocks.len(), 1);
|
|
assert_eq!(blocks[0]["type"], "text");
|
|
assert_eq!(blocks[0]["text"], "plain text result");
|
|
}
|
|
other => panic!("expected array of content blocks, got {other:?}"),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn forwarding_handler_does_not_read_env_vars() {
|
|
std::env::set_var("MCP_TOKEN", "should-not-be-used");
|
|
let (endpoint, _handle) = spawn_server().await;
|
|
let adapter = FromMCP::new(endpoint, "echo");
|
|
let bundles = adapter.import().await.expect("import succeeds");
|
|
let echo = bundles
|
|
.into_iter()
|
|
.find(|b| b.spec.name == "echo/echo")
|
|
.expect("echo tool present");
|
|
|
|
let ctx = test_context("req-noenv", Capabilities::new());
|
|
let response = (echo.handler)(serde_json::json!({ "x": 1 }), ctx).await;
|
|
assert!(response.result.is_ok(), "handler works without env var");
|
|
std::env::remove_var("MCP_TOKEN");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn import_unreachable_server_returns_discovery_failed() {
|
|
let adapter = FromMCP::new("http://127.0.0.1:1/mcp", "x");
|
|
match adapter.import().await {
|
|
Ok(_) => panic!("expected Err for unreachable server"),
|
|
Err(alknet_call::client::AdapterError::DiscoveryFailed { .. }) => {}
|
|
Err(alknet_call::client::AdapterError::Transport { .. }) => {}
|
|
Err(other) => panic!("expected DiscoveryFailed or Transport, got {other}"),
|
|
}
|
|
} |