diff --git a/crates/udp/src/common.rs b/crates/udp/src/common.rs index 439a497..8cc322c 100644 --- a/crates/udp/src/common.rs +++ b/crates/udp/src/common.rs @@ -171,13 +171,17 @@ impl ConnectedRequestSender { pub struct ConnectedResponseSender { senders: Vec>, + to_any_last_index_picked: usize, } impl ConnectedResponseSender { pub fn new( senders: Vec>, ) -> Self { - Self { senders } + Self { + senders, + to_any_last_index_picked: 0, + } } pub fn try_send_ref_to( @@ -193,6 +197,23 @@ impl ConnectedResponseSender { ) -> Result, thingbuf::mpsc::errors::Closed> { self.senders[index.0].send_ref() } + + pub fn send_ref_to_any( + &mut self, + ) -> Result, thingbuf::mpsc::errors::Closed> { + let start = self.to_any_last_index_picked + 1; + + for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) { + if let Ok(sender) = self.senders[i].try_send_ref() { + self.to_any_last_index_picked = i; + + return Ok(sender); + } + } + + self.to_any_last_index_picked = start % self.senders.len(); + self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked)) + } } pub type ConnectedResponseReceiver = diff --git a/crates/udp/src/workers/swarm/mod.rs b/crates/udp/src/workers/swarm/mod.rs index 43ee480..ea551f3 100644 --- a/crates/udp/src/workers/swarm/mod.rs +++ b/crates/udp/src/workers/swarm/mod.rs @@ -23,7 +23,7 @@ pub fn run_swarm_worker( state: State, server_start_instant: ServerStartInstant, request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>, - response_sender: ConnectedResponseSender, + mut response_sender: ConnectedResponseSender, statistics_sender: Sender, worker_index: SwarmWorkerIndex, ) { @@ -43,65 +43,78 @@ pub fn run_swarm_worker( loop { if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) { - // It is OK to block here as long as we don't do blocking sends - // in socket workers, which could cause a deadlock - match response_sender.send_ref_to(sender_index) { - Ok(mut send_ref) => { + // It is OK to block here as long as we don't also do blocking + // sends in socket workers (doing both could cause a deadlock) + match (request, src.get().ip()) { + (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - match (request, src.get().ip()) { - (ConnectedRequest::Announce(request), IpAddr::V4(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv4; - - torrents - .ipv4 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv4, - ); - } - (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { - send_ref.kind = ConnectedResponseKind::AnnounceIpv6; - - torrents - .ipv6 - .0 - .entry(request.info_hash) - .or_default() - .announce( - &config, - &statistics_sender, - &mut rng, - &request, - ip.into(), - peer_valid_until, - &mut send_ref.announce_ipv6, - ); - } - (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv4.scrape(request, &mut send_ref.scrape); - } - (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { - send_ref.kind = ConnectedResponseKind::Scrape; - - torrents.ipv6.scrape(request, &mut send_ref.scrape); - } - }; + torrents + .ipv4 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv4, + ); } - Err(_) => { - panic!("swarm response channel closed"); + (ConnectedRequest::Announce(request), IpAddr::V6(ip)) => { + // It doesn't matter which socket worker receives announce responses + let mut send_ref = response_sender + .send_ref_to_any() + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::AnnounceIpv6; + + torrents + .ipv6 + .0 + .entry(request.info_hash) + .or_default() + .announce( + &config, + &statistics_sender, + &mut rng, + &request, + ip.into(), + peer_valid_until, + &mut send_ref.announce_ipv6, + ); } - } + (ConnectedRequest::Scrape(request), IpAddr::V4(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv4.scrape(request, &mut send_ref.scrape); + } + (ConnectedRequest::Scrape(request), IpAddr::V6(_)) => { + let mut send_ref = response_sender + .send_ref_to(sender_index) + .expect("swarm response channel is closed"); + + send_ref.addr = src; + send_ref.kind = ConnectedResponseKind::Scrape; + + torrents.ipv6.scrape(request, &mut send_ref.scrape); + } + }; } // Run periodic tasks