udp: uring: clean up, improve error handling

This commit is contained in:
Joakim Frostegård 2021-11-14 02:57:19 +01:00
parent 5a34bd4b81
commit a665b38536

View file

@ -119,11 +119,6 @@ pub fn run_socket_worker(
let mut access_list_cache = create_access_list_cache(&state.access_list);
let mut local_responses: Vec<(Response, SocketAddr)> = Vec::new();
let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval);
let mut iter_counter = 0usize;
let mut last_cleaning = Instant::now();
let mut buffers: Vec<[u8; MAX_PACKET_SIZE]> =
(0..NUM_BUFFERS).map(|_| [0; MAX_PACKET_SIZE]).collect();
@ -195,6 +190,11 @@ pub fn run_socket_worker(
let fd = Fixed(0);
let cleaning_duration = Duration::from_secs(config.cleaning.connection_cleaning_interval);
let mut iter_counter = 0usize;
let mut last_cleaning = Instant::now();
loop {
while let Some(entry) = cq.next() {
let user_data: UserData = entry.user_data().into();
@ -265,7 +265,7 @@ pub fn run_socket_worker(
send_entries.remove(slab_key);
if entry.result() < 0 {
::log::info!(
::log::error!(
"sendmsg error: {:#}",
::std::io::Error::from_raw_os_error(-entry.result())
);
@ -376,103 +376,6 @@ pub fn run_socket_worker(
}
}
fn queue_response(
config: &Config,
sq: &mut SubmissionQueue,
fd: Fixed,
send_events: &mut Slab<()>,
buffers: &mut [[u8; MAX_PACKET_SIZE]],
iovs: &mut [iovec],
sockaddrs_ipv4: &mut [sockaddr_in],
sockaddrs_ipv6: &mut [sockaddr_in6],
msghdrs: &mut [msghdr],
response: Response,
addr: SocketAddr,
) {
let slab_key = send_events.insert(());
let user_data = UserData::SendMsg { slab_key };
let buffer_index = user_data.get_buffer_index();
let mut cursor = Cursor::new(&mut buffers[buffer_index][..]);
match response.write(&mut cursor, ip_version_from_ip(addr.ip())) {
Ok(()) => {
iovs[buffer_index].iov_len = cursor.position() as usize;
if config.network.address.is_ipv4() {
let addr = if let SocketAddr::V4(addr) = addr {
addr
} else {
unreachable!();
};
sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into());
sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port());
} else {
let mut octets = match addr {
SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(),
SocketAddr::V6(addr) => addr.ip().octets(),
};
for byte in octets.iter_mut() {
*byte = byte.to_be();
}
sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets;
sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port());
}
}
Err(err) => {
::log::error!("Response::write error: {:?}", err);
}
}
let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index];
let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr)
.build()
.user_data(user_data.into());
unsafe {
sq.push(&entry).unwrap();
}
}
fn create_socket(config: &Config) -> ::std::net::UdpSocket {
let socket = if config.network.address.is_ipv4() {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
} else {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
}
.expect("create socket");
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.set_nonblocking(true)
.expect("socket: set nonblocking");
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
let recv_buffer_size = config.network.socket_recv_buffer_size;
if recv_buffer_size != 0 {
if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
::log::error!(
"socket: failed setting recv buffer to {}: {:?}",
recv_buffer_size,
err
);
}
}
socket.into()
}
#[inline]
fn handle_request(
config: &Config,
state: &State,
@ -555,6 +458,106 @@ fn handle_request(
}
}
fn queue_response(
config: &Config,
sq: &mut SubmissionQueue,
fd: Fixed,
send_entries: &mut Slab<()>,
buffers: &mut [[u8; MAX_PACKET_SIZE]],
iovs: &mut [iovec],
sockaddrs_ipv4: &mut [sockaddr_in],
sockaddrs_ipv6: &mut [sockaddr_in6],
msghdrs: &mut [msghdr],
response: Response,
addr: SocketAddr,
) {
let slab_key = send_entries.insert(());
let user_data = UserData::SendMsg { slab_key };
let buffer_index = user_data.get_buffer_index();
let mut cursor = Cursor::new(&mut buffers[buffer_index][..]);
match response.write(&mut cursor, ip_version_from_ip(addr.ip())) {
Ok(()) => {
iovs[buffer_index].iov_len = cursor.position() as usize;
if config.network.address.is_ipv4() {
let addr = if let SocketAddr::V4(addr) = addr {
addr
} else {
unreachable!();
};
sockaddrs_ipv4[buffer_index].sin_addr.s_addr = u32::to_be((*addr.ip()).into());
sockaddrs_ipv4[buffer_index].sin_port = u16::to_be(addr.port());
} else {
let mut octets = match addr {
SocketAddr::V4(addr) => addr.ip().to_ipv6_mapped().octets(),
SocketAddr::V6(addr) => addr.ip().octets(),
};
for byte in octets.iter_mut() {
*byte = byte.to_be();
}
sockaddrs_ipv6[buffer_index].sin6_addr.s6_addr = octets;
sockaddrs_ipv6[buffer_index].sin6_port = u16::to_be(addr.port());
}
}
Err(err) => {
::log::error!("Response::write error: {:?}", err);
send_entries.remove(slab_key);
return;
}
}
let msghdr_ptr: *mut msghdr = &mut msghdrs[buffer_index];
let entry = io_uring::opcode::SendMsg::new(fd, msghdr_ptr)
.build()
.user_data(user_data.into());
unsafe {
sq.push(&entry).unwrap();
}
}
fn create_socket(config: &Config) -> ::std::net::UdpSocket {
let socket = if config.network.address.is_ipv4() {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
} else {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
}
.expect("create socket");
socket.set_reuse_port(true).expect("socket: set reuse port");
socket
.set_nonblocking(true)
.expect("socket: set nonblocking");
socket
.bind(&config.network.address.into())
.unwrap_or_else(|err| panic!("socket: bind to {}: {:?}", config.network.address, err));
let recv_buffer_size = config.network.socket_recv_buffer_size;
if recv_buffer_size != 0 {
if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size) {
::log::error!(
"socket: failed setting recv buffer to {}: {:?}",
recv_buffer_size,
err
);
}
}
socket.into()
}
fn ip_version_from_ip(ip: IpAddr) -> IpVersion {
match ip {
IpAddr::V4(_) => IpVersion::IPv4,