From f8713c09c1dcf893c33c6e13c8017dd5efb15274 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 12 Apr 2020 14:54:38 +0200 Subject: [PATCH] aquatic: handler: put responses in vector, send after releasing lock --- TODO.md | 2 -- aquatic/src/lib/handlers.rs | 46 ++++++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/TODO.md b/TODO.md index 98df145..e308428 100644 --- a/TODO.md +++ b/TODO.md @@ -2,8 +2,6 @@ ## aquatic * `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9` -* Handler: put responses in vector and send them all together after releasing - lock? * Use bounded request channel? * Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4 diff --git a/aquatic/src/lib/handlers.rs b/aquatic/src/lib/handlers.rs index 3a22844..28a01fe 100644 --- a/aquatic/src/lib/handlers.rs +++ b/aquatic/src/lib/handlers.rs @@ -23,6 +23,8 @@ pub fn handle( let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); + let mut responses: Vec<(Response, SocketAddr)> = Vec::new(); + let mut std_rng = StdRng::from_entropy(); let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap(); @@ -80,7 +82,7 @@ pub fn handle( &mut data, &mut std_rng, connect_requests.drain(..), - &response_sender + &mut responses ); handle_announce_requests( @@ -88,13 +90,21 @@ pub fn handle( &config, &mut small_rng, announce_requests.drain(..), - &response_sender + &mut responses ); handle_scrape_requests( &mut data, scrape_requests.drain(..), - &response_sender + &mut responses ); + + ::std::mem::drop(data); + + for r in responses.drain(..){ + if let Err(err) = response_sender.send(r){ + eprintln!("error sending response to channel: {}", err); + } + } } } @@ -104,11 +114,11 @@ pub fn handle_connect_requests( data: &mut MutexGuard, rng: &mut StdRng, requests: Drain<(ConnectRequest, SocketAddr)>, - response_sender: &Sender<(Response, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, ){ let now = Time(Instant::now()); - for (request, src) in requests { + responses.extend(requests.map(|(request, src)| { let connection_id = ConnectionId(rng.gen()); let key = ConnectionKey { @@ -125,8 +135,8 @@ pub fn handle_connect_requests( } ); - response_sender.send((response, src)); - } + (response, src) + })); } @@ -136,9 +146,9 @@ pub fn handle_announce_requests( config: &Config, rng: &mut SmallRng, requests: Drain<(AnnounceRequest, SocketAddr)>, - response_sender: &Sender<(Response, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, ){ - for (request, src) in requests { + responses.extend(requests.map(|(request, src)| { let connection_key = ConnectionKey { connection_id: request.connection_id, socket_addr: src, @@ -150,7 +160,7 @@ pub fn handle_announce_requests( message: "Connection invalid or expired".to_string() }; - response_sender.send((response.into(), src)); + return (response.into(), src); } let peer_key = PeerMapKey { @@ -161,7 +171,7 @@ pub fn handle_announce_requests( let peer = Peer::from_announce_and_ip(&request, src.ip()); let peer_status = peer.status; - let mut torrent_data = data.torrents + let torrent_data = data.torrents .entry(request.info_hash) .or_default(); @@ -210,8 +220,8 @@ pub fn handle_announce_requests( peers: response_peers }); - response_sender.send((response, src)); - } + (response, src) + })); } @@ -219,11 +229,11 @@ pub fn handle_announce_requests( pub fn handle_scrape_requests( data: &mut MutexGuard, requests: Drain<(ScrapeRequest, SocketAddr)>, - response_sender: &Sender<(Response, SocketAddr)>, + responses: &mut Vec<(Response, SocketAddr)>, ){ let empty_stats = create_torrent_scrape_statistics(0, 0); - for (request, src) in requests { + responses.extend(requests.map(|(request, src)|{ let connection_key = ConnectionKey { connection_id: request.connection_id, socket_addr: src, @@ -235,7 +245,7 @@ pub fn handle_scrape_requests( message: "Connection invalid or expired".to_string() }; - response_sender.send((response.into(), src)); + return (response.into(), src); } let mut stats: Vec = Vec::with_capacity( @@ -258,8 +268,8 @@ pub fn handle_scrape_requests( torrent_stats: stats, }); - response_sender.send((response, src)); - }; + (response, src) + })); }