Implement graceful shutdown for listeners, admin socket, eviction task, and ACME

- Replace handle.abort() for HTTPS server tasks with timeout-based join,
  allowing in-flight requests to drain before forceful shutdown
- Add shutdown_rx to start_admin_socket with tokio::select! for clean
  accept loop exit and Unix socket file cleanup on shutdown
- Add shutdown_rx to start_eviction_task with tokio::select! for
  cancellable eviction loop
- Add shutdown channel to spawn_acme_state for cancellable ACME state
  machine via tokio::select!
- Pass Arc<GracefulShutdown> through setup_tls to ACME state machine
- Move GracefulShutdown creation before admin socket and TLS setup
- Update integration test for new start_eviction_task signature
This commit is contained in:
2026-06-12 04:59:18 +00:00
parent abc8a44134
commit 280fe782a1
6 changed files with 177 additions and 113 deletions

View File

@@ -9,6 +9,8 @@ use tokio::net::UnixListener;
use tokio::sync::Mutex;
use tracing::{info, warn};
use crate::shutdown::GracefulShutdown;
use crate::config::ConfigReloadHandle;
#[derive(Debug, thiserror::Error)]
@@ -70,7 +72,10 @@ impl AdminSocket {
}
}
pub async fn start_admin_socket(admin_socket: Arc<AdminSocket>) -> Result<(), AdminSocketError> {
pub async fn start_admin_socket(
admin_socket: Arc<AdminSocket>,
shutdown: Arc<GracefulShutdown>,
) -> Result<(), AdminSocketError> {
if admin_socket.socket_path.is_empty() {
info!("admin socket disabled (empty path)");
return Err(AdminSocketError::Disabled);
@@ -96,19 +101,41 @@ pub async fn start_admin_socket(admin_socket: Arc<AdminSocket>) -> Result<(), Ad
info!("admin socket listening on {}", socket_path);
let mut shutdown_rx = shutdown.subscribe();
loop {
match listener.accept().await {
Ok((stream, _addr)) => {
let admin_socket = admin_socket.clone();
tokio::spawn(async move {
handle_connection(stream, admin_socket).await;
});
tokio::select! {
result = listener.accept() => {
match result {
Ok((stream, _addr)) => {
let admin_socket = admin_socket.clone();
tokio::spawn(async move {
handle_connection(stream, admin_socket).await;
});
}
Err(e) => {
warn!("failed to accept admin socket connection: {}", e);
}
}
}
Err(e) => {
warn!("failed to accept admin socket connection: {}", e);
_ = shutdown_rx.changed() => {
info!("admin socket shutting down");
break;
}
}
}
cleanup_socket_file(socket_path).await;
Ok(())
}
async fn cleanup_socket_file(path: &str) {
if Path::new(path).exists() {
if let Err(e) = tokio::fs::remove_file(path).await {
warn!("failed to remove admin socket file {}: {}", path, e);
}
}
}
async fn cleanup_stale_socket(path: &str) -> Result<(), AdminSocketError> {
@@ -507,7 +534,7 @@ upstream = "127.0.0.1:8080"
dir.path().join("config.toml").to_string_lossy().to_string(),
));
let result = start_admin_socket(admin_socket).await;
let result = start_admin_socket(admin_socket, Arc::new(GracefulShutdown::new(30))).await;
assert!(matches!(result, Err(AdminSocketError::Disabled)));
}
@@ -530,7 +557,7 @@ upstream = "127.0.0.1:8080"
dir.path().join("config.toml").to_string_lossy().to_string(),
));
let result = start_admin_socket(admin_socket).await;
let result = start_admin_socket(admin_socket, Arc::new(GracefulShutdown::new(30))).await;
assert!(matches!(result, Err(AdminSocketError::SocketInUse(_))));
}