diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index acb2ff5..12db332 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -39,13 +39,21 @@ pub async fn run_request_worker( })() })); + let mut handles = Vec::new(); + for (_, receiver) in request_receivers.streams() { - spawn_local(handle_request_stream( + let handle = spawn_local(handle_request_stream( config.clone(), torrents.clone(), response_senders.clone(), receiver, - )).await; + )).detach(); + + handles.push(handle); + } + + for handle in handles { + handle.await; } } @@ -82,6 +90,8 @@ async fn handle_request_stream( ), }; + ::log::debug!("preparing to send response to channel: {:?}", response); + if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { ::log::warn!("response_sender.try_send: {:?}", err); } diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 31fd483..59f37d1 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -9,11 +9,13 @@ mod common; pub mod handlers; pub mod network; +pub const SHARED_CHANNEL_SIZE: usize = 4096; + pub fn run(config: Config) -> anyhow::Result<()> { let num_peers = config.socket_workers + config.request_workers; - let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); - let response_mesh_builder = MeshBuilder::partial(num_peers, 1024); + let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); + let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let num_bound_sockets = Arc::new(AtomicUsize::new(0)); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 9f007b0..48840b6 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -51,15 +51,13 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), - )) - .await; + )).detach(); for (_, receiver) in response_receivers.streams().into_iter() { spawn_local(send_responses( socket.clone(), receiver.map(|(response, addr)| (response.into(), addr)), - )) - .await; + )).detach(); } send_responses(socket, local_receiver.stream()).await; @@ -90,6 +88,8 @@ async fn read_requests( Ok((amt, src)) => { let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); + ::log::debug!("read request: {:?}", request); + match request { Ok(Request::Connect(request)) => { let connection_id = ConnectionId(rng.gen()); @@ -101,7 +101,7 @@ async fn read_requests( transaction_id: request.transaction_id, }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } Ok(Request::Announce(request)) => { if connections.contains(request.connection_id, src) { @@ -121,7 +121,7 @@ async fn read_requests( message: "Info hash not allowed".into(), }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } } } @@ -132,7 +132,7 @@ async fn read_requests( message: "Scrape requests not supported".into(), }); - local_sender.try_send((response, src)); + local_sender.try_send((response, src)).unwrap(); } } Err(err) => { @@ -150,7 +150,7 @@ async fn read_requests( message: err.right_or("Parse error").into(), }; - local_sender.try_send((response.into(), src)); + local_sender.try_send((response.into(), src)).unwrap(); } } } @@ -174,6 +174,8 @@ where while let Some((response, src)) = stream.next().await { buf.set_position(0); + + ::log::debug!("preparing to send response: {:?}", response.clone()); response .write(&mut buf, ip_version_from_ip(src.ip())) diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 134427c..41003fc 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -12,7 +12,7 @@ use config::Config; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub fn run(config: Config) -> ::anyhow::Result<()> { - mio::run(config) + glommio::run(config) } pub fn update_access_list(config: &Config, access_list: &Arc) {