diff --git a/TODO.md b/TODO.md index 17e74c6..9cdd777 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,7 @@ * aquatic_udp glommio * privdrop + * disable by default! * access lists: * use arc-swap Cache diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index ce6446f..daf4ba9 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -28,7 +28,35 @@ impl Into for ConnectedResponse { } } -pub fn handle_announce_request( +pub fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMaps, + request: AnnounceRequest, + src: SocketAddr, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + match convert_ipv4_mapped_ipv6(src.ip()) { + IpAddr::V4(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + } +} + +fn handle_announce_request_inner( config: &Config, rng: &mut SmallRng, torrents: &mut TorrentMap, diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 809fca0..a288755 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; @@ -81,36 +81,27 @@ async fn handle_request_stream( })() })); - while let Some((producer_index, request, addr)) = stream.next().await { - let response = match request { - ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), + while let Some((producer_index, request, src)) = stream.next().await { + let response = + match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + request, + src, + peer_valid_until.borrow().to_owned(), + )) + } + ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape( + handle_scrape_request(&mut torrents.borrow_mut(), src, request), ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - }), - ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request( - &mut torrents.borrow_mut(), - addr, - request, - )), - }; + }; ::log::debug!("preparing to send response to channel: {:?}", response); - if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + if let Err(err) = response_senders.try_send_to(producer_index, (response, src)) { ::log::warn!("response_sender.try_send: {:?}", err); } diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers.rs similarity index 81% rename from aquatic_udp/src/lib/mio/handlers/mod.rs rename to aquatic_udp/src/lib/mio/handlers.rs index bd97161..99023fa 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::time::Duration; +use aquatic_common::ValidUntil; use crossbeam_channel::{Receiver, Sender}; use rand::{rngs::SmallRng, SeedableRng}; @@ -10,9 +11,6 @@ use crate::common::handlers::*; use crate::config::Config; use crate::mio::common::*; -mod announce; -use announce::handle_announce_requests; - pub fn run_request_worker( state: State, config: Config, @@ -66,19 +64,26 @@ pub fn run_request_worker( { let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + responses.extend(announce_requests.drain(..).map(|(request, src)| { + let response = handle_announce_request( + &config, + &mut small_rng, + &mut torrents, + request, + src, + peer_valid_until, + ); + + (ConnectedResponse::Announce(response), src) + })); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - ( - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), - src, - ) + let response = + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)); + + (response, src) })); } diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs deleted file mode 100644 index 11fb19a..0000000 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::vec::Drain; - -use parking_lot::MutexGuard; -use rand::rngs::SmallRng; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::common::handlers::handle_announce_request; -use crate::common::handlers::*; -use crate::common::*; -use crate::config::Config; - -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - (ConnectedResponse::Announce(response), src) - })); -}