From 54149ed3eb8440fa15b6cbafb2f14e3e0e53141a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?=
Date: Thu, 18 Nov 2021 21:56:48 +0100
Subject: [PATCH] aquatic_common: add work-in-progress SO_ATTACH_REUSEPORT_CBPF impl

---
 Cargo.lock                        |   1 +
 aquatic_common/Cargo.toml         |   5 +-
 aquatic_common/src/cpu_pinning.rs | 103 ++++++++++++++++++++++++++++++
 3 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 50b5ece..2e2b2f4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -93,6 +93,7 @@ dependencies = [
  "hashbrown 0.11.2",
  "hex",
  "indexmap-amortized",
+ "libc",
  "log",
  "privdrop",
  "rand",
diff --git a/aquatic_common/Cargo.toml b/aquatic_common/Cargo.toml
index e0a4992..b076f77 100644
--- a/aquatic_common/Cargo.toml
+++ b/aquatic_common/Cargo.toml
@@ -11,7 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic"
 name = "aquatic_common"
 
 [features]
-cpu-pinning = ["affinity"]
+cpu-pinning = ["affinity", "libc"]
 
 [dependencies]
 ahash = "0.7"
@@ -25,4 +25,5 @@ privdrop = "0.5"
 rand = { version = "0.8", features = ["small_rng"] }
 serde = { version = "1", features = ["derive"] }
 
-affinity = { version = "0.1", optional = true }
\ No newline at end of file
+affinity = { version = "0.1", optional = true }
+libc = { version = "0.2", optional = true }
\ No newline at end of file
diff --git a/aquatic_common/src/cpu_pinning.rs b/aquatic_common/src/cpu_pinning.rs
index ddfa833..43f506b 100644
--- a/aquatic_common/src/cpu_pinning.rs
+++ b/aquatic_common/src/cpu_pinning.rs
@@ -104,3 +104,106 @@ pub fn pin_current_if_configured_to(
         }
     }
 }
+
+/// Tell Linux that incoming messages should be handled by the socket worker
+/// with the same index as the CPU core receiving the interrupt.
+///
+/// Requires that sockets are actually bound in order, so waiting has to be done
+/// in socket workers.
+///
+/// It might make sense to first enable RSS or RPS (if hardware doesn't support
+/// RSS) and enable sending interrupts to all CPUs that have socket workers
+/// running on them. Possibly, CPU 0 should be excluded.
+///
+/// Returns the OS error if the `setsockopt` call fails. `_num_sockets` is
+/// currently unused: its only use, the modulo step in the filter, is disabled.
+///
+/// More Information:
+/// - <https://talawah.io/blog/extreme-http-performance-tuning-one-point-two-million/>
+/// - <https://www.kernel.org/doc/Documentation/networking/scaling.txt>
+/// - <https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/performance_tuning_guide/network-rps>
+#[cfg(target_os = "linux")]
+pub fn socket_attach_cbpf<S: ::std::os::unix::prelude::AsRawFd>(
+    socket: &S,
+    _num_sockets: usize,
+) -> ::std::io::Result<()> {
+    use std::mem::size_of;
+    use std::os::raw::c_void;
+
+    use libc::{setsockopt, sock_filter, sock_fprog, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF};
+
+    // Good BPF documentation: https://man.openbsd.org/bpf.4
+    // Constants copied from Linux: include/uapi/linux/bpf_common.h, include/uapi/linux/filter.h
+
+    // Instruction
+    const BPF_LD: u16 = 0x00; // Load into A
+    // const BPF_LDX: u16 = 0x01; // Load into X
+    // const BPF_ALU: u16 = 0x04; // Arithmetic on A
+    const BPF_RET: u16 = 0x06; // Return value
+    // const BPF_MOD: u16 = 0x90; // Run modulo on A
+
+    // Size
+    const BPF_W: u16 = 0x00; // 32-bit width
+
+    // Source
+    // const BPF_IMM: u16 = 0x00; // Use constant (k)
+    const BPF_ABS: u16 = 0x20;
+
+    // Registers
+    // const BPF_K: u16 = 0x00;
+    const BPF_A: u16 = 0x10;
+
+    // k
+    const SKF_AD_OFF: i32 = -0x1000; // Activate extensions
+    const SKF_AD_CPU: i32 = 36; // Extension for getting CPU
+
+    // Return index of socket that should receive packet
+    let mut filter = [
+        // Store index of CPU receiving packet in register A
+        sock_filter {
+            code: BPF_LD | BPF_W | BPF_ABS,
+            jt: 0,
+            jf: 0,
+            k: u32::from_ne_bytes((SKF_AD_OFF + SKF_AD_CPU).to_ne_bytes()),
+        },
+        /* Disabled, because it doesn't make a lot of sense
+        // Run A = A % socket_workers
+        sock_filter {
+            code: BPF_ALU | BPF_MOD,
+            jt: 0,
+            jf: 0,
+            k: num_sockets as u32,
+        },
+        */
+        // Return A
+        sock_filter {
+            code: BPF_RET | BPF_A,
+            jt: 0,
+            jf: 0,
+            k: 0,
+        },
+    ];
+
+    let program = sock_fprog {
+        filter: filter.as_mut_ptr(),
+        len: filter.len() as u16,
+    };
+
+    // SAFETY: `program` and the `filter` array it points to are locals that outlive the
+    // call, and the length argument is the size of the `sock_fprog` being passed.
+    let result = unsafe {
+        setsockopt(
+            socket.as_raw_fd(),
+            SOL_SOCKET,
+            SO_ATTACH_REUSEPORT_CBPF,
+            &program as *const sock_fprog as *const c_void,
+            size_of::<sock_fprog>() as libc::socklen_t,
+        )
+    };
+
+    if result != 0 {
+        Err(::std::io::Error::last_os_error())
+    } else {
+        Ok(())
+    }
+}