aquatic_udp: move code only use in mio impl out of crate::common

This commit is contained in:
Joakim Frostegård 2021-10-21 15:35:21 +02:00
parent 81b7777a4a
commit b10f7b89e7
16 changed files with 89 additions and 71 deletions

View file

@ -5,7 +5,6 @@
* clean connections * clean connections
* update peer valid until * update peer valid until
* privdrop * privdrop
* a lot of "common code" is only used in mio implementation
* access lists: * access lists:
* use arc-swap Cache * use arc-swap Cache

View file

@ -1,13 +1,10 @@
use std::borrow::Borrow; use std::borrow::Borrow;
use std::hash::Hash; use std::hash::Hash;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::{atomic::AtomicUsize, Arc};
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::AccessListArcSwap;
use hashbrown::HashMap; use hashbrown::HashMap;
use indexmap::IndexMap; use indexmap::IndexMap;
use parking_lot::Mutex;
pub use aquatic_common::{access_list::AccessList, ValidUntil}; pub use aquatic_common::{access_list::AccessList, ValidUntil};
pub use aquatic_udp_protocol::*; pub use aquatic_udp_protocol::*;
@ -35,25 +32,6 @@ impl Ip for Ipv6Addr {
} }
} }
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus { pub enum PeerStatus {
Seeding, Seeding,
@ -175,31 +153,6 @@ impl TorrentMaps {
} }
} }
#[derive(Default)]
pub struct Statistics {
pub requests_received: AtomicUsize,
pub responses_sent: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
}
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrents: Arc<Mutex<TorrentMaps>>,
pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
#[test] #[test]

View file

@ -47,7 +47,8 @@ pub async fn run_request_worker(
torrents.clone(), torrents.clone(),
response_senders.clone(), response_senders.clone(),
receiver, receiver,
)).detach(); ))
.detach();
handles.push(handle); handles.push(handle);
} }

View file

@ -1,3 +1,8 @@
//! Work-in-progress glommio (io_uring) implementation
//!
//! * Doesn't support scrape requests
//! * Currently not faster than mio implementation
use std::sync::{atomic::AtomicUsize, Arc}; use std::sync::{atomic::AtomicUsize, Arc};
use glommio::channels::channel_mesh::MeshBuilder; use glommio::channels::channel_mesh::MeshBuilder;

View file

@ -51,13 +51,15 @@ pub async fn run_socket_worker(
response_consumer_index, response_consumer_index,
local_sender, local_sender,
socket.clone(), socket.clone(),
)).detach(); ))
.detach();
for (_, receiver) in response_receivers.streams().into_iter() { for (_, receiver) in response_receivers.streams().into_iter() {
spawn_local(send_responses( spawn_local(send_responses(
socket.clone(), socket.clone(),
receiver.map(|(response, addr)| (response.into(), addr)), receiver.map(|(response, addr)| (response.into(), addr)),
)).detach(); ))
.detach();
} }
send_responses(socket, local_receiver.stream()).await; send_responses(socket, local_receiver.stream()).await;
@ -174,7 +176,7 @@ where
while let Some((response, src)) = stream.next().await { while let Some((response, src)) = stream.next().await {
buf.set_position(0); buf.set_position(0);
::log::debug!("preparing to send response: {:?}", response.clone()); ::log::debug!("preparing to send response: {:?}", response.clone());
response response

View file

@ -1,6 +1,4 @@
use std::sync::Arc; use cfg_if::cfg_if;
use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
pub mod common; pub mod common;
pub mod config; pub mod config;
@ -13,7 +11,7 @@ use config::Config;
pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_udp: UDP BitTorrent tracker";
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
cfg_if::cfg_if! { cfg_if! {
if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { if #[cfg(all(feature = "with-glommio", target_os = "linux"))] {
glommio::run(config) glommio::run(config)
} else { } else {
@ -21,14 +19,3 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
} }
} }
} }
pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) {
match config.access_list.mode {
AccessListMode::White | AccessListMode::Black => {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {
::log::error!("Update access list from path: {:?}", err);
}
}
AccessListMode::Off => {}
}
}

View file

