From 138ae710efa3c87c3047e546f4efcbfde0e36b6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Thu, 18 Nov 2021 22:18:45 +0100 Subject: [PATCH] udp: remove io_uring version, it is slower than mio version --- Cargo.lock | 19 - aquatic_udp/Cargo.toml | 12 +- aquatic_udp/src/lib/config.rs | 2 - aquatic_udp/src/lib/lib.rs | 31 +- .../src/lib/{network_mio.rs => network.rs} | 0 aquatic_udp/src/lib/network_uring.rs | 483 ------------------ scripts/run-aquatic-udp.sh | 14 +- 7 files changed, 10 insertions(+), 551 deletions(-) rename aquatic_udp/src/lib/{network_mio.rs => network.rs} (100%) delete mode 100644 aquatic_udp/src/lib/network_uring.rs diff --git a/Cargo.lock b/Cargo.lock index 9325c50..c2e45b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -180,12 +180,9 @@ dependencies = [ "aquatic_cli_helpers", "aquatic_common", "aquatic_udp_protocol", - "bytemuck", "cfg-if", "crossbeam-channel", "hex", - "io-uring", - "libc", "log", "mimalloc", "mio", @@ -447,12 +444,6 @@ version = "3.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9df67f7bf9ef8498769f994239c45613ef0c5899415fb58e9add412d2c1a538" -[[package]] -name = "bytemuck" -version = "1.7.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72957246c41db82b8ef88a5486143830adeb8227ef9837740bdec67724cf2c5b" - [[package]] name = "byteorder" version = "1.4.3" @@ -1137,16 +1128,6 @@ dependencies = [ "memoffset 0.5.6", ] -[[package]] -name = "io-uring" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d75829ed9377bab6c90039fe47b9d84caceb4b5063266142e21bcce6550cda8" -dependencies = [ - "bitflags", - "libc", -] - [[package]] name = "itertools" version = "0.10.1" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 8e9ecb9..ef91567 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -15,10 +15,7 @@ path = "src/lib/lib.rs" name = "aquatic_udp" [features] -default = ["with-mio"] cpu-pinning = ["aquatic_common/cpu-pinning"] -with-mio = ["mio"] -with-io-uring = ["io-uring", "libc", "bytemuck"] [dependencies] anyhow = "1" @@ -30,6 +27,7 @@ crossbeam-channel = "0.5" hex = "0.4" log = "0.4" mimalloc = { version = "0.1", default-features = false } +mio = { version = "0.7", features = ["udp", "os-poll", "os-util"] } parking_lot = "0.11" rand = { version = "0.8", features = ["small_rng"] } serde = { version = "1", features = ["derive"] } @@ -37,14 +35,6 @@ slab = "0.4" signal-hook = { version = "0.3" } socket2 = { version = "0.4.1", features = ["all"] } -# mio -mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true } - -# io-uring -io-uring = { version = "0.5", optional = true } -libc = { version = "0.2", optional = true } -bytemuck = { version = "1", optional = true } - [dev-dependencies] quickcheck = "1.0" quickcheck_macros = "1.0" diff --git a/aquatic_udp/src/lib/config.rs b/aquatic_udp/src/lib/config.rs index e9b3108..f26ba4e 100644 --- a/aquatic_udp/src/lib/config.rs +++ b/aquatic_udp/src/lib/config.rs @@ -54,7 +54,6 @@ pub struct NetworkConfig { /// $ sudo sysctl -w net.core.rmem_max=104857600 /// $ sudo sysctl -w net.core.rmem_default=104857600 pub socket_recv_buffer_size: usize, - #[cfg(feature = "with-mio")] pub poll_event_capacity: usize, } @@ -120,7 +119,6 @@ impl Default for NetworkConfig { address: SocketAddr::from(([0, 0, 0, 0], 3000)), only_ipv6: false, socket_recv_buffer_size: 4096 * 128, - #[cfg(feature = "with-mio")] poll_event_capacity: 4096, } } diff --git a/aquatic_udp/src/lib/lib.rs b/aquatic_udp/src/lib/lib.rs index 014e0ca..f42c377 100644 --- a/aquatic_udp/src/lib/lib.rs +++ b/aquatic_udp/src/lib/lib.rs @@ -1,10 +1,7 @@ pub mod common; pub mod config; pub mod handlers; -#[cfg(feature = "with-mio")] -pub mod network_mio; -#[cfg(feature = "with-io-uring")] -pub mod network_uring; +pub mod network; pub mod tasks; use config::Config; @@ -96,25 +93,13 @@ pub fn run(config: Config) -> ::anyhow::Result<()> { WorkerIndex::SocketWorker(i), ); - cfg_if::cfg_if!( - if #[cfg(feature = "with-io-uring")] { - network_uring::run_socket_worker( - state, - config, - request_sender, - response_receiver, - num_bound_sockets, - ); - } else { - network_mio::run_socket_worker( - state, - config, - i, - request_sender, - response_receiver, - num_bound_sockets, - ); - } + network::run_socket_worker( + state, + config, + i, + request_sender, + response_receiver, + num_bound_sockets, ); }) .with_context(|| "spawn socket worker")?; diff --git a/aquatic_udp/src/lib/network_mio.rs b/aquatic_udp/src/lib/network.rs similarity index 100% rename from aquatic_udp/src/lib/network_mio.rs rename to aquatic_udp/src/lib/network.rs diff --git a/aquatic_udp/src/lib/network_uring.rs b/aquatic_udp/src/lib/network_uring.rs deleted file mode 100644 index 72c0e28..0000000 --- a/aquatic_udp/src/lib/network_uring.rs +++ /dev/null @@ -1,483 +0,0 @@ -use std::io::Cursor; -use std::mem::size_of; -use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4}; -use std::os::unix::prelude::AsRawFd; -use std::ptr::null_mut; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use std::time::{Duration, Instant}; - -use aquatic_common::access_list::create_access_list_cache; -use aquatic_common::ValidUntil; -use crossbeam_channel::Receiver; -use io_uring::types::{Fixed, Timespec}; -use io_uring::SubmissionQueue; -use libc::{ - c_void, in6_addr, in_addr, iovec, msghdr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6, -}; -use rand::prelude::{SeedableRng, StdRng}; -use slab::Slab; - -use aquatic_udp_protocol::{Request, Response}; - -use crate::common::network::ConnectionMap; -use crate::common::network::*; -use crate::common::*; -use crate::config::Config; - -const RING_SIZE: usize = 128; -const MAX_RECV_EVENTS: usize = 1; -const MAX_SEND_EVENTS: usize = RING_SIZE - MAX_RECV_EVENTS - 1; -const NUM_BUFFERS: usize = MAX_RECV_EVENTS + MAX_SEND_EVENTS; - -#[derive(Clone, Copy, Debug, PartialEq)] -enum UserData { - RecvMsg { slab_key: usize }, - SendMsg { slab_key: usize }, - Timeout, -} - -impl UserData { - fn get_buffer_index(&self) -> usize { - match self { - Self::RecvMsg { slab_key } => *slab_key, - Self::SendMsg { slab_key } => slab_key + MAX_RECV_EVENTS, - Self::Timeout => { - unreachable!() - } - } - } -} - -impl From for UserData { - fn from(mut n: u64) -> UserData { - let bytes = bytemuck::bytes_of_mut(&mut n); - - let t = bytes[7]; - - bytes[7] = 0; - - match t { - 0 => Self::RecvMsg { - slab_key: n as usize, - }, - 1 => Self::SendMsg { - slab_key: n as usize, - }, - 2 => Self::Timeout, - _ => unreachable!(), - } - } -} - -impl Into for UserData { - fn into(self) -> u64 { - match self { - Self::RecvMsg { slab_key } => { - let mut out = slab_key as u64; - - bytemuck::bytes_of_mut(&mut out)[7] = 0; - - out - } - Self::SendMsg { slab_key } => { - let mut out = slab_key as u64; - - bytemuck::bytes_of_mut(&mut out)[7] = 1; - - out - } - Self::Timeout => { - let mut out = 0u64; - - bytemuck::bytes_of_mut(&mut out)[7] = 2; - - out - } - } - } -} - -pub fn run_socket_worker( - state: State, - config: Config, - request_sender: ConnectedRequestSender, - response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, - num_bound_sockets: Arc, -) { - let mut rng = StdRng::from_entropy(); - - let socket = create_socket(&config); - - num_bound_sockets.fetch_add(1, Ordering::SeqCst); - - let mut connections = ConnectionMap::default(); - let mut pending_scrape_responses = PendingScrapeResponseMap::default(); - let mut access_list_cache = create_access_list_cache(&state.access_list); - let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); - - let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> = - (0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect(); - - let mut sockaddrs_ipv4 = [sockaddr_in { - sin_addr: in_addr { s_addr: 0 }, - sin_port: 0, - sin_family: AF_INET as u16, - sin_zero: Default::default(), - }; NUM_BUFFERS]; - - let mut sockaddrs_ipv6 = [sockaddr_in6 { - sin6_addr: in6_addr { s6_addr: [0; 16] }, - sin6_port: 0, - sin6_family: AF_INET6 as u16, - sin6_flowinfo: 0, - sin6_scope_id: 0, - }; NUM_BUFFERS]; - - let mut iovs: Vec = (0..NUM_BUFFERS) - .map(|i| { - let iov_base = buffers[i].as_mut_ptr() as *mut c_void; - let iov_len = MAX_PACKET_SIZE; - - iovec { iov_base, iov_len } - }) - .collect(); - - let mut msghdrs: Vec = (0..NUM_BUFFERS) - .map(|i| { - let msg_iov: *mut iovec = &mut iovs[i]; - - let mut msghdr = msghdr { - msg_name: null_mut(), - msg_namelen: 0, - msg_iov, - msg_iovlen: 1, - msg_control: null_mut(), - msg_controllen: 0, - msg_flags: 0, - }; - - if config.network.address.is_ipv4() { - let ptr: *mut sockaddr_in = &mut sockaddrs_ipv4[i]; - - msghdr.msg_name = ptr as *mut c_void; - msghdr.msg_namelen = size_of::() as u32; - } else { - let ptr: *mut sockaddr_in6 = &mut sockaddrs_ipv6[i]; - - msghdr.msg_name = ptr as *mut c_void; - msghdr.msg_namelen = size_of::() as u32; - } - - msghdr - }) - .collect(); - - let timeout = Timespec::new().nsec(100_000_000); - - let mut force_send_responses = false; - let mut timeout_queued = false; - - let mut recv_entries = Slab::with_capacity(MAX_RECV_EVENTS); - let mut send_entries = Slab::with_capacity(MAX_SEND_EVENTS); - - let mut ring = io_uring::IoUring::new(RING_SIZE as u32).unwrap(); - - let (submitter, mut sq, mut cq) = ring.split(); - - submitter.register_files(&[socket.as_raw_fd()]).unwrap(); - - let fd = Fixed(0); - - let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval); - - let mut iter_counter = 0usize; - let mut last_cleaning = Instant::now(); - - loop { - while let Some(entry) = cq.next() { - let user_data: UserData = entry.user_data().into(); - - match user_data { - UserData::RecvMsg { slab_key } => { - recv_entries.remove(slab_key); - - let result = entry.result(); - - if result < 0 { - ::log::info!( - "recvmsg error {}: {:#}", - result, - ::std::io::Error::from_raw_os_error(-result) - ); - } else if result == 0 { - ::log::info!("recvmsg error: 0 bytes read"); - } else { - let buffer_index = user_data.get_buffer_index(); - let buffer_len = result as usize; - - let src = if config.network.address.is_ipv4() { - SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::from(u32::from_be( - sockaddrs_ipv4[buffer_index].sin_addr.s_addr, - )), - u16::from_be(sockaddrs_ipv4[buffer_index].sin_port), - )) - } else { - let mut octets = sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr; - let port = u16::from_be(sockaddrs_ipv6[buffer_index].sin6_port); - - for byte in octets.iter_mut() { - *byte = u8::from_be(*byte); - } - - let ip = match octets { - // Convert IPv4-mapped address (available in std but nightly-only) - [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { - Ipv4Addr::new(a, b, c, d).into() - } - octets => Ipv6Addr::from(octets).into(), - }; - - SocketAddr::new(ip, port) - }; - - let res_request = Request::from_bytes( - &buffers[buffer_index][..buffer_len], - config.protocol.max_scrape_torrents, - ); - - // FIXME: don't run every iteration - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); - - handle_request( - &config, - &mut connections, - &mut pending_scrape_responses, - &mut access_list_cache, - &mut rng, - &request_sender, - &mut local_responses, - valid_until, - res_request, - src, - ); - } - } - UserData::SendMsg { slab_key } => { - send_entries.remove(slab_key); - - if entry.result() < 0 { - ::log::error!( - "sendmsg error: {:#}", - ::std::io::Error::from_raw_os_error(-entry.result()) - ); - } - } - UserData::Timeout => { - force_send_responses = true; - timeout_queued = false; - } - } - } - - for _ in 0..(MAX_RECV_EVENTS - recv_entries.len()) { - let slab_key = recv_entries.insert(()); - let user_data = UserData::RecvMsg { slab_key }; - - let msghdr_ptr: *mut msghdr = &mut msghdrs[user_data.get_buffer_index()]; - - let entry = io_uring::opcode::RecvMsg::new(fd, msghdr_ptr) - .build() - .user_data(user_data.into()); - - unsafe { - sq.push(&entry).unwrap(); - } - } - - for (response, addr) in response_receiver.try_iter() { - let opt_response = match response { - ConnectedResponse::Scrape(r) => pending_scrape_responses.add_and_get_finished(r), - ConnectedResponse::AnnounceIpv4(r) => Some(Response::AnnounceIpv4(r)), - ConnectedResponse::AnnounceIpv6(r) => Some(Response::AnnounceIpv6(r)), - }; - - if let Some(response) = opt_response { - local_responses.push((response, addr)); - } - } - - let space_in_send_queue = MAX_SEND_EVENTS - send_entries.len(); - - if force_send_responses | (local_responses.len() >= space_in_send_queue) { - let num_to_queue = (space_in_send_queue).min(local_responses.len()); - let drain_from_index = local_responses.len() - num_to_queue; - - for (response, addr) in local_responses.drain(drain_from_index..) { - queue_response( - &config, - &mut sq, - fd, - &mut send_entries, - &mut buffers, - &mut iovs, - &mut sockaddrs_ipv4, - &mut sockaddrs_ipv6, - &mut msghdrs, - response, - addr, - ); - } - - if local_responses.is_empty() { - force_send_responses = false; - } - } - - if !timeout_queued & !force_send_responses { - // Setup timer to occasionally force sending of responses - let user_data = UserData::Timeout; - - let timespec_ptr: *const Timespec = &timeout; - - let entry = io_uring::opcode::Timeout::new(timespec_ptr) - .build() - .user_data(user_data.into()); - - unsafe { - sq.push(&entry).unwrap(); - } - - timeout_queued = true; - } - - if iter_counter % 32 == 0 { - let now = Instant::now(); - - if now > last_cleaning + cleaning_duration { - connections.clean(); - - last_cleaning = now; - } - } - - let wait_for_num = if force_send_responses { - send_entries.len() - } else { - send_entries.len() + recv_entries.len() - }; - - sq.sync(); - - submitter.submit_and_wait(wait_for_num).unwrap(); - - sq.sync(); - cq.sync(); - - iter_counter = iter_counter.wrapping_add(1); - } -} - -fn queue_response( - config: &Config, - sq: &mut SubmissionQueue, - fd: Fixed, - send_entries: &mut Slab<()>, - buffers: &mut [[u8; MAX_PACKET_SIZE]], - iovs: &mut [iovec], - sockaddrs_ipv4: &mut [sockaddr_in], - sockaddrs_ipv6: &mut [sockaddr_in6], - msghdrs: &mut [msghdr], - response: Response, - addr: SocketAddr, -) { - let slab_key = send_entries.insert(()); - let user_data = UserData::SendMsg { slab_key }; - - let buffer_index = user_data.get_buffer_index(); - - let mut cursor = Cursor::new(&mut buffers[buffer_index][..]); - - match response.write(&mut cursor) { - Ok(()) => { - iovs[buffer_index].iov_len = cursor.position() as usize; - - if config.network.address.is_ipv4() { - let addr = if let SocketAddr::V4(addr) = addr { - addr - } else { - unreachable!(); - }; - - sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into()); - sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port()); - } else { - let mut octets = match addr { - SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(), - SocketAddr::V6(addr) => addr.ip().octets(), - }; - - for byte in octets.iter_mut() { - *byte = byte.to_be(); - } - - sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets; - sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port()); - } - } - Err(err) => { - ::log::error!("Response::write error: {:?}", err); - - send_entries.remove(slab_key); - - return; - } - } - - let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index]; - - let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr) - .build() - .user_data(user_data.into()); - - unsafe { - sq.push(&entry).unwrap(); - } -} - -#[cfg(test)] -mod tests { - use quickcheck::Arbitrary; - use quickcheck_macros::quickcheck; - - use super::*; - - impl quickcheck::Arbitrary for UserData { - fn arbitrary(g: &mut quickcheck::Gen) -> Self { - match (bool::arbitrary(g), bool::arbitrary(g)) { - (false, b) => { - let slab_key: u32 = Arbitrary::arbitrary(g); - let slab_key = slab_key as usize; - - if b { - UserData::RecvMsg { slab_key } - } else { - UserData::SendMsg { slab_key } - } - } - _ => UserData::Timeout, - } - } - } - - #[quickcheck] - fn test_user_data_identity(a: UserData) -> bool { - let n: u64 = a.into(); - let b = UserData::from(n); - - a == b - } -} diff --git a/scripts/run-aquatic-udp.sh b/scripts/run-aquatic-udp.sh index fe99a35..256322f 100755 --- a/scripts/run-aquatic-udp.sh +++ b/scripts/run-aquatic-udp.sh @@ -2,16 +2,4 @@ . ./scripts/env-native-cpu-without-avx-512 -USAGE="Usage: $0 [mio|io-uring] [ARGS]" - -if [ "$1" != "mio" ] && [ "$1" != "glommio" ] && [ "$1" != "io-uring" ]; then - echo "$USAGE" -else - if [ "$1" = "mio" ]; then - cargo run --release --bin aquatic_udp -- "${@:2}" - elif [ "$1" = "io-uring" ]; then - cargo run --release --features "with-io-uring" --no-default-features --bin aquatic_udp -- "${@:2}" - else - echo "$USAGE" - fi -fi +cargo run --release --bin aquatic_udp -- $@