mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: fix/silence clippy warnings
This commit is contained in:
parent
5401eaf85f
commit
9d1bba5e92
8 changed files with 137 additions and 123 deletions
|
|
@ -22,6 +22,7 @@ use common::{
|
|||
};
|
||||
use config::Config;
|
||||
use workers::socket::ConnectionValidator;
|
||||
use workers::swarm::SwarmWorker;
|
||||
|
||||
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
|
||||
pub const APP_VERSION: &str = env!("CARGO_PKG_VERSION");
|
||||
|
|
@ -79,16 +80,18 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
|
|||
WorkerIndex::SwarmWorker(i),
|
||||
);
|
||||
|
||||
workers::swarm::run_swarm_worker(
|
||||
sentinel,
|
||||
let mut worker = SwarmWorker {
|
||||
_sentinel: sentinel,
|
||||
config,
|
||||
state,
|
||||
server_start_instant,
|
||||
request_receiver,
|
||||
response_sender,
|
||||
statistics_sender,
|
||||
SwarmWorkerIndex(i),
|
||||
)
|
||||
worker_index: SwarmWorkerIndex(i),
|
||||
};
|
||||
|
||||
worker.run();
|
||||
})
|
||||
.with_context(|| "spawn swarm worker")?;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ pub struct SocketWorker {
|
|||
}
|
||||
|
||||
impl SocketWorker {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn run(
|
||||
_sentinel: PanicSentinel,
|
||||
shared_state: State,
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ const EXTRA_PACKET_SIZE_IPV4: usize = 8 + 18 + 20 + 8;
|
|||
/// - 8 bit udp header
|
||||
const EXTRA_PACKET_SIZE_IPV6: usize = 8 + 18 + 40 + 8;
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn run_socket_worker(
|
||||
sentinel: PanicSentinel,
|
||||
shared_state: State,
|
||||
|
|
|
|||
|
|
@ -130,9 +130,10 @@ mod tests {
|
|||
return TestResult::discard();
|
||||
}
|
||||
|
||||
let mut config = Config::default();
|
||||
|
||||
config.swarm_workers = swarm_workers as usize;
|
||||
let config = Config {
|
||||
swarm_workers: swarm_workers as usize,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let valid_until = ValidUntil::new(ServerStartInstant::new(), 1);
|
||||
|
||||
|
|
|
|||
|
|
@ -96,6 +96,7 @@ pub struct SocketWorker {
|
|||
}
|
||||
|
||||
impl SocketWorker {
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub fn run(
|
||||
_sentinel: PanicSentinel,
|
||||
shared_state: State,
|
||||
|
|
@ -136,7 +137,7 @@ impl SocketWorker {
|
|||
.build()
|
||||
.unwrap();
|
||||
|
||||
let recv_sqe = recv_helper.create_entry(buf_ring.bgid().try_into().unwrap());
|
||||
let recv_sqe = recv_helper.create_entry(buf_ring.bgid());
|
||||
|
||||
// This timeout enables regular updates of pending_scrape_valid_until
|
||||
// and wakes the main loop to send any pending responses in the case
|
||||
|
|
@ -209,7 +210,7 @@ impl SocketWorker {
|
|||
// Enqueue local responses
|
||||
for _ in 0..sq_space {
|
||||
if let Some((response, addr)) = self.local_responses.pop_front() {
|
||||
match self.send_buffers.prepare_entry(response.into(), addr) {
|
||||
match self.send_buffers.prepare_entry(response, addr) {
|
||||
Ok(entry) => {
|
||||
unsafe { ring.submission().push(&entry).unwrap() };
|
||||
|
||||
|
|
@ -471,11 +472,11 @@ impl SocketWorker {
|
|||
let worker_index =
|
||||
SwarmWorkerIndex::from_info_hash(&self.config, request.info_hash);
|
||||
|
||||
if let Err(_) = self.request_sender.try_send_to(
|
||||
worker_index,
|
||||
ConnectedRequest::Announce(request),
|
||||
src,
|
||||
) {
|
||||
if self
|
||||
.request_sender
|
||||
.try_send_to(worker_index, ConnectedRequest::Announce(request), src)
|
||||
.is_err()
|
||||
{
|
||||
::log::warn!("request sender full, dropping request");
|
||||
}
|
||||
} else {
|
||||
|
|
@ -500,11 +501,11 @@ impl SocketWorker {
|
|||
);
|
||||
|
||||
for (swarm_worker_index, request) in split_requests {
|
||||
if let Err(_) = self.request_sender.try_send_to(
|
||||
swarm_worker_index,
|
||||
ConnectedRequest::Scrape(request),
|
||||
src,
|
||||
) {
|
||||
if self
|
||||
.request_sender
|
||||
.try_send_to(swarm_worker_index, ConnectedRequest::Scrape(request), src)
|
||||
.is_err()
|
||||
{
|
||||
::log::warn!("request sender full, dropping request");
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use crate::config::Config;
|
|||
|
||||
use super::{SOCKET_IDENTIFIER, USER_DATA_RECV};
|
||||
|
||||
#[allow(clippy::enum_variant_names)]
|
||||
pub enum Error {
|
||||
RecvMsgParseError,
|
||||
RecvMsgTruncated,
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ impl SendBuffers {
|
|||
self.likely_next_free_index = 0;
|
||||
}
|
||||
|
||||
pub fn prepare_entry<'a>(
|
||||
pub fn prepare_entry(
|
||||
&mut self,
|
||||
response: Response,
|
||||
addr: CanonicalSocketAddr,
|
||||
|
|
|
|||
|
|
@ -17,122 +17,128 @@ use crate::config::Config;
|
|||
|
||||
use storage::TorrentMaps;
|
||||
|
||||
pub fn run_swarm_worker(
|
||||
_sentinel: PanicSentinel,
|
||||
config: Config,
|
||||
state: State,
|
||||
server_start_instant: ServerStartInstant,
|
||||
request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
||||
mut response_sender: ConnectedResponseSender,
|
||||
statistics_sender: Sender<StatisticsMessage>,
|
||||
worker_index: SwarmWorkerIndex,
|
||||
) {
|
||||
let mut torrents = TorrentMaps::default();
|
||||
let mut rng = SmallRng::from_entropy();
|
||||
pub struct SwarmWorker {
|
||||
pub _sentinel: PanicSentinel,
|
||||
pub config: Config,
|
||||
pub state: State,
|
||||
pub server_start_instant: ServerStartInstant,
|
||||
pub request_receiver: Receiver<(SocketWorkerIndex, ConnectedRequest, CanonicalSocketAddr)>,
|
||||
pub response_sender: ConnectedResponseSender,
|
||||
pub statistics_sender: Sender<StatisticsMessage>,
|
||||
pub worker_index: SwarmWorkerIndex,
|
||||
}
|
||||
|
||||
let timeout = Duration::from_millis(config.request_channel_recv_timeout_ms);
|
||||
let mut peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
||||
impl SwarmWorker {
|
||||
pub fn run(&mut self) {
|
||||
let mut torrents = TorrentMaps::default();
|
||||
let mut rng = SmallRng::from_entropy();
|
||||
|
||||
let cleaning_interval = Duration::from_secs(config.cleaning.torrent_cleaning_interval);
|
||||
let statistics_update_interval = Duration::from_secs(config.statistics.interval);
|
||||
let timeout = Duration::from_millis(self.config.request_channel_recv_timeout_ms);
|
||||
let mut peer_valid_until =
|
||||
ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age);
|
||||
|
||||
let mut last_cleaning = Instant::now();
|
||||
let mut last_statistics_update = Instant::now();
|
||||
let cleaning_interval = Duration::from_secs(self.config.cleaning.torrent_cleaning_interval);
|
||||
let statistics_update_interval = Duration::from_secs(self.config.statistics.interval);
|
||||
|
||||
let mut iter_counter = 0usize;
|
||||
let mut last_cleaning = Instant::now();
|
||||
let mut last_statistics_update = Instant::now();
|
||||
|
||||
loop {
|
||||
if let Ok((sender_index, request, src)) = request_receiver.recv_timeout(timeout) {
|
||||
// It is OK to block here as long as we don't also do blocking
|
||||
// sends in socket workers (doing both could cause a deadlock)
|
||||
match (request, src.get().ip()) {
|
||||
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
|
||||
let response = torrents
|
||||
.ipv4
|
||||
.0
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.announce(
|
||||
&config,
|
||||
&statistics_sender,
|
||||
&mut rng,
|
||||
&request,
|
||||
ip.into(),
|
||||
peer_valid_until,
|
||||
);
|
||||
let mut iter_counter = 0usize;
|
||||
|
||||
// It doesn't matter which socket worker receives announce responses
|
||||
response_sender
|
||||
.send_to_any(src, ConnectedResponse::AnnounceIpv4(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
|
||||
let response = torrents
|
||||
.ipv6
|
||||
.0
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.announce(
|
||||
&config,
|
||||
&statistics_sender,
|
||||
&mut rng,
|
||||
&request,
|
||||
ip.into(),
|
||||
peer_valid_until,
|
||||
);
|
||||
loop {
|
||||
if let Ok((sender_index, request, src)) = self.request_receiver.recv_timeout(timeout) {
|
||||
// It is OK to block here as long as we don't also do blocking
|
||||
// sends in socket workers (doing both could cause a deadlock)
|
||||
match (request, src.get().ip()) {
|
||||
(ConnectedRequest::Announce(request), IpAddr::V4(ip)) => {
|
||||
let response = torrents
|
||||
.ipv4
|
||||
.0
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.announce(
|
||||
&self.config,
|
||||
&self.statistics_sender,
|
||||
&mut rng,
|
||||
&request,
|
||||
ip.into(),
|
||||
peer_valid_until,
|
||||
);
|
||||
|
||||
// It doesn't matter which socket worker receives announce responses
|
||||
response_sender
|
||||
.send_to_any(src, ConnectedResponse::AnnounceIpv6(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
|
||||
let response = torrents.ipv4.scrape(request);
|
||||
// It doesn't matter which socket worker receives announce responses
|
||||
self.response_sender
|
||||
.send_to_any(src, ConnectedResponse::AnnounceIpv4(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Announce(request), IpAddr::V6(ip)) => {
|
||||
let response = torrents
|
||||
.ipv6
|
||||
.0
|
||||
.entry(request.info_hash)
|
||||
.or_default()
|
||||
.announce(
|
||||
&self.config,
|
||||
&self.statistics_sender,
|
||||
&mut rng,
|
||||
&request,
|
||||
ip.into(),
|
||||
peer_valid_until,
|
||||
);
|
||||
|
||||
response_sender
|
||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
|
||||
let response = torrents.ipv6.scrape(request);
|
||||
// It doesn't matter which socket worker receives announce responses
|
||||
self.response_sender
|
||||
.send_to_any(src, ConnectedResponse::AnnounceIpv6(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Scrape(request), IpAddr::V4(_)) => {
|
||||
let response = torrents.ipv4.scrape(request);
|
||||
|
||||
response_sender
|
||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
};
|
||||
}
|
||||
self.response_sender
|
||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
(ConnectedRequest::Scrape(request), IpAddr::V6(_)) => {
|
||||
let response = torrents.ipv6.scrape(request);
|
||||
|
||||
// Run periodic tasks
|
||||
if iter_counter % 128 == 0 {
|
||||
let now = Instant::now();
|
||||
|
||||
peer_valid_until = ValidUntil::new(server_start_instant, config.cleaning.max_peer_age);
|
||||
|
||||
if now > last_cleaning + cleaning_interval {
|
||||
torrents.clean_and_update_statistics(
|
||||
&config,
|
||||
&state,
|
||||
&statistics_sender,
|
||||
&state.access_list,
|
||||
server_start_instant,
|
||||
worker_index,
|
||||
);
|
||||
|
||||
last_cleaning = now;
|
||||
self.response_sender
|
||||
.send_to(sender_index, src, ConnectedResponse::Scrape(response))
|
||||
.expect("swarm response channel is closed");
|
||||
}
|
||||
};
|
||||
}
|
||||
if config.statistics.active()
|
||||
&& now > last_statistics_update + statistics_update_interval
|
||||
{
|
||||
state.statistics_ipv4.torrents[worker_index.0]
|
||||
.store(torrents.ipv4.num_torrents(), Ordering::Release);
|
||||
state.statistics_ipv6.torrents[worker_index.0]
|
||||
.store(torrents.ipv6.num_torrents(), Ordering::Release);
|
||||
|
||||
last_statistics_update = now;
|
||||
// Run periodic tasks
|
||||
if iter_counter % 128 == 0 {
|
||||
let now = Instant::now();
|
||||
|
||||
peer_valid_until =
|
||||
ValidUntil::new(self.server_start_instant, self.config.cleaning.max_peer_age);
|
||||
|
||||
if now > last_cleaning + cleaning_interval {
|
||||
torrents.clean_and_update_statistics(
|
||||
&self.config,
|
||||
&self.state,
|
||||
&self.statistics_sender,
|
||||
&self.state.access_list,
|
||||
self.server_start_instant,
|
||||
self.worker_index,
|
||||
);
|
||||
|
||||
last_cleaning = now;
|
||||
}
|
||||
if self.config.statistics.active()
|
||||
&& now > last_statistics_update + statistics_update_interval
|
||||
{
|
||||
self.state.statistics_ipv4.torrents[self.worker_index.0]
|
||||
.store(torrents.ipv4.num_torrents(), Ordering::Release);
|
||||
self.state.statistics_ipv6.torrents[self.worker_index.0]
|
||||
.store(torrents.ipv6.num_torrents(), Ordering::Release);
|
||||
|
||||
last_statistics_update = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
iter_counter = iter_counter.wrapping_add(1);
|
||||
iter_counter = iter_counter.wrapping_add(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue