diff --git a/aquatic_udp/src/common.rs b/aquatic_udp/src/common.rs index 0b56c17..a32981a 100644 --- a/aquatic_udp/src/common.rs +++ b/aquatic_udp/src/common.rs @@ -31,15 +31,13 @@ impl Ip for Ipv6Addr { #[derive(Debug)] pub struct PendingScrapeRequest { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub info_hashes: BTreeMap, } #[derive(Debug)] pub struct PendingScrapeResponse { - pub connection_id: ConnectionId, - pub transaction_id: TransactionId, + pub slab_key: usize, pub torrent_stats: BTreeMap, } @@ -151,7 +149,10 @@ impl PeerStatus { pub struct Statistics { pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, + pub responses_sent_connect: AtomicUsize, + pub responses_sent_announce: AtomicUsize, + pub responses_sent_scrape: AtomicUsize, + pub responses_sent_error: AtomicUsize, pub bytes_received: AtomicUsize, pub bytes_sent: AtomicUsize, pub torrents: Vec, @@ -162,7 +163,10 @@ impl Statistics { pub fn new(num_request_workers: usize) -> Self { Self { requests_received: Default::default(), - responses_sent: Default::default(), + responses_sent_connect: Default::default(), + responses_sent_announce: Default::default(), + responses_sent_scrape: Default::default(), + responses_sent_error: Default::default(), bytes_received: Default::default(), bytes_sent: Default::default(), torrents: Self::create_atomic_usize_vec(num_request_workers), diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index d3158c8..92f4319 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -391,8 +391,7 @@ fn handle_scrape_request( } PendingScrapeResponse { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: request.slab_key, torrent_stats, } } diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index 2ba7a23..130e11b 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -12,6 +12,7 @@ use crossbeam_channel::Receiver; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; use rand::prelude::{Rng, SeedableRng, StdRng}; +use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; @@ -49,23 +50,21 @@ impl ConnectionMap { } #[derive(Debug)] -pub struct PendingScrapeResponseMapEntry { +pub struct PendingScrapeResponseSlabEntry { num_pending: usize, valid_until: ValidUntil, torrent_stats: BTreeMap, + transaction_id: TransactionId, } #[derive(Default)] -pub struct PendingScrapeResponseMap( - AHashIndexMap<(ConnectionId, TransactionId, SocketAddr), PendingScrapeResponseMapEntry>, -); +pub struct PendingScrapeResponseSlab(Slab); -impl PendingScrapeResponseMap { +impl PendingScrapeResponseSlab { pub fn prepare_split_requests( &mut self, config: &Config, request: ScrapeRequest, - addr: SocketAddr, valid_until: ValidUntil, ) -> impl IntoIterator { let mut split_requests: AHashIndexMap = @@ -73,54 +72,38 @@ impl PendingScrapeResponseMap { if request.info_hashes.is_empty() { ::log::warn!( - "Attempted to prepare PendingScrapeResponseMap entry with zero info hashes" + "Attempted to prepare PendingScrapeResponseSlab entry with zero info hashes" ); return split_requests; } - let connection_id = request.connection_id; - let transaction_id = request.transaction_id; + let vacant_entry = self.0.vacant_entry(); + let slab_key = vacant_entry.key(); for (i, info_hash) in request.info_hashes.into_iter().enumerate() { let split_request = split_requests .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) .or_insert_with(|| PendingScrapeRequest { - connection_id, - transaction_id, + slab_key, info_hashes: BTreeMap::new(), }); split_request.info_hashes.insert(i, info_hash); } - let key = (connection_id, transaction_id, addr); - - let entry = PendingScrapeResponseMapEntry { + vacant_entry.insert(PendingScrapeResponseSlabEntry { num_pending: split_requests.len(), valid_until, torrent_stats: Default::default(), - }; - - if let Some(previous_entry) = self.0.insert(key, entry) { - ::log::warn!( - "PendingScrapeResponseMap.prepare replaced previous entry {:?} for key {:?}", - previous_entry, - key - ); - } + transaction_id: request.transaction_id, + }); split_requests } - pub fn add_and_get_finished( - &mut self, - response: PendingScrapeResponse, - addr: SocketAddr, - ) -> Option { - let key = (response.connection_id, response.transaction_id, addr); - - let finished = if let Some(entry) = self.0.get_mut(&key) { + pub fn add_and_get_finished(&mut self, response: PendingScrapeResponse) -> Option { + let finished = if let Some(entry) = self.0.get_mut(response.slab_key) { entry.num_pending -= 1; entry @@ -130,18 +113,18 @@ impl PendingScrapeResponseMap { entry.num_pending == 0 } else { ::log::warn!( - "PendingScrapeResponseMap.add didn't find entry for key {:?}", - key + "PendingScrapeResponseSlab.add didn't find entry for key {:?}", + response.slab_key ); false }; if finished { - let entry = self.0.remove(&key).unwrap(); + let entry = self.0.remove(response.slab_key); Some(Response::Scrape(ScrapeResponse { - transaction_id: response.transaction_id, + transaction_id: entry.transaction_id, torrent_stats: entry.torrent_stats.into_values().collect(), })) } else { @@ -157,7 +140,7 @@ impl PendingScrapeResponseMap { if !keep { ::log::warn!( - "Removing PendingScrapeResponseMap entry while cleaning. {:?}: {:?}", + "Removing PendingScrapeResponseSlab entry while cleaning. {:?}: {:?}", k, v ); @@ -193,7 +176,7 @@ pub fn run_socket_worker( let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseMap::default(); + let mut pending_scrape_responses = PendingScrapeResponseSlab::default(); let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); @@ -278,7 +261,7 @@ fn read_requests( config: &Config, state: &State, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, socket: &mut UdpSocket, @@ -375,7 +358,7 @@ fn read_requests( pub fn handle_request( config: &Config, connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, access_list_cache: &mut AccessListCache, rng: &mut StdRng, request_sender: &ConnectedRequestSender, @@ -429,7 +412,6 @@ pub fn handle_request( let split_requests = pending_scrape_responses.prepare_split_requests( config, request, - src, pending_scrape_valid_until, ); @@ -471,78 +453,31 @@ fn send_responses( socket: &mut UdpSocket, buffer: &mut [u8], response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>, - pending_scrape_responses: &mut PendingScrapeResponseMap, + pending_scrape_responses: &mut PendingScrapeResponseSlab, local_responses: Drain<(Response, SocketAddr)>, ) { - let mut responses_sent_ipv4: usize = 0; - let mut responses_sent_ipv6: usize = 0; - let mut bytes_sent_ipv4: usize = 0; - let mut bytes_sent_ipv6: usize = 0; - for (response, addr) in local_responses { - send_response( - config, - socket, - buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, - response, - addr, - ); + send_response(state, config, socket, buffer, response, addr); } for (response, addr) in response_receiver.try_iter() { let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r, addr), + ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), }; if let Some(response) = opt_response { - send_response( - config, - socket, - buffer, - &mut responses_sent_ipv4, - &mut responses_sent_ipv6, - &mut bytes_sent_ipv4, - &mut bytes_sent_ipv6, - response, - addr, - ); + send_response(state, config, socket, buffer, response, addr); } } - - if config.statistics.active() { - state - .statistics_ipv4 - .responses_sent - .fetch_add(responses_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .responses_sent - .fetch_add(responses_sent_ipv6, Ordering::Release); - state - .statistics_ipv4 - .bytes_sent - .fetch_add(bytes_sent_ipv4, Ordering::Release); - state - .statistics_ipv6 - .bytes_sent - .fetch_add(bytes_sent_ipv6, Ordering::Release); - } } fn send_response( + state: &State, config: &Config, socket: &mut UdpSocket, buffer: &mut [u8], - responses_sent_ipv4: &mut usize, - responses_sent_ipv6: &mut usize, - bytes_sent_ipv4: &mut usize, - bytes_sent_ipv6: &mut usize, response: Response, addr: SocketAddr, ) { @@ -572,15 +507,33 @@ fn send_response( let amt = cursor.position() as usize; match socket.send_to(&cursor.get_ref()[..amt], addr) { - Ok(amt) => { - if addr_is_ipv4 { - *responses_sent_ipv4 += 1; - *bytes_sent_ipv4 += amt; + Ok(amt) if config.statistics.active() => { + let stats = if addr_is_ipv4 { + &state.statistics_ipv4 } else { - *responses_sent_ipv6 += 1; - *bytes_sent_ipv6 += amt; + &state.statistics_ipv6 + }; + + stats.bytes_sent.fetch_add(amt, Ordering::Relaxed); + + match response { + Response::Connect(_) => { + stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed); + } + Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => { + stats + .responses_sent_announce + .fetch_add(1, Ordering::Relaxed); + } + Response::Scrape(_) => { + stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed); + } + Response::Error(_) => { + stats.responses_sent_error.fetch_add(1, Ordering::Relaxed); + } } } + Ok(_) => {} Err(err) => { ::log::info!("send_to error: {}", err); } @@ -638,7 +591,7 @@ mod tests { #[quickcheck] fn test_pending_scrape_response_map( - request_data: Vec<(i32, i64, SocketAddr, u8)>, + request_data: Vec<(i32, i64, u8)>, request_workers: u8, ) -> TestResult { if request_workers == 0 { @@ -651,11 +604,11 @@ mod tests { let valid_until = ValidUntil::new(1); - let mut map = PendingScrapeResponseMap::default(); + let mut map = PendingScrapeResponseSlab::default(); let mut requests = Vec::new(); - for (t, c, a, b) in request_data { + for (t, c, b) in request_data { if b == 0 { return TestResult::discard(); } @@ -674,32 +627,27 @@ mod tests { info_hashes, }; - requests.push((request, a)); + requests.push(request); } let mut all_split_requests = Vec::new(); - for (request, addr) in requests.iter() { - let split_requests = map.prepare_split_requests( - &config, - request.to_owned(), - addr.to_owned(), - valid_until, - ); + for request in requests.iter() { + let split_requests = + map.prepare_split_requests(&config, request.to_owned(), valid_until); - all_split_requests.push(( - addr, + all_split_requests.push( split_requests .into_iter() .collect::>(), - )); + ); } assert_eq!(map.0.len(), requests.len()); let mut responses = Vec::new(); - for (addr, split_requests) in all_split_requests { + for split_requests in all_split_requests { for (worker_index, split_request) in split_requests { assert!(worker_index.0 < request_workers as usize); @@ -719,12 +667,11 @@ mod tests { .collect(); let response = PendingScrapeResponse { - transaction_id: split_request.transaction_id, - connection_id: split_request.connection_id, + slab_key: split_request.slab_key, torrent_stats, }; - if let Some(response) = map.add_and_get_finished(response, addr.to_owned()) { + if let Some(response) = map.add_and_get_finished(response) { responses.push(response); } } diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 52f5f4c..4477662 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -24,7 +24,10 @@ const STYLESHEET_CONTENTS: &str = concat!( #[derive(Clone, Copy, Debug)] struct CollectedStatistics { requests_per_second: f64, - responses_per_second: f64, + responses_per_second_connect: f64, + responses_per_second_announce: f64, + responses_per_second_scrape: f64, + responses_per_second_error: f64, bytes_received_per_second: f64, bytes_sent_per_second: f64, num_torrents: usize, @@ -33,10 +36,21 @@ struct CollectedStatistics { impl CollectedStatistics { fn from_shared(statistics: &Arc, last: &mut Instant) -> Self { - let requests_received = statistics.requests_received.fetch_and(0, Ordering::AcqRel) as f64; - let responses_sent = statistics.responses_sent.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::AcqRel) as f64; - let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::AcqRel) as f64; + let requests_received = statistics.requests_received.fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_connect = statistics + .responses_sent_connect + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_announce = statistics + .responses_sent_announce + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_scrape = statistics + .responses_sent_scrape + .fetch_and(0, Ordering::Relaxed) as f64; + let responses_sent_error = statistics + .responses_sent_error + .fetch_and(0, Ordering::Relaxed) as f64; + let bytes_received = statistics.bytes_received.fetch_and(0, Ordering::Relaxed) as f64; + let bytes_sent = statistics.bytes_sent.fetch_and(0, Ordering::Relaxed) as f64; let num_torrents = Self::sum_atomic_usizes(&statistics.torrents); let num_peers = Self::sum_atomic_usizes(&statistics.peers); @@ -48,7 +62,10 @@ impl CollectedStatistics { Self { requests_per_second: requests_received / elapsed, - responses_per_second: responses_sent / elapsed, + responses_per_second_connect: responses_sent_connect / elapsed, + responses_per_second_announce: responses_sent_announce / elapsed, + responses_per_second_scrape: responses_sent_scrape / elapsed, + responses_per_second_error: responses_sent_error / elapsed, bytes_received_per_second: bytes_received / elapsed, bytes_sent_per_second: bytes_sent / elapsed, num_torrents, @@ -57,7 +74,7 @@ impl CollectedStatistics { } fn sum_atomic_usizes(values: &[AtomicUsize]) -> usize { - values.iter().map(|n| n.load(Ordering::Acquire)).sum() + values.iter().map(|n| n.load(Ordering::Relaxed)).sum() } } @@ -66,10 +83,23 @@ impl Into for CollectedStatistics { let rx_mbits = self.bytes_received_per_second * 8.0 / 1_000_000.0; let tx_mbits = self.bytes_sent_per_second * 8.0 / 1_000_000.0; + let responses_per_second_total = self.responses_per_second_connect + + self.responses_per_second_announce + + self.responses_per_second_scrape + + self.responses_per_second_error; + FormattedStatistics { requests_per_second: (self.requests_per_second as usize) .to_formatted_string(&Locale::en), - responses_per_second: (self.responses_per_second as usize) + responses_per_second_total: (responses_per_second_total as usize) + .to_formatted_string(&Locale::en), + responses_per_second_connect: (self.responses_per_second_connect as usize) + .to_formatted_string(&Locale::en), + responses_per_second_announce: (self.responses_per_second_announce as usize) + .to_formatted_string(&Locale::en), + responses_per_second_scrape: (self.responses_per_second_scrape as usize) + .to_formatted_string(&Locale::en), + responses_per_second_error: (self.responses_per_second_error as usize) .to_formatted_string(&Locale::en), rx_mbits: format!("{:.2}", rx_mbits), tx_mbits: format!("{:.2}", tx_mbits), @@ -82,7 +112,11 @@ impl Into for CollectedStatistics { #[derive(Clone, Debug, Serialize)] struct FormattedStatistics { requests_per_second: String, - responses_per_second: String, + responses_per_second_total: String, + responses_per_second_connect: String, + responses_per_second_announce: String, + responses_per_second_scrape: String, + responses_per_second_error: String, rx_mbits: String, tx_mbits: String, num_torrents: String, @@ -161,9 +195,27 @@ pub fn run_statistics_worker(config: Config, state: State) { } fn print_to_stdout(config: &Config, statistics: &FormattedStatistics) { + println!(" requests/second: {:>10}", statistics.requests_per_second); + println!(" responses/second"); println!( - " requests/second: {:>10}, responses/second: {:>10}", - statistics.requests_per_second, statistics.responses_per_second + " total: {:>10}", + statistics.responses_per_second_total + ); + println!( + " connect: {:>10}", + statistics.responses_per_second_connect + ); + println!( + " announce: {:>10}", + statistics.responses_per_second_announce + ); + println!( + " scrape: {:>10}", + statistics.responses_per_second_scrape + ); + println!( + " error: {:>10}", + statistics.responses_per_second_error ); println!( " bandwidth: {:>7} Mbit/s in, {:7} Mbit/s out", diff --git a/aquatic_udp/templates/statistics.html b/aquatic_udp/templates/statistics.html index 1f6e808..e62f697 100644 --- a/aquatic_udp/templates/statistics.html +++ b/aquatic_udp/templates/statistics.html @@ -39,8 +39,24 @@ { ipv4.requests_per_second } - Responses / second - { ipv4.responses_per_second } + Total responses / second + { ipv4.responses_per_second_total } + + + Connect responses / second + { ipv4.responses_per_second_connect } + + + Announce responses / second + { ipv4.responses_per_second_announce } + + + Scrape responses / second + { ipv4.responses_per_second_scrape } + + + Error responses / second + { ipv4.responses_per_second_error } Bandwidth (RX) @@ -73,8 +89,24 @@ { ipv6.requests_per_second } - Responses / second - { ipv6.responses_per_second } + Total responses / second + { ipv6.responses_per_second_total } + + + Connect responses / second + { ipv6.responses_per_second_connect } + + + Announce responses / second + { ipv6.responses_per_second_announce } + + + Scrape responses / second + { ipv6.responses_per_second_scrape } + + + Error responses / second + { ipv6.responses_per_second_error } Bandwidth (RX) diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 09fe3e5..1062ade 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -41,8 +41,7 @@ pub fn bench_scrape_handler( for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { let request = ConnectedRequest::Scrape(PendingScrapeRequest { - connection_id: request.connection_id, - transaction_id: request.transaction_id, + slab_key: 0, info_hashes: request .info_hashes .clone() diff --git a/aquatic_udp_protocol/src/response.rs b/aquatic_udp_protocol/src/response.rs index 99f3afa..8e9a280 100644 --- a/aquatic_udp_protocol/src/response.rs +++ b/aquatic_udp_protocol/src/response.rs @@ -97,7 +97,7 @@ impl Response { /// addresses. Clients seem not to support it very well, but due to a lack /// of alternative solutions, it is implemented here. #[inline] - pub fn write(self, bytes: &mut impl Write) -> Result<(), io::Error> { + pub fn write(&self, bytes: &mut impl Write) -> Result<(), io::Error> { match self { Response::Connect(r) => { bytes.write_i32::(0)?; @@ -111,7 +111,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; } @@ -120,7 +120,7 @@ impl Response { bytes.write_i32::(2)?; bytes.write_i32::(r.transaction_id.0)?; - for torrent_stat in r.torrent_stats { + for torrent_stat in r.torrent_stats.iter() { bytes.write_i32::(torrent_stat.seeders.0)?; bytes.write_i32::(torrent_stat.completed.0)?; bytes.write_i32::(torrent_stat.leechers.0)?; @@ -139,7 +139,7 @@ impl Response { bytes.write_i32::(r.leechers.0)?; bytes.write_i32::(r.seeders.0)?; - for peer in r.peers { + for peer in r.peers.iter() { bytes.write_all(&peer.ip_address.octets())?; bytes.write_u16::(peer.port.0)?; }