diff --git a/crates/udp/src/workers/socket/mio.rs b/crates/udp/src/workers/socket/mio.rs index f3fb0ca..26ab5be 100644 --- a/crates/udp/src/workers/socket/mio.rs +++ b/crates/udp/src/workers/socket/mio.rs @@ -130,7 +130,7 @@ impl SocketWorker { // If resend buffer is enabled, send any responses in it if let Some(resend_buffer) = self.opt_resend_buffer.as_mut() { for (addr, response) in resend_buffer.drain(..) { - Self::send_response( + send_response( &self.config, &self.statistics, &mut self.socket, @@ -241,7 +241,7 @@ impl SocketWorker { message: err.into(), }; - Self::send_response( + send_response( &self.config, &self.statistics, &mut self.socket, @@ -288,7 +288,7 @@ impl SocketWorker { transaction_id: request.transaction_id, }; - Self::send_response( + send_response( &self.config, &self.statistics, &mut self.socket, @@ -324,7 +324,7 @@ impl SocketWorker { message: "Info hash not allowed".into(), }; - Self::send_response( + send_response( &self.config, &self.statistics, &mut self.socket, @@ -392,7 +392,7 @@ impl SocketWorker { ConnectedResponse::AnnounceIpv6(r) => Response::AnnounceIpv6(r), }; - Self::send_response( + send_response( &self.config, &self.statistics, &mut self.socket, @@ -403,71 +403,73 @@ impl SocketWorker { ); } } +} - fn send_response( - config: &Config, - statistics: &CachePaddedArc>, - socket: &mut UdpSocket, - buffer: &mut [u8], - opt_resend_buffer: &mut Option>, - response: Response, - canonical_addr: CanonicalSocketAddr, - ) { - let mut buffer = Cursor::new(&mut buffer[..]); +fn send_response( + config: &Config, + statistics: &CachePaddedArc>, + socket: &mut UdpSocket, + buffer: &mut [u8], + opt_resend_buffer: &mut Option>, + response: Response, + canonical_addr: CanonicalSocketAddr, +) { + let mut buffer = Cursor::new(&mut buffer[..]); - if let Err(err) = response.write_bytes(&mut buffer) { - ::log::error!("failed writing response to buffer: {:#}", err); + if let Err(err) = response.write_bytes(&mut buffer) { + ::log::error!("failed writing response to buffer: {:#}", err); - return; - } + return; + } - let bytes_written = buffer.position() as usize; + let bytes_written = buffer.position() as usize; - let addr = if config.network.address.is_ipv4() { - canonical_addr - .get_ipv4() - .expect("found peer ipv6 address while running bound to ipv4 address") - } else { - canonical_addr.get_ipv6_mapped() - }; + let addr = if config.network.address.is_ipv4() { + canonical_addr + .get_ipv4() + .expect("found peer ipv6 address while running bound to ipv4 address") + } else { + canonical_addr.get_ipv6_mapped() + }; - match socket.send_to(&buffer.into_inner()[..bytes_written], addr) { - Ok(amt) if config.statistics.active() => { - let stats = if canonical_addr.is_ipv4() { - let stats = &statistics.ipv4; + match socket.send_to(&buffer.into_inner()[..bytes_written], addr) { + Ok(amt) if config.statistics.active() => { + let stats = if canonical_addr.is_ipv4() { + let stats = &statistics.ipv4; - stats - .bytes_sent - .fetch_add(amt + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed); + stats + .bytes_sent + .fetch_add(amt + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed); - stats - } else { - let stats = &statistics.ipv6; + stats + } else { + let stats = &statistics.ipv6; - stats - .bytes_sent - .fetch_add(amt + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed); + stats + .bytes_sent + .fetch_add(amt + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed); - stats - }; + stats + }; - match response { - Response::Connect(_) => { - stats.responses_connect.fetch_add(1, Ordering::Relaxed); - } - Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { - stats.responses_announce.fetch_add(1, Ordering::Relaxed); - } - Response::Scrape(_) => { - stats.responses_scrape.fetch_add(1, Ordering::Relaxed); - } - Response::Error(_) => { - stats.responses_error.fetch_add(1, Ordering::Relaxed); - } + match response { + Response::Connect(_) => { + stats.responses_connect.fetch_add(1, Ordering::Relaxed); + } + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats.responses_announce.fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_error.fetch_add(1, Ordering::Relaxed); } } - Ok(_) => (), - Err(err) => match opt_resend_buffer.as_mut() { + } + Ok(_) => (), + Err(err) => { + match opt_resend_buffer.as_mut() { Some(resend_buffer) if (err.raw_os_error() == Some(libc::ENOBUFS)) || (err.kind() == ErrorKind::WouldBlock) => @@ -483,7 +485,7 @@ impl SocketWorker { _ => { ::log::warn!("Sending response to {} failed: {:#}", addr, err); } - }, + } } } }