From 2e7c8ac90434ff17abf9325fa488e89d37bd43e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 19 Nov 2021 00:59:52 +0100 Subject: [PATCH] udp: reorder code in network.rs for better readability --- aquatic_udp/src/lib/network.rs | 278 ++++++++++++++++----------------- 1 file changed, 139 insertions(+), 139 deletions(-) diff --git a/aquatic_udp/src/lib/network.rs b/aquatic_udp/src/lib/network.rs index 2fb6769..704fb5a 100644 --- a/aquatic_udp/src/lib/network.rs +++ b/aquatic_udp/src/lib/network.rs @@ -110,145 +110,6 @@ impl PendingScrapeResponseMap { } } -pub fn handle_request( - config: &Config, - connections: &mut ConnectionMap, - pending_scrape_responses: &mut PendingScrapeResponseMap, - access_list_cache: &mut AccessListCache, - rng: &mut StdRng, - request_sender: &ConnectedRequestSender, - local_responses: &mut Vec<(Response, SocketAddr)>, - valid_until: ValidUntil, - res_request: Result, - src: SocketAddr, -) { - let access_list_mode = config.access_list.mode; - - match res_request { - Ok(Request::Connect(request)) => { - let connection_id = ConnectionId(rng.gen()); - - connections.insert(connection_id, src, valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - local_responses.push((response, src)) - } - Ok(Request::Announce(request)) => { - if connections.contains(request.connection_id, src) { - if access_list_cache - .load() - .allows(access_list_mode, &request.info_hash.0) - { - let worker_index = - RequestWorkerIndex::from_info_hash(config, request.info_hash); - - request_sender.try_send_to( - worker_index, - ConnectedRequest::Announce(request), - src, - ); - } else { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".into(), - }); - - local_responses.push((response, src)) - } - } - } - Ok(Request::Scrape(request)) => { - if connections.contains(request.connection_id, src) { - let mut requests: AHashIndexMap = - Default::default(); - - let transaction_id = request.transaction_id; - - for (i, info_hash) in request.info_hashes.into_iter().enumerate() { - let pending = requests - .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) - .or_insert_with(|| PendingScrapeRequest { - transaction_id, - info_hashes: BTreeMap::new(), - }); - - pending.info_hashes.insert(i, info_hash); - } - - pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until); - - for (request_worker_index, request) in requests { - request_sender.try_send_to( - request_worker_index, - ConnectedRequest::Scrape(request), - src, - ); - } - } - } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if connections.contains(connection_id, src) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_responses.push((response.into(), src)); - } - } - } - } -} - -pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { - let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - if config.network.only_ipv6 { - socket.set_only_v6(true).expect("socket: set only ipv6"); - } - - socket.set_reuse_port(true).expect("socket: set reuse port"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - socket - .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); - - let recv_buffer_size = config.network.socket_recv_buffer_size; - - if recv_buffer_size != 0 { - if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { - ::log::error!( - "socket: failed setting recv buffer to {}: {:?}", - recv_buffer_size, - err - ); - } - } - - socket.into() -} - pub fn run_socket_worker( state: State, config: Config, @@ -412,6 +273,108 @@ fn read_requests( } } +pub fn handle_request( + config: &Config, + connections: &mut ConnectionMap, + pending_scrape_responses: &mut PendingScrapeResponseMap, + access_list_cache: &mut AccessListCache, + rng: &mut StdRng, + request_sender: &ConnectedRequestSender, + local_responses: &mut Vec<(Response, SocketAddr)>, + valid_until: ValidUntil, + res_request: Result, + src: SocketAddr, +) { + let access_list_mode = config.access_list.mode; + + match res_request { + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(connection_id, src, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_responses.push((response, src)) + } + Ok(Request::Announce(request)) => { + if connections.contains(request.connection_id, src) { + if access_list_cache + .load() + .allows(access_list_mode, &request.info_hash.0) + { + let worker_index = + RequestWorkerIndex::from_info_hash(config, request.info_hash); + + request_sender.try_send_to( + worker_index, + ConnectedRequest::Announce(request), + src, + ); + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } + } + } + Ok(Request::Scrape(request)) => { + if connections.contains(request.connection_id, src) { + let mut requests: AHashIndexMap = + Default::default(); + + let transaction_id = request.transaction_id; + + for (i, info_hash) in request.info_hashes.into_iter().enumerate() { + let pending = requests + .entry(RequestWorkerIndex::from_info_hash(&config, info_hash)) + .or_insert_with(|| PendingScrapeRequest { + transaction_id, + info_hashes: BTreeMap::new(), + }); + + pending.info_hashes.insert(i, info_hash); + } + + pending_scrape_responses.prepare(transaction_id, requests.len(), valid_until); + + for (request_worker_index, request) in requests { + request_sender.try_send_to( + request_worker_index, + ConnectedRequest::Scrape(request), + src, + ); + } + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains(connection_id, src) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_responses.push((response.into(), src)); + } + } + } + } +} + #[inline] fn send_responses( state: &State, @@ -516,3 +479,40 @@ fn send_response( } } } + +pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { + let socket = if config.network.address.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + if config.network.only_ipv6 { + socket.set_only_v6(true).expect("socket: set only ipv6"); + } + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { + ::log::error!( + "socket: failed setting recv buffer to {}: {:?}", + recv_buffer_size, + err + ); + } + } + + socket.into() +}