mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_http_load_test: fix connection buffer issue, optimize
This commit is contained in:
parent
ff6eddfc85
commit
75ebe3208d
1 changed files with 26 additions and 22 deletions
|
|
@ -13,9 +13,9 @@ use crate::utils::create_random_request;
|
|||
|
||||
pub struct Connection {
|
||||
stream: TcpStream,
|
||||
read_buffer: [u8; 2048],
|
||||
read_buffer: [u8; 4096],
|
||||
bytes_read: usize,
|
||||
can_send: bool,
|
||||
can_send_initial: bool,
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -34,9 +34,9 @@ impl Connection {
|
|||
|
||||
let connection = Connection {
|
||||
stream,
|
||||
read_buffer: [0; 2048],
|
||||
read_buffer: [0; 4096],
|
||||
bytes_read: 0,
|
||||
can_send: true,
|
||||
can_send_initial: true,
|
||||
};
|
||||
|
||||
connections.insert(*token_counter, connection);
|
||||
|
|
@ -51,27 +51,27 @@ impl Connection {
|
|||
config: &Config,
|
||||
state: &LoadTestState,
|
||||
rng: &mut impl Rng,
|
||||
) -> bool { // true = response received
|
||||
){
|
||||
loop {
|
||||
match self.stream.read(&mut self.read_buffer){
|
||||
match self.stream.read(&mut self.read_buffer[self.bytes_read..]){
|
||||
Ok(bytes_read) => {
|
||||
self.bytes_read = bytes_read;
|
||||
|
||||
break;
|
||||
},
|
||||
Err(err) if err.kind() == ErrorKind::WouldBlock => {
|
||||
self.can_send = false;
|
||||
self.can_send_initial = false;
|
||||
|
||||
eprintln!("handle_read_event error would block: {}", err);
|
||||
|
||||
return false;
|
||||
return;
|
||||
},
|
||||
Err(err) => {
|
||||
self.bytes_read = 0;
|
||||
|
||||
eprintln!("handle_read_event error: {}", err);
|
||||
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
@ -98,7 +98,7 @@ impl Connection {
|
|||
Err(err) => {
|
||||
eprintln!("response from bytes error: {}", err);
|
||||
|
||||
return false;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -107,8 +107,6 @@ impl Connection {
|
|||
state,
|
||||
rng
|
||||
);
|
||||
|
||||
true
|
||||
}
|
||||
|
||||
pub fn send_request(
|
||||
|
|
@ -132,7 +130,7 @@ impl Connection {
|
|||
}
|
||||
}
|
||||
|
||||
self.can_send = false;
|
||||
self.can_send_initial = false;
|
||||
}
|
||||
|
||||
fn send_request_inner(&mut self, request: &[u8]) -> ::std::io::Result<()> {
|
||||
|
|
@ -171,9 +169,9 @@ pub fn run_socket_thread(
|
|||
).unwrap();
|
||||
}
|
||||
|
||||
loop {
|
||||
let mut responses_received = 0usize;
|
||||
let mut initial_sent = false;
|
||||
|
||||
loop {
|
||||
poll.poll(&mut events, Some(timeout))
|
||||
.expect("failed polling");
|
||||
|
||||
|
|
@ -182,22 +180,28 @@ pub fn run_socket_thread(
|
|||
let token = event.token();
|
||||
|
||||
if let Some(connection) = connections.get_mut(&token.0){
|
||||
if connection.read_response_and_send_request(config, &state, &mut rng){
|
||||
responses_received += 1;
|
||||
}
|
||||
connection.read_response_and_send_request(
|
||||
config,
|
||||
&state,
|
||||
&mut rng
|
||||
);
|
||||
} else {
|
||||
eprintln!("connection not found: {:?}", token);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (_, connection) in connections.iter_mut(){
|
||||
if connection.can_send {
|
||||
connection.send_request(config, &state, &mut rng);
|
||||
if !initial_sent {
|
||||
for connection in connections.values_mut(){
|
||||
if connection.can_send_initial {
|
||||
connection.send_request(config, &state, &mut rng);
|
||||
|
||||
initial_sent = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if token_counter < 1 && responses_received > 0 {
|
||||
if token_counter < 1 { // Only create one connection at the moment
|
||||
Connection::create_and_register(
|
||||
config,
|
||||
&mut connections,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue