diff --git a/Cargo.lock b/Cargo.lock index ef96f4d..a07380d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -60,8 +60,9 @@ version = "0.1.0" dependencies = [ "anyhow", "aquatic_toml_config", + "log", "serde", - "simplelog", + "simple_logger", "toml", ] @@ -112,6 +113,7 @@ dependencies = [ "signal-hook", "slab", "smartstring", + "socket2 0.4.4", ] [[package]] @@ -187,7 +189,6 @@ dependencies = [ "aquatic_toml_config", "aquatic_udp_protocol", "cfg-if", - "chrono", "crossbeam-channel", "hex", "log", @@ -201,6 +202,7 @@ dependencies = [ "signal-hook", "slab", "socket2 0.4.4", + "time", "tinytemplate", ] @@ -281,6 +283,7 @@ dependencies = [ "serde", "signal-hook", "slab", + "socket2 0.4.4", "tungstenite", ] @@ -497,19 +500,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time", - "winapi 0.3.9", -] - [[package]] name = "clap" version = "2.34.0" @@ -521,6 +511,17 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "colored" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" +dependencies = [ + "atty", + "lazy_static", + "winapi 0.3.9", +] + [[package]] name = "concurrent-queue" version = "1.2.2" @@ -545,9 +546,9 @@ dependencies = [ [[package]] name = "cpufeatures" -version = "0.2.1" +version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" +checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b" dependencies = [ "libc", ] @@ -604,9 +605,9 @@ dependencies = [ [[package]] name = "crossbeam-channel" -version = "0.5.3" +version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fdbfe11fe19ff083c48923cf179540e8cd0535903dc35e178a1fdeeb59aef51f" +checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53" dependencies = [ "cfg-if", "crossbeam-utils", @@ -893,9 +894,9 @@ dependencies = [ [[package]] name = "futures-rustls" -version = "0.22.0" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d383f0425d991a05e564c2f3ec150bd6dde863179c131dd60d8aa73a05434461" +checksum = "e01fe9932a224b72b45336d96040aa86386d674a31d0af27d800ea7bc8ca97fe" dependencies = [ "futures-io", "rustls", @@ -1185,9 +1186,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.120" +version = "0.2.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad5c14e80759d0939d013e6ca49930e59fc53dd8e5009132f76240c179380c09" +checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f" [[package]] name = "libm" @@ -1414,6 +1415,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0" +dependencies = [ + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -1882,14 +1892,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559" [[package]] -name = "simplelog" -version = "0.11.2" +name = "simple_logger" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1348164456f72ca0116e4538bdaabb0ddb622c7d9f16387c725af3e96d6001c" +checksum = "c75a9723083573ace81ad0cdfc50b858aa3c366c48636edb4109d73122a0c0ea" dependencies = [ - "chrono", + "atty", + "colored", "log", - "termcolor", + "time", + "winapi 0.3.9", ] [[package]] @@ -1984,15 +1996,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "termcolor" -version = "1.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" -dependencies = [ - "winapi-util", -] - [[package]] name = "terminal_size" version = "0.1.17" @@ -2034,14 +2037,22 @@ dependencies = [ [[package]] name = "time" -version = "0.1.43" +version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca8a50ef2360fbd1eeb0ecd46795a87a19024eb4b53c5dc916ca1fd95fe62438" +checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" dependencies = [ + "itoa 1.0.1", "libc", - "winapi 0.3.9", + "num_threads", + "time-macros", ] +[[package]] +name = "time-macros" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25eb0ca3468fc0acc11828786797f6ef9aa1555e4a211a60d64cc8e4d1be47d6" + [[package]] name = "tinytemplate" version = "1.2.1" diff --git a/README.md b/README.md index 11ee3d8..c17cee7 100644 --- a/README.md +++ b/README.md @@ -25,14 +25,6 @@ of sub-implementations for different protocols: - Install Rust with [rustup](https://rustup.rs/) (stable is recommended) - Install cmake with your package manager (e.g., `apt-get install cmake`) -- Unless you're planning to only run `aquatic_udp`, make sure locked memory - limits are sufficient. You can do this by adding the following lines to - `/etc/security/limits.conf`, and then logging out and back in: - -``` -* hard memlock 512 -* soft memlock 512 -``` - Clone this git repository and enter it @@ -53,7 +45,16 @@ cargo build --release -p aquatic_ws ### Running -Begin by generating configuration files. They differ between protocols. +Unless you're planning to only run `aquatic_udp`, make sure locked memory +limits are sufficient. You can do this by adding the following lines to +`/etc/security/limits.conf`, and then logging out and back in: + +``` +* hard memlock 512 +* soft memlock 512 +``` + +Generate configuration files. They come with comments and differ between protocols. ```sh ./target/release/aquatic_udp -p > "aquatic-udp-config.toml" diff --git a/TODO.md b/TODO.md index 884dfec..f9720dc 100644 --- a/TODO.md +++ b/TODO.md @@ -4,28 +4,18 @@ ## Medium priority +* Use thin LTO? +* Add release-debug profile? * newer glommio versions might use SIGUSR1 internally, see glommio fe33e30 * quit whole program if any thread panics -* implement socket_recv_size and ipv6_only in glommio implementations * config: fail on unrecognized keys? * Run cargo-deny in CI -* aquatic_http: - * clean out connections regularly - * handle like in aquatic_ws - * Rc> which get set on successful request parsing and - successful response sending. Clone kept in connection slab which gets cleaned - periodically (= cancel tasks). Means that task handle will need to be stored in slab. - Config vars kill_idle_connections: bool, max_idle_connection_time. Remove keepalive. - * handle panicked/cancelled tasks? - * aquatic_ws - * remove mio implementation when glommio issues fixed - * glommio - * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity - * replacing indexmap_amortized / simd_json with equivalents doesn't help - * SinkExt::send maybe doesn't wake up properly? - * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? + * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity + * replacing indexmap_amortized / simd_json with equivalents doesn't help + * SinkExt::send maybe doesn't wake up properly? + * related to https://github.com/sdroege/async-tungstenite/blob/master/src/compat.rs#L18 ? * extract_response_peers * don't assume requesting peer is in list? diff --git a/aquatic_cli_helpers/Cargo.toml b/aquatic_cli_helpers/Cargo.toml index 853b635..8094be5 100644 --- a/aquatic_cli_helpers/Cargo.toml +++ b/aquatic_cli_helpers/Cargo.toml @@ -11,6 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic" aquatic_toml_config = "0.1.0" anyhow = "1" +log = "0.4" serde = { version = "1", features = ["derive"] } -simplelog = "0.11" +simple_logger = { version = "2", features = ["stderr"] } toml = "0.5" diff --git a/aquatic_cli_helpers/src/lib.rs b/aquatic_cli_helpers/src/lib.rs index a6598c1..39b7585 100644 --- a/aquatic_cli_helpers/src/lib.rs +++ b/aquatic_cli_helpers/src/lib.rs @@ -3,8 +3,9 @@ use std::io::Read; use anyhow::Context; use aquatic_toml_config::TomlConfig; +use log::LevelFilter; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use simplelog::{ColorChoice, ConfigBuilder, LevelFilter, TermLogger, TerminalMode}; +use simple_logger::SimpleLogger; /// Log level. Available values are off, error, warn, info, debug and trace. #[derive(Debug, Clone, Copy, PartialEq, TomlConfig, Serialize, Deserialize)] @@ -186,19 +187,11 @@ fn start_logger(log_level: LogLevel) -> ::anyhow::Result<()> { LogLevel::Trace => LevelFilter::Trace, }; - // Note: logger doesn't seem to pick up thread names. Not a huge loss. - let simplelog_config = ConfigBuilder::new() - .set_time_to_local(true) - .set_location_level(LevelFilter::Off) - .build(); - - TermLogger::init( - level_filter, - simplelog_config, - TerminalMode::Stderr, - ColorChoice::Auto, - ) - .context("Couldn't initialize logger")?; + SimpleLogger::new() + .with_level(level_filter) + .with_utc_timestamps() + .init() + .context("Couldn't initialize logger")?; Ok(()) } diff --git a/aquatic_common/src/lib.rs b/aquatic_common/src/lib.rs index 17c9f24..6995382 100644 --- a/aquatic_common/src/lib.rs +++ b/aquatic_common/src/lib.rs @@ -9,7 +9,8 @@ pub mod access_list; pub mod cpu_pinning; pub mod privileges; -pub type AHashIndexMap = indexmap_amortized::IndexMap; +/// Amortized IndexMap using AHash hasher +pub type AmortizedIndexMap = indexmap_amortized::IndexMap; /// Peer or connection valid until this instant /// @@ -38,7 +39,7 @@ impl ValidUntil { #[inline] pub fn extract_response_peers( rng: &mut impl Rng, - peer_map: &AHashIndexMap, + peer_map: &AmortizedIndexMap, max_num_peers_to_take: usize, sender_peer_map_key: K, peer_conversion_function: F, diff --git a/aquatic_http/Cargo.toml b/aquatic_http/Cargo.toml index 8566590..4053c12 100644 --- a/aquatic_http/Cargo.toml +++ b/aquatic_http/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" license = "Apache-2.0" description = "Blazingly fast, multi-threaded HTTP BitTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["http", "server", "peer-to-peer", "torrent", "bittorrent"] [lib] name = "aquatic_http" @@ -40,6 +41,7 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" smartstring = "1" +socket2 = { version = "0.4", features = ["all"] } [dev-dependencies] quickcheck = "1" diff --git a/aquatic_http/src/common.rs b/aquatic_http/src/common.rs index c925ad5..930f865 100644 --- a/aquatic_http/src/common.rs +++ b/aquatic_http/src/common.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::Instant; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; -use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; +use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; use either::Either; use smartstring::{LazyCompact, SmartString}; @@ -140,7 +140,7 @@ pub struct PeerMapKey { pub ip_or_key: Either>, } -pub type PeerMap = AHashIndexMap, Peer>; +pub type PeerMap = AmortizedIndexMap, Peer>; pub struct TorrentData { pub peers: PeerMap, @@ -159,7 +159,7 @@ impl Default for TorrentData { } } -pub type TorrentMap = AHashIndexMap>; +pub type TorrentMap = AmortizedIndexMap>; #[derive(Default)] pub struct TorrentMaps { diff --git a/aquatic_http/src/config.rs b/aquatic_http/src/config.rs index 095acce..1da0542 100644 --- a/aquatic_http/src/config.rs +++ b/aquatic_http/src/config.rs @@ -27,47 +27,6 @@ pub struct Config { pub cpu_pinning: aquatic_common::cpu_pinning::CpuPinningConfig, } -impl aquatic_cli_helpers::Config for Config { - fn get_log_level(&self) -> Option { - Some(self.log_level) - } -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct NetworkConfig { - /// Bind to this address - pub address: SocketAddr, - /// Only allow access over IPv6 - pub ipv6_only: bool, - /// Path to TLS certificate (DER-encoded X.509) - pub tls_certificate_path: PathBuf, - /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) - pub tls_private_key_path: PathBuf, - /// Keep connections alive - pub keep_alive: bool, -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct ProtocolConfig { - /// Maximum number of torrents to accept in scrape request - pub max_scrape_torrents: usize, - /// Maximum number of requested peers to accept in announce request - pub max_peers: usize, - /// Ask peers to announce this often (seconds) - pub peer_announce_interval: usize, -} - -#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] -#[serde(default)] -pub struct CleaningConfig { - /// Clean peers this often (seconds) - pub torrent_cleaning_interval: u64, - /// Remove peers that have not announced for this long (seconds) - pub max_peer_age: u64, -} - impl Default for Config { fn default() -> Self { Self { @@ -85,33 +44,83 @@ impl Default for Config { } } +impl aquatic_cli_helpers::Config for Config { + fn get_log_level(&self) -> Option { + Some(self.log_level) + } +} + +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct NetworkConfig { + /// Bind to this address + pub address: SocketAddr, + /// Only allow access over IPv6 + pub only_ipv6: bool, + /// Maximum number of pending TCP connections + pub tcp_backlog: i32, + /// Path to TLS certificate (DER-encoded X.509) + pub tls_certificate_path: PathBuf, + /// Path to TLS private key (DER-encoded ASN.1 in PKCS#8 or PKCS#1 format) + pub tls_private_key_path: PathBuf, + /// Keep connections alive after sending a response + pub keep_alive: bool, +} + impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), tls_certificate_path: "".into(), tls_private_key_path: "".into(), - ipv6_only: false, + only_ipv6: false, + tcp_backlog: 1024, keep_alive: true, } } } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct ProtocolConfig { + /// Maximum number of torrents to accept in scrape request + pub max_scrape_torrents: usize, + /// Maximum number of requested peers to accept in announce request + pub max_peers: usize, + /// Ask peers to announce this often (seconds) + pub peer_announce_interval: usize, +} + impl Default for ProtocolConfig { fn default() -> Self { Self { - max_scrape_torrents: 255, // FIXME: what value is reasonable? + max_scrape_torrents: 100, max_peers: 50, peer_announce_interval: 120, } } } +#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)] +#[serde(default)] +pub struct CleaningConfig { + /// Clean peers this often (seconds) + pub torrent_cleaning_interval: u64, + /// Clean connections this often (seconds) + pub connection_cleaning_interval: u64, + /// Remove peers that have not announced for this long (seconds) + pub max_peer_age: u64, + /// Remove connections that haven't seen valid requests for this long (seconds) + pub max_connection_idle: u64, +} + impl Default for CleaningConfig { fn default() -> Self { Self { torrent_cleaning_interval: 30, + connection_cleaning_interval: 60, max_peer_age: 1800, + max_connection_idle: 180, } } } diff --git a/aquatic_http/src/workers/socket.rs b/aquatic_http/src/workers/socket.rs index 493efd9..8c8b512 100644 --- a/aquatic_http/src/workers/socket.rs +++ b/aquatic_http/src/workers/socket.rs @@ -1,9 +1,10 @@ use std::cell::RefCell; use std::collections::BTreeMap; +use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, Instant}; use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache}; use aquatic_common::CanonicalSocketAddr; @@ -20,6 +21,7 @@ use glommio::channels::channel_mesh::{MeshBuilder, Partial, Role, Senders}; use glommio::channels::local_channel::{new_bounded, LocalReceiver, LocalSender}; use glommio::channels::shared_channel::ConnectedReceiver; use glommio::net::{TcpListener, TcpStream}; +use glommio::task::JoinHandle; use glommio::timer::TimerActionRepeat; use glommio::{enclose, prelude::*}; use once_cell::sync::Lazy; @@ -44,7 +46,9 @@ struct PendingScrapeResponse { } struct ConnectionReference { + task_handle: Option>, response_sender: LocalSender, + valid_until: ValidUntil, } pub async fn run_socket_worker( @@ -58,7 +62,7 @@ pub async fn run_socket_worker( let config = Rc::new(config); let access_list = state.access_list; - let listener = TcpListener::bind(config.network.address).expect("bind socket"); + let listener = create_tcp_listener(&config); num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (request_senders, _) = request_mesh_builder.join(Role::Producer).await.unwrap(); @@ -68,18 +72,14 @@ pub async fn run_socket_worker( let response_consumer_id = ConsumerId(response_receivers.consumer_id().unwrap()); let connection_slab = Rc::new(RefCell::new(Slab::new())); - let connections_to_remove = Rc::new(RefCell::new(Vec::new())); // Periodically remove closed connections - TimerActionRepeat::repeat( - enclose!((config, connection_slab, connections_to_remove) move || { - remove_closed_connections( - config.clone(), - connection_slab.clone(), - connections_to_remove.clone(), - ) - }), - ); + TimerActionRepeat::repeat(enclose!((config, connection_slab) move || { + clean_connections( + config.clone(), + connection_slab.clone(), + ) + })); for (_, response_receiver) in response_receivers.streams() { spawn_local(receive_responses( @@ -95,11 +95,14 @@ pub async fn run_socket_worker( match stream { Ok(stream) => { let (response_sender, response_receiver) = new_bounded(config.request_workers); - let key = connection_slab - .borrow_mut() - .insert(ConnectionReference { response_sender }); - spawn_local(enclose!((config, access_list, request_senders, tls_config, connections_to_remove) async move { + let key = connection_slab.borrow_mut().insert(ConnectionReference { + task_handle: None, + response_sender, + valid_until: ValidUntil::new(config.cleaning.max_connection_idle), + }); + + let task_handle = spawn_local(enclose!((config, access_list, request_senders, tls_config, connection_slab) async move { if let Err(err) = Connection::run( config, access_list, @@ -108,14 +111,19 @@ pub async fn run_socket_worker( response_consumer_id, ConnectionId(key), tls_config, + connection_slab.clone(), stream ).await { ::log::debug!("Connection::run() error: {:?}", err); } - connections_to_remove.borrow_mut().push(key); + connection_slab.borrow_mut().try_remove(key); })) .detach(); + + if let Some(reference) = connection_slab.borrow_mut().get_mut(key) { + reference.task_handle = Some(task_handle); + } } Err(err) => { ::log::error!("accept connection: {:?}", err); @@ -124,26 +132,28 @@ pub async fn run_socket_worker( } } -async fn remove_closed_connections( +async fn clean_connections( config: Rc, connection_slab: Rc>>, - connections_to_remove: Rc>>, ) -> Option { - let connections_to_remove = connections_to_remove.replace(Vec::new()); + let now = Instant::now(); - for connection_id in connections_to_remove { - if let Some(_) = connection_slab.borrow_mut().try_remove(connection_id) { - ::log::debug!("removed connection with id {}", connection_id); - } else { - ::log::error!( - "couldn't remove connection with id {}, it is not in connection slab", - connection_id - ); + connection_slab.borrow_mut().retain(|_, reference| { + let keep = reference.valid_until.0 > now; + + if !keep { + if let Some(ref handle) = reference.task_handle { + handle.cancel(); + } } - } + + keep + }); + + connection_slab.borrow_mut().shrink_to_fit(); Some(Duration::from_secs( - config.cleaning.torrent_cleaning_interval, + config.cleaning.connection_cleaning_interval, )) } @@ -169,6 +179,7 @@ struct Connection { request_senders: Rc>, response_receiver: LocalReceiver, response_consumer_id: ConsumerId, + connection_slab: Rc>>, stream: TlsStream, peer_addr: CanonicalSocketAddr, connection_id: ConnectionId, @@ -186,6 +197,7 @@ impl Connection { response_consumer_id: ConsumerId, connection_id: ConnectionId, tls_config: Arc, + connection_slab: Rc>>, stream: TcpStream, ) -> anyhow::Result<()> { let peer_addr = stream @@ -206,6 +218,7 @@ impl Connection { request_senders: request_senders.clone(), response_receiver, response_consumer_id, + connection_slab, stream, peer_addr, connection_id, @@ -288,12 +301,19 @@ impl Connection { } /// Take a request and: + /// - Update connection ValidUntil /// - Return error response if request is not allowed /// - If it is an announce request, send it to request workers an await a /// response /// - If it is a scrape requests, split it up, pass on the parts to /// relevant request workers and await a response async fn handle_request(&mut self, request: Request) -> anyhow::Result { + if let Ok(mut slab) = self.connection_slab.try_borrow_mut() { + if let Some(reference) = slab.get_mut(self.connection_id.0) { + reference.valid_until = ValidUntil::new(self.config.cleaning.max_connection_idle); + } + } + match request { Request::Announce(request) => { let info_hash = request.info_hash; @@ -464,3 +484,30 @@ impl Connection { fn calculate_request_consumer_index(config: &Config, info_hash: InfoHash) -> usize { (info_hash.0[0] as usize) % config.request_workers } + +fn create_tcp_listener(config: &Config) -> TcpListener { + let domain = if config.network.address.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .expect("create socket"); + + if config.network.only_ipv6 { + socket.set_only_v6(true).expect("socket: set only ipv6"); + } + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + socket + .listen(config.network.tcp_backlog) + .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + + unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } +} diff --git a/aquatic_http_load_test/Cargo.toml b/aquatic_http_load_test/Cargo.toml index 34352a6..f5413f8 100644 --- a/aquatic_http_load_test/Cargo.toml +++ b/aquatic_http_load_test/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["http", "benchmark", "peer-to-peer", "torrent", "bittorrent"] [[bin]] name = "aquatic_http_load_test" diff --git a/aquatic_http_protocol/Cargo.toml b/aquatic_http_protocol/Cargo.toml index 9bda1de..230f719 100644 --- a/aquatic_http_protocol/Cargo.toml +++ b/aquatic_http_protocol/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" repository = "https://github.com/greatest-ape/aquatic" description = "HTTP BitTorrent tracker protocol" exclude = ["target"] +keywords = ["http", "protocol", "peer-to-peer", "torrent", "bittorrent"] [lib] name = "aquatic_http_protocol" diff --git a/aquatic_toml_config/Cargo.toml b/aquatic_toml_config/Cargo.toml index 68d17bf..3c9e9fb 100644 --- a/aquatic_toml_config/Cargo.toml +++ b/aquatic_toml_config/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "WebTorrent tracker protocol" +description = "Serialize toml with comments" repository = "https://github.com/greatest-ape/aquatic" exclude = ["target"] +keywords = ["toml"] [lib] name = "aquatic_toml_config" diff --git a/aquatic_toml_config_derive/Cargo.toml b/aquatic_toml_config_derive/Cargo.toml index a3c1b3d..3e55d89 100644 --- a/aquatic_toml_config_derive/Cargo.toml +++ b/aquatic_toml_config_derive/Cargo.toml @@ -4,9 +4,10 @@ version = "0.1.0" authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" -description = "WebTorrent tracker protocol" +description = "Serialize toml with comments" repository = "https://github.com/greatest-ape/aquatic" exclude = ["target"] +keywords = ["toml"] [lib] proc-macro = true diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 7acb60d..c986152 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" license = "Apache-2.0" description = "Blazingly fast, multi-threaded UDP BitTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["udp", "server", "peer-to-peer", "torrent", "bittorrent"] [lib] name = "aquatic_udp" @@ -24,7 +25,6 @@ aquatic_udp_protocol = "0.1.0" anyhow = "1" cfg-if = "1" -chrono = "0.4" crossbeam-channel = "0.5" hex = "0.4" log = "0.4" @@ -36,6 +36,7 @@ serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" socket2 = { version = "0.4", features = ["all"] } +time = { version = "0.3", features = ["formatting"] } tinytemplate = "1" [dev-dependencies] diff --git a/aquatic_udp/src/workers/request.rs b/aquatic_udp/src/workers/request.rs index 4c013eb..447fab4 100644 --- a/aquatic_udp/src/workers/request.rs +++ b/aquatic_udp/src/workers/request.rs @@ -9,7 +9,7 @@ use std::time::Instant; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListArcSwap; -use aquatic_common::AHashIndexMap; +use aquatic_common::AmortizedIndexMap; use aquatic_common::CanonicalSocketAddr; use aquatic_common::ValidUntil; use crossbeam_channel::Receiver; @@ -39,7 +39,7 @@ impl Peer { } } -type PeerMap = AHashIndexMap>; +type PeerMap = AmortizedIndexMap>; struct TorrentData { pub peers: PeerMap, @@ -57,7 +57,7 @@ impl Default for TorrentData { } } -type TorrentMap = AHashIndexMap>; +type TorrentMap = AmortizedIndexMap>; #[derive(Default)] struct TorrentMaps { diff --git a/aquatic_udp/src/workers/socket.rs b/aquatic_udp/src/workers/socket.rs index f06a32c..6bf3487 100644 --- a/aquatic_udp/src/workers/socket.rs +++ b/aquatic_udp/src/workers/socket.rs @@ -16,7 +16,7 @@ use slab::Slab; use aquatic_common::access_list::create_access_list_cache; use aquatic_common::access_list::AccessListCache; use aquatic_common::ValidUntil; -use aquatic_common::{AHashIndexMap, CanonicalSocketAddr}; +use aquatic_common::{AmortizedIndexMap, CanonicalSocketAddr}; use aquatic_udp_protocol::*; use socket2::{Domain, Protocol, Socket, Type}; @@ -24,7 +24,7 @@ use crate::common::*; use crate::config::Config; #[derive(Default)] -pub struct ConnectionMap(AHashIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>); +pub struct ConnectionMap(AmortizedIndexMap<(ConnectionId, CanonicalSocketAddr), ValidUntil>); impl ConnectionMap { pub fn insert( @@ -66,7 +66,7 @@ impl PendingScrapeResponseSlab { request: ScrapeRequest, valid_until: ValidUntil, ) -> impl IntoIterator { - let mut split_requests: AHashIndexMap = + let mut split_requests: AmortizedIndexMap = Default::default(); if request.info_hashes.is_empty() { diff --git a/aquatic_udp/src/workers/statistics.rs b/aquatic_udp/src/workers/statistics.rs index 4477662..20ec0e7 100644 --- a/aquatic_udp/src/workers/statistics.rs +++ b/aquatic_udp/src/workers/statistics.rs @@ -5,9 +5,10 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use anyhow::Context; -use chrono::Utc; use num_format::{Locale, ToFormattedString}; use serde::Serialize; +use time::format_description::well_known::Rfc2822; +use time::OffsetDateTime; use tinytemplate::TinyTemplate; use crate::common::*; @@ -183,7 +184,9 @@ pub fn run_statistics_worker(config: Config, state: State) { ipv6_active: config.network.ipv6_active(), ipv4: statistics_ipv4, ipv6: statistics_ipv6, - last_updated: Utc::now().to_rfc2822(), + last_updated: OffsetDateTime::now_utc() + .format(&Rfc2822) + .unwrap_or("(formatting error)".into()), peer_update_interval: format!("{}", config.cleaning.torrent_cleaning_interval), }; diff --git a/aquatic_udp_load_test/Cargo.toml b/aquatic_udp_load_test/Cargo.toml index 5656069..ae803c2 100644 --- a/aquatic_udp_load_test/Cargo.toml +++ b/aquatic_udp_load_test/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["udp", "benchmark", "peer-to-peer", "torrent", "bittorrent"] [[bin]] name = "aquatic_udp_load_test" diff --git a/aquatic_udp_protocol/Cargo.toml b/aquatic_udp_protocol/Cargo.toml index 9ee5ca5..67dda3b 100644 --- a/aquatic_udp_protocol/Cargo.toml +++ b/aquatic_udp_protocol/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" license = "Apache-2.0" description = "UDP BitTorrent tracker protocol" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["udp", "protocol", "peer-to-peer", "torrent", "bittorrent"] [dependencies] byteorder = "1" diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 4f1222a..01665d6 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -6,6 +6,8 @@ edition = "2021" license = "Apache-2.0" description = "Blazingly fast, multi-threaded WebTorrent tracker" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["webtorrent", "websocket", "peer-to-peer", "torrent", "bittorrent"] + [lib] name = "aquatic_ws" @@ -40,6 +42,7 @@ rustls-pemfile = "0.3" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" +socket2 = { version = "0.4", features = ["all"] } tungstenite = "0.17" [dev-dependencies] diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 10ae6e1..95ab65a 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -59,7 +59,9 @@ pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, /// Only allow access over IPv6 - pub ipv6_only: bool, + pub only_ipv6: bool, + /// Maximum number of pending TCP connections + pub tcp_backlog: i32, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, @@ -74,7 +76,8 @@ impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), - ipv6_only: false, + only_ipv6: false, + tcp_backlog: 1024, tls_certificate_path: "".into(), tls_private_key_path: "".into(), diff --git a/aquatic_ws/src/workers/request.rs b/aquatic_ws/src/workers/request.rs index c9adc36..693ac7d 100644 --- a/aquatic_ws/src/workers/request.rs +++ b/aquatic_ws/src/workers/request.rs @@ -12,7 +12,7 @@ use glommio::timer::TimerActionRepeat; use hashbrown::HashMap; use rand::{rngs::SmallRng, SeedableRng}; -use aquatic_common::{extract_response_peers, AHashIndexMap}; +use aquatic_common::{extract_response_peers, AmortizedIndexMap}; use aquatic_ws_protocol::*; use crate::common::*; @@ -49,7 +49,7 @@ struct Peer { pub valid_until: ValidUntil, } -type PeerMap = AHashIndexMap; +type PeerMap = AmortizedIndexMap; struct TorrentData { pub peers: PeerMap, @@ -68,7 +68,7 @@ impl Default for TorrentData { } } -type TorrentMap = AHashIndexMap; +type TorrentMap = AmortizedIndexMap; #[derive(Default)] struct TorrentMaps { diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 1de5d46..9f0384c 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; +use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -56,7 +57,8 @@ pub async fn run_socket_worker( let config = Rc::new(config); let access_list = state.access_list; - let listener = TcpListener::bind(config.network.address).expect("bind socket"); + let listener = create_tcp_listener(&config); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); @@ -540,3 +542,30 @@ impl ConnectionWriter { fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize { (info_hash.0[0] as usize) % config.request_workers } + +fn create_tcp_listener(config: &Config) -> TcpListener { + let domain = if config.network.address.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .expect("create socket"); + + if config.network.only_ipv6 { + socket.set_only_v6(true).expect("socket: set only ipv6"); + } + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + socket + .listen(config.network.tcp_backlog) + .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + + unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } +} diff --git a/aquatic_ws_load_test/Cargo.toml b/aquatic_ws_load_test/Cargo.toml index b1c92e1..af0528e 100644 --- a/aquatic_ws_load_test/Cargo.toml +++ b/aquatic_ws_load_test/Cargo.toml @@ -5,6 +5,7 @@ authors = ["Joakim Frostegård "] edition = "2021" license = "Apache-2.0" repository = "https://github.com/greatest-ape/aquatic" +keywords = ["webtorrent", "websocket", "benchmark", "torrent", "bittorrent"] [[bin]] name = "aquatic_ws_load_test" diff --git a/aquatic_ws_protocol/Cargo.toml b/aquatic_ws_protocol/Cargo.toml index 18d4162..79be691 100644 --- a/aquatic_ws_protocol/Cargo.toml +++ b/aquatic_ws_protocol/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" description = "WebTorrent tracker protocol" repository = "https://github.com/greatest-ape/aquatic" exclude = ["target"] +keywords = ["webtorrent", "protocol", "peer-to-peer", "torrent", "bittorrent"] [lib] name = "aquatic_ws_protocol"