diff --git a/.github/actions/test-transfer/Dockerfile b/.github/actions/test-transfer/Dockerfile index b297bc7..aaf8299 100644 --- a/.github/actions/test-transfer/Dockerfile +++ b/.github/actions/test-transfer/Dockerfile @@ -1,8 +1,10 @@ -# Container image that runs your code +# Not used by Github action, but can be used to run test locally: +# 1. docker build -t aquatic ./path/to/Dockerfile +# 2. docker run aquatic +# 3. On failure, run `docker rmi aquatic -f` and go back to step 1 + FROM rust:bullseye -# Copies your code file from your action repository to the filesystem path `/` of the container COPY entrypoint.sh /entrypoint.sh -# Code file to execute when the docker container starts up (`entrypoint.sh`) ENTRYPOINT ["/entrypoint.sh"] \ No newline at end of file diff --git a/.github/actions/test-transfer/action.yml b/.github/actions/test-transfer/action.yml index 2520a97..aa5afef 100644 --- a/.github/actions/test-transfer/action.yml +++ b/.github/actions/test-transfer/action.yml @@ -10,5 +10,7 @@ outputs: wss_ipv4: description: 'WSS IPv4 status' runs: - using: 'docker' - image: 'Dockerfile' \ No newline at end of file + using: 'composite' + steps: + - run: $GITHUB_ACTION_PATH/entrypoint.sh + shell: bash \ No newline at end of file diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 77fd0ad..1cdc7b0 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -5,11 +5,6 @@ # # IPv6 is unfortunately disabled by default in Docker # (see sysctl net.ipv6.conf.lo.disable_ipv6) -# -# When testing locally, use: -# 1. docker build -t aquatic ./path/to/Dockerfile -# 2. docker run aquatic -# 3. On failure, run `docker rmi aquatic -f` and go back to step 1 set -e @@ -21,6 +16,8 @@ else SUDO="" fi +ulimit -a + $SUDO apt-get update $SUDO apt-get install -y cmake libssl-dev screen rtorrent mktorrent ssl-cert ca-certificates curl golang @@ -43,6 +40,9 @@ else cd "$GITHUB_WORKSPACE" fi +echo "last aquatic commits:" +git log --oneline -3 + # Setup bogus TLS certificate $SUDO echo "127.0.0.1 example.com" >> /etc/hosts diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index 4e9b66e..53f6721 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -11,9 +11,8 @@ env: jobs: build: - runs-on: ubuntu-latest - + timeout-minutes: 10 steps: - uses: actions/checkout@v2 - name: Build diff --git a/.github/workflows/test-transfer.yml b/.github/workflows/test-transfer.yml index 97a4f91..2679356 100644 --- a/.github/workflows/test-transfer.yml +++ b/.github/workflows/test-transfer.yml @@ -10,9 +10,13 @@ jobs: test-transfer-http: runs-on: ubuntu-latest name: "Test BitTorrent file transfer over HTTP (with and without TLS), UDP and WSS" + timeout-minutes: 20 + container: + image: rust:1-bullseye + options: --ulimit memlock=524288:524288 steps: - name: Checkout uses: actions/checkout@v2 - name: Test file transfers uses: ./.github/actions/test-transfer - id: test_transfer \ No newline at end of file + id: test_transfer diff --git a/README.md b/README.md index 1538cdc..8089f9e 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,8 @@ except that it: Supports IPv4 and IPv6 (BitTorrent UDP protocol doesn't support IPv6 very well, however.) +For optimal performance, enable setting of core affinities in configuration. + #### Benchmarks [opentracker]: http://erdgeist.org/arts/software/opentracker/ @@ -110,6 +112,22 @@ Server responses per second, best result in bold: Please refer to `documents/aquatic-udp-load-test-2021-08-19.pdf` for more details. +#### Alternative implementation using io_uring + +[io_uring]: https://en.wikipedia.org/wiki/Io_uring +[glommio]: https://github.com/DataDog/glommio + +There is an alternative implementation that utilizes [io_uring] by running on +[glommio]. It only runs on Linux and requires a recent kernel (version 5.1 or later). +In some cases, it performs even better than the cross-platform implementation. + +To use it, pass the `with-glommio` feature when building, e.g.: + +```sh +cargo build -p aquatic_udp --features "with-glommio" --no-default-features +./target/release/aquatic_udp +``` + ### aquatic_http: HTTP BitTorrent tracker Aims for compatibility with the HTTP BitTorrent protocol, as described diff --git a/TODO.md b/TODO.md index b290b67..d7ceb85 100644 --- a/TODO.md +++ b/TODO.md @@ -1,13 +1,13 @@ # TODO * aquatic_udp glommio - * update access lists - * clean connections - * update peer valid until - * privdrop + * Add to file transfer CI + * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) + containing TransactionId and BTreeMap, and same for + response * access lists: - * use arc-swap Cache + * use arc-swap Cache? * add CI tests * aquatic_ws: should it send back error on message parse error, or does that diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 28e24e3..69f856c 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -42,7 +42,7 @@ impl Default for AccessListConfig { } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct AccessList(HashSet<[u8; 20]>); impl AccessList { @@ -51,6 +51,20 @@ impl AccessList { Ok(()) } + + pub fn create_from_path(path: &PathBuf) -> anyhow::Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + + let mut new_list = Self::default(); + + for line in reader.lines() { + new_list.insert_from_line(&line?)?; + } + + Ok(new_list) + } + pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool { match mode { AccessListMode::White => self.0.contains(info_hash), @@ -69,16 +83,7 @@ pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - - let mut new_list = HashSet::new(); - - for line in reader.lines() { - new_list.insert(parse_info_hash(&line?)?); - } - - self.store(Arc::new(AccessList(new_list))); + self.store(Arc::new(AccessList::create_from_path(path)?)); Ok(()) } diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 43ebe06..4dfed44 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -15,7 +15,9 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [features] +default = ["with-mio"] with-glommio = ["glommio", "futures-lite"] +with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"] [dependencies] anyhow = "1" @@ -24,20 +26,23 @@ aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" core_affinity = "0.5" -crossbeam-channel = "0.5" hashbrown = "0.11.2" hex = "0.4" -histogram = "0.6" indexmap = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } parking_lot = "0.11" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -socket2 = { version = "0.4.1", features = ["all"] } +# mio +crossbeam-channel = { version = "0.5", optional = true } +histogram = { version = "0.6", optional = true } +mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true } +socket2 = { version = "0.4.1", features = ["all"], optional = true } + +# glommio glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } futures-lite = { version = "1", optional = true } diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/handlers.rs similarity index 61% rename from aquatic_udp/src/lib/common/announce.rs rename to aquatic_udp/src/lib/common/handlers.rs index 2a63b61..380616a 100644 --- a/aquatic_udp/src/lib/common/announce.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -1,10 +1,70 @@ +use std::net::SocketAddr; + use rand::rngs::SmallRng; +use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::extract_response_peers; use crate::common::*; -pub fn handle_announce_request( +#[derive(Debug)] +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape { + request: ScrapeRequest, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +#[derive(Debug)] +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape { + response: ScrapeResponse, + /// Currently only used by glommio implementation + original_indices: Vec, + }, +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape { response, .. } => Response::Scrape(response), + } + } +} + +pub fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMaps, + request: AnnounceRequest, + src: SocketAddr, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + match convert_ipv4_mapped_ipv6(src.ip()) { + IpAddr::V4(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + } +} + +fn handle_announce_request_inner( config: &Config, rng: &mut SmallRng, torrents: &mut TorrentMap, @@ -83,6 +143,57 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { } } +#[inline] +pub fn handle_scrape_request( + torrents: &mut TorrentMaps, + src: SocketAddr, + request: ScrapeRequest, +) -> ScrapeResponse { + const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); + + let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(EMPTY_STATS); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(EMPTY_STATS); + } + } + } + + ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + } +} + +#[inline(always)] +const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 8c84c3c..bcc073b 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -11,7 +11,7 @@ pub use aquatic_udp_protocol::*; use crate::config::Config; -pub mod announce; +pub mod handlers; pub mod network; pub const MAX_PACKET_SIZE: usize = 4096; diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index 833c99f..469d658 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -17,7 +17,7 @@ impl ConnectionMap { self.0.insert((connection_id, socket_addr), valid_until); } - pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + pub fn contains(&self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { self.0.contains_key(&(connection_id, socket_addr)) } diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 5e7b6f6..d05b338 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -18,7 +18,9 @@ pub struct Config { pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, + #[cfg(feature = "with-mio")] pub handlers: HandlerConfig, + #[cfg(feature = "with-mio")] pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, @@ -52,6 +54,7 @@ pub struct NetworkConfig { /// $ sudo sysctl -w net.core.rmem_max=104857600 /// $ sudo sysctl -w net.core.rmem_default=104857600 pub socket_recv_buffer_size: usize, + #[cfg(feature = "with-mio")] pub poll_event_capacity: usize, } @@ -66,6 +69,7 @@ pub struct ProtocolConfig { pub peer_announce_interval: i32, } +#[cfg(feature = "with-mio")] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct HandlerConfig { @@ -75,6 +79,7 @@ pub struct HandlerConfig { pub channel_recv_timeout_microseconds: u64, } +#[cfg(feature = "with-mio")] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct StatisticsConfig { @@ -119,7 +124,9 @@ impl Default for Config { log_level: LogLevel::Error, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), + #[cfg(feature = "with-mio")] handlers: HandlerConfig::default(), + #[cfg(feature = "with-mio")] statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), @@ -133,8 +140,9 @@ impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), - poll_event_capacity: 4096, socket_recv_buffer_size: 4096 * 128, + #[cfg(feature = "with-mio")] + poll_event_capacity: 4096, } } } @@ -149,6 +157,7 @@ impl Default for ProtocolConfig { } } +#[cfg(feature = "with-mio")] impl Default for HandlerConfig { fn default() -> Self { Self { @@ -158,6 +167,7 @@ impl Default for HandlerConfig { } } +#[cfg(feature = "with-mio")] impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 0 } diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 5dd6eeb..d1d398c 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -10,23 +10,36 @@ use crate::config::Config; pub async fn update_access_list(config: Config, access_list: Rc>) { if config.access_list.mode.is_on() { - let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); + match BufferedFile::open(config.access_list.path).await { + Ok(file) => { + let mut reader = StreamReaderBuilder::new(file).build(); - let mut reader = StreamReaderBuilder::new(access_list_file).build(); + loop { + let mut buf = String::with_capacity(42); - loop { - let mut buf = String::with_capacity(42); + match reader.read_line(&mut buf).await { + Ok(_) => { + if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) { + ::log::error!( + "Couln't parse access list line '{}': {:?}", + buf, + err + ); + } + } + Err(err) => { + ::log::error!("Couln't read access list line {:?}", err); - match reader.read_line(&mut buf).await { - Ok(_) => { - access_list.borrow_mut().insert_from_line(&buf); - } - Err(err) => { - break; + break; + } + } + + yield_if_needed().await; } } - - yield_if_needed().await; - } + Err(err) => { + ::log::error!("Couldn't open access list file: {:?}", err) + } + }; } } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 9e18322..3bfbc0d 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; @@ -10,15 +10,17 @@ use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; -use crate::common::announce::handle_announce_request; +use crate::common::handlers::handle_announce_request; +use crate::common::handlers::*; use crate::common::*; use crate::config::Config; use crate::glommio::common::update_access_list; pub async fn run_request_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, + access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); @@ -26,7 +28,7 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(AccessList::default())); + let access_list = Rc::new(RefCell::new(access_list)); // Periodically clean torrents and update access list TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { @@ -61,39 +63,52 @@ pub async fn run_request_worker( async fn handle_request_stream( config: Config, torrents: Rc>, - response_senders: Rc>, + response_senders: Rc>, mut stream: S, ) where - S: Stream + ::std::marker::Unpin, + S: Stream + ::std::marker::Unpin, { let mut rng = SmallRng::from_entropy(); - // Needs to be updated periodically: use timer? - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); - while let Some((producer_index, request, addr)) = stream.next().await { - let response = match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); + + while let Some((producer_index, request, src)) = stream.next().await { + let response = match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + request, + src, + peer_valid_until.borrow().to_owned(), + )) + } + ConnectedRequest::Scrape { request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until, - ), + original_indices, + } => { + let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request); + + ConnectedResponse::Scrape { + response, + original_indices, + } + } }; ::log::debug!("preparing to send response to channel: {:?}", response); - if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + if let Err(err) = response_senders.try_send_to(producer_index, (response, src)) { ::log::warn!("response_sender.try_send: {:?}", err); } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 3d60544..d95121f 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,14 +1,11 @@ -//! Work-in-progress glommio (io_uring) implementation -//! -//! * Doesn't support scrape requests -//! * Currently not faster than mio implementation - use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::access_list::AccessList; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; use crate::config::Config; +use crate::drop_privileges_after_socket_binding; mod common; pub mod handlers; @@ -18,11 +15,17 @@ pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> anyhow::Result<()> { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } + let access_list = if config.access_list.mode.is_on() { + AccessList::create_from_path(&config.access_list.path).expect("Load access list") + } else { + AccessList::default() + }; + let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -37,6 +40,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -50,6 +54,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { request_mesh_builder, response_mesh_builder, num_bound_sockets, + access_list, ) .await }); @@ -61,20 +66,30 @@ pub fn run(config: Config) -> anyhow::Result<()> { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); if config.core_affinity.set_affinities { - builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + builder = + builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); } let executor = builder.spawn(|| async move { - handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + handlers::run_request_worker( + config, + request_mesh_builder, + response_mesh_builder, + access_list, + ) + .await }); executors.push(executor); } + drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 93d5f04..cb046c0 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,3 +1,5 @@ +use std::cell::RefCell; +use std::collections::BTreeMap; use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -5,25 +7,102 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use std::time::{Duration, Instant}; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalSender}; +use glommio::enclose; use glommio::net::UdpSocket; use glommio::prelude::*; +use glommio::timer::TimerActionRepeat; +use hashbrown::HashMap; use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use super::common::update_access_list; + +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +const PENDING_SCRAPE_MAX_WAIT: u64 = 30; + +struct PendingScrapeResponse { + pending_worker_responses: usize, + valid_until: ValidUntil, + stats: BTreeMap, +} + +#[derive(Default)] +struct PendingScrapeResponses(HashMap); + +impl PendingScrapeResponses { + fn prepare( + &mut self, + transaction_id: TransactionId, + pending_worker_responses: usize, + valid_until: ValidUntil, + ) { + let pending = PendingScrapeResponse { + pending_worker_responses, + valid_until, + stats: BTreeMap::new(), + }; + + self.0.insert(transaction_id, pending); + } + + fn add_and_get_finished( + &mut self, + mut response: ScrapeResponse, + mut original_indices: Vec, + ) -> Option { + let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { + r.pending_worker_responses -= 1; + + r.stats.extend( + original_indices + .drain(..) + .zip(response.torrent_stats.drain(..)), + ); + + r.pending_worker_responses == 0 + } else { + ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map"); + + false + }; + + if finished { + let PendingScrapeResponse { stats, .. } = + self.0.remove(&response.transaction_id).unwrap(); + + Some(ScrapeResponse { + transaction_id: response.transaction_id, + torrent_stats: stats.into_values().collect(), + }) + } else { + None + } + } + + fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| v.valid_until.0 > now); + self.0.shrink_to_fit(); + } +} + pub async fn run_socket_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, num_bound_sockets: Arc, + access_list: AccessList, ) { let (local_sender, local_receiver) = new_unbounded(); @@ -40,50 +119,101 @@ pub async fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap(); - spawn_local(read_requests( + let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); + + // Periodically clean pending_scrape_responses + TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || { + enclose!((config, pending_scrape_responses) move || async move { + pending_scrape_responses.borrow_mut().clean(); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), request_senders, response_consumer_index, local_sender, socket.clone(), - )) + pending_scrape_responses, + access_list, + ))) .detach(); for (_, receiver) in response_receivers.streams().into_iter() { - spawn_local(send_responses( + spawn_local(enclose!((pending_scrape_responses) handle_shared_responses( socket.clone(), - receiver.map(|(response, addr)| (response.into(), addr)), - )) + pending_scrape_responses, + receiver, + ))) .detach(); } - send_responses(socket, local_receiver.stream()).await; + send_local_responses(socket, local_receiver.stream()).await; } async fn read_requests( config: Config, - request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, + pending_scrape_responses: Rc>, + access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); let access_list_mode = config.access_list.mode; - // Needs to be updated periodically: use timer? - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - // Needs to be updated periodically: use timer? - let access_list = AccessList::default(); - // Needs to be cleaned periodically: use timer? - let mut connections = ConnectionMap::default(); + let max_connection_age = config.cleaning.max_connection_age; + let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); + let pending_scrape_valid_until = + Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); + let access_list = Rc::new(RefCell::new(access_list)); + let connections = Rc::new(RefCell::new(ConnectionMap::default())); - let mut buf = [0u8; 2048]; + // Periodically update connection_valid_until + TimerActionRepeat::repeat(enclose!((connection_valid_until) move || { + enclose!((connection_valid_until) move || async move { + *connection_valid_until.borrow_mut() = ValidUntil::new(max_connection_age); + + Some(Duration::from_secs(1)) + })() + })); + + // Periodically update pending_scrape_valid_until + TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || { + enclose!((pending_scrape_valid_until) move || async move { + *pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT); + + Some(Duration::from_secs(10)) + })() + })); + + // Periodically update access list + TimerActionRepeat::repeat(enclose!((config, access_list) move || { + enclose!((config, access_list) move || async move { + update_access_list(config.clone(), access_list.clone()).await; + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + // Periodically clean connections + TimerActionRepeat::repeat(enclose!((config, connections) move || { + enclose!((config, connections) move || async move { + connections.borrow_mut().clean(); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + let mut buf = [0u8; MAX_PACKET_SIZE]; loop { match socket.recv_from(&mut buf).await { @@ -96,7 +226,11 @@ async fn read_requests( Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); - connections.insert(connection_id, src, valid_until); + connections.borrow_mut().insert( + connection_id, + src, + connection_valid_until.borrow().to_owned(), + ); let response = Response::Connect(ConnectResponse { connection_id, @@ -106,14 +240,21 @@ async fn read_requests( local_sender.try_send((response, src)).unwrap(); } Ok(Request::Announce(request)) => { - if connections.contains(request.connection_id, src) { - if access_list.allows(access_list_mode, &request.info_hash.0) { + if connections.borrow().contains(request.connection_id, src) { + if access_list + .borrow() + .allows(access_list_mode, &request.info_hash.0) + { let request_consumer_index = - (request.info_hash.0[0] as usize) % config.request_workers; + calculate_request_consumer_index(&config, request.info_hash); if let Err(err) = request_senders.try_send_to( request_consumer_index, - (response_consumer_index, request, src), + ( + response_consumer_index, + ConnectedRequest::Announce(request), + src, + ), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -127,14 +268,51 @@ async fn read_requests( } } } - Ok(Request::Scrape(request)) => { - if connections.contains(request.connection_id, src) { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Scrape requests not supported".into(), - }); + Ok(Request::Scrape(ScrapeRequest { + transaction_id, + connection_id, + info_hashes, + })) => { + if connections.borrow().contains(connection_id, src) { + let mut consumer_requests: HashMap)> = + HashMap::new(); - local_sender.try_send((response, src)).unwrap(); + for (i, info_hash) in info_hashes.into_iter().enumerate() { + let (req, indices) = consumer_requests + .entry(calculate_request_consumer_index(&config, info_hash)) + .or_insert_with(|| { + let request = ScrapeRequest { + transaction_id: transaction_id, + connection_id: connection_id, + info_hashes: Vec::new(), + }; + + (request, Vec::new()) + }); + + req.info_hashes.push(info_hash); + indices.push(i); + } + + pending_scrape_responses.borrow_mut().prepare( + transaction_id, + consumer_requests.len(), + pending_scrape_valid_until.borrow().to_owned(), + ); + + for (consumer_index, (request, original_indices)) in consumer_requests { + let request = ConnectedRequest::Scrape { + request, + original_indices, + }; + + if let Err(err) = request_senders.try_send_to( + consumer_index, + (response_consumer_index, request, src), + ) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } } } Err(err) => { @@ -146,7 +324,7 @@ async fn read_requests( err, } = err { - if connections.contains(connection_id, src) { + if connections.borrow().contains(connection_id, src) { let response = ErrorResponse { transaction_id, message: err.right_or("Parse error").into(), @@ -167,32 +345,75 @@ async fn read_requests( } } -async fn send_responses(socket: Rc, mut stream: S) +async fn handle_shared_responses( + socket: Rc, + pending_scrape_responses: Rc>, + mut stream: S, +) where + S: Stream + ::std::marker::Unpin, +{ + let mut buf = [0u8; MAX_PACKET_SIZE]; + let mut buf = Cursor::new(&mut buf[..]); + + while let Some((response, addr)) = stream.next().await { + let opt_response = match response { + ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), + ConnectedResponse::Scrape { + response, + original_indices, + } => pending_scrape_responses + .borrow_mut() + .add_and_get_finished(response, original_indices) + .map(|response| (Response::Scrape(response), addr)), + }; + + if let Some((response, addr)) = opt_response { + write_response_to_socket(&socket, &mut buf, addr, response).await; + } + + yield_if_needed().await; + } +} + +async fn send_local_responses(socket: Rc, mut stream: S) where S: Stream + ::std::marker::Unpin, { let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - while let Some((response, src)) = stream.next().await { - buf.set_position(0); - - ::log::debug!("preparing to send response: {:?}", response.clone()); - - response - .write(&mut buf, ip_version_from_ip(src.ip())) - .expect("write response"); - - let position = buf.position() as usize; - - if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await { - ::log::info!("send_to failed: {:?}", err); - } + while let Some((response, addr)) = stream.next().await { + write_response_to_socket(&socket, &mut buf, addr, response).await; yield_if_needed().await; } } +async fn write_response_to_socket( + socket: &Rc, + buf: &mut Cursor<&mut [u8]>, + addr: SocketAddr, + response: Response, +) { + buf.set_position(0); + + ::log::debug!("preparing to send response: {:?}", response.clone()); + + response + .write(buf, ip_version_from_ip(addr.ip())) + .expect("write response"); + + let position = buf.position() as usize; + + if let Err(err) = socket.send_to(&buf.get_ref()[..position], addr).await { + ::log::info!("send_to failed: {:?}", err); + } +} + +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.request_workers +} + fn ip_version_from_ip(ip: IpAddr) -> IpVersion { match ip { IpAddr::V4(_) => IpVersion::IPv4, diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 187d563..e3eea68 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,12 +1,22 @@ +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + use cfg_if::cfg_if; pub mod common; pub mod config; #[cfg(all(feature = "with-glommio", target_os = "linux"))] pub mod glommio; +#[cfg(feature = "with-mio")] pub mod mio; use config::Config; +use privdrop::PrivDrop; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; @@ -19,3 +29,35 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } } } + +fn drop_privileges_after_socket_binding( + config: &Config, + num_bound_sockets: Arc, +) -> anyhow::Result<()> { + if config.privileges.drop_privileges { + let mut counter = 0usize; + + loop { + let sockets = num_bound_sockets.load(Ordering::SeqCst); + + if sockets == config.socket_workers { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs index 8bf2233..bcaff2f 100644 --- a/aquatic_udp/src/lib/mio/common.rs +++ b/aquatic_udp/src/lib/mio/common.rs @@ -4,25 +4,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; use crate::common::*; -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(Default)] pub struct Statistics { pub requests_received: AtomicUsize, diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers.rs similarity index 67% rename from aquatic_udp/src/lib/mio/handlers/mod.rs rename to aquatic_udp/src/lib/mio/handlers.rs index af8e5a8..7019b98 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers.rs @@ -1,20 +1,16 @@ use std::net::SocketAddr; use std::time::Duration; +use aquatic_common::ValidUntil; use crossbeam_channel::{Receiver, Sender}; use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; +use crate::common::handlers::*; use crate::config::Config; use crate::mio::common::*; -mod announce; -mod scrape; - -use announce::handle_announce_requests; -use scrape::handle_scrape_requests; - pub fn run_request_worker( state: State, config: Config, @@ -59,8 +55,8 @@ pub fn run_request_worker( }; match request { - ConnectedRequest::Announce(r) => announce_requests.push((r, src)), - ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)), + ConnectedRequest::Announce(request) => announce_requests.push((request, src)), + ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)), } } @@ -68,15 +64,29 @@ pub fn run_request_worker( { let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + responses.extend(announce_requests.drain(..).map(|(request, src)| { + let response = handle_announce_request( + &config, + &mut small_rng, + &mut torrents, + request, + src, + peer_valid_until, + ); + + (ConnectedResponse::Announce(response), src) + })); + + responses.extend(scrape_requests.drain(..).map(|(request, src)| { + let response = ConnectedResponse::Scrape { + response: handle_scrape_request(&mut torrents, src, request), + original_indices: Vec::new(), + }; + + (response, src) + })); } for r in responses.drain(..) { diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs deleted file mode 100644 index 549d061..0000000 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::vec::Drain; - -use parking_lot::MutexGuard; -use rand::rngs::SmallRng; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::common::announce::handle_announce_request; -use crate::common::*; -use crate::config::Config; -use crate::mio::common::*; - -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - (ConnectedResponse::Announce(response), src) - })); -} diff --git a/aquatic_udp/src/lib/mio/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs deleted file mode 100644 index b1a6742..0000000 --- a/aquatic_udp/src/lib/mio/handlers/scrape.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::net::SocketAddr; -use std::vec::Drain; - -use parking_lot::MutexGuard; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::mio::common::*; - -use crate::common::*; - -#[inline] -pub fn handle_scrape_requests( - torrents: &mut MutexGuard, - requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let empty_stats = create_torrent_scrape_statistics(0, 0); - - responses.extend(requests.map(|(request, src)| { - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); - - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } - - let response = ConnectedResponse::Scrape(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats: stats, - }); - - (response, src) - })); -} - -#[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index ad59297..5c5f649 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -2,15 +2,11 @@ use std::thread::Builder; use std::time::Duration; use std::{ ops::Deref, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::{atomic::AtomicUsize, Arc}, }; use anyhow::Context; use crossbeam_channel::unbounded; -use privdrop::PrivDrop; pub mod common; pub mod handlers; @@ -20,14 +16,15 @@ pub mod tasks; use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; use crate::config::Config; +use crate::drop_privileges_after_socket_binding; use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } let state = State::default(); @@ -38,30 +35,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); - } - } - } + drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); @@ -93,9 +67,9 @@ pub fn start_workers( .name(format!("request-{:02}", i + 1)) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset + 1 + i } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset + 1 + i, + }); } handlers::run_request_worker(state, config, request_receiver, response_sender) @@ -114,9 +88,9 @@ pub fn start_workers( .name(format!("socket-{:02}", i + 1)) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset + 1 + config.request_workers + i } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset + 1 + config.request_workers + i, + }); } network::run_socket_worker( @@ -139,9 +113,9 @@ pub fn start_workers( .name("statistics-collector".to_string()) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } loop { diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index fe34023..73dacfc 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -16,6 +16,7 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; @@ -191,9 +192,12 @@ fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Scrape(request), src)) - { + let request = ConnectedRequest::Scrape { + request, + original_indices: Vec::new(), + }; + + if let Err(err) = request_sender.try_send((request, src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } } diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index d71f77c..5eac23d 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index a7d5c18..f718753 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; use crate::common::*; use crate::config::BenchConfig; @@ -42,15 +42,20 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender - .send((ConnectedRequest::Scrape(request.clone()), *src)) - .unwrap(); + let request = ConnectedRequest::Scrape { + request: request.clone(), + original_indices: Vec::new(), + }; + + request_sender.send((request, *src)).unwrap(); } - while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Scrape { response, .. }, _)) = + response_receiver.try_recv() + { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } } @@ -59,10 +64,10 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } } diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index e1fbe0c..92be3bb 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -34,13 +34,12 @@ impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { let affinity_max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id ).max()) - .flatten().unwrap_or(0); + .map(|ids| ids.iter().map(|id| id.id).max()) + .flatten() + .unwrap_or(0); if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max } - ); + core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max }); } if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape @@ -103,9 +102,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { thread::spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max - 1 - i as usize } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: affinity_max - 1 - i as usize, + }); } run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) @@ -120,9 +119,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { thread::spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max - config.num_socket_workers as usize - 1 - i as usize } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: affinity_max - config.num_socket_workers as usize - 1 - i as usize, + }); } run_handler_thread(&config, state, pareto, request_senders, response_receiver) });