Merge pull request #55 from greatest-ape/2022-03-20

Fixes, dependency updates
This commit is contained in:
Joakim Frostegård 2022-03-20 21:00:11 +01:00 committed by GitHub
commit 9b85fa31d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 277 additions and 175 deletions

91
Cargo.lock generated
View file

@ -60,8 +60,9 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"aquatic_toml_config", "aquatic_toml_config",
"log",
"serde", "serde",
"simplelog", "simple_logger",
"toml", "toml",
] ]
@ -112,6 +113,7 @@ dependencies = [
"signal-hook", "signal-hook",
"slab", "slab",
"smartstring", "smartstring",
"socket2 0.4.4",
] ]
[[package]] [[package]]
@ -187,7 +189,6 @@ dependencies = [
"aquatic_toml_config", "aquatic_toml_config",
"aquatic_udp_protocol", "aquatic_udp_protocol",
"cfg-if", "cfg-if",
"chrono",
"crossbeam-channel", "crossbeam-channel",
"hex", "hex",
"log", "log",
@ -201,6 +202,7 @@ dependencies = [
"signal-hook", "signal-hook",
"slab", "slab",
"socket2 0.4.4", "socket2 0.4.4",
"time",
"tinytemplate", "tinytemplate",
] ]
@ -281,6 +283,7 @@ dependencies = [
"serde", "serde",
"signal-hook", "signal-hook",
"slab", "slab",
"socket2 0.4.4",
"tungstenite", "tungstenite",
] ]
@ -497,19 +500,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73"
dependencies = [
"libc",
"num-integer",
"num-traits",
"time",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.34.0" version = "2.34.0"
@ -521,6 +511,17 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "colored"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd"
dependencies = [
"atty",
"lazy_static",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "concurrent-queue" name = "concurrent-queue"
version = "1.2.2" version = "1.2.2"
@ -545,9 +546,9 @@ dependencies = [
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.1" version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -604,9 +605,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.3" version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f" checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -893,9 +894,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-rustls" name = "futures-rustls"
version = "0.22.0" version = "0.22.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d383f0425d991a05e564c2f3ec150bd6dde863179c131dd60d8aa73a05434461" checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe"
dependencies = [ dependencies = [
"futures-io", "futures-io",
"rustls", "rustls",
@ -1185,9 +1186,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.120" version = "0.2.121"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
[[package]] [[package]]
name = "libm" name = "libm"
@ -1414,6 +1415,15 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "num_threads"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.4.0" version = "0.4.0"
@ -1882,14 +1892,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559" checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559"
[[package]] [[package]]
name = "simplelog" name = "simple_logger"
version = "0.11.2" version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1348164456f72ca0116e4538bdaabb0ddb622c7d9f16387c725af3e96d6001c" checksum = "c75a9723083573ace81ad0cdfc50b858aa3c366c48636edb4109d73122a0c0ea"
dependencies = [ dependencies = [
"chrono", "atty",
"colored",
"log", "log",
"termcolor", "time",
"winapi 0.3.9",
] ]
[[package]] [[package]]
@ -1984,15 +1996,6 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "terminal_size" name = "terminal_size"
version = "0.1.17" version = "0.1.17"
@ -2034,14 +2037,22 @@ dependencies = [
[[package]] [[package]]
name = "time" name = "time"
version = "0.1.43" version = "0.3.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d"
dependencies = [ dependencies = [
"itoa 1.0.1",
"libc", "libc",
"winapi 0.3.9", "num_threads",
"time-macros",
] ]
[[package]]
name = "time-macros"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6"
[[package]] [[package]]
name = "tinytemplate" name = "tinytemplate"
version = "1.2.1" version = "1.2.1"

View file

@ -25,14 +25,6 @@ of sub-implementations for different protocols:
- Install Rust with [rustup](https://rustup.rs/) (stable is recommended) - Install Rust with [rustup](https://rustup.rs/) (stable is recommended)
- Install cmake with your package manager (e.g., `apt-get install cmake`) - Install cmake with your package manager (e.g., `apt-get install cmake`)
- Unless you're planning to only run `aquatic_udp`, make sure locked memory
limits are sufficient. You can do this by adding the following lines to
`/etc/security/limits.conf`, and then logging out and back in:
```
* hard memlock 512
* soft memlock 512
```
- Clone this git repository and enter it - Clone this git repository and enter it
@ -53,7 +45,16 @@ cargo build --release -p aquatic_ws
### Running ### Running
Begin by generating configuration files. They differ between protocols. Unless you're planning to only run `aquatic_udp`, make sure locked memory
limits are sufficient. You can do this by adding the following lines to
`/etc/security/limits.conf`, and then logging out and back in:
```
* hard memlock 512
* soft memlock 512
```
Generate configuration files. They come with comments and differ between protocols.
```sh ```sh
./target/release/aquatic_udp -p > "aquatic-udp-config.toml" ./target/release/aquatic_udp -p > "aquatic-udp-config.toml"

22
TODO.md
View file

@ -4,28 +4,18 @@
## Medium priority ## Medium priority
* Use thin LTO?
* Add release-debug profile?
* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30 * newer glommio versions might use SIGUSR1 internally, see glommio fe33e30
* quit whole program if any thread panics * quit whole program if any thread panics
* implement socket_recv_size and ipv6_only in glommio implementations
* config: fail on unrecognized keys? * config: fail on unrecognized keys?
* Run cargo-deny in CI * Run cargo-deny in CI
* aquatic_http:
* clean out connections regularly
* handle like in aquatic_ws
* Rc<RefCell<ValidUntil>> which get set on successful request parsing and
successful response sending. Clone kept in connection slab which gets cleaned
periodically (= cancel tasks). Means that task handle will need to be stored in slab.
Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive.
* handle panicked/cancelled tasks?
* aquatic_ws * aquatic_ws
* remove mio implementation when glommio issues fixed * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* glommio * replacing indexmap_amortized / simd_json with equivalents doesn't help
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity * SinkExt::send maybe doesn't wake up properly?
* replacing indexmap_amortized / simd_json with equivalents doesn't help * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ?
* SinkExt::send maybe doesn't wake up properly?
* related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ?
* extract_response_peers * extract_response_peers
* don't assume requesting peer is in list? * don't assume requesting peer is in list?

View file

@ -11,6 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic"
aquatic_toml_config = "0.1.0" aquatic_toml_config = "0.1.0"
anyhow = "1" anyhow = "1"
log = "0.4"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
simplelog = "0.11" simple_logger = { version = "2", features = ["stderr"] }
toml = "0.5" toml = "0.5"

View file

@ -3,8 +3,9 @@ use std::io::Read;
use anyhow::Context; use anyhow::Context;
use aquatic_toml_config::TomlConfig; use aquatic_toml_config::TomlConfig;
use log::LevelFilter;
use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Serialize};
use simplelog::{ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode}; use simple_logger::SimpleLogger;
/// Log level. Available values are off, error, warn, info, debug and trace. /// Log level. Available values are off, error, warn, info, debug and trace.
#[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)]
@ -186,19 +187,11 @@ fn start_logger(log_level: LogLevel) -> ::anyhow::Result<()> {
LogLevel::Trace => LevelFilter::Trace, LogLevel::Trace => LevelFilter::Trace,
}; };
// Note: logger doesn't seem to pick up thread names. Not a huge loss. SimpleLogger::new()
let simplelog_config = ConfigBuilder::new() .with_level(level_filter)
.set_time_to_local(true) .with_utc_timestamps()
.set_location_level(LevelFilter::Off) .init()
.build(); .context("Couldn't initialize logger")?;
TermLogger::init(
level_filter,
simplelog_config,
TerminalMode::Stderr,
ColorChoice::Auto,
)
.context("Couldn't initialize logger")?;
Ok(()) Ok(())
} }

View file

@ -9,7 +9,8 @@ pub mod access_list;
pub mod cpu_pinning; pub mod cpu_pinning;
pub mod privileges; pub mod privileges;
pub type AHashIndexMap<K, V> = indexmap_amortized::IndexMap<K, V, RandomState>; /// Amortized IndexMap using AHash hasher
pub type AmortizedIndexMap<K, V> = indexmap_amortized::IndexMap<K, V, RandomState>;
/// Peer or connection valid until this instant /// Peer or connection valid until this instant
/// ///
@ -38,7 +39,7 @@ impl ValidUntil {
#[inline] #[inline]
pub fn extract_response_peers<K, V, R, F>( pub fn extract_response_peers<K, V, R, F>(
rng: &mut impl Rng, rng: &mut impl Rng,
peer_map: &AHashIndexMap<K, V>, peer_map: &AmortizedIndexMap<K, V>,
max_num_peers_to_take: usize, max_num_peers_to_take: usize,
sender_peer_map_key: K, sender_peer_map_key: K,
peer_conversion_function: F, peer_conversion_function: F,

View file

@ -6,6 +6,7 @@ edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "Blazingly fast, multi-threaded HTTP BitTorrent tracker" description = "Blazingly fast, multi-threaded HTTP BitTorrent tracker"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["http", "server", "peer-to-peer", "torrent", "bittorrent"]
[lib] [lib]
name = "aquatic_http" name = "aquatic_http"
@ -40,6 +41,7 @@ serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
slab = "0.4" slab = "0.4"
smartstring = "1" smartstring = "1"
socket2 = { version = "0.4", features = ["all"] }
[dev-dependencies] [dev-dependencies]
quickcheck = "1" quickcheck = "1"

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr};
use either::Either; use either::Either;
use smartstring::{LazyCompact, SmartString}; use smartstring::{LazyCompact, SmartString};
@ -140,7 +140,7 @@ pub struct PeerMapKey<I: Ip> {
pub ip_or_key: Either<I, SmartString<LazyCompact>>, pub ip_or_key: Either<I, SmartString<LazyCompact>>,
} }
pub type PeerMap<I> = AHashIndexMap<PeerMapKey<I>, Peer<I>>; pub type PeerMap<I> = AmortizedIndexMap<PeerMapKey<I>, Peer<I>>;
pub struct TorrentData<I: Ip> { pub struct TorrentData<I: Ip> {
pub peers: PeerMap<I>, pub peers: PeerMap<I>,
@ -159,7 +159,7 @@ impl<I: Ip> Default for TorrentData<I> {
} }
} }
pub type TorrentMap<I> = AHashIndexMap<InfoHash, TorrentData<I>>; pub type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)] #[derive(Default)]
pub struct TorrentMaps { pub struct TorrentMaps {

View file

@ -27,47 +27,6 @@ pub struct Config {
pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig,
} }
impl aquatic_cli_helpers::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
/// Only allow access over IPv6
pub ipv6_only: bool,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf,
/// Keep connections alive
pub keep_alive: bool,
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of requested peers to accept in announce request
pub max_peers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u64,
}
impl Default for Config { impl Default for Config {
fn default() -> Self { fn default() -> Self {
Self { Self {
@ -85,33 +44,83 @@ impl Default for Config {
} }
} }
impl aquatic_cli_helpers::Config for Config {
fn get_log_level(&self) -> Option<LogLevel> {
Some(self.log_level)
}
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
/// Only allow access over IPv6
pub only_ipv6: bool,
/// Maximum number of pending TCP connections
pub tcp_backlog: i32,
/// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf,
/// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format)
pub tls_private_key_path: PathBuf,
/// Keep connections alive after sending a response
pub keep_alive: bool,
}
impl Default for NetworkConfig { impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)), address: SocketAddr::from(([0, 0, 0, 0], 3000)),
tls_certificate_path: "".into(), tls_certificate_path: "".into(),
tls_private_key_path: "".into(), tls_private_key_path: "".into(),
ipv6_only: false, only_ipv6: false,
tcp_backlog: 1024,
keep_alive: true, keep_alive: true,
} }
} }
} }
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
/// Maximum number of requested peers to accept in announce request
pub max_peers: usize,
/// Ask peers to announce this often (seconds)
pub peer_announce_interval: usize,
}
impl Default for ProtocolConfig { impl Default for ProtocolConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
max_scrape_torrents: 255, // FIXME: what value is reasonable? max_scrape_torrents: 100,
max_peers: 50, max_peers: 50,
peer_announce_interval: 120, peer_announce_interval: 120,
} }
} }
} }
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,
/// Clean connections this often (seconds)
pub connection_cleaning_interval: u64,
/// Remove peers that have not announced for this long (seconds)
pub max_peer_age: u64,
/// Remove connections that haven't seen valid requests for this long (seconds)
pub max_connection_idle: u64,
}
impl Default for CleaningConfig { impl Default for CleaningConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
torrent_cleaning_interval: 30, torrent_cleaning_interval: 30,
connection_cleaning_interval: 60,
max_peer_age: 1800, max_peer_age: 1800,
max_connection_idle: 180,
} }
} }
} }

