From b527af7195738de2a12b036edc19c2b6a9b8f11a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Mon, 1 Jan 2024 13:32:28 +0100 Subject: [PATCH] udp: distribute announce swarm responses among socket workers They don't have to be sent from the same worker that received the request, so we can decrease performance loss from underutilized threads this way. --- crates/udp/src/common.rs | 23 ++++- crates/udp/src/workers/swarm/mod.rs | 125 +++++++++++++++------------- 2 files changed, 91 insertions(+), 57 deletions(-) diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 439a497..8cc322c 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -171,13 +171,17 @@ impl ConnectedRequestSender { pub struct ConnectedResponseSender { senders: Vec>, + to_any_last_index_picked: usize, } impl ConnectedResponseSender { pub fn new( senders: Vec>, ) -> Self { - Self { senders } + Self { + senders, + to_any_last_index_picked: 0, + } } pub fn try_send_ref_to( @@ -193,6 +197,23 @@ impl ConnectedResponseSender { ) -> Result, thingbuf::mpsc::errors::Closed> { self.senders[index.0].send_ref() } + + pub fn send_ref_to_any( + &mut self, + ) -> Result, thingbuf::mpsc::errors::Closed> { + let start = self.to_any_last_index_picked + 1; + + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + if let Ok(sender) = self.senders[i].try_send_ref() { + self.to_any_last_index_picked = i; + + return Ok(sender); + } + } + + self.to_any_last_index_picked = start % self.senders.len(); + self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + } } pub type ConnectedResponseReceiver = diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 43ee480..ea551f3 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -23,7 +23,7 @@ pub fn run_swarm_worker( state: State, server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - response_sender: ConnectedResponseSender, + mut response_sender: ConnectedResponseSender, statistics_sender: Sender, worker_index: SwarmWorkerIndex, ) { @@ -43,65 +43,78 @@ pub fn run_swarm_worker( loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't do blocking sends - // in socket workers, which could cause a deadlock - match response_sender.send_ref_to(sender_index) { - Ok(mut send_ref) => { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv4, - ); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv6, - ); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); - } - }; + torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv4, + ); } - Err(_) => { - panic!("swarm response channel closed"); + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv6; + + torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv6, + ); } - } + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv4.scrape(request, &mut send_ref.scrape); + } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv6.scrape(request, &mut send_ref.scrape); + } + }; } // Run periodic tasks