aquatic_udp glommio: detach tasks, await them later; add debug logging

This commit is contained in:
Joakim Frostegård 2021-10-20 02:06:51 +02:00
parent 2cc357f2f2
commit 047d138b2b
4 changed files with 27 additions and 13 deletions

View file

@ -39,13 +39,21 @@ pub async fn run_request_worker(
})() })()
})); }));
let mut handles = Vec::new();
for (_, receiver) in request_receivers.streams() { for (_, receiver) in request_receivers.streams() {
spawn_local(handle_request_stream( let handle = spawn_local(handle_request_stream(
config.clone(), config.clone(),
torrents.clone(), torrents.clone(),
response_senders.clone(), response_senders.clone(),
receiver, receiver,
)).await; )).detach();
handles.push(handle);
}
for handle in handles {
handle.await;
} }
} }
@ -82,6 +90,8 @@ async fn handle_request_stream<S>(
), ),
}; };
::log::debug!("preparing to send response to channel: {:?}", response);
if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) {
::log::warn!("response_sender.try_send: {:?}", err); ::log::warn!("response_sender.try_send: {:?}", err);
} }

View file

@ -9,11 +9,13 @@ mod common;
pub mod handlers; pub mod handlers;
pub mod network; pub mod network;
pub const SHARED_CHANNEL_SIZE: usize = 4096;
pub fn run(config: Config) -> anyhow::Result<()> { pub fn run(config: Config) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers; let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, 1024); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, 1024); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
let num_bound_sockets = Arc::new(AtomicUsize::new(0)); let num_bound_sockets = Arc::new(AtomicUsize::new(0));

View file

@ -51,15 +51,13 @@ pub async fn run_socket_worker(
response_consumer_index, response_consumer_index,
local_sender, local_sender,
socket.clone(), socket.clone(),
)) )).detach();
.await;
for (_, receiver) in response_receivers.streams().into_iter() { for (_, receiver) in response_receivers.streams().into_iter() {
spawn_local(send_responses( spawn_local(send_responses(
socket.clone(), socket.clone(),
receiver.map(|(response, addr)| (response.into(), addr)), receiver.map(|(response, addr)| (response.into(), addr)),
)) )).detach();
.await;
} }
send_responses(socket, local_receiver.stream()).await; send_responses(socket, local_receiver.stream()).await;
@ -90,6 +88,8 @@ async fn read_requests(
Ok((amt, src)) => { Ok((amt, src)) => {
let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents); let request = Request::from_bytes(&buf[..amt], config.protocol.max_scrape_torrents);
::log::debug!("read request: {:?}", request);
match request { match request {
Ok(Request::Connect(request)) => { Ok(Request::Connect(request)) => {
let connection_id = ConnectionId(rng.gen()); let connection_id = ConnectionId(rng.gen());
@ -101,7 +101,7 @@ async fn read_requests(
transaction_id: request.transaction_id, transaction_id: request.transaction_id,
}); });
local_sender.try_send((response, src)); local_sender.try_send((response, src)).unwrap();
} }
Ok(Request::Announce(request)) => { Ok(Request::Announce(request)) => {
if connections.contains(request.connection_id, src) { if connections.contains(request.connection_id, src) {
@ -121,7 +121,7 @@ async fn read_requests(
message: "Info hash not allowed".into(), message: "Info hash not allowed".into(),
}); });
local_sender.try_send((response, src)); local_sender.try_send((response, src)).unwrap();
} }
} }
} }
@ -132,7 +132,7 @@ async fn read_requests(
message: "Scrape requests not supported".into(), message: "Scrape requests not supported".into(),
}); });
local_sender.try_send((response, src)); local_sender.try_send((response, src)).unwrap();
} }
} }
Err(err) => { Err(err) => {
@ -150,7 +150,7 @@ async fn read_requests(
message: err.right_or("Parse error").into(), message: err.right_or("Parse error").into(),
}; };
local_sender.try_send((response.into(), src)); local_sender.try_send((response.into(), src)).unwrap();
} }
} }
} }
@ -175,6 +175,8 @@ where
while let Some((response, src)) = stream.next().await { while let Some((response, src)) = stream.next().await {
buf.set_position(0); buf.set_position(0);
::log::debug!("preparing to send response: {:?}", response.clone());
response response
.write(&mut buf, ip_version_from_ip(src.ip())) .write(&mut buf, ip_version_from_ip(src.ip()))
.expect("write response"); .expect("write response");

View file

@ -12,7 +12,7 @@ use config::Config;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
mio::run(config) glommio::run(config)
} }
pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) { pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) {