diff --git a/aquatic_udp/src/lib/common.rs b/aquatic_udp/src/lib/common.rs index 132666c..3f48017 100644 --- a/aquatic_udp/src/lib/common.rs +++ b/aquatic_udp/src/lib/common.rs @@ -35,6 +35,20 @@ pub enum ConnectedRequest { Scrape(ScrapeRequest), } +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct ConnectionKey { pub connection_id: ConnectionId, diff --git a/aquatic_udp/src/lib/handlers/announce.rs b/aquatic_udp/src/lib/handlers/announce.rs index bda60d6..913a0d6 100644 --- a/aquatic_udp/src/lib/handlers/announce.rs +++ b/aquatic_udp/src/lib/handlers/announce.rs @@ -16,7 +16,7 @@ pub fn handle_announce_requests( torrents: &mut MutexGuard, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, + responses: &mut Vec<(ConnectedResponse, SocketAddr)>, ) { let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); @@ -42,7 +42,7 @@ pub fn handle_announce_requests( ), }; - (Response::Announce(response), src) + (ConnectedResponse::Announce(response), src) })); } diff --git a/aquatic_udp/src/lib/handlers/mod.rs b/aquatic_udp/src/lib/handlers/mod.rs index 5cb7d39..a7597e0 100644 --- a/aquatic_udp/src/lib/handlers/mod.rs +++ b/aquatic_udp/src/lib/handlers/mod.rs @@ -23,12 +23,12 @@ pub fn run_request_worker( state: State, config: Config, request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, - response_sender: Sender<(Response, SocketAddr)>, + response_sender: Sender<(ConnectedResponse, SocketAddr)>, ) { let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); + let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); let mut std_rng = StdRng::from_entropy(); let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); diff --git a/aquatic_udp/src/lib/handlers/scrape.rs b/aquatic_udp/src/lib/handlers/scrape.rs index 8198bd8..b544ccf 100644 --- a/aquatic_udp/src/lib/handlers/scrape.rs +++ b/aquatic_udp/src/lib/handlers/scrape.rs @@ -12,7 +12,7 @@ use crate::common::*; pub fn handle_scrape_requests( torrents: &mut MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(Response, SocketAddr)>, + responses: &mut Vec<(ConnectedResponse, SocketAddr)>, ) { let empty_stats = create_torrent_scrape_statistics(0, 0); @@ -45,7 +45,7 @@ pub fn handle_scrape_requests( } } - let response = Response::Scrape(ScrapeResponse { + let response = ConnectedResponse::Scrape(ScrapeResponse { transaction_id: request.transaction_id, torrent_stats: stats, }); diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index ccea096..c8cd776 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -23,7 +23,7 @@ pub fn run_socket_worker( config: Config, token_num: usize, request_sender: Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: Receiver<(Response, SocketAddr)>, + response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, num_bound_sockets: Arc, ) { let mut rng = StdRng::from_entropy(); @@ -249,7 +249,7 @@ fn send_responses( config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, local_responses: Drain<(Response, SocketAddr)>, ) { let mut responses_sent: usize = 0; @@ -257,9 +257,11 @@ fn send_responses( let mut cursor = Cursor::new(buffer); - let response_iterator = local_responses - .into_iter() - .chain(response_receiver.try_iter()); + let response_iterator = local_responses.into_iter().chain( + response_receiver + .try_iter() + .map(|(response, addr)| (response.into(), addr)), + ); for (response, src) in response_iterator { cursor.set_position(0); diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index c759b97..12b35e3 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -16,7 +16,7 @@ pub fn bench_announce_handler( bench_config: &BenchConfig, aquatic_config: &Config, request_sender: &Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -36,10 +36,12 @@ pub fn bench_announce_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((ConnectedRequest::Announce(request.clone()), *src)).unwrap(); + request_sender + .send((ConnectedRequest::Announce(request.clone()), *src)) + .unwrap(); } - while let Ok((Response::Announce(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.try_recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { @@ -51,7 +53,7 @@ pub fn bench_announce_handler( let total = bench_config.num_announce_requests * (round + 1); while num_responses < total { - if let Ok((Response::Announce(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.recv() { num_responses += 1; if let Some(last_peer) = r.peers.last() { @@ -99,7 +101,10 @@ pub fn create_requests( port: Port(rng.gen()), }; - requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); + requests.push(( + request, + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + )); } requests diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 26a5ad3..7b62152 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -16,7 +16,7 @@ pub fn bench_scrape_handler( bench_config: &BenchConfig, aquatic_config: &Config, request_sender: &Sender<(ConnectedRequest, SocketAddr)>, - response_receiver: &Receiver<(Response, SocketAddr)>, + response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, rng: &mut impl Rng, info_hashes: &[InfoHash], ) -> (usize, Duration) { @@ -41,10 +41,12 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender.send((ConnectedRequest::Scrape(request.clone()), *src)).unwrap(); + request_sender + .send((ConnectedRequest::Scrape(request.clone()), *src)) + .unwrap(); } - while let Ok((Response::Scrape(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() { num_responses += 1; if let Some(stat) = r.torrent_stats.last() { @@ -56,7 +58,7 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((Response::Scrape(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() { num_responses += 1; if let Some(stat) = r.torrent_stats.last() { @@ -101,7 +103,10 @@ pub fn create_requests( info_hashes: request_info_hashes, }; - requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)))); + requests.push(( + request, + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)), + )); } requests