From 06a716f78a14016bc2447fcb08804e3b0e2c34b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 20 Mar 2022 18:41:41 +0100 Subject: [PATCH] ws: implement network.only_ipv6 and network.tcp_backlog --- Cargo.lock | 1 + aquatic_ws/Cargo.toml | 1 + aquatic_ws/src/config.rs | 7 +++++-- aquatic_ws/src/workers/socket.rs | 31 ++++++++++++++++++++++++++++++- 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef96f4d..78cf4aa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,7 @@ dependencies = [ "serde", "signal-hook", "slab", + "socket2 0.4.4", "tungstenite", ] diff --git a/aquatic_ws/Cargo.toml b/aquatic_ws/Cargo.toml index 4f1222a..3a4bc77 100644 --- a/aquatic_ws/Cargo.toml +++ b/aquatic_ws/Cargo.toml @@ -40,6 +40,7 @@ rustls-pemfile = "0.3" serde = { version = "1", features = ["derive"] } signal-hook = { version = "0.3" } slab = "0.4" +socket2 = { version = "0.4", features = ["all"] } tungstenite = "0.17" [dev-dependencies] diff --git a/aquatic_ws/src/config.rs b/aquatic_ws/src/config.rs index 10ae6e1..95ab65a 100644 --- a/aquatic_ws/src/config.rs +++ b/aquatic_ws/src/config.rs @@ -59,7 +59,9 @@ pub struct NetworkConfig { /// Bind to this address pub address: SocketAddr, /// Only allow access over IPv6 - pub ipv6_only: bool, + pub only_ipv6: bool, + /// Maximum number of pending TCP connections + pub tcp_backlog: i32, /// Path to TLS certificate (DER-encoded X.509) pub tls_certificate_path: PathBuf, @@ -74,7 +76,8 @@ impl Default for NetworkConfig { fn default() -> Self { Self { address: SocketAddr::from(([0, 0, 0, 0], 3000)), - ipv6_only: false, + only_ipv6: false, + tcp_backlog: 1024, tls_certificate_path: "".into(), tls_private_key_path: "".into(), diff --git a/aquatic_ws/src/workers/socket.rs b/aquatic_ws/src/workers/socket.rs index 1de5d46..9f0384c 100644 --- a/aquatic_ws/src/workers/socket.rs +++ b/aquatic_ws/src/workers/socket.rs @@ -1,6 +1,7 @@ use std::borrow::Cow; use std::cell::RefCell; use std::collections::BTreeMap; +use std::os::unix::prelude::{FromRawFd, IntoRawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -56,7 +57,8 @@ pub async fn run_socket_worker( let config = Rc::new(config); let access_list = state.access_list; - let listener = TcpListener::bind(config.network.address).expect("bind socket"); + let listener = create_tcp_listener(&config); + num_bound_sockets.fetch_add(1, Ordering::SeqCst); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); @@ -540,3 +542,30 @@ impl ConnectionWriter { fn calculate_in_message_consumer_index(config: &Config, info_hash: InfoHash) -> usize { (info_hash.0[0] as usize) % config.request_workers } + +fn create_tcp_listener(config: &Config) -> TcpListener { + let domain = if config.network.address.is_ipv4() { + socket2::Domain::IPV4 + } else { + socket2::Domain::IPV6 + }; + + let socket = socket2::Socket::new(domain, socket2::Type::STREAM, Some(socket2::Protocol::TCP)) + .expect("create socket"); + + if config.network.only_ipv6 { + socket.set_only_v6(true).expect("socket: set only ipv6"); + } + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + socket + .listen(config.network.tcp_backlog) + .unwrap_or_else(|err| panic!("socket: listen {}: {:?}", config.network.address, err)); + + unsafe { TcpListener::from_raw_fd(socket.into_raw_fd()) } +}