From f3dcc8762e6a46d9c7ab95cf6a057026bf09bb40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 2 Jul 2020 23:32:48 +0200 Subject: [PATCH] Update TODO; minor fixes to aquatic_http and aquatic_common_tcp --- TODO.md | 6 +- aquatic_common_tcp/src/config.rs | 2 - aquatic_http/src/lib/network/mod.rs | 202 +++++++++++++-------------- aquatic_http/src/lib/protocol/mod.rs | 2 +- 4 files changed, 107 insertions(+), 105 deletions(-) diff --git a/TODO.md b/TODO.md index 6a48d4a..df765cf 100644 --- a/TODO.md +++ b/TODO.md @@ -7,12 +7,16 @@ ## aquatic_http * handshake stuff - * fix overcomplicated and possibly incorrect implementation * fixed size buffer is probably bad + * test tls +* request parsing in protocol module instead of in network? Not obvious + what error return type to use then * compact peer representation in announce response: is implementation correct? * scrape info hash parsing: multiple ought to be accepted * info hashes, peer ids: check that whole deserialization and url decoding works as it should. There are suspicously many `\u{fffd}` +* established connections do not get valid_until updated, I think? Might + be the case in aquatic_ws too. * move stuff to common crate with ws: what about Request/InMessage etc? * don't overdo this diff --git a/aquatic_common_tcp/src/config.rs b/aquatic_common_tcp/src/config.rs index b70d232..8cd23e3 100644 --- a/aquatic_common_tcp/src/config.rs +++ b/aquatic_common_tcp/src/config.rs @@ -1,5 +1,3 @@ -use std::net::SocketAddr; - use serde::{Serialize, Deserialize}; diff --git a/aquatic_http/src/lib/network/mod.rs b/aquatic_http/src/lib/network/mod.rs index c3e4ee1..6265d6b 100644 --- a/aquatic_http/src/lib/network/mod.rs +++ b/aquatic_http/src/lib/network/mod.rs @@ -19,6 +19,106 @@ use crate::protocol::*; use connection::*; +pub fn run_socket_worker( + config: Config, + socket_worker_index: usize, + socket_worker_statuses: SocketWorkerStatuses, + request_channel_sender: RequestChannelSender, + response_channel_receiver: ResponseChannelReceiver, + opt_tls_acceptor: Option, +){ + match create_listener(config.network.address, config.network.ipv6_only){ + Ok(listener) => { + socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); + + run_poll_loop( + config, + socket_worker_index, + request_channel_sender, + response_channel_receiver, + listener, + opt_tls_acceptor + ); + }, + Err(err) => { + socket_worker_statuses.lock()[socket_worker_index] = Some( + Err(format!("Couldn't open socket: {:#}", err)) + ); + } + } +} + + +pub fn run_poll_loop( + config: Config, + socket_worker_index: usize, + request_channel_sender: RequestChannelSender, + response_channel_receiver: ResponseChannelReceiver, + listener: ::std::net::TcpListener, + opt_tls_acceptor: Option, +){ + let poll_timeout = Duration::from_millis( + config.network.poll_timeout_milliseconds + ); + + let mut listener = TcpListener::from_std(listener); + let mut poll = Poll::new().expect("create poll"); + let mut events = Events::with_capacity(config.network.poll_event_capacity); + + poll.registry() + .register(&mut listener, Token(0), Interest::READABLE) + .unwrap(); + + let mut connections: ConnectionMap = HashMap::new(); + let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new); + + let mut poll_token_counter = Token(0usize); + let mut iter_counter = 0usize; + + loop { + poll.poll(&mut events, Some(poll_timeout)) + .expect("failed polling"); + + let valid_until = ValidUntil::new(config.cleaning.max_connection_age); + + for event in events.iter(){ + let token = event.token(); + + if token.0 == 0 { + accept_new_streams( + &mut listener, + &mut poll, + &mut connections, + valid_until, + &mut poll_token_counter, + &opt_tls_acceptor, + ); + } else { + run_handshakes_and_read_requests( + socket_worker_index, + &request_channel_sender, + &mut connections, + token, + valid_until, + ); + } + } + + send_responses( + response_channel_receiver.drain(), + &mut connections + ); + + // Remove inactive connections, but not every iteration + if iter_counter % 128 == 0 { + remove_inactive_connections(&mut connections); + } + + iter_counter = iter_counter.wrapping_add(1); + } +} + + fn accept_new_streams( listener: &mut TcpListener, poll: &mut Poll, @@ -63,7 +163,7 @@ fn accept_new_streams( /// On the stream given by poll_token, get TLS up and running if requested, /// then read requests and pass on through channel. -pub fn run_handshake_and_read_requests( +pub fn run_handshakes_and_read_requests( socket_worker_index: usize, request_channel_sender: &RequestChannelSender, connections: &mut ConnectionMap, @@ -179,103 +279,3 @@ pub fn remove_inactive_connections( connections.shrink_to_fit(); } - - -pub fn run_poll_loop( - config: Config, - socket_worker_index: usize, - request_channel_sender: RequestChannelSender, - response_channel_receiver: ResponseChannelReceiver, - listener: ::std::net::TcpListener, - opt_tls_acceptor: Option, -){ - let poll_timeout = Duration::from_millis( - config.network.poll_timeout_milliseconds - ); - - let mut listener = TcpListener::from_std(listener); - let mut poll = Poll::new().expect("create poll"); - let mut events = Events::with_capacity(config.network.poll_event_capacity); - - poll.registry() - .register(&mut listener, Token(0), Interest::READABLE) - .unwrap(); - - let mut connections: ConnectionMap = HashMap::new(); - let opt_tls_acceptor = opt_tls_acceptor.map(Arc::new); - - let mut poll_token_counter = Token(0usize); - let mut iter_counter = 0usize; - - loop { - poll.poll(&mut events, Some(poll_timeout)) - .expect("failed polling"); - - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - for event in events.iter(){ - let token = event.token(); - - if token.0 == 0 { - accept_new_streams( - &mut listener, - &mut poll, - &mut connections, - valid_until, - &mut poll_token_counter, - &opt_tls_acceptor, - ); - } else { - run_handshake_and_read_requests( - socket_worker_index, - &request_channel_sender, - &mut connections, - token, - valid_until, - ); - } - } - - send_responses( - response_channel_receiver.drain(), - &mut connections - ); - - // Remove inactive connections, but not every iteration - if iter_counter % 128 == 0 { - remove_inactive_connections(&mut connections); - } - - iter_counter = iter_counter.wrapping_add(1); - } -} - - -pub fn run_socket_worker( - config: Config, - socket_worker_index: usize, - socket_worker_statuses: SocketWorkerStatuses, - request_channel_sender: RequestChannelSender, - response_channel_receiver: ResponseChannelReceiver, - opt_tls_acceptor: Option, -){ - match create_listener(config.network.address, config.network.ipv6_only){ - Ok(listener) => { - socket_worker_statuses.lock()[socket_worker_index] = Some(Ok(())); - - run_poll_loop( - config, - socket_worker_index, - request_channel_sender, - response_channel_receiver, - listener, - opt_tls_acceptor - ); - }, - Err(err) => { - socket_worker_statuses.lock()[socket_worker_index] = Some( - Err(format!("Couldn't open socket: {:#}", err)) - ); - } - } -} \ No newline at end of file diff --git a/aquatic_http/src/lib/protocol/mod.rs b/aquatic_http/src/lib/protocol/mod.rs index ef8036b..28182be 100644 --- a/aquatic_http/src/lib/protocol/mod.rs +++ b/aquatic_http/src/lib/protocol/mod.rs @@ -134,7 +134,7 @@ pub struct ScrapeResponse { } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone)] pub enum Request { Announce(AnnounceRequest), Scrape(ScrapeRequest),