WIP: work on http load test (now partly working) and http protocol

This commit is contained in:
Joakim Frostegård 2020-07-20 14:30:36 +02:00
parent 5b0d364ccf
commit d1e9d24773
10 changed files with 326 additions and 226 deletions

View file

@ -133,7 +133,7 @@ impl Default for HandlerConfig {
peer_seeder_probability: 0.25,
scrape_max_torrents: 50,
weight_announce: 5,
weight_scrape: 1,
weight_scrape: 0,
additional_request_factor: 0.4,
max_responses_per_iter: 10_000,
channel_timeout: 200,

View file

@ -40,8 +40,10 @@ fn run(config: Config) -> ::anyhow::Result<()> {
let mut info_hashes = Vec::with_capacity(config.handler.number_of_torrents);
let mut rng = SmallRng::from_entropy();
for _ in 0..config.handler.number_of_torrents {
info_hashes.push(generate_info_hash());
info_hashes.push(generate_info_hash(&mut rng));
}
let pareto = Pareto::new(
@ -57,60 +59,20 @@ fn run(config: Config) -> ::anyhow::Result<()> {
// Start socket workers
let (response_sender, response_receiver) = unbounded();
let mut request_senders = Vec::new();
for i in 0..config.num_socket_workers {
let thread_id = ThreadId(i);
for _ in 0..config.num_socket_workers {
let (sender, receiver) = unbounded();
let port = config.network.first_port + (i as u16);
let addr = if config.network.multiple_client_ips {
let ip = if config.network.ipv6_client { // FIXME: test ipv6
Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1 + i as u16).into()
} else {
Ipv4Addr::new(127, 0, 0, 1 + i).into()
};
SocketAddr::new(ip, port)
} else {
let ip = if config.network.ipv6_client {
Ipv6Addr::LOCALHOST.into()
} else {
Ipv4Addr::LOCALHOST.into()
};
SocketAddr::new(ip, port)
};
request_senders.push(sender);
let config = config.clone();
let response_sender = response_sender.clone();
let state = state.clone();
thread::spawn(move || run_socket_thread(
&config,
state,
response_sender,
receiver,
&config,
addr,
thread_id
));
}
for _ in 0..config.num_request_workers {
let config = config.clone();
let state= state.clone();
let request_senders = request_senders.clone();
let response_receiver = response_receiver.clone();
thread::spawn(move || run_handler_thread(
&config,
state,
request_senders,
response_receiver,
));
}

View file

@ -1,198 +1,248 @@
use std::io::Cursor;
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::io::{Read, Write, ErrorKind};
use anyhow::Context;
use crossbeam_channel::{Receiver, Sender};
use mio::{net::UdpSocket, Events, Poll, Interest, Token};
use hashbrown::HashMap;
use mio::{net::TcpStream, Events, Poll, Interest, Token};
use socket2::{Socket, Domain, Type, Protocol};
use rand::{rngs::SmallRng, prelude::*};
use crate::common::*;
use crate::handler::create_random_request;
const MAX_PACKET_SIZE: usize = 4096;
pub fn create_socket(
config: &Config,
addr: SocketAddr
) -> ::std::net::UdpSocket {
let socket = if addr.is_ipv4(){
Socket::new(Domain::ipv4(), Type::dgram(), Some(Protocol::udp()))
} else {
Socket::new(Domain::ipv6(), Type::dgram(), Some(Protocol::udp()))
}.expect("create socket");
socket.set_nonblocking(true)
.expect("socket: set nonblocking");
if config.network.recv_buffer != 0 {
if let Err(err) = socket.set_recv_buffer_size(config.network.recv_buffer){
eprintln!(
"socket: failed setting recv buffer to {}: {:?}",
config.network.recv_buffer,
err
);
}
pub fn create_socket(
address: SocketAddr,
server_address: SocketAddr,
ipv6_only: bool
) -> ::anyhow::Result<TcpStream> {
let builder = if address.is_ipv4(){
Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp()))
} else {
Socket::new(Domain::ipv6(), Type::stream(), Some(Protocol::tcp()))
}.context("Couldn't create socket2::Socket")?;
if ipv6_only {
builder.set_only_v6(true)
.context("Couldn't put socket in ipv6 only mode")?
}
socket.bind(&addr.into())
.expect(&format!("socket: bind to {}", addr));
builder.set_nonblocking(true)
.context("Couldn't put socket in non-blocking mode")?;
builder.set_reuse_port(true)
.context("Couldn't put socket in reuse_port mode")?;
builder.connect(&server_address.into()).with_context(||
format!("Couldn't connect to address {}", server_address)
)?;
socket.connect(&config.server_address.into())
.expect("socket: connect to server");
socket.into_udp_socket()
Ok(TcpStream::from_std(builder.into_tcp_stream()))
}
pub fn run_socket_thread(
state: LoadTestState,
response_channel_sender: Sender<(ThreadId, Response)>,
request_receiver: Receiver<Request>,
config: &Config,
addr: SocketAddr,
thread_id: ThreadId
) {
let mut socket = UdpSocket::from_std(create_socket(config, addr));
let mut buffer = [0u8; MAX_PACKET_SIZE];
let token = Token(thread_id.0 as usize);
let interests = Interest::READABLE;
pub struct Connection {
stream: TcpStream,
read_buffer: [u8; 2048],
bytes_read: usize,
can_send: bool,
}
impl Connection {
pub fn create_and_register(
config: &Config,
state: &LoadTestState,
connections: &mut ConnectionMap,
poll: &mut Poll,
token_counter: &mut usize,
) -> anyhow::Result<()> {
let mut stream = TcpStream::connect(config.server_address)?;
poll.registry()
.register(&mut stream, Token(*token_counter), Interest::READABLE)
.unwrap();
let connection = Connection {
stream,
read_buffer: [0; 2048],
bytes_read: 0,
can_send: true,
};
connections.insert(*token_counter, connection);
*token_counter = token_counter.wrapping_add(1);
Ok(())
}
pub fn read_response_and_send_request(
&mut self,
config: &Config,
state: &LoadTestState,
rng: &mut impl Rng,
) -> bool { // true = response received
loop {
match self.stream.read(&mut self.read_buffer){
Ok(bytes_read) => {
self.bytes_read = bytes_read;
break;
},
Err(err) if err.kind() == ErrorKind::WouldBlock => {
self.can_send = false;
eprintln!("handle_read_event error would block: {}", err);
return false;
},
Err(err) => {
self.bytes_read = 0;
eprintln!("handle_read_event error: {}", err);
return false;
}
}
};
let res_response = Response::from_bytes(
&self.read_buffer[..self.bytes_read]
);
self.bytes_read = 0;
match res_response {
Ok(Response::Announce(_)) => {
state.statistics.responses_announce
.fetch_add(1, Ordering::SeqCst);
},
Ok(Response::Scrape(_)) => {
state.statistics.responses_scrape
.fetch_add(1, Ordering::SeqCst);
},
Ok(Response::Failure(_)) => {
state.statistics.responses_failure
.fetch_add(1, Ordering::SeqCst);
},
Err(err) => {
eprintln!("response from bytes error: {}", err);
return false;
}
}
self.send_request(
config,
state,
rng
);
true
}
pub fn send_request(
&mut self,
config: &Config,
state: &LoadTestState,
rng: &mut impl Rng,
){
let request = create_random_request(
&config,
&state,
rng
);
match self.send_request_inner(&request.as_bytes()){
Ok(_) => {
state.statistics.requests.fetch_add(1, Ordering::SeqCst);
},
Err(err) => {
eprintln!("send request error: {}", err);
}
}
self.can_send = false;
}
fn send_request_inner(&mut self, request: &[u8]) -> ::std::io::Result<()> {
self.stream.write(request)?;
self.stream.flush()?;
Ok(())
}
}
pub type ConnectionMap = HashMap<usize, Connection>;
pub fn run_socket_thread(
config: &Config,
state: LoadTestState,
request_receiver: Receiver<Request>,
) {
let timeout = Duration::from_micros(config.network.poll_timeout);
let mut connections: ConnectionMap = HashMap::new();
let mut poll = Poll::new().expect("create poll");
poll.registry()
.register(&mut socket, token, interests)
.unwrap();
let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut rng = SmallRng::from_entropy();
let mut statistics = SocketWorkerLocalStatistics::default();
let mut responses = Vec::new();
let mut token_counter = 0usize;
for _ in request_receiver.try_recv(){
Connection::create_and_register(
config,
&state,
&mut connections,
&mut poll,
&mut token_counter,
).unwrap();
}
loop {
let mut responses_received = 0usize;
poll.poll(&mut events, Some(timeout))
.expect("failed polling");
for event in events.iter(){
if event.token() == token {
if event.is_readable(){
read_responses(
thread_id,
&socket,
&mut buffer,
&mut statistics,
&mut responses
);
if event.is_readable(){
let token = event.token();
for r in responses.drain(..){
response_channel_sender.send(r)
.expect(&format!(
"add response to channel in socket worker {}",
thread_id.0
));
if let Some(connection) = connections.get_mut(&token.0){
if connection.read_response_and_send_request(config, &state, &mut rng){
responses_received += 1;
}
poll.registry()
.reregister(&mut socket, token, interests)
.unwrap();
} else {
eprintln!("connection not found: {:?}", token);
}
}
}
send_requests(
for (_, connection) in connections.iter_mut(){
if connection.can_send {
connection.send_request(config, &state, &mut rng);
}
}
if token_counter < 1 && responses_received > 0 {
Connection::create_and_register(
config,
&state,
&mut socket,
&mut buffer,
&request_receiver,
&mut statistics
);
}
send_requests(
&state,
&mut socket,
&mut buffer,
&request_receiver,
&mut statistics
);
state.statistics.requests
.fetch_add(statistics.requests, Ordering::SeqCst);
state.statistics.responses_announce
.fetch_add(statistics.responses_announce, Ordering::SeqCst);
state.statistics.responses_scrape
.fetch_add(statistics.responses_scrape, Ordering::SeqCst);
state.statistics.responses_failure
.fetch_add(statistics.responses_failure, Ordering::SeqCst);
state.statistics.response_peers
.fetch_add(statistics.response_peers, Ordering::SeqCst);
statistics = SocketWorkerLocalStatistics::default();
}
}
fn read_responses(
thread_id: ThreadId,
socket: &UdpSocket,
buffer: &mut [u8],
ls: &mut SocketWorkerLocalStatistics,
responses: &mut Vec<(ThreadId, Response)>,
){
while let Ok(amt) = socket.recv(buffer) {
match response_from_bytes(&buffer[0..amt]){
Ok(response) => {
match response {
Response::Announce(ref r) => {
ls.responses_announce += 1;
ls.response_peers += r.peers.len();
},
Response::Scrape(_) => {
ls.responses_scrape += 1;
},
Response::Failure(_) => {
ls.responses_failure += 1;
},
}
responses.push((thread_id, response))
},
Err(err) => {
eprintln!("Received invalid response: {:#?}", err);
}
&mut connections,
&mut poll,
&mut token_counter,
).unwrap();
}
}
}
fn send_requests(
state: &LoadTestState,
socket: &mut UdpSocket,
buffer: &mut [u8],
receiver: &Receiver<Request>,
statistics: &mut SocketWorkerLocalStatistics,
){
let mut cursor = Cursor::new(buffer);
while let Ok(request) = receiver.try_recv() {
cursor.set_position(0);
if let Err(err) = request_to_bytes(&mut cursor, request){
eprintln!("request_to_bytes err: {}", err);
}
let position = cursor.position() as usize;
let inner = cursor.get_ref();
match socket.send(&inner[..position]) {
Ok(_) => {
statistics.requests += 1;
},
Err(err) => {
eprintln!("Couldn't send packet: {:?}", err);
}
}
}
}

View file

@ -11,13 +11,13 @@ pub fn select_info_hash_index(
state: &LoadTestState,
rng: &mut impl Rng,
) -> usize {
pareto_usize(rng, state.pareto, config.handler.number_of_torrents - 1)
pareto_usize(rng, &state.pareto, config.handler.number_of_torrents - 1)
}
pub fn pareto_usize(
rng: &mut impl Rng,
pareto: Arc<Pareto<f64>>,
pareto: &Arc<Pareto<f64>>,
max: usize,
) -> usize {
let p: f64 = pareto.sample(rng);
@ -27,27 +27,6 @@ pub fn pareto_usize(
}
pub fn generate_peer_id() -> PeerId {
PeerId(random_20_bytes())
pub fn generate_info_hash(rng: &mut impl Rng) -> InfoHash {
InfoHash(rng.gen())
}
pub fn generate_info_hash() -> InfoHash {
InfoHash(random_20_bytes())
}
// Don't use SmallRng here for now
fn random_20_bytes() -> [u8; 20] {
let mut bytes = [0; 20];
for (i, b) in rand::thread_rng()
.sample_iter(&Standard)
.enumerate()
.take(20) {
bytes[i] = b
}
bytes
}