View file

@ -1,9 +1,10 @@
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::CanonicalSocketAddr; use aquatic_common::CanonicalSocketAddr;
@ -20,6 +21,7 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders};
use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender};
use glommio::channels::shared_channel::ConnectedReceiver; use glommio::channels::shared_channel::ConnectedReceiver;
use glommio::net::{TcpListener, TcpStream}; use glommio::net::{TcpListener, TcpStream};
use glommio::task::JoinHandle;
use glommio::timer::TimerActionRepeat; use glommio::timer::TimerActionRepeat;
use glommio::{enclose, prelude::*}; use glommio::{enclose, prelude::*};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -44,7 +46,9 @@ struct PendingScrapeResponse {
} }
struct ConnectionReference { struct ConnectionReference {
task_handle: Option<JoinHandle<()>>,
response_sender: LocalSender<ChannelResponse>, response_sender: LocalSender<ChannelResponse>,
valid_until: ValidUntil,
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
@ -58,7 +62,7 @@ pub async fn run_socket_worker(
let config = Rc::new(config); let config = Rc::new(config);
let access_list = state.access_list; let access_list = state.access_list;
let listener = TcpListener::bind(config.network.address).expect("bind socket"); let listener = create_tcp_listener(&config);
num_bound_sockets.fetch_add(1, Ordering::SeqCst); num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap();
@ -68,18 +72,14 @@ pub async fn run_socket_worker(
let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap());
let connection_slab = Rc::new(RefCell::new(Slab::new())); let connection_slab = Rc::new(RefCell::new(Slab::new()));
let connections_to_remove = Rc::new(RefCell::new(Vec::new()));
// Periodically remove closed connections // Periodically remove closed connections
TimerActionRepeat::repeat( TimerActionRepeat::repeat(enclose!((config, connection_slab) move || {
enclose!((config, connection_slab, connections_to_remove) move || { clean_connections(
remove_closed_connections( config.clone(),
config.clone(), connection_slab.clone(),
connection_slab.clone(), )
connections_to_remove.clone(), }));
)
}),
);
for (_, response_receiver) in response_receivers.streams() { for (_, response_receiver) in response_receivers.streams() {
spawn_local(receive_responses( spawn_local(receive_responses(
@ -95,11 +95,14 @@ pub async fn run_socket_worker(
match stream { match stream {
Ok(stream) => { Ok(stream) => {
let (response_sender, response_receiver) = new_bounded(config.request_workers); let (response_sender, response_receiver) = new_bounded(config.request_workers);
let key = connection_slab
.borrow_mut()
.insert(ConnectionReference { response_sender });
spawn_local(enclose!((config, access_list, request_senders, tls_config, connections_to_remove) async move { let key = connection_slab.borrow_mut().insert(ConnectionReference {
task_handle: None,
response_sender,
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
});
let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move {
if let Err(err) = Connection::run( if let Err(err) = Connection::run(
config, config,
access_list, access_list,
@ -108,14 +111,19 @@ pub async fn run_socket_worker(
response_consumer_id, response_consumer_id,
ConnectionId(key), ConnectionId(key),
tls_config, tls_config,
connection_slab.clone(),
stream stream
).await { ).await {
::log::debug!("Connection::run() error: {:?}", err); ::log::debug!("Connection::run() error: {:?}", err);
} }
connections_to_remove.borrow_mut().push(key); connection_slab.borrow_mut().try_remove(key);
})) }))
.detach(); .detach();
if let Some(reference) = connection_slab.borrow_mut().get_mut(key) {
reference.task_handle = Some(task_handle);
}
} }
Err(err) => { Err(err) => {
::log::error!("accept connection: {:?}", err); ::log::error!("accept connection: {:?}", err);
@ -124,26 +132,28 @@ pub async fn run_socket_worker(
} }
} }
async fn remove_closed_connections( async fn clean_connections(
config: Rc<Config>, config: Rc<Config>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>, connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
connections_to_remove: Rc<RefCell<Vec<usize>>>,
) -> Option<Duration> { ) -> Option<Duration> {
let connections_to_remove = connections_to_remove.replace(Vec::new()); let now = Instant::now();
for connection_id in connections_to_remove { connection_slab.borrow_mut().retain(|_, reference| {
if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { let keep = reference.valid_until.0 > now;
::log::debug!("removed connection with id {}", connection_id);
} else { if !keep {
::log::error!( if let Some(ref handle) = reference.task_handle {
"couldn't remove connection with id {}, it is not in connection slab", handle.cancel();
connection_id }
);
} }
}
keep
});
connection_slab.borrow_mut().shrink_to_fit();
Some(Duration::from_secs( Some(Duration::from_secs(
config.cleaning.torrent_cleaning_interval, config.cleaning.connection_cleaning_interval,
)) ))
} }
@ -169,6 +179,7 @@ struct Connection {
request_senders: Rc<Senders<ChannelRequest>>, request_senders: Rc<Senders<ChannelRequest>>,
response_receiver: LocalReceiver<ChannelResponse>, response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TlsStream<TcpStream>, stream: TlsStream<TcpStream>,
peer_addr: CanonicalSocketAddr, peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId, connection_id: ConnectionId,
@ -186,6 +197,7 @@ impl Connection {
response_consumer_id: ConsumerId, response_consumer_id: ConsumerId,
connection_id: ConnectionId, connection_id: ConnectionId,
tls_config: Arc<TlsConfig>, tls_config: Arc<TlsConfig>,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
stream: TcpStream, stream: TcpStream,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let peer_addr = stream let peer_addr = stream
@ -206,6 +218,7 @@ impl Connection {
request_senders: request_senders.clone(), request_senders: request_senders.clone(),
response_receiver, response_receiver,
response_consumer_id, response_consumer_id,
connection_slab,
stream, stream,
peer_addr, peer_addr,
connection_id, connection_id,
@ -288,12 +301,19 @@ impl Connection {
} }
/// Take a request and: /// Take a request and:
/// - Update connection ValidUntil
/// - Return error response if request is not allowed /// - Return error response if request is not allowed
/// - If it is an announce request, send it to request workers an await a /// - If it is an announce request, send it to request workers an await a
/// response /// response
/// - If it is a scrape requests, split it up, pass on the parts to /// - If it is a scrape requests, split it up, pass on the parts to
/// relevant request workers and await a response /// relevant request workers and await a response
async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> { async fn handle_request(&mut self, request: Request) -> anyhow::Result<Response> {
if let Ok(mut slab) = self.connection_slab.try_borrow_mut() {
if let Some(reference) = slab.get_mut(self.connection_id.0) {
reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle);
}
}
match request { match request {
Request::Announce(request) => { Request::Announce(request) => {
let info_hash = request.info_hash; let info_hash = request.info_hash;
@ -464,3 +484,30 @@ impl Connection {
fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
(info_hash.0[0] as usize) % config.request_workers (info_hash.0[0] as usize) % config.request_workers
} }
fn create_tcp_listener(config: &Config) -> TcpListener {
let domain = if config.network.address.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))
.expect("create socket");
if config.network.only_ipv6 {
socket.set_only_v6(true).expect("socket: set only ipv6");
}
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
socket
.listen(config.network.tcp_backlog)
.unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err));
unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }
}

View file

@ -5,6 +5,7 @@ authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"]
[[bin]] [[bin]]
name = "aquatic_http_load_test" name = "aquatic_http_load_test"

View file

@ -7,6 +7,7 @@ license = "Apache-2.0"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
description = "HTTP BitTorrent tracker protocol" description = "HTTP BitTorrent tracker protocol"
exclude = ["target"] exclude = ["target"]
keywords = ["http", "protocol", "peer-to-peer", "torrent", "bittorrent"]
[lib] [lib]
name = "aquatic_http_protocol" name = "aquatic_http_protocol"

View file

@ -4,9 +4,10 @@ version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"] authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "WebTorrent tracker protocol" description = "Serialize toml with comments"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
exclude = ["target"] exclude = ["target"]
keywords = ["toml"]
[lib] [lib]
name = "aquatic_toml_config" name = "aquatic_toml_config"

View file

@ -4,9 +4,10 @@ version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"] authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "WebTorrent tracker protocol" description = "Serialize toml with comments"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
exclude = ["target"] exclude = ["target"]
keywords = ["toml"]
[lib] [lib]
proc-macro = true proc-macro = true

View file

@ -6,6 +6,7 @@ edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "Blazingly fast, multi-threaded UDP BitTorrent tracker" description = "Blazingly fast, multi-threaded UDP BitTorrent tracker"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"]
[lib] [lib]
name = "aquatic_udp" name = "aquatic_udp"
@ -24,7 +25,6 @@ aquatic_udp_protocol = "0.1.0"
anyhow = "1" anyhow = "1"
cfg-if = "1" cfg-if = "1"
chrono = "0.4"
crossbeam-channel = "0.5" crossbeam-channel = "0.5"
hex = "0.4" hex = "0.4"
log = "0.4" log = "0.4"
@ -36,6 +36,7 @@ serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
slab = "0.4" slab = "0.4"
socket2 = { version = "0.4", features = ["all"] } socket2 = { version = "0.4", features = ["all"] }
time = { version = "0.3", features = ["formatting"] }
tinytemplate = "1" tinytemplate = "1"
[dev-dependencies] [dev-dependencies]

View file

@ -9,7 +9,7 @@ use std::time::Instant;
use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListArcSwap; use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::AHashIndexMap; use aquatic_common::AmortizedIndexMap;
use aquatic_common::CanonicalSocketAddr; use aquatic_common::CanonicalSocketAddr;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use crossbeam_channel::Receiver; use crossbeam_channel::Receiver;
@ -39,7 +39,7 @@ impl<I: Ip> Peer<I> {
} }
} }
type PeerMap<I> = AHashIndexMap<PeerId, Peer<I>>; type PeerMap<I> = AmortizedIndexMap<PeerId, Peer<I>>;
struct TorrentData<I: Ip> { struct TorrentData<I: Ip> {
pub peers: PeerMap<I>, pub peers: PeerMap<I>,
@ -57,7 +57,7 @@ impl<I: Ip> Default for TorrentData<I> {
} }
} }
type TorrentMap<I> = AHashIndexMap<InfoHash, TorrentData<I>>; type TorrentMap<I> = AmortizedIndexMap<InfoHash, TorrentData<I>>;
#[derive(Default)] #[derive(Default)]
struct TorrentMaps { struct TorrentMaps {

View file

@ -16,7 +16,7 @@ use slab::Slab;
use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::create_access_list_cache;
use aquatic_common::access_list::AccessListCache; use aquatic_common::access_list::AccessListCache;
use aquatic_common::ValidUntil; use aquatic_common::ValidUntil;
use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr};
use aquatic_udp_protocol::*; use aquatic_udp_protocol::*;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
@ -24,7 +24,7 @@ use crate::common::*;
use crate::config::Config; use crate::config::Config;
#[derive(Default)] #[derive(Default)]
pub struct ConnectionMap(AHashIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>); pub struct ConnectionMap(AmortizedIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>);
impl ConnectionMap { impl ConnectionMap {
pub fn insert( pub fn insert(
@ -66,7 +66,7 @@ impl PendingScrapeResponseSlab {
request: ScrapeRequest, request: ScrapeRequest,
valid_until: ValidUntil, valid_until: ValidUntil,
) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> { ) -> impl IntoIterator<Item = (RequestWorkerIndex, PendingScrapeRequest)> {
let mut split_requests: AHashIndexMap<RequestWorkerIndex, PendingScrapeRequest> = let mut split_requests: AmortizedIndexMap<RequestWorkerIndex, PendingScrapeRequest> =
Default::default(); Default::default();
if request.info_hashes.is_empty() { if request.info_hashes.is_empty() {

View file

@ -5,9 +5,10 @@ use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::Context; use anyhow::Context;
use chrono::Utc;
use num_format::{Locale, ToFormattedString}; use num_format::{Locale, ToFormattedString};
use serde::Serialize; use serde::Serialize;
use time::format_description::well_known::Rfc2822;
use time::OffsetDateTime;
use tinytemplate::TinyTemplate; use tinytemplate::TinyTemplate;
use crate::common::*; use crate::common::*;
@ -183,7 +184,9 @@ pub fn run_statistics_worker(config: Config, state: State) {
ipv6_active: config.network.ipv6_active(), ipv6_active: config.network.ipv6_active(),
ipv4: statistics_ipv4, ipv4: statistics_ipv4,
ipv6: statistics_ipv6, ipv6: statistics_ipv6,
last_updated: Utc::now().to_rfc2822(), last_updated: OffsetDateTime::now_utc()
.format(&Rfc2822)
.unwrap_or("(formatting error)".into()),
peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval), peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval),
}; };

View file

@ -5,6 +5,7 @@ authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["udp", "benchmark", "peer-to-peer", "torrent", "bittorrent"]
[[bin]] [[bin]]
name = "aquatic_udp_load_test" name = "aquatic_udp_load_test"

View file

@ -6,6 +6,7 @@ edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "UDP BitTorrent tracker protocol" description = "UDP BitTorrent tracker protocol"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["udp", "protocol", "peer-to-peer", "torrent", "bittorrent"]
[dependencies] [dependencies]
byteorder = "1" byteorder = "1"

View file

@ -6,6 +6,8 @@ edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
description = "Blazingly fast, multi-threaded WebTorrent tracker" description = "Blazingly fast, multi-threaded WebTorrent tracker"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["webtorrent", "websocket", "peer-to-peer", "torrent", "bittorrent"]
[lib] [lib]
name = "aquatic_ws" name = "aquatic_ws"
@ -40,6 +42,7 @@ rustls-pemfile = "0.3"
serde = { version = "1", features = ["derive"] } serde = { version = "1", features = ["derive"] }
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
slab = "0.4" slab = "0.4"
socket2 = { version = "0.4", features = ["all"] }
tungstenite = "0.17" tungstenite = "0.17"
[dev-dependencies] [dev-dependencies]

View file

@ -59,7 +59,9 @@ pub struct NetworkConfig {
/// Bind to this address /// Bind to this address
pub address: SocketAddr, pub address: SocketAddr,
/// Only allow access over IPv6 /// Only allow access over IPv6
pub ipv6_only: bool, pub only_ipv6: bool,
/// Maximum number of pending TCP connections
pub tcp_backlog: i32,
/// Path to TLS certificate (DER-encoded X.509) /// Path to TLS certificate (DER-encoded X.509)
pub tls_certificate_path: PathBuf, pub tls_certificate_path: PathBuf,
@ -74,7 +76,8 @@ impl Default for NetworkConfig {
fn default() -> Self { fn default() -> Self {
Self { Self {
address: SocketAddr::from(([0, 0, 0, 0], 3000)), address: SocketAddr::from(([0, 0, 0, 0], 3000)),
ipv6_only: false, only_ipv6: false,
tcp_backlog: 1024,
tls_certificate_path: "".into(), tls_certificate_path: "".into(),
tls_private_key_path: "".into(), tls_private_key_path: "".into(),

View file

@ -12,7 +12,7 @@ use glommio::timer::TimerActionRepeat;
use hashbrown::HashMap; use hashbrown::HashMap;
use rand::{rngs::SmallRng, SeedableRng}; use rand::{rngs::SmallRng, SeedableRng};
use aquatic_common::{extract_response_peers, AHashIndexMap}; use aquatic_common::{extract_response_peers, AmortizedIndexMap};
use aquatic_ws_protocol::*; use aquatic_ws_protocol::*;
use crate::common::*; use crate::common::*;
@ -49,7 +49,7 @@ struct Peer {
pub valid_until: ValidUntil, pub valid_until: ValidUntil,
} }
type PeerMap = AHashIndexMap<PeerId, Peer>; type PeerMap = AmortizedIndexMap<PeerId, Peer>;
struct TorrentData { struct TorrentData {
pub peers: PeerMap, pub peers: PeerMap,
@ -68,7 +68,7 @@ impl Default for TorrentData {
} }
} }
type TorrentMap = AHashIndexMap<InfoHash, TorrentData>; type TorrentMap = AmortizedIndexMap<InfoHash, TorrentData>;
#[derive(Default)] #[derive(Default)]
struct TorrentMaps { struct TorrentMaps {

View file

@ -1,6 +1,7 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::cell::RefCell; use std::cell::RefCell;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::os::unix::prelude::{FromRawFd, IntoRawFd};
use std::rc::Rc; use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc; use std::sync::Arc;
@ -56,7 +57,8 @@ pub async fn run_socket_worker(
let config = Rc::new(config); let config = Rc::new(config);
let access_list = state.access_list; let access_list = state.access_list;
let listener = TcpListener::bind(config.network.address).expect("bind socket"); let listener = create_tcp_listener(&config);
num_bound_sockets.fetch_add(1, Ordering::SeqCst); num_bound_sockets.fetch_add(1, Ordering::SeqCst);
let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap();
@ -540,3 +542,30 @@ impl ConnectionWriter {
fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize { fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize {
(info_hash.0[0] as usize) % config.request_workers (info_hash.0[0] as usize) % config.request_workers
} }
fn create_tcp_listener(config: &Config) -> TcpListener {
let domain = if config.network.address.is_ipv4() {
socket2::Domain::IPV4
} else {
socket2::Domain::IPV6
};
let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP))
.expect("create socket");
if config.network.only_ipv6 {
socket.set_only_v6(true).expect("socket: set only ipv6");
}
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
socket
.listen(config.network.tcp_backlog)
.unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err));
unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) }
}

View file

@ -5,6 +5,7 @@ authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2021" edition = "2021"
license = "Apache-2.0" license = "Apache-2.0"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
keywords = ["webtorrent", "websocket", "benchmark", "torrent", "bittorrent"]
[[bin]] [[bin]]
name = "aquatic_ws_load_test" name = "aquatic_ws_load_test"

View file

@ -7,6 +7,7 @@ license = "Apache-2.0"
description = "WebTorrent tracker protocol" description = "WebTorrent tracker protocol"
repository = "https://github.com/greatest-ape/aquatic" repository = "https://github.com/greatest-ape/aquatic"
exclude = ["target"] exclude = ["target"]
keywords = ["webtorrent", "protocol", "peer-to-peer", "torrent", "bittorrent"]
[lib] [lib]
name = "aquatic_ws_protocol" name = "aquatic_ws_protocol"