aquatic_udp: add and use ConnectedResponse enum

This commit is contained in:
Joakim Frostegård 2021-10-18 01:25:04 +02:00
parent 7616df9686
commit de85feec9a
7 changed files with 47 additions and 21 deletions

View file

@ -35,6 +35,20 @@ pub enum ConnectedRequest {
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionKey {
pub connection_id: ConnectionId,

View file

@ -16,7 +16,7 @@ pub fn handle_announce_requests(
torrents: &mut MutexGuard<TorrentMaps>,
rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>,
responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
@ -42,7 +42,7 @@ pub fn handle_announce_requests(
),
};
(Response::Announce(response), src)
(ConnectedResponse::Announce(response), src)
}));
}

View file

@ -23,12 +23,12 @@ pub fn run_request_worker(
state: State,
config: Config,
request_receiver: Receiver<(ConnectedRequest, SocketAddr)>,
response_sender: Sender<(Response, SocketAddr)>,
response_sender: Sender<(ConnectedResponse, SocketAddr)>,
) {
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new();
let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new();
let mut responses: Vec<(Response, SocketAddr)> = Vec::new();
let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new();
let mut std_rng = StdRng::from_entropy();
let mut small_rng = SmallRng::from_rng(&mut std_rng).unwrap();

View file

@ -12,7 +12,7 @@ use crate::common::*;
pub fn handle_scrape_requests(
torrents: &mut MutexGuard<TorrentMaps>,
requests: Drain<(ScrapeRequest, SocketAddr)>,
responses: &mut Vec<(Response, SocketAddr)>,
responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
let empty_stats = create_torrent_scrape_statistics(0, 0);
@ -45,7 +45,7 @@ pub fn handle_scrape_requests(
}
}
let response = Response::Scrape(ScrapeResponse {
let response = ConnectedResponse::Scrape(ScrapeResponse {
transaction_id: request.transaction_id,
torrent_stats: stats,
});

View file

@ -23,7 +23,7 @@ pub fn run_socket_worker(
config: Config,
token_num: usize,
request_sender: Sender<(ConnectedRequest, SocketAddr)>,
response_receiver: Receiver<(Response, SocketAddr)>,
response_receiver: Receiver<(ConnectedResponse, SocketAddr)>,
num_bound_sockets: Arc<AtomicUsize>,
) {
let mut rng = StdRng::from_entropy();
@ -249,7 +249,7 @@ fn send_responses(
config: &Config,
socket: &mut UdpSocket,
buffer: &mut [u8],
response_receiver: &Receiver<(Response, SocketAddr)>,
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
local_responses: Drain<(Response, SocketAddr)>,
) {
let mut responses_sent: usize = 0;
@ -257,9 +257,11 @@ fn send_responses(
let mut cursor = Cursor::new(buffer);
let response_iterator = local_responses
.into_iter()
.chain(response_receiver.try_iter());
let response_iterator = local_responses.into_iter().chain(
response_receiver
.try_iter()
.map(|(response, addr)| (response.into(), addr)),
);
for (response, src) in response_iterator {
cursor.set_position(0);

View file

@ -16,7 +16,7 @@ pub fn bench_announce_handler(
bench_config: &BenchConfig,
aquatic_config: &Config,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>,
response_receiver: &Receiver<(Response, SocketAddr)>,
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
rng: &mut impl Rng,
info_hashes: &[InfoHash],
) -> (usize, Duration) {
@ -36,10 +36,12 @@ pub fn bench_announce_handler(
for round in (0..bench_config.num_rounds).progress_with(pb) {
for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk {
request_sender.send((ConnectedRequest::Announce(request.clone()), *src)).unwrap();
request_sender
.send((ConnectedRequest::Announce(request.clone()), *src))
.unwrap();
}
while let Ok((Response::Announce(r), _)) = response_receiver.try_recv() {
while let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.try_recv() {
num_responses += 1;
if let Some(last_peer) = r.peers.last() {
@ -51,7 +53,7 @@ pub fn bench_announce_handler(
let total = bench_config.num_announce_requests * (round + 1);
while num_responses < total {
if let Ok((Response::Announce(r), _)) = response_receiver.recv() {
if let Ok((ConnectedResponse::Announce(r), _)) = response_receiver.recv() {
num_responses += 1;
if let Some(last_peer) = r.peers.last() {
@ -99,7 +101,10 @@ pub fn create_requests(
port: Port(rng.gen()),
};
requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1))));
requests.push((
request,
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)),
));
}
requests

View file

@ -16,7 +16,7 @@ pub fn bench_scrape_handler(
bench_config: &BenchConfig,
aquatic_config: &Config,
request_sender: &Sender<(ConnectedRequest, SocketAddr)>,
response_receiver: &Receiver<(Response, SocketAddr)>,
response_receiver: &Receiver<(ConnectedResponse, SocketAddr)>,
rng: &mut impl Rng,
info_hashes: &[InfoHash],
) -> (usize, Duration) {
@ -41,10 +41,12 @@ pub fn bench_scrape_handler(
for round in (0..bench_config.num_rounds).progress_with(pb) {
for request_chunk in requests.chunks(p) {
for (request, src) in request_chunk {
request_sender.send((ConnectedRequest::Scrape(request.clone()), *src)).unwrap();
request_sender
.send((ConnectedRequest::Scrape(request.clone()), *src))
.unwrap();
}
while let Ok((Response::Scrape(r), _)) = response_receiver.try_recv() {
while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() {
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
@ -56,7 +58,7 @@ pub fn bench_scrape_handler(
let total = bench_config.num_scrape_requests * (round + 1);
while num_responses < total {
if let Ok((Response::Scrape(r), _)) = response_receiver.recv() {
if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() {
num_responses += 1;
if let Some(stat) = r.torrent_stats.last() {
@ -101,7 +103,10 @@ pub fn create_requests(
info_hashes: request_info_hashes,
};
requests.push((request, SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1))));
requests.push((
request,
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 1)),
));
}
requests