From 1dcc48ee67c7a5178b669cb8656b87ae44895e50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:25:18 +0200 Subject: [PATCH 01/25] aquatic_udp: glommio: setup timer to update peer_valid_until --- aquatic_udp/Cargo.toml | 1 + aquatic_udp/src/lib/glommio/handlers.rs | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 43ebe06..3388c1b 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -15,6 +15,7 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [features] +default = ["with-glommio"] with-glommio = ["glommio", "futures-lite"] [dependencies] diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 9e18322..adeefaa 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -68,8 +68,16 @@ async fn handle_request_stream( { let mut rng = SmallRng::from_entropy(); - // Needs to be updated periodically: use timer? - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); while let Some((producer_index, request, addr)) = stream.next().await { let response = match addr.ip() { @@ -79,7 +87,7 @@ async fn handle_request_stream( &mut torrents.borrow_mut().ipv4, request, ip, - peer_valid_until, + peer_valid_until.borrow().to_owned(), ), IpAddr::V6(ip) => handle_announce_request( &config, @@ -87,7 +95,7 @@ async fn handle_request_stream( &mut torrents.borrow_mut().ipv6, request, ip, - peer_valid_until, + peer_valid_until.borrow().to_owned(), ), }; From 0d39c93239993be341355fd27ae3bcbc492fc9bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:41:07 +0200 Subject: [PATCH 02/25] aquatic_udp: glommio: run cleaning tasks in network.rs --- aquatic_udp/src/lib/common/network.rs | 2 +- aquatic_udp/src/lib/glommio/network.rs | 53 ++++++++++++++++++++------ 2 files changed, 43 insertions(+), 12 deletions(-) diff --git a/aquatic_udp/src/lib/common/network.rs b/aquatic_udp/src/lib/common/network.rs index 833c99f..469d658 100644 --- a/aquatic_udp/src/lib/common/network.rs +++ b/aquatic_udp/src/lib/common/network.rs @@ -17,7 +17,7 @@ impl ConnectionMap { self.0.insert((connection_id, socket_addr), valid_until); } - pub fn contains(&mut self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { + pub fn contains(&self, connection_id: ConnectionId, socket_addr: SocketAddr) -> bool { self.0.contains_key(&(connection_id, socket_addr)) } diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 93d5f04..0a87d4b 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,3 +1,4 @@ +use std::cell::RefCell; use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -5,16 +6,21 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use std::time::Duration; use futures_lite::{Stream, StreamExt}; +use glommio::enclose; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalSender}; use glommio::net::UdpSocket; use glommio::prelude::*; +use glommio::timer::TimerActionRepeat; use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use super::common::update_access_list; + use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; @@ -76,12 +82,37 @@ async fn read_requests( let access_list_mode = config.access_list.mode; - // Needs to be updated periodically: use timer? - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - // Needs to be updated periodically: use timer? - let access_list = AccessList::default(); - // Needs to be cleaned periodically: use timer? - let mut connections = ConnectionMap::default(); + let max_connection_age = config.cleaning.max_connection_age; + let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); + let access_list = Rc::new(RefCell::new(AccessList::default())); + let connections = Rc::new(RefCell::new(ConnectionMap::default())); + + // Periodically update connection_valid_until + TimerActionRepeat::repeat(enclose!((connection_valid_until) move || { + enclose!((connection_valid_until) move || async move { + *connection_valid_until.borrow_mut() = ValidUntil::new(max_connection_age); + + Some(Duration::from_secs(1)) + })() + })); + + // Periodically update access list + TimerActionRepeat::repeat(enclose!((config, access_list) move || { + enclose!((config, access_list) move || async move { + update_access_list(config.clone(), access_list.clone()).await; + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + + // Periodically clean connections + TimerActionRepeat::repeat(enclose!((config, connections) move || { + enclose!((config, connections) move || async move { + connections.borrow_mut().clean(); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); let mut buf = [0u8; 2048]; @@ -96,7 +127,7 @@ async fn read_requests( Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); - connections.insert(connection_id, src, valid_until); + connections.borrow_mut().insert(connection_id, src, connection_valid_until.borrow().to_owned()); let response = Response::Connect(ConnectResponse { connection_id, @@ -106,8 +137,8 @@ async fn read_requests( local_sender.try_send((response, src)).unwrap(); } Ok(Request::Announce(request)) => { - if connections.contains(request.connection_id, src) { - if access_list.allows(access_list_mode, &request.info_hash.0) { + if connections.borrow().contains(request.connection_id, src) { + if access_list.borrow().allows(access_list_mode, &request.info_hash.0) { let request_consumer_index = (request.info_hash.0[0] as usize) % config.request_workers; @@ -128,7 +159,7 @@ async fn read_requests( } } Ok(Request::Scrape(request)) => { - if connections.contains(request.connection_id, src) { + if connections.borrow().contains(request.connection_id, src) { let response = Response::Error(ErrorResponse { transaction_id: request.transaction_id, message: "Scrape requests not supported".into(), @@ -146,7 +177,7 @@ async fn read_requests( err, } = err { - if connections.contains(connection_id, src) { + if connections.borrow().contains(connection_id, src) { let response = ErrorResponse { transaction_id, message: err.right_or("Parse error").into(), From aaa609b5f0ac7f7fda635a662794484d6de8a149 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:42:20 +0200 Subject: [PATCH 03/25] aquatic_udp: glommio: use MAX_PACKET_SIZE in network.read_requests --- aquatic_udp/src/lib/glommio/network.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 0a87d4b..0e8edbb 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -114,7 +114,7 @@ async fn read_requests( })() })); - let mut buf = [0u8; 2048]; + let mut buf = [0u8; MAX_PACKET_SIZE]; loop { match socket.recv_from(&mut buf).await { From 072f0641383a7868270f6f7f8712bb383b0dae5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:42:56 +0200 Subject: [PATCH 04/25] Run cargo fmt --- aquatic_udp/src/lib/glommio/mod.rs | 11 ++++++----- aquatic_udp/src/lib/glommio/network.rs | 13 ++++++++++--- aquatic_udp/src/lib/mio/mod.rs | 24 ++++++++++++------------ aquatic_udp_load_test/src/main.rs | 21 ++++++++++----------- 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 3d60544..78c6e5a 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,5 +1,5 @@ //! Work-in-progress glommio (io_uring) implementation -//! +//! //! * Doesn't support scrape requests //! * Currently not faster than mio implementation @@ -18,9 +18,9 @@ pub const SHARED_CHANNEL_SIZE: usize = 4096; pub fn run(config: Config) -> anyhow::Result<()> { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } let num_peers = config.socket_workers + config.request_workers; @@ -65,7 +65,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { let mut builder = LocalExecutorBuilder::default(); if config.core_affinity.set_affinities { - builder = builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); + builder = + builder.pin_to_cpu(config.core_affinity.offset + 1 + config.socket_workers + i); } let executor = builder.spawn(|| async move { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 0e8edbb..747e094 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -9,9 +9,9 @@ use std::sync::{ use std::time::Duration; use futures_lite::{Stream, StreamExt}; -use glommio::enclose; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_unbounded, LocalSender}; +use glommio::enclose; use glommio::net::UdpSocket; use glommio::prelude::*; use glommio::timer::TimerActionRepeat; @@ -127,7 +127,11 @@ async fn read_requests( Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); - connections.borrow_mut().insert(connection_id, src, connection_valid_until.borrow().to_owned()); + connections.borrow_mut().insert( + connection_id, + src, + connection_valid_until.borrow().to_owned(), + ); let response = Response::Connect(ConnectResponse { connection_id, @@ -138,7 +142,10 @@ async fn read_requests( } Ok(Request::Announce(request)) => { if connections.borrow().contains(request.connection_id, src) { - if access_list.borrow().allows(access_list_mode, &request.info_hash.0) { + if access_list + .borrow() + .allows(access_list_mode, &request.info_hash.0) + { let request_consumer_index = (request.info_hash.0[0] as usize) % config.request_workers; diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index ad59297..abda9c9 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -25,9 +25,9 @@ use common::State; pub fn run(config: Config) -> ::anyhow::Result<()> { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } let state = State::default(); @@ -93,9 +93,9 @@ pub fn start_workers( .name(format!("request-{:02}", i + 1)) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset + 1 + i } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset + 1 + i, + }); } handlers::run_request_worker(state, config, request_receiver, response_sender) @@ -114,9 +114,9 @@ pub fn start_workers( .name(format!("socket-{:02}", i + 1)) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset + 1 + config.request_workers + i } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset + 1 + config.request_workers + i, + }); } network::run_socket_worker( @@ -139,9 +139,9 @@ pub fn start_workers( .name("statistics-collector".to_string()) .spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: config.core_affinity.offset } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: config.core_affinity.offset, + }); } loop { diff --git a/aquatic_udp_load_test/src/main.rs b/aquatic_udp_load_test/src/main.rs index e1fbe0c..92be3bb 100644 --- a/aquatic_udp_load_test/src/main.rs +++ b/aquatic_udp_load_test/src/main.rs @@ -34,13 +34,12 @@ impl aquatic_cli_helpers::Config for Config {} fn run(config: Config) -> ::anyhow::Result<()> { let affinity_max = core_affinity::get_core_ids() - .map(|ids| ids.iter().map(|id| id.id ).max()) - .flatten().unwrap_or(0); + .map(|ids| ids.iter().map(|id| id.id).max()) + .flatten() + .unwrap_or(0); if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max } - ); + core_affinity::set_for_current(core_affinity::CoreId { id: affinity_max }); } if config.handler.weight_announce + config.handler.weight_connect + config.handler.weight_scrape @@ -103,9 +102,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { thread::spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max - 1 - i as usize } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: affinity_max - 1 - i as usize, + }); } run_socket_thread(state, response_sender, receiver, &config, addr, thread_id) @@ -120,9 +119,9 @@ fn run(config: Config) -> ::anyhow::Result<()> { thread::spawn(move || { if config.core_affinity.set_affinities { - core_affinity::set_for_current( - core_affinity::CoreId { id: affinity_max - config.num_socket_workers as usize - 1 - i as usize } - ); + core_affinity::set_for_current(core_affinity::CoreId { + id: affinity_max - config.num_socket_workers as usize - 1 - i as usize, + }); } run_handler_thread(&config, state, pareto, request_senders, response_receiver) }); From 2732e520f77508761e39d22f6b1bb0794125481a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:48:49 +0200 Subject: [PATCH 05/25] aquatic_udp: glommio: improve access list error reporting --- aquatic_udp/src/lib/glommio/common.rs | 35 +++++++++++++++++---------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 5dd6eeb..3ab9210 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -10,23 +10,32 @@ use crate::config::Config; pub async fn update_access_list(config: Config, access_list: Rc>) { if config.access_list.mode.is_on() { - let access_list_file = BufferedFile::open(config.access_list.path).await.unwrap(); + match BufferedFile::open(config.access_list.path).await { + Ok(file) => { + let mut reader = StreamReaderBuilder::new(file).build(); - let mut reader = StreamReaderBuilder::new(access_list_file).build(); + loop { + let mut buf = String::with_capacity(42); - loop { - let mut buf = String::with_capacity(42); + match reader.read_line(&mut buf).await { + Ok(_) => { + if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) { + ::log::error!("Couln't parse access list line '{}': {:?}", buf, err); + } + } + Err(err) => { + ::log::error!("Couln't read access list line {:?}", err); - match reader.read_line(&mut buf).await { - Ok(_) => { - access_list.borrow_mut().insert_from_line(&buf); - } - Err(err) => { - break; + break; + } + } + + yield_if_needed().await; } + }, + Err(err) => { + ::log::error!("Couldn't open access list file: {:?}", err) } - - yield_if_needed().await; - } + }; } } From 119cd0fe4674b45bc2c7789f09207e8448b693f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 12:57:00 +0200 Subject: [PATCH 06/25] Improve common AccessList code --- aquatic_common/src/access_list.rs | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 28e24e3..06b5dd4 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -51,6 +51,20 @@ impl AccessList { Ok(()) } + + pub fn create_from_path(path: &PathBuf) -> anyhow::Result { + let file = File::open(path)?; + let reader = BufReader::new(file); + + let mut new_list = Self::default(); + + for line in reader.lines() { + new_list.insert_from_line(&line?)?; + } + + Ok(new_list) + } + pub fn allows(&self, mode: AccessListMode, info_hash: &[u8; 20]) -> bool { match mode { AccessListMode::White => self.0.contains(info_hash), @@ -69,16 +83,7 @@ pub type AccessListArcSwap = ArcSwap; impl AccessListQuery for AccessListArcSwap { fn update_from_path(&self, path: &PathBuf) -> anyhow::Result<()> { - let file = File::open(path)?; - let reader = BufReader::new(file); - - let mut new_list = HashSet::new(); - - for line in reader.lines() { - new_list.insert(parse_info_hash(&line?)?); - } - - self.store(Arc::new(AccessList(new_list))); + self.store(Arc::new(AccessList::create_from_path(path)?)); Ok(()) } From d0be89388ca4160734e14acb051d14056788b629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 13:04:14 +0200 Subject: [PATCH 07/25] aquatic_udp: glommio: read access list on start --- aquatic_udp/src/lib/glommio/common.rs | 8 ++++++-- aquatic_udp/src/lib/glommio/handlers.rs | 3 ++- aquatic_udp/src/lib/glommio/mod.rs | 18 +++++++++++++++++- aquatic_udp/src/lib/glommio/network.rs | 5 ++++- 4 files changed, 29 insertions(+), 5 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 3ab9210..d1d398c 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -20,7 +20,11 @@ pub async fn update_access_list(config: Config, access_list: Rc { if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) { - ::log::error!("Couln't parse access list line '{}': {:?}", buf, err); + ::log::error!( + "Couln't parse access list line '{}': {:?}", + buf, + err + ); } } Err(err) => { @@ -32,7 +36,7 @@ pub async fn update_access_list(config: Config, access_list: Rc { ::log::error!("Couldn't open access list file: {:?}", err) } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index adeefaa..8ccc0c9 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -19,6 +19,7 @@ pub async fn run_request_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); @@ -26,7 +27,7 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(AccessList::default())); + let access_list = Rc::new(RefCell::new(access_list)); // Periodically clean torrents and update access list TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 78c6e5a..1064fd7 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::access_list::AccessList; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; @@ -23,6 +24,12 @@ pub fn run(config: Config) -> anyhow::Result<()> { }); } + let access_list = if config.access_list.mode.is_on() { + AccessList::create_from_path(&config.access_list.path).expect("Load access list") + } else { + AccessList::default() + }; + let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -37,6 +44,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -50,6 +58,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { request_mesh_builder, response_mesh_builder, num_bound_sockets, + access_list, ) .await }); @@ -61,6 +70,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -70,7 +80,13 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + handlers::run_request_worker( + config, + request_mesh_builder, + response_mesh_builder, + access_list, + ) + .await }); executors.push(executor); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 747e094..d66db0d 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -30,6 +30,7 @@ pub async fn run_socket_worker( request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, + access_list: AccessList, ) { let (local_sender, local_receiver) = new_unbounded(); @@ -57,6 +58,7 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), + access_list, )) .detach(); @@ -77,6 +79,7 @@ async fn read_requests( response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, + access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -84,7 +87,7 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); - let access_list = Rc::new(RefCell::new(AccessList::default())); + let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); // Periodically update connection_valid_until From 08920fce5f6d91f6717dd55a16c96a571fc348d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:05:54 +0200 Subject: [PATCH 08/25] WIP: aquatic_udp: glommio: work on handling scrape requests --- .../lib/common/{announce.rs => handlers.rs} | 75 ++++++++++++++++++ aquatic_udp/src/lib/common/mod.rs | 2 +- aquatic_udp/src/lib/glommio/handlers.rs | 52 +++++++------ aquatic_udp/src/lib/glommio/network.rs | 78 ++++++++++++++++--- aquatic_udp/src/lib/mio/common.rs | 19 ----- aquatic_udp/src/lib/mio/handlers/announce.rs | 4 +- aquatic_udp/src/lib/mio/handlers/mod.rs | 8 +- aquatic_udp/src/lib/mio/handlers/scrape.rs | 66 ---------------- aquatic_udp/src/lib/mio/network.rs | 1 + aquatic_udp_bench/src/announce.rs | 2 +- aquatic_udp_bench/src/scrape.rs | 2 +- 11 files changed, 183 insertions(+), 126 deletions(-) rename aquatic_udp/src/lib/common/{announce.rs => handlers.rs} (69%) delete mode 100644 aquatic_udp/src/lib/mio/handlers/scrape.rs diff --git a/aquatic_udp/src/lib/common/announce.rs b/aquatic_udp/src/lib/common/handlers.rs similarity index 69% rename from aquatic_udp/src/lib/common/announce.rs rename to aquatic_udp/src/lib/common/handlers.rs index 2a63b61..41d7728 100644 --- a/aquatic_udp/src/lib/common/announce.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -1,9 +1,33 @@ +use std::net::SocketAddr; + use rand::rngs::SmallRng; use aquatic_common::extract_response_peers; +use aquatic_common::convert_ipv4_mapped_ipv6; use crate::common::*; +#[derive(Debug)] +pub enum ConnectedRequest { + Announce(AnnounceRequest), + Scrape(ScrapeRequest), +} + +#[derive(Debug)] +pub enum ConnectedResponse { + Announce(AnnounceResponse), + Scrape(ScrapeResponse), +} + +impl Into for ConnectedResponse { + fn into(self) -> Response { + match self { + Self::Announce(response) => Response::Announce(response), + Self::Scrape(response) => Response::Scrape(response), + } + } +} + pub fn handle_announce_request( config: &Config, rng: &mut SmallRng, @@ -83,6 +107,57 @@ fn calc_max_num_peers_to_take(config: &Config, peers_wanted: i32) -> usize { } } +#[inline] +pub fn handle_scrape_request( + torrents: &mut TorrentMaps, + src: SocketAddr, + request: ScrapeRequest, +) -> ScrapeResponse { + let empty_stats = create_torrent_scrape_statistics(0, 0); + + let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); + + let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); + + if peer_ip.is_ipv4() { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv4.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } else { + for info_hash in request.info_hashes.iter() { + if let Some(torrent_data) = torrents.ipv6.get(info_hash) { + stats.push(create_torrent_scrape_statistics( + torrent_data.num_seeders as i32, + torrent_data.num_leechers as i32, + )); + } else { + stats.push(empty_stats); + } + } + } + + ScrapeResponse { + transaction_id: request.transaction_id, + torrent_stats: stats, + } +} + +#[inline(always)] +fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { + TorrentScrapeStatistics { + seeders: NumberOfPeers(seeders), + completed: NumberOfDownloads(0), // No implementation planned + leechers: NumberOfPeers(leechers), + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; diff --git a/aquatic_udp/src/lib/common/mod.rs b/aquatic_udp/src/lib/common/mod.rs index 8c84c3c..bcc073b 100644 --- a/aquatic_udp/src/lib/common/mod.rs +++ b/aquatic_udp/src/lib/common/mod.rs @@ -11,7 +11,7 @@ pub use aquatic_udp_protocol::*; use crate::config::Config; -pub mod announce; +pub mod handlers; pub mod network; pub const MAX_PACKET_SIZE: usize = 4096; diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 8ccc0c9..88ddc33 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -10,15 +10,16 @@ use glommio::{enclose, prelude::*}; use rand::prelude::SmallRng; use rand::SeedableRng; -use crate::common::announce::handle_announce_request; +use crate::common::handlers::handle_announce_request; use crate::common::*; +use crate::common::handlers::*; use crate::config::Config; use crate::glommio::common::update_access_list; pub async fn run_request_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); @@ -62,10 +63,10 @@ pub async fn run_request_worker( async fn handle_request_stream( config: Config, torrents: Rc>, - response_senders: Rc>, + response_senders: Rc>, mut stream: S, ) where - S: Stream + ::std::marker::Unpin, + S: Stream + ::std::marker::Unpin, { let mut rng = SmallRng::from_entropy(); @@ -81,23 +82,30 @@ async fn handle_request_stream( })); while let Some((producer_index, request, addr)) = stream.next().await { - let response = match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), + let response = match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv4, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv6, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + }) + } + ConnectedRequest::Scrape(request) => { + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.borrow_mut(), addr, request)) + } }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index d66db0d..22516a7 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,5 +1,6 @@ use std::cell::RefCell; use std::io::Cursor; +use std::iter::FromIterator; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::sync::{ @@ -15,20 +16,45 @@ use glommio::enclose; use glommio::net::UdpSocket; use glommio::prelude::*; use glommio::timer::TimerActionRepeat; +use hashbrown::HashMap; use rand::prelude::{Rng, SeedableRng, StdRng}; use aquatic_udp_protocol::{IpVersion, Request, Response}; use super::common::update_access_list; +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +struct PendingScrapeResponse { + pending_worker_responses: usize, + valid_until: ValidUntil, + src: SocketAddr, + stats: Vec, +} + +#[derive(Default)] +struct PendingScrapeResponses(HashMap); + +impl PendingScrapeResponses { + fn insert_empty(&mut self, transaction_id: TransactionId, src: SocketAddr, pending_worker_responses: usize, valid_until: ValidUntil) { + let pending = PendingScrapeResponse { + pending_worker_responses, + valid_until, + src, + stats: Vec::new(), + }; + + self.0.insert(transaction_id, pending); + } +} + pub async fn run_socket_worker( config: Config, - request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, - response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>, + response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>, num_bound_sockets: Arc, access_list: AccessList, ) { @@ -52,12 +78,15 @@ pub async fn run_socket_worker( let response_consumer_index = response_receivers.consumer_id().unwrap(); + let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); + spawn_local(read_requests( config.clone(), request_senders, response_consumer_index, local_sender, socket.clone(), + pending_scrape_responses, access_list, )) .detach(); @@ -75,10 +104,11 @@ pub async fn run_socket_worker( async fn read_requests( config: Config, - request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>, + request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>, response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, + pending_scrape_responses: Rc>, access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -150,11 +180,11 @@ async fn read_requests( .allows(access_list_mode, &request.info_hash.0) { let request_consumer_index = - (request.info_hash.0[0] as usize) % config.request_workers; + calculate_request_consumer_index(&config, request.info_hash); if let Err(err) = request_senders.try_send_to( request_consumer_index, - (response_consumer_index, request, src), + (response_consumer_index, ConnectedRequest::Announce(request), src), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -170,12 +200,36 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.borrow().contains(request.connection_id, src) { - let response = Response::Error(ErrorResponse { - transaction_id: request.transaction_id, - message: "Scrape requests not supported".into(), - }); + let mut consumer_requests: HashMap = HashMap::new(); - local_sender.try_send((response, src)).unwrap(); + for info_hash in request.info_hashes { + consumer_requests + .entry(calculate_request_consumer_index(&config, info_hash)) + .or_insert( + ScrapeRequest { + transaction_id: request.transaction_id, + connection_id: request.connection_id, + info_hashes: Vec::new(), + } + ) + .info_hashes.push(info_hash); + } + + pending_scrape_responses.borrow_mut().insert_empty( + request.transaction_id, + src, + consumer_requests.len(), + connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil + ); + + for (consumer_index, request) in consumer_requests { + if let Err(err) = request_senders.try_send_to( + consumer_index, + (response_consumer_index, ConnectedRequest::Scrape(request), src), + ) { + ::log::warn!("request_sender.try_send failed: {:?}", err) + } + } } } Err(err) => { @@ -234,6 +288,10 @@ where } } +fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { + (info_hash.0[0] as usize) % config.request_workers +} + fn ip_version_from_ip(ip: IpAddr) -> IpVersion { match ip { IpAddr::V4(_) => IpVersion::IPv4, diff --git a/aquatic_udp/src/lib/mio/common.rs b/aquatic_udp/src/lib/mio/common.rs index 8bf2233..bcaff2f 100644 --- a/aquatic_udp/src/lib/mio/common.rs +++ b/aquatic_udp/src/lib/mio/common.rs @@ -4,25 +4,6 @@ use std::sync::{atomic::AtomicUsize, Arc}; use crate::common::*; -pub enum ConnectedRequest { - Announce(AnnounceRequest), - Scrape(ScrapeRequest), -} - -pub enum ConnectedResponse { - Announce(AnnounceResponse), - Scrape(ScrapeResponse), -} - -impl Into for ConnectedResponse { - fn into(self) -> Response { - match self { - Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), - } - } -} - #[derive(Default)] pub struct Statistics { pub requests_received: AtomicUsize, diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index 549d061..b399e3b 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -7,10 +7,10 @@ use rand::rngs::SmallRng; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; -use crate::common::announce::handle_announce_request; +use crate::common::handlers::handle_announce_request; use crate::common::*; use crate::config::Config; -use crate::mio::common::*; +use crate::common::handlers::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index af8e5a8..5afd71b 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -7,13 +7,11 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; use crate::config::Config; +use crate::common::handlers::*; use crate::mio::common::*; mod announce; -mod scrape; - use announce::handle_announce_requests; -use scrape::handle_scrape_requests; pub fn run_request_worker( state: State, @@ -76,7 +74,9 @@ pub fn run_request_worker( &mut responses, ); - handle_scrape_requests(&mut torrents, scrape_requests.drain(..), &mut responses); + responses.extend(scrape_requests.drain(..).map(|(request, src)| { + (ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), src) + })); } for r in responses.drain(..) { diff --git a/aquatic_udp/src/lib/mio/handlers/scrape.rs b/aquatic_udp/src/lib/mio/handlers/scrape.rs deleted file mode 100644 index b1a6742..0000000 --- a/aquatic_udp/src/lib/mio/handlers/scrape.rs +++ /dev/null @@ -1,66 +0,0 @@ -use std::net::SocketAddr; -use std::vec::Drain; - -use parking_lot::MutexGuard; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::mio::common::*; - -use crate::common::*; - -#[inline] -pub fn handle_scrape_requests( - torrents: &mut MutexGuard, - requests: Drain<(ScrapeRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let empty_stats = create_torrent_scrape_statistics(0, 0); - - responses.extend(requests.map(|(request, src)| { - let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); - - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - if peer_ip.is_ipv4() { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv4.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } else { - for info_hash in request.info_hashes.iter() { - if let Some(torrent_data) = torrents.ipv6.get(info_hash) { - stats.push(create_torrent_scrape_statistics( - torrent_data.num_seeders as i32, - torrent_data.num_leechers as i32, - )); - } else { - stats.push(empty_stats); - } - } - } - - let response = ConnectedResponse::Scrape(ScrapeResponse { - transaction_id: request.transaction_id, - torrent_stats: stats, - }); - - (response, src) - })); -} - -#[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { - TorrentScrapeStatistics { - seeders: NumberOfPeers(seeders), - completed: NumberOfDownloads(0), // No implementation planned - leechers: NumberOfPeers(leechers), - } -} diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index fe34023..1739dd7 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -18,6 +18,7 @@ use aquatic_udp_protocol::{IpVersion, Request, Response}; use crate::common::network::ConnectionMap; use crate::common::*; +use crate::common::handlers::*; use crate::config::Config; use super::common::*; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index d71f77c..eb09c3a 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -8,7 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; +use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index a7d5c18..450b65e 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -8,7 +8,7 @@ use rand_distr::Pareto; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::mio::common::*; +use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; From 96196239f5b3d3eeb9458fc7c258579581e373ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:42:42 +0200 Subject: [PATCH 09/25] aquatic_udp: glommio: mostly implement scrape request support --- aquatic_common/src/access_list.rs | 2 +- aquatic_udp/src/lib/glommio/network.rs | 114 +++++++++++++++++++------ 2 files changed, 88 insertions(+), 28 deletions(-) diff --git a/aquatic_common/src/access_list.rs b/aquatic_common/src/access_list.rs index 06b5dd4..69f856c 100644 --- a/aquatic_common/src/access_list.rs +++ b/aquatic_common/src/access_list.rs @@ -42,7 +42,7 @@ impl Default for AccessListConfig { } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct AccessList(HashSet<[u8; 20]>); impl AccessList { diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 22516a7..0ef3ae4 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,6 +1,5 @@ use std::cell::RefCell; use std::io::Cursor; -use std::iter::FromIterator; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; use std::sync::{ @@ -31,7 +30,6 @@ use crate::config::Config; struct PendingScrapeResponse { pending_worker_responses: usize, valid_until: ValidUntil, - src: SocketAddr, stats: Vec, } @@ -39,16 +37,39 @@ struct PendingScrapeResponse { struct PendingScrapeResponses(HashMap); impl PendingScrapeResponses { - fn insert_empty(&mut self, transaction_id: TransactionId, src: SocketAddr, pending_worker_responses: usize, valid_until: ValidUntil) { + fn prepare(&mut self, transaction_id: TransactionId, pending_worker_responses: usize, valid_until: ValidUntil) { let pending = PendingScrapeResponse { pending_worker_responses, valid_until, - src, stats: Vec::new(), }; self.0.insert(transaction_id, pending); } + + fn add_and_get_finished(&mut self, mut response: ScrapeResponse) -> Option { + let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { + r.pending_worker_responses -= 1; + r.stats.append(&mut response.torrent_stats); + + r.pending_worker_responses == 0 + } else { + ::log::warn!("PendingScrapeResponses.add didn't find PendingScrapeResponse in map"); + + false + }; + + if finished { + let r = self.0.remove(&response.transaction_id).unwrap(); + + Some(ScrapeResponse { + transaction_id: response.transaction_id, + torrent_stats: r.stats, + }) + } else { + None + } + } } pub async fn run_socket_worker( @@ -78,9 +99,10 @@ pub async fn run_socket_worker( let response_consumer_index = response_receivers.consumer_id().unwrap(); + // FIXME: needs cleaning let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); - spawn_local(read_requests( + spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), request_senders, response_consumer_index, @@ -88,18 +110,19 @@ pub async fn run_socket_worker( socket.clone(), pending_scrape_responses, access_list, - )) + ))) .detach(); for (_, receiver) in response_receivers.streams().into_iter() { - spawn_local(send_responses( + spawn_local(enclose!((pending_scrape_responses) handle_shared_responses( socket.clone(), - receiver.map(|(response, addr)| (response.into(), addr)), - )) + pending_scrape_responses, + receiver, + ))) .detach(); } - send_responses(socket, local_receiver.stream()).await; + send_local_responses(socket, local_receiver.stream()).await; } async fn read_requests( @@ -215,9 +238,8 @@ async fn read_requests( .info_hashes.push(info_hash); } - pending_scrape_responses.borrow_mut().insert_empty( + pending_scrape_responses.borrow_mut().prepare( request.transaction_id, - src, consumer_requests.len(), connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil ); @@ -262,32 +284,70 @@ async fn read_requests( } } -async fn send_responses(socket: Rc, mut stream: S) +async fn handle_shared_responses( + socket: Rc, + pending_scrape_responses: Rc>, + mut stream: S, +) where + S: Stream + ::std::marker::Unpin, +{ + let mut buf = [0u8; MAX_PACKET_SIZE]; + let mut buf = Cursor::new(&mut buf[..]); + + while let Some((response, addr)) = stream.next().await { + let opt_response = match response { + ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), + ConnectedResponse::Scrape(response) => { + pending_scrape_responses + .borrow_mut() + .add_and_get_finished(response) + .map(|response| (Response::Scrape(response), addr)) + }, + }; + + if let Some((response, addr)) = opt_response { + write_response_to_socket(&socket, &mut buf, addr, response).await; + } + + yield_if_needed().await; + } +} + +async fn send_local_responses(socket: Rc, mut stream: S) where S: Stream + ::std::marker::Unpin, { let mut buf = [0u8; MAX_PACKET_SIZE]; let mut buf = Cursor::new(&mut buf[..]); - while let Some((response, src)) = stream.next().await { - buf.set_position(0); - - ::log::debug!("preparing to send response: {:?}", response.clone()); - - response - .write(&mut buf, ip_version_from_ip(src.ip())) - .expect("write response"); - - let position = buf.position() as usize; - - if let Err(err) = socket.send_to(&buf.get_ref()[..position], src).await { - ::log::info!("send_to failed: {:?}", err); - } + while let Some((response, addr)) = stream.next().await { + write_response_to_socket(&socket, &mut buf, addr, response).await; yield_if_needed().await; } } +async fn write_response_to_socket( + socket: &Rc, + buf: &mut Cursor<&mut [u8]>, + addr: SocketAddr, + response: Response, +) { + buf.set_position(0); + + ::log::debug!("preparing to send response: {:?}", response.clone()); + + response + .write(buf, ip_version_from_ip(addr.ip())) + .expect("write response"); + + let position = buf.position() as usize; + + if let Err(err) = socket.send_to(&buf.get_ref()[..position], addr).await { + ::log::info!("send_to failed: {:?}", err); + } +} + fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { (info_hash.0[0] as usize) % config.request_workers } From 33558224223d2e691599a3639957d23e02210494 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:50:06 +0200 Subject: [PATCH 10/25] aquatic_udp: glommio: periodically clean/update pending scrape vars --- aquatic_udp/src/lib/glommio/network.rs | 35 +++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 3 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 0ef3ae4..eabbec4 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -6,7 +6,7 @@ use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_lite::{Stream, StreamExt}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; @@ -27,6 +27,8 @@ use crate::common::network::ConnectionMap; use crate::common::*; use crate::config::Config; +const PENDING_SCRAPE_MAX_WAIT: u64 = 30; + struct PendingScrapeResponse { pending_worker_responses: usize, valid_until: ValidUntil, @@ -70,6 +72,15 @@ impl PendingScrapeResponses { None } } + + fn clean(&mut self) { + let now = Instant::now(); + + self.0.retain(|_, v| { + v.valid_until.0 > now + }); + self.0.shrink_to_fit(); + } } pub async fn run_socket_worker( @@ -99,9 +110,17 @@ pub async fn run_socket_worker( let response_consumer_index = response_receivers.consumer_id().unwrap(); - // FIXME: needs cleaning let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default())); + // Periodically clean pending_scrape_responses + TimerActionRepeat::repeat(enclose!((config, pending_scrape_responses) move || { + enclose!((config, pending_scrape_responses) move || async move { + pending_scrape_responses.borrow_mut().clean(); + + Some(Duration::from_secs(config.cleaning.interval)) + })() + })); + spawn_local(enclose!((pending_scrape_responses) read_requests( config.clone(), request_senders, @@ -140,6 +159,7 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); + let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); @@ -152,6 +172,15 @@ async fn read_requests( })() })); + // Periodically update pending_scrape_valid_until + TimerActionRepeat::repeat(enclose!((pending_scrape_valid_until) move || { + enclose!((pending_scrape_valid_until) move || async move { + *pending_scrape_valid_until.borrow_mut() = ValidUntil::new(PENDING_SCRAPE_MAX_WAIT); + + Some(Duration::from_secs(10)) + })() + })); + // Periodically update access list TimerActionRepeat::repeat(enclose!((config, access_list) move || { enclose!((config, access_list) move || async move { @@ -241,7 +270,7 @@ async fn read_requests( pending_scrape_responses.borrow_mut().prepare( request.transaction_id, consumer_requests.len(), - connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil + pending_scrape_valid_until.borrow().to_owned(), ); for (consumer_index, request) in consumer_requests { From 15642914716246cb1ac7603740385534ed298934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:56:00 +0200 Subject: [PATCH 11/25] Run rustfmt --- aquatic_udp/src/lib/common/handlers.rs | 2 +- aquatic_udp/src/lib/glommio/handlers.rs | 48 ++++++++--------- aquatic_udp/src/lib/glommio/network.rs | 54 ++++++++++++-------- aquatic_udp/src/lib/mio/handlers/announce.rs | 2 +- aquatic_udp/src/lib/mio/handlers/mod.rs | 7 ++- aquatic_udp/src/lib/mio/network.rs | 2 +- aquatic_udp_bench/src/announce.rs | 2 +- aquatic_udp_bench/src/scrape.rs | 2 +- 8 files changed, 66 insertions(+), 53 deletions(-) diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index 41d7728..ce6446f 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -2,8 +2,8 @@ use std::net::SocketAddr; use rand::rngs::SmallRng; -use aquatic_common::extract_response_peers; use aquatic_common::convert_ipv4_mapped_ipv6; +use aquatic_common::extract_response_peers; use crate::common::*; diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 88ddc33..809fca0 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -11,8 +11,8 @@ use rand::prelude::SmallRng; use rand::SeedableRng; use crate::common::handlers::handle_announce_request; -use crate::common::*; use crate::common::handlers::*; +use crate::common::*; use crate::config::Config; use crate::glommio::common::update_access_list; @@ -83,29 +83,29 @@ async fn handle_request_stream( while let Some((producer_index, request, addr)) = stream.next().await { let response = match request { - ConnectedRequest::Announce(request) => { - ConnectedResponse::Announce(match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - }) - } - ConnectedRequest::Scrape(request) => { - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents.borrow_mut(), addr, request)) - } + ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() { + IpAddr::V4(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv4, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + IpAddr::V6(ip) => handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut().ipv6, + request, + ip, + peer_valid_until.borrow().to_owned(), + ), + }), + ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request( + &mut torrents.borrow_mut(), + addr, + request, + )), }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index eabbec4..df35f00 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -39,7 +39,12 @@ struct PendingScrapeResponse { struct PendingScrapeResponses(HashMap); impl PendingScrapeResponses { - fn prepare(&mut self, transaction_id: TransactionId, pending_worker_responses: usize, valid_until: ValidUntil) { + fn prepare( + &mut self, + transaction_id: TransactionId, + pending_worker_responses: usize, + valid_until: ValidUntil, + ) { let pending = PendingScrapeResponse { pending_worker_responses, valid_until, @@ -76,9 +81,7 @@ impl PendingScrapeResponses { fn clean(&mut self) { let now = Instant::now(); - self.0.retain(|_, v| { - v.valid_until.0 > now - }); + self.0.retain(|_, v| v.valid_until.0 > now); self.0.shrink_to_fit(); } } @@ -159,7 +162,8 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); - let pending_scrape_valid_until = Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); + let pending_scrape_valid_until = + Rc::new(RefCell::new(ValidUntil::new(PENDING_SCRAPE_MAX_WAIT))); let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); @@ -236,7 +240,11 @@ async fn read_requests( if let Err(err) = request_senders.try_send_to( request_consumer_index, - (response_consumer_index, ConnectedRequest::Announce(request), src), + ( + response_consumer_index, + ConnectedRequest::Announce(request), + src, + ), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -252,19 +260,19 @@ async fn read_requests( } Ok(Request::Scrape(request)) => { if connections.borrow().contains(request.connection_id, src) { - let mut consumer_requests: HashMap = HashMap::new(); + let mut consumer_requests: HashMap = + HashMap::new(); for info_hash in request.info_hashes { consumer_requests .entry(calculate_request_consumer_index(&config, info_hash)) - .or_insert( - ScrapeRequest { - transaction_id: request.transaction_id, - connection_id: request.connection_id, - info_hashes: Vec::new(), - } - ) - .info_hashes.push(info_hash); + .or_insert(ScrapeRequest { + transaction_id: request.transaction_id, + connection_id: request.connection_id, + info_hashes: Vec::new(), + }) + .info_hashes + .push(info_hash); } pending_scrape_responses.borrow_mut().prepare( @@ -276,7 +284,11 @@ async fn read_requests( for (consumer_index, request) in consumer_requests { if let Err(err) = request_senders.try_send_to( consumer_index, - (response_consumer_index, ConnectedRequest::Scrape(request), src), + ( + response_consumer_index, + ConnectedRequest::Scrape(request), + src, + ), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -326,12 +338,10 @@ async fn handle_shared_responses( while let Some((response, addr)) = stream.next().await { let opt_response = match response { ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), - ConnectedResponse::Scrape(response) => { - pending_scrape_responses - .borrow_mut() - .add_and_get_finished(response) - .map(|response| (Response::Scrape(response), addr)) - }, + ConnectedResponse::Scrape(response) => pending_scrape_responses + .borrow_mut() + .add_and_get_finished(response) + .map(|response| (Response::Scrape(response), addr)), }; if let Some((response, addr)) = opt_response { diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs index b399e3b..11fb19a 100644 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ b/aquatic_udp/src/lib/mio/handlers/announce.rs @@ -8,9 +8,9 @@ use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_udp_protocol::*; use crate::common::handlers::handle_announce_request; +use crate::common::handlers::*; use crate::common::*; use crate::config::Config; -use crate::common::handlers::*; #[inline] pub fn handle_announce_requests( diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers/mod.rs index 5afd71b..bd97161 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers/mod.rs @@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng}; use aquatic_udp_protocol::*; -use crate::config::Config; use crate::common::handlers::*; +use crate::config::Config; use crate::mio::common::*; mod announce; @@ -75,7 +75,10 @@ pub fn run_request_worker( ); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - (ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), src) + ( + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), + src, + ) })); } diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index 1739dd7..a510fca 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -16,9 +16,9 @@ use socket2::{Domain, Protocol, Socket, Type}; use aquatic_udp_protocol::{IpVersion, Request, Response}; +use crate::common::handlers::*; use crate::common::network::ConnectionMap; use crate::common::*; -use crate::common::handlers::*; use crate::config::Config; use super::common::*; diff --git a/aquatic_udp_bench/src/announce.rs b/aquatic_udp_bench/src/announce.rs index eb09c3a..5eac23d 100644 --- a/aquatic_udp_bench/src/announce.rs +++ b/aquatic_udp_bench/src/announce.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index 450b65e..d5eee1c 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -6,9 +6,9 @@ use indicatif::ProgressIterator; use rand::Rng; use rand_distr::Pareto; +use aquatic_udp::common::handlers::*; use aquatic_udp::common::*; use aquatic_udp::config::Config; -use aquatic_udp::common::handlers::*; use crate::common::*; use crate::config::BenchConfig; From cbca0e03eb14fdf63c5bdce586b91117b7ff2183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:56:34 +0200 Subject: [PATCH 12/25] aquatic_udp: glommio: remove comment --- aquatic_udp/src/lib/glommio/mod.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 1064fd7..636f5bf 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -1,8 +1,3 @@ -//! Work-in-progress glommio (io_uring) implementation -//! -//! * Doesn't support scrape requests -//! * Currently not faster than mio implementation - use std::sync::{atomic::AtomicUsize, Arc}; use aquatic_common::access_list::AccessList; From c1dd50d0c945abfc88c21380adb5f3481f05765b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 14:56:58 +0200 Subject: [PATCH 13/25] Update TODO --- TODO.md | 3 --- 1 file changed, 3 deletions(-) diff --git a/TODO.md b/TODO.md index b290b67..17e74c6 100644 --- a/TODO.md +++ b/TODO.md @@ -1,9 +1,6 @@ # TODO * aquatic_udp glommio - * update access lists - * clean connections - * update peer valid until * privdrop * access lists: From 32113ea2f3dc1959638382d558462af71977c4ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 15:10:01 +0200 Subject: [PATCH 14/25] aquatic_udp: move more announce logic into common code --- TODO.md | 1 + aquatic_udp/src/lib/common/handlers.rs | 30 +++++++++++- aquatic_udp/src/lib/glommio/handlers.rs | 45 +++++++---------- .../lib/mio/{handlers/mod.rs => handlers.rs} | 33 +++++++------ aquatic_udp/src/lib/mio/handlers/announce.rs | 49 ------------------- 5 files changed, 67 insertions(+), 91 deletions(-) rename aquatic_udp/src/lib/mio/{handlers/mod.rs => handlers.rs} (81%) delete mode 100644 aquatic_udp/src/lib/mio/handlers/announce.rs diff --git a/TODO.md b/TODO.md index 17e74c6..9cdd777 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,7 @@ * aquatic_udp glommio * privdrop + * disable by default! * access lists: * use arc-swap Cache diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index ce6446f..daf4ba9 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -28,7 +28,35 @@ impl Into for ConnectedResponse { } } -pub fn handle_announce_request( +pub fn handle_announce_request( + config: &Config, + rng: &mut SmallRng, + torrents: &mut TorrentMaps, + request: AnnounceRequest, + src: SocketAddr, + peer_valid_until: ValidUntil, +) -> AnnounceResponse { + match convert_ipv4_mapped_ipv6(src.ip()) { + IpAddr::V4(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv4, + request, + ip, + peer_valid_until, + ), + IpAddr::V6(ip) => handle_announce_request_inner( + config, + rng, + &mut torrents.ipv6, + request, + ip, + peer_valid_until, + ), + } +} + +fn handle_announce_request_inner( config: &Config, rng: &mut SmallRng, torrents: &mut TorrentMap, diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index 809fca0..a288755 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::net::{IpAddr, SocketAddr}; +use std::net::SocketAddr; use std::rc::Rc; use std::time::Duration; @@ -81,36 +81,27 @@ async fn handle_request_stream( })() })); - while let Some((producer_index, request, addr)) = stream.next().await { - let response = match request { - ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() { - IpAddr::V4(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv4, - request, - ip, - peer_valid_until.borrow().to_owned(), + while let Some((producer_index, request, src)) = stream.next().await { + let response = + match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + request, + src, + peer_valid_until.borrow().to_owned(), + )) + } + ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape( + handle_scrape_request(&mut torrents.borrow_mut(), src, request), ), - IpAddr::V6(ip) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut().ipv6, - request, - ip, - peer_valid_until.borrow().to_owned(), - ), - }), - ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request( - &mut torrents.borrow_mut(), - addr, - request, - )), - }; + }; ::log::debug!("preparing to send response to channel: {:?}", response); - if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { + if let Err(err) = response_senders.try_send_to(producer_index, (response, src)) { ::log::warn!("response_sender.try_send: {:?}", err); } diff --git a/aquatic_udp/src/lib/mio/handlers/mod.rs b/aquatic_udp/src/lib/mio/handlers.rs similarity index 81% rename from aquatic_udp/src/lib/mio/handlers/mod.rs rename to aquatic_udp/src/lib/mio/handlers.rs index bd97161..99023fa 100644 --- a/aquatic_udp/src/lib/mio/handlers/mod.rs +++ b/aquatic_udp/src/lib/mio/handlers.rs @@ -1,6 +1,7 @@ use std::net::SocketAddr; use std::time::Duration; +use aquatic_common::ValidUntil; use crossbeam_channel::{Receiver, Sender}; use rand::{rngs::SmallRng, SeedableRng}; @@ -10,9 +11,6 @@ use crate::common::handlers::*; use crate::config::Config; use crate::mio::common::*; -mod announce; -use announce::handle_announce_requests; - pub fn run_request_worker( state: State, config: Config, @@ -66,19 +64,26 @@ pub fn run_request_worker( { let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); - handle_announce_requests( - &config, - &mut torrents, - &mut small_rng, - announce_requests.drain(..), - &mut responses, - ); + let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); + + responses.extend(announce_requests.drain(..).map(|(request, src)| { + let response = handle_announce_request( + &config, + &mut small_rng, + &mut torrents, + request, + src, + peer_valid_until, + ); + + (ConnectedResponse::Announce(response), src) + })); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - ( - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), - src, - ) + let response = + ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)); + + (response, src) })); } diff --git a/aquatic_udp/src/lib/mio/handlers/announce.rs b/aquatic_udp/src/lib/mio/handlers/announce.rs deleted file mode 100644 index 11fb19a..0000000 --- a/aquatic_udp/src/lib/mio/handlers/announce.rs +++ /dev/null @@ -1,49 +0,0 @@ -use std::net::{IpAddr, SocketAddr}; -use std::vec::Drain; - -use parking_lot::MutexGuard; -use rand::rngs::SmallRng; - -use aquatic_common::convert_ipv4_mapped_ipv6; -use aquatic_udp_protocol::*; - -use crate::common::handlers::handle_announce_request; -use crate::common::handlers::*; -use crate::common::*; -use crate::config::Config; - -#[inline] -pub fn handle_announce_requests( - config: &Config, - torrents: &mut MutexGuard, - rng: &mut SmallRng, - requests: Drain<(AnnounceRequest, SocketAddr)>, - responses: &mut Vec<(ConnectedResponse, SocketAddr)>, -) { - let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age); - - responses.extend(requests.map(|(request, src)| { - let peer_ip = convert_ipv4_mapped_ipv6(src.ip()); - - let response = match peer_ip { - IpAddr::V4(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv4, - request, - ip, - peer_valid_until, - ), - IpAddr::V6(ip) => handle_announce_request( - config, - rng, - &mut torrents.ipv6, - request, - ip, - peer_valid_until, - ), - }; - - (ConnectedResponse::Announce(response), src) - })); -} From eafb88c345a2e88407d89448c2c2da93e4435bbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 15:12:12 +0200 Subject: [PATCH 15/25] aquatic_udp: use const for empty scrape statistics --- aquatic_udp/src/lib/common/handlers.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index daf4ba9..773d268 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -141,7 +141,7 @@ pub fn handle_scrape_request( src: SocketAddr, request: ScrapeRequest, ) -> ScrapeResponse { - let empty_stats = create_torrent_scrape_statistics(0, 0); + const EMPTY_STATS: TorrentScrapeStatistics = create_torrent_scrape_statistics(0, 0); let mut stats: Vec = Vec::with_capacity(request.info_hashes.len()); @@ -155,7 +155,7 @@ pub fn handle_scrape_request( torrent_data.num_leechers as i32, )); } else { - stats.push(empty_stats); + stats.push(EMPTY_STATS); } } } else { @@ -166,7 +166,7 @@ pub fn handle_scrape_request( torrent_data.num_leechers as i32, )); } else { - stats.push(empty_stats); + stats.push(EMPTY_STATS); } } } @@ -178,7 +178,7 @@ pub fn handle_scrape_request( } #[inline(always)] -fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { +const fn create_torrent_scrape_statistics(seeders: i32, leechers: i32) -> TorrentScrapeStatistics { TorrentScrapeStatistics { seeders: NumberOfPeers(seeders), completed: NumberOfDownloads(0), // No implementation planned From 0e58347ac43deb95afb5676102ce0f529535e4e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 15:18:05 +0200 Subject: [PATCH 16/25] aquatic_udp: move privdrop code to crate root, use in glommio impl --- TODO.md | 1 - aquatic_udp/src/lib/glommio/mod.rs | 3 +++ aquatic_udp/src/lib/lib.rs | 41 ++++++++++++++++++++++++++++++ aquatic_udp/src/lib/mio/mod.rs | 32 +++-------------------- 4 files changed, 47 insertions(+), 30 deletions(-) diff --git a/TODO.md b/TODO.md index 9cdd777..21b7abd 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO * aquatic_udp glommio - * privdrop * disable by default! * access lists: diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 636f5bf..d95121f 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; use crate::config::Config; +use crate::drop_privileges_after_socket_binding; mod common; pub mod handlers; @@ -87,6 +88,8 @@ pub fn run(config: Config) -> anyhow::Result<()> { executors.push(executor); } + drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); + for executor in executors { executor .expect("failed to spawn local executor") diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 187d563..8e8cc4d 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,3 +1,11 @@ +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + time::Duration, +}; + use cfg_if::cfg_if; pub mod common; @@ -7,6 +15,7 @@ pub mod glommio; pub mod mio; use config::Config; +use privdrop::PrivDrop; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; @@ -19,3 +28,35 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { } } } + +fn drop_privileges_after_socket_binding( + config: &Config, + num_bound_sockets: Arc, +) -> anyhow::Result<()> { + if config.privileges.drop_privileges { + let mut counter = 0usize; + + loop { + let sockets = num_bound_sockets.load(Ordering::SeqCst); + + if sockets == config.socket_workers { + PrivDrop::default() + .chroot(config.privileges.chroot_path.clone()) + .user(config.privileges.user.clone()) + .apply()?; + + break; + } + + ::std::thread::sleep(Duration::from_millis(10)); + + counter += 1; + + if counter == 500 { + panic!("Sockets didn't bind in time for privilege drop."); + } + } + } + + Ok(()) +} diff --git a/aquatic_udp/src/lib/mio/mod.rs b/aquatic_udp/src/lib/mio/mod.rs index abda9c9..5c5f649 100644 --- a/aquatic_udp/src/lib/mio/mod.rs +++ b/aquatic_udp/src/lib/mio/mod.rs @@ -2,15 +2,11 @@ use std::thread::Builder; use std::time::Duration; use std::{ ops::Deref, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, + sync::{atomic::AtomicUsize, Arc}, }; use anyhow::Context; use crossbeam_channel::unbounded; -use privdrop::PrivDrop; pub mod common; pub mod handlers; @@ -20,6 +16,7 @@ pub mod tasks; use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery}; use crate::config::Config; +use crate::drop_privileges_after_socket_binding; use common::State; @@ -38,30 +35,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { start_workers(config.clone(), state.clone(), num_bound_sockets.clone())?; - if config.privileges.drop_privileges { - let mut counter = 0usize; - - loop { - let sockets = num_bound_sockets.load(Ordering::SeqCst); - - if sockets == config.socket_workers { - PrivDrop::default() - .chroot(config.privileges.chroot_path.clone()) - .user(config.privileges.user.clone()) - .apply()?; - - break; - } - - ::std::thread::sleep(Duration::from_millis(10)); - - counter += 1; - - if counter == 500 { - panic!("Sockets didn't bind in time for privilege drop."); - } - } - } + drop_privileges_after_socket_binding(&config, num_bound_sockets).unwrap(); loop { ::std::thread::sleep(Duration::from_secs(config.cleaning.interval)); From 821608ab503bbe6fb6f42fec2d2d0881368a376d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 16:00:41 +0200 Subject: [PATCH 17/25] aquatic_udp: glommio: return scrape stats in correct order --- aquatic_udp/src/lib/common/handlers.rs | 14 ++++- aquatic_udp/src/lib/glommio/handlers.rs | 37 +++++++----- aquatic_udp/src/lib/glommio/network.rs | 78 ++++++++++++++++--------- aquatic_udp/src/lib/mio/handlers.rs | 10 ++-- aquatic_udp/src/lib/mio/network.rs | 9 ++- aquatic_udp_bench/src/scrape.rs | 19 +++--- 6 files changed, 108 insertions(+), 59 deletions(-) diff --git a/aquatic_udp/src/lib/common/handlers.rs b/aquatic_udp/src/lib/common/handlers.rs index 773d268..380616a 100644 --- a/aquatic_udp/src/lib/common/handlers.rs +++ b/aquatic_udp/src/lib/common/handlers.rs @@ -10,20 +10,28 @@ use crate::common::*; #[derive(Debug)] pub enum ConnectedRequest { Announce(AnnounceRequest), - Scrape(ScrapeRequest), + Scrape { + request: ScrapeRequest, + /// Currently only used by glommio implementation + original_indices: Vec, + }, } #[derive(Debug)] pub enum ConnectedResponse { Announce(AnnounceResponse), - Scrape(ScrapeResponse), + Scrape { + response: ScrapeResponse, + /// Currently only used by glommio implementation + original_indices: Vec, + }, } impl Into for ConnectedResponse { fn into(self) -> Response { match self { Self::Announce(response) => Response::Announce(response), - Self::Scrape(response) => Response::Scrape(response), + Self::Scrape { response, .. } => Response::Scrape(response), } } } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index a288755..3bfbc0d 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -82,22 +82,29 @@ async fn handle_request_stream( })); while let Some((producer_index, request, src)) = stream.next().await { - let response = - match request { - ConnectedRequest::Announce(request) => { - ConnectedResponse::Announce(handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - request, - src, - peer_valid_until.borrow().to_owned(), - )) + let response = match request { + ConnectedRequest::Announce(request) => { + ConnectedResponse::Announce(handle_announce_request( + &config, + &mut rng, + &mut torrents.borrow_mut(), + request, + src, + peer_valid_until.borrow().to_owned(), + )) + } + ConnectedRequest::Scrape { + request, + original_indices, + } => { + let response = handle_scrape_request(&mut torrents.borrow_mut(), src, request); + + ConnectedResponse::Scrape { + response, + original_indices, } - ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape( - handle_scrape_request(&mut torrents.borrow_mut(), src, request), - ), - }; + } + }; ::log::debug!("preparing to send response to channel: {:?}", response); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index df35f00..feea9a1 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -1,4 +1,5 @@ use std::cell::RefCell; +use std::collections::BTreeMap; use std::io::Cursor; use std::net::{IpAddr, SocketAddr}; use std::rc::Rc; @@ -32,7 +33,7 @@ const PENDING_SCRAPE_MAX_WAIT: u64 = 30; struct PendingScrapeResponse { pending_worker_responses: usize, valid_until: ValidUntil, - stats: Vec, + stats: BTreeMap, } #[derive(Default)] @@ -48,16 +49,25 @@ impl PendingScrapeResponses { let pending = PendingScrapeResponse { pending_worker_responses, valid_until, - stats: Vec::new(), + stats: BTreeMap::new(), }; self.0.insert(transaction_id, pending); } - fn add_and_get_finished(&mut self, mut response: ScrapeResponse) -> Option { + fn add_and_get_finished( + &mut self, + mut response: ScrapeResponse, + mut original_indices: Vec, + ) -> Option { let finished = if let Some(r) = self.0.get_mut(&response.transaction_id) { r.pending_worker_responses -= 1; - r.stats.append(&mut response.torrent_stats); + + r.stats.extend( + original_indices + .drain(..) + .zip(response.torrent_stats.drain(..)), + ); r.pending_worker_responses == 0 } else { @@ -67,11 +77,12 @@ impl PendingScrapeResponses { }; if finished { - let r = self.0.remove(&response.transaction_id).unwrap(); + let PendingScrapeResponse { stats, .. } = + self.0.remove(&response.transaction_id).unwrap(); Some(ScrapeResponse { transaction_id: response.transaction_id, - torrent_stats: r.stats, + torrent_stats: stats.into_values().collect(), }) } else { None @@ -258,37 +269,47 @@ async fn read_requests( } } } - Ok(Request::Scrape(request)) => { - if connections.borrow().contains(request.connection_id, src) { - let mut consumer_requests: HashMap = + Ok(Request::Scrape(ScrapeRequest { + transaction_id, + connection_id, + info_hashes, + })) => { + if connections.borrow().contains(connection_id, src) { + let mut consumer_requests: HashMap)> = HashMap::new(); - for info_hash in request.info_hashes { - consumer_requests + for (i, info_hash) in info_hashes.into_iter().enumerate() { + let (req, indices) = consumer_requests .entry(calculate_request_consumer_index(&config, info_hash)) - .or_insert(ScrapeRequest { - transaction_id: request.transaction_id, - connection_id: request.connection_id, - info_hashes: Vec::new(), - }) - .info_hashes - .push(info_hash); + .or_insert_with(|| { + let request = ScrapeRequest { + transaction_id: transaction_id, + connection_id: connection_id, + info_hashes: Vec::new(), + }; + + (request, Vec::new()) + }); + + req.info_hashes.push(info_hash); + indices.push(i); } pending_scrape_responses.borrow_mut().prepare( - request.transaction_id, + transaction_id, consumer_requests.len(), pending_scrape_valid_until.borrow().to_owned(), ); - for (consumer_index, request) in consumer_requests { + for (consumer_index, (request, original_indices)) in consumer_requests { + let request = ConnectedRequest::Scrape { + request, + original_indices, + }; + if let Err(err) = request_senders.try_send_to( consumer_index, - ( - response_consumer_index, - ConnectedRequest::Scrape(request), - src, - ), + (response_consumer_index, request, src), ) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -338,9 +359,12 @@ async fn handle_shared_responses( while let Some((response, addr)) = stream.next().await { let opt_response = match response { ConnectedResponse::Announce(response) => Some((Response::Announce(response), addr)), - ConnectedResponse::Scrape(response) => pending_scrape_responses + ConnectedResponse::Scrape { + response, + original_indices, + } => pending_scrape_responses .borrow_mut() - .add_and_get_finished(response) + .add_and_get_finished(response, original_indices) .map(|response| (Response::Scrape(response), addr)), }; diff --git a/aquatic_udp/src/lib/mio/handlers.rs b/aquatic_udp/src/lib/mio/handlers.rs index 99023fa..7019b98 100644 --- a/aquatic_udp/src/lib/mio/handlers.rs +++ b/aquatic_udp/src/lib/mio/handlers.rs @@ -55,8 +55,8 @@ pub fn run_request_worker( }; match request { - ConnectedRequest::Announce(r) => announce_requests.push((r, src)), - ConnectedRequest::Scrape(r) => scrape_requests.push((r, src)), + ConnectedRequest::Announce(request) => announce_requests.push((request, src)), + ConnectedRequest::Scrape { request, .. } => scrape_requests.push((request, src)), } } @@ -80,8 +80,10 @@ pub fn run_request_worker( })); responses.extend(scrape_requests.drain(..).map(|(request, src)| { - let response = - ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)); + let response = ConnectedResponse::Scrape { + response: handle_scrape_request(&mut torrents, src, request), + original_indices: Vec::new(), + }; (response, src) })); diff --git a/aquatic_udp/src/lib/mio/network.rs b/aquatic_udp/src/lib/mio/network.rs index a510fca..73dacfc 100644 --- a/aquatic_udp/src/lib/mio/network.rs +++ b/aquatic_udp/src/lib/mio/network.rs @@ -192,9 +192,12 @@ fn read_requests( } Ok(Request::Scrape(request)) => { if connections.contains(request.connection_id, src) { - if let Err(err) = - request_sender.try_send((ConnectedRequest::Scrape(request), src)) - { + let request = ConnectedRequest::Scrape { + request, + original_indices: Vec::new(), + }; + + if let Err(err) = request_sender.try_send((request, src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } } diff --git a/aquatic_udp_bench/src/scrape.rs b/aquatic_udp_bench/src/scrape.rs index d5eee1c..f718753 100644 --- a/aquatic_udp_bench/src/scrape.rs +++ b/aquatic_udp_bench/src/scrape.rs @@ -42,15 +42,20 @@ pub fn bench_scrape_handler( for round in (0..bench_config.num_rounds).progress_with(pb) { for request_chunk in requests.chunks(p) { for (request, src) in request_chunk { - request_sender - .send((ConnectedRequest::Scrape(request.clone()), *src)) - .unwrap(); + let request = ConnectedRequest::Scrape { + request: request.clone(), + original_indices: Vec::new(), + }; + + request_sender.send((request, *src)).unwrap(); } - while let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.try_recv() { + while let Ok((ConnectedResponse::Scrape { response, .. }, _)) = + response_receiver.try_recv() + { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } } @@ -59,10 +64,10 @@ pub fn bench_scrape_handler( let total = bench_config.num_scrape_requests * (round + 1); while num_responses < total { - if let Ok((ConnectedResponse::Scrape(r), _)) = response_receiver.recv() { + if let Ok((ConnectedResponse::Scrape { response, .. }, _)) = response_receiver.recv() { num_responses += 1; - if let Some(stat) = r.torrent_stats.last() { + if let Some(stat) = response.torrent_stats.last() { dummy ^= stat.leechers.0; } } From 08488f50cde9f9aaec032339fd8e906a5151f81b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 16:34:41 +0200 Subject: [PATCH 18/25] Update TODO --- TODO.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/TODO.md b/TODO.md index 21b7abd..0a050ee 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,9 @@ * aquatic_udp glommio * disable by default! + * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) + containing TransactionId and BTreeMap, and same for + response * access lists: * use arc-swap Cache From d0aa87f99eb27797550a9a076027fe44decf65f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 19:02:05 +0200 Subject: [PATCH 19/25] github actions: rewrite file test action to fix ulimit; add timeouts --- .github/actions/test-transfer/Dockerfile | 8 +++++--- .github/actions/test-transfer/action.yml | 6 ++++-- .github/actions/test-transfer/entrypoint.sh | 10 +++++----- .github/workflows/cargo-build-and-test.yml | 3 +-- .github/workflows/test-transfer.yml | 6 +++++- 5 files changed, 20 insertions(+), 13 deletions(-) diff --git a/.github/actions/test-transfer/Dockerfile b/.github/actions/test-transfer/Dockerfile index b297bc7..aaf8299 100644 --- a/.github/actions/test-transfer/Dockerfile +++ b/.github/actions/test-transfer/Dockerfile @@ -1,8 +1,10 @@ -# Container image that runs your code +# Not used by Github action, but can be used to run test locally: +# 1. docker build -t aquatic ./path/to/Dockerfile +# 2. docker run aquatic +# 3. On failure, run `docker rmi aquatic -f` and go back to step 1 + FROM rust:bullseye -# Copies your code file from your action repository to the filesystem path `/` of the container COPY entrypoint.sh /entrypoint.sh -# Code file to execute when the docker container starts up (`entrypoint.sh`) ENTRYPOINT ["/entrypoint.sh"] \ No newline at end of file diff --git a/.github/actions/test-transfer/action.yml b/.github/actions/test-transfer/action.yml index 2520a97..aa5afef 100644 --- a/.github/actions/test-transfer/action.yml +++ b/.github/actions/test-transfer/action.yml @@ -10,5 +10,7 @@ outputs: wss_ipv4: description: 'WSS IPv4 status' runs: - using: 'docker' - image: 'Dockerfile' \ No newline at end of file + using: 'composite' + steps: + - run: $GITHUB_ACTION_PATH/entrypoint.sh + shell: bash \ No newline at end of file diff --git a/.github/actions/test-transfer/entrypoint.sh b/.github/actions/test-transfer/entrypoint.sh index 77fd0ad..1cdc7b0 100755 --- a/.github/actions/test-transfer/entrypoint.sh +++ b/.github/actions/test-transfer/entrypoint.sh @@ -5,11 +5,6 @@ # # IPv6 is unfortunately disabled by default in Docker # (see sysctl net.ipv6.conf.lo.disable_ipv6) -# -# When testing locally, use: -# 1. docker build -t aquatic ./path/to/Dockerfile -# 2. docker run aquatic -# 3. On failure, run `docker rmi aquatic -f` and go back to step 1 set -e @@ -21,6 +16,8 @@ else SUDO="" fi +ulimit -a + $SUDO apt-get update $SUDO apt-get install -y cmake libssl-dev screen rtorrent mktorrent ssl-cert ca-certificates curl golang @@ -43,6 +40,9 @@ else cd "$GITHUB_WORKSPACE" fi +echo "last aquatic commits:" +git log --oneline -3 + # Setup bogus TLS certificate $SUDO echo "127.0.0.1 example.com" >> /etc/hosts diff --git a/.github/workflows/cargo-build-and-test.yml b/.github/workflows/cargo-build-and-test.yml index 4e9b66e..53f6721 100644 --- a/.github/workflows/cargo-build-and-test.yml +++ b/.github/workflows/cargo-build-and-test.yml @@ -11,9 +11,8 @@ env: jobs: build: - runs-on: ubuntu-latest - + timeout-minutes: 10 steps: - uses: actions/checkout@v2 - name: Build diff --git a/.github/workflows/test-transfer.yml b/.github/workflows/test-transfer.yml index 97a4f91..2679356 100644 --- a/.github/workflows/test-transfer.yml +++ b/.github/workflows/test-transfer.yml @@ -10,9 +10,13 @@ jobs: test-transfer-http: runs-on: ubuntu-latest name: "Test BitTorrent file transfer over HTTP (with and without TLS), UDP and WSS" + timeout-minutes: 20 + container: + image: rust:1-bullseye + options: --ulimit memlock=524288:524288 steps: - name: Checkout uses: actions/checkout@v2 - name: Test file transfers uses: ./.github/actions/test-transfer - id: test_transfer \ No newline at end of file + id: test_transfer From 5729f337adb6bfd72b58bb99b8a7d5dfe8e2461a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 20:25:42 +0200 Subject: [PATCH 20/25] aquatic_udp: rename glommio feature to "io_uring", remove from default --- TODO.md | 1 - aquatic_udp/Cargo.toml | 3 +-- aquatic_udp/src/lib/lib.rs | 4 ++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/TODO.md b/TODO.md index 0a050ee..72cec4d 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,6 @@ # TODO * aquatic_udp glommio - * disable by default! * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) containing TransactionId and BTreeMap, and same for response diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 3388c1b..b283860 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -15,8 +15,7 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [features] -default = ["with-glommio"] -with-glommio = ["glommio", "futures-lite"] +io_uring = ["glommio", "futures-lite"] [dependencies] anyhow = "1" diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 8e8cc4d..a4a3a55 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -10,7 +10,7 @@ use cfg_if::cfg_if; pub mod common; pub mod config; -#[cfg(all(feature = "with-glommio", target_os = "linux"))] +#[cfg(all(feature = "io_uring", target_os = "linux"))] pub mod glommio; pub mod mio; @@ -21,7 +21,7 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { cfg_if! { - if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { + if #[cfg(all(feature = "io_uring", target_os = "linux"))] { glommio::run(config) } else { mio::run(config) From f69e3a00c751624a7beb0144afe40e02855a764e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 20:28:36 +0200 Subject: [PATCH 21/25] aquatic_udp: glommio: cleanup --- aquatic_udp/src/lib/glommio/network.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index feea9a1..cb046c0 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -119,7 +119,6 @@ pub async fn run_socket_worker( num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); - let (_, mut response_receivers) = response_mesh_builder.join(Role::Consumer).await.unwrap(); let response_consumer_index = response_receivers.consumer_id().unwrap(); From 13d0224321c7ac7aab13287c76e1c105f530c05d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 22:27:42 +0200 Subject: [PATCH 22/25] aquatic_udp: use features "with-glommio" and (default) "with-mio" --- aquatic_udp/Cargo.toml | 15 ++++++++++----- aquatic_udp/src/lib/config.rs | 12 +++++++++++- aquatic_udp/src/lib/lib.rs | 5 +++-- 3 files changed, 24 insertions(+), 8 deletions(-) diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index b283860..4dfed44 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -15,7 +15,9 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [features] -io_uring = ["glommio", "futures-lite"] +default = ["with-mio"] +with-glommio = ["glommio", "futures-lite"] +with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"] [dependencies] anyhow = "1" @@ -24,20 +26,23 @@ aquatic_common = "0.1.0" aquatic_udp_protocol = "0.1.0" cfg-if = "1" core_affinity = "0.5" -crossbeam-channel = "0.5" hashbrown = "0.11.2" hex = "0.4" -histogram = "0.6" indexmap = "1" log = "0.4" mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } parking_lot = "0.11" privdrop = "0.5" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } -socket2 = { version = "0.4.1", features = ["all"] } +# mio +crossbeam-channel = { version = "0.5", optional = true } +histogram = { version = "0.6", optional = true } +mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true } +socket2 = { version = "0.4.1", features = ["all"], optional = true } + +# glommio glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } futures-lite = { version = "1", optional = true } diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index 5e7b6f6..d05b338 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -18,7 +18,9 @@ pub struct Config { pub log_level: LogLevel, pub network: NetworkConfig, pub protocol: ProtocolConfig, + #[cfg(feature = "with-mio")] pub handlers: HandlerConfig, + #[cfg(feature = "with-mio")] pub statistics: StatisticsConfig, pub cleaning: CleaningConfig, pub privileges: PrivilegeConfig, @@ -52,6 +54,7 @@ pub struct NetworkConfig { /// $ sudo sysctl -w net.core.rmem_max=104857600 /// $ sudo sysctl -w net.core.rmem_default=104857600 pub socket_recv_buffer_size: usize, + #[cfg(feature = "with-mio")] pub poll_event_capacity: usize, } @@ -66,6 +69,7 @@ pub struct ProtocolConfig { pub peer_announce_interval: i32, } +#[cfg(feature = "with-mio")] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct HandlerConfig { @@ -75,6 +79,7 @@ pub struct HandlerConfig { pub channel_recv_timeout_microseconds: u64, } +#[cfg(feature = "with-mio")] #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct StatisticsConfig { @@ -119,7 +124,9 @@ impl Default for Config { log_level: LogLevel::Error, network: NetworkConfig::default(), protocol: ProtocolConfig::default(), + #[cfg(feature = "with-mio")] handlers: HandlerConfig::default(), + #[cfg(feature = "with-mio")] statistics: StatisticsConfig::default(), cleaning: CleaningConfig::default(), privileges: PrivilegeConfig::default(), @@ -133,8 +140,9 @@ impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), - poll_event_capacity: 4096, socket_recv_buffer_size: 4096 * 128, + #[cfg(feature = "with-mio")] + poll_event_capacity: 4096, } } } @@ -149,6 +157,7 @@ impl Default for ProtocolConfig { } } +#[cfg(feature = "with-mio")] impl Default for HandlerConfig { fn default() -> Self { Self { @@ -158,6 +167,7 @@ impl Default for HandlerConfig { } } +#[cfg(feature = "with-mio")] impl Default for StatisticsConfig { fn default() -> Self { Self { interval: 0 } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index a4a3a55..e3eea68 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -10,8 +10,9 @@ use cfg_if::cfg_if; pub mod common; pub mod config; -#[cfg(all(feature = "io_uring", target_os = "linux"))] +#[cfg(all(feature = "with-glommio", target_os = "linux"))] pub mod glommio; +#[cfg(feature = "with-mio")] pub mod mio; use config::Config; @@ -21,7 +22,7 @@ pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { cfg_if! { - if #[cfg(all(feature = "io_uring", target_os = "linux"))] { + if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { glommio::run(config) } else { mio::run(config) From a554240ec8ed6bf2f6ba12415fe52a8542464201 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 22:28:11 +0200 Subject: [PATCH 23/25] Update README --- README.md | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/README.md b/README.md index 1538cdc..1a9e3b4 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,22 @@ Server responses per second, best result in bold: Please refer to `documents/aquatic-udp-load-test-2021-08-19.pdf` for more details. +#### Alternative implementation using io_uring + +[io_uring]: https://en.wikipedia.org/wiki/Io_uring +[glommio]: https://github.com/DataDog/glommio + +There is an alternative implementation that utilizes [io_uring] by running on +[glommio]. It only runs on Linux and requires a recent kernel (version 5.1 or later). +In some cases, it performs even better than the cross-platform implementation. + +To use it, pass the `with-glommio` feature when building, e.g.: + +```sh +cargo build -p aquatic_udp --features "with-glommio" --no-default-features +./target/release/aquatic_udp +``` + ### aquatic_http: HTTP BitTorrent tracker Aims for compatibility with the HTTP BitTorrent protocol, as described From b530ca20cfbd35fa1b3827949545a48f99b91ede Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 22:29:46 +0200 Subject: [PATCH 24/25] Update TODO --- TODO.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/TODO.md b/TODO.md index 72cec4d..d7ceb85 100644 --- a/TODO.md +++ b/TODO.md @@ -1,12 +1,13 @@ # TODO * aquatic_udp glommio + * Add to file transfer CI * consider adding ConnectedScrapeRequest::Scrape(PendingScrapeRequest) containing TransactionId and BTreeMap, and same for response * access lists: - * use arc-swap Cache + * use arc-swap Cache? * add CI tests * aquatic_ws: should it send back error on message parse error, or does that From a7b29c8deb2ad8e1e0c6d6ec42de31913e6f78d1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 23 Oct 2021 22:32:59 +0200 Subject: [PATCH 25/25] README: mention aquatic_udp core affinity settings --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 1a9e3b4..8089f9e 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,8 @@ except that it: Supports IPv4 and IPv6 (BitTorrent UDP protocol doesn't support IPv6 very well, however.) +For optimal performance, enable setting of core affinities in configuration. + #### Benchmarks [opentracker]: http://erdgeist.org/arts/software/opentracker/