udp: io-uring: add ipv6 support

This commit is contained in:
Joakim Frostegård 2021-11-14 02:39:51 +01:00
parent c5916d9633
commit efbf51ba19

View file

@ -1,19 +1,19 @@
use std::io::Cursor;
use std::mem::size_of_val;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4};
use std::os::unix::prelude::{AsRawFd};
use std::ptr::{null_mut};
use std::mem::size_of;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::os::unix::prelude::AsRawFd;
use std::ptr::null_mut;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
use std::time::{Duration, Instant};
use aquatic_common::access_list::{AccessListCache, create_access_list_cache};
use aquatic_common::access_list::{create_access_list_cache, AccessListCache};
use crossbeam_channel::{Receiver, Sender};
use io_uring::SubmissionQueue;
use io_uring::types::{Fixed, Timespec};
use libc::{c_void, in_addr, iovec, msghdr, sockaddr_in};
use io_uring::SubmissionQueue;
use libc::{AF_INET, AF_INET6, c_void, in6_addr, in_addr, iovec, msghdr, sockaddr_in, sockaddr_in6};
use rand::prelude::{Rng, SeedableRng, StdRng};
use slab::Slab;
use socket2::{Domain, Protocol, Socket, Type};
@ -34,24 +34,16 @@ const NUM_BUFFERS: usize = MAX_RECV_EVENTS + MAX_SEND_EVENTS;
#[derive(Clone, Copy, Debug, PartialEq)]
enum UserData {
RecvMsg {
slab_key: usize,
},
SendMsg {
slab_key: usize,
},
RecvMsg { slab_key: usize },
SendMsg { slab_key: usize },
Timeout,
}
impl UserData {
fn get_buffer_index(&self) -> usize {
match self {
Self::RecvMsg { slab_key } => {
*slab_key
}
Self::SendMsg { slab_key } => {
slab_key + MAX_RECV_EVENTS
}
Self::RecvMsg { slab_key } => *slab_key,
Self::SendMsg { slab_key } => slab_key + MAX_RECV_EVENTS,
Self::Timeout => {
unreachable!()
}
@ -131,44 +123,62 @@ pub fn run_socket_worker(
let mut iter_counter = 0usize;
let mut last_cleaning = Instant::now();
let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> = (0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect();
let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> =
(0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect();
let mut sockaddrs_ipv4 = [
sockaddr_in {
sin_addr: in_addr {
s_addr: 0,
},
sin_port: 0,
sin_family: 0,
sin_zero: Default::default(),
}
; NUM_BUFFERS
];
let mut sockaddrs_ipv4 = [sockaddr_in {
sin_addr: in_addr { s_addr: 0 },
sin_port: 0,
sin_family: AF_INET as u16,
sin_zero: Default::default(),
}; NUM_BUFFERS];
let mut iovs: Vec<iovec> = (0..NUM_BUFFERS).map(|i| {
let iov_base = buffers[i].as_mut_ptr() as *mut c_void;
let iov_len = MAX_PACKET_SIZE;
let mut sockaddrs_ipv6 = [sockaddr_in6 {
sin6_addr: in6_addr { s6_addr: [0; 16] },
sin6_port: 0,
sin6_family: AF_INET6 as u16,
sin6_flowinfo: 0,
sin6_scope_id: 0,
}; NUM_BUFFERS];
iovec {
iov_base,
iov_len,
}
}).collect();
let mut iovs: Vec<iovec> = (0..NUM_BUFFERS)
.map(|i| {
let iov_base = buffers[i].as_mut_ptr() as *mut c_void;
let iov_len = MAX_PACKET_SIZE;
let mut msghdrs: Vec<msghdr> = (0..NUM_BUFFERS).map(|i| {
let msg_iov: *mut iovec = &mut iovs[i];
let msg_name: *mut sockaddr_in = &mut sockaddrs_ipv4[i];
iovec { iov_base, iov_len }
})
.collect();
msghdr {
msg_name: msg_name as *mut c_void,
msg_namelen: size_of_val(&sockaddrs_ipv4[i]) as u32,
msg_iov,
msg_iovlen: 1,
msg_control: null_mut(),
msg_controllen: 0,
msg_flags: 0,
}
}).collect();
let mut msghdrs: Vec<msghdr> = (0..NUM_BUFFERS)
.map(|i| {
let msg_iov: *mut iovec = &mut iovs[i];
let mut msghdr = msghdr {
msg_name: null_mut(),
msg_namelen: 0,
msg_iov,
msg_iovlen: 1,
msg_control: null_mut(),
msg_controllen: 0,
msg_flags: 0,
};
if config.network.address.is_ipv4() {
let ptr: *mut sockaddr_in = &mut sockaddrs_ipv4[i];
msghdr.msg_name = ptr as *mut c_void;
msghdr.msg_namelen = size_of::<sockaddr_in>() as u32;
} else {
let ptr: *mut sockaddr_in6 = &mut sockaddrs_ipv6[i];
msghdr.msg_name = ptr as *mut c_void;
msghdr.msg_namelen = size_of::<sockaddr_in6>() as u32;
}
msghdr
})
.collect();
let timeout = Timespec::new().nsec(500_000_000);
let mut timeout_set = false;
@ -195,20 +205,47 @@ pub fn run_socket_worker(
let result = entry.result();
if result < 0 {
::log::info!("recvmsg error {}: {:#}", result, ::std::io::Error::from_raw_os_error(-result));
::log::info!(
"recvmsg error {}: {:#}",
result,
::std::io::Error::from_raw_os_error(-result)
);
} else if result == 0 {
::log::info!("recvmsg error: 0 bytes read");
} else {
let buffer_index = user_data.get_buffer_index();
let buffer_len = result as usize;
let src = SocketAddrV4::new(
Ipv4Addr::from(u32::from_be(sockaddrs_ipv4[buffer_index].sin_addr.s_addr)),
u16::from_be(sockaddrs_ipv4[buffer_index].sin_port),
);
let addr = if config.network.address.is_ipv4() {
SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::from(u32::from_be(
sockaddrs_ipv4[buffer_index].sin_addr.s_addr,
)),
u16::from_be(sockaddrs_ipv4[buffer_index].sin_port),
))
} else {
let mut octets = sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr;
let port = u16::from_be(sockaddrs_ipv6[buffer_index].sin6_port);
let res_request =
Request::from_bytes(&buffers[buffer_index][..buffer_len], config.protocol.max_scrape_torrents);
for byte in octets.iter_mut() {
*byte = u8::from_be(*byte);
}
let ip = match octets {
// Convert IPv4-mapped address (available in std but nightly-only)
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => {
Ipv4Addr::new(a, b, c, d).into()
}
octets => Ipv6Addr::from(octets).into(),
};
SocketAddr::new(ip, port)
};
let res_request = Request::from_bytes(
&buffers[buffer_index][..buffer_len],
config.protocol.max_scrape_torrents,
);
handle_request(
&config,
@ -219,7 +256,7 @@ pub fn run_socket_worker(
&request_sender,
&mut local_responses,
res_request,
SocketAddr::V4(src),
addr,
);
}
}
@ -227,7 +264,10 @@ pub fn run_socket_worker(
send_entries.remove(slab_key);
if entry.result() < 0 {
::log::info!("recvmsg error: {:#}", ::std::io::Error::from_raw_os_error(-entry.result()));
::log::info!(
"sendmsg error: {:#}",
::std::io::Error::from_raw_os_error(-entry.result())
);
}
}
UserData::Timeout => {
@ -240,11 +280,11 @@ pub fn run_socket_worker(
let slab_key = recv_entries.insert(());
let user_data = UserData::RecvMsg { slab_key };
let buffer_index = user_data.get_buffer_index();
let msghdr_ptr: *mut msghdr = &mut msghdrs[user_data.get_buffer_index()];
let buf_ptr: *mut msghdr = &mut msghdrs[buffer_index];
let entry = io_uring::opcode::RecvMsg::new(fd, buf_ptr).build().user_data(user_data.into());
let entry = io_uring::opcode::RecvMsg::new(fd, msghdr_ptr)
.build()
.user_data(user_data.into());
unsafe {
sq.push(&entry).unwrap();
@ -257,7 +297,9 @@ pub fn run_socket_worker(
let timespec_ptr: *const Timespec = &timeout;
let entry = io_uring::opcode::Timeout::new(timespec_ptr).build().user_data(user_data.into());
let entry = io_uring::opcode::Timeout::new(timespec_ptr)
.build()
.user_data(user_data.into());
unsafe {
sq.push(&entry).unwrap();
@ -268,12 +310,40 @@ pub fn run_socket_worker(
let num_local_to_queue = (MAX_SEND_EVENTS - send_entries.len()).min(local_responses.len());
for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..) {
queue_response(&mut sq, fd, &mut send_entries, &mut buffers, &mut iovs, &mut sockaddrs_ipv4, &mut msghdrs, response, addr);
for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..)
{
queue_response(
&config,
&mut sq,
fd,
&mut send_entries,
&mut buffers,
&mut iovs,
&mut sockaddrs_ipv4,
&mut sockaddrs_ipv6,
&mut msghdrs,
response,
addr,
);
}
for (response, addr) in response_receiver.try_iter().take(MAX_SEND_EVENTS - send_entries.len()) {
queue_response(&mut sq, fd, &mut send_entries, &mut buffers, &mut iovs, &mut sockaddrs_ipv4, &mut msghdrs, response.into(), addr);
for (response, addr) in response_receiver
.try_iter()
.take(MAX_SEND_EVENTS - send_entries.len())
{
queue_response(
&config,
&mut sq,
fd,
&mut send_entries,
&mut buffers,
&mut iovs,
&mut sockaddrs_ipv4,
&mut sockaddrs_ipv6,
&mut msghdrs,
response.into(),
addr,
);
}
if iter_counter % 32 == 0 {
@ -306,15 +376,17 @@ pub fn run_socket_worker(
}
fn queue_response(
config: &Config,
sq: &mut SubmissionQueue,
fd: Fixed,
send_events: &mut Slab<()>,
buffers: &mut [[u8; MAX_PACKET_SIZE]],
iovs: &mut [iovec],
sockaddrs: &mut [sockaddr_in],
sockaddrs_ipv4: &mut [sockaddr_in],
sockaddrs_ipv6: &mut [sockaddr_in6],
msghdrs: &mut [msghdr],
response: Response,
src: SocketAddr,
addr: SocketAddr,
) {
let slab_key = send_events.insert(());
let user_data = UserData::SendMsg { slab_key };
@ -323,27 +395,43 @@ fn queue_response(
let mut cursor = Cursor::new(&mut buffers[buffer_index][..]);
match response.write(&mut cursor, ip_version_from_ip(src.ip())) {
match response.write(&mut cursor, ip_version_from_ip(addr.ip())) {
Ok(()) => {
iovs[buffer_index].iov_len = cursor.position() as usize;
let src = if let SocketAddr::V4(src) = src {
src
} else {
return; // FIXME
};
if config.network.address.is_ipv4() {
let addr = if let SocketAddr::V4(addr) = addr {
addr
} else {
unreachable!();
};
sockaddrs[buffer_index].sin_addr.s_addr = u32::to_be((*src.ip()).into());
sockaddrs[buffer_index].sin_port = u16::to_be(src.port());
sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into());
sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port());
} else {
let mut octets = match addr {
SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(),
SocketAddr::V6(addr) => addr.ip().octets(),
};
for byte in octets.iter_mut() {
*byte = byte.to_be();
}
sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets;
sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port());
}
}
Err(err) => {
::log::error!("Response::write error: {:?}", err);
}
}
let buf_ptr: *mut msghdr = &mut msghdrs[buffer_index];
let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index];
let entry = io_uring::opcode::SendMsg::new(fd, buf_ptr).build().user_data(user_data.into());
let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr)
.build()
.user_data(user_data.into());
unsafe {
sq.push(&entry).unwrap();
@ -395,7 +483,6 @@ fn handle_request(
res_request: Result<Request, RequestParseError>,
src: SocketAddr,
) {
let valid_until = ValidUntil::new(config.cleaning.max_connection_age);
let access_list_mode = config.access_list.mode;
@ -418,8 +505,8 @@ fn handle_request(
.load()
.allows(access_list_mode, &request.info_hash.0)
{
if let Err(err) = request_sender
.try_send((ConnectedRequest::Announce(request), src))
if let Err(err) =
request_sender.try_send((ConnectedRequest::Announce(request), src))
{
::log::warn!("request_sender.try_send failed: {:?}", err)
}
@ -465,7 +552,6 @@ fn handle_request(
}
}
}
}
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
@ -496,18 +582,12 @@ mod tests {
let slab_key = slab_key as usize;
if b {
UserData::RecvMsg {
slab_key
}
UserData::RecvMsg { slab_key }
} else {
UserData::SendMsg {
slab_key
}
UserData::SendMsg { slab_key }
}
}
_ => {
UserData::Timeout
}
_ => UserData::Timeout,
}
}
}
@ -519,4 +599,4 @@ mod tests {
a == b
}
}
}