diff --git a/Cargo.lock b/Cargo.lock index 5231562..2420b84 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,6 +216,7 @@ dependencies = [ "getrandom", "hashbrown 0.12.1", "hex", + "libc", "log", "mimalloc", "mio", @@ -498,9 +499,9 @@ checksum = "904dfeac50f3cdaba28fc6f57fdcddb75f49ed61346676a78c4ffe55877802fd" [[package]] name = "base64ct" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dea908e7347a8c64e378c17e30ef880ad73e3b4498346b055c2c00ea342f3179" +checksum = "3bdca834647821e0b13d9539a8634eb62d3501b6b6c2cec1722786ee6671b851" [[package]] name = "bendy" @@ -800,9 +801,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.9" +version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ff1f980957787286a554052d03c7aee98d99cc32e09f6d45f0a814133c87978" +checksum = "7d82ee10ce34d7bc12c2122495e7593a9c41347ecdd64185af4ecf72cb1a7f83" dependencies = [ "cfg-if", "once_cell", @@ -2481,9 +2482,9 @@ checksum = "eb703cfe953bccee95685111adeedb76fabe4e97549a58d16f03ea7b9367bb32" [[package]] name = "smallvec" -version = "1.8.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" +checksum = "cc88c725d61fc6c3132893370cac4a0200e3fedf5da8331c570664b1987f5ca2" [[package]] name = "smartstring" @@ -2920,9 +2921,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7709595b8878a4965ce5e87ebf880a7d39c9afc6837721b21a5a816a8117d921" +checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7" dependencies = [ "once_cell", ] @@ -2972,9 +2973,9 @@ checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" [[package]] name = "unicode-normalization" -version = "0.1.19" +version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d54590932941a9e9266f0832deed84ebe1bf2e4c9e4a3554d393d18f5e854bf9" +checksum = "81dee68f85cab8cf68dec42158baf3a79a1cdc065a8b103025965d6ccb7f6cbd" dependencies = [ "tinyvec", ] diff --git a/TODO.md b/TODO.md index d56b2ce..c34f091 100644 --- a/TODO.md +++ b/TODO.md @@ -2,16 +2,12 @@ ## High priority -* udp: add option for disallowing connect requests with localhost as sender -* udp: add response buffering on send failure with configuration size +* udp: add IP blocklist, which would be more flexible than just adding option + for disallowing requests (claiming to be) from localhost ## Medium priority * rename request workers to swarm workers - -* save space by making ValidUntil just contain u32 with seconds, measured - some Instant created at application start - * quit whole program if any thread panics * But it would be nice not to panic in workers, but to return errors instead. Once JoinHandle::is_finished is available in stable Rust (#90470), an diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml index c876fad..266d415 100644 --- a/aquatic/Cargo.toml +++ b/aquatic/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "Blazingly fast, multi-threaded BitTorrent tracker (UDP, HTTP, WebTorrent)" +description = "High-performance open BitTorrent tracker (UDP, HTTP, WebTorrent)" repository = "https://github.com/greatest-ape/aquatic" keywords = ["bittorrent", "torrent", "webtorrent"] readme = "../README.md" diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 805c722..385efaa 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "Blazingly fast, multi-threaded HTTP BitTorrent tracker" +description = "High-performance open BitTorrent tracker (HTTP over TLS)" repository = "https://github.com/greatest-ape/aquatic" keywords = ["http", "server", "peer-to-peer", "torrent", "bittorrent"] readme = "../README.md" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index e8dcb7b..8088efc 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "Blazingly fast, multi-threaded UDP BitTorrent tracker" +description = "High-performance open UDP BitTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"] readme = "../README.md" @@ -31,6 +31,7 @@ crossbeam-channel = "0.5" getrandom = "0.2" hashbrown = { version = "0.12", default-features = false } hex = "0.4" +libc = "0.2" log = "0.4" mimalloc = { version = "0.1", default-features = false } mio = { version = "0.8", features = ["net", "os-poll"] } diff --git a/aquatic_udp/src/config.rs b/aquatic_udp/src/config.rs index 7823ed8..1214eee 100644 --- a/aquatic_udp/src/config.rs +++ b/aquatic_udp/src/config.rs @@ -86,6 +86,12 @@ pub struct NetworkConfig { pub socket_recv_buffer_size: usize, pub poll_event_capacity: usize, pub poll_timeout_ms: u64, + /// Store this many responses at most for retryin on send failure + /// + /// Useful on operating systems that do not provide an udp send buffer, + /// such as FreeBSD. Setting the value to zero disables resending + /// functionality. + pub resend_buffer_max_len: usize, } impl NetworkConfig { @@ -105,6 +111,7 @@ impl Default for NetworkConfig { socket_recv_buffer_size: 4096 * 128, poll_event_capacity: 4096, poll_timeout_ms: 50, + resend_buffer_max_len: 0, } } } diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index d53d8b4..bc48932 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -23,6 +23,8 @@ use requests::read_requests; use responses::send_responses; use storage::PendingScrapeResponseSlab; +use self::responses::send_responses_with_resends; + pub fn run_socket_worker( _sentinel: PanicSentinel, state: State, @@ -50,6 +52,7 @@ pub fn run_socket_worker( let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); + let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); @@ -60,6 +63,7 @@ pub fn run_socket_worker( let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; + let response_resending_active = config.network.resend_buffer_max_len > 0; loop { poll.poll(&mut events, Some(poll_timeout)) @@ -84,15 +88,28 @@ pub fn run_socket_worker( } } - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - ); + if response_resending_active { + send_responses_with_resends( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + &mut resend_buffer, + ); + } else { + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + ); + } // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs index 7bb5510..998380a 100644 --- a/aquatic_udp/src/workers/socket/responses.rs +++ b/aquatic_udp/src/workers/socket/responses.rs @@ -1,8 +1,9 @@ -use std::io::Cursor; +use std::io::{Cursor, ErrorKind}; use std::sync::atomic::Ordering; use std::vec::Drain; use crossbeam_channel::Receiver; +use libc::ENOBUFS; use mio::net::UdpSocket; use aquatic_common::CanonicalSocketAddr; @@ -23,7 +24,7 @@ pub fn send_responses( local_responses: Drain<(Response, CanonicalSocketAddr)>, ) { for (response, addr) in local_responses { - send_response(state, config, socket, buffer, response, addr); + let _ = send_response(state, config, socket, buffer, &response, addr); } for (response, addr) in response_receiver.try_iter() { @@ -36,19 +37,76 @@ pub fn send_responses( }; if let Some(response) = opt_response { - send_response(state, config, socket, buffer, response, addr); + let _ = send_response(state, config, socket, buffer, &response, addr); } } } +pub fn send_responses_with_resends( + state: &State, + config: &Config, + socket: &mut UdpSocket, + buffer: &mut [u8], + response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, + pending_scrape_responses: &mut PendingScrapeResponseSlab, + local_responses: Drain<(Response, CanonicalSocketAddr)>, + resend_buffer: &mut Vec<(Response, CanonicalSocketAddr)>, +) { + let resend_buffer_max_len = config.network.resend_buffer_max_len; + + for (response, addr) in resend_buffer.drain(..) { + let _ = send_response(state, config, socket, buffer, &response, addr); + } + + for (response, addr) in local_responses { + match send_response(state, config, socket, buffer, &response, addr) { + Err(err) if error_should_cause_resend(&err) => { + if resend_buffer.len() < resend_buffer_max_len { + resend_buffer.push((response, addr)); + } else { + ::log::warn!("response resend buffer full, dropping response"); + } + } + _ => (), + } + } + + for (response, addr) in response_receiver.try_iter() { + let opt_response = match response { + ConnectedResponse::Scrape(r) => pending_scrape_responses + .add_and_get_finished(r) + .map(Response::Scrape), + ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), + ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), + }; + + if let Some(response) = opt_response { + match send_response(state, config, socket, buffer, &response, addr) { + Err(err) if error_should_cause_resend(&err) => { + if resend_buffer.len() < resend_buffer_max_len { + resend_buffer.push((response, addr)); + } else { + ::log::warn!("response resend buffer full, dropping response"); + } + } + _ => (), + } + } + } +} + +fn error_should_cause_resend(err: &::std::io::Error) -> bool { + (err.raw_os_error() == Some(ENOBUFS)) | (err.kind() == ErrorKind::WouldBlock) +} + fn send_response( state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response: Response, + response: &Response, addr: CanonicalSocketAddr, -) { +) -> std::io::Result<()> { let mut cursor = Cursor::new(buffer); let canonical_addr_is_ipv4 = addr.is_ipv4(); @@ -90,15 +148,21 @@ fn send_response( stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } + + Ok(()) } - Ok(_) => {} + Ok(_) => Ok(()), Err(err) => { - ::log::warn!("send_to error: {:#}", err); + ::log::warn!("Sending response to {} failed: {:#}", addr, err); + + Err(err) } } } Err(err) => { - ::log::error!("Response::write error: {:?}", err); + ::log::error!("Converting response to bytes failed: {:#}", err); + + Err(err) } } } diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 16d54a3..c0d3724 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -4,7 +4,7 @@ version = "0.2.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "Blazingly fast, multi-threaded WebTorrent tracker" +description = "High-performance open WebTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" keywords = ["webtorrent", "websocket", "peer-to-peer", "torrent", "bittorrent"] readme = "../README.md"