Merge pull request #32 from greatest-ape/fixes-2021-11-27

improve folder/file organization; do some optimizations in aquatic_udp
This commit is contained in:
Joakim Frostegård 2021-11-27 18:52:54 +01:00 committed by GitHub
commit f98fb947d4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 22 additions and 35 deletions

View file

@ -17,8 +17,6 @@
* cargo-deny * cargo-deny
* aquatic_udp * aquatic_udp
* don't constantly recreate access list cache
* request worker: use `ValidUntil::new_with_now`
* look at proper cpu pinning (check that one thread gets bound per core) * look at proper cpu pinning (check that one thread gets bound per core)
* then consider so_attach_reuseport_cbpf * then consider so_attach_reuseport_cbpf
* what poll event capacity is actually needed? * what poll event capacity is actually needed?

View file

@ -9,11 +9,9 @@ repository = "https://github.com/greatest-ape/aquatic"
[lib] [lib]
name = "aquatic_http" name = "aquatic_http"
path = "src/lib/lib.rs"
[[bin]] [[bin]]
name = "aquatic_http" name = "aquatic_http"
path = "src/bin/main.rs"
[features] [features]
cpu-pinning = ["aquatic_common/cpu-pinning"] cpu-pinning = ["aquatic_common/cpu-pinning"]

View file

@ -15,9 +15,8 @@ use std::{
use crate::config::Config; use crate::config::Config;
mod common; mod common;
mod workers;
pub mod config; pub mod config;
mod handlers;
mod network;
pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker"; pub const APP_NAME: &str = "aquatic_http: HTTP/TLS BitTorrent tracker";
@ -86,7 +85,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::SocketWorker(i), WorkerIndex::SocketWorker(i),
); );
network::run_socket_worker( workers::socket::run_socket_worker(
config, config,
state, state,
tls_config, tls_config,
@ -116,7 +115,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::RequestWorker(i), WorkerIndex::RequestWorker(i),
); );
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) workers::request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await .await
}); });

View file

@ -0,0 +1,2 @@
pub mod request;
pub mod socket;

View file

@ -24,11 +24,9 @@ use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*}; use glommio::{enclose, prelude::*};
use slab::Slab; use slab::Slab;
use crate::common::num_digits_in_usize; use crate::common::*;
use crate::config::Config; use crate::config::Config;
use super::common::*;
const INTERMEDIATE_BUFFER_SIZE: usize = 1024; const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
const MAX_REQUEST_SIZE: usize = 2048; const MAX_REQUEST_SIZE: usize = 2048;

View file

@ -9,7 +9,6 @@ repository = "https://github.com/greatest-ape/aquatic"
[lib] [lib]
name = "aquatic_udp" name = "aquatic_udp"
path = "src/lib/lib.rs"
[[bin]] [[bin]]
name = "aquatic_udp" name = "aquatic_udp"

View file

@ -121,10 +121,10 @@ pub fn run_request_worker(
} }
if iter_counter % 128 == 0 { if iter_counter % 128 == 0 {
peer_valid_until = ValidUntil::new(config.cleaning.max_peer_age);
let now = Instant::now(); let now = Instant::now();
peer_valid_until = ValidUntil::new_with_now(now, config.cleaning.max_peer_age);
if now > last_cleaning + cleaning_interval { if now > last_cleaning + cleaning_interval {
torrents.clean(&config, &state.access_list); torrents.clean(&config, &state.access_list);

View file

@ -135,6 +135,7 @@ pub fn run_socket_worker(
let mut events = Events::with_capacity(config.network.poll_event_capacity); let mut events = Events::with_capacity(config.network.poll_event_capacity);
let mut connections = ConnectionMap::default(); let mut connections = ConnectionMap::default();
let mut pending_scrape_responses = PendingScrapeResponseMap::default(); let mut pending_scrape_responses = PendingScrapeResponseMap::default();
let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
@ -166,6 +167,7 @@ pub fn run_socket_worker(
&state, &state,
&mut connections, &mut connections,
&mut pending_scrape_responses, &mut pending_scrape_responses,
&mut access_list_cache,
&mut rng, &mut rng,
&mut socket, &mut socket,
&mut buffer, &mut buffer,
@ -218,6 +220,7 @@ fn read_requests(
state: &State, state: &State,
connections: &mut ConnectionMap, connections: &mut ConnectionMap,
pending_scrape_responses: &mut PendingScrapeResponseMap, pending_scrape_responses: &mut PendingScrapeResponseMap,
access_list_cache: &mut AccessListCache,
rng: &mut StdRng, rng: &mut StdRng,
socket: &mut UdpSocket, socket: &mut UdpSocket,
buffer: &mut [u8], buffer: &mut [u8],
@ -231,8 +234,6 @@ fn read_requests(
let mut bytes_received_ipv4: usize = 0; let mut bytes_received_ipv4: usize = 0;
let mut bytes_received_ipv6 = 0; let mut bytes_received_ipv6 = 0;
let mut access_list_cache = create_access_list_cache(&state.access_list);
loop { loop {
match socket.recv_from(&mut buffer[..]) { match socket.recv_from(&mut buffer[..]) {
Ok((amt, src)) => { Ok((amt, src)) => {
@ -272,7 +273,7 @@ fn read_requests(
config, config,
connections, connections,
pending_scrape_responses, pending_scrape_responses,
&mut access_list_cache, access_list_cache,
rng, rng,
request_sender, request_sender,
local_responses, local_responses,

View file

@ -9,11 +9,9 @@ repository = "https://github.com/greatest-ape/aquatic"
[lib] [lib]
name = "aquatic_ws" name = "aquatic_ws"
path = "src/lib/lib.rs"
[[bin]] [[bin]]
name = "aquatic_ws" name = "aquatic_ws"
path = "src/bin/main.rs"
[features] [features]
default = ["with-mio"] default = ["with-mio"]

View file

@ -1,6 +1,6 @@
pub mod common; pub mod common;
pub mod handlers; pub mod request;
pub mod network; pub mod socket;
use std::{ use std::{
fs::File, fs::File,
@ -19,7 +19,7 @@ use glommio::{channels::channel_mesh::MeshBuilder, prelude::*};
const SHARED_CHANNEL_SIZE: usize = 1024; const SHARED_CHANNEL_SIZE: usize = 1024;
pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> { pub fn run(config: Config, state: State) -> anyhow::Result<()> {
let num_peers = config.socket_workers + config.request_workers; let num_peers = config.socket_workers + config.request_workers;
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE);
@ -49,7 +49,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::SocketWorker(i), WorkerIndex::SocketWorker(i),
); );
network::run_socket_worker( socket::run_socket_worker(
config, config,
state, state,
tls_config, tls_config,
@ -79,7 +79,7 @@ pub fn run_inner(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::RequestWorker(i), WorkerIndex::RequestWorker(i),
); );
handlers::run_request_worker(config, state, request_mesh_builder, response_mesh_builder) request::run_request_worker(config, state, request_mesh_builder, response_mesh_builder)
.await .await
}); });

View file

@ -34,7 +34,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
cfg_if!( cfg_if!(
if #[cfg(feature = "with-glommio")] { if #[cfg(feature = "with-glommio")] {
::std::thread::spawn(move || glommio::run_inner(config, state)); ::std::thread::spawn(move || glommio::run(config, state));
} else { } else {
::std::thread::spawn(move || mio::run(config, state)); ::std::thread::spawn(move || mio::run(config, state));
} }

View file

@ -1,6 +0,0 @@
use aquatic_common::access_list::{AccessListMode, AccessListQuery};
use histogram::Histogram;
use crate::common::*;
use crate::config::Config;

View file

@ -14,8 +14,8 @@ use parking_lot::Mutex;
use privdrop::PrivDrop; use privdrop::PrivDrop;
pub mod common; pub mod common;
pub mod handlers; pub mod request;
pub mod network; pub mod socket;
use crate::config::Config; use crate::config::Config;
use common::*; use common::*;
@ -85,7 +85,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::SocketWorker(i), WorkerIndex::SocketWorker(i),
); );
network::run_socket_worker( socket::run_socket_worker(
config, config,
state, state,
i, i,
@ -144,7 +144,7 @@ pub fn start_workers(config: Config, state: State) -> anyhow::Result<()> {
WorkerIndex::RequestWorker(i), WorkerIndex::RequestWorker(i),
); );
handlers::run_request_worker( request::run_request_worker(
config, config,
state, state,
in_message_receiver, in_message_receiver,