mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
aquatic_common: add work-in-progress SO_ATTACH_REUSEPORT_CBPF impl
This commit is contained in:
parent
b5643aa7ab
commit
54149ed3eb
3 changed files with 107 additions and 2 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
|
@ -93,6 +93,7 @@ dependencies = [
|
||||||
"hashbrown 0.11.2",
|
"hashbrown 0.11.2",
|
||||||
"hex",
|
"hex",
|
||||||
"indexmap-amortized",
|
"indexmap-amortized",
|
||||||
|
"libc",
|
||||||
"log",
|
"log",
|
||||||
"privdrop",
|
"privdrop",
|
||||||
"rand",
|
"rand",
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,7 @@ repository = "https://github.com/greatest-ape/aquatic"
|
||||||
name = "aquatic_common"
|
name = "aquatic_common"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
cpu-pinning = ["affinity"]
|
cpu-pinning = ["affinity", "libc"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
ahash = "0.7"
|
ahash = "0.7"
|
||||||
|
|
@ -25,4 +25,5 @@ privdrop = "0.5"
|
||||||
rand = { version = "0.8", features = ["small_rng"] }
|
rand = { version = "0.8", features = ["small_rng"] }
|
||||||
serde = { version = "1", features = ["derive"] }
|
serde = { version = "1", features = ["derive"] }
|
||||||
|
|
||||||
affinity = { version = "0.1", optional = true }
|
affinity = { version = "0.1", optional = true }
|
||||||
|
libc = { version = "0.2", optional = true }
|
||||||
|
|
@ -104,3 +104,106 @@ pub fn pin_current_if_configured_to(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tell Linux that incoming messages should be handled by the socket worker
|
||||||
|
/// with the same index as the CPU core receiving the interrupt.
|
||||||
|
///
|
||||||
|
/// Requires that sockets are actually bound in order, so waiting has to be done
|
||||||
|
/// in socket workers.
|
||||||
|
///
|
||||||
|
/// It might make sense to first enable RSS or RPS (if hardware doesn't support
|
||||||
|
/// RSS) and enable sending interrupts to all CPUs that have socket workers
|
||||||
|
/// running on them. Possibly, CPU 0 should be excluded.
|
||||||
|
///
|
||||||
|
/// More Information:
|
||||||
|
/// - https://talawah.io/blog/extreme-http-performance-tuning-one-point-two-million/
|
||||||
|
/// - https://www.kernel.org/doc/Documentation/networking/scaling.txt
|
||||||
|
/// - https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/6/html/performance_tuning_guide/network-rps
|
||||||
|
#[cfg(target_os = "linux")]
|
||||||
|
pub fn socket_attach_cbpf<S: ::std::os::unix::prelude::AsRawFd>(
|
||||||
|
socket: &S,
|
||||||
|
_num_sockets: usize,
|
||||||
|
) -> ::std::io::Result<()> {
|
||||||
|
use std::mem::size_of;
|
||||||
|
use std::os::raw::c_void;
|
||||||
|
|
||||||
|
use libc::{setsockopt, sock_filter, sock_fprog, SOL_SOCKET, SO_ATTACH_REUSEPORT_CBPF};
|
||||||
|
|
||||||
|
// Good BPF documentation: https://man.openbsd.org/bpf.4
|
||||||
|
|
||||||
|
// Values of constants were copied from the following Linux source files:
|
||||||
|
// - include/uapi/linux/bpf_common.h
|
||||||
|
// - include/uapi/linux/filter.h
|
||||||
|
|
||||||
|
// Instruction
|
||||||
|
const BPF_LD: u16 = 0x00; // Load into A
|
||||||
|
// const BPF_LDX: u16 = 0x01; // Load into X
|
||||||
|
// const BPF_ALU: u16 = 0x04; // Load into X
|
||||||
|
const BPF_RET: u16 = 0x06; // Return value
|
||||||
|
// const BPF_MOD: u16 = 0x90; // Run modulo on A
|
||||||
|
|
||||||
|
// Size
|
||||||
|
const BPF_W: u16 = 0x00; // 32-bit width
|
||||||
|
|
||||||
|
// Source
|
||||||
|
// const BPF_IMM: u16 = 0x00; // Use constant (k)
|
||||||
|
const BPF_ABS: u16 = 0x20;
|
||||||
|
|
||||||
|
// Registers
|
||||||
|
// const BPF_K: u16 = 0x00;
|
||||||
|
const BPF_A: u16 = 0x10;
|
||||||
|
|
||||||
|
// k
|
||||||
|
const SKF_AD_OFF: i32 = -0x1000; // Activate extensions
|
||||||
|
const SKF_AD_CPU: i32 = 36; // Extension for getting CPU
|
||||||
|
|
||||||
|
// Return index of socket that should receive packet
|
||||||
|
let mut filter = [
|
||||||
|
// Store index of CPU receiving packet in register A
|
||||||
|
sock_filter {
|
||||||
|
code: BPF_LD | BPF_W | BPF_ABS,
|
||||||
|
jt: 0,
|
||||||
|
jf: 0,
|
||||||
|
k: u32::from_ne_bytes((SKF_AD_OFF + SKF_AD_CPU).to_ne_bytes()),
|
||||||
|
},
|
||||||
|
/* Disabled, because it doesn't make a lot of sense
|
||||||
|
// Run A = A % socket_workers
|
||||||
|
sock_filter {
|
||||||
|
code: BPF_ALU | BPF_MOD,
|
||||||
|
jt: 0,
|
||||||
|
jf: 0,
|
||||||
|
k: num_sockets as u32,
|
||||||
|
},
|
||||||
|
*/
|
||||||
|
// Return A
|
||||||
|
sock_filter {
|
||||||
|
code: BPF_RET | BPF_A,
|
||||||
|
jt: 0,
|
||||||
|
jf: 0,
|
||||||
|
k: 0,
|
||||||
|
},
|
||||||
|
];
|
||||||
|
|
||||||
|
let program = sock_fprog {
|
||||||
|
filter: filter.as_mut_ptr(),
|
||||||
|
len: filter.len() as u16,
|
||||||
|
};
|
||||||
|
|
||||||
|
let program_ptr: *const sock_fprog = &program;
|
||||||
|
|
||||||
|
unsafe {
|
||||||
|
let result = setsockopt(
|
||||||
|
socket.as_raw_fd(),
|
||||||
|
SOL_SOCKET,
|
||||||
|
SO_ATTACH_REUSEPORT_CBPF,
|
||||||
|
program_ptr as *const c_void,
|
||||||
|
size_of::<sock_fprog>() as u32,
|
||||||
|
);
|
||||||
|
|
||||||
|
if result != 0 {
|
||||||
|
Err(::std::io::Error::last_os_error())
|
||||||
|
} else {
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue