Implement body size limit middleware with dynamic config reload

Add body_limit middleware that reads limit from ArcSwap<DynamicConfig>
on each request, enabling runtime config changes without restart.
Uses Content-Length header check for fast rejection and http_body_util::Limited
for streaming body enforcement. Default limit: 100 MB (104,857,600 bytes).
Returns 413 Payload Too Large when exceeded.
This commit is contained in:
2026-06-11 13:02:59 +00:00
parent 994ce0fb66
commit 5fa0fc600e
5 changed files with 285 additions and 0 deletions

1
Cargo.lock generated
View File

@@ -1585,6 +1585,7 @@ dependencies = [
"axum",
"clap",
"futures",
"http-body-util",
"hyper",
"rcgen",
"reqwest",

View File

@@ -31,6 +31,7 @@ clap = { version = "=4.6.1", features = ["derive"] }
signal-hook = "=0.3.18"
anyhow = "=1.0.102"
thiserror = "=2.0.18"
http-body-util = "=0.1.3"
futures = "=0.3.31"
[dev-dependencies]

51
src/proxy/body_limit.rs Normal file
View File

@@ -0,0 +1,51 @@
use std::sync::Arc;
use arc_swap::ArcSwap;
use axum::body::Body;
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use http_body_util::Limited;
use crate::config::DynamicConfig;
pub const DEFAULT_BODY_LIMIT_BYTES: u64 = 104_857_600;
pub async fn body_limit_middleware(
State(config): State<Arc<ArcSwap<DynamicConfig>>>,
request: axum::extract::Request,
next: axum::middleware::Next,
) -> axum::response::Response {
let limit = config.load().body.limit_bytes;
let limit = if limit == 0 {
DEFAULT_BODY_LIMIT_BYTES
} else {
limit
};
if let Some(content_length) = request.headers().get("content-length") {
if let Ok(length_str) = content_length.to_str() {
if let Ok(length) = length_str.parse::<u64>() {
if length > limit {
return (StatusCode::PAYLOAD_TOO_LARGE, "Payload Too Large").into_response();
}
}
}
}
let (parts, body) = request.into_parts();
let limited_body = Limited::new(body, limit as usize);
let request = axum::extract::Request::from_parts(parts, Body::new(limited_body));
next.run(request).await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn default_body_limit_is_100mb() {
assert_eq!(DEFAULT_BODY_LIMIT_BYTES, 104_857_600);
}
}

View File

@@ -1,3 +1,20 @@
pub mod body_limit;
pub mod error;
pub mod handler;
pub mod headers;
use std::sync::Arc;
use arc_swap::ArcSwap;
use crate::config::DynamicConfig;
pub fn router_with_body_limit(
router: axum::Router,
config: Arc<ArcSwap<DynamicConfig>>,
) -> axum::Router {
router.layer(axum::middleware::from_fn_with_state(
config,
body_limit::body_limit_middleware,
))
}

View File

@@ -1,5 +1,16 @@
mod helpers;
use std::sync::Arc;
use arc_swap::ArcSwap;
use axum::routing::post;
use axum::Router;
use reverse_proxy::config::dynamic_config::{
BodyConfig, DynamicConfig, RateLimitConfig, SiteConfig,
};
use reverse_proxy::proxy::body_limit::DEFAULT_BODY_LIMIT_BYTES;
use reverse_proxy::proxy::router_with_body_limit;
#[tokio::test]
async fn test_upstream_spawn_and_connect() {
let upstream = helpers::http_test_helper::TestUpstream::spawn_ok().await;
@@ -71,3 +82,207 @@ async fn test_health_check_disabled_when_port_zero() {
assert_ne!(addr.port(), 0);
handle.abort();
}
fn test_dynamic_config_with_limit(limit_bytes: u64) -> Arc<ArcSwap<DynamicConfig>> {
let config = DynamicConfig {
sites: vec![SiteConfig {
host: "test.local".to_string(),
upstream: "127.0.0.1:8080".to_string(),
upstream_scheme: "http".to_string(),
upstream_connect_timeout_secs: 5,
upstream_request_timeout_secs: 60,
}],
rate_limit: RateLimitConfig {
requests_per_second: 10,
burst: 20,
},
body: BodyConfig { limit_bytes },
};
Arc::new(ArcSwap::from_pointee(config))
}
async fn spawn_server_with_limit(limit_bytes: u64) -> helpers::http_test_helper::TestUpstream {
let config = test_dynamic_config_with_limit(limit_bytes);
helpers::http_test_helper::TestUpstream::spawn(|| {
let app = Router::new().route(
"/",
post(|body: axum::body::Body| async move {
let _ = body;
"ok"
}),
);
router_with_body_limit(app, config.clone())
})
.await
}
#[tokio::test]
async fn test_body_limit_rejects_oversized_request() {
let server = spawn_server_with_limit(100).await;
let client = reqwest::Client::new();
let large_body = vec![0u8; 200];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(large_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
let body = resp.text().await.unwrap();
assert_eq!(body, "Payload Too Large");
let _ = server.shutdown_tx.send(());
}
#[tokio::test]
async fn test_body_limit_allows_request_within_limit() {
let server = spawn_server_with_limit(100).await;
let client = reqwest::Client::new();
let small_body = vec![0u8; 50];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(small_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = server.shutdown_tx.send(());
}
#[tokio::test]
async fn test_body_limit_allows_request_at_exact_limit() {
let server = spawn_server_with_limit(100).await;
let client = reqwest::Client::new();
let exact_body = vec![0u8; 100];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(exact_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = server.shutdown_tx.send(());
}
#[tokio::test]
async fn test_body_limit_content_length_header_rejection() {
let server = spawn_server_with_limit(100).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.header("content-length", "200")
.body(vec![0u8; 200])
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
let body = resp.text().await.unwrap();
assert_eq!(body, "Payload Too Large");
let _ = server.shutdown_tx.send(());
}
#[tokio::test]
async fn test_body_limit_default_is_100mb() {
assert_eq!(DEFAULT_BODY_LIMIT_BYTES, 104_857_600);
}
#[tokio::test]
async fn test_body_limit_config_reload_changes_limit() {
let config = test_dynamic_config_with_limit(100);
let config_clone = config.clone();
let server = helpers::http_test_helper::TestUpstream::spawn(|| {
let app = Router::new().route(
"/",
post(|body: axum::body::Body| async move {
let _ = body;
"ok"
}),
);
router_with_body_limit(app, config_clone.clone())
})
.await;
let client = reqwest::Client::new();
let small_body = vec![0u8; 50];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(small_body.clone())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let medium_body = vec![0u8; 150];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(medium_body.clone())
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
let new_config = DynamicConfig {
sites: vec![SiteConfig {
host: "test.local".to_string(),
upstream: "127.0.0.1:8080".to_string(),
upstream_scheme: "http".to_string(),
upstream_connect_timeout_secs: 5,
upstream_request_timeout_secs: 60,
}],
rate_limit: RateLimitConfig {
requests_per_second: 10,
burst: 20,
},
body: BodyConfig { limit_bytes: 200 },
};
config.store(Arc::new(new_config));
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(medium_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let large_body = vec![0u8; 300];
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body(large_body)
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::PAYLOAD_TOO_LARGE);
let _ = server.shutdown_tx.send(());
}
#[tokio::test]
async fn test_body_limit_empty_body_request_succeeds() {
let server = spawn_server_with_limit(100).await;
let client = reqwest::Client::new();
let resp = client
.post(format!("http://127.0.0.1:{}/", server.addr.port()))
.body("")
.send()
.await
.unwrap();
assert_eq!(resp.status(), reqwest::StatusCode::OK);
let _ = server.shutdown_tx.send(());
}