From efbf51ba19f679176567dc2f120feca5117ced95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 14 Nov 2021 02:39:51 +0100 Subject: [PATCH] udp: io-uring: add ipv6 support --- aquatic_udp/src/lib/mio/network_uring.rs | 272 +++++++++++++++-------- 1 file changed, 176 insertions(+), 96 deletions(-) diff --git a/aquatic_udp/src/lib/mio/network_uring.rs b/aquatic_udp/src/lib/mio/network_uring.rs index 93b80bb..078461a 100644 --- a/aquatic_udp/src/lib/mio/network_uring.rs +++ b/aquatic_udp/src/lib/mio/network_uring.rs @@ -1,19 +1,19 @@ use std::io::Cursor; -use std::mem::size_of_val; -use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; -use std::os::unix::prelude::{AsRawFd}; -use std::ptr::{null_mut}; +use std::mem::size_of; +use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; +use std::os::unix::prelude::AsRawFd; +use std::ptr::null_mut; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; use std::time::{Duration, Instant}; -use aquatic_common::access_list::{AccessListCache, create_access_list_cache}; +use aquatic_common::access_list::{create_access_list_cache, AccessListCache}; use crossbeam_channel::{Receiver, Sender}; -use io_uring::SubmissionQueue; use io_uring::types::{Fixed, Timespec}; -use libc::{c_void, in_addr, iovec, msghdr, sockaddr_in}; +use io_uring::SubmissionQueue; +use libc::{AF_INET, AF_INET6, c_void, in6_addr, in_addr, iovec, msghdr, sockaddr_in, sockaddr_in6}; use rand::prelude::{Rng, SeedableRng, StdRng}; use slab::Slab; use socket2::{Domain, Protocol, Socket, Type}; @@ -34,24 +34,16 @@ const NUM_BUFFERS: usize = MAX_RECV_EVENTS + MAX_SEND_EVENTS; #[derive(Clone, Copy, Debug, PartialEq)] enum UserData { - RecvMsg { - slab_key: usize, - }, - SendMsg { - slab_key: usize, - }, + RecvMsg { slab_key: usize }, + SendMsg { slab_key: usize }, Timeout, } impl UserData { fn get_buffer_index(&self) -> usize { match self { - Self::RecvMsg { slab_key } => { - *slab_key - } - Self::SendMsg { slab_key } => { - slab_key + MAX_RECV_EVENTS - } + Self::RecvMsg { slab_key } => *slab_key, + Self::SendMsg { slab_key } => slab_key + MAX_RECV_EVENTS, Self::Timeout => { unreachable!() } @@ -131,44 +123,62 @@ pub fn run_socket_worker( let mut iter_counter = 0usize; let mut last_cleaning = Instant::now(); - let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> = (0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect(); + let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> = + (0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect(); - let mut sockaddrs_ipv4 = [ - sockaddr_in { - sin_addr: in_addr { - s_addr: 0, - }, - sin_port: 0, - sin_family: 0, - sin_zero: Default::default(), - } - ; NUM_BUFFERS - ]; + let mut sockaddrs_ipv4 = [sockaddr_in { + sin_addr: in_addr { s_addr: 0 }, + sin_port: 0, + sin_family: AF_INET as u16, + sin_zero: Default::default(), + }; NUM_BUFFERS]; - let mut iovs: Vec = (0..NUM_BUFFERS).map(|i| { - let iov_base = buffers[i].as_mut_ptr() as *mut c_void; - let iov_len = MAX_PACKET_SIZE; + let mut sockaddrs_ipv6 = [sockaddr_in6 { + sin6_addr: in6_addr { s6_addr: [0; 16] }, + sin6_port: 0, + sin6_family: AF_INET6 as u16, + sin6_flowinfo: 0, + sin6_scope_id: 0, + }; NUM_BUFFERS]; - iovec { - iov_base, - iov_len, - } - }).collect(); + let mut iovs: Vec = (0..NUM_BUFFERS) + .map(|i| { + let iov_base = buffers[i].as_mut_ptr() as *mut c_void; + let iov_len = MAX_PACKET_SIZE; - let mut msghdrs: Vec = (0..NUM_BUFFERS).map(|i| { - let msg_iov: *mut iovec = &mut iovs[i]; - let msg_name: *mut sockaddr_in = &mut sockaddrs_ipv4[i]; + iovec { iov_base, iov_len } + }) + .collect(); - msghdr { - msg_name: msg_name as *mut c_void, - msg_namelen: size_of_val(&sockaddrs_ipv4[i]) as u32, - msg_iov, - msg_iovlen: 1, - msg_control: null_mut(), - msg_controllen: 0, - msg_flags: 0, - } - }).collect(); + let mut msghdrs: Vec = (0..NUM_BUFFERS) + .map(|i| { + let msg_iov: *mut iovec = &mut iovs[i]; + + let mut msghdr = msghdr { + msg_name: null_mut(), + msg_namelen: 0, + msg_iov, + msg_iovlen: 1, + msg_control: null_mut(), + msg_controllen: 0, + msg_flags: 0, + }; + + if config.network.address.is_ipv4() { + let ptr: *mut sockaddr_in = &mut sockaddrs_ipv4[i]; + + msghdr.msg_name = ptr as *mut c_void; + msghdr.msg_namelen = size_of::() as u32; + } else { + let ptr: *mut sockaddr_in6 = &mut sockaddrs_ipv6[i]; + + msghdr.msg_name = ptr as *mut c_void; + msghdr.msg_namelen = size_of::() as u32; + } + + msghdr + }) + .collect(); let timeout = Timespec::new().nsec(500_000_000); let mut timeout_set = false; @@ -195,20 +205,47 @@ pub fn run_socket_worker( let result = entry.result(); if result < 0 { - ::log::info!("recvmsg error {}: {:#}", result, ::std::io::Error::from_raw_os_error(-result)); + ::log::info!( + "recvmsg error {}: {:#}", + result, + ::std::io::Error::from_raw_os_error(-result) + ); } else if result == 0 { ::log::info!("recvmsg error: 0 bytes read"); } else { let buffer_index = user_data.get_buffer_index(); let buffer_len = result as usize; - let src = SocketAddrV4::new( - Ipv4Addr::from(u32::from_be(sockaddrs_ipv4[buffer_index].sin_addr.s_addr)), - u16::from_be(sockaddrs_ipv4[buffer_index].sin_port), - ); + let addr = if config.network.address.is_ipv4() { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::from(u32::from_be( + sockaddrs_ipv4[buffer_index].sin_addr.s_addr, + )), + u16::from_be(sockaddrs_ipv4[buffer_index].sin_port), + )) + } else { + let mut octets = sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr; + let port = u16::from_be(sockaddrs_ipv6[buffer_index].sin6_port); - let res_request = - Request::from_bytes(&buffers[buffer_index][..buffer_len], config.protocol.max_scrape_torrents); + for byte in octets.iter_mut() { + *byte = u8::from_be(*byte); + } + + let ip = match octets { + // Convert IPv4-mapped address (available in std but nightly-only) + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff, a, b, c, d] => { + Ipv4Addr::new(a, b, c, d).into() + } + octets => Ipv6Addr::from(octets).into(), + }; + + SocketAddr::new(ip, port) + }; + + let res_request = Request::from_bytes( + &buffers[buffer_index][..buffer_len], + config.protocol.max_scrape_torrents, + ); handle_request( &config, @@ -219,7 +256,7 @@ pub fn run_socket_worker( &request_sender, &mut local_responses, res_request, - SocketAddr::V4(src), + addr, ); } } @@ -227,7 +264,10 @@ pub fn run_socket_worker( send_entries.remove(slab_key); if entry.result() < 0 { - ::log::info!("recvmsg error: {:#}", ::std::io::Error::from_raw_os_error(-entry.result())); + ::log::info!( + "sendmsg error: {:#}", + ::std::io::Error::from_raw_os_error(-entry.result()) + ); } } UserData::Timeout => { @@ -240,11 +280,11 @@ pub fn run_socket_worker( let slab_key = recv_entries.insert(()); let user_data = UserData::RecvMsg { slab_key }; - let buffer_index = user_data.get_buffer_index(); + let msghdr_ptr: *mut msghdr = &mut msghdrs[user_data.get_buffer_index()]; - let buf_ptr: *mut msghdr = &mut msghdrs[buffer_index]; - - let entry = io_uring::opcode::RecvMsg::new(fd, buf_ptr).build().user_data(user_data.into()); + let entry = io_uring::opcode::RecvMsg::new(fd, msghdr_ptr) + .build() + .user_data(user_data.into()); unsafe { sq.push(&entry).unwrap(); @@ -257,7 +297,9 @@ pub fn run_socket_worker( let timespec_ptr: *const Timespec = &timeout; - let entry = io_uring::opcode::Timeout::new(timespec_ptr).build().user_data(user_data.into()); + let entry = io_uring::opcode::Timeout::new(timespec_ptr) + .build() + .user_data(user_data.into()); unsafe { sq.push(&entry).unwrap(); @@ -268,12 +310,40 @@ pub fn run_socket_worker( let num_local_to_queue = (MAX_SEND_EVENTS - send_entries.len()).min(local_responses.len()); - for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..) { - queue_response(&mut sq, fd, &mut send_entries, &mut buffers, &mut iovs, &mut sockaddrs_ipv4, &mut msghdrs, response, addr); + for (response, addr) in local_responses.drain(local_responses.len() - num_local_to_queue..) + { + queue_response( + &config, + &mut sq, + fd, + &mut send_entries, + &mut buffers, + &mut iovs, + &mut sockaddrs_ipv4, + &mut sockaddrs_ipv6, + &mut msghdrs, + response, + addr, + ); } - for (response, addr) in response_receiver.try_iter().take(MAX_SEND_EVENTS - send_entries.len()) { - queue_response(&mut sq, fd, &mut send_entries, &mut buffers, &mut iovs, &mut sockaddrs_ipv4, &mut msghdrs, response.into(), addr); + for (response, addr) in response_receiver + .try_iter() + .take(MAX_SEND_EVENTS - send_entries.len()) + { + queue_response( + &config, + &mut sq, + fd, + &mut send_entries, + &mut buffers, + &mut iovs, + &mut sockaddrs_ipv4, + &mut sockaddrs_ipv6, + &mut msghdrs, + response.into(), + addr, + ); } if iter_counter % 32 == 0 { @@ -306,15 +376,17 @@ pub fn run_socket_worker( } fn queue_response( + config: &Config, sq: &mut SubmissionQueue, fd: Fixed, send_events: &mut Slab<()>, buffers: &mut [[u8; MAX_PACKET_SIZE]], iovs: &mut [iovec], - sockaddrs: &mut [sockaddr_in], + sockaddrs_ipv4: &mut [sockaddr_in], + sockaddrs_ipv6: &mut [sockaddr_in6], msghdrs: &mut [msghdr], response: Response, - src: SocketAddr, + addr: SocketAddr, ) { let slab_key = send_events.insert(()); let user_data = UserData::SendMsg { slab_key }; @@ -323,27 +395,43 @@ fn queue_response( let mut cursor = Cursor::new(&mut buffers[buffer_index][..]); - match response.write(&mut cursor, ip_version_from_ip(src.ip())) { + match response.write(&mut cursor, ip_version_from_ip(addr.ip())) { Ok(()) => { iovs[buffer_index].iov_len = cursor.position() as usize; - let src = if let SocketAddr::V4(src) = src { - src - } else { - return; // FIXME - }; + if config.network.address.is_ipv4() { + let addr = if let SocketAddr::V4(addr) = addr { + addr + } else { + unreachable!(); + }; - sockaddrs[buffer_index].sin_addr.s_addr = u32::to_be((*src.ip()).into()); - sockaddrs[buffer_index].sin_port = u16::to_be(src.port()); + sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into()); + sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port()); + } else { + let mut octets = match addr { + SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(), + SocketAddr::V6(addr) => addr.ip().octets(), + }; + + for byte in octets.iter_mut() { + *byte = byte.to_be(); + } + + sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets; + sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port()); + } } Err(err) => { ::log::error!("Response::write error: {:?}", err); } } - let buf_ptr: *mut msghdr = &mut msghdrs[buffer_index]; + let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index]; - let entry = io_uring::opcode::SendMsg::new(fd, buf_ptr).build().user_data(user_data.into()); + let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr) + .build() + .user_data(user_data.into()); unsafe { sq.push(&entry).unwrap(); @@ -395,7 +483,6 @@ fn handle_request( res_request: Result, src: SocketAddr, ) { - let valid_until = ValidUntil::new(config.cleaning.max_connection_age); let access_list_mode = config.access_list.mode; @@ -418,8 +505,8 @@ fn handle_request( .load() .allows(access_list_mode, &request.info_hash.0) { - if let Err(err) = request_sender - .try_send((ConnectedRequest::Announce(request), src)) + if let Err(err) = + request_sender.try_send((ConnectedRequest::Announce(request), src)) { ::log::warn!("request_sender.try_send failed: {:?}", err) } @@ -465,7 +552,6 @@ fn handle_request( } } } - } fn ip_version_from_ip(ip: IpAddr) -> IpVersion { @@ -496,18 +582,12 @@ mod tests { let slab_key = slab_key as usize; if b { - UserData::RecvMsg { - slab_key - } + UserData::RecvMsg { slab_key } } else { - UserData::SendMsg { - slab_key - } + UserData::SendMsg { slab_key } } } - _ => { - UserData::Timeout - } + _ => UserData::Timeout, } } } @@ -519,4 +599,4 @@ mod tests { a == b } -} \ No newline at end of file +}