From 5cfd270ab7d3f03c533332abb1ea8c8b8a480a9b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Tue, 28 Jun 2022 00:56:29 +0200 Subject: [PATCH] udp: add optional resending buffer for responses that failed to send --- Cargo.lock | 1 + TODO.md | 1 - aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/config.rs | 7 ++ aquatic_udp/src/workers/socket/mod.rs | 35 +++++++--- aquatic_udp/src/workers/socket/responses.rs | 76 +++++++++++++++++++-- 6 files changed, 105 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fe1caa2..2420b84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,7 @@ dependencies = [ "getrandom", "hashbrown 0.12.1", "hex", + "libc", "log", "mimalloc", "mio", diff --git a/TODO.md b/TODO.md index 57d6924..c34f091 100644 --- a/TODO.md +++ b/TODO.md @@ -4,7 +4,6 @@ * udp: add IP blocklist, which would be more flexible than just adding option for disallowing requests (claiming to be) from localhost -* udp: add response buffering on send failure with configurable size ## Medium priority diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 5713058..8088efc 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -31,6 +31,7 @@ crossbeam-channel = "0.5" getrandom = "0.2" hashbrown = { version = "0.12", default-features = false } hex = "0.4" +libc = "0.2" log = "0.4" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 7823ed8..1214eee 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -86,6 +86,12 @@ pub struct NetworkConfig { pub socket_recv_buffer_size: usize, pub poll_event_capacity: usize, pub poll_timeout_ms: u64, + /// Store this many responses at most for retryin on send failure + /// + /// Useful on operating systems that do not provide an udp send buffer, + /// such as FreeBSD. Setting the value to zero disables resending + /// functionality. + pub resend_buffer_max_len: usize, } impl NetworkConfig { @@ -105,6 +111,7 @@ impl Default for NetworkConfig { socket_recv_buffer_size: 4096 * 128, poll_event_capacity: 4096, poll_timeout_ms: 50, + resend_buffer_max_len: 0, } } } diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index d53d8b4..bc48932 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -23,6 +23,8 @@ use requests::read_requests; use responses::send_responses; use storage::PendingScrapeResponseSlab; +use self::responses::send_responses_with_resends; + pub fn run_socket_worker( _sentinel: PanicSentinel, state: State, @@ -50,6 +52,7 @@ pub fn run_socket_worker( let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); + let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); @@ -60,6 +63,7 @@ pub fn run_socket_worker( let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; + let response_resending_active = config.network.resend_buffer_max_len > 0; loop { poll.poll(&mut events, Some(poll_timeout)) @@ -84,15 +88,28 @@ pub fn run_socket_worker( } } - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - ); + if response_resending_active { + send_responses_with_resends( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + &mut resend_buffer, + ); + } else { + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + ); + } // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs index 7bb5510..ef6fbcc 100644 --- a/aquatic_udp/src/workers/socket/responses.rs +++ b/aquatic_udp/src/workers/socket/responses.rs @@ -1,8 +1,9 @@ -use std::io::Cursor; +use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::vec::Drain; use crossbeam_channel::Receiver; +use libc::ENOBUFS; use mio::net::UdpSocket; use aquatic_common::CanonicalSocketAddr; @@ -23,7 +24,7 @@ pub fn send_responses( local_responses: Drain<(Response, CanonicalSocketAddr)>, ) { for (response, addr) in local_responses { - send_response(state, config, socket, buffer, response, addr); + let _ = send_response(state, config, socket, buffer, &response, addr); } for (response, addr) in response_receiver.try_iter() { @@ -36,19 +37,76 @@ pub fn send_responses( }; if let Some(response) = opt_response { - send_response(state, config, socket, buffer, response, addr); + let _ = send_response(state, config, socket, buffer, &response, addr); } } } +pub fn send_responses_with_resends( + state: &State, + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, + pending_scrape_responses: &mut PendingScrapeResponseSlab, + local_responses: Drain<(Response, CanonicalSocketAddr)>, + resend_buffer: &mut Vec<(Response, CanonicalSocketAddr)>, +) { + let resend_buffer_max_len = config.network.resend_buffer_max_len; + + for (response, addr) in resend_buffer.drain(..) { + let _ = send_response(state, config, socket, buffer, &response, addr); + } + + for (response, addr) in local_responses { + match send_response(state, config, socket, buffer, &response, addr) { + Err(err) if error_should_cause_resend(&err) => { + if resend_buffer.len() < resend_buffer_max_len { + resend_buffer.push((response, addr)); + } else { + ::log::warn!("response resend buffer full, dropping response"); + } + } + _ => (), + } + } + + for (response, addr) in response_receiver.try_iter() { + let opt_response = match response { + ConnectedResponse::Scrape(r) => pending_scrape_responses + .add_and_get_finished(r) + .map(Response::Scrape), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), + }; + + if let Some(response) = opt_response { + match send_response(state, config, socket, buffer, &response, addr) { + Err(err) if error_should_cause_resend(&err) => { + if resend_buffer.len() < resend_buffer_max_len { + resend_buffer.push((response, addr)); + } else { + ::log::warn!("response resend buffer full, dropping response"); + } + } + _ => (), + } + } + } +} + +fn error_should_cause_resend(err: &::std::io::Error) -> bool { + (err.raw_os_error() == Some(ENOBUFS)) | (err.kind() == ErrorKind::WouldBlock) +} + fn send_response( state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response: Response, + response: &Response, addr: CanonicalSocketAddr, -) { +) -> std::io::Result<()> { let mut cursor = Cursor::new(buffer); let canonical_addr_is_ipv4 = addr.is_ipv4(); @@ -90,15 +148,21 @@ fn send_response( stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } + + Ok(()) } - Ok(_) => {} + Ok(_) => Ok(()), Err(err) => { ::log::warn!("send_to error: {:#}", err); + + Err(err) } } } Err(err) => { ::log::error!("Response::write error: {:?}", err); + + Err(err) } } }