diff --git a/TODO.md b/TODO.md index c724d87..875687e 100644 --- a/TODO.md +++ b/TODO.md @@ -3,7 +3,6 @@ ## High priority * udp - * consider ways of avoiding response peer allocations * make ConnectionValidator faster by avoiding calling time functions so often ## Medium priority diff --git a/crates/udp/src/workers/socket/uring/mod.rs b/crates/udp/src/workers/socket/uring/mod.rs index fc3ab99..299209c 100644 --- a/crates/udp/src/workers/socket/uring/mod.rs +++ b/crates/udp/src/workers/socket/uring/mod.rs @@ -85,7 +85,7 @@ pub struct SocketWorker { buf_ring: BufRing, send_buffers: SendBuffers, recv_helper: RecvHelper, - local_responses: VecDeque<(Response, CanonicalSocketAddr)>, + local_responses: VecDeque<(CanonicalSocketAddr, Response)>, resubmittable_sqe_buf: Vec, recv_sqe: io_uring::squeue::Entry, pulse_timeout_sqe: io_uring::squeue::Entry, @@ -192,7 +192,7 @@ impl SocketWorker { // Enqueue local responses for _ in 0..sq_space { - if let Some((response, addr)) = self.local_responses.pop_front() { + if let Some((addr, response)) = self.local_responses.pop_front() { match self.send_buffers.prepare_entry(response, addr) { Ok(entry) => { unsafe { ring.submission().push(&entry).unwrap() }; @@ -200,7 +200,7 @@ impl SocketWorker { num_send_added += 1; } Err(send_buffers::Error::NoBuffers(response)) => { - self.local_responses.push_front((response, addr)); + self.local_responses.push_front((addr, response)); break; } @@ -231,7 +231,9 @@ impl SocketWorker { fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) { match cqe.user_data() { USER_DATA_RECV => { - self.handle_recv_cqe(&cqe); + if let Some((addr, response)) = self.handle_recv_cqe(&cqe) { + self.local_responses.push_back((addr, response)); + } if !io_uring::cqueue::more(cqe.flags()) { self.resubmittable_sqe_buf.push(self.recv_sqe.clone()); @@ -290,12 +292,15 @@ impl SocketWorker { } } - fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) { + fn handle_recv_cqe( + &mut self, + cqe: &io_uring::cqueue::Entry, + ) -> Option<(CanonicalSocketAddr, Response)> { let result = cqe.result(); if result < 0 { if -result == libc::ENOBUFS { - ::log::info!("recv failed due to lack of buffers. If increasing ring size doesn't help, get faster hardware"); + ::log::info!("recv failed due to lack of buffers, try increasing ring size"); } else { ::log::warn!( "recv failed: {:#}", @@ -303,7 +308,7 @@ impl SocketWorker { ); } - return; + return None; } let buffer = unsafe { @@ -312,23 +317,48 @@ impl SocketWorker { Ok(None) => { ::log::error!("Couldn't get recv buffer"); - return; + return None; } Err(err) => { ::log::error!("Couldn't get recv buffer: {:#}", err); - return; + return None; } } }; - let addr = match self.recv_helper.parse(buffer.as_slice()) { + match self.recv_helper.parse(buffer.as_slice()) { Ok((request, addr)) => { - self.handle_request(request, addr); + if self.config.statistics.active() { + let (statistics, extra_bytes) = if addr.is_ipv4() { + (&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4) + } else { + (&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6) + }; - addr + statistics + .bytes_received + .fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed); + statistics.requests.fetch_add(1, Ordering::Relaxed); + } + + return self.handle_request(request, addr); } Err(self::recv_helper::Error::RequestParseError(err, addr)) => { + if self.config.statistics.active() { + if addr.is_ipv4() { + self.statistics + .ipv4 + .bytes_received + .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed); + } else { + self.statistics + .ipv6 + .bytes_received + .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed); + } + } + match err { RequestParseError::Sendable { connection_id, @@ -343,60 +373,43 @@ impl SocketWorker { message: err.into(), }; - self.local_responses.push_back((response.into(), addr)); + return Some((addr, Response::Error(response))); } } RequestParseError::Unsendable { err } => { ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); } } - - addr } Err(self::recv_helper::Error::InvalidSocketAddress) => { ::log::debug!("Ignored request claiming to be from port 0"); - - return; } Err(self::recv_helper::Error::RecvMsgParseError) => { ::log::error!("RecvMsgOut::parse failed"); - - return; } Err(self::recv_helper::Error::RecvMsgTruncated) => { ::log::warn!("RecvMsgOut::parse failed: sockaddr or payload truncated"); - - return; } - }; - - if self.config.statistics.active() { - let (statistics, extra_bytes) = if addr.is_ipv4() { - (&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4) - } else { - (&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6) - }; - - statistics - .bytes_received - .fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed); - statistics.requests.fetch_add(1, Ordering::Relaxed); } + + None } - fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) { + fn handle_request( + &mut self, + request: Request, + src: CanonicalSocketAddr, + ) -> Option<(CanonicalSocketAddr, Response)> { let access_list_mode = self.config.access_list.mode; match request { Request::Connect(request) => { - let connection_id = self.validator.create_connection_id(src); - let response = Response::Connect(ConnectResponse { - connection_id, + connection_id: self.validator.create_connection_id(src), transaction_id: request.transaction_id, }); - self.local_responses.push_back((response, src)); + return Some((src, response)); } Request::Announce(request) => { if self @@ -417,14 +430,14 @@ impl SocketWorker { self.peer_valid_until, ); - self.local_responses.push_back((response, src)); + return Some((src, response)); } else { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, message: "Info hash not allowed".into(), }); - self.local_responses.push_back((response, src)) + return Some((src, response)); } } } @@ -436,10 +449,12 @@ impl SocketWorker { let response = Response::Scrape(self.shared_state.torrent_maps.scrape(request, src)); - self.local_responses.push_back((response, src)); + return Some((src, response)); } } } + + None } }