aquatic_ws: split torrent state by ipv4/ipv6

This commit is contained in:
Joakim Frostegård 2020-05-23 15:15:01 +02:00
parent 7430c23ccc
commit 416d61a2b2
4 changed files with 42 additions and 19 deletions

View file

@ -1,9 +1,9 @@
# TODO # TODO
## aquatic_ws ## aquatic_ws
* ipv4 / ipv6 split state?
* network * network
* send/recv buffer size config * send/recv buffer size config
* ipv6_only setting
* is it even necessary to check if event is readable in poll, since that * is it even necessary to check if event is readable in poll, since that
is all we're listening for? is all we're listening for?
* panic/error in workers: print error, exit program with non-zero exit code * panic/error in workers: print error, exit program with non-zero exit code

View file

@ -83,16 +83,23 @@ impl Default for TorrentData {
pub type TorrentMap = HashMap<InfoHash, TorrentData>; pub type TorrentMap = HashMap<InfoHash, TorrentData>;
#[derive(Default)]
pub struct TorrentMaps {
pub ipv4: TorrentMap,
pub ipv6: TorrentMap,
}
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
pub torrents: Arc<Mutex<TorrentMap>>, pub torrent_maps: Arc<Mutex<TorrentMaps>>,
} }
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
Self { Self {
torrents: Arc::new(Mutex::new(HashMap::new())), torrent_maps: Arc::new(Mutex::new(TorrentMaps::default())),
} }
} }
} }

View file

@ -30,7 +30,7 @@ pub fn run_request_worker(
); );
loop { loop {
let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMap>> = None; let mut opt_torrent_map_guard: Option<MutexGuard<TorrentMaps>> = None;
for i in 0..config.handlers.max_requests_per_iter { for i in 0..config.handlers.max_requests_per_iter {
let opt_in_message = if i == 0 { let opt_in_message = if i == 0 {
@ -47,7 +47,7 @@ pub fn run_request_worker(
scrape_requests.push((meta, r)); scrape_requests.push((meta, r));
}, },
None => { None => {
if let Some(torrent_guard) = state.torrents.try_lock(){ if let Some(torrent_guard) = state.torrent_maps.try_lock(){
opt_torrent_map_guard = Some(torrent_guard); opt_torrent_map_guard = Some(torrent_guard);
break break
@ -57,7 +57,7 @@ pub fn run_request_worker(
} }
let mut torrent_map_guard = opt_torrent_map_guard let mut torrent_map_guard = opt_torrent_map_guard
.unwrap_or_else(|| state.torrents.lock()); .unwrap_or_else(|| state.torrent_maps.lock());
handle_announce_requests( handle_announce_requests(
&config, &config,
@ -86,7 +86,7 @@ pub fn run_request_worker(
pub fn handle_announce_requests( pub fn handle_announce_requests(
config: &Config, config: &Config,
rng: &mut impl Rng, rng: &mut impl Rng,
torrents: &mut TorrentMap, torrent_maps: &mut TorrentMaps,
messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>,
requests: Drain<(ConnectionMeta, AnnounceRequest)>, requests: Drain<(ConnectionMeta, AnnounceRequest)>,
){ ){
@ -96,8 +96,11 @@ pub fn handle_announce_requests(
let info_hash = request.info_hash; let info_hash = request.info_hash;
let peer_id = request.peer_id; let peer_id = request.peer_id;
let torrent_data = torrents.entry(info_hash) let torrent_data: &mut TorrentData = if sender_meta.peer_addr.is_ipv4(){
.or_default(); torrent_maps.ipv4.entry(info_hash).or_default()
} else {
torrent_maps.ipv6.entry(info_hash).or_default()
};
// If there is already a peer with this peer_id, check that socket // If there is already a peer with this peer_id, check that socket
// addr is same as that of request sender. Otherwise, ignore request. // addr is same as that of request sender. Otherwise, ignore request.
@ -213,7 +216,7 @@ pub fn handle_announce_requests(
pub fn handle_scrape_requests( pub fn handle_scrape_requests(
config: &Config, config: &Config,
torrents: &mut TorrentMap, torrent_maps: &mut TorrentMaps,
messages_out: &mut Vec<(ConnectionMeta, OutMessage)>, messages_out: &mut Vec<(ConnectionMeta, OutMessage)>,
requests: Drain<(ConnectionMeta, ScrapeRequest)>, requests: Drain<(ConnectionMeta, ScrapeRequest)>,
){ ){
@ -226,10 +229,16 @@ pub fn handle_scrape_requests(
files: HashMap::with_capacity(num_to_take), files: HashMap::with_capacity(num_to_take),
}; };
let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4(){
&mut torrent_maps.ipv4
} else {
&mut torrent_maps.ipv6
};
// If request.info_hashes is empty, don't return scrape for all // If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive. // torrents, even though reference server does it. It is too expensive.
for info_hash in request.info_hashes.into_iter().take(num_to_take){ for info_hash in request.info_hashes.into_iter().take(num_to_take){
if let Some(torrent_data) = torrents.get(&info_hash){ if let Some(torrent_data) = torrent_map.get(&info_hash){
let stats = ScrapeStatistics { let stats = ScrapeStatistics {
complete: torrent_data.num_seeders, complete: torrent_data.num_seeders,
downloaded: 0, // No implementation planned downloaded: 0, // No implementation planned

View file

@ -4,17 +4,24 @@ use crate::common::*;
pub fn clean_torrents(state: &State){ pub fn clean_torrents(state: &State){
let mut torrents = state.torrents.lock(); fn clean_torrent_map(
torrent_map: &mut TorrentMap,
){
let now = Instant::now();
let now = Instant::now(); torrent_map.retain(|_, torrent_data| {
torrent_data.peers.retain(|_, peer| {
peer.valid_until.0 >= now
});
torrents.retain(|_, torrent_data| { !torrent_data.peers.is_empty()
torrent_data.peers.retain(|_, peer| {
peer.valid_until.0 >= now
}); });
!torrent_data.peers.is_empty() torrent_map.shrink_to_fit();
}); }
torrents.shrink_to_fit(); let mut torrent_maps = state.torrent_maps.lock();
clean_torrent_map(&mut torrent_maps.ipv4);
clean_torrent_map(&mut torrent_maps.ipv6);
} }