udp: add io-uring implementation

This commit is contained in:
Joakim Frostegård 2021-11-14 02:47:15 +01:00
parent efbf51ba19
commit 18635bf26c
9 changed files with 37 additions and 19 deletions

View file

@ -18,7 +18,8 @@ name = "aquatic_udp"
default = ["with-mio"] default = ["with-mio"]
cpu-pinning = ["aquatic_common/cpu-pinning"] cpu-pinning = ["aquatic_common/cpu-pinning"]
with-glommio = ["cpu-pinning", "glommio", "futures-lite"] with-glommio = ["cpu-pinning", "glommio", "futures-lite"]
with-mio = ["crossbeam-channel", "histogram", "mio", "socket2", "io-uring", "libc", "bytemuck"] with-mio = ["crossbeam-channel", "histogram", "mio", "socket2"]
with-io-uring = ["crossbeam-channel", "histogram", "socket2", "io-uring", "libc", "bytemuck"]
[dependencies] [dependencies]
anyhow = "1" anyhow = "1"
@ -35,11 +36,15 @@ serde = { version = "1", features = ["derive"] }
slab = "0.4" slab = "0.4"
signal-hook = { version = "0.3" } signal-hook = { version = "0.3" }
# mio # mio / io-uring
crossbeam-channel = { version = "0.5", optional = true } crossbeam-channel = { version = "0.5", optional = true }
histogram = { version = "0.6", optional = true } histogram = { version = "0.6", optional = true }
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true }
socket2 = { version = "0.4.1", features = ["all"], optional = true } socket2 = { version = "0.4.1", features = ["all"], optional = true }
# mio
mio = { version = "0.7", features = ["udp", "os-poll", "os-util"], optional = true }
# io-uring
io-uring = { version = "0.5", optional = true } io-uring = { version = "0.5", optional = true }
libc = { version = "0.2", optional = true } libc = { version = "0.2", optional = true }
bytemuck = { version = "1", optional = true } bytemuck = { version = "1", optional = true }

View file

@ -5,7 +5,7 @@ pub mod config;
#[cfg(all(feature = "with-glommio", target_os = "linux"))] #[cfg(all(feature = "with-glommio", target_os = "linux"))]
pub mod glommio; pub mod glommio;
#[cfg(feature = "with-mio")] #[cfg(feature = "with-mio")]
pub mod mio; pub mod other;
use config::Config; use config::Config;
@ -16,7 +16,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
if #[cfg(all(feature = "with-glommio", target_os = "linux"))] { if #[cfg(all(feature = "with-glommio", target_os = "linux"))] {
glommio::run(config) glommio::run(config)
} else { } else {
mio::run(config) other::run(config)
} }
} }
} }

View file

@ -9,7 +9,7 @@ use aquatic_udp_protocol::*;
use crate::common::handlers::*; use crate::common::handlers::*;
use crate::config::Config; use crate::config::Config;
use crate::mio::common::*; use crate::other::common::*;
pub fn run_request_worker( pub fn run_request_worker(
state: State, state: State,

View file

@ -16,7 +16,7 @@ use crate::config::Config;
pub mod common; pub mod common;
pub mod handlers; pub mod handlers;
pub mod network; pub mod network_mio;
pub mod network_uring; pub mod network_uring;
pub mod tasks; pub mod tasks;
@ -99,13 +99,25 @@ pub fn run_inner(config: Config, state: State) -> ::anyhow::Result<()> {
WorkerIndex::SocketWorker(i), WorkerIndex::SocketWorker(i),
); );
network_uring::run_socket_worker( cfg_if::cfg_if!(
state, if #[cfg(feature = "with-io-uring")] {
config, network_uring::run_socket_worker(
i, state,
request_sender, config,
response_receiver, request_sender,
num_bound_sockets, response_receiver,
num_bound_sockets,
)
} else if #[cfg(feature = "with-mio")] {
network_mio::run_socket_worker(
state,
config,
i,
request_sender,
response_receiver,
num_bound_sockets,
)
}
) )
}) })
.with_context(|| "spawn socket worker")?; .with_context(|| "spawn socket worker")?;

View file

@ -1,6 +1,6 @@
use std::io::Cursor; use std::io::Cursor;
use std::mem::size_of; use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4};
use std::os::unix::prelude::AsRawFd; use std::os::unix::prelude::AsRawFd;
use std::ptr::null_mut; use std::ptr::null_mut;
use std::sync::{ use std::sync::{
@ -13,7 +13,9 @@ use aquatic_common::access_list::{create_access_list_cache, AccessListCache};
use crossbeam_channel::{Receiver, Sender}; use crossbeam_channel::{Receiver, Sender};
use io_uring::types::{Fixed, Timespec}; use io_uring::types::{Fixed, Timespec};
use io_uring::SubmissionQueue; use io_uring::SubmissionQueue;
use libc::{AF_INET, AF_INET6, c_void, in6_addr, in_addr, iovec, msghdr, sockaddr_in, sockaddr_in6}; use libc::{
c_void, in6_addr, in_addr, iovec, msghdr, sockaddr_in, sockaddr_in6, AF_INET, AF_INET6,
};
use rand::prelude::{Rng, SeedableRng, StdRng}; use rand::prelude::{Rng, SeedableRng, StdRng};
use slab::Slab; use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type}; use socket2::{Domain, Protocol, Socket, Type};
@ -103,7 +105,6 @@ impl Into<u64> for UserData {
pub fn run_socket_worker( pub fn run_socket_worker(
state: State, state: State,
config: Config, config: Config,
token_num: usize,
request_sender: Sender<(ConnectedRequest, SocketAddr)>, request_sender: Sender<(ConnectedRequest, SocketAddr)>,
response_receiver: Receiver<(ConnectedResponse, SocketAddr)>, response_receiver: Receiver<(ConnectedResponse, SocketAddr)>,
num_bound_sockets: Arc<AtomicUsize>, num_bound_sockets: Arc<AtomicUsize>,

View file

@ -15,8 +15,8 @@ use std::time::Duration;
use aquatic_cli_helpers::run_app_with_cli_and_config; use aquatic_cli_helpers::run_app_with_cli_and_config;
use aquatic_udp::common::*; use aquatic_udp::common::*;
use aquatic_udp::config::Config; use aquatic_udp::config::Config;
use aquatic_udp::mio::common::*; use aquatic_udp::other::common::*;
use aquatic_udp::mio::handlers; use aquatic_udp::other::handlers;
use config::BenchConfig; use config::BenchConfig;