diff --git a/aquatic_udp/src/lib/other/network_uring.rs b/aquatic_udp/src/lib/other/network_uring.rs index cae1923..520fff9 100644 --- a/aquatic_udp/src/lib/other/network_uring.rs +++ b/aquatic_udp/src/lib/other/network_uring.rs @@ -119,11 +119,6 @@ pub fn run_socket_worker( let mut access_list_cache = create_access_list_cache(&state.access_list); let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new(); - let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval); - - let mut iter_counter = 0usize; - let mut last_cleaning = Instant::now(); - let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> = (0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect(); @@ -195,6 +190,11 @@ pub fn run_socket_worker( let fd = Fixed(0); + let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval); + + let mut iter_counter = 0usize; + let mut last_cleaning = Instant::now(); + loop { while let Some(entry) = cq.next() { let user_data: UserData = entry.user_data().into(); @@ -265,7 +265,7 @@ pub fn run_socket_worker( send_entries.remove(slab_key); if entry.result() < 0 { - ::log::info!( + ::log::error!( "sendmsg error: {:#}", ::std::io::Error::from_raw_os_error(-entry.result()) ); @@ -376,103 +376,6 @@ pub fn run_socket_worker( } } -fn queue_response( - config: &Config, - sq: &mut SubmissionQueue, - fd: Fixed, - send_events: &mut Slab<()>, - buffers: &mut [[u8; MAX_PACKET_SIZE]], - iovs: &mut [iovec], - sockaddrs_ipv4: &mut [sockaddr_in], - sockaddrs_ipv6: &mut [sockaddr_in6], - msghdrs: &mut [msghdr], - response: Response, - addr: SocketAddr, -) { - let slab_key = send_events.insert(()); - let user_data = UserData::SendMsg { slab_key }; - - let buffer_index = user_data.get_buffer_index(); - - let mut cursor = Cursor::new(&mut buffers[buffer_index][..]); - - match response.write(&mut cursor, ip_version_from_ip(addr.ip())) { - Ok(()) => { - iovs[buffer_index].iov_len = cursor.position() as usize; - - if config.network.address.is_ipv4() { - let addr = if let SocketAddr::V4(addr) = addr { - addr - } else { - unreachable!(); - }; - - sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into()); - sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port()); - } else { - let mut octets = match addr { - SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(), - SocketAddr::V6(addr) => addr.ip().octets(), - }; - - for byte in octets.iter_mut() { - *byte = byte.to_be(); - } - - sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets; - sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port()); - } - } - Err(err) => { - ::log::error!("Response::write error: {:?}", err); - } - } - - let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index]; - - let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr) - .build() - .user_data(user_data.into()); - - unsafe { - sq.push(&entry).unwrap(); - } -} - -fn create_socket(config: &Config) -> ::std::net::UdpSocket { - let socket = if config.network.address.is_ipv4() { - Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) - } else { - Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) - } - .expect("create socket"); - - socket.set_reuse_port(true).expect("socket: set reuse port"); - - socket - .set_nonblocking(true) - .expect("socket: set nonblocking"); - - socket - .bind(&config.network.address.into()) - .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); - - let recv_buffer_size = config.network.socket_recv_buffer_size; - - if recv_buffer_size != 0 { - if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { - ::log::error!( - "socket: failed setting recv buffer to {}: {:?}", - recv_buffer_size, - err - ); - } - } - - socket.into() -} - -#[inline] fn handle_request( config: &Config, state: &State, @@ -555,6 +458,106 @@ fn handle_request( } } +fn queue_response( + config: &Config, + sq: &mut SubmissionQueue, + fd: Fixed, + send_entries: &mut Slab<()>, + buffers: &mut [[u8; MAX_PACKET_SIZE]], + iovs: &mut [iovec], + sockaddrs_ipv4: &mut [sockaddr_in], + sockaddrs_ipv6: &mut [sockaddr_in6], + msghdrs: &mut [msghdr], + response: Response, + addr: SocketAddr, +) { + let slab_key = send_entries.insert(()); + let user_data = UserData::SendMsg { slab_key }; + + let buffer_index = user_data.get_buffer_index(); + + let mut cursor = Cursor::new(&mut buffers[buffer_index][..]); + + match response.write(&mut cursor, ip_version_from_ip(addr.ip())) { + Ok(()) => { + iovs[buffer_index].iov_len = cursor.position() as usize; + + if config.network.address.is_ipv4() { + let addr = if let SocketAddr::V4(addr) = addr { + addr + } else { + unreachable!(); + }; + + sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into()); + sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port()); + } else { + let mut octets = match addr { + SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(), + SocketAddr::V6(addr) => addr.ip().octets(), + }; + + for byte in octets.iter_mut() { + *byte = byte.to_be(); + } + + sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets; + sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port()); + } + } + Err(err) => { + ::log::error!("Response::write error: {:?}", err); + + send_entries.remove(slab_key); + + return; + } + } + + let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index]; + + let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr) + .build() + .user_data(user_data.into()); + + unsafe { + sq.push(&entry).unwrap(); + } +} + +fn create_socket(config: &Config) -> ::std::net::UdpSocket { + let socket = if config.network.address.is_ipv4() { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } + .expect("create socket"); + + socket.set_reuse_port(true).expect("socket: set reuse port"); + + socket + .set_nonblocking(true) + .expect("socket: set nonblocking"); + + socket + .bind(&config.network.address.into()) + .unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err)); + + let recv_buffer_size = config.network.socket_recv_buffer_size; + + if recv_buffer_size != 0 { + if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) { + ::log::error!( + "socket: failed setting recv buffer to {}: {:?}", + recv_buffer_size, + err + ); + } + } + + socket.into() +} + fn ip_version_from_ip(ip: IpAddr) -> IpVersion { match ip { IpAddr::V4(_) => IpVersion::IPv4,