From e458cc54db11f7cb6c13f525f145feb6cc6d9ae4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 28 Oct 2021 01:13:18 +0200 Subject: [PATCH] aquatic_http_load_test: remove mio implementation, clean up --- Cargo.lock | 2 - aquatic_http_load_test/Cargo.toml | 7 +- aquatic_http_load_test/src/config.rs | 22 +- aquatic_http_load_test/src/glommio.rs | 260 -------------- aquatic_http_load_test/src/main.rs | 59 ++-- aquatic_http_load_test/src/network.rs | 478 ++++++++++++-------------- 6 files changed, 242 insertions(+), 586 deletions(-) delete mode 100644 aquatic_http_load_test/src/glommio.rs diff --git a/Cargo.lock b/Cargo.lock index aa199b5..d76d7ea 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,13 +131,11 @@ dependencies = [ "glommio", "hashbrown 0.11.2", "mimalloc", - "mio", "quickcheck", "quickcheck_macros", "rand", "rand_distr", "rustls", - "rustls-pemfile", "serde", ] diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 0183b75..25baab6 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -13,18 +13,15 @@ name = "aquatic_http_load_test" anyhow = "1" aquatic_cli_helpers = "0.1.0" aquatic_http_protocol = "0.1.0" +futures-lite = "1" hashbrown = "0.11.2" +glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } mimalloc = { version = "0.1", default-features = false } -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } rand = { version = "0.8", features = ["small_rng"] } rand_distr = "0.4" rustls = { version = "0.20", features = ["dangerous_configuration"] } -rustls-pemfile = "0.2" serde = { version = "1", features = ["derive"] } -futures-lite = "1" -glommio = { git = "https://github.com/DataDog/glommio.git", rev = "4e6b14772da2f4325271fbcf12d24cf91ed466e5" } - [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_http_load_test/src/config.rs b/aquatic_http_load_test/src/config.rs index eb87bb9..1c8456a 100644 --- a/aquatic_http_load_test/src/config.rs +++ b/aquatic_http_load_test/src/config.rs @@ -1,4 +1,4 @@ -use std::{net::SocketAddr, path::PathBuf}; +use std::net::SocketAddr; use serde::{Deserialize, Serialize}; @@ -9,20 +9,11 @@ pub struct Config { pub num_workers: u8, pub num_connections: usize, pub duration: usize, - pub network: NetworkConfig, pub torrents: TorrentConfig, } impl aquatic_cli_helpers::Config for Config {} -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(default)] -pub struct NetworkConfig { - pub connection_creation_interval: usize, - pub poll_timeout_microseconds: u64, - pub poll_event_capacity: usize, -} - #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct TorrentConfig { @@ -48,22 +39,11 @@ impl Default for Config { num_workers: 1, num_connections: 8, duration: 0, - network: NetworkConfig::default(), torrents: TorrentConfig::default(), } } } -impl Default for NetworkConfig { - fn default() -> Self { - Self { - connection_creation_interval: 10, - poll_timeout_microseconds: 197, - poll_event_capacity: 64, - } - } -} - impl Default for TorrentConfig { fn default() -> Self { Self { diff --git a/aquatic_http_load_test/src/glommio.rs b/aquatic_http_load_test/src/glommio.rs deleted file mode 100644 index 5783a85..0000000 --- a/aquatic_http_load_test/src/glommio.rs +++ /dev/null @@ -1,260 +0,0 @@ -use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc, sync::{Arc, atomic::Ordering}, time::Duration}; - -use aquatic_http_protocol::response::Response; -use futures_lite::{AsyncReadExt, AsyncWriteExt}; -use glommio::{enclose, prelude::*, timer::TimerActionRepeat}; -use glommio::net::TcpStream; -use rand::{SeedableRng, prelude::SmallRng}; -use rustls::ClientConnection; - -use crate::{common::LoadTestState, config::Config, utils::create_random_request}; - -pub async fn run_socket_thread( - config: Config, - tls_config: Arc, - load_test_state: LoadTestState, -) -> anyhow::Result<()> { - let config = Rc::new(config); - let num_active_connections = Rc::new(RefCell::new(0usize)); - - TimerActionRepeat::repeat(enclose!((config, tls_config, load_test_state, num_active_connections) move || { - enclose!((config, tls_config, load_test_state, num_active_connections) move || async move { - if *num_active_connections.borrow() < config.num_connections { - spawn_local(async move { - if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { - eprintln!("connection creation error: {:?}", err); - } - }).detach(); - } - - Some(Duration::from_secs(1)) - })() - })); - - futures_lite::future::pending::().await; - - Ok(()) -} - -struct Connection { - config: Rc, - load_test_state: LoadTestState, - rng: SmallRng, - stream: TcpStream, - tls: ClientConnection, - response_buffer: [u8; 2048], - response_buffer_position: usize, - send_new_request: bool, - queued_responses: usize, -} - -impl Connection { - async fn run( - config: Rc, - tls_config: Arc, - load_test_state: LoadTestState, - num_active_connections: Rc>, - ) -> anyhow::Result<()> { - let stream = TcpStream::connect(config.server_address).await - .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; - let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); - let rng = SmallRng::from_entropy(); - - let mut connection = Connection { - config, - load_test_state, - rng, - stream, - tls, - response_buffer: [0; 2048], - response_buffer_position: 0, - send_new_request: true, - queued_responses: 0, - }; - - *num_active_connections.borrow_mut() += 1; - - println!("run connection"); - - if let Err(err) = connection.run_connection_loop().await { - eprintln!("connection error: {:?}", err); - } - - *num_active_connections.borrow_mut() -= 1; - - Ok(()) - } - - async fn run_connection_loop(&mut self) -> anyhow::Result<()> { - loop { - if self.send_new_request { - let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); - - request.write(&mut self.tls.writer())?; - self.queued_responses += 1; - - self.send_new_request = false; - } - - self.write_tls().await?; - self.read_tls().await?; - } - } - - async fn read_tls(&mut self) -> anyhow::Result<()> { - loop { - let mut buf = [0u8; 1024]; - - let bytes_read = self.stream.read(&mut buf).await?; - - if bytes_read == 0 { - return Err(anyhow::anyhow!("Peer has closed connection")); - } - - self - .load_test_state - .statistics - .bytes_received - .fetch_add(bytes_read, Ordering::SeqCst); - - let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); - - let io_state = self.tls.process_new_packets()?; - - let mut added_plaintext = false; - - if io_state.plaintext_bytes_to_read() != 0 { - loop { - match self.tls.reader().read(&mut buf) { - Ok(0) => { - break; - } - Ok(amt) => { - let end = self.response_buffer_position + amt; - - if end > self.response_buffer.len() { - return Err(anyhow::anyhow!("response too large")); - } else { - let response_buffer_slice = &mut self.response_buffer[self.response_buffer_position..end]; - - response_buffer_slice.copy_from_slice(&buf[..amt]); - - self.response_buffer_position = end; - - added_plaintext = true; - } - } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break; - } - Err(err) => { - break; - } - } - } - } - - if added_plaintext { - let interesting_bytes = &self.response_buffer[..self.response_buffer_position]; - - let mut opt_body_start_index = None; - - for (i, chunk) in interesting_bytes.windows(4).enumerate() { - if chunk == b"\r\n\r\n" { - opt_body_start_index = Some(i + 4); - - break; - } - } - - if let Some(body_start_index) = opt_body_start_index { - let interesting_bytes = &interesting_bytes[body_start_index..]; - - match Response::from_bytes(interesting_bytes) { - Ok(response) => { - - match response { - Response::Announce(_) => { - self - .load_test_state - .statistics - .responses_announce - .fetch_add(1, Ordering::SeqCst); - } - Response::Scrape(_) => { - self - .load_test_state - .statistics - .responses_scrape - .fetch_add(1, Ordering::SeqCst); - } - Response::Failure(response) => { - self - .load_test_state - .statistics - .responses_failure - .fetch_add(1, Ordering::SeqCst); - println!( - "failure response: reason: {}", - response.failure_reason - ); - } - } - - self.response_buffer_position = 0; - self.send_new_request = true; - - break; - } - Err(err) => { - eprintln!( - "deserialize response error with {} bytes read: {:?}, text: {}", - self.response_buffer_position, - err, - String::from_utf8_lossy(interesting_bytes) - ); - } - } - } - } - - if self.tls.wants_write() { - break; - } - } - - Ok(()) - } - - async fn write_tls(&mut self) -> anyhow::Result<()> { - if !self.tls.wants_write() { - return Ok(()); - } - - let mut buf = Vec::new(); - let mut buf = Cursor::new(&mut buf); - - while self.tls.wants_write() { - self.tls.write_tls(&mut buf).unwrap(); - } - - let len = buf.get_ref().len(); - - self.stream.write_all(&buf.into_inner()).await?; - self.stream.flush().await?; - - self - .load_test_state - .statistics - .bytes_sent - .fetch_add(len, Ordering::SeqCst); - - if self.queued_responses != 0 { - self.load_test_state.statistics.requests.fetch_add(self.queued_responses, Ordering::SeqCst); - - self.queued_responses = 0; - } - - Ok(()) - } -} \ No newline at end of file diff --git a/aquatic_http_load_test/src/main.rs b/aquatic_http_load_test/src/main.rs index efe3f6b..b0d3120 100644 --- a/aquatic_http_load_test/src/main.rs +++ b/aquatic_http_load_test/src/main.rs @@ -1,5 +1,3 @@ -use std::fs::File; -use std::io::BufReader; use std::sync::{atomic::Ordering, Arc}; use std::thread; use std::time::{Duration, Instant}; @@ -10,7 +8,6 @@ use rand_distr::Pareto; mod common; mod config; -mod glommio; mod network; mod utils; @@ -65,7 +62,7 @@ fn run(config: Config) -> ::anyhow::Result<()> { let state = state.clone(); LocalExecutorBuilder::default().spawn(|| async move { - glommio::run_socket_thread(config, tls_config, state).await.unwrap(); + run_socket_thread(config, tls_config, state).await.unwrap(); }).unwrap(); } @@ -74,33 +71,6 @@ fn run(config: Config) -> ::anyhow::Result<()> { Ok(()) } -struct FakeCertificateVerifier; - -impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { - fn verify_server_cert( - &self, - _end_entity: &rustls::Certificate, - _intermediates: &[rustls::Certificate], - _server_name: &rustls::ServerName, - _scts: &mut dyn Iterator, - _ocsp_response: &[u8], - _now: std::time::SystemTime, - ) -> Result { - Ok(rustls::client::ServerCertVerified::assertion()) - } -} - -fn create_tls_config() -> anyhow::Result> { - let mut config = rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(rustls::RootCertStore::empty()) - .with_no_client_auth(); - - config.dangerous().set_certificate_verifier(Arc::new(FakeCertificateVerifier)); - - Ok(Arc::new(config)) -} - fn monitor_statistics(state: LoadTestState, config: &Config) { let start_time = Instant::now(); let mut report_avg_response_vec: Vec = Vec::new(); @@ -183,3 +153,30 @@ fn monitor_statistics(state: LoadTestState, config: &Config) { } } } + +struct FakeCertificateVerifier; + +impl rustls::client::ServerCertVerifier for FakeCertificateVerifier { + fn verify_server_cert( + &self, + _end_entity: &rustls::Certificate, + _intermediates: &[rustls::Certificate], + _server_name: &rustls::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls::client::ServerCertVerified::assertion()) + } +} + +fn create_tls_config() -> anyhow::Result> { + let mut config = rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(rustls::RootCertStore::empty()) + .with_no_client_auth(); + + config.dangerous().set_certificate_verifier(Arc::new(FakeCertificateVerifier)); + + Ok(Arc::new(config)) +} diff --git a/aquatic_http_load_test/src/network.rs b/aquatic_http_load_test/src/network.rs index bbd9b6c..5783a85 100644 --- a/aquatic_http_load_test/src/network.rs +++ b/aquatic_http_load_test/src/network.rs @@ -1,316 +1,260 @@ -use std::convert::TryInto; -use std::io::{Cursor, ErrorKind, Read, Write}; -use std::sync::Arc; -use std::sync::atomic::Ordering; -use std::time::Duration; +use std::{cell::RefCell, convert::TryInto, io::{Cursor, ErrorKind, Read}, rc::Rc, sync::{Arc, atomic::Ordering}, time::Duration}; -use hashbrown::HashMap; -use mio::{net::TcpStream, Events, Interest, Poll, Token}; -use rand::{prelude::*, rngs::SmallRng}; +use aquatic_http_protocol::response::Response; +use futures_lite::{AsyncReadExt, AsyncWriteExt}; +use glommio::{enclose, prelude::*, timer::TimerActionRepeat}; +use glommio::net::TcpStream; +use rand::{SeedableRng, prelude::SmallRng}; +use rustls::ClientConnection; -use crate::common::*; -use crate::config::*; -use crate::utils::create_random_request; +use crate::{common::LoadTestState, config::Config, utils::create_random_request}; -pub struct Connection { +pub async fn run_socket_thread( + config: Config, + tls_config: Arc, + load_test_state: LoadTestState, +) -> anyhow::Result<()> { + let config = Rc::new(config); + let num_active_connections = Rc::new(RefCell::new(0usize)); + + TimerActionRepeat::repeat(enclose!((config, tls_config, load_test_state, num_active_connections) move || { + enclose!((config, tls_config, load_test_state, num_active_connections) move || async move { + if *num_active_connections.borrow() < config.num_connections { + spawn_local(async move { + if let Err(err) = Connection::run(config, tls_config, load_test_state, num_active_connections).await { + eprintln!("connection creation error: {:?}", err); + } + }).detach(); + } + + Some(Duration::from_secs(1)) + })() + })); + + futures_lite::future::pending::().await; + + Ok(()) +} + +struct Connection { + config: Rc, + load_test_state: LoadTestState, + rng: SmallRng, stream: TcpStream, - tls: rustls::ClientConnection, - read_buffer: [u8; 4096], - response_buffer: Vec, - bytes_read: usize, - can_send: bool, + tls: ClientConnection, + response_buffer: [u8; 2048], + response_buffer_position: usize, + send_new_request: bool, + queued_responses: usize, } impl Connection { - pub fn create_and_register( - config: &Config, + async fn run( + config: Rc, tls_config: Arc, - connections: &mut ConnectionMap, - poll: &mut Poll, - token_counter: &mut usize, + load_test_state: LoadTestState, + num_active_connections: Rc>, ) -> anyhow::Result<()> { - let mut stream = TcpStream::connect(config.server_address)?; - let tls = rustls::ClientConnection::new(tls_config, "example.com".try_into().unwrap())?; + let stream = TcpStream::connect(config.server_address).await + .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; + let tls = ClientConnection::new(tls_config, "example.com".try_into().unwrap()).unwrap(); + let rng = SmallRng::from_entropy(); - poll.registry() - .register(&mut stream, Token(*token_counter), Interest::READABLE) - .unwrap(); - - let connection = Connection { + let mut connection = Connection { + config, + load_test_state, + rng, stream, tls, - read_buffer: [0; 4096], - response_buffer: Vec::new(), - bytes_read: 0, - can_send: true, + response_buffer: [0; 2048], + response_buffer_position: 0, + send_new_request: true, + queued_responses: 0, }; - connections.insert(*token_counter, connection); + *num_active_connections.borrow_mut() += 1; - *token_counter = token_counter.wrapping_add(1); + println!("run connection"); + + if let Err(err) = connection.run_connection_loop().await { + eprintln!("connection error: {:?}", err); + } + + *num_active_connections.borrow_mut() -= 1; Ok(()) } - pub fn read_response(&mut self, state: &LoadTestState) -> bool { - // bool = remove connection + async fn run_connection_loop(&mut self) -> anyhow::Result<()> { loop { - match self.stream.read(&mut self.read_buffer[self.bytes_read..]) { - Ok(0) => { - if self.bytes_read == self.read_buffer.len() { - eprintln!("read buffer is full"); - } + if self.send_new_request { + let request = create_random_request(&self.config, &self.load_test_state, &mut self.rng); - break true; - } - Ok(bytes_read) => { - self.bytes_read += bytes_read; + request.write(&mut self.tls.writer())?; + self.queued_responses += 1; - let mut interesting_bytes = &self.read_buffer[..self.bytes_read]; + self.send_new_request = false; + } - self.tls.read_tls(&mut interesting_bytes as &mut dyn std::io::Read).unwrap(); + self.write_tls().await?; + self.read_tls().await?; + } + } - let io_state = self.tls.process_new_packets().unwrap(); + async fn read_tls(&mut self) -> anyhow::Result<()> { + loop { + let mut buf = [0u8; 1024]; - if io_state.plaintext_bytes_to_read() == 0 { - while self.tls.wants_write(){ - self.tls.write_tls(&mut self.stream).unwrap(); + let bytes_read = self.stream.read(&mut buf).await?; + + if bytes_read == 0 { + return Err(anyhow::anyhow!("Peer has closed connection")); + } + + self + .load_test_state + .statistics + .bytes_received + .fetch_add(bytes_read, Ordering::SeqCst); + + let _ = self.tls.read_tls(&mut &buf[..bytes_read]).unwrap(); + + let io_state = self.tls.process_new_packets()?; + + let mut added_plaintext = false; + + if io_state.plaintext_bytes_to_read() != 0 { + loop { + match self.tls.reader().read(&mut buf) { + Ok(0) => { + break; } + Ok(amt) => { + let end = self.response_buffer_position + amt; - break false; - } + if end > self.response_buffer.len() { + return Err(anyhow::anyhow!("response too large")); + } else { + let response_buffer_slice = &mut self.response_buffer[self.response_buffer_position..end]; - self.tls.reader().read_to_end(&mut self.response_buffer).unwrap(); + response_buffer_slice.copy_from_slice(&buf[..amt]); - let interesting_bytes = &self.response_buffer[..]; - - let mut opt_body_start_index = None; - - for (i, chunk) in interesting_bytes.windows(4).enumerate() { - if chunk == b"\r\n\r\n" { - opt_body_start_index = Some(i + 4); + self.response_buffer_position = end; + added_plaintext = true; + } + } + Err(err) if err.kind() == ErrorKind::WouldBlock => { + break; + } + Err(err) => { break; } } + } + } - if let Some(body_start_index) = opt_body_start_index { - let interesting_bytes = &interesting_bytes[body_start_index..]; + if added_plaintext { + let interesting_bytes = &self.response_buffer[..self.response_buffer_position]; - match Response::from_bytes(interesting_bytes) { - Ok(response) => { - state - .statistics - .bytes_received - .fetch_add(self.bytes_read, Ordering::SeqCst); + let mut opt_body_start_index = None; - match response { - Response::Announce(_) => { - state - .statistics - .responses_announce - .fetch_add(1, Ordering::SeqCst); - } - Response::Scrape(_) => { - state - .statistics - .responses_scrape - .fetch_add(1, Ordering::SeqCst); - } - Response::Failure(response) => { - state - .statistics - .responses_failure - .fetch_add(1, Ordering::SeqCst); - println!( - "failure response: reason: {}", - response.failure_reason - ); - } + for (i, chunk) in interesting_bytes.windows(4).enumerate() { + if chunk == b"\r\n\r\n" { + opt_body_start_index = Some(i + 4); + + break; + } + } + + if let Some(body_start_index) = opt_body_start_index { + let interesting_bytes = &interesting_bytes[body_start_index..]; + + match Response::from_bytes(interesting_bytes) { + Ok(response) => { + + match response { + Response::Announce(_) => { + self + .load_test_state + .statistics + .responses_announce + .fetch_add(1, Ordering::SeqCst); } + Response::Scrape(_) => { + self + .load_test_state + .statistics + .responses_scrape + .fetch_add(1, Ordering::SeqCst); + } + Response::Failure(response) => { + self + .load_test_state + .statistics + .responses_failure + .fetch_add(1, Ordering::SeqCst); + println!( + "failure response: reason: {}", + response.failure_reason + ); + } + } - self.bytes_read = 0; - self.can_send = true; - } - Err(err) => { - eprintln!( - "deserialize response error with {} bytes read: {:?}, text: {}", - self.bytes_read, - err, - String::from_utf8_lossy(interesting_bytes) - ); - } + self.response_buffer_position = 0; + self.send_new_request = true; + + break; + } + Err(err) => { + eprintln!( + "deserialize response error with {} bytes read: {:?}, text: {}", + self.response_buffer_position, + err, + String::from_utf8_lossy(interesting_bytes) + ); } } } - Err(err) if err.kind() == ErrorKind::WouldBlock => { - break false; - } - Err(_) => { - self.bytes_read = 0; + } - break true; - } + if self.tls.wants_write() { + break; } } - } - - pub fn send_request( - &mut self, - config: &Config, - state: &LoadTestState, - rng: &mut impl Rng, - request_buffer: &mut Cursor<&mut [u8]>, - ) -> bool { - // bool = remove connection - if !self.can_send { - return false; - } - - let request = create_random_request(&config, &state, rng); - - request_buffer.set_position(0); - request.write(request_buffer).unwrap(); - let position = request_buffer.position() as usize; - - match self.send_request_inner(state, &request_buffer.get_mut()[..position]) { - Ok(()) => { - state.statistics.requests.fetch_add(1, Ordering::SeqCst); - - self.can_send = false; - - false - } - Err(_) => true, - } - } - - fn send_request_inner( - &mut self, - state: &LoadTestState, - request: &[u8], - ) -> ::std::io::Result<()> { - self.tls.writer().write(request)?; - - let mut bytes_sent = 0; - - while self.tls.wants_write(){ - bytes_sent += self.tls.write_tls(&mut self.stream)?; - } - - state - .statistics - .bytes_sent - .fetch_add(bytes_sent, Ordering::SeqCst); - - self.stream.flush()?; Ok(()) } - fn deregister(&mut self, poll: &mut Poll) -> ::std::io::Result<()> { - poll.registry().deregister(&mut self.stream) + async fn write_tls(&mut self) -> anyhow::Result<()> { + if !self.tls.wants_write() { + return Ok(()); + } + + let mut buf = Vec::new(); + let mut buf = Cursor::new(&mut buf); + + while self.tls.wants_write() { + self.tls.write_tls(&mut buf).unwrap(); + } + + let len = buf.get_ref().len(); + + self.stream.write_all(&buf.into_inner()).await?; + self.stream.flush().await?; + + self + .load_test_state + .statistics + .bytes_sent + .fetch_add(len, Ordering::SeqCst); + + if self.queued_responses != 0 { + self.load_test_state.statistics.requests.fetch_add(self.queued_responses, Ordering::SeqCst); + + self.queued_responses = 0; + } + + Ok(()) } -} - -pub type ConnectionMap = HashMap; - -pub fn run_socket_thread( - config: &Config, - tls_config: Arc, - state: LoadTestState, - num_initial_requests: usize -) { - let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); - let create_conn_interval = 2 ^ config.network.connection_creation_interval; - - let mut connections: ConnectionMap = HashMap::with_capacity(config.num_connections); - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - let mut rng = SmallRng::from_entropy(); - let mut request_buffer = [0u8; 1024]; - let mut request_buffer = Cursor::new(&mut request_buffer[..]); - - let mut token_counter = 0usize; - - for _ in 0..num_initial_requests { - Connection::create_and_register(config, tls_config.clone(), &mut connections, &mut poll, &mut token_counter) - .unwrap(); - } - - let mut iter_counter = 0usize; - let mut num_to_create = 0usize; - - let mut drop_connections = Vec::with_capacity(config.num_connections); - - loop { - poll.poll(&mut events, Some(timeout)) - .expect("failed polling"); - - for event in events.iter() { - if event.is_readable() { - let token = event.token(); - - if let Some(connection) = connections.get_mut(&token.0) { - // Note that this does not indicate successfully reading - // response - if connection.read_response(&state) { - remove_connection(&mut poll, &mut connections, token.0); - - num_to_create += 1; - } - } else { - eprintln!("connection not found: {:?}", token); - } - } - } - - for (k, connection) in connections.iter_mut() { - let remove_connection = - connection.send_request(config, &state, &mut rng, &mut request_buffer); - - if remove_connection { - drop_connections.push(*k); - } - } - - for k in drop_connections.drain(..) { - remove_connection(&mut poll, &mut connections, k); - - num_to_create += 1; - } - - let max_new = config.num_connections - connections.len(); - - if iter_counter % create_conn_interval == 0 { - num_to_create += 1; - } - - num_to_create = num_to_create.min(max_new); - - for _ in 0..num_to_create { - let ok = Connection::create_and_register( - config, - tls_config.clone(), - &mut connections, - &mut poll, - &mut token_counter, - ) - .is_ok(); - - if ok { - num_to_create -= 1; - } - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn remove_connection(poll: &mut Poll, connections: &mut ConnectionMap, connection_id: usize) { - if let Some(mut connection) = connections.remove(&connection_id) { - if let Err(err) = connection.deregister(poll) { - eprintln!("couldn't deregister connection: {}", err); - } - } -} +} \ No newline at end of file