aquatic: handler: put responses in vector, send after releasing lock

This commit is contained in:
Joakim Frostegård 2020-04-12 14:54:38 +02:00
parent 16155f3de5
commit f8713c09c1
2 changed files with 28 additions and 20 deletions

View file

@ -2,8 +2,6 @@
## aquatic
* `thread 'main' panicked at 'overflow when subtracting duration from instant', src/libstd/time.rs:374:9`
* Handler: put responses in vector and send them all together after releasing
lock?
* Use bounded request channel?
* Handle Ipv4 and Ipv6 peers. Probably split state. Ipv4 peers can't make
use of Ipv6 ones. Ipv6 ones may or may note be able to make use of Ipv4

View file

@ -23,6 +23,8 @@ pub fn handle(
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut responses: Vec<(Response, SocketAddr)> = Vec::new();
let mut std_rng = StdRng::from_entropy();
let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap();
@ -80,7 +82,7 @@ pub fn handle(
&mut data,
&mut std_rng,
connect_requests.drain(..),
&response_sender
&mut responses
);
handle_announce_requests(
@ -88,13 +90,21 @@ pub fn handle(
&config,
&mut small_rng,
announce_requests.drain(..),
&response_sender
&mut responses
);
handle_scrape_requests(
&mut data,
scrape_requests.drain(..),
&response_sender
&mut responses
);
::std::mem::drop(data);
for r in responses.drain(..){
if let Err(err) = response_sender.send(r){
eprintln!("error sending response to channel: {}", err);
}
}
}
}
@ -104,11 +114,11 @@ pub fn handle_connect_requests(
data: &mut MutexGuard<HandlerData>,
rng: &mut StdRng,
requests: Drain<(ConnectRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>,
){
let now = Time(Instant::now());
for (request, src) in requests {
responses.extend(requests.map(|(request, src)| {
let connection_id = ConnectionId(rng.gen());
let key = ConnectionKey {
@ -125,8 +135,8 @@ pub fn handle_connect_requests(
}
);
response_sender.send((response, src));
}
(response, src)
}));
}
@ -136,9 +146,9 @@ pub fn handle_announce_requests(
config: &Config,
rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>,
){
for (request, src) in requests {
responses.extend(requests.map(|(request, src)| {
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
@ -150,7 +160,7 @@ pub fn handle_announce_requests(
message: "Connection invalid or expired".to_string()
};
response_sender.send((response.into(), src));
return (response.into(), src);
}
let peer_key = PeerMapKey {
@ -161,7 +171,7 @@ pub fn handle_announce_requests(
let peer = Peer::from_announce_and_ip(&request, src.ip());
let peer_status = peer.status;
let mut torrent_data = data.torrents
let torrent_data = data.torrents
.entry(request.info_hash)
.or_default();
@ -210,8 +220,8 @@ pub fn handle_announce_requests(
peers: response_peers
});
response_sender.send((response, src));
}
(response, src)
}));
}
@ -219,11 +229,11 @@ pub fn handle_announce_requests(
pub fn handle_scrape_requests(
data: &mut MutexGuard<HandlerData>,
requests: Drain<(ScrapeRequest, SocketAddr)>,
response_sender: &Sender<(Response, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>,
){
let empty_stats = create_torrent_scrape_statistics(0, 0);
for (request, src) in requests {
responses.extend(requests.map(|(request, src)|{
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
@ -235,7 +245,7 @@ pub fn handle_scrape_requests(
message: "Connection invalid or expired".to_string()
};
response_sender.send((response.into(), src));
return (response.into(), src);
}
let mut stats: Vec<TorrentScrapeStatistics> = Vec::with_capacity(
@ -258,8 +268,8 @@ pub fn handle_scrape_requests(
torrent_stats: stats,
});
response_sender.send((response, src));
};
(response, src)
}));
}