diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 8088efc..51b2ef4 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -8,6 +8,7 @@ description = "High-performance open UDP BitTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"] readme = "../README.md" +rust-version = "1.62" [lib] name = "aquatic_udp" diff --git a/aquatic_udp/src/workers/socket/mod.rs b/aquatic_udp/src/workers/socket/mod.rs index bc48932..6d84352 100644 --- a/aquatic_udp/src/workers/socket/mod.rs +++ b/aquatic_udp/src/workers/socket/mod.rs @@ -23,8 +23,6 @@ use requests::read_requests; use responses::send_responses; use storage::PendingScrapeResponseSlab; -use self::responses::send_responses_with_resends; - pub fn run_socket_worker( _sentinel: PanicSentinel, state: State, @@ -52,7 +50,7 @@ pub fn run_socket_worker( let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); - let mut resend_buffer: Vec<(Response, CanonicalSocketAddr)> = Vec::new(); + let mut opt_resend_buffer = (config.network.resend_buffer_max_len > 0).then_some(Vec::new()); let poll_timeout = Duration::from_millis(config.network.poll_timeout_ms); @@ -63,7 +61,6 @@ pub fn run_socket_worker( let mut last_pending_scrape_cleaning = Instant::now(); let mut iter_counter = 0usize; - let response_resending_active = config.network.resend_buffer_max_len > 0; loop { poll.poll(&mut events, Some(poll_timeout)) @@ -88,28 +85,16 @@ pub fn run_socket_worker( } } - if response_resending_active { - send_responses_with_resends( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - &mut resend_buffer, - ); - } else { - send_responses( - &state, - &config, - &mut socket, - &mut buffer, - &response_receiver, - &mut pending_scrape_responses, - local_responses.drain(..), - ); - } + send_responses( + &state, + &config, + &mut socket, + &mut buffer, + &response_receiver, + &mut pending_scrape_responses, + local_responses.drain(..), + &mut opt_resend_buffer, + ); // Run periodic ValidUntil updates and state cleaning if iter_counter % 256 == 0 { diff --git a/aquatic_udp/src/workers/socket/responses.rs b/aquatic_udp/src/workers/socket/responses.rs index 998380a..92a7b5c 100644 --- a/aquatic_udp/src/workers/socket/responses.rs +++ b/aquatic_udp/src/workers/socket/responses.rs @@ -22,9 +22,24 @@ pub fn send_responses( response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, CanonicalSocketAddr)>, + opt_resend_buffer: &mut Option>, ) { + if let Some(resend_buffer) = opt_resend_buffer { + for (response, addr) in resend_buffer.drain(..) { + send_response(state, config, socket, buffer, response, addr, &mut None); + } + } + for (response, addr) in local_responses { - let _ = send_response(state, config, socket, buffer, &response, addr); + send_response( + state, + config, + socket, + buffer, + response, + addr, + opt_resend_buffer, + ); } for (response, addr) in response_receiver.try_iter() { @@ -37,91 +52,44 @@ pub fn send_responses( }; if let Some(response) = opt_response { - let _ = send_response(state, config, socket, buffer, &response, addr); + send_response( + state, + config, + socket, + buffer, + response, + addr, + opt_resend_buffer, + ); } } } -pub fn send_responses_with_resends( - state: &State, - config: &Config, - socket: &mut UdpSocket, - buffer: &mut [u8], - response_receiver: &Receiver<(ConnectedResponse, CanonicalSocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseSlab, - local_responses: Drain<(Response, CanonicalSocketAddr)>, - resend_buffer: &mut Vec<(Response, CanonicalSocketAddr)>, -) { - let resend_buffer_max_len = config.network.resend_buffer_max_len; - - for (response, addr) in resend_buffer.drain(..) { - let _ = send_response(state, config, socket, buffer, &response, addr); - } - - for (response, addr) in local_responses { - match send_response(state, config, socket, buffer, &response, addr) { - Err(err) if error_should_cause_resend(&err) => { - if resend_buffer.len() < resend_buffer_max_len { - resend_buffer.push((response, addr)); - } else { - ::log::warn!("response resend buffer full, dropping response"); - } - } - _ => (), - } - } - - for (response, addr) in response_receiver.try_iter() { - let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses - .add_and_get_finished(r) - .map(Response::Scrape), - ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), - ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), - }; - - if let Some(response) = opt_response { - match send_response(state, config, socket, buffer, &response, addr) { - Err(err) if error_should_cause_resend(&err) => { - if resend_buffer.len() < resend_buffer_max_len { - resend_buffer.push((response, addr)); - } else { - ::log::warn!("response resend buffer full, dropping response"); - } - } - _ => (), - } - } - } -} - -fn error_should_cause_resend(err: &::std::io::Error) -> bool { - (err.raw_os_error() == Some(ENOBUFS)) | (err.kind() == ErrorKind::WouldBlock) -} - fn send_response( state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response: &Response, - addr: CanonicalSocketAddr, -) -> std::io::Result<()> { + response: Response, + canonical_addr: CanonicalSocketAddr, + resend_buffer: &mut Option>, +) { let mut cursor = Cursor::new(buffer); - let canonical_addr_is_ipv4 = addr.is_ipv4(); - - let addr = if config.network.address.is_ipv4() { - addr.get_ipv4() - .expect("found peer ipv6 address while running bound to ipv4 address") - } else { - addr.get_ipv6_mapped() - }; - match response.write(&mut cursor) { Ok(()) => { let amt = cursor.position() as usize; + let canonical_addr_is_ipv4 = canonical_addr.is_ipv4(); + + let addr = if config.network.address.is_ipv4() { + canonical_addr + .get_ipv4() + .expect("found peer ipv6 address while running bound to ipv4 address") + } else { + canonical_addr.get_ipv6_mapped() + }; + match socket.send_to(&cursor.get_ref()[..amt], addr) { Ok(amt) if config.statistics.active() => { let stats = if canonical_addr_is_ipv4 { @@ -148,21 +116,29 @@ fn send_response( stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); } } - - Ok(()) } - Ok(_) => Ok(()), - Err(err) => { - ::log::warn!("Sending response to {} failed: {:#}", addr, err); + Ok(_) => (), + Err(err) => match resend_buffer { + Some(resend_buffer) + if (err.raw_os_error() == Some(ENOBUFS)) + || (err.kind() == ErrorKind::WouldBlock) => + { + if resend_buffer.len() < config.network.resend_buffer_max_len { + ::log::info!("Adding response to resend queue, since sending it to {} failed with: {:#}", addr, err); - Err(err) - } + resend_buffer.push((response, canonical_addr)); + } else { + ::log::warn!("Response resend buffer full, dropping response"); + } + } + _ => { + ::log::warn!("Sending response to {} failed: {:#}", addr, err); + } + }, } } Err(err) => { ::log::error!("Converting response to bytes failed: {:#}", err); - - Err(err) } } }