From 416d61a2b2f41c761f228cc4618ff56c143c4708 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 May 2020 15:15:01 +0200 Subject: [PATCH] aquatic_ws: split torrent state by ipv4/ipv6 --- TODO.md | 2 +- aquatic_ws/src/lib/common.rs | 11 +++++++++-- aquatic_ws/src/lib/handler.rs | 25 +++++++++++++++++-------- aquatic_ws/src/lib/tasks.rs | 23 +++++++++++++++-------- 4 files changed, 42 insertions(+), 19 deletions(-) diff --git a/TODO.md b/TODO.md index 536d3a4..3c65a71 100644 --- a/TODO.md +++ b/TODO.md @@ -1,9 +1,9 @@ # TODO ## aquatic_ws -* ipv4 / ipv6 split state? * network * send/recv buffer size config + * ipv6_only setting * is it even necessary to check if event is readable in poll, since that is all we're listening for? * panic/error in workers: print error, exit program with non-zero exit code diff --git a/aquatic_ws/src/lib/common.rs b/aquatic_ws/src/lib/common.rs index 60d51bd..2e3ad00 100644 --- a/aquatic_ws/src/lib/common.rs +++ b/aquatic_ws/src/lib/common.rs @@ -83,16 +83,23 @@ impl Default for TorrentData { pub type TorrentMap = HashMap; +#[derive(Default)] +pub struct TorrentMaps { + pub ipv4: TorrentMap, + pub ipv6: TorrentMap, +} + + #[derive(Clone)] pub struct State { - pub torrents: Arc>, + pub torrent_maps: Arc>, } impl Default for State { fn default() -> Self { Self { - torrents: Arc::new(Mutex::new(HashMap::new())), + torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())), } } } diff --git a/aquatic_ws/src/lib/handler.rs b/aquatic_ws/src/lib/handler.rs index 82875c5..71413c9 100644 --- a/aquatic_ws/src/lib/handler.rs +++ b/aquatic_ws/src/lib/handler.rs @@ -30,7 +30,7 @@ pub fn run_request_worker( ); loop { - let mut opt_torrent_map_guard: Option> = None; + let mut opt_torrent_map_guard: Option> = None; for i in 0..config.handlers.max_requests_per_iter { let opt_in_message = if i == 0 { @@ -47,7 +47,7 @@ pub fn run_request_worker( scrape_requests.push((meta, r)); }, None => { - if let Some(torrent_guard) = state.torrents.try_lock(){ + if let Some(torrent_guard) = state.torrent_maps.try_lock(){ opt_torrent_map_guard = Some(torrent_guard); break @@ -57,7 +57,7 @@ pub fn run_request_worker( } let mut torrent_map_guard = opt_torrent_map_guard - .unwrap_or_else(|| state.torrents.lock()); + .unwrap_or_else(|| state.torrent_maps.lock()); handle_announce_requests( &config, @@ -86,7 +86,7 @@ pub fn run_request_worker( pub fn handle_announce_requests( config: &Config, rng: &mut impl Rng, - torrents: &mut TorrentMap, + torrent_maps: &mut TorrentMaps, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>, ){ @@ -96,8 +96,11 @@ pub fn handle_announce_requests( let info_hash = request.info_hash; let peer_id = request.peer_id; - let torrent_data = torrents.entry(info_hash) - .or_default(); + let torrent_data: &mut TorrentData = if sender_meta.peer_addr.is_ipv4(){ + torrent_maps.ipv4.entry(info_hash).or_default() + } else { + torrent_maps.ipv6.entry(info_hash).or_default() + }; // If there is already a peer with this peer_id, check that socket // addr is same as that of request sender. Otherwise, ignore request. @@ -213,7 +216,7 @@ pub fn handle_announce_requests( pub fn handle_scrape_requests( config: &Config, - torrents: &mut TorrentMap, + torrent_maps: &mut TorrentMaps, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, requests: Drain<(ConnectionMeta, ScrapeRequest)>, ){ @@ -226,10 +229,16 @@ pub fn handle_scrape_requests( files: HashMap::with_capacity(num_to_take), }; + let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4(){ + &mut torrent_maps.ipv4 + } else { + &mut torrent_maps.ipv6 + }; + // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. for info_hash in request.info_hashes.into_iter().take(num_to_take){ - if let Some(torrent_data) = torrents.get(&info_hash){ + if let Some(torrent_data) = torrent_map.get(&info_hash){ let stats = ScrapeStatistics { complete: torrent_data.num_seeders, downloaded: 0, // No implementation planned diff --git a/aquatic_ws/src/lib/tasks.rs b/aquatic_ws/src/lib/tasks.rs index 20cfe2e..e91e5e4 100644 --- a/aquatic_ws/src/lib/tasks.rs +++ b/aquatic_ws/src/lib/tasks.rs @@ -4,17 +4,24 @@ use crate::common::*; pub fn clean_torrents(state: &State){ - let mut torrents = state.torrents.lock(); + fn clean_torrent_map( + torrent_map: &mut TorrentMap, + ){ + let now = Instant::now(); - let now = Instant::now(); + torrent_map.retain(|_, torrent_data| { + torrent_data.peers.retain(|_, peer| { + peer.valid_until.0 >= now + }); - torrents.retain(|_, torrent_data| { - torrent_data.peers.retain(|_, peer| { - peer.valid_until.0 >= now + !torrent_data.peers.is_empty() }); - !torrent_data.peers.is_empty() - }); + torrent_map.shrink_to_fit(); + } - torrents.shrink_to_fit(); + let mut torrent_maps = state.torrent_maps.lock(); + + clean_torrent_map(&mut torrent_maps.ipv4); + clean_torrent_map(&mut torrent_maps.ipv6); } \ No newline at end of file