aquatic_ws, aquatic_ws_load_test: cargo fmt

This commit is contained in:
Joakim Frostegård 2021-11-02 20:33:14 +01:00
parent d922e5e680
commit 78d29770f3
3 changed files with 16 additions and 12 deletions

View file

@ -17,7 +17,7 @@ use futures_lite::StreamExt;
use futures_rustls::server::TlsStream; use futures_rustls::server::TlsStream;
use futures_rustls::TlsAcceptor; use futures_rustls::TlsAcceptor;
use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{LocalReceiver, LocalSender, new_unbounded}; use glommio::channels::local_channel::{new_unbounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::ConnectedReceiver; use glommio::channels::shared_channel::ConnectedReceiver;
use glommio::net::{TcpListener, TcpStream}; use glommio::net::{TcpListener, TcpStream};
use glommio::timer::TimerActionRepeat; use glommio::timer::TimerActionRepeat;
@ -95,8 +95,7 @@ pub async fn run_socket_worker(
while let Some(stream) = incoming.next().await { while let Some(stream) = incoming.next().await {
match stream { match stream {
Ok(stream) => { Ok(stream) => {
let (out_message_sender, out_message_receiver) = let (out_message_sender, out_message_receiver) = new_unbounded();
new_unbounded();
let out_message_sender = Rc::new(out_message_sender); let out_message_sender = Rc::new(out_message_sender);
let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference { let key = RefCell::borrow_mut(&connection_slab).insert(ConnectionReference {
@ -160,7 +159,7 @@ async fn receive_out_messages(
.get(channel_out_message.0.connection_id.0) .get(channel_out_message.0.connection_id.0)
{ {
match reference.out_message_sender.try_send(channel_out_message) { match reference.out_message_sender.try_send(channel_out_message) {
Ok(()) | Err(GlommioError::Closed(_)) => {}, Ok(()) | Err(GlommioError::Closed(_)) => {}
Err(err) => { Err(err) => {
::log::error!( ::log::error!(
"Couldn't send out_message from shared channel to local receiver: {:?}", "Couldn't send out_message from shared channel to local receiver: {:?}",

View file

@ -97,7 +97,6 @@ fn create_tls_config() -> anyhow::Result<Arc<rustls::ClientConfig>> {
Ok(Arc::new(config)) Ok(Arc::new(config))
} }
fn monitor_statistics(state: LoadTestState, config: &Config) { fn monitor_statistics(state: LoadTestState, config: &Config) {
let start_time = Instant::now(); let start_time = Instant::now();
let mut report_avg_response_vec: Vec<f64> = Vec::new(); let mut report_avg_response_vec: Vec<f64> = Vec::new();

View file

@ -7,12 +7,12 @@ use std::{
}; };
use aquatic_ws_protocol::{InMessage, JsonValue, OfferId, OutMessage, PeerId}; use aquatic_ws_protocol::{InMessage, JsonValue, OfferId, OutMessage, PeerId};
use async_tungstenite::{WebSocketStream, client_async}; use async_tungstenite::{client_async, WebSocketStream};
use futures::{StreamExt, SinkExt}; use futures::{SinkExt, StreamExt};
use futures_rustls::{TlsConnector, client::TlsStream}; use futures_rustls::{client::TlsStream, TlsConnector};
use glommio::net::TcpStream; use glommio::net::TcpStream;
use glommio::{prelude::*, timer::TimerActionRepeat}; use glommio::{prelude::*, timer::TimerActionRepeat};
use rand::{Rng, SeedableRng, prelude::SmallRng}; use rand::{prelude::SmallRng, Rng, SeedableRng};
use crate::{common::LoadTestState, config::Config, utils::create_random_request}; use crate::{common::LoadTestState, config::Config, utils::create_random_request};
@ -80,7 +80,9 @@ impl Connection {
let stream = TcpStream::connect(config.server_address) let stream = TcpStream::connect(config.server_address)
.await .await
.map_err(|err| anyhow::anyhow!("connect: {:?}", err))?; .map_err(|err| anyhow::anyhow!("connect: {:?}", err))?;
let stream = TlsConnector::from(tls_config).connect("example.com".try_into().unwrap(), stream).await?; let stream = TlsConnector::from(tls_config)
.connect("example.com".try_into().unwrap(), stream)
.await?;
let request = format!( let request = format!(
"ws://{}:{}", "ws://{}:{}",
config.server_address.ip(), config.server_address.ip(),
@ -114,8 +116,12 @@ impl Connection {
async fn run_connection_loop(&mut self) -> anyhow::Result<()> { async fn run_connection_loop(&mut self) -> anyhow::Result<()> {
loop { loop {
if self.can_send { if self.can_send {
let request = let request = create_random_request(
create_random_request(&self.config, &self.load_test_state, &mut self.rng, self.peer_id); &self.config,
&self.load_test_state,
&mut self.rng,
self.peer_id,
);
// If self.send_answer is set and request is announce request, make // If self.send_answer is set and request is announce request, make
// the request an offer answer // the request an offer answer