aquatic_ws_load_test: fix multiple issues with implementation

This commit is contained in:
Joakim Frostegård 2020-08-03 05:15:24 +02:00
parent 5523c87634
commit 9f445c7c03
3 changed files with 88 additions and 148 deletions

View file

@ -53,18 +53,11 @@ fn run(config: Config) -> ::anyhow::Result<()> {
pareto: Arc::new(pareto), pareto: Arc::new(pareto),
}; };
// Start socket workers
for _ in 0..config.num_workers { for _ in 0..config.num_workers {
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
thread::spawn(move || run_socket_thread( thread::spawn(move || run_socket_thread(&config, state,));
&config,
state,
1
));
} }
monitor_statistics( monitor_statistics(

View file

@ -63,14 +63,12 @@ impl ConnectionState {
Self::WebSocket(ws) => Some(Self::WebSocket(ws)), Self::WebSocket(ws) => Some(Self::WebSocket(ws)),
} }
} }
} }
pub struct Connection { pub struct Connection {
stream: ConnectionState, stream: ConnectionState,
can_send_initial: bool, can_send: bool,
marked_as_complete: bool,
send_answer: Option<(PeerId, OfferId)>, send_answer: Option<(PeerId, OfferId)>,
} }
@ -85,13 +83,12 @@ impl Connection {
let mut stream = TcpStream::connect(config.server_address)?; let mut stream = TcpStream::connect(config.server_address)?;
poll.registry() poll.registry()
.register(&mut stream, Token(*token_counter), Interest::READABLE) .register(&mut stream, Token(*token_counter), Interest::READABLE | Interest::WRITABLE)
.unwrap(); .unwrap();
let connection = Connection { let connection = Connection {
stream: ConnectionState::TcpStream(stream), stream: ConnectionState::TcpStream(stream),
can_send_initial: false, can_send: false,
marked_as_complete: false,
send_answer: None, send_answer: None,
}; };
@ -104,26 +101,23 @@ impl Connection {
pub fn advance(self, config: &Config) -> Option<Self> { pub fn advance(self, config: &Config) -> Option<Self> {
if let Some(stream) = self.stream.advance(config){ if let Some(stream) = self.stream.advance(config){
let can_send = matches!(stream, ConnectionState::WebSocket(_));
Some(Self { Some(Self {
stream, stream,
can_send_initial: self.can_send_initial, can_send,
marked_as_complete: false, send_answer: None,
send_answer: self.send_answer,
}) })
} else { } else {
None None
} }
} }
pub fn read_response_and_send_request( pub fn read_responses(
&mut self, &mut self,
config: &Config,
state: &LoadTestState, state: &LoadTestState,
rng: &mut impl Rng, ) -> bool { // bool = drop connection
){
if let ConnectionState::WebSocket(ref mut ws) = self.stream { if let ConnectionState::WebSocket(ref mut ws) = self.stream {
let mut send_random_request = false;
loop { loop {
match ws.read_message(){ match ws.read_message(){
Ok(message) => { Ok(message) => {
@ -137,25 +131,25 @@ impl Connection {
offer.offer_id offer.offer_id
)); ));
send_random_request = true; self.can_send = true;
}, },
Ok(OutMessage::Answer(_)) => { Ok(OutMessage::Answer(_)) => {
state.statistics.responses_answer state.statistics.responses_answer
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
send_random_request = true; self.can_send = true;
}, },
Ok(OutMessage::AnnounceResponse(_)) => { Ok(OutMessage::AnnounceResponse(_)) => {
state.statistics.responses_announce state.statistics.responses_announce
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
send_random_request = true; self.can_send = true;
}, },
Ok(OutMessage::ScrapeResponse(_)) => { Ok(OutMessage::ScrapeResponse(_)) => {
state.statistics.responses_scrape state.statistics.responses_scrape
.fetch_add(1, Ordering::SeqCst); .fetch_add(1, Ordering::SeqCst);
send_random_request = true; self.can_send = true;
}, },
Err(err) => { Err(err) => {
eprintln!("error deserializing offer: {:?}", err); eprintln!("error deserializing offer: {:?}", err);
@ -163,78 +157,75 @@ impl Connection {
} }
}, },
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => { Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
self.can_send_initial = false; return false;
break;
}, },
Err(err) => { Err(err) => {
eprintln!("handle_read_event error: {}", err); // eprintln!("handle_read_event error: {}", err);
break; return true;
} }
} }
};
if send_random_request {
self.send_random_request(
config,
state,
rng,
);
} }
} }
false
} }
pub fn send_random_request( pub fn send_request(
&mut self, &mut self,
config: &Config, config: &Config,
state: &LoadTestState, state: &LoadTestState,
rng: &mut impl Rng, rng: &mut impl Rng,
){ ) -> bool { // bool = remove connection
let request = create_random_request( if !self.can_send {
&config, return false;
&state, }
rng
);
// If self.send_answer is set and request is announce request, make
// the request an offer answer
let request = if let InMessage::AnnounceRequest(mut r) = request {
if let Some((peer_id, offer_id)) = self.send_answer {
r.to_peer_id = Some(peer_id);
r.offer_id = Some(offer_id);
r.answer = Some(JsonValue(::serde_json::Value::from(JSON_VALUE)));
r.offers = None;
}
self.send_answer = None;
InMessage::AnnounceRequest(r)
} else {
request
};
self.send_request(state, request)
}
fn send_request(
&mut self,
state: &LoadTestState,
request: InMessage
){
if let ConnectionState::WebSocket(ref mut ws) = self.stream { if let ConnectionState::WebSocket(ref mut ws) = self.stream {
let request = create_random_request(
&config,
&state,
rng
);
// If self.send_answer is set and request is announce request, make
// the request an offer answer
let request = if let InMessage::AnnounceRequest(mut r) = request {
if let Some((peer_id, offer_id)) = self.send_answer {
r.to_peer_id = Some(peer_id);
r.offer_id = Some(offer_id);
r.answer = Some(JsonValue(::serde_json::Value::from(JSON_VALUE)));
r.offers = None;
}
self.send_answer = None;
InMessage::AnnounceRequest(r)
} else {
request
};
match ws.write_message(request.to_ws_message()){ match ws.write_message(request.to_ws_message()){
Ok(_) => { Ok(()) => {
state.statistics.requests.fetch_add(1, Ordering::SeqCst); state.statistics.requests.fetch_add(1, Ordering::SeqCst);
self.can_send = false;
false
}, },
Err(tungstenite::Error::Io(err)) if err.kind() == ErrorKind::WouldBlock => {
false
}
Err(err) => { Err(err) => {
eprintln!("send request error: {}", err); // eprintln!("send request error: {:?}", err);
true
} }
} }
self.can_send_initial = false;
} else { } else {
println!("send request can't send to non-ws stream"); println!("send request can't send to non-ws stream");
false
} }
} }
} }
@ -246,7 +237,6 @@ pub type ConnectionMap = HashMap<usize, Connection>;
pub fn run_socket_thread( pub fn run_socket_thread(
config: &Config, config: &Config,
state: LoadTestState, state: LoadTestState,
num_initial_requests: usize,
) { ) {
let timeout = Duration::from_micros(config.network.poll_timeout_microseconds); let timeout = Duration::from_micros(config.network.poll_timeout_microseconds);
let create_conn_interval = 2 ^ config.network.connection_creation_interval; let create_conn_interval = 2 ^ config.network.connection_creation_interval;
@ -257,98 +247,56 @@ pub fn run_socket_thread(
let mut rng = SmallRng::from_entropy(); let mut rng = SmallRng::from_entropy();
let mut token_counter = 0usize; let mut token_counter = 0usize;
for _ in 0..num_initial_requests {
Connection::create_and_register(
config,
&mut connections,
&mut poll,
&mut token_counter,
).unwrap();
}
println!("num connections in map: {}", connections.len());
let mut initial_sent = false;
let mut iter_counter = 0usize; let mut iter_counter = 0usize;
let mut num_completed = 0usize; let mut drop_keys = Vec::new();
loop { loop {
poll.poll(&mut events, Some(timeout)) poll.poll(&mut events, Some(timeout))
.expect("failed polling"); .expect("failed polling");
for event in events.iter(){ for event in events.iter(){
let token = event.token();
if event.is_readable(){ if event.is_readable(){
let token = event.token();
let mut run_advance = false;
if let Some(connection) = connections.get_mut(&token.0){ if let Some(connection) = connections.get_mut(&token.0){
if let ConnectionState::WebSocket(_) = connection.stream { if let ConnectionState::WebSocket(_) = connection.stream {
connection.read_response_and_send_request( let drop_connection = connection.read_responses(&state);
config,
&state,
&mut rng,
);
} else {
run_advance = true;
println!("set run_advance=true"); if drop_connection {
} connections.remove(&token.0);
} else {
eprintln!("connection not found: {:?}", token);
}
if run_advance {
let connection = connections.remove(&token.0).unwrap();
if let Some(connection) = connection.advance(config){
println!("advanced connection");
connections.insert(token.0, connection);
}
}
}
}
if num_completed != token_counter {
for k in 0..token_counter {
if let Some(mut connection) = connections.remove(&k){
if let ConnectionState::WebSocket(_) = connection.stream {
if !connection.marked_as_complete {
connection.can_send_initial = true;
connection.marked_as_complete = true;
initial_sent = false;
num_completed += 1;
} }
connections.insert(k, connection); continue;
} else if let Some(c) = connection.advance(config){
connections.insert(k, c);
} }
} else { }
// println!("connection not found for token {}", k); }
if let Some(connection) = connections.remove(&token.0){
if let Some(connection) = connection.advance(config){
connections.insert(token.0, connection);
} }
} }
} }
if !initial_sent { for (k, connection) in connections.iter_mut(){
for (_, connection) in connections.iter_mut(){ let drop_connection = connection.send_request(
if connection.can_send_initial { config,
&state,
&mut rng,
);
connection.send_random_request( if drop_connection {
config, drop_keys.push(*k)
&state,
&mut rng,
);
initial_sent = true;
}
} }
} }
for k in drop_keys.drain(..){
connections.remove(&k);
}
// Slowly create new connections // Slowly create new connections
if token_counter < config.num_connections && iter_counter % create_conn_interval == 0 { if connections.len() < config.num_connections && iter_counter % create_conn_interval == 0 {
let res = Connection::create_and_register( let res = Connection::create_and_register(
config, config,
&mut connections, &mut connections,
@ -359,8 +307,6 @@ pub fn run_socket_thread(
if let Err(err) = res { if let Err(err) = res {
eprintln!("create connection error: {}", err); eprintln!("create connection error: {}", err);
} }
// initial_sent = false;
} }
iter_counter = iter_counter.wrapping_add(1); iter_counter = iter_counter.wrapping_add(1);

View file

@ -260,7 +260,8 @@ impl InMessage {
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum OutMessage { pub enum OutMessage {
AnnounceResponse(AnnounceResponse), AnnounceResponse(AnnounceResponse),
ScrapeResponse(ScrapeResponse), ScrapeResponse(ScrapeResponse),