Update TODO; minor fixes to aquatic_http and aquatic_common_tcp

This commit is contained in:
Joakim Frostegård 2020-07-02 23:32:48 +02:00
parent 7bc47ab93f
commit f3dcc8762e
4 changed files with 107 additions and 105 deletions

View file

@ -7,12 +7,16 @@
## aquatic_http
* handshake stuff
* fix overcomplicated and possibly incorrect implementation
* fixed size buffer is probably bad
* test tls
* request parsing in protocol module instead of in network? Not obvious
what error return type to use then
* compact peer representation in announce response: is implementation correct?
* scrape info hash parsing: multiple ought to be accepted
* info hashes, peer ids: check that whole deserialization and url decoding
works as it should. There are suspicously many `\u{fffd}`
* established connections do not get valid_until updated, I think? Might
be the case in aquatic_ws too.
* move stuff to common crate with ws: what about Request/InMessage etc?
* don't overdo this

View file

@ -1,5 +1,3 @@
use std::net::SocketAddr;
use serde::{Serialize, Deserialize};

View file

@ -19,6 +19,106 @@ use crate::protocol::*;
use connection::*;
pub fn run_socket_worker(
config: Config,
socket_worker_index: usize,
socket_worker_statuses: SocketWorkerStatuses,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
opt_tls_acceptor: Option<TlsAcceptor>,
){
match create_listener(config.network.address, config.network.ipv6_only){
Ok(listener) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));
run_poll_loop(
config,
socket_worker_index,
request_channel_sender,
response_channel_receiver,
listener,
opt_tls_acceptor
);
},
Err(err) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(
Err(format!("Couldn't open socket: {:#}", err))
);
}
}
}
pub fn run_poll_loop(
config: Config,
socket_worker_index: usize,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
listener: ::std::net::TcpListener,
opt_tls_acceptor: Option<TlsAcceptor>,
){
let poll_timeout = Duration::from_millis(
config.network.poll_timeout_milliseconds
);
let mut listener = TcpListener::from_std(listener);
let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(config.network.poll_event_capacity);
poll.registry()
.register(&mut listener, Token(0), Interest::READABLE)
.unwrap();
let mut connections: ConnectionMap = HashMap::new();
let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new);
let mut poll_token_counter = Token(0usize);
let mut iter_counter = 0usize;
loop {
poll.poll(&mut events, Some(poll_timeout))
.expect("failed polling");
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
for event in events.iter(){
let token = event.token();
if token.0 == 0 {
accept_new_streams(
&mut listener,
&mut poll,
&mut connections,
valid_until,
&mut poll_token_counter,
&opt_tls_acceptor,
);
} else {
run_handshakes_and_read_requests(
socket_worker_index,
&request_channel_sender,
&mut connections,
token,
valid_until,
);
}
}
send_responses(
response_channel_receiver.drain(),
&mut connections
);
// Remove inactive connections, but not every iteration
if iter_counter % 128 == 0 {
remove_inactive_connections(&mut connections);
}
iter_counter = iter_counter.wrapping_add(1);
}
}
fn accept_new_streams(
listener: &mut TcpListener,
poll: &mut Poll,
@ -63,7 +163,7 @@ fn accept_new_streams(
/// On the stream given by poll_token, get TLS up and running if requested,
/// then read requests and pass on through channel.
pub fn run_handshake_and_read_requests(
pub fn run_handshakes_and_read_requests(
socket_worker_index: usize,
request_channel_sender: &RequestChannelSender,
connections: &mut ConnectionMap,
@ -179,103 +279,3 @@ pub fn remove_inactive_connections(
connections.shrink_to_fit();
}
pub fn run_poll_loop(
config: Config,
socket_worker_index: usize,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
listener: ::std::net::TcpListener,
opt_tls_acceptor: Option<TlsAcceptor>,
){
let poll_timeout = Duration::from_millis(
config.network.poll_timeout_milliseconds
);
let mut listener = TcpListener::from_std(listener);
let mut poll = Poll::new().expect("create poll");
let mut events = Events::with_capacity(config.network.poll_event_capacity);
poll.registry()
.register(&mut listener, Token(0), Interest::READABLE)
.unwrap();
let mut connections: ConnectionMap = HashMap::new();
let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new);
let mut poll_token_counter = Token(0usize);
let mut iter_counter = 0usize;
loop {
poll.poll(&mut events, Some(poll_timeout))
.expect("failed polling");
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
for event in events.iter(){
let token = event.token();
if token.0 == 0 {
accept_new_streams(
&mut listener,
&mut poll,
&mut connections,
valid_until,
&mut poll_token_counter,
&opt_tls_acceptor,
);
} else {
run_handshake_and_read_requests(
socket_worker_index,
&request_channel_sender,
&mut connections,
token,
valid_until,
);
}
}
send_responses(
response_channel_receiver.drain(),
&mut connections
);
// Remove inactive connections, but not every iteration
if iter_counter % 128 == 0 {
remove_inactive_connections(&mut connections);
}
iter_counter = iter_counter.wrapping_add(1);
}
}
pub fn run_socket_worker(
config: Config,
socket_worker_index: usize,
socket_worker_statuses: SocketWorkerStatuses,
request_channel_sender: RequestChannelSender,
response_channel_receiver: ResponseChannelReceiver,
opt_tls_acceptor: Option<TlsAcceptor>,
){
match create_listener(config.network.address, config.network.ipv6_only){
Ok(listener) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(()));
run_poll_loop(
config,
socket_worker_index,
request_channel_sender,
response_channel_receiver,
listener,
opt_tls_acceptor
);
},
Err(err) => {
socket_worker_statuses.lock()[socket_worker_index] = Some(
Err(format!("Couldn't open socket: {:#}", err))
);
}
}
}

View file

@ -134,7 +134,7 @@ pub struct ScrapeResponse {
}
#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone)]
pub enum Request {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),