@ -0,0 +1,49 @@
use aquatic_common::access_list::AccessListArcSwap;
use parking_lot::Mutex;
use std::sync::{atomic::AtomicUsize, Arc};
use crate::common::*;
pub enum ConnectedRequest {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
pub enum ConnectedResponse {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
}
impl Into<Response> for ConnectedResponse {
fn into(self) -> Response {
match self {
Self::Announce(response) => Response::Announce(response),
Self::Scrape(response) => Response::Scrape(response),
}
}
}
#[derive(Default)]
pub struct Statistics {
pub requests_received: AtomicUsize,
pub responses_sent: AtomicUsize,
pub bytes_received: AtomicUsize,
pub bytes_sent: AtomicUsize,
}
#[derive(Clone)]
pub struct State {
pub access_list: Arc<AccessListArcSwap>,
pub torrents: Arc<Mutex<TorrentMaps>>,
pub statistics: Arc<Statistics>,
}
impl Default for State {
fn default() -> Self {
Self {
access_list: Arc::new(AccessListArcSwap::default()),
torrents: Arc::new(Mutex::new(TorrentMaps::default())),
statistics: Arc::new(Statistics::default()),
}
}
}

View file

@ -10,6 +10,7 @@ use aquatic_udp_protocol::*;
use crate::common::announce::handle_announce_request; use crate::common::announce::handle_announce_request;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::mio::common::*;
#[inline] #[inline]
pub fn handle_announce_requests( pub fn handle_announce_requests(

View file

@ -6,8 +6,8 @@ use rand::{rngs::SmallRng, SeedableRng};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::common::*;
use crate::config::Config; use crate::config::Config;
use crate::mio::common::*;
mod announce; mod announce;
mod scrape; mod scrape;

View file

@ -6,6 +6,8 @@ use parking_lot::MutexGuard;
use aquatic_common::convert_ipv4_mapped_ipv6; use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use crate::mio::common::*;
use crate::common::*; use crate::common::*;
#[inline] #[inline]

View file

@ -12,13 +12,16 @@ use anyhow::Context;
use crossbeam_channel::unbounded; use crossbeam_channel::unbounded;
use privdrop::PrivDrop; use privdrop::PrivDrop;
pub mod common;
pub mod handlers; pub mod handlers;
pub mod network; pub mod network;
pub mod tasks; pub mod tasks;
use crate::common::State; use aquatic_common::access_list::{AccessListArcSwap, AccessListMode, AccessListQuery};
use crate::config::Config; use crate::config::Config;
use crate::update_access_list;
use common::State;
pub fn run(config: Config) -> ::anyhow::Result<()> { pub fn run(config: Config) -> ::anyhow::Result<()> {
let state = State::default(); let state = State::default();
@ -126,3 +129,14 @@ pub fn start_workers(
Ok(()) Ok(())
} }
pub fn update_access_list(config: &Config, access_list: &Arc<AccessListArcSwap>) {
match config.access_list.mode {
AccessListMode::White | AccessListMode::Black => {
if let Err(err) = access_list.update_from_path(&config.access_list.path) {
::log::error!("Update access list from path: {:?}", err);
}
}
AccessListMode::Off => {}
}
}

View file

@ -20,6 +20,8 @@ use crate::common::network::ConnectionMap;
use crate::common::*; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use super::common::*;
pub fn run_socket_worker( pub fn run_socket_worker(
state: State, state: State,
config: Config, config: Config,

View file

@ -2,7 +2,7 @@ use std::sync::atomic::Ordering;
use histogram::Histogram; use histogram::Histogram;
use crate::common::*; use super::common::*;
use crate::config::Config; use crate::config::Config;
pub fn gather_and_print_statistics(state: &State, config: &Config) { pub fn gather_and_print_statistics(state: &State, config: &Config) {

View file

@ -8,6 +8,7 @@ use rand_distr::Pareto;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*; use crate::common::*;
use crate::config::BenchConfig; use crate::config::BenchConfig;

View file

@ -15,6 +15,7 @@ use std::time::Duration;
use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use aquatic_udp::mio::handlers; use aquatic_udp::mio::handlers;
use config::BenchConfig; use config::BenchConfig;

View file

@ -8,6 +8,7 @@ use rand_distr::Pareto;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*;
use crate::common::*; use crate::common::*;
use crate::config::BenchConfig; use crate::config::BenchConfig;