diff --git a/Cargo.lock b/Cargo.lock index 37e798c..62ea631 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -182,8 +182,6 @@ dependencies = [ "bytemuck", "cfg-if", "crossbeam-channel", - "futures-lite", - "glommio", "hex", "histogram", "io-uring", @@ -208,6 +206,7 @@ dependencies = [ "anyhow", "aquatic_cli_helpers", "aquatic_udp", + "aquatic_udp_protocol", "crossbeam-channel", "indicatif", "mimalloc", diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index d7a7b90..f2756e4 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -17,9 +17,8 @@ name = "aquatic_udp" [features] default = ["with-mio"] cpu-pinning = ["aquatic_common/cpu-pinning"] -with-glommio = ["cpu-pinning", "glommio", "futures-lite"] -with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"] -with-io-uring = ["crossbeam-channel", "histogram", "socket2", "io-uring", "libc", "bytemuck"] +with-mio = ["mio"] +with-io-uring = ["io-uring", "libc", "bytemuck"] [dependencies] anyhow = "1" @@ -27,7 +26,9 @@ aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" +crossbeam-channel = "0.5" hex = "0.4" +histogram = "0.6" log = "0.4" mimalloc = { version = "0.1", default-features = false } parking_lot = "0.11" @@ -35,11 +36,7 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } slab = "0.4" signal-hook = { version = "0.3" } - -# mio / io-uring -crossbeam-channel = { version = "0.5", optional = true } -histogram = { version = "0.6", optional = true } -socket2 = { version = "0.4.1", features = ["all"], optional = true } +socket2 = { version = "0.4.1", features = ["all"] } # mio mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true } @@ -49,10 +46,6 @@ io-uring = { version = "0.5", optional = true } libc = { version = "0.2", optional = true } bytemuck = { version = "1", optional = true } -# glommio -glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } -futures-lite = { version = "1", optional = true } - [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index ea55cce..8a29d21 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,21 +1,52 @@ use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; +use std::sync::atomic::AtomicUsize; use std::sync::Arc; use std::time::Instant; +use parking_lot::Mutex; +use socket2::{Domain, Protocol, Socket, Type}; + use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap}; use aquatic_common::AHashIndexMap; - -pub use aquatic_common::{access_list::AccessList, ValidUntil}; -pub use aquatic_udp_protocol::*; +use aquatic_common::ValidUntil; +use aquatic_udp_protocol::*; use crate::config::Config; -pub mod handlers; pub mod network; pub const MAX_PACKET_SIZE: usize = 8192; +#[derive(Debug)] +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape { + request: ScrapeRequest, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +#[derive(Debug)] +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape { + response: ScrapeResponse, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape { response, .. } => Response::Scrape(response), + } + } +} + pub trait Ip: Hash + PartialEq + Eq + Clone + Copy { fn ip_addr(self) -> IpAddr; } @@ -160,6 +191,43 @@ impl TorrentMaps { } } +#[derive(Default)] +pub struct Statistics { + pub requests_received: AtomicUsize, + pub responses_sent: AtomicUsize, + pub bytes_received: AtomicUsize, + pub bytes_sent: AtomicUsize, +} + +#[derive(Clone)] +pub struct State { + pub access_list: Arc, + pub torrents: Arc>, + pub statistics: Arc, +} + +impl Default for State { + fn default() -> Self { + Self { + access_list: Arc::new(AccessListArcSwap::default()), + torrents: Arc::new(Mutex::new(TorrentMaps::default())), + statistics: Arc::new(Statistics::default()), + } + } +} +pub fn ip_version_from_ip(ip: IpAddr) -> IpVersion { + match ip { + IpAddr::V4(_) => IpVersion::IPv4, + IpAddr::V6(ip) => { + if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { + IpVersion::IPv4 + } else { + IpVersion::IPv6 + } + } + } +} + #[cfg(test)] mod tests { use std::net::{IpAddr, Ipv6Addr}; diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index e0cd81e..ca3edaf 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -1,8 +1,13 @@ use std::{net::SocketAddr, time::Instant}; +use aquatic_common::access_list::AccessListCache; use aquatic_common::AHashIndexMap; -pub use aquatic_common::{access_list::AccessList, ValidUntil}; -pub use aquatic_udp_protocol::*; +use aquatic_common::ValidUntil; +use aquatic_udp_protocol::*; +use crossbeam_channel::Sender; +use rand::{prelude::StdRng, Rng}; + +use crate::common::*; #[derive(Default)] pub struct ConnectionMap(AHashIndexMap<(ConnectionId, SocketAddr), ValidUntil>); @@ -28,3 +33,117 @@ impl ConnectionMap { self.0.shrink_to_fit(); } } + +pub fn handle_request( + config: &Config, + connections: &mut ConnectionMap, + access_list_cache: &mut AccessListCache, + rng: &mut StdRng, + request_sender: &Sender<(ConnectedRequest, SocketAddr)>, + local_responses: &mut Vec<(Response, SocketAddr)>, + valid_until: ValidUntil, + res_request: Result, + src: SocketAddr, +) { + let access_list_mode = config.access_list.mode; + + match res_request { + Ok(Request::Connect(request)) => { + let connection_id = ConnectionId(rng.gen()); + + connections.insert(connection_id, src, valid_until); + + let response = Response::Connect(ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + }); + + local_responses.push((response, src)) + } + Ok(Request::Announce(request)) => { + if connections.contains(request.connection_id, src) { + if access_list_cache + .load() + .allows(access_list_mode, &request.info_hash.0) + { + if let Err(err) = + request_sender.try_send((ConnectedRequest::Announce(request), src)) + { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } else { + let response = Response::Error(ErrorResponse { + transaction_id: request.transaction_id, + message: "Info hash not allowed".into(), + }); + + local_responses.push((response, src)) + } + } + } + Ok(Request::Scrape(request)) => { + if connections.contains(request.connection_id, src) { + let request = ConnectedRequest::Scrape { + request, + original_indices: Vec::new(), + }; + + if let Err(err) = request_sender.try_send((request, src)) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } + } + Err(err) => { + ::log::debug!("Request::from_bytes error: {:?}", err); + + if let RequestParseError::Sendable { + connection_id, + transaction_id, + err, + } = err + { + if connections.contains(connection_id, src) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + local_responses.push((response.into(), src)); + } + } + } + } +} + +pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { + let socket = if config.network.address.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { + ::log::error!( + "socket: failed setting recv buffer to {}: {:?}", + recv_buffer_size, + err + ); + } + } + + socket.into() +} diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 45c150d..f6c0247 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -18,9 +18,7 @@ pub struct Config { pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, - #[cfg(any(feature = "with-mio", feature = "with-io-uring"))] pub handlers: HandlerConfig, - #[cfg(any(feature = "with-mio", feature = "with-io-uring"))] pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, @@ -70,7 +68,6 @@ pub struct ProtocolConfig { pub peer_announce_interval: i32, } -#[cfg(any(feature = "with-mio", feature = "with-io-uring"))] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct HandlerConfig { @@ -80,7 +77,6 @@ pub struct HandlerConfig { pub channel_recv_timeout_microseconds: u64, } -#[cfg(any(feature = "with-mio", feature = "with-io-uring"))] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct StatisticsConfig { @@ -109,9 +105,7 @@ impl Default for Config { log_level: LogLevel::Error, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), - #[cfg(any(feature = "with-mio", feature = "with-io-uring"))] handlers: HandlerConfig::default(), - #[cfg(any(feature = "with-mio", feature = "with-io-uring"))] statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), @@ -143,7 +137,6 @@ impl Default for ProtocolConfig { } } -#[cfg(any(feature = "with-mio", feature = "with-io-uring"))] impl Default for HandlerConfig { fn default() -> Self { Self { @@ -153,7 +146,6 @@ impl Default for HandlerConfig { } } -#[cfg(any(feature = "with-mio", feature = "with-io-uring"))] impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 0 } diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs deleted file mode 100644 index 3506b09..0000000 --- a/aquatic_udp/src/lib/glommio/common.rs +++ /dev/null @@ -1,8 +0,0 @@ -use std::sync::Arc; - -use aquatic_common::access_list::AccessListArcSwap; - -#[derive(Default, Clone)] -pub struct State { - pub access_list: Arc, -} diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs deleted file mode 100644 index 55adc4a..0000000 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ /dev/null @@ -1,117 +0,0 @@ -use std::cell::RefCell; -use std::net::SocketAddr; -use std::rc::Rc; -use std::time::Duration; - -use futures_lite::{Stream, StreamExt}; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::timer::TimerActionRepeat; -use glommio::{enclose, prelude::*}; -use rand::prelude::SmallRng; -use rand::SeedableRng; - -use crate::common::handlers::handle_announce_request; -use crate::common::handlers::*; -use crate::common::*; -use crate::config::Config; - -use super::common::State; - -pub async fn run_request_worker( - config: Config, - state: State, - request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, -) { - let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); - let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); - let response_senders = Rc::new(response_senders); - - let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - - // Periodically clean torrents - TimerActionRepeat::repeat(enclose!((config, torrents, state) move || { - enclose!((config, torrents, state) move || async move { - torrents.borrow_mut().clean(&config, &state.access_list); - - Some(Duration::from_secs(config.cleaning.torrent_cleaning_interval)) - })() - })); - - let mut handles = Vec::new(); - - for (_, receiver) in request_receivers.streams() { - let handle = spawn_local(handle_request_stream( - config.clone(), - torrents.clone(), - response_senders.clone(), - receiver, - )) - .detach(); - - handles.push(handle); - } - - for handle in handles { - handle.await; - } -} - -async fn handle_request_stream( - config: Config, - torrents: Rc>, - response_senders: Rc>, - mut stream: S, -) where - S: Stream + ::std::marker::Unpin, -{ - let mut rng = SmallRng::from_entropy(); - - let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); - - TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { - enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); - - Some(Duration::from_secs(1)) - })() - })); - - while let Some((producer_index, request, src)) = stream.next().await { - let response = match request { - ConnectedRequest::Announce(request) => { - ConnectedResponse::Announce(handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - request, - src, - peer_valid_until.borrow().to_owned(), - )) - } - ConnectedRequest::Scrape { - request, - original_indices, - } => { - let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request); - - ConnectedResponse::Scrape { - response, - original_indices, - } - } - }; - - ::log::debug!("preparing to send response to channel: {:?}", response); - - if let Err(err) = response_senders - .send_to(producer_index, (response, src)) - .await - { - ::log::error!("response_sender.send: {:?}", err); - } - - yield_if_needed().await; - } -} diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs deleted file mode 100644 index 8cc8d27..0000000 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ /dev/null @@ -1,135 +0,0 @@ -use std::sync::{atomic::AtomicUsize, Arc}; - -use aquatic_common::access_list::update_access_list; -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; -use glommio::channels::channel_mesh::MeshBuilder; -use glommio::prelude::*; -use signal_hook::consts::SIGUSR1; -use signal_hook::iterator::Signals; - -use crate::config::Config; - -use self::common::State; - -mod common; -pub mod handlers; -pub mod network; - -pub const SHARED_CHANNEL_SIZE: usize = 4096; - -pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); - - update_access_list(&config.access_list, &state.access_list)?; - - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - - { - let config = config.clone(); - let state = state.clone(); - - ::std::thread::spawn(move || run_inner(config, state)); - } - - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) -} - -pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { - let num_peers = config.socket_workers + config.request_workers; - - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - let mut executors = Vec::new(); - - for i in 0..(config.socket_workers) { - let config = config.clone(); - let state = state.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - let builder = LocalExecutorBuilder::default().name("socket"); - - let executor = builder.spawn(move || async move { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::SocketWorker(i), - ); - - network::run_socket_worker( - config, - state, - request_mesh_builder, - response_mesh_builder, - num_bound_sockets, - ) - .await - }); - - executors.push(executor); - } - - for i in 0..(config.request_workers) { - let config = config.clone(); - let state = state.clone(); - let request_mesh_builder = request_mesh_builder.clone(); - let response_mesh_builder = response_mesh_builder.clone(); - - let builder = LocalExecutorBuilder::default().name("request"); - - let executor = builder.spawn(move || async move { - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::RequestWorker(i), - ); - - handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) - .await - }); - - executors.push(executor); - } - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - for executor in executors { - executor - .expect("failed to spawn local executor") - .join() - .unwrap(); - } - - Ok(()) -} diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs deleted file mode 100644 index f8eb462..0000000 --- a/aquatic_udp/src/lib/glommio/network.rs +++ /dev/null @@ -1,428 +0,0 @@ -use std::cell::RefCell; -use std::collections::BTreeMap; -use std::io::Cursor; -use std::net::{IpAddr, SocketAddr}; -use std::rc::Rc; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::time::{Duration, Instant}; - -use aquatic_common::access_list::create_access_list_cache; -use aquatic_common::AHashIndexMap; -use futures_lite::{Stream, StreamExt}; -use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::channels::local_channel::{new_unbounded, LocalSender}; -use glommio::enclose; -use glommio::net::UdpSocket; -use glommio::prelude::*; -use glommio::timer::TimerActionRepeat; -use rand::prelude::{Rng, SeedableRng, StdRng}; - -use aquatic_udp_protocol::{IpVersion, Request, Response}; - -use super::common::State; - -use crate::common::handlers::*; -use crate::common::network::ConnectionMap; -use crate::common::*; -use crate::config::Config; - -const PENDING_SCRAPE_MAX_WAIT: u64 = 30; - -struct PendingScrapeResponse { - pending_worker_responses: usize, - valid_until: ValidUntil, - stats: BTreeMap, -} - -#[derive(Default)] -struct PendingScrapeResponses(AHashIndexMap); - -impl PendingScrapeResponses { - fn prepare( - &mut self, - transaction_id: TransactionId, - pending_worker_responses: usize, - valid_until: ValidUntil, - ) { - let pending = PendingScrapeResponse { - pending_worker_responses, - valid_until, - stats: BTreeMap::new(), - }; - - self.0.insert(transaction_id, pending); - } - - fn add_and_get_finished( - &mut self, - mut response: ScrapeResponse, - mut original_indices: Vec, - ) -> Option { - let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { - r.pending_worker_responses -= 1; - - r.stats.extend( - original_indices - .drain(..) - .zip(response.torrent_stats.drain(..)), - ); - - r.pending_worker_responses == 0 - } else { - ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map"); - - false - }; - - if finished { - let PendingScrapeResponse { stats, .. } = - self.0.remove(&response.transaction_id).unwrap(); - - Some(ScrapeResponse { - transaction_id: response.transaction_id, - torrent_stats: stats.into_values().collect(), - }) - } else { - None - } - } - - fn clean(&mut self) { - let now = Instant::now(); - - self.0.retain(|_, v| v.valid_until.0 > now); - self.0.shrink_to_fit(); - } -} - -pub async fn run_socket_worker( - config: Config, - state: State, - request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, - num_bound_sockets: Arc, -) { - let (local_sender, local_receiver) = new_unbounded(); - - let mut socket = UdpSocket::bind(config.network.address).unwrap(); - - let recv_buffer_size = config.network.socket_recv_buffer_size; - - if recv_buffer_size != 0 { - socket.set_buffer_size(recv_buffer_size); - } - - let socket = Rc::new(socket); - - num_bound_sockets.fetch_add(1, Ordering::SeqCst); - - let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); - - let response_consumer_index = response_receivers.consumer_id().unwrap(); - - let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); - - // Periodically clean pending_scrape_responses - TimerActionRepeat::repeat(enclose!((pending_scrape_responses) move || { - enclose!((pending_scrape_responses) move || async move { - pending_scrape_responses.borrow_mut().clean(); - - Some(Duration::from_secs(120)) - })() - })); - - spawn_local(enclose!((pending_scrape_responses) read_requests( - config.clone(), - state, - request_senders, - response_consumer_index, - local_sender, - socket.clone(), - pending_scrape_responses, - ))) - .detach(); - - for (_, receiver) in response_receivers.streams().into_iter() { - spawn_local(enclose!((pending_scrape_responses) handle_shared_responses( - socket.clone(), - pending_scrape_responses, - receiver, - ))) - .detach(); - } - - send_local_responses(socket, local_receiver.stream()).await; -} - -async fn read_requests( - config: Config, - state: State, - request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, - response_consumer_index: usize, - local_sender: LocalSender<(Response, SocketAddr)>, - socket: Rc, - pending_scrape_responses: Rc>, -) { - let mut rng = StdRng::from_entropy(); - - let access_list_mode = config.access_list.mode; - - let max_connection_age = config.cleaning.max_connection_age; - let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); - let pending_scrape_valid_until = - Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); - let connections = Rc::new(RefCell::new(ConnectionMap::default())); - let mut access_list_cache = create_access_list_cache(&state.access_list); - - // Periodically update connection_valid_until - TimerActionRepeat::repeat(enclose!((connection_valid_until) move || { - enclose!((connection_valid_until) move || async move { - *connection_valid_until.borrow_mut() = ValidUntil::new(max_connection_age); - - Some(Duration::from_secs(1)) - })() - })); - - // Periodically update pending_scrape_valid_until - TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || { - enclose!((pending_scrape_valid_until) move || async move { - *pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT); - - Some(Duration::from_secs(10)) - })() - })); - - // Periodically clean connections - TimerActionRepeat::repeat(enclose!((config, connections) move || { - enclose!((config, connections) move || async move { - connections.borrow_mut().clean(); - - Some(Duration::from_secs(config.cleaning.connection_cleaning_interval)) - })() - })); - - let mut buf = [0u8; MAX_PACKET_SIZE]; - - loop { - match socket.recv_from(&mut buf).await { - Ok((amt, src)) => { - let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); - - ::log::debug!("read request: {:?}", request); - - match request { - Ok(Request::Connect(request)) => { - let connection_id = ConnectionId(rng.gen()); - - connections.borrow_mut().insert( - connection_id, - src, - connection_valid_until.borrow().to_owned(), - ); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - local_sender.try_send((response, src)).unwrap(); - } - Ok(Request::Announce(request)) => { - if connections.borrow().contains(request.connection_id, src) { - if access_list_cache - .load() - .allows(access_list_mode, &request.info_hash.0) - { - let request_consumer_index = - calculate_request_consumer_index(&config, request.info_hash); - - if let Err(err) = request_senders - .send_to( - request_consumer_index, - ( - response_consumer_index, - ConnectedRequest::Announce(request), - src, - ), - ) - .await - { - ::log::error!("request_sender.try_send failed: {:?}", err) - } - } else { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".into(), - }); - - local_sender.try_send((response, src)).unwrap(); - } - } - } - Ok(Request::Scrape(ScrapeRequest { - transaction_id, - connection_id, - info_hashes, - })) => { - if connections.borrow().contains(connection_id, src) { - let mut consumer_requests: AHashIndexMap< - usize, - (ScrapeRequest, Vec), - > = Default::default(); - - for (i, info_hash) in info_hashes.into_iter().enumerate() { - let (req, indices) = consumer_requests - .entry(calculate_request_consumer_index(&config, info_hash)) - .or_insert_with(|| { - let request = ScrapeRequest { - transaction_id: transaction_id, - connection_id: connection_id, - info_hashes: Vec::new(), - }; - - (request, Vec::new()) - }); - - req.info_hashes.push(info_hash); - indices.push(i); - } - - pending_scrape_responses.borrow_mut().prepare( - transaction_id, - consumer_requests.len(), - pending_scrape_valid_until.borrow().to_owned(), - ); - - for (consumer_index, (request, original_indices)) in consumer_requests { - let request = ConnectedRequest::Scrape { - request, - original_indices, - }; - - if let Err(err) = request_senders - .send_to( - consumer_index, - (response_consumer_index, request, src), - ) - .await - { - ::log::error!("request_sender.send failed: {:?}", err) - } - } - } - } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if connections.borrow().contains(connection_id, src) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_sender.try_send((response.into(), src)).unwrap(); - } - } - } - } - } - Err(err) => { - ::log::error!("recv_from: {:?}", err); - } - } - - yield_if_needed().await; - } -} - -async fn handle_shared_responses( - socket: Rc, - pending_scrape_responses: Rc>, - mut stream: S, -) where - S: Stream + ::std::marker::Unpin, -{ - let mut buf = [0u8; MAX_PACKET_SIZE]; - let mut buf = Cursor::new(&mut buf[..]); - - while let Some((response, addr)) = stream.next().await { - let opt_response = match response { - ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), - ConnectedResponse::Scrape { - response, - original_indices, - } => pending_scrape_responses - .borrow_mut() - .add_and_get_finished(response, original_indices) - .map(|response| (Response::Scrape(response), addr)), - }; - - if let Some((response, addr)) = opt_response { - write_response_to_socket(&socket, &mut buf, addr, response).await; - } - - yield_if_needed().await; - } -} - -async fn send_local_responses(socket: Rc, mut stream: S) -where - S: Stream + ::std::marker::Unpin, -{ - let mut buf = [0u8; MAX_PACKET_SIZE]; - let mut buf = Cursor::new(&mut buf[..]); - - while let Some((response, addr)) = stream.next().await { - write_response_to_socket(&socket, &mut buf, addr, response).await; - - yield_if_needed().await; - } -} - -async fn write_response_to_socket( - socket: &Rc, - buf: &mut Cursor<&mut [u8]>, - addr: SocketAddr, - response: Response, -) { - buf.set_position(0); - - ::log::debug!("preparing to send response: {:?}", response.clone()); - - response - .write(buf, ip_version_from_ip(addr.ip())) - .expect("write response"); - - let position = buf.position() as usize; - - if let Err(err) = socket.send_to(&buf.get_ref()[..position], addr).await { - ::log::info!("send_to failed: {:?}", err); - } -} - -fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { - (info_hash.0[0] as usize) % config.request_workers -} - -fn ip_version_from_ip(ip: IpAddr) -> IpVersion { - match ip { - IpAddr::V4(_) => IpVersion::IPv4, - IpAddr::V6(ip) => { - if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { - IpVersion::IPv4 - } else { - IpVersion::IPv6 - } - } - } -} diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/handlers.rs similarity index 68% rename from aquatic_udp/src/lib/common/handlers.rs rename to aquatic_udp/src/lib/handlers.rs index d14b630..c77ce89 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/handlers.rs @@ -1,37 +1,101 @@ +use std::net::IpAddr; use std::net::SocketAddr; +use std::time::Duration; -use rand::rngs::SmallRng; +use aquatic_common::ValidUntil; +use crossbeam_channel::{Receiver, Sender}; +use rand::{rngs::SmallRng, SeedableRng}; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::extract_response_peers; +use aquatic_udp_protocol::*; + use crate::common::*; +use crate::config::Config; -#[derive(Debug)] -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape { - request: ScrapeRequest, - /// Currently only used by glommio implementation - original_indices: Vec, - }, -} +pub fn run_request_worker( + state: State, + config: Config, + request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, + response_sender: Sender<(ConnectedResponse, SocketAddr)>, +) { + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); + let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); + let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); -#[derive(Debug)] -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape { - response: ScrapeResponse, - /// Currently only used by glommio implementation - original_indices: Vec, - }, -} + let mut small_rng = SmallRng::from_entropy(); -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape { response, .. } => Response::Scrape(response), + let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); + + loop { + let mut opt_torrents = None; + + // Collect requests from channel, divide them by type + // + // Collect a maximum number of request. Stop collecting before that + // number is reached if having waited for too long for a request, but + // only if TorrentMaps mutex isn't locked. + for i in 0..config.handlers.max_requests_per_iter { + let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 { + match request_receiver.recv() { + Ok(r) => r, + Err(_) => break, // Really shouldn't happen + } + } else { + match request_receiver.recv_timeout(timeout) { + Ok(r) => r, + Err(_) => { + if let Some(guard) = state.torrents.try_lock() { + opt_torrents = Some(guard); + + break; + } else { + continue; + } + } + } + }; + + match request { + ConnectedRequest::Announce(request) => announce_requests.push((request, src)), + ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)), + } + } + + // Generate responses for announce and scrape requests, then drop MutexGuard. + { + let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); + + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + responses.extend(announce_requests.drain(..).map(|(request, src)| { + let response = handle_announce_request( + &config, + &mut small_rng, + &mut torrents, + request, + src, + peer_valid_until, + ); + + (ConnectedResponse::Announce(response), src) + })); + + responses.extend(scrape_requests.drain(..).map(|(request, src)| { + let response = ConnectedResponse::Scrape { + response: handle_scrape_request(&mut torrents, src, request), + original_indices: Vec::new(), + }; + + (response, src) + })); + } + + for r in responses.drain(..) { + if let Err(err) = response_sender.send(r) { + ::log::error!("error sending response to channel: {}", err); + } } } } @@ -143,7 +207,6 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { } } -#[inline] pub fn handle_scrape_request( torrents: &mut TorrentMaps, src: SocketAddr, diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index cc4403d..a08b862 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,22 +1,175 @@ -use cfg_if::cfg_if; - pub mod common; pub mod config; -#[cfg(all(feature = "with-glommio", target_os = "linux"))] -pub mod glommio; -#[cfg(any(feature = "with-mio", feature = "with-io-uring"))] -pub mod other; +pub mod handlers; +#[cfg(feature = "with-mio")] +pub mod network_mio; +#[cfg(feature = "with-io-uring")] +pub mod network_uring; +pub mod tasks; use config::Config; +use std::sync::{atomic::AtomicUsize, Arc}; +use std::thread::Builder; +use std::time::Duration; + +use anyhow::Context; +#[cfg(feature = "cpu-pinning")] +use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; +use aquatic_common::privileges::drop_privileges_after_socket_binding; +use crossbeam_channel::unbounded; + +use aquatic_common::access_list::update_access_list; +use signal_hook::consts::SIGUSR1; +use signal_hook::iterator::Signals; + +use common::State; + pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - cfg_if! { - if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { - glommio::run(config) - } else { - other::run(config) + let state = State::default(); + + update_access_list(&config.access_list, &state.access_list)?; + + let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; + + { + let config = config.clone(); + let state = state.clone(); + + ::std::thread::spawn(move || run_inner(config, state)); + } + + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + + for signal in &mut signals { + match signal { + SIGUSR1 => { + let _ = update_access_list(&config.access_list, &state.access_list); + } + _ => unreachable!(), } } + + Ok(()) +} + +pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { + let num_bound_sockets = Arc::new(AtomicUsize::new(0)); + + let (request_sender, request_receiver) = unbounded(); + let (response_sender, response_receiver) = unbounded(); + + for i in 0..config.request_workers { + let state = state.clone(); + let config = config.clone(); + let request_receiver = request_receiver.clone(); + let response_sender = response_sender.clone(); + + Builder::new() + .name(format!("request-{:02}", i + 1)) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::RequestWorker(i), + ); + + handlers::run_request_worker(state, config, request_receiver, response_sender) + }) + .with_context(|| "spawn request worker")?; + } + + for i in 0..config.socket_workers { + let state = state.clone(); + let config = config.clone(); + let request_sender = request_sender.clone(); + let response_receiver = response_receiver.clone(); + let num_bound_sockets = num_bound_sockets.clone(); + + Builder::new() + .name(format!("socket-{:02}", i + 1)) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::SocketWorker(i), + ); + + cfg_if::cfg_if!( + if #[cfg(feature = "with-io-uring")] { + network_uring::run_socket_worker( + state, + config, + request_sender, + response_receiver, + num_bound_sockets, + ); + } else { + network_mio::run_socket_worker( + state, + config, + i, + request_sender, + response_receiver, + num_bound_sockets, + ); + } + ); + }) + .with_context(|| "spawn socket worker")?; + } + + if config.statistics.interval != 0 { + let state = state.clone(); + let config = config.clone(); + + Builder::new() + .name("statistics-collector".to_string()) + .spawn(move || { + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + + loop { + ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); + + tasks::gather_and_print_statistics(&state, &config); + } + }) + .with_context(|| "spawn statistics worker")?; + } + + drop_privileges_after_socket_binding( + &config.privileges, + num_bound_sockets, + config.socket_workers, + ) + .unwrap(); + + #[cfg(feature = "cpu-pinning")] + pin_current_if_configured_to( + &config.cpu_pinning, + config.socket_workers, + WorkerIndex::Other, + ); + + loop { + ::std::thread::sleep(Duration::from_secs( + config.cleaning.torrent_cleaning_interval, + )); + + state.torrents.lock().clean(&config, &state.access_list); + } } diff --git a/aquatic_udp/src/lib/other/network_mio.rs b/aquatic_udp/src/lib/network_mio.rs similarity index 98% rename from aquatic_udp/src/lib/other/network_mio.rs rename to aquatic_udp/src/lib/network_mio.rs index d04fb2a..164aeb1 100644 --- a/aquatic_udp/src/lib/other/network_mio.rs +++ b/aquatic_udp/src/lib/network_mio.rs @@ -8,6 +8,7 @@ use std::time::{Duration, Instant}; use std::vec::Drain; use aquatic_common::access_list::create_access_list_cache; +use aquatic_common::ValidUntil; use crossbeam_channel::{Receiver, Sender}; use mio::net::UdpSocket; use mio::{Events, Interest, Poll, Token}; @@ -15,13 +16,10 @@ use rand::prelude::{SeedableRng, StdRng}; use aquatic_udp_protocol::{Request, Response}; -use crate::common::handlers::*; -use crate::common::network::ConnectionMap; +use crate::common::network::*; use crate::common::*; use crate::config::Config; -use super::common::*; - pub fn run_socket_worker( state: State, config: Config, diff --git a/aquatic_udp/src/lib/other/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs similarity index 99% rename from aquatic_udp/src/lib/other/network_uring.rs rename to aquatic_udp/src/lib/network_uring.rs index 184e092..dc2722e 100644 --- a/aquatic_udp/src/lib/other/network_uring.rs +++ b/aquatic_udp/src/lib/network_uring.rs @@ -10,6 +10,7 @@ use std::sync::{ use std::time::{Duration, Instant}; use aquatic_common::access_list::create_access_list_cache; +use aquatic_common::ValidUntil; use crossbeam_channel::{Receiver, Sender}; use io_uring::types::{Fixed, Timespec}; use io_uring::SubmissionQueue; @@ -21,13 +22,11 @@ use slab::Slab; use aquatic_udp_protocol::{Request, Response}; -use crate::common::handlers::*; use crate::common::network::ConnectionMap; +use crate::common::network::*; use crate::common::*; use crate::config::Config; -use super::common::*; - const RING_SIZE: usize = 128; const MAX_RECV_EVENTS: usize = 1; const MAX_SEND_EVENTS: usize = RING_SIZE - MAX_RECV_EVENTS - 1; diff --git a/aquatic_udp/src/lib/other/common.rs b/aquatic_udp/src/lib/other/common.rs deleted file mode 100644 index a1f62f8..0000000 --- a/aquatic_udp/src/lib/other/common.rs +++ /dev/null @@ -1,166 +0,0 @@ -use aquatic_common::access_list::{AccessListArcSwap, AccessListCache}; -use aquatic_udp_protocol::*; -use crossbeam_channel::Sender; -use parking_lot::Mutex; -use rand::{prelude::StdRng, Rng}; -use socket2::{Domain, Protocol, Socket, Type}; -use std::{ - net::{IpAddr, SocketAddr}, - sync::{atomic::AtomicUsize, Arc}, -}; - -use crate::common::*; -use crate::common::{handlers::ConnectedRequest, network::ConnectionMap}; -use crate::config::Config; - -#[derive(Default)] -pub struct Statistics { - pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, - pub bytes_received: AtomicUsize, - pub bytes_sent: AtomicUsize, -} - -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrents: Arc>, - pub statistics: Arc, -} - -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(AccessListArcSwap::default()), - torrents: Arc::new(Mutex::new(TorrentMaps::default())), - statistics: Arc::new(Statistics::default()), - } - } -} - -pub fn handle_request( - config: &Config, - connections: &mut ConnectionMap, - access_list_cache: &mut AccessListCache, - rng: &mut StdRng, - request_sender: &Sender<(ConnectedRequest, SocketAddr)>, - local_responses: &mut Vec<(Response, SocketAddr)>, - valid_until: ValidUntil, - res_request: Result, - src: SocketAddr, -) { - let access_list_mode = config.access_list.mode; - - match res_request { - Ok(Request::Connect(request)) => { - let connection_id = ConnectionId(rng.gen()); - - connections.insert(connection_id, src, valid_until); - - let response = Response::Connect(ConnectResponse { - connection_id, - transaction_id: request.transaction_id, - }); - - local_responses.push((response, src)) - } - Ok(Request::Announce(request)) => { - if connections.contains(request.connection_id, src) { - if access_list_cache - .load() - .allows(access_list_mode, &request.info_hash.0) - { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Announce(request), src)) - { - ::log::warn!("request_sender.try_send failed: {:?}", err) - } - } else { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Info hash not allowed".into(), - }); - - local_responses.push((response, src)) - } - } - } - Ok(Request::Scrape(request)) => { - if connections.contains(request.connection_id, src) { - let request = ConnectedRequest::Scrape { - request, - original_indices: Vec::new(), - }; - - if let Err(err) = request_sender.try_send((request, src)) { - ::log::warn!("request_sender.try_send failed: {:?}", err) - } - } - } - Err(err) => { - ::log::debug!("Request::from_bytes error: {:?}", err); - - if let RequestParseError::Sendable { - connection_id, - transaction_id, - err, - } = err - { - if connections.contains(connection_id, src) { - let response = ErrorResponse { - transaction_id, - message: err.right_or("Parse error").into(), - }; - - local_responses.push((response.into(), src)); - } - } - } - } -} - -pub fn ip_version_from_ip(ip: IpAddr) -> IpVersion { - match ip { - IpAddr::V4(_) => IpVersion::IPv4, - IpAddr::V6(ip) => { - if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() { - IpVersion::IPv4 - } else { - IpVersion::IPv6 - } - } - } -} - -pub fn create_socket(config: &Config) -> ::std::net::UdpSocket { - let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket.set_reuse_port(true).expect("socket: set reuse port"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - socket - .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); - - let recv_buffer_size = config.network.socket_recv_buffer_size; - - if recv_buffer_size != 0 { - if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { - ::log::error!( - "socket: failed setting recv buffer to {}: {:?}", - recv_buffer_size, - err - ); - } - } - - socket.into() -} diff --git a/aquatic_udp/src/lib/other/handlers.rs b/aquatic_udp/src/lib/other/handlers.rs deleted file mode 100644 index 0c2c5f2..0000000 --- a/aquatic_udp/src/lib/other/handlers.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::net::SocketAddr; -use std::time::Duration; - -use aquatic_common::ValidUntil; -use crossbeam_channel::{Receiver, Sender}; -use rand::{rngs::SmallRng, SeedableRng}; - -use aquatic_udp_protocol::*; - -use crate::common::handlers::*; -use crate::config::Config; -use crate::other::common::*; - -pub fn run_request_worker( - state: State, - config: Config, - request_receiver: Receiver<(ConnectedRequest, SocketAddr)>, - response_sender: Sender<(ConnectedResponse, SocketAddr)>, -) { - let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::new(); - let mut scrape_requests: Vec<(ScrapeRequest, SocketAddr)> = Vec::new(); - let mut responses: Vec<(ConnectedResponse, SocketAddr)> = Vec::new(); - - let mut small_rng = SmallRng::from_entropy(); - - let timeout = Duration::from_micros(config.handlers.channel_recv_timeout_microseconds); - - loop { - let mut opt_torrents = None; - - // Collect requests from channel, divide them by type - // - // Collect a maximum number of request. Stop collecting before that - // number is reached if having waited for too long for a request, but - // only if TorrentMaps mutex isn't locked. - for i in 0..config.handlers.max_requests_per_iter { - let (request, src): (ConnectedRequest, SocketAddr) = if i == 0 { - match request_receiver.recv() { - Ok(r) => r, - Err(_) => break, // Really shouldn't happen - } - } else { - match request_receiver.recv_timeout(timeout) { - Ok(r) => r, - Err(_) => { - if let Some(guard) = state.torrents.try_lock() { - opt_torrents = Some(guard); - - break; - } else { - continue; - } - } - } - }; - - match request { - ConnectedRequest::Announce(request) => announce_requests.push((request, src)), - ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)), - } - } - - // Generate responses for announce and scrape requests, then drop MutexGuard. - { - let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(announce_requests.drain(..).map(|(request, src)| { - let response = handle_announce_request( - &config, - &mut small_rng, - &mut torrents, - request, - src, - peer_valid_until, - ); - - (ConnectedResponse::Announce(response), src) - })); - - responses.extend(scrape_requests.drain(..).map(|(request, src)| { - let response = ConnectedResponse::Scrape { - response: handle_scrape_request(&mut torrents, src, request), - original_indices: Vec::new(), - }; - - (response, src) - })); - } - - for r in responses.drain(..) { - if let Err(err) = response_sender.send(r) { - ::log::error!("error sending response to channel: {}", err); - } - } - } -} diff --git a/aquatic_udp/src/lib/other/mod.rs b/aquatic_udp/src/lib/other/mod.rs deleted file mode 100644 index 19e7c31..0000000 --- a/aquatic_udp/src/lib/other/mod.rs +++ /dev/null @@ -1,172 +0,0 @@ -use std::sync::{atomic::AtomicUsize, Arc}; -use std::thread::Builder; -use std::time::Duration; - -use anyhow::Context; -#[cfg(feature = "cpu-pinning")] -use aquatic_common::cpu_pinning::{pin_current_if_configured_to, WorkerIndex}; -use aquatic_common::privileges::drop_privileges_after_socket_binding; -use crossbeam_channel::unbounded; - -use aquatic_common::access_list::update_access_list; -use signal_hook::consts::SIGUSR1; -use signal_hook::iterator::Signals; - -use crate::config::Config; - -pub mod common; -pub mod handlers; -#[cfg(feature = "with-mio")] -pub mod network_mio; -#[cfg(feature = "with-io-uring")] -pub mod network_uring; -pub mod tasks; - -use common::State; - -pub fn run(config: Config) -> ::anyhow::Result<()> { - let state = State::default(); - - update_access_list(&config.access_list, &state.access_list)?; - - let mut signals = Signals::new(::std::iter::once(SIGUSR1))?; - - { - let config = config.clone(); - let state = state.clone(); - - ::std::thread::spawn(move || run_inner(config, state)); - } - - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - for signal in &mut signals { - match signal { - SIGUSR1 => { - let _ = update_access_list(&config.access_list, &state.access_list); - } - _ => unreachable!(), - } - } - - Ok(()) -} - -pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> { - let num_bound_sockets = Arc::new(AtomicUsize::new(0)); - - let (request_sender, request_receiver) = unbounded(); - let (response_sender, response_receiver) = unbounded(); - - for i in 0..config.request_workers { - let state = state.clone(); - let config = config.clone(); - let request_receiver = request_receiver.clone(); - let response_sender = response_sender.clone(); - - Builder::new() - .name(format!("request-{:02}", i + 1)) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::RequestWorker(i), - ); - - handlers::run_request_worker(state, config, request_receiver, response_sender) - }) - .with_context(|| "spawn request worker")?; - } - - for i in 0..config.socket_workers { - let state = state.clone(); - let config = config.clone(); - let request_sender = request_sender.clone(); - let response_receiver = response_receiver.clone(); - let num_bound_sockets = num_bound_sockets.clone(); - - Builder::new() - .name(format!("socket-{:02}", i + 1)) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::SocketWorker(i), - ); - - cfg_if::cfg_if!( - if #[cfg(feature = "with-io-uring")] { - network_uring::run_socket_worker( - state, - config, - request_sender, - response_receiver, - num_bound_sockets, - ); - } else { - network_mio::run_socket_worker( - state, - config, - i, - request_sender, - response_receiver, - num_bound_sockets, - ); - } - ); - }) - .with_context(|| "spawn socket worker")?; - } - - if config.statistics.interval != 0 { - let state = state.clone(); - let config = config.clone(); - - Builder::new() - .name("statistics-collector".to_string()) - .spawn(move || { - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - loop { - ::std::thread::sleep(Duration::from_secs(config.statistics.interval)); - - tasks::gather_and_print_statistics(&state, &config); - } - }) - .with_context(|| "spawn statistics worker")?; - } - - drop_privileges_after_socket_binding( - &config.privileges, - num_bound_sockets, - config.socket_workers, - ) - .unwrap(); - - #[cfg(feature = "cpu-pinning")] - pin_current_if_configured_to( - &config.cpu_pinning, - config.socket_workers, - WorkerIndex::Other, - ); - - loop { - ::std::thread::sleep(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, - )); - - state.torrents.lock().clean(&config, &state.access_list); - } -} diff --git a/aquatic_udp/src/lib/other/tasks.rs b/aquatic_udp/src/lib/tasks.rs similarity index 100% rename from aquatic_udp/src/lib/other/tasks.rs rename to aquatic_udp/src/lib/tasks.rs diff --git a/aquatic_udp_bench/Cargo.toml b/aquatic_udp_bench/Cargo.toml index a645449..e2950df 100644 --- a/aquatic_udp_bench/Cargo.toml +++ b/aquatic_udp_bench/Cargo.toml @@ -13,6 +13,7 @@ name = "aquatic_udp_bench" anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_udp = "0.1.0" +aquatic_udp_protocol = "0.1.0" crossbeam-channel = "0.5" indicatif = "0.16.2" mimalloc = { version = "0.1", default-features = false } diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 5eac23d..bcc84e8 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; -use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp_protocol::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 6d294e2..fb2895f 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -7,6 +7,7 @@ //! Scrape: 1 873 545 requests/second, 533.75 ns/request //! ``` +use aquatic_udp::handlers::run_request_worker; use crossbeam_channel::unbounded; use num_format::{Locale, ToFormattedString}; use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng}; @@ -15,8 +16,7 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::other::common::*; -use aquatic_udp::other::handlers; +use aquatic_udp_protocol::*; use config::BenchConfig; @@ -52,7 +52,7 @@ pub fn run(bench_config: BenchConfig) -> ::anyhow::Result<()> { let response_sender = response_sender.clone(); ::std::thread::spawn(move || { - handlers::run_request_worker(state, config, request_receiver, response_sender) + run_request_worker(state, config, request_receiver, response_sender) }); } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index f718753..39d6ade 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; -use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp_protocol::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/scripts/run-aquatic-udp.sh b/scripts/run-aquatic-udp.sh index 0af7880..fe99a35 100755 --- a/scripts/run-aquatic-udp.sh +++ b/scripts/run-aquatic-udp.sh @@ -2,14 +2,16 @@ . ./scripts/env-native-cpu-without-avx-512 +USAGE="Usage: $0 [mio|io-uring] [ARGS]" + if [ "$1" != "mio" ] && [ "$1" != "glommio" ] && [ "$1" != "io-uring" ]; then - echo "Usage: $0 [mio|glommio|io-uring] [ARGS]" + echo "$USAGE" else if [ "$1" = "mio" ]; then cargo run --release --bin aquatic_udp -- "${@:2}" elif [ "$1" = "io-uring" ]; then cargo run --release --features "with-io-uring" --no-default-features --bin aquatic_udp -- "${@:2}" else - cargo run --release --features "with-glommio" --no-default-features --bin aquatic_udp -- "${@:2}" + echo "$USAGE" fi fi