# russh-sftp: Client Implementation ## Client Architecture Overview The client side provides two tiers of API: 1. **`RawSftpSession`** — Low-level request-response client that sends individual SFTP packets and awaits their responses. Suitable for custom or non-standard SFTP interactions. 2. **`SftpSession`** — High-level client modeled after `std::fs`. Provides ergonomic methods for file operations and creates `File` objects implementing `AsyncRead`/`AsyncWrite`/`AsyncSeek`. Both operate on a stream type `S: AsyncRead + AsyncWrite + Unpin + Send + 'static`. ## Client Handler Trait The raw client infrastructure uses an internal `Handler` trait to dispatch incoming response packets: ```rust pub trait Handler: Sized { type Error: Into; fn version(&mut self, version: Version) -> impl Future> + Send; fn status(&mut self, status: Status) -> impl Future> + Send; fn handle(&mut self, handle: Handle) -> impl Future> + Send; fn data(&mut self, data: Data) -> impl Future> + Send; fn name(&mut self, name: Name) -> impl Future> + Send; fn attrs(&mut self, attrs: Attrs) -> impl Future> + Send; fn extended_reply(&mut self, reply: ExtendedReply) -> impl Future> + Send; } ``` With the `async-trait` feature enabled, this becomes `#[async_trait]`. The `run()` function spawns two tasks: - **Reader task**: reads packets from the stream, dispatches to the handler - **Writer task**: sends serialized packets from an `mpsc::UnboundedSender` ```rust pub fn run(stream: S, handler: H) -> mpsc::UnboundedSender where S: AsyncRead + AsyncWrite + Unpin + Send + 'static, H: Handler + Send + 'static, ``` The returned sender is the write handle — callers push `Bytes` (serialized packets) into it. ## RawSftpSession `RawSftpSession` is the core request-response client. It implements `Handler` internally via `SessionInner` to route response packets back to awaiting request futures. ### Construction ```rust impl RawSftpSession { pub fn new(stream: S) -> Self where S: AsyncRead + AsyncWrite + Unpin + Send + 'static; pub fn new_with_config(stream: S, cfg: Config) -> Self where S: AsyncRead + AsyncWrite + Unpin + Send + 'static; } ``` ### Internal Architecture ``` ┌─────────────────────────────────────────────────────┐ │ RawSftpSession │ │ │ │ tx: UnboundedSender ←── write to stream │ │ requests: Arc, oneshot::Sender>>│ │ next_req_id: AtomicU32 │ │ handles: AtomicU64 │ │ timeout: AtomicU64 │ │ limits: Limits │ │ │ │ ┌──────────────────────────────────────────────┐ │ │ │ SessionInner (Handler impl) │ │ │ │ │ │ │ │ version() → stores version, replies to init │ │ │ │ status() → replies by request ID │ │ │ │ handle() → replies by request ID │ │ │ │ data() → replies by request ID │ │ │ │ name() → replies by request ID │ │ │ │ attrs() → replies by request ID │ │ │ │ extended_reply() → replies by request ID │ │ │ └──────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ ``` ### Request-Response Mechanism 1. Caller invokes a method like `raw.open(filename, flags, attrs).await` 2. `RawSftpSession` generates a unique `id` via `AtomicU32::fetch_add` 3. Creates a `oneshot::channel` and inserts `(Some(id), sender)` into the `DashMap` 4. Serializes the packet to `Bytes` and sends it via the `tx` channel 5. Awaits the `oneshot::Receiver` with a timeout 6. When the reader task receives a response packet, `SessionInner::reply()` removes the entry from the `DashMap` and sends the `Packet` through the `oneshot` sender 7. The original request future receives the `Packet` and pattern-matches it The `init()` method is special: it uses `id: None` (no request ID) and the version handler stores the negotiated version. ### Limits Tracking ```rust pub struct Limits { pub packet_len: Option, pub read_len: Option, pub write_len: Option, pub open_handles: Option, } ``` - Fetched from the server via the `limits@openssh.com` extension during `SftpSession::new_with_config()` - Enforced client-side: `open()` and `opendir()` check `open_handles`, `read()` checks `read_len`, `write()` and `write_nowait()` check `write_len`, `send()` checks `packet_len` ### API Methods | Method | SFTP Packet | Response | Notes | |--------|------------|----------|-------| | `init()` | `Init` | `Version` | Handshake; version negotiation | | `open(filename, flags, attrs)` | `Open` | `Handle` | Opens a file; increments handle count | | `close(handle)` | `Close` | `Status` | Closes a handle; decrements handle count | | `close_nowait(handle)` | `Close` | — | Fire-and-forget close (returns oneshot receiver) | | `read(handle, offset, len)` | `Read` | `Data` | Reads data; respects `Limits.read_len` | | `write(handle, offset, data)` | `Write` | `Status` | Writes data; respects `Limits.write_len` | | `write_nowait(handle, offset, data)` | `Write` | — | Fire-and-forget write | | `lstat(path)` | `Lstat` | `Attrs` | Stat without following symlinks | | `fstat(handle)` | `Fstat` | `Attrs` | Stat an open handle | | `setstat(path, attrs)` | `SetStat` | `Status` | Set file attributes by path | | `fsetstat(handle, attrs)` | `FSetStat` | `Status` | Set file attributes by handle | | `opendir(path)` | `OpenDir` | `Handle` | Opens a directory; increments handle count | | `readdir(handle)` | `ReadDir` | `Name` | Reads directory entries | | `remove(filename)` | `Remove` | `Status` | Deletes a file | | `mkdir(path, attrs)` | `MkDir` | `Status` | Creates a directory | | `rmdir(path)` | `RmDir` | `Status` | Removes a directory | | `realpath(path)` | `RealPath` | `Name` | Canonicalizes a path | | `stat(path)` | `Stat` | `Status` | Stat following symlinks | | `rename(oldpath, newpath)` | `Rename` | `Status` | Renames a file | | `readlink(path)` | `ReadLink` | `Name` | Reads symlink target | | `symlink(path, target)` | `Symlink` | `Status` | Creates a symbolic link | | `extended(request, data)` | `Extended` | `Packet` | Generic extension mechanism | | `limits()` | `Extended` | `LimitsExtension` | `limits@openssh.com` | | `hardlink(oldpath, newpath)` | `Extended` | `Status` | `hardlink@openssh.com` | | `fsync(handle)` | `Extended` | `Status` | `fsync@openssh.com` | | `statvfs(path)` | `Extended` | `Statvfs` | `statvfs@openssh.com` v2 | ### Response Classification Two macros handle the common response patterns: ```rust // For responses that return a specific packet type or error status macro_rules! into_with_status { ($result:ident, $packet:ident) => { match $result { Packet::$packet(p) => Ok(p), Packet::Status(p) => Err(p.into()), _ => Err(Error::UnexpectedPacket), } }; } // For responses that only return success/failure status macro_rules! into_status { ($result:ident) => { match $result { Packet::Status(status) if status.status_code == StatusCode::Ok => Ok(status), Packet::Status(status) => Err(status.into()), _ => Err(Error::UnexpectedPacket), } }; } ``` ### Drop Behavior `RawSftpSession` implements `Drop` to call `close_session()`, which sends an empty `Bytes` to signal the writer task to shut down the stream. ## SftpSession (High-Level Client) `SftpSession` wraps `RawSftpSession` in an `Arc` and provides `std::fs`-like methods: ```rust pub struct SftpSession { session: Arc, features: Features, } ``` ### Construction and Version Negotiation ```rust impl SftpSession { pub async fn new(stream: S) -> SftpResult; pub async fn new_with_config(stream: S, cfg: Config) -> SftpResult; } ``` Construction performs: 1. Creates `RawSftpSession` with config 2. Sends `SSH_FXP_INIT` via `session.init().await` 3. Checks for supported extensions in the version response 4. If `limits@openssh.com` extension is available, fetches and applies limits 5. Stores detected feature flags in `Features` ### Features Detection ```rust pub(crate) struct Features { pub hardlink: bool, // hardlink@openssh.com v1 pub fsync: bool, // fsync@openssh.com v1 pub statvfs: bool, // statvfs@openssh.com v2 pub limits: Option, // from limits@openssh.com pub max_concurrent_writes: usize, pub max_packet_len: u32, } ``` ### High-Level API | Method | Description | |--------|-------------| | `open(filename)` | Opens file read-only | | `create(filename)` | Creates/truncates file write-only | | `open_with_flags(filename, flags)` | Opens with specified `OpenFlags` | | `open_with_flags_and_attributes(filename, flags, attrs)` | Opens with flags and initial attrs | | `canonicalize(path)` | Resolves to absolute path via `realpath` | | `create_dir(path)` | Creates directory | | `read(path)` | Reads entire file to `Vec` | | `write(path, data)` | Writes data to file | | `try_exists(path)` | Checks existence (returns `Ok(false)` on `NoSuchFile`) | | `read_dir(path)` | Returns `ReadDir` iterator over directory entries | | `read_link(path)` | Reads symlink target | | `remove_dir(path)` | Removes directory | | `remove_file(filename)` | Removes file | | `rename(oldpath, newpath)` | Renames file/directory | | `symlink(path, target)` | Creates symbolic link | | `metadata(path)` | Gets `FileAttributes` via `stat` | | `set_metadata(path, metadata)` | Sets attributes via `setstat` | | `symlink_metadata(path)` | Gets `FileAttributes` via `lstat` | | `hardlink(oldpath, newpath)` | Creates hard link (returns `Ok(false)` if unsupported) | | `fs_info(path)` | Gets filesystem stats via `statvfs` (returns `Ok(None)` if unsupported) | | `set_timeout(secs)` | Sets request timeout | | `close()` | Closes the SFTP session stream | ## File (Async I/O) `client::fs::File` implements `AsyncRead`, `AsyncWrite`, and `AsyncSeek` for remote file I/O: ```rust pub struct File { session: Arc, handle: String, state: FileState, pos: u64, closed: bool, features: Features, } struct FileState { f_read: Option>>>>>>, f_seek: Option>>>>, f_flush: Option>>>>, f_shutdown: Option>>>>, write_acks: VecDeque>>, } ``` ### AsyncRead Implementation - Uses a state-machine pattern: `f_read` stores the in-progress read future - Calculates read length as `min(remaining_buffer, max_read_len)` where `max_read_len` comes from `Limits.read_len` or `max_packet_len - 9` (read overhead) - On `StatusCode::Eof`, returns `Ok(None)` → signals clean EOF to the caller - Advances `pos` by the data length on successful read ### AsyncWrite Implementation - Implements **pipelined writes** with configurable concurrency (`max_concurrent_writes`) - Uses `write_nowait()` to fire off write requests without awaiting each ACK - Stores `oneshot::Receiver`s in `write_acks: VecDeque` - When `write_acks.len() >= max_concurrent_writes`, polls the oldest pending ACK before accepting new writes - Write chunk size: `min(data.len(), max_write_len)` where `max_write_len` comes from `Limits.write_len` or `max_packet_len - 21 - handle_length` Overhead constants: ```rust const READ_OVERHEAD_LENGTH: u32 = 9; // type(1) + id(4) + data_len(4) const WRITE_OVERHEAD_LENGTH: u32 = 21; // type(1) + id(4) + handle_len(4) + offset(8) + data_len(4) ``` ### AsyncSeek Implementation - `SeekFrom::Start(pos)` — direct position set - `SeekFrom::Current(delta)` — arithmetic on current `pos` - `SeekFrom::End(delta)` — requires an `fstat()` round-trip to get file size, then computes position ### Flush and Shutdown - `poll_flush()`: Drains all pending write ACKs, then optionally calls `fsync` if the server supports it - `poll_shutdown()`: Drains pending ACKs, then sends `close()` on the handle and marks `closed = true` ### Drop Behavior If `closed` is not yet true, `File::drop()` sends a `close_nowait()` (fire-and-forget) to avoid blocking in a destructor. ### File Metadata Methods ```rust impl File { pub async fn metadata(&self) -> SftpResult; // fstat pub async fn set_metadata(&self, metadata: Metadata) -> SftpResult<()>; // fsetstat pub async fn sync_all(&self) -> SftpResult<()>; // fsync (no-op if unsupported) } ``` ## ReadDir and DirEntry ```rust pub struct ReadDir { parent: Arc, entries: VecDeque<(String, Metadata)>, } pub struct DirEntry { parent: Arc, file: String, metadata: Metadata, } ``` - `ReadDir` implements `Iterator` - Automatically filters out `.` and `..` entries - `DirEntry::path()` constructs full paths using POSIX-style `/` separator - `DirEntry::file_name()`, `file_type()`, `metadata()` provide accessors ## Runtime Abstraction (`client/runtime.rs`) The client runtime abstracts over native tokio and WASM environments: - **Native** (`not(target_arch = "wasm32")`): Uses `tokio::spawn` and `tokio::time::timeout` - **WASM** (`target_arch = "wasm32"`): Uses `wasm_bindgen_futures::spawn_local` and `gloo_timers::future::TimeoutFuture` The `spawn()` function returns a `JoinHandle` that wraps a `oneshot::Receiver`, providing a unified API for both platforms. ## Timeout Behavior Each request in `RawSftpSession` has a configurable timeout (default 10 seconds, set via `Config::request_timeout_secs`): ```rust async fn request(&self, id: Option, packet: Packet) -> SftpResult { let rx = self.send(id, packet)?; let timeout = self.timeout.load(Ordering::Relaxed); match runtime::timeout(Duration::from_secs(timeout), rx).await { Ok(Ok(result)) => result, Ok(Err(_)) => Err(Error::UnexpectedBehavior("sender dropped".into())), Err(error) => { self.requests.remove(&id); Err(error) // Error::Timeout } } } ``` On timeout, the pending request entry is cleaned up from the `DashMap`.