WIP: aquatic_udp: glommio: work on handling scrape requests

This commit is contained in:
Joakim Frostegård 2021-10-23 14:05:54 +02:00
parent d0be89388c
commit 08920fce5f
11 changed files with 183 additions and 126 deletions

View file

@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::io::Cursor;
use std::iter::FromIterator;
use std::net::{IpAddr, SocketAddr};
use std::rc::Rc;
use std::sync::{
@ -15,20 +16,45 @@ use glommio::enclose;
use glommio::net::UdpSocket;
use glommio::prelude::*;
use glommio::timer::TimerActionRepeat;
use hashbrown::HashMap;
use rand::prelude::{Rng, SeedableRng, StdRng};
use aquatic_udp_protocol::{IpVersion, Request, Response};
use super::common::update_access_list;
use crate::common::handlers::*;
use crate::common::network::ConnectionMap;
use crate::common::*;
use crate::config::Config;
struct PendingScrapeResponse {
pending_worker_responses: usize,
valid_until: ValidUntil,
src: SocketAddr,
stats: Vec<TorrentScrapeStatistics>,
}
#[derive(Default)]
struct PendingScrapeResponses(HashMap<TransactionId, PendingScrapeResponse>);
impl PendingScrapeResponses {
fn insert_empty(&mut self, transaction_id: TransactionId, src: SocketAddr, pending_worker_responses: usize, valid_until: ValidUntil) {
let pending = PendingScrapeResponse {
pending_worker_responses,
valid_until,
src,
stats: Vec::new(),
};
self.0.insert(transaction_id, pending);
}
}
pub async fn run_socket_worker(
config: Config,
request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>,
request_mesh_builder: MeshBuilder<(usize, ConnectedRequest, SocketAddr), Partial>,
response_mesh_builder: MeshBuilder<(ConnectedResponse, SocketAddr), Partial>,
num_bound_sockets: Arc<AtomicUsize>,
access_list: AccessList,
) {
@ -52,12 +78,15 @@ pub async fn run_socket_worker(
let response_consumer_index = response_receivers.consumer_id().unwrap();
let pending_scrape_responses = Rc::new(RefCell::new(PendingScrapeResponses::default()));
spawn_local(read_requests(
config.clone(),
request_senders,
response_consumer_index,
local_sender,
socket.clone(),
pending_scrape_responses,
access_list,
))
.detach();
@ -75,10 +104,11 @@ pub async fn run_socket_worker(
async fn read_requests(
config: Config,
request_senders: Senders<(usize, AnnounceRequest, SocketAddr)>,
request_senders: Senders<(usize, ConnectedRequest, SocketAddr)>,
response_consumer_index: usize,
local_sender: LocalSender<(Response, SocketAddr)>,
socket: Rc<UdpSocket>,
pending_scrape_responses: Rc<RefCell<PendingScrapeResponses>>,
access_list: AccessList,
) {
let mut rng = StdRng::from_entropy();
@ -150,11 +180,11 @@ async fn read_requests(
.allows(access_list_mode, &request.info_hash.0)
{
let request_consumer_index =
(request.info_hash.0[0] as usize) % config.request_workers;
calculate_request_consumer_index(&config, request.info_hash);
if let Err(err) = request_senders.try_send_to(
request_consumer_index,
(response_consumer_index, request, src),
(response_consumer_index, ConnectedRequest::Announce(request), src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
@ -170,12 +200,36 @@ async fn read_requests(
}
Ok(Request::Scrape(request)) => {
if connections.borrow().contains(request.connection_id, src) {
let response = Response::Error(ErrorResponse {
transaction_id: request.transaction_id,
message: "Scrape requests not supported".into(),
});
let mut consumer_requests: HashMap<usize, ScrapeRequest> = HashMap::new();
local_sender.try_send((response, src)).unwrap();
for info_hash in request.info_hashes {
consumer_requests
.entry(calculate_request_consumer_index(&config, info_hash))
.or_insert(
ScrapeRequest {
transaction_id: request.transaction_id,
connection_id: request.connection_id,
info_hashes: Vec::new(),
}
)
.info_hashes.push(info_hash);
}
pending_scrape_responses.borrow_mut().insert_empty(
request.transaction_id,
src,
consumer_requests.len(),
connection_valid_until.borrow().to_owned(), // FIXME: use seperate ValidUntil
);
for (consumer_index, request) in consumer_requests {
if let Err(err) = request_senders.try_send_to(
consumer_index,
(response_consumer_index, ConnectedRequest::Scrape(request), src),
) {
::log::warn!("request_sender.try_send failed: {:?}", err)
}
}
}
}
Err(err) => {
@ -234,6 +288,10 @@ where
}
}
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
(info_hash.0[0] as usize) % config.request_workers
}
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
match ip {
IpAddr::V4(_) => IpVersion::IPv4,