Implement IrohTransport and IrohAcceptor (feature-gated iroh)
Add iroh QUIC P2P transport using tokio::io::join for stream duplexing per ADR-003. Default relay is n0's https://relay.iroh.network/ (ADR-009). Proxy URL passed to Endpoint::builder (ADR-010). Integration test marked #[ignore] for CI since it requires iroh relay connectivity.
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -5476,6 +5476,7 @@ dependencies = [
|
|||||||
"anyhow",
|
"anyhow",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"iroh",
|
"iroh",
|
||||||
|
"rand 0.8.6",
|
||||||
"russh",
|
"russh",
|
||||||
"rustls",
|
"rustls",
|
||||||
"rustls-acme",
|
"rustls-acme",
|
||||||
@@ -5485,6 +5486,7 @@ dependencies = [
|
|||||||
"tokio-rustls",
|
"tokio-rustls",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
"url",
|
||||||
"wraith-core",
|
"wraith-core",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ name = "wraith_core"
|
|||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
tls = ["dep:tokio-rustls", "dep:rustls"]
|
tls = ["dep:tokio-rustls", "dep:rustls"]
|
||||||
iroh = ["dep:iroh"]
|
iroh = ["dep:iroh", "dep:url"]
|
||||||
acme = ["dep:rustls-acme", "tls"]
|
acme = ["dep:rustls-acme", "tls"]
|
||||||
testutil = []
|
testutil = []
|
||||||
transport-traits = []
|
transport-traits = []
|
||||||
@@ -25,8 +25,10 @@ tokio-rustls = { version = "0.26", optional = true }
|
|||||||
rustls = { version = "0.23", optional = true }
|
rustls = { version = "0.23", optional = true }
|
||||||
rustls-acme = { version = "0.12", optional = true }
|
rustls-acme = { version = "0.12", optional = true }
|
||||||
iroh = { version = "0.34", optional = true }
|
iroh = { version = "0.34", optional = true }
|
||||||
|
url = { version = "2", optional = true }
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
wraith-core = { path = ".", features = ["testutil"] }
|
wraith-core = { path = ".", features = ["testutil", "iroh"] }
|
||||||
tempfile = "3"
|
tempfile = "3"
|
||||||
|
rand = "0.8"
|
||||||
205
crates/wraith-core/src/transport/iroh_transport.rs
Normal file
205
crates/wraith-core/src/transport/iroh_transport.rs
Normal file
@@ -0,0 +1,205 @@
|
|||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use iroh::{
|
||||||
|
endpoint::RecvStream,
|
||||||
|
node_info::NodeIdExt,
|
||||||
|
Endpoint, NodeId, RelayMap, RelayMode, RelayUrl,
|
||||||
|
};
|
||||||
|
use tokio::io;
|
||||||
|
|
||||||
|
use super::{Transport, TransportAcceptor, TransportInfo, TransportKind};
|
||||||
|
|
||||||
|
const ALPN: &[u8] = b"wraith-ssh";
|
||||||
|
const DEFAULT_RELAY_URL: &str = "https://relay.iroh.network/";
|
||||||
|
|
||||||
|
/// A client-side iroh QUIC P2P transport that connects to a remote iroh endpoint.
|
||||||
|
///
|
||||||
|
/// Connects via `Endpoint::connect(node_id, alpn)`, opens a bidirectional
|
||||||
|
/// QUIC stream with `conn.open_bi()`, and joins the halves with
|
||||||
|
/// `tokio::io::join(recv, send)` to produce a duplex stream for russh.
|
||||||
|
/// Per ADR-003, `tokio::io::join` is used instead of a custom wrapper.
|
||||||
|
pub struct IrohTransport {
|
||||||
|
node_id: NodeId,
|
||||||
|
endpoint: Endpoint,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IrohTransport {
|
||||||
|
pub async fn new(
|
||||||
|
node_id: NodeId,
|
||||||
|
relay_url: Option<RelayUrl>,
|
||||||
|
proxy_url: Option<url::Url>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let relay_url = relay_url.unwrap_or_else(|| {
|
||||||
|
DEFAULT_RELAY_URL.parse().expect("default relay URL is valid")
|
||||||
|
});
|
||||||
|
let relay_map = RelayMap::from_url(relay_url);
|
||||||
|
let mut builder = Endpoint::builder()
|
||||||
|
.relay_mode(RelayMode::Custom(relay_map))
|
||||||
|
.alpns(vec![ALPN.to_vec()]);
|
||||||
|
if let Some(ref proxy) = proxy_url {
|
||||||
|
builder = builder.proxy_url(proxy.clone());
|
||||||
|
}
|
||||||
|
let endpoint = builder.bind().await?;
|
||||||
|
Ok(Self { node_id, endpoint })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn endpoint_id(&self) -> String {
|
||||||
|
self.endpoint.node_id().to_z32()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn endpoint(&self) -> &Endpoint {
|
||||||
|
&self.endpoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Transport for IrohTransport {
|
||||||
|
type Stream = io::Join<RecvStream, iroh::endpoint::SendStream>;
|
||||||
|
|
||||||
|
async fn connect(&self) -> Result<Self::Stream> {
|
||||||
|
let conn = self.endpoint.connect(self.node_id, ALPN).await?;
|
||||||
|
let (send, recv) = conn.open_bi().await?;
|
||||||
|
Ok(io::join(recv, send))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn describe(&self) -> String {
|
||||||
|
format!("iroh://{}", self.node_id.to_z32())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A server-side iroh QUIC P2P transport acceptor that listens for incoming connections.
|
||||||
|
///
|
||||||
|
/// Binds an iroh `Endpoint` with the configured relay URL and optional proxy
|
||||||
|
/// (ADR-010). Accepts incoming connections, accepts bidirectional QUIC streams,
|
||||||
|
/// and joins the halves with `tokio::io::join(recv, send)`. Exposes
|
||||||
|
/// `endpoint_id()` for CLI display of the server's z-base-32 node ID.
|
||||||
|
pub struct IrohAcceptor {
|
||||||
|
endpoint: Endpoint,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl IrohAcceptor {
|
||||||
|
pub async fn bind(
|
||||||
|
relay_url: Option<RelayUrl>,
|
||||||
|
proxy_url: Option<url::Url>,
|
||||||
|
) -> Result<Self> {
|
||||||
|
let relay_url = relay_url.unwrap_or_else(|| {
|
||||||
|
DEFAULT_RELAY_URL.parse().expect("default relay URL is valid")
|
||||||
|
});
|
||||||
|
let relay_map = RelayMap::from_url(relay_url);
|
||||||
|
let mut builder = Endpoint::builder()
|
||||||
|
.relay_mode(RelayMode::Custom(relay_map))
|
||||||
|
.alpns(vec![ALPN.to_vec()]);
|
||||||
|
if let Some(ref proxy) = proxy_url {
|
||||||
|
builder = builder.proxy_url(proxy.clone());
|
||||||
|
}
|
||||||
|
let endpoint = builder.bind().await?;
|
||||||
|
Ok(Self { endpoint })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn endpoint_id(&self) -> String {
|
||||||
|
self.endpoint.node_id().to_z32()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn endpoint(&self) -> &Endpoint {
|
||||||
|
&self.endpoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl TransportAcceptor for IrohAcceptor {
|
||||||
|
type Stream = io::Join<RecvStream, iroh::endpoint::SendStream>;
|
||||||
|
|
||||||
|
async fn accept(&self) -> Result<(Self::Stream, TransportInfo)> {
|
||||||
|
let incoming = self
|
||||||
|
.endpoint
|
||||||
|
.accept()
|
||||||
|
.await
|
||||||
|
.ok_or_else(|| anyhow!("endpoint closed"))?;
|
||||||
|
let conn = incoming.await?;
|
||||||
|
let node_id = conn.remote_node_id()?;
|
||||||
|
let (send, recv) = conn.accept_bi().await?;
|
||||||
|
let stream = io::join(recv, send);
|
||||||
|
let info = TransportInfo {
|
||||||
|
remote_addr: None,
|
||||||
|
transport_kind: TransportKind::Iroh {
|
||||||
|
endpoint_id: node_id.to_z32(),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
Ok((stream, info))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn iroh_acceptor_bind_creates_endpoint() {
|
||||||
|
let acceptor = IrohAcceptor::bind(None, None).await.unwrap();
|
||||||
|
let endpoint_id = acceptor.endpoint_id();
|
||||||
|
assert!(!endpoint_id.is_empty());
|
||||||
|
let parsed = NodeId::from_z32(&endpoint_id);
|
||||||
|
assert!(parsed.is_ok());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn iroh_acceptor_bind_with_custom_relay() {
|
||||||
|
let relay: RelayUrl = "https://relay.iroh.network/".parse().unwrap();
|
||||||
|
let acceptor = IrohAcceptor::bind(Some(relay), None).await.unwrap();
|
||||||
|
assert!(!acceptor.endpoint_id().is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn iroh_transport_describe_format() {
|
||||||
|
let node_id: NodeId = iroh::SecretKey::generate(rand::rngs::OsRng)
|
||||||
|
.public()
|
||||||
|
.into();
|
||||||
|
let desc = format!("iroh://{}", node_id.to_z32());
|
||||||
|
assert!(desc.starts_with("iroh://"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn iroh_transport_connect_builds_endpoint() {
|
||||||
|
let node_id: NodeId = iroh::SecretKey::generate(rand::rngs::OsRng)
|
||||||
|
.public()
|
||||||
|
.into();
|
||||||
|
let transport = IrohTransport::new(node_id, None, None).await.unwrap();
|
||||||
|
assert!(transport.describe().starts_with("iroh://"));
|
||||||
|
assert!(!transport.endpoint_id().is_empty());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[ignore]
|
||||||
|
async fn iroh_client_connects_to_iroh_server() {
|
||||||
|
let acceptor = IrohAcceptor::bind(None, None).await.unwrap();
|
||||||
|
let server_node_id = acceptor.endpoint().node_id();
|
||||||
|
|
||||||
|
let transport = IrohTransport::new(server_node_id, None, None)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let mut addrs_watcher = acceptor.endpoint().direct_addresses();
|
||||||
|
addrs_watcher.initialized().await.unwrap();
|
||||||
|
let addr_set = addrs_watcher.get().unwrap().unwrap_or_default();
|
||||||
|
for addr in addr_set {
|
||||||
|
transport
|
||||||
|
.endpoint
|
||||||
|
.add_node_addr(iroh::NodeAddr::from_parts(
|
||||||
|
server_node_id,
|
||||||
|
None,
|
||||||
|
vec![addr.addr],
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
let accept_handle = tokio::spawn(async move {
|
||||||
|
let (stream, info) = acceptor.accept().await.unwrap();
|
||||||
|
assert!(matches!(info.transport_kind, TransportKind::Iroh { .. }));
|
||||||
|
stream
|
||||||
|
});
|
||||||
|
|
||||||
|
let _client_stream: io::Join<RecvStream, iroh::endpoint::SendStream> =
|
||||||
|
transport.connect().await.unwrap();
|
||||||
|
let _server_stream = accept_handle.await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,10 @@
|
|||||||
mod tcp;
|
mod tcp;
|
||||||
|
#[cfg(feature = "iroh")]
|
||||||
|
mod iroh_transport;
|
||||||
|
|
||||||
pub use tcp::{TcpAcceptor, TcpTransport};
|
pub use tcp::{TcpAcceptor, TcpTransport};
|
||||||
|
#[cfg(feature = "iroh")]
|
||||||
|
pub use iroh_transport::{IrohAcceptor, IrohTransport};
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user