diff --git a/aquatic_http/src/workers/request.rs b/aquatic_http/src/workers/request.rs index 400ae8a..099298a 100644 --- a/aquatic_http/src/workers/request.rs +++ b/aquatic_http/src/workers/request.rs @@ -177,12 +177,25 @@ pub async fn run_request_worker( })() })); + let max_peer_age = config.cleaning.max_peer_age; + let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); + + // Periodically update peer_valid_until + TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { + enclose!((peer_valid_until) move || async move { + *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); + + Some(Duration::from_secs(1)) + })() + })); + let mut handles = Vec::new(); for (_, receiver) in request_receivers.streams() { let handle = spawn_local(handle_request_stream( config.clone(), torrents.clone(), + peer_valid_until.clone(), receiver, )) .detach(); @@ -195,23 +208,16 @@ pub async fn run_request_worker( } } -async fn handle_request_stream(config: Config, torrents: Rc>, mut stream: S) -where +async fn handle_request_stream( + config: Config, + torrents: Rc>, + peer_valid_until: Rc>, + mut stream: S, +) where S: Stream + ::std::marker::Unpin, { let mut rng = SmallRng::from_entropy(); - let max_peer_age = config.cleaning.max_peer_age; - let peer_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_peer_age))); - - TimerActionRepeat::repeat(enclose!((peer_valid_until) move || { - enclose!((peer_valid_until) move || async move { - *peer_valid_until.borrow_mut() = ValidUntil::new(max_peer_age); - - Some(Duration::from_secs(1)) - })() - })); - while let Some(channel_request) = stream.next().await { match channel_request { ChannelRequest::Announce {