From 339feb3d0a44ee958e1284275ea222ad968b6f01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 13:14:35 +0100 Subject: [PATCH 1/8] udp: uring: rewrite SendBuffers to use UnsafeCell --- .../src/workers/socket/uring/send_buffers.rs | 228 +++++++++--------- 1 file changed, 118 insertions(+), 110 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/send_buffers.rs b/aquatic_udp/src/workers/socket/uring/send_buffers.rs index 0eae144..cd711f0 100644 --- a/aquatic_udp/src/workers/socket/uring/send_buffers.rs +++ b/aquatic_udp/src/workers/socket/uring/send_buffers.rs @@ -1,4 +1,4 @@ -use std::{io::Cursor, net::IpAddr, ops::IndexMut, ptr::null_mut}; +use std::{cell::UnsafeCell, io::Cursor, net::SocketAddr, ops::IndexMut, ptr::null_mut}; use aquatic_common::CanonicalSocketAddr; use aquatic_udp_protocol::Response; @@ -32,114 +32,110 @@ impl ResponseType { } } -pub struct SendBuffers { - likely_next_free_index: usize, - network_address: IpAddr, - names_v4: Vec, - names_v6: Vec, - buffers: Vec<[u8; BUF_LEN]>, - iovecs: Vec, - msghdrs: Vec, - free: Vec, +struct SendBuffer { + name_v4: UnsafeCell, + name_v6: UnsafeCell, + bytes: UnsafeCell<[u8; BUF_LEN]>, + iovec: UnsafeCell, + msghdr: UnsafeCell, + free: bool, // Only used for statistics - receiver_is_ipv4: Vec, + receiver_is_ipv4: bool, // Only used for statistics - response_types: Vec, + response_type: ResponseType, } -impl SendBuffers { - pub fn new(config: &Config, capacity: usize) -> Self { - let mut buffers = ::std::iter::repeat([0u8; BUF_LEN]) - .take(capacity) - .collect::>(); - - let mut iovecs = buffers - .iter_mut() - .map(|buffer| libc::iovec { - iov_base: buffer.as_mut_ptr() as *mut libc::c_void, - iov_len: buffer.len(), - }) - .collect::>(); - - let (names_v4, names_v6, msghdrs) = if config.network.address.is_ipv4() { - let mut names_v4 = ::std::iter::repeat(libc::sockaddr_in { +impl SendBuffer { + fn new() -> Self { + Self { + name_v4: UnsafeCell::new(libc::sockaddr_in { sin_family: libc::AF_INET as u16, sin_port: 0, sin_addr: libc::in_addr { s_addr: 0 }, sin_zero: [0; 8], - }) - .take(capacity) - .collect::>(); - - let msghdrs = names_v4 - .iter_mut() - .zip(iovecs.iter_mut()) - .map(|(msg_name, msg_iov)| libc::msghdr { - msg_name: msg_name as *mut _ as *mut libc::c_void, - msg_namelen: core::mem::size_of::() as u32, - msg_iov: msg_iov as *mut _, - msg_iovlen: 1, - msg_control: null_mut(), - msg_controllen: 0, - msg_flags: 0, - }) - .collect::>(); - - (names_v4, Vec::new(), msghdrs) - } else { - let mut names_v6 = ::std::iter::repeat(libc::sockaddr_in6 { + }), + name_v6: UnsafeCell::new(libc::sockaddr_in6 { sin6_family: libc::AF_INET6 as u16, sin6_port: 0, sin6_flowinfo: 0, sin6_addr: libc::in6_addr { s6_addr: [0; 16] }, sin6_scope_id: 0, - }) + }), + bytes: UnsafeCell::new([0; BUF_LEN]), + iovec: UnsafeCell::new(libc::iovec { + iov_base: null_mut(), + iov_len: 0, + }), + msghdr: UnsafeCell::new(libc::msghdr { + msg_name: null_mut(), + msg_namelen: 0, + msg_iov: null_mut(), + msg_iovlen: 1, + msg_control: null_mut(), + msg_controllen: 0, + msg_flags: 0, + }), + free: true, + receiver_is_ipv4: true, + response_type: ResponseType::Connect, + } + } +} + +pub struct SendBuffers { + likely_next_free_index: usize, + socket_is_ipv4: bool, + buffers: Box<[SendBuffer]>, +} + +impl SendBuffers { + pub fn new(config: &Config, capacity: usize) -> Self { + let socket_is_ipv4 = config.network.address.is_ipv4(); + + let mut buffers = ::std::iter::repeat_with(|| SendBuffer::new()) .take(capacity) - .collect::>(); + .collect::>() + .into_boxed_slice(); - let msghdrs = names_v6 - .iter_mut() - .zip(iovecs.iter_mut()) - .map(|(msg_name, msg_iov)| libc::msghdr { - msg_name: msg_name as *mut _ as *mut libc::c_void, - msg_namelen: core::mem::size_of::() as u32, - msg_iov: msg_iov as *mut _, - msg_iovlen: 1, - msg_control: null_mut(), - msg_controllen: 0, - msg_flags: 0, - }) - .collect::>(); + for buffer in buffers.iter_mut() { + unsafe { + let iovec = &mut *buffer.iovec.get(); - (Vec::new(), names_v6, msghdrs) - }; + iovec.iov_base = buffer.bytes.get() as *mut libc::c_void; + iovec.iov_len = (&*buffer.bytes.get()).len(); + } + unsafe { + let msghdr = &mut *buffer.msghdr.get(); + + msghdr.msg_iov = buffer.iovec.get(); + + if socket_is_ipv4 { + msghdr.msg_name = buffer.name_v4.get() as *mut libc::c_void; + msghdr.msg_namelen = core::mem::size_of::() as u32; + } else { + msghdr.msg_name = buffer.name_v6.get() as *mut libc::c_void; + msghdr.msg_namelen = core::mem::size_of::() as u32; + } + } + } Self { likely_next_free_index: 0, - network_address: config.network.address.ip(), - names_v4, - names_v6, + socket_is_ipv4, buffers, - iovecs, - msghdrs, - free: ::std::iter::repeat(true).take(capacity).collect(), - receiver_is_ipv4: ::std::iter::repeat(true).take(capacity).collect(), - response_types: ::std::iter::repeat(ResponseType::Connect) - .take(capacity) - .collect(), } } pub fn receiver_is_ipv4(&mut self, index: usize) -> bool { - self.receiver_is_ipv4[index] + self.buffers[index].receiver_is_ipv4 } pub fn response_type(&mut self, index: usize) -> ResponseType { - self.response_types[index] + self.buffers[index].response_type } pub fn mark_index_as_free(&mut self, index: usize) { - self.free[index] = true; + self.buffers[index].free = true; } /// Call after going through completion queue @@ -154,64 +150,76 @@ impl SendBuffers { ) -> Result { let index = self.next_free_index()?; - // Set receiver socket addr - if self.network_address.is_ipv4() { - let msg_name = self.names_v4.index_mut(index); - let addr = addr.get_ipv4().unwrap(); + let buffer = self.buffers.index_mut(index); - msg_name.sin_port = addr.port().to_be(); - msg_name.sin_addr.s_addr = if let IpAddr::V4(addr) = addr.ip() { - u32::from(addr).to_be() + // Set receiver socket addr + if self.socket_is_ipv4 { + buffer.receiver_is_ipv4 = true; + + let addr = if let Some(SocketAddr::V4(addr)) = addr.get_ipv4() { + addr } else { panic!("ipv6 address in ipv4 mode"); }; - self.receiver_is_ipv4[index] = true; - } else { - let msg_name = self.names_v6.index_mut(index); - let addr = addr.get_ipv6_mapped(); + unsafe { + let name = &mut *buffer.name_v4.get(); - msg_name.sin6_port = addr.port().to_be(); - msg_name.sin6_addr.s6_addr = if let IpAddr::V6(addr) = addr.ip() { - addr.octets() + name.sin_port = addr.port().to_be(); + name.sin_addr.s_addr = u32::from(*addr.ip()).to_be(); + } + } else { + buffer.receiver_is_ipv4 = addr.is_ipv4(); + + let addr = if let SocketAddr::V6(addr) = addr.get_ipv6_mapped() { + addr } else { panic!("ipv4 address when ipv6 or ipv6-mapped address expected"); }; - self.receiver_is_ipv4[index] = addr.is_ipv4(); + unsafe { + let name = &mut *buffer.name_v6.get(); + + name.sin6_port = addr.port().to_be(); + name.sin6_addr.s6_addr = addr.ip().octets(); + } } - let mut cursor = Cursor::new(self.buffers.index_mut(index).as_mut_slice()); + unsafe { + let bytes = (&mut *buffer.bytes.get()).as_mut_slice(); - match response.write(&mut cursor) { - Ok(()) => { - self.iovecs[index].iov_len = cursor.position() as usize; - self.response_types[index] = ResponseType::from_response(response); - self.free[index] = false; + let mut cursor = Cursor::new(bytes); - self.likely_next_free_index = index + 1; + match response.write(&mut cursor) { + Ok(()) => { + (&mut *buffer.iovec.get()).iov_len = cursor.position() as usize; - let sqe = SendMsg::new(SOCKET_IDENTIFIER, self.msghdrs.index_mut(index)) - .build() - .user_data(index as u64); + buffer.response_type = ResponseType::from_response(response); + buffer.free = false; - Ok(sqe) + self.likely_next_free_index = index + 1; + + let sqe = SendMsg::new(SOCKET_IDENTIFIER, buffer.msghdr.get()) + .build() + .user_data(index as u64); + + Ok(sqe) + } + Err(err) => Err(Error::SerializationFailed(err)), } - Err(err) => Err(Error::SerializationFailed(err)), } } fn next_free_index(&self) -> Result { - if self.likely_next_free_index >= self.free.len() { + if self.likely_next_free_index >= self.buffers.len() { return Err(Error::NoBuffers); } - for (i, free) in self.free[self.likely_next_free_index..] + for (i, buffer) in self.buffers[self.likely_next_free_index..] .iter() - .copied() .enumerate() { - if free { + if buffer.free { return Ok(self.likely_next_free_index + i); } } From fa93f38d82ddc28e55588e039c65922f2fbcc7e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 13:33:14 +0100 Subject: [PATCH 2/8] udp: uring: improve SendBuffers code --- .../src/workers/socket/uring/send_buffers.rs | 159 ++++++++++-------- 1 file changed, 90 insertions(+), 69 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/send_buffers.rs b/aquatic_udp/src/workers/socket/uring/send_buffers.rs index cd711f0..6a8d7dd 100644 --- a/aquatic_udp/src/workers/socket/uring/send_buffers.rs +++ b/aquatic_udp/src/workers/socket/uring/send_buffers.rs @@ -46,7 +46,7 @@ struct SendBuffer { } impl SendBuffer { - fn new() -> Self { + fn new_with_null_pointers() -> Self { Self { name_v4: UnsafeCell::new(libc::sockaddr_in { sin_family: libc::AF_INET as u16, @@ -80,6 +80,86 @@ impl SendBuffer { response_type: ResponseType::Connect, } } + + /// # Safety + /// + /// - SendBuffer must be stored at a fixed location in memory + unsafe fn setup_pointers(&mut self, socket_is_ipv4: bool) { + let iovec = &mut *self.iovec.get(); + + iovec.iov_base = self.bytes.get() as *mut libc::c_void; + iovec.iov_len = (&*self.bytes.get()).len(); + + let msghdr = &mut *self.msghdr.get(); + + msghdr.msg_iov = self.iovec.get(); + + if socket_is_ipv4 { + msghdr.msg_name = self.name_v4.get() as *mut libc::c_void; + msghdr.msg_namelen = core::mem::size_of::() as u32; + } else { + msghdr.msg_name = self.name_v6.get() as *mut libc::c_void; + msghdr.msg_namelen = core::mem::size_of::() as u32; + } + } + + /// # Safety + /// + /// - SendBuffer must be stored at a fixed location in memory + /// - SendBuffer.setup_pointers must have been called previously + unsafe fn prepare_entry( + &mut self, + response: &Response, + addr: CanonicalSocketAddr, + socket_is_ipv4: bool, + ) -> Result { + // Set receiver socket addr + if socket_is_ipv4 { + self.receiver_is_ipv4 = true; + + let addr = if let Some(SocketAddr::V4(addr)) = addr.get_ipv4() { + addr + } else { + panic!("ipv6 address in ipv4 mode"); + }; + + let name = &mut *self.name_v4.get(); + + name.sin_port = addr.port().to_be(); + name.sin_addr.s_addr = u32::from(*addr.ip()).to_be(); + } else { + self.receiver_is_ipv4 = addr.is_ipv4(); + + let addr = if let SocketAddr::V6(addr) = addr.get_ipv6_mapped() { + addr + } else { + panic!("ipv4 address when ipv6 or ipv6-mapped address expected"); + }; + + let name = &mut *self.name_v6.get(); + + name.sin6_port = addr.port().to_be(); + name.sin6_addr.s6_addr = addr.ip().octets(); + } + + let bytes = (&mut *self.bytes.get()).as_mut_slice(); + + let mut cursor = Cursor::new(bytes); + + match response.write(&mut cursor) { + Ok(()) => { + (&mut *self.iovec.get()).iov_len = cursor.position() as usize; + + self.response_type = ResponseType::from_response(response); + self.free = false; + + let sqe = SendMsg::new(SOCKET_IDENTIFIER, self.msghdr.get()).build(); + + Ok(sqe) + } + Err(err) => Err(Error::SerializationFailed(err)), + } + } } pub struct SendBuffers { @@ -92,30 +172,15 @@ impl SendBuffers { pub fn new(config: &Config, capacity: usize) -> Self { let socket_is_ipv4 = config.network.address.is_ipv4(); - let mut buffers = ::std::iter::repeat_with(|| SendBuffer::new()) + let mut buffers = ::std::iter::repeat_with(|| SendBuffer::new_with_null_pointers()) .take(capacity) .collect::>() .into_boxed_slice(); for buffer in buffers.iter_mut() { + // Safety: OK because buffers are stored in fixed memory location unsafe { - let iovec = &mut *buffer.iovec.get(); - - iovec.iov_base = buffer.bytes.get() as *mut libc::c_void; - iovec.iov_len = (&*buffer.bytes.get()).len(); - } - unsafe { - let msghdr = &mut *buffer.msghdr.get(); - - msghdr.msg_iov = buffer.iovec.get(); - - if socket_is_ipv4 { - msghdr.msg_name = buffer.name_v4.get() as *mut libc::c_void; - msghdr.msg_namelen = core::mem::size_of::() as u32; - } else { - msghdr.msg_name = buffer.name_v6.get() as *mut libc::c_void; - msghdr.msg_namelen = core::mem::size_of::() as u32; - } + buffer.setup_pointers(socket_is_ipv4); } } @@ -152,60 +217,16 @@ impl SendBuffers { let buffer = self.buffers.index_mut(index); - // Set receiver socket addr - if self.socket_is_ipv4 { - buffer.receiver_is_ipv4 = true; - - let addr = if let Some(SocketAddr::V4(addr)) = addr.get_ipv4() { - addr - } else { - panic!("ipv6 address in ipv4 mode"); - }; - - unsafe { - let name = &mut *buffer.name_v4.get(); - - name.sin_port = addr.port().to_be(); - name.sin_addr.s_addr = u32::from(*addr.ip()).to_be(); - } - } else { - buffer.receiver_is_ipv4 = addr.is_ipv4(); - - let addr = if let SocketAddr::V6(addr) = addr.get_ipv6_mapped() { - addr - } else { - panic!("ipv4 address when ipv6 or ipv6-mapped address expected"); - }; - - unsafe { - let name = &mut *buffer.name_v6.get(); - - name.sin6_port = addr.port().to_be(); - name.sin6_addr.s6_addr = addr.ip().octets(); - } - } - + // Safety: OK because buffers are stored in fixed memory location + // and buffer pointers were set up in SendBuffers::new() unsafe { - let bytes = (&mut *buffer.bytes.get()).as_mut_slice(); - - let mut cursor = Cursor::new(bytes); - - match response.write(&mut cursor) { - Ok(()) => { - (&mut *buffer.iovec.get()).iov_len = cursor.position() as usize; - - buffer.response_type = ResponseType::from_response(response); - buffer.free = false; - + match buffer.prepare_entry(response, addr, self.socket_is_ipv4) { + Ok(entry) => { self.likely_next_free_index = index + 1; - let sqe = SendMsg::new(SOCKET_IDENTIFIER, buffer.msghdr.get()) - .build() - .user_data(index as u64); - - Ok(sqe) + Ok(entry.user_data(index as u64)) } - Err(err) => Err(Error::SerializationFailed(err)), + Err(err) => Err(err), } } } From aa2c36a373c0b8072eb7d617aa0a4964633bb0ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 13:43:11 +0100 Subject: [PATCH 3/8] udp: uring: rewrite RecvHelper to use UnsafeCell --- .../src/workers/socket/uring/recv_helper.rs | 43 ++++++++++--------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/recv_helper.rs b/aquatic_udp/src/workers/socket/uring/recv_helper.rs index 920cb2f..96fd551 100644 --- a/aquatic_udp/src/workers/socket/uring/recv_helper.rs +++ b/aquatic_udp/src/workers/socket/uring/recv_helper.rs @@ -1,4 +1,5 @@ use std::{ + cell::UnsafeCell, net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ptr::null_mut, }; @@ -15,49 +16,49 @@ pub struct RecvHelper { network_address: IpAddr, max_scrape_torrents: u8, #[allow(dead_code)] - name_v4: Box, - msghdr_v4: Box, + name_v4: Box>, + msghdr_v4: Box>, #[allow(dead_code)] - name_v6: Box, - msghdr_v6: Box, + name_v6: Box>, + msghdr_v6: Box>, } impl RecvHelper { pub fn new(config: &Config) -> Self { - let mut name_v4 = Box::new(libc::sockaddr_in { + let name_v4 = Box::new(UnsafeCell::new(libc::sockaddr_in { sin_family: 0, sin_port: 0, sin_addr: libc::in_addr { s_addr: 0 }, sin_zero: [0; 8], - }); + })); - let msghdr_v4 = Box::new(libc::msghdr { - msg_name: &mut name_v4 as *mut _ as *mut libc::c_void, + let msghdr_v4 = Box::new(UnsafeCell::new(libc::msghdr { + msg_name: name_v4.get() as *mut libc::c_void, msg_namelen: core::mem::size_of::() as u32, msg_iov: null_mut(), msg_iovlen: 0, msg_control: null_mut(), msg_controllen: 0, msg_flags: 0, - }); + })); - let mut name_v6 = Box::new(libc::sockaddr_in6 { + let name_v6 = Box::new(UnsafeCell::new(libc::sockaddr_in6 { sin6_family: 0, sin6_port: 0, sin6_flowinfo: 0, sin6_addr: libc::in6_addr { s6_addr: [0; 16] }, sin6_scope_id: 0, - }); + })); - let msghdr_v6 = Box::new(libc::msghdr { - msg_name: &mut name_v6 as *mut _ as *mut libc::c_void, + let msghdr_v6 = Box::new(UnsafeCell::new(libc::msghdr { + msg_name: name_v6.get() as *mut libc::c_void, msg_namelen: core::mem::size_of::() as u32, msg_iov: null_mut(), msg_iovlen: 0, msg_control: null_mut(), msg_controllen: 0, msg_flags: 0, - }); + })); Self { network_address: config.network.address.ip(), @@ -71,9 +72,9 @@ impl RecvHelper { pub fn create_entry(&self, buf_group: u16) -> io_uring::squeue::Entry { let msghdr: *const libc::msghdr = if self.network_address.is_ipv4() { - &*self.msghdr_v4 + self.msghdr_v4.get() } else { - &*self.msghdr_v6 + self.msghdr_v6.get() }; RecvMsgMulti::new(SOCKET_IDENTIFIER, msghdr, buf_group) @@ -85,10 +86,12 @@ impl RecvHelper { &self, buffer: &[u8], ) -> (Result, CanonicalSocketAddr) { - let msghdr = if self.network_address.is_ipv4() { - &self.msghdr_v4 - } else { - &self.msghdr_v6 + let msghdr = unsafe { + if self.network_address.is_ipv4() { + &*(self.msghdr_v4.get() as *const _) + } else { + &*(self.msghdr_v6.get() as *const _) + } }; let msg = RecvMsgOut::parse(buffer, msghdr).unwrap(); From 9bb69627c88676a6dc3af2effb3abf6d7a12b1a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 13:54:22 +0100 Subject: [PATCH 4/8] udp: uring: branch less in RecvHelper::parse --- .../src/workers/socket/uring/recv_helper.rs | 40 +++++++++++-------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/recv_helper.rs b/aquatic_udp/src/workers/socket/uring/recv_helper.rs index 96fd551..808c2f9 100644 --- a/aquatic_udp/src/workers/socket/uring/recv_helper.rs +++ b/aquatic_udp/src/workers/socket/uring/recv_helper.rs @@ -1,6 +1,6 @@ use std::{ cell::UnsafeCell, - net::{IpAddr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + net::{Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ptr::null_mut, }; @@ -13,7 +13,7 @@ use crate::config::Config; use super::{SOCKET_IDENTIFIER, USER_DATA_RECV}; pub struct RecvHelper { - network_address: IpAddr, + socket_is_ipv4: bool, max_scrape_torrents: u8, #[allow(dead_code)] name_v4: Box>, @@ -61,7 +61,7 @@ impl RecvHelper { })); Self { - network_address: config.network.address.ip(), + socket_is_ipv4: config.network.address.is_ipv4(), max_scrape_torrents: config.protocol.max_scrape_torrents, name_v4, msghdr_v4, @@ -71,7 +71,7 @@ impl RecvHelper { } pub fn create_entry(&self, buf_group: u16) -> io_uring::squeue::Entry { - let msghdr: *const libc::msghdr = if self.network_address.is_ipv4() { + let msghdr: *const libc::msghdr = if self.socket_is_ipv4 { self.msghdr_v4.get() } else { self.msghdr_v6.get() @@ -86,25 +86,31 @@ impl RecvHelper { &self, buffer: &[u8], ) -> (Result, CanonicalSocketAddr) { - let msghdr = unsafe { - if self.network_address.is_ipv4() { - &*(self.msghdr_v4.get() as *const _) - } else { - &*(self.msghdr_v6.get() as *const _) - } - }; + let (msg, addr) = if self.socket_is_ipv4 { + let msg = unsafe { + let msghdr = &*(self.msghdr_v4.get() as *const _); - let msg = RecvMsgOut::parse(buffer, msghdr).unwrap(); + RecvMsgOut::parse(buffer, msghdr).unwrap() + }; - let addr = unsafe { - if self.network_address.is_ipv4() { + let addr = unsafe { let name_data = *(msg.name_data().as_ptr() as *const libc::sockaddr_in); SocketAddr::V4(SocketAddrV4::new( u32::from_be(name_data.sin_addr.s_addr).into(), u16::from_be(name_data.sin_port), )) - } else { + }; + + (msg, addr) + } else { + let msg = unsafe { + let msghdr = &*(self.msghdr_v6.get() as *const _); + + RecvMsgOut::parse(buffer, msghdr).unwrap() + }; + + let addr = unsafe { let name_data = *(msg.name_data().as_ptr() as *const libc::sockaddr_in6); SocketAddr::V6(SocketAddrV6::new( @@ -113,7 +119,9 @@ impl RecvHelper { u32::from_be(name_data.sin6_flowinfo), u32::from_be(name_data.sin6_scope_id), )) - } + }; + + (msg, addr) }; ( From d862da2aef35f00af32eb5149bd7ff6845f09506 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 14:14:15 +0100 Subject: [PATCH 5/8] udp: uring: add recv_helper::Error type --- aquatic_udp/src/workers/socket/uring/mod.rs | 81 ++++++++++--------- .../src/workers/socket/uring/recv_helper.rs | 33 +++++--- 2 files changed, 68 insertions(+), 46 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index b68ddc6..16df167 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -361,51 +361,60 @@ impl SocketWorker { let buffer = buffer.as_slice(); - let (res_request, addr) = self.recv_helper.parse(buffer); + let addr = match self.recv_helper.parse(buffer) { + Ok((request, addr)) => { + self.handle_request(pending_scrape_valid_until, request, addr); - match res_request { - Ok(request) => self.handle_request(pending_scrape_valid_until, request, addr), - Err(RequestParseError::Sendable { - connection_id, - transaction_id, - err, - }) => { - ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); - - if self.validator.connection_id_valid(addr, connection_id) { - let response = ErrorResponse { + addr + } + Err(self::recv_helper::Error::RequestParseError(err, addr)) => { + match err { + RequestParseError::Sendable { + connection_id, transaction_id, - message: err.right_or("Parse error").into(), - }; + err, + } => { + ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); - self.local_responses.push_back((response.into(), addr)); + if self.validator.connection_id_valid(addr, connection_id) { + let response = ErrorResponse { + transaction_id, + message: err.right_or("Parse error").into(), + }; + + self.local_responses.push_back((response.into(), addr)); + } + } + RequestParseError::Unsendable { err } => { + ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); + } } + + addr } - Err(RequestParseError::Unsendable { err }) => { - ::log::debug!("Couldn't parse request from {:?}: {}", addr, err); + Err(self::recv_helper::Error::InvalidSocketAddress) => { + ::log::debug!("Ignored request claiming to be from port 0"); + + return; } - } + Err(self::recv_helper::Error::RecvMsgParseError) => { + ::log::error!("RecvMsgOut::parse failed"); + + return; + } + }; if self.config.statistics.active() { - if addr.is_ipv4() { - self.shared_state - .statistics_ipv4 - .bytes_received - .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed); - self.shared_state - .statistics_ipv4 - .requests_received - .fetch_add(1, Ordering::Relaxed); + let (statistics, extra_bytes) = if addr.is_ipv4() { + (&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4) } else { - self.shared_state - .statistics_ipv6 - .bytes_received - .fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed); - self.shared_state - .statistics_ipv6 - .requests_received - .fetch_add(1, Ordering::Relaxed); - } + (&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6) + }; + + statistics + .bytes_received + .fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed); + statistics.requests_received.fetch_add(1, Ordering::Relaxed); } } diff --git a/aquatic_udp/src/workers/socket/uring/recv_helper.rs b/aquatic_udp/src/workers/socket/uring/recv_helper.rs index 808c2f9..f87e208 100644 --- a/aquatic_udp/src/workers/socket/uring/recv_helper.rs +++ b/aquatic_udp/src/workers/socket/uring/recv_helper.rs @@ -12,6 +12,12 @@ use crate::config::Config; use super::{SOCKET_IDENTIFIER, USER_DATA_RECV}; +pub enum Error { + RecvMsgParseError, + RequestParseError(RequestParseError, CanonicalSocketAddr), + InvalidSocketAddress, +} + pub struct RecvHelper { socket_is_ipv4: bool, max_scrape_torrents: u8, @@ -82,15 +88,12 @@ impl RecvHelper { .user_data(USER_DATA_RECV) } - pub fn parse( - &self, - buffer: &[u8], - ) -> (Result, CanonicalSocketAddr) { + pub fn parse(&self, buffer: &[u8]) -> Result<(Request, CanonicalSocketAddr), Error> { let (msg, addr) = if self.socket_is_ipv4 { let msg = unsafe { let msghdr = &*(self.msghdr_v4.get() as *const _); - RecvMsgOut::parse(buffer, msghdr).unwrap() + RecvMsgOut::parse(buffer, msghdr).map_err(|_| Error::RecvMsgParseError)? }; let addr = unsafe { @@ -102,12 +105,16 @@ impl RecvHelper { )) }; + if addr.port() == 0 { + return Err(Error::InvalidSocketAddress); + } + (msg, addr) } else { let msg = unsafe { let msghdr = &*(self.msghdr_v6.get() as *const _); - RecvMsgOut::parse(buffer, msghdr).unwrap() + RecvMsgOut::parse(buffer, msghdr).map_err(|_| Error::RecvMsgParseError)? }; let addr = unsafe { @@ -121,12 +128,18 @@ impl RecvHelper { )) }; + if addr.port() == 0 { + return Err(Error::InvalidSocketAddress); + } + (msg, addr) }; - ( - Request::from_bytes(msg.payload_data(), self.max_scrape_torrents), - CanonicalSocketAddr::new(addr), - ) + let addr = CanonicalSocketAddr::new(addr); + + let request = Request::from_bytes(msg.payload_data(), self.max_scrape_torrents) + .map_err(|err| Error::RequestParseError(err, addr))?; + + Ok((request, addr)) } } From 1be6d4fa61369a6dcae87740c4b3cca385a86143 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 14:23:59 +0100 Subject: [PATCH 6/8] udp: uring: improve log messages --- aquatic_udp/src/workers/socket/uring/mod.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index 16df167..faf34cd 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -196,7 +196,7 @@ impl SocketWorker { break; } Err(send_buffers::Error::SerializationFailed(err)) => { - ::log::error!("write response to buffer: {:#}", err); + ::log::error!("Failed serializing response: {:#}", err); } } } else { @@ -245,7 +245,7 @@ impl SocketWorker { break; } Err(send_buffers::Error::SerializationFailed(err)) => { - ::log::error!("write response to buffer: {:#}", err); + ::log::error!("Failed serializing response: {:#}", err); } } } @@ -291,7 +291,7 @@ impl SocketWorker { if result < 0 { ::log::error!( - "send: {:#}", + "Couldn't send response: {:#}", ::std::io::Error::from_raw_os_error(-result) ); } else if self.config.statistics.active() { @@ -337,8 +337,14 @@ impl SocketWorker { let result = cqe.result(); if result < 0 { - // Will produce ENOBUFS if there were no free buffers - ::log::warn!("recv: {:#}", ::std::io::Error::from_raw_os_error(-result)); + if -result == libc::ENOBUFS { + ::log::warn!("recv failed due to lack of buffers, try increasing ring size"); + } else { + ::log::warn!( + "recv failed: {:#}", + ::std::io::Error::from_raw_os_error(-result) + ); + } return; } @@ -347,12 +353,12 @@ impl SocketWorker { match self.buf_ring.get_buf(result as u32, cqe.flags()) { Ok(Some(buffer)) => buffer, Ok(None) => { - ::log::error!("Couldn't get buffer"); + ::log::error!("Couldn't get recv buffer"); return; } Err(err) => { - ::log::error!("Couldn't get buffer: {:#}", err); + ::log::error!("Couldn't get recv buffer: {:#}", err); return; } From aae19c4cb3895749d6a97f255b2ef477b7e0db43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 14:40:29 +0100 Subject: [PATCH 7/8] udp: uring: combine SendBuffers metadata lookups --- aquatic_udp/src/workers/socket/uring/mod.rs | 27 ++++++++++--------- .../src/workers/socket/uring/send_buffers.rs | 8 +++--- 2 files changed, 17 insertions(+), 18 deletions(-) diff --git a/aquatic_udp/src/workers/socket/uring/mod.rs b/aquatic_udp/src/workers/socket/uring/mod.rs index faf34cd..7adf062 100644 --- a/aquatic_udp/src/workers/socket/uring/mod.rs +++ b/aquatic_udp/src/workers/socket/uring/mod.rs @@ -297,24 +297,25 @@ impl SocketWorker { } else if self.config.statistics.active() { let send_buffer_index = send_buffer_index as usize; - let (statistics, extra_bytes) = - if self.send_buffers.receiver_is_ipv4(send_buffer_index) { - (&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4) - } else { - (&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6) - }; + let (response_type, receiver_is_ipv4) = + self.send_buffers.response_type_and_ipv4(send_buffer_index); + + let (statistics, extra_bytes) = if receiver_is_ipv4 { + (&self.shared_state.statistics_ipv4, EXTRA_PACKET_SIZE_IPV4) + } else { + (&self.shared_state.statistics_ipv6, EXTRA_PACKET_SIZE_IPV6) + }; statistics .bytes_sent .fetch_add(result as usize + extra_bytes, Ordering::Relaxed); - let response_counter = - match self.send_buffers.response_type(send_buffer_index) { - ResponseType::Connect => &statistics.responses_sent_connect, - ResponseType::Announce => &statistics.responses_sent_announce, - ResponseType::Scrape => &statistics.responses_sent_scrape, - ResponseType::Error => &statistics.responses_sent_error, - }; + let response_counter = match response_type { + ResponseType::Connect => &statistics.responses_sent_connect, + ResponseType::Announce => &statistics.responses_sent_announce, + ResponseType::Scrape => &statistics.responses_sent_scrape, + ResponseType::Error => &statistics.responses_sent_error, + }; response_counter.fetch_add(1, Ordering::Relaxed); } diff --git a/aquatic_udp/src/workers/socket/uring/send_buffers.rs b/aquatic_udp/src/workers/socket/uring/send_buffers.rs index 6a8d7dd..38fe6c5 100644 --- a/aquatic_udp/src/workers/socket/uring/send_buffers.rs +++ b/aquatic_udp/src/workers/socket/uring/send_buffers.rs @@ -191,12 +191,10 @@ impl SendBuffers { } } - pub fn receiver_is_ipv4(&mut self, index: usize) -> bool { - self.buffers[index].receiver_is_ipv4 - } + pub fn response_type_and_ipv4(&self, index: usize) -> (ResponseType, bool) { + let buffer = self.buffers.get(index).unwrap(); - pub fn response_type(&mut self, index: usize) -> ResponseType { - self.buffers[index].response_type + (buffer.response_type, buffer.receiver_is_ipv4) } pub fn mark_index_as_free(&mut self, index: usize) { From cff6b0cce1ff6012bc4c8a60c70d9053513b6ed3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Wed, 8 Mar 2023 14:46:34 +0100 Subject: [PATCH 8/8] Update TODO --- TODO.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/TODO.md b/TODO.md index 2e3608b..e75a256 100644 --- a/TODO.md +++ b/TODO.md @@ -3,6 +3,8 @@ ## High priority * udp uring + * should queues be synced? + * miri * uneven performance? * thiserror? * CI