diff --git a/Cargo.lock b/Cargo.lock index 7fa5c08..f6af11f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -94,7 +94,7 @@ dependencies = [ "either", "futures-lite", "futures-rustls", - "glommio", + "glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5)", "itoa", "log", "memchr", @@ -120,7 +120,7 @@ dependencies = [ "aquatic_common", "aquatic_http_protocol", "futures-lite", - "glommio", + "glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=4e6b14772da2f4325271fbcf12d24cf91ed466e5)", "hashbrown 0.11.2", "log", "mimalloc", @@ -239,7 +239,7 @@ dependencies = [ "futures", "futures-lite", "futures-rustls", - "glommio", + "glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=2efe2f2a08f54394a435b674e8e0125057cbff03)", "hashbrown 0.11.2", "histogram", "log", @@ -270,7 +270,7 @@ dependencies = [ "async-tungstenite", "futures", "futures-rustls", - "glommio", + "glommio 0.6.0 (git+https://github.com/DataDog/glommio.git?rev=2efe2f2a08f54394a435b674e8e0125057cbff03)", "hashbrown 0.11.2", "mimalloc", "quickcheck", @@ -315,9 +315,9 @@ dependencies = [ [[package]] name = "async-tungstenite" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "742cc7dcb20b2f84a42f4691aa999070ec7e78f8e7e7438bf14be7017b44907e" +checksum = "a0d06e9a20f1c0d64b6067ef6aa9fdf59e194ecde93575591fb4c78063692324" dependencies = [ "futures-io", "futures-util", @@ -944,6 +944,37 @@ version = "0.26.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78cc372d058dcf6d5ecd98510e7fbc9e5aec4d21de70f65fea8fecebcd881bd4" +[[package]] +name = "glommio" +version = "0.6.0" +source = "git+https://github.com/DataDog/glommio.git?rev=2efe2f2a08f54394a435b674e8e0125057cbff03#2efe2f2a08f54394a435b674e8e0125057cbff03" +dependencies = [ + "ahash", + "bitflags 1.3.2", + "bitmaps", + "buddy-alloc", + "cc", + "concurrent-queue", + "crossbeam", + "enclose", + "futures-lite", + "intrusive-collections", + "lazy_static", + "libc", + "lockfree", + "log", + "membarrier", + "nix", + "pin-project-lite", + "rlimit", + "scoped-tls", + "scopeguard", + "smallvec", + "socket2 0.3.19", + "tracing", + "typenum", +] + [[package]] name = "glommio" version = "0.6.0" @@ -2214,9 +2245,9 @@ dependencies = [ [[package]] name = "tungstenite" -version = "0.15.0" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "983d40747bce878d2fb67d910dcb8bd3eca2b2358540c3cc1b98c027407a3ae3" +checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" dependencies = [ "base64", "byteorder", diff --git a/TODO.md b/TODO.md index 6e04db2..e29a54f 100644 --- a/TODO.md +++ b/TODO.md @@ -41,7 +41,13 @@ * aquatic_ws * glommio - * fix memory leak / huge growth + * proper cpu set pinning + * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity + * replacing indexmap_amortized / simd_json with equivalents doesn't help + * SinkExt::send maybe doesn't wake up properly? + * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? + * general + * large amount of temporary allocations in serialize_20_bytes, pretty many in deserialize_20_bytes # Less important diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index d01fab8..4bab56b 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -34,7 +34,7 @@ rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" -tungstenite = "0.15" +tungstenite = "0.16" # mio crossbeam-channel = { version = "0.5", optional = true } @@ -45,11 +45,11 @@ parking_lot = { version = "0.11", optional = true } socket2 = { version = "0.4", features = ["all"], optional = true } # glommio -async-tungstenite = { version = "0.15", optional = true } +async-tungstenite = { version = "0.16", optional = true } futures-lite = { version = "1", optional = true } futures = { version = "0.3", optional = true } futures-rustls = { version = "0.22", optional = true } -glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5", optional = true } +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "2efe2f2a08f54394a435b674e8e0125057cbff03", optional = true } rustls-pemfile = { version = "0.2", optional = true } [dev-dependencies] diff --git a/aquatic_ws/src/common/handlers.rs b/aquatic_ws/src/common/handlers.rs index 29677ba..94dc0e5 100644 --- a/aquatic_ws/src/common/handlers.rs +++ b/aquatic_ws/src/common/handlers.rs @@ -1,6 +1,6 @@ use aquatic_common::extract_response_peers; use hashbrown::HashMap; -use rand::Rng; +use rand::rngs::SmallRng; use aquatic_ws_protocol::*; @@ -9,7 +9,7 @@ use crate::config::Config; pub fn handle_announce_request( config: &Config, - rng: &mut impl Rng, + rng: &mut SmallRng, torrent_maps: &mut TorrentMaps, out_messages: &mut Vec<(ConnectionMeta, OutMessage)>, valid_until: ValidUntil, diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index b376c87..8037b66 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -94,6 +94,13 @@ pub struct CleaningConfig { /// Remove peers that haven't announced for this long (seconds) pub max_peer_age: u64, + // Clean connections this often (seconds) + #[cfg(feature = "with-glommio")] + pub connection_cleaning_interval: u64, + /// Close connections if no responses have been sent to them for this long (seconds) + #[cfg(feature = "with-glommio")] + pub max_connection_idle: u64, + /// Remove connections that are older than this (seconds) #[cfg(feature = "with-mio")] pub max_connection_age: u64, @@ -180,9 +187,13 @@ impl Default for CleaningConfig { Self { torrent_cleaning_interval: 30, max_peer_age: 1800, + #[cfg(feature = "with-glommio")] + max_connection_idle: 60 * 5, #[cfg(feature = "with-mio")] max_connection_age: 1800, + #[cfg(feature = "with-glommio")] + connection_cleaning_interval: 30, } } } diff --git a/aquatic_ws/src/glommio/mod.rs b/aquatic_ws/src/glommio/mod.rs index 20b86cf..88d5659 100644 --- a/aquatic_ws/src/glommio/mod.rs +++ b/aquatic_ws/src/glommio/mod.rs @@ -17,13 +17,13 @@ use self::common::*; use glommio::{channels::channel_mesh::MeshBuilder, prelude::*}; -const SHARED_CHANNEL_SIZE: usize = 1024; +pub const SHARED_IN_CHANNEL_SIZE: usize = 1024; pub fn run(config: Config, state: State) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; - let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); - let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); diff --git a/aquatic_ws/src/glommio/request.rs b/aquatic_ws/src/glommio/request.rs index c3332e0..bf9cdfb 100644 --- a/aquatic_ws/src/glommio/request.rs +++ b/aquatic_ws/src/glommio/request.rs @@ -2,7 +2,7 @@ use std::cell::RefCell; use std::rc::Rc; use std::time::Duration; -use futures_lite::StreamExt; +use futures::StreamExt; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::enclose; use glommio::prelude::*; @@ -16,6 +16,7 @@ use crate::common::*; use crate::config::Config; use super::common::State; +use super::SHARED_IN_CHANNEL_SIZE; pub async fn run_request_worker( config: Config, @@ -63,11 +64,11 @@ async fn handle_request_stream( config: Config, torrents: Rc>, out_message_senders: Rc>, - mut stream: S, + stream: S, ) where S: futures_lite::Stream + ::std::marker::Unpin, { - let mut rng = SmallRng::from_entropy(); + let rng = Rc::new(RefCell::new(SmallRng::from_entropy())); let max_peer_age = config.cleaning.max_peer_age; let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); @@ -80,35 +81,48 @@ async fn handle_request_stream( })() })); - let mut out_messages = Vec::new(); + let config = &config; + let torrents = &torrents; + let peer_valid_until = &peer_valid_until; + let rng = &rng; + let out_message_senders = &out_message_senders; - while let Some((meta, in_message)) = stream.next().await { - match in_message { - InMessage::AnnounceRequest(request) => handle_announce_request( - &config, - &mut rng, - &mut torrents.borrow_mut(), - &mut out_messages, - peer_valid_until.borrow().to_owned(), - meta, - request, - ), - InMessage::ScrapeRequest(request) => handle_scrape_request( - &config, - &mut torrents.borrow_mut(), - &mut out_messages, - meta, - request, - ), - }; + stream + .for_each_concurrent( + SHARED_IN_CHANNEL_SIZE, + move |(meta, in_message)| async move { + let mut out_messages = Vec::new(); - for (meta, out_message) in out_messages.drain(..) { - out_message_senders - .send_to(meta.out_message_consumer_id.0, (meta, out_message)) - .await - .expect("failed sending out_message to socket worker"); - } + match in_message { + InMessage::AnnounceRequest(request) => handle_announce_request( + &config, + &mut rng.borrow_mut(), + &mut torrents.borrow_mut(), + &mut out_messages, + peer_valid_until.borrow().to_owned(), + meta, + request, + ), + InMessage::ScrapeRequest(request) => handle_scrape_request( + &config, + &mut torrents.borrow_mut(), + &mut out_messages, + meta, + request, + ), + }; - yield_if_needed().await; - } + for (meta, out_message) in out_messages.drain(..) { + ::log::info!("request worker trying to send OutMessage to socket worker"); + + out_message_senders + .send_to(meta.out_message_consumer_id.0, (meta, out_message)) + .await + .expect("failed sending out_message to socket worker"); + + ::log::info!("request worker sent OutMessage to socket worker"); + } + }, + ) + .await; } diff --git a/aquatic_ws/src/glommio/socket.rs b/aquatic_ws/src/glommio/socket.rs index 3dccf25..5d47091 100644 --- a/aquatic_ws/src/glommio/socket.rs +++ b/aquatic_ws/src/glommio/socket.rs @@ -5,22 +5,23 @@ use std::net::SocketAddr; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_ws_protocol::*; use async_tungstenite::WebSocketStream; use futures::stream::{SplitSink, SplitStream}; +use futures::StreamExt; use futures_lite::future::race; -use futures_lite::StreamExt; use futures_rustls::server::TlsStream; use futures_rustls::TlsAcceptor; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; -use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender}; +use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; -use glommio::timer::TimerActionRepeat; +use glommio::task::JoinHandle; +use glommio::timer::{sleep, timeout, TimerActionRepeat}; use glommio::{enclose, prelude::*}; use hashbrown::HashMap; use slab::Slab; @@ -31,13 +32,20 @@ use crate::common::*; use super::common::*; +const LOCAL_CHANNEL_SIZE: usize = 16; + struct PendingScrapeResponse { pending_worker_out_messages: usize, stats: HashMap, } struct ConnectionReference { + task_handle: Option>, + /// Sender part of channel used to pass on outgoing messages from request + /// worker out_message_sender: Rc>, + /// Updated after sending message to peer + valid_until: ValidUntil, } pub async fn run_socket_worker( @@ -57,29 +65,38 @@ pub async fn run_socket_worker( let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let in_message_senders = Rc::new(in_message_senders); + let tq_prioritized = executor().create_task_queue( + Shares::Static(100), + Latency::Matters(Duration::from_millis(1)), + "prioritized", + ); + let tq_regular = + executor().create_task_queue(Shares::Static(1), Latency::NotImportant, "regular"); + let (_, mut out_message_receivers) = out_message_mesh_builder.join(Role::Consumer).await.unwrap(); let out_message_consumer_id = ConsumerId(out_message_receivers.consumer_id().unwrap()); let connection_slab = Rc::new(RefCell::new(Slab::new())); - let connections_to_remove = Rc::new(RefCell::new(Vec::new())); - // Periodically remove closed connections - TimerActionRepeat::repeat( - enclose!((config, connection_slab, connections_to_remove) move || { - remove_closed_connections( + // Periodically clean connections + TimerActionRepeat::repeat_into( + enclose!((config, connection_slab) move || { + clean_connections( config.clone(), connection_slab.clone(), - connections_to_remove.clone(), ) }), - ); + tq_prioritized, + ) + .unwrap(); for (_, out_message_receiver) in out_message_receivers.streams() { - spawn_local(receive_out_messages( - out_message_receiver, - connection_slab.clone(), - )) + spawn_local_into( + receive_out_messages(out_message_receiver, connection_slab.clone()), + tq_regular, + ) + .unwrap() .detach(); } @@ -88,18 +105,25 @@ pub async fn run_socket_worker( while let Some(stream) = incoming.next().await { match stream { Ok(stream) => { - let (out_message_sender, out_message_receiver) = new_unbounded(); + let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let out_message_sender = Rc::new(out_message_sender); let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { + task_handle: None, out_message_sender: out_message_sender.clone(), + valid_until: ValidUntil::new(config.cleaning.max_connection_idle), }); - spawn_local(enclose!((config, access_list, in_message_senders, tls_config, connections_to_remove) async move { - if let Err(err) = Connection::run( + ::log::info!("accepting stream: {}", key); + + let task_handle = spawn_local_into(enclose!((config, access_list, in_message_senders, connection_slab, tls_config) async move { + if let Err(err) = run_connection( config, access_list, in_message_senders, + tq_prioritized, + tq_regular, + connection_slab.clone(), out_message_sender, out_message_receiver, out_message_consumer_id, @@ -110,9 +134,14 @@ pub async fn run_socket_worker( ::log::debug!("Connection::run() error: {:?}", err); } - RefCell::borrow_mut(&connections_to_remove).push(key); - })) + connection_slab.borrow_mut().try_remove(key); + }), tq_regular) + .unwrap() .detach(); + + if let Some(reference) = connection_slab.borrow_mut().get_mut(key) { + reference.task_handle = Some(task_handle); + } } Err(err) => { ::log::error!("accept connection: {:?}", err); @@ -121,26 +150,28 @@ pub async fn run_socket_worker( } } -async fn remove_closed_connections( +async fn clean_connections( config: Rc, connection_slab: Rc>>, - connections_to_remove: Rc>>, ) -> Option { - let connections_to_remove = connections_to_remove.replace(Vec::new()); + let now = Instant::now(); - for connection_id in connections_to_remove { - if let Some(_) = RefCell::borrow_mut(&connection_slab).try_remove(connection_id) { - ::log::debug!("removed connection with id {}", connection_id); - } else { - ::log::error!( - "couldn't remove connection with id {}, it is not in connection slab", - connection_id - ); + connection_slab.borrow_mut().retain(|_, reference| { + let keep = reference.valid_until.0 > now; + + if !keep { + if let Some(ref handle) = reference.task_handle { + handle.cancel(); + } } - } + + keep + }); + + connection_slab.borrow_mut().shrink_to_fit(); Some(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, + config.cleaning.connection_cleaning_interval, )) } @@ -148,15 +179,22 @@ async fn receive_out_messages( mut out_message_receiver: ConnectedReceiver<(ConnectionMeta, OutMessage)>, connection_references: Rc>>, ) { - while let Some(channel_out_message) = out_message_receiver.next().await { - if let Some(reference) = connection_references - .borrow() - .get(channel_out_message.0.connection_id.0) - { - match reference.out_message_sender.try_send(channel_out_message) { - Ok(()) | Err(GlommioError::Closed(_)) => {} + let connection_references = &connection_references; + + while let Some((meta, out_message)) = out_message_receiver.next().await { + if let Some(reference) = connection_references.borrow().get(meta.connection_id.0) { + ::log::info!( + "local channel {} len: {}", + meta.connection_id.0, + reference.out_message_sender.len() + ); + + match reference.out_message_sender.try_send((meta, out_message)) { + Ok(()) => {} + Err(GlommioError::Closed(_)) => {} + Err(GlommioError::WouldBlock(_)) => {} Err(err) => { - ::log::error!( + ::log::info!( "Couldn't send out_message from shared channel to local receiver: {:?}", err ); @@ -166,39 +204,42 @@ async fn receive_out_messages( } } -struct Connection; +async fn run_connection( + config: Rc, + access_list: Arc, + in_message_senders: Rc>, + tq_prioritized: TaskQueueHandle, + tq_regular: TaskQueueHandle, + connection_slab: Rc>>, + out_message_sender: Rc>, + out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + out_message_consumer_id: ConsumerId, + connection_id: ConnectionId, + tls_config: Arc, + stream: TcpStream, +) -> anyhow::Result<()> { + let peer_addr = stream + .peer_addr() + .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; -impl Connection { - async fn run( - config: Rc, - access_list: Arc, - in_message_senders: Rc>, - out_message_sender: Rc>, - out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, - out_message_consumer_id: ConsumerId, - connection_id: ConnectionId, - tls_config: Arc, - stream: TcpStream, - ) -> anyhow::Result<()> { - let peer_addr = stream - .peer_addr() - .map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?; + let tls_acceptor: TlsAcceptor = tls_config.into(); + let stream = tls_acceptor.accept(stream).await?; - let tls_acceptor: TlsAcceptor = tls_config.into(); - let stream = tls_acceptor.accept(stream).await?; + let ws_config = tungstenite::protocol::WebSocketConfig { + max_frame_size: Some(config.network.websocket_max_frame_size), + max_message_size: Some(config.network.websocket_max_message_size), + max_send_queue: Some(2), + ..Default::default() + }; + let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?; - let ws_config = tungstenite::protocol::WebSocketConfig { - max_frame_size: Some(config.network.websocket_max_frame_size), - max_message_size: Some(config.network.websocket_max_message_size), - ..Default::default() - }; - let stream = async_tungstenite::accept_async_with_config(stream, Some(ws_config)).await?; - let (ws_out, ws_in) = futures::StreamExt::split(stream); + let (ws_out, ws_in) = futures::StreamExt::split(stream); - let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); - let access_list_cache = create_access_list_cache(&access_list); + let pending_scrape_slab = Rc::new(RefCell::new(Slab::new())); + let access_list_cache = create_access_list_cache(&access_list); - let reader_handle = spawn_local(enclose!((pending_scrape_slab) async move { + let reader_handle = spawn_local_into( + enclose!((config, pending_scrape_slab) async move { let mut reader = ConnectionReader { config, access_list_cache, @@ -211,24 +252,37 @@ impl Connection { connection_id, }; - reader.run_in_message_loop().await - })) - .detach(); + let result = reader.run_in_message_loop().await; - let writer_handle = spawn_local(async move { + result + }), + tq_regular, + ) + .unwrap() + .detach(); + + let writer_handle = spawn_local_into( + async move { let mut writer = ConnectionWriter { + config, out_message_receiver, + connection_slab, ws_out, pending_scrape_slab, peer_addr, + connection_id, }; - writer.run_out_message_loop().await - }) - .detach(); + let result = writer.run_out_message_loop().await; - race(reader_handle, writer_handle).await.unwrap() - } + result + }, + tq_prioritized, + ) + .unwrap() + .detach(); + + race(reader_handle, writer_handle).await.unwrap() } struct ConnectionReader { @@ -248,20 +302,29 @@ impl ConnectionReader { loop { ::log::debug!("read_in_message"); + while self.out_message_sender.is_full() { + sleep(Duration::from_millis(100)).await; + + yield_if_needed().await; + } + let message = self.ws_in.next().await.unwrap()?; match InMessage::from_ws_message(message) { Ok(in_message) => { - ::log::debug!("received in_message: {:?}", in_message); + ::log::debug!("parsed in_message"); self.handle_in_message(in_message).await?; } Err(err) => { ::log::debug!("Couldn't parse in_message: {:?}", err); - self.send_error_response("Invalid request".into(), None); + self.send_error_response("Invalid request".into(), None) + .await?; } } + + yield_if_needed().await; } } @@ -288,8 +351,10 @@ impl ConnectionReader { ) .await .unwrap(); + ::log::info!("sent message to request worker"); } else { - self.send_error_response("Info hash not allowed".into(), Some(info_hash)); + self.send_error_response("Info hash not allowed".into(), Some(info_hash)) + .await?; } } InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => { @@ -298,7 +363,8 @@ impl ConnectionReader { } else { // If request.info_hashes is empty, don't return scrape for all // torrents, even though reference server does it. It is too expensive. - self.send_error_response("Full scrapes are not allowed".into(), None); + self.send_error_response("Full scrapes are not allowed".into(), None) + .await?; return Ok(()); }; @@ -337,6 +403,7 @@ impl ConnectionReader { .send_to(consumer_index, (meta, in_message)) .await .unwrap(); + ::log::info!("sent message to request worker"); } } } @@ -344,19 +411,21 @@ impl ConnectionReader { Ok(()) } - fn send_error_response(&self, failure_reason: Cow<'static, str>, info_hash: Option) { + async fn send_error_response( + &self, + failure_reason: Cow<'static, str>, + info_hash: Option, + ) -> anyhow::Result<()> { let out_message = OutMessage::ErrorResponse(ErrorResponse { action: Some(ErrorResponseAction::Scrape), failure_reason, info_hash, }); - if let Err(err) = self - .out_message_sender - .try_send((self.make_connection_meta(None), out_message)) - { - ::log::error!("ConnectionWriter::send_error_response failed: {:?}", err) - } + self.out_message_sender + .send((self.make_connection_meta(None), out_message)) + .await + .map_err(|err| anyhow::anyhow!("ConnectionReader::send_error_response failed: {}", err)) } fn make_connection_meta(&self, pending_scrape_id: Option) -> ConnectionMeta { @@ -371,10 +440,13 @@ impl ConnectionReader { } struct ConnectionWriter { + config: Rc, out_message_receiver: LocalReceiver<(ConnectionMeta, OutMessage)>, + connection_slab: Rc>>, ws_out: SplitSink>, tungstenite::Message>, pending_scrape_slab: Rc>>, peer_addr: SocketAddr, + connection_id: ConnectionId, } impl ConnectionWriter { @@ -431,10 +503,40 @@ impl ConnectionWriter { } async fn send_out_message(&mut self, out_message: &OutMessage) -> anyhow::Result<()> { - futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await?; - futures::SinkExt::flush(&mut self.ws_out).await?; + let result = timeout(Duration::from_secs(10), async { + let result = + futures::SinkExt::send(&mut self.ws_out, out_message.to_ws_message()).await; - Ok(()) + Ok(result) + }) + .await; + + match result { + Ok(Ok(())) => { + self.connection_slab + .borrow_mut() + .get_mut(self.connection_id.0) + .ok_or_else(|| { + anyhow::anyhow!( + "connection reference {} not found in slab", + self.connection_id.0 + ) + })? + .valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + + Ok(()) + } + Ok(Err(err)) => Err(err.into()), + Err(err) => { + ::log::info!( + "send_out_message: send to {} took to long: {}", + self.peer_addr, + err + ); + + Ok(()) + } + } } } diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index b131619..a922427 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -14,13 +14,13 @@ cpu-pinning = ["aquatic_common/cpu-pinning"] [dependencies] anyhow = "1" -async-tungstenite = "0.15" +async-tungstenite = "0.16" aquatic_cli_helpers = "0.1.0" aquatic_common = "0.1.0" aquatic_ws_protocol = "0.1.0" futures = "0.3" futures-rustls = "0.22" -glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "2efe2f2a08f54394a435b674e8e0125057cbff03" } hashbrown = { version = "0.11", features = ["serde"] } mimalloc = { version = "0.1", default-features = false } rand = { version = "0.8", features = ["small_rng"] } @@ -28,7 +28,7 @@ rand_distr = "0.4" rustls = { version = "0.20", features = ["dangerous_configuration"] } serde = { version = "1", features = ["derive"] } serde_json = "1" -tungstenite = "0.15" +tungstenite = "0.16" [dev-dependencies] quickcheck = "1" diff --git a/aquatic_ws_load_test/src/config.rs b/aquatic_ws_load_test/src/config.rs index 9eda671..8812562 100644 --- a/aquatic_ws_load_test/src/config.rs +++ b/aquatic_ws_load_test/src/config.rs @@ -11,7 +11,8 @@ pub struct Config { pub server_address: SocketAddr, pub log_level: LogLevel, pub num_workers: usize, - pub num_connections: usize, + pub num_connections_per_worker: usize, + pub connection_creation_interval_ms: u64, pub duration: usize, pub torrents: TorrentConfig, #[cfg(feature = "cpu-pinning")] @@ -24,6 +25,22 @@ impl aquatic_cli_helpers::Config for Config { } } +impl Default for Config { + fn default() -> Self { + Self { + server_address: "127.0.0.1:3000".parse().unwrap(), + log_level: LogLevel::Error, + num_workers: 1, + num_connections_per_worker: 16, + connection_creation_interval_ms: 10, + duration: 0, + torrents: TorrentConfig::default(), + #[cfg(feature = "cpu-pinning")] + cpu_pinning: CpuPinningConfig::default_for_load_test(), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct TorrentConfig { @@ -43,21 +60,6 @@ pub struct TorrentConfig { pub weight_scrape: usize, } -impl Default for Config { - fn default() -> Self { - Self { - server_address: "127.0.0.1:3000".parse().unwrap(), - log_level: LogLevel::Error, - num_workers: 1, - num_connections: 16, - duration: 0, - torrents: TorrentConfig::default(), - #[cfg(feature = "cpu-pinning")] - cpu_pinning: CpuPinningConfig::default_for_load_test(), - } - } -} - impl Default for TorrentConfig { fn default() -> Self { Self { diff --git a/aquatic_ws_load_test/src/network.rs b/aquatic_ws_load_test/src/network.rs index b0f78ce..c3f75c2 100644 --- a/aquatic_ws_load_test/src/network.rs +++ b/aquatic_ws_load_test/src/network.rs @@ -44,7 +44,9 @@ async fn periodically_open_connections( load_test_state: LoadTestState, num_active_connections: Rc>, ) -> Option { - if *num_active_connections.borrow() < config.num_connections { + let wait = Duration::from_millis(config.connection_creation_interval_ms); + + if *num_active_connections.borrow() < config.num_connections_per_worker { spawn_local(async move { if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await @@ -55,7 +57,7 @@ async fn periodically_open_connections( .detach(); } - Some(Duration::from_secs(1)) + Some(wait) } struct Connection { @@ -102,8 +104,6 @@ impl Connection { *num_active_connections.borrow_mut() += 1; - println!("run connection"); - if let Err(err) = connection.run_connection_loop().await { eprintln!("connection error: {:?}", err); } diff --git a/aquatic_ws_protocol/Cargo.toml b/aquatic_ws_protocol/Cargo.toml index 55c5153..26f0ea7 100644 --- a/aquatic_ws_protocol/Cargo.toml +++ b/aquatic_ws_protocol/Cargo.toml @@ -22,7 +22,7 @@ hashbrown = { version = "0.11", features = ["serde"] } serde = { version = "1", features = ["derive"] } serde_json = "1" simd-json = { version = "0.4", features = ["allow-non-simd"] } -tungstenite = "0.15" +tungstenite = "0.16" [dev-dependencies] criterion = "0.3" diff --git a/scripts/heaptrack.sh b/scripts/heaptrack.sh new file mode 100755 index 0000000..bd602dd --- /dev/null +++ b/scripts/heaptrack.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +heaptrack --pid $(pgrep "^aquatic_[a-z]{1,4}$") diff --git a/scripts/watch-threads.sh b/scripts/watch-threads.sh index 864fbad..dfeb355 100755 --- a/scripts/watch-threads.sh +++ b/scripts/watch-threads.sh @@ -1,3 +1,3 @@ #!/bin/sh -watch -d -n 0.5 ps H -o euser,pid,tid,comm,%mem,rss,%cpu,psr -p `pgrep aquatic` +watch -n 0.5 ps H -o euser,pid,tid,comm,%mem,rss,%cpu,psr -p `pgrep aquatic`