mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-01 18:25:30 +00:00
udp: distribute announce swarm responses among socket workers
They don't have to be sent from the same worker that received the request, so we can decrease performance loss from underutilized threads this way.
This commit is contained in:
parent
7d17213a20
commit
b527af7195
2 changed files with 91 additions and 57 deletions
|
|
@ -171,13 +171,17 @@ impl ConnectedRequestSender {
|
||||||
|
|
||||||
pub struct ConnectedResponseSender {
|
pub struct ConnectedResponseSender {
|
||||||
senders: Vec<thingbuf::mpsc::blocking::Sender<ConnectedResponseWithAddr, Recycler>>,
|
senders: Vec<thingbuf::mpsc::blocking::Sender<ConnectedResponseWithAddr, Recycler>>,
|
||||||
|
to_any_last_index_picked: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectedResponseSender {
|
impl ConnectedResponseSender {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
senders: Vec<thingbuf::mpsc::blocking::Sender<ConnectedResponseWithAddr, Recycler>>,
|
senders: Vec<thingbuf::mpsc::blocking::Sender<ConnectedResponseWithAddr, Recycler>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self { senders }
|
Self {
|
||||||
|
senders,
|
||||||
|
to_any_last_index_picked: 0,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn try_send_ref_to(
|
pub fn try_send_ref_to(
|
||||||
|
|
@ -193,6 +197,23 @@ impl ConnectedResponseSender {
|
||||||
) -> Result<SendRef<ConnectedResponseWithAddr>, thingbuf::mpsc::errors::Closed> {
|
) -> Result<SendRef<ConnectedResponseWithAddr>, thingbuf::mpsc::errors::Closed> {
|
||||||
self.senders[index.0].send_ref()
|
self.senders[index.0].send_ref()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn send_ref_to_any(
|
||||||
|
&mut self,
|
||||||
|
) -> Result<SendRef<ConnectedResponseWithAddr>, thingbuf::mpsc::errors::Closed> {
|
||||||
|
let start = self.to_any_last_index_picked + 1;
|
||||||
|
|
||||||
|
for i in (start..start + self.senders.len()).map(|i| i % self.senders.len()) {
|
||||||
|
if let Ok(sender) = self.senders[i].try_send_ref() {
|
||||||
|
self.to_any_last_index_picked = i;
|
||||||
|
|
||||||
|
return Ok(sender);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.to_any_last_index_picked = start % self.senders.len();
|
||||||
|
self.send_ref_to(SocketWorkerIndex(self.to_any_last_index_picked))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub type ConnectedResponseReceiver =
|
pub type ConnectedResponseReceiver =
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ pub fn run_swarm_worker(
|
||||||
state: State,
|
state: State,
|
||||||
server_start_instant: ServerStartInstant,
|
server_start_instant: ServerStartInstant,
|
||||||
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
||||||
response_sender: ConnectedResponseSender,
|
mut response_sender: ConnectedResponseSender,
|
||||||
statistics_sender: Sender<StatisticsMessage>,
|
statistics_sender: Sender<StatisticsMessage>,
|
||||||
worker_index: SwarmWorkerIndex,
|
worker_index: SwarmWorkerIndex,
|
||||||
) {
|
) {
|
||||||
|
|
@ -43,65 +43,78 @@ pub fn run_swarm_worker(
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
|
if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
|
||||||
// It is OK to block here as long as we don't do blocking sends
|
// It is OK to block here as long as we don't also do blocking
|
||||||
// in socket workers, which could cause a deadlock
|
// sends in socket workers (doing both could cause a deadlock)
|
||||||
match response_sender.send_ref_to(sender_index) {
|
match (request, src.get().ip()) {
|
||||||
Ok(mut send_ref) => {
|
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
|
||||||
|
// It doesn't matter which socket worker receives announce responses
|
||||||
|
let mut send_ref = response_sender
|
||||||
|
.send_ref_to_any()
|
||||||
|
.expect("swarm response channel is closed");
|
||||||
|
|
||||||
send_ref.addr = src;
|
send_ref.addr = src;
|
||||||
|
send_ref.kind = ConnectedResponseKind::AnnounceIpv4;
|
||||||
|
|
||||||
match (request, src.get().ip()) {
|
torrents
|
||||||
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
|
.ipv4
|
||||||
send_ref.kind = ConnectedResponseKind::AnnounceIpv4;
|
.0
|
||||||
|
.entry(request.info_hash)
|
||||||
torrents
|
.or_default()
|
||||||
.ipv4
|
.announce(
|
||||||
.0
|
&config,
|
||||||
.entry(request.info_hash)
|
&statistics_sender,
|
||||||
.or_default()
|
&mut rng,
|
||||||
.announce(
|
&request,
|
||||||
&config,
|
ip.into(),
|
||||||
&statistics_sender,
|
peer_valid_until,
|
||||||
&mut rng,
|
&mut send_ref.announce_ipv4,
|
||||||
&request,
|
);
|
||||||
ip.into(),
|
|
||||||
peer_valid_until,
|
|
||||||
&mut send_ref.announce_ipv4,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
|
|
||||||
send_ref.kind = ConnectedResponseKind::AnnounceIpv6;
|
|
||||||
|
|
||||||
torrents
|
|
||||||
.ipv6
|
|
||||||
.0
|
|
||||||
.entry(request.info_hash)
|
|
||||||
.or_default()
|
|
||||||
.announce(
|
|
||||||
&config,
|
|
||||||
&statistics_sender,
|
|
||||||
&mut rng,
|
|
||||||
&request,
|
|
||||||
ip.into(),
|
|
||||||
peer_valid_until,
|
|
||||||
&mut send_ref.announce_ipv6,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
|
|
||||||
send_ref.kind = ConnectedResponseKind::Scrape;
|
|
||||||
|
|
||||||
torrents.ipv4.scrape(request, &mut send_ref.scrape);
|
|
||||||
}
|
|
||||||
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
|
|
||||||
send_ref.kind = ConnectedResponseKind::Scrape;
|
|
||||||
|
|
||||||
torrents.ipv6.scrape(request, &mut send_ref.scrape);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
Err(_) => {
|
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
|
||||||
panic!("swarm response channel closed");
|
// It doesn't matter which socket worker receives announce responses
|
||||||
|
let mut send_ref = response_sender
|
||||||
|
.send_ref_to_any()
|
||||||
|
.expect("swarm response channel is closed");
|
||||||
|
|
||||||
|
send_ref.addr = src;
|
||||||
|
send_ref.kind = ConnectedResponseKind::AnnounceIpv6;
|
||||||
|
|
||||||
|
torrents
|
||||||
|
.ipv6
|
||||||
|
.0
|
||||||
|
.entry(request.info_hash)
|
||||||
|
.or_default()
|
||||||
|
.announce(
|
||||||
|
&config,
|
||||||
|
&statistics_sender,
|
||||||
|
&mut rng,
|
||||||
|
&request,
|
||||||
|
ip.into(),
|
||||||
|
peer_valid_until,
|
||||||
|
&mut send_ref.announce_ipv6,
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
|
||||||
|
let mut send_ref = response_sender
|
||||||
|
.send_ref_to(sender_index)
|
||||||
|
.expect("swarm response channel is closed");
|
||||||
|
|
||||||
|
send_ref.addr = src;
|
||||||
|
send_ref.kind = ConnectedResponseKind::Scrape;
|
||||||
|
|
||||||
|
torrents.ipv4.scrape(request, &mut send_ref.scrape);
|
||||||
|
}
|
||||||
|
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
|
||||||
|
let mut send_ref = response_sender
|
||||||
|
.send_ref_to(sender_index)
|
||||||
|
.expect("swarm response channel is closed");
|
||||||
|
|
||||||
|
send_ref.addr = src;
|
||||||
|
send_ref.kind = ConnectedResponseKind::Scrape;
|
||||||
|
|
||||||
|
torrents.ipv6.scrape(request, &mut send_ref.scrape);
|
||||||
|
}
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run periodic tasks
|
// Run periodic tasks
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue