Merge feat/ops/body-size-limit into main
This commit is contained in:
1
.worktrees/feat/config/cli-parsing
Submodule
1
.worktrees/feat/config/cli-parsing
Submodule
Submodule .worktrees/feat/config/cli-parsing added at 2791070971
1
.worktrees/feat/ops/admin-socket
Submodule
1
.worktrees/feat/ops/admin-socket
Submodule
Submodule .worktrees/feat/ops/admin-socket added at f1cada010f
1
.worktrees/feat/ops/body-size-limit
Submodule
1
.worktrees/feat/ops/body-size-limit
Submodule
Submodule .worktrees/feat/ops/body-size-limit added at 5fa0fc600e
1
.worktrees/feat/proxy/error-responses
Submodule
1
.worktrees/feat/proxy/error-responses
Submodule
Submodule .worktrees/feat/proxy/error-responses added at 23ed5cde27
1
.worktrees/feat/proxy/headers-and-forwarding
Submodule
1
.worktrees/feat/proxy/headers-and-forwarding
Submodule
Submodule .worktrees/feat/proxy/headers-and-forwarding added at 2791070971
1
.worktrees/feat/tls/http-redirect
Submodule
1
.worktrees/feat/tls/http-redirect
Submodule
Submodule .worktrees/feat/tls/http-redirect added at 2791070971
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -1606,6 +1606,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"dashmap",
|
"dashmap",
|
||||||
"futures",
|
"futures",
|
||||||
|
"http-body-util",
|
||||||
"hyper",
|
"hyper",
|
||||||
"rcgen",
|
"rcgen",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ clap = { version = "=4.6.1", features = ["derive"] }
|
|||||||
signal-hook = "=0.3.18"
|
signal-hook = "=0.3.18"
|
||||||
anyhow = "=1.0.102"
|
anyhow = "=1.0.102"
|
||||||
thiserror = "=2.0.18"
|
thiserror = "=2.0.18"
|
||||||
|
http-body-util = "=0.1.3"
|
||||||
futures = "=0.3.31"
|
futures = "=0.3.31"
|
||||||
dashmap = "=6.1"
|
dashmap = "=6.1"
|
||||||
|
|
||||||
|
|||||||
51
src/proxy/body_limit.rs
Normal file
51
src/proxy/body_limit.rs
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
|
use axum::body::Body;
|
||||||
|
use axum::extract::State;
|
||||||
|
use axum::http::StatusCode;
|
||||||
|
use axum::response::IntoResponse;
|
||||||
|
use http_body_util::Limited;
|
||||||
|
|
||||||
|
use crate::config::DynamicConfig;
|
||||||
|
|
||||||
|
pub const DEFAULT_BODY_LIMIT_BYTES: u64 = 104_857_600;
|
||||||
|
|
||||||
|
pub async fn body_limit_middleware(
|
||||||
|
State(config): State<Arc<ArcSwap<DynamicConfig>>>,
|
||||||
|
request: axum::extract::Request,
|
||||||
|
next: axum::middleware::Next,
|
||||||
|
) -> axum::response::Response {
|
||||||
|
let limit = config.load().body.limit_bytes;
|
||||||
|
let limit = if limit == 0 {
|
||||||
|
DEFAULT_BODY_LIMIT_BYTES
|
||||||
|
} else {
|
||||||
|
limit
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(content_length) = request.headers().get("content-length") {
|
||||||
|
if let Ok(length_str) = content_length.to_str() {
|
||||||
|
if let Ok(length) = length_str.parse::<u64>() {
|
||||||
|
if length > limit {
|
||||||
|
return (StatusCode::PAYLOAD_TOO_LARGE, "Payload Too Large").into_response();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let (parts, body) = request.into_parts();
|
||||||
|
let limited_body = Limited::new(body, limit as usize);
|
||||||
|
let request = axum::extract::Request::from_parts(parts, Body::new(limited_body));
|
||||||
|
|
||||||
|
next.run(request).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn default_body_limit_is_100mb() {
|
||||||
|
assert_eq!(DEFAULT_BODY_LIMIT_BYTES, 104_857_600);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,22 @@
|
|||||||
|
pub mod body_limit;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod handler;
|
pub mod handler;
|
||||||
pub mod headers;
|
pub mod headers;
|
||||||
|
|
||||||
pub use crate::config::dynamic_config::normalize_host;
|
pub use crate::config::dynamic_config::normalize_host;
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
|
|
||||||
|
use crate::config::DynamicConfig;
|
||||||
|
|
||||||
|
pub fn router_with_body_limit(
|
||||||
|
router: axum::Router,
|
||||||
|
config: Arc<ArcSwap<DynamicConfig>>,
|
||||||
|
) -> axum::Router {
|
||||||
|
router.layer(axum::middleware::from_fn_with_state(
|
||||||
|
config,
|
||||||
|
body_limit::body_limit_middleware,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,8 +4,13 @@ use std::sync::Arc;
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use axum::routing::get;
|
use axum::routing::{get, post};
|
||||||
use axum::Router;
|
use axum::Router;
|
||||||
|
use reverse_proxy::config::dynamic_config::{
|
||||||
|
BodyConfig, DynamicConfig, RateLimitConfig, SiteConfig,
|
||||||
|
};
|
||||||
|
use reverse_proxy::proxy::body_limit::DEFAULT_BODY_LIMIT_BYTES;
|
||||||
|
use reverse_proxy::proxy::router_with_body_limit;
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_upstream_spawn_and_connect() {
|
async fn test_upstream_spawn_and_connect() {
|
||||||
@@ -248,3 +253,209 @@ async fn test_rate_limit_eviction_task() {
|
|||||||
|
|
||||||
handle.abort();
|
handle.abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn test_dynamic_config_with_limit(limit_bytes: u64) -> Arc<ArcSwap<DynamicConfig>> {
|
||||||
|
let config = DynamicConfig {
|
||||||
|
sites: vec![SiteConfig {
|
||||||
|
host: "test.local".to_string(),
|
||||||
|
upstream: "127.0.0.1:8080".to_string(),
|
||||||
|
upstream_scheme: "http".to_string(),
|
||||||
|
upstream_connect_timeout_secs: 5,
|
||||||
|
upstream_request_timeout_secs: 60,
|
||||||
|
}],
|
||||||
|
rate_limit: RateLimitConfig {
|
||||||
|
requests_per_second: 10,
|
||||||
|
burst: 20,
|
||||||
|
},
|
||||||
|
body: BodyConfig { limit_bytes },
|
||||||
|
routing_table: Default::default(),
|
||||||
|
};
|
||||||
|
Arc::new(ArcSwap::from_pointee(config))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn spawn_server_with_limit(limit_bytes: u64) -> helpers::http_test_helper::TestUpstream {
|
||||||
|
let config = test_dynamic_config_with_limit(limit_bytes);
|
||||||
|
helpers::http_test_helper::TestUpstream::spawn(|| {
|
||||||
|
let app = Router::new().route(
|
||||||
|
"/",
|
||||||
|
post(|body: axum::body::Body| async move {
|
||||||
|
let _ = body;
|
||||||
|
"ok"
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
router_with_body_limit(app, config.clone())
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_rejects_oversized_request() {
|
||||||
|
let server = spawn_server_with_limit(100).await;
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let large_body = vec![0u8; 200];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(large_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
|
||||||
|
let body = resp.text().await.unwrap();
|
||||||
|
assert_eq!(body, "Payload Too Large");
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_allows_request_within_limit() {
|
||||||
|
let server = spawn_server_with_limit(100).await;
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let small_body = vec![0u8; 50];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(small_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_allows_request_at_exact_limit() {
|
||||||
|
let server = spawn_server_with_limit(100).await;
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let exact_body = vec![0u8; 100];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(exact_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_content_length_header_rejection() {
|
||||||
|
let server = spawn_server_with_limit(100).await;
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.header("content-length", "200")
|
||||||
|
.body(vec![0u8; 200])
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
|
||||||
|
let body = resp.text().await.unwrap();
|
||||||
|
assert_eq!(body, "Payload Too Large");
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_default_is_100mb() {
|
||||||
|
assert_eq!(DEFAULT_BODY_LIMIT_BYTES, 104_857_600);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_config_reload_changes_limit() {
|
||||||
|
let config = test_dynamic_config_with_limit(100);
|
||||||
|
let config_clone = config.clone();
|
||||||
|
|
||||||
|
let server = helpers::http_test_helper::TestUpstream::spawn(|| {
|
||||||
|
let app = Router::new().route(
|
||||||
|
"/",
|
||||||
|
post(|body: axum::body::Body| async move {
|
||||||
|
let _ = body;
|
||||||
|
"ok"
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
router_with_body_limit(app, config_clone.clone())
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let small_body = vec![0u8; 50];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(small_body.clone())
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||||
|
|
||||||
|
let medium_body = vec![0u8; 150];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(medium_body.clone())
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
|
||||||
|
|
||||||
|
let new_config = DynamicConfig {
|
||||||
|
sites: vec![SiteConfig {
|
||||||
|
host: "test.local".to_string(),
|
||||||
|
upstream: "127.0.0.1:8080".to_string(),
|
||||||
|
upstream_scheme: "http".to_string(),
|
||||||
|
upstream_connect_timeout_secs: 5,
|
||||||
|
upstream_request_timeout_secs: 60,
|
||||||
|
}],
|
||||||
|
rate_limit: RateLimitConfig {
|
||||||
|
requests_per_second: 10,
|
||||||
|
burst: 20,
|
||||||
|
},
|
||||||
|
body: BodyConfig { limit_bytes: 200 },
|
||||||
|
routing_table: Default::default(),
|
||||||
|
};
|
||||||
|
config.store(Arc::new(new_config));
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(medium_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||||
|
|
||||||
|
let large_body = vec![0u8; 300];
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body(large_body)
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_body_limit_empty_body_request_succeeds() {
|
||||||
|
let server = spawn_server_with_limit(100).await;
|
||||||
|
let client = reqwest::Client::new();
|
||||||
|
|
||||||
|
let resp = client
|
||||||
|
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
|
||||||
|
.body("")
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(resp.status(), reqwest::StatusCode::OK);
|
||||||
|
|
||||||
|
let _ = server.shutdown_tx.send(());
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user