mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-03-31 17:55:36 +00:00
udp: improve udp uring code
This commit is contained in:
parent
1248c945a9
commit
358c8951c0
2 changed files with 57 additions and 43 deletions
|
|
@ -85,7 +85,7 @@ pub struct SocketWorker {
|
|||
buf_ring: BufRing,
|
||||
send_buffers: SendBuffers,
|
||||
recv_helper: RecvHelper,
|
||||
local_responses: VecDeque<(Response, CanonicalSocketAddr)>,
|
||||
local_responses: VecDeque<(CanonicalSocketAddr, Response)>,
|
||||
resubmittable_sqe_buf: Vec<io_uring::squeue::Entry>,
|
||||
recv_sqe: io_uring::squeue::Entry,
|
||||
pulse_timeout_sqe: io_uring::squeue::Entry,
|
||||
|
|
@ -192,7 +192,7 @@ impl SocketWorker {
|
|||
|
||||
// Enqueue local responses
|
||||
for _ in 0..sq_space {
|
||||
if let Some((response, addr)) = self.local_responses.pop_front() {
|
||||
if let Some((addr, response)) = self.local_responses.pop_front() {
|
||||
match self.send_buffers.prepare_entry(response, addr) {
|
||||
Ok(entry) => {
|
||||
unsafe { ring.submission().push(&entry).unwrap() };
|
||||
|
|
@ -200,7 +200,7 @@ impl SocketWorker {
|
|||
num_send_added += 1;
|
||||
}
|
||||
Err(send_buffers::Error::NoBuffers(response)) => {
|
||||
self.local_responses.push_front((response, addr));
|
||||
self.local_responses.push_front((addr, response));
|
||||
|
||||
break;
|
||||
}
|
||||
|
|
@ -231,7 +231,9 @@ impl SocketWorker {
|
|||
fn handle_cqe(&mut self, cqe: io_uring::cqueue::Entry) {
|
||||
match cqe.user_data() {
|
||||
USER_DATA_RECV => {
|
||||
self.handle_recv_cqe(&cqe);
|
||||
if let Some((addr, response)) = self.handle_recv_cqe(&cqe) {
|
||||
self.local_responses.push_back((addr, response));
|
||||
}
|
||||
|
||||
if !io_uring::cqueue::more(cqe.flags()) {
|
||||
self.resubmittable_sqe_buf.push(self.recv_sqe.clone());
|
||||
|
|
@ -290,12 +292,15 @@ impl SocketWorker {
|
|||
}
|
||||
}
|
||||
|
||||
fn handle_recv_cqe(&mut self, cqe: &io_uring::cqueue::Entry) {
|
||||
fn handle_recv_cqe(
|
||||
&mut self,
|
||||
cqe: &io_uring::cqueue::Entry,
|
||||
) -> Option<(CanonicalSocketAddr, Response)> {
|
||||
let result = cqe.result();
|
||||
|
||||
if result < 0 {
|
||||
if -result == libc::ENOBUFS {
|
||||
::log::info!("recv failed due to lack of buffers. If increasing ring size doesn't help, get faster hardware");
|
||||
::log::info!("recv failed due to lack of buffers, try increasing ring size");
|
||||
} else {
|
||||
::log::warn!(
|
||||
"recv failed: {:#}",
|
||||
|
|
@ -303,7 +308,7 @@ impl SocketWorker {
|
|||
);
|
||||
}
|
||||
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
|
||||
let buffer = unsafe {
|
||||
|
|
@ -312,23 +317,48 @@ impl SocketWorker {
|
|||
Ok(None) => {
|
||||
::log::error!("Couldn't get recv buffer");
|
||||
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
Err(err) => {
|
||||
::log::error!("Couldn't get recv buffer: {:#}", err);
|
||||
|
||||
return;
|
||||
return None;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let addr = match self.recv_helper.parse(buffer.as_slice()) {
|
||||
match self.recv_helper.parse(buffer.as_slice()) {
|
||||
Ok((request, addr)) => {
|
||||
self.handle_request(request, addr);
|
||||
if self.config.statistics.active() {
|
||||
let (statistics, extra_bytes) = if addr.is_ipv4() {
|
||||
(&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4)
|
||||
} else {
|
||||
(&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6)
|
||||
};
|
||||
|
||||
addr
|
||||
statistics
|
||||
.bytes_received
|
||||
.fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed);
|
||||
statistics.requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
return self.handle_request(request, addr);
|
||||
}
|
||||
Err(self::recv_helper::Error::RequestParseError(err, addr)) => {
|
||||
if self.config.statistics.active() {
|
||||
if addr.is_ipv4() {
|
||||
self.statistics
|
||||
.ipv4
|
||||
.bytes_received
|
||||
.fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
|
||||
} else {
|
||||
self.statistics
|
||||
.ipv6
|
||||
.bytes_received
|
||||
.fetch_add(buffer.len() + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
match err {
|
||||
RequestParseError::Sendable {
|
||||
connection_id,
|
||||
|
|
@ -343,60 +373,43 @@ impl SocketWorker {
|
|||
message: err.into(),
|
||||
};
|
||||
|
||||
self.local_responses.push_back((response.into(), addr));
|
||||
return Some((addr, Response::Error(response)));
|
||||
}
|
||||
}
|
||||
RequestParseError::Unsendable { err } => {
|
||||
::log::debug!("Couldn't parse request from {:?}: {}", addr, err);
|
||||
}
|
||||
}
|
||||
|
||||
addr
|
||||
}
|
||||
Err(self::recv_helper::Error::InvalidSocketAddress) => {
|
||||
::log::debug!("Ignored request claiming to be from port 0");
|
||||
|
||||
return;
|
||||
}
|
||||
Err(self::recv_helper::Error::RecvMsgParseError) => {
|
||||
::log::error!("RecvMsgOut::parse failed");
|
||||
|
||||
return;
|
||||
}
|
||||
Err(self::recv_helper::Error::RecvMsgTruncated) => {
|
||||
::log::warn!("RecvMsgOut::parse failed: sockaddr or payload truncated");
|
||||
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
if self.config.statistics.active() {
|
||||
let (statistics, extra_bytes) = if addr.is_ipv4() {
|
||||
(&self.statistics.ipv4, EXTRA_PACKET_SIZE_IPV4)
|
||||
} else {
|
||||
(&self.statistics.ipv6, EXTRA_PACKET_SIZE_IPV6)
|
||||
};
|
||||
|
||||
statistics
|
||||
.bytes_received
|
||||
.fetch_add(buffer.len() + extra_bytes, Ordering::Relaxed);
|
||||
statistics.requests.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
fn handle_request(&mut self, request: Request, src: CanonicalSocketAddr) {
|
||||
fn handle_request(
|
||||
&mut self,
|
||||
request: Request,
|
||||
src: CanonicalSocketAddr,
|
||||
) -> Option<(CanonicalSocketAddr, Response)> {
|
||||
let access_list_mode = self.config.access_list.mode;
|
||||
|
||||
match request {
|
||||
Request::Connect(request) => {
|
||||
let connection_id = self.validator.create_connection_id(src);
|
||||
|
||||
let response = Response::Connect(ConnectResponse {
|
||||
connection_id,
|
||||
connection_id: self.validator.create_connection_id(src),
|
||||
transaction_id: request.transaction_id,
|
||||
});
|
||||
|
||||
self.local_responses.push_back((response, src));
|
||||
return Some((src, response));
|
||||
}
|
||||
Request::Announce(request) => {
|
||||
if self
|
||||
|
|
@ -417,14 +430,14 @@ impl SocketWorker {
|
|||
self.peer_valid_until,
|
||||
);
|
||||
|
||||
self.local_responses.push_back((response, src));
|
||||
return Some((src, response));
|
||||
} else {
|
||||
let response = Response::Error(ErrorResponse {
|
||||
transaction_id: request.transaction_id,
|
||||
message: "Info hash not allowed".into(),
|
||||
});
|
||||
|
||||
self.local_responses.push_back((response, src))
|
||||
return Some((src, response));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -436,10 +449,12 @@ impl SocketWorker {
|
|||
let response =
|
||||
Response::Scrape(self.shared_state.torrent_maps.scrape(request, src));
|
||||
|
||||
self.local_responses.push_back((response, src));
|
||||
return Some((src, response));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue