aquatic_udp: move more announce logic into common code

This commit is contained in:
Joakim Frostegård 2021-10-23 15:10:01 +02:00
parent c1dd50d0c9
commit 32113ea2f3
5 changed files with 67 additions and 91 deletions

View file

@ -2,6 +2,7 @@
* aquatic_udp glommio * aquatic_udp glommio
* privdrop * privdrop
* disable by default!
* access lists: * access lists:
* use arc-swap Cache * use arc-swap Cache

View file

@ -28,7 +28,35 @@ impl Into<Response> for ConnectedResponse {
} }
} }
pub fn handle_announce_request<I: Ip>( pub fn handle_announce_request(
config: &Config,
rng: &mut SmallRng,
torrents: &mut TorrentMaps,
request: AnnounceRequest,
src: SocketAddr,
peer_valid_until: ValidUntil,
) -> AnnounceResponse {
match convert_ipv4_mapped_ipv6(src.ip()) {
IpAddr::V4(ip) => handle_announce_request_inner(
config,
rng,
&mut torrents.ipv4,
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request_inner(
config,
rng,
&mut torrents.ipv6,
request,
ip,
peer_valid_until,
),
}
}
fn handle_announce_request_inner<I: Ip>(
config: &Config, config: &Config,
rng: &mut SmallRng, rng: &mut SmallRng,
torrents: &mut TorrentMap<I>, torrents: &mut TorrentMap<I>,

View file

@ -1,5 +1,5 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::net::{IpAddr, SocketAddr}; use std::net::SocketAddr;
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
@ -81,36 +81,27 @@ async fn handle_request_stream<S>(
})() })()
})); }));
while let Some((producer_index, request, addr)) = stream.next().await { while let Some((producer_index, request, src)) = stream.next().await {
let response = match request { let response =
ConnectedRequest::Announce(request) => ConnectedResponse::Announce(match addr.ip() { match request {
IpAddr::V4(ip) => handle_announce_request( ConnectedRequest::Announce(request) => {
ConnectedResponse::Announce(handle_announce_request(
&config, &config,
&mut rng, &mut rng,
&mut torrents.borrow_mut().ipv4,
request,
ip,
peer_valid_until.borrow().to_owned(),
),
IpAddr::V6(ip) => handle_announce_request(
&config,
&mut rng,
&mut torrents.borrow_mut().ipv6,
request,
ip,
peer_valid_until.borrow().to_owned(),
),
}),
ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(handle_scrape_request(
&mut torrents.borrow_mut(), &mut torrents.borrow_mut(),
addr,
request, request,
)), src,
peer_valid_until.borrow().to_owned(),
))
}
ConnectedRequest::Scrape(request) => ConnectedResponse::Scrape(
handle_scrape_request(&mut torrents.borrow_mut(), src, request),
),
}; };
::log::debug!("preparing to send response to channel: {:?}", response); ::log::debug!("preparing to send response to channel: {:?}", response);
if let Err(err) = response_senders.try_send_to(producer_index, (response, addr)) { if let Err(err) = response_senders.try_send_to(producer_index, (response, src)) {
::log::warn!("response_sender.try_send: {:?}", err); ::log::warn!("response_sender.try_send: {:?}", err);
} }

View file

@ -1,6 +1,7 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::time::Duration; use std::time::Duration;
use aquatic_common::ValidUntil;
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use rand::{rngs::SmallRng, SeedableRng}; use rand::{rngs::SmallRng, SeedableRng};
@ -10,9 +11,6 @@ use crate::common::handlers::*;
use crate::config::Config; use crate::config::Config;
use crate::mio::common::*; use crate::mio::common::*;
mod announce;
use announce::handle_announce_requests;
pub fn run_request_worker( pub fn run_request_worker(
state: State, state: State,
config: Config, config: Config,
@ -66,19 +64,26 @@ pub fn run_request_worker(
{ {
let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock()); let mut torrents = opt_torrents.unwrap_or_else(|| state.torrents.lock());
handle_announce_requests( let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(announce_requests.drain(..).map(|(request, src)| {
let response = handle_announce_request(
&config, &config,
&mut torrents,
&mut small_rng, &mut small_rng,
announce_requests.drain(..), &mut torrents,
&mut responses, request,
src,
peer_valid_until,
); );
(ConnectedResponse::Announce(response), src)
}));
responses.extend(scrape_requests.drain(..).map(|(request, src)| { responses.extend(scrape_requests.drain(..).map(|(request, src)| {
( let response =
ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request)), ConnectedResponse::Scrape(handle_scrape_request(&mut torrents, src, request));
src,
) (response, src)
})); }));
} }

View file

@ -1,49 +0,0 @@
use std::net::{IpAddr, SocketAddr};
use std::vec::Drain;
use parking_lot::MutexGuard;
use rand::rngs::SmallRng;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*;
use crate::common::handlers::handle_announce_request;
use crate::common::handlers::*;
use crate::common::*;
use crate::config::Config;
#[inline]
pub fn handle_announce_requests(
config: &Config,
torrents: &mut MutexGuard<TorrentMaps>,
rng: &mut SmallRng,
requests: Drain<(AnnounceRequest, SocketAddr)>,
responses: &mut Vec<(ConnectedResponse, SocketAddr)>,
) {
let peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
responses.extend(requests.map(|(request, src)| {
let peer_ip = convert_ipv4_mapped_ipv6(src.ip());
let response = match peer_ip {
IpAddr::V4(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv4,
request,
ip,
peer_valid_until,
),
IpAddr::V6(ip) => handle_announce_request(
config,
rng,
&mut torrents.ipv6,
request,
ip,
peer_valid_until,
),
};
(ConnectedResponse::Announce(response), src)
}));
}