From b10f7b89e769c1cea0d19385a15180ae17b4b134 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 21 Oct 2021 15:35:21 +0200 Subject: [PATCH] aquatic_udp: move code only use in mio impl out of crate::common --- TODO.md | 1 - aquatic_udp/src/lib/common/mod.rs | 47 ------------------- aquatic_udp/src/lib/glommio/handlers.rs | 3 +- aquatic_udp/src/lib/glommio/mod.rs | 5 ++ aquatic_udp/src/lib/glommio/network.rs | 8 ++-- aquatic_udp/src/lib/lib.rs | 17 +------ aquatic_udp/src/lib/mio/common.rs | 49 ++++++++++++++++++++ aquatic_udp/src/lib/mio/handlers/announce.rs | 1 + aquatic_udp/src/lib/mio/handlers/mod.rs | 2 +- aquatic_udp/src/lib/mio/handlers/scrape.rs | 2 + aquatic_udp/src/lib/mio/mod.rs | 18 ++++++- aquatic_udp/src/lib/mio/network.rs | 2 + aquatic_udp/src/lib/mio/tasks.rs | 2 +- aquatic_udp_bench/src/announce.rs | 1 + aquatic_udp_bench/src/main.rs | 1 + aquatic_udp_bench/src/scrape.rs | 1 + 16 files changed, 89 insertions(+), 71 deletions(-) create mode 100644 aquatic_udp/src/lib/mio/common.rs diff --git a/TODO.md b/TODO.md index f0a6971..b290b67 100644 --- a/TODO.md +++ b/TODO.md @@ -5,7 +5,6 @@ * clean connections * update peer valid until * privdrop - * a lot of "common code" is only used in mio implementation * access lists: * use arc-swap Cache diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index d42951a..8c84c3c 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -1,13 +1,10 @@ use std::borrow::Borrow; use std::hash::Hash; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; -use std::sync::{atomic::AtomicUsize, Arc}; use std::time::Instant; -use aquatic_common::access_list::AccessListArcSwap; use hashbrown::HashMap; use indexmap::IndexMap; -use parking_lot::Mutex; pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_udp_protocol::*; @@ -35,25 +32,6 @@ impl Ip for Ipv6Addr { } } -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] pub enum PeerStatus { Seeding, @@ -175,31 +153,6 @@ impl TorrentMaps { } } -#[derive(Default)] -pub struct Statistics { - pub requests_received: AtomicUsize, - pub responses_sent: AtomicUsize, - pub bytes_received: AtomicUsize, - pub bytes_sent: AtomicUsize, -} - -#[derive(Clone)] -pub struct State { - pub access_list: Arc, - pub torrents: Arc>, - pub statistics: Arc, -} - -impl Default for State { - fn default() -> Self { - Self { - access_list: Arc::new(AccessListArcSwap::default()), - torrents: Arc::new(Mutex::new(TorrentMaps::default())), - statistics: Arc::new(Statistics::default()), - } - } -} - #[cfg(test)] mod tests { #[test] diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 12db332..9e18322 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -47,7 +47,8 @@ pub async fn run_request_worker( torrents.clone(), response_senders.clone(), receiver, - )).detach(); + )) + .detach(); handles.push(handle); } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 59f37d1..0da8ab9 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,3 +1,8 @@ +//! Work-in-progress glommio (io_uring) implementation +//! +//! * Doesn't support scrape requests +//! * Currently not faster than mio implementation + use std::sync::{atomic::AtomicUsize, Arc}; use glommio::channels::channel_mesh::MeshBuilder; diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 48840b6..93d5f04 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -51,13 +51,15 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), - )).detach(); + )) + .detach(); for (_, receiver) in response_receivers.streams().into_iter() { spawn_local(send_responses( socket.clone(), receiver.map(|(response, addr)| (response.into(), addr)), - )).detach(); + )) + .detach(); } send_responses(socket, local_receiver.stream()).await; @@ -174,7 +176,7 @@ where while let Some((response, src)) = stream.next().await { buf.set_position(0); - + ::log::debug!("preparing to send response: {:?}", response.clone()); response diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 3ea2bba..187d563 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,6 +1,4 @@ -use std::sync::Arc; - -use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; +use cfg_if::cfg_if; pub mod common; pub mod config; @@ -13,7 +11,7 @@ use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - cfg_if::cfg_if! { + cfg_if! { if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { glommio::run(config) } else { @@ -21,14 +19,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } } } - -pub fn update_access_list(config: &Config, access_list: &Arc) { - match config.access_list.mode { - AccessListMode::White | AccessListMode::Black => { - if let Err(err) = access_list.update_from_path(&config.access_list.path) { - ::log::error!("Update access list from path: {:?}", err); - } - } - AccessListMode::Off => {} - } -} diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs new file mode 100644 index 0000000..8bf2233 --- /dev/null +++ b/aquatic_udp/src/lib/mio/common.rs @@ -0,0 +1,49 @@ +use aquatic_common::access_list::AccessListArcSwap; +use parking_lot::Mutex; +use std::sync::{atomic::AtomicUsize, Arc}; + +use crate::common::*; + +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + +#[derive(Default)] +pub struct Statistics { + pub requests_received: AtomicUsize, + pub responses_sent: AtomicUsize, + pub bytes_received: AtomicUsize, + pub bytes_sent: AtomicUsize, +} + +#[derive(Clone)] +pub struct State { + pub access_list: Arc, + pub torrents: Arc>, + pub statistics: Arc, +} + +impl Default for State { + fn default() -> Self { + Self { + access_list: Arc::new(AccessListArcSwap::default()), + torrents: Arc::new(Mutex::new(TorrentMaps::default())), + statistics: Arc::new(Statistics::default()), + } + } +} diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 99e6a46..549d061 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -10,6 +10,7 @@ use aquatic_udp_protocol::*; use crate::common::announce::handle_announce_request; use crate::common::*; use crate::config::Config; +use crate::mio::common::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index 0634702..af8e5a8 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; -use crate::common::*; use crate::config::Config; +use crate::mio::common::*; mod announce; mod scrape; diff --git a/aquatic_udp/src/lib/mio/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs index b544ccf..b1a6742 100644 --- a/aquatic_udp/src/lib/mio/handlers/scrape.rs +++ b/aquatic_udp/src/lib/mio/handlers/scrape.rs @@ -6,6 +6,8 @@ use parking_lot::MutexGuard; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; +use crate::mio::common::*; + use crate::common::*; #[inline] diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index f75ce9f..bf863ee 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -12,13 +12,16 @@ use anyhow::Context; use crossbeam_channel::unbounded; use privdrop::PrivDrop; +pub mod common; pub mod handlers; pub mod network; pub mod tasks; -use crate::common::State; +use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; + use crate::config::Config; -use crate::update_access_list; + +use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { let state = State::default(); @@ -126,3 +129,14 @@ pub fn start_workers( Ok(()) } + +pub fn update_access_list(config: &Config, access_list: &Arc) { + match config.access_list.mode { + AccessListMode::White | AccessListMode::Black => { + if let Err(err) = access_list.update_from_path(&config.access_list.path) { + ::log::error!("Update access list from path: {:?}", err); + } + } + AccessListMode::Off => {} + } +} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index e755fdd..fe34023 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -20,6 +20,8 @@ use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +use super::common::*; + pub fn run_socket_worker( state: State, config: Config, diff --git a/aquatic_udp/src/lib/mio/tasks.rs b/aquatic_udp/src/lib/mio/tasks.rs index 2fde39d..4d9fe16 100644 --- a/aquatic_udp/src/lib/mio/tasks.rs +++ b/aquatic_udp/src/lib/mio/tasks.rs @@ -2,7 +2,7 @@ use std::sync::atomic::Ordering; use histogram::Histogram; -use crate::common::*; +use super::common::*; use crate::config::Config; pub fn gather_and_print_statistics(state: &State, config: &Config) { diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index 12b35e3..d71f77c 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/main.rs b/aquatic_udp_bench/src/main.rs index 7b3f75c..28c210e 100644 --- a/aquatic_udp_bench/src/main.rs +++ b/aquatic_udp_bench/src/main.rs @@ -15,6 +15,7 @@ use std::time::Duration; use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use aquatic_udp::mio::handlers; use config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 7b62152..a7d5c18 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -8,6 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; +use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig;