udp: improve statistics structs

- Now, workers don't need to keep track of which atomic usize
  to update
- Additionally, prometheus now gets separate information per
  socket worker
This commit is contained in:
Joakim Frostegård 2024-02-02 13:37:43 +01:00
parent e2e525b560
commit 405bbaca93
12 changed files with 400 additions and 294 deletions

View file

@ -4,7 +4,6 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::access_list::AccessListCache;
use aquatic_common::ServerStartInstant;
use mio::net::UdpSocket;
use mio::{Events, Interest, Poll, Token};
@ -35,11 +34,11 @@ enum PollMode {
pub struct SocketWorker {
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
access_list_cache: AccessListCache,
validator: ConnectionValidator,
server_start_instant: ServerStartInstant,
pending_scrape_responses: PendingScrapeResponseSlab,
socket: UdpSocket,
opt_resend_buffer: Option<Vec<(CanonicalSocketAddr, Response)>>,
@ -51,10 +50,10 @@ pub struct SocketWorker {
impl SocketWorker {
pub fn run(
shared_state: State,
config: Config,
shared_state: State,
statistics: CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
validator: ConnectionValidator,
server_start_instant: ServerStartInstant,
request_sender: ConnectedRequestSender,
response_receiver: ConnectedResponseReceiver,
priv_dropper: PrivilegeDropper,
@ -66,8 +65,8 @@ impl SocketWorker {
let mut worker = Self {
config,
shared_state,
statistics,
validator,
server_start_instant,
request_sender,
response_receiver,
access_list_cache,
@ -96,7 +95,7 @@ impl SocketWorker {
Duration::from_secs(self.config.cleaning.pending_scrape_cleaning_interval);
let mut pending_scrape_valid_until = ValidUntil::new(
self.server_start_instant,
self.shared_state.server_start_instant,
self.config.cleaning.max_pending_scrape_age,
);
let mut last_pending_scrape_cleaning = Instant::now();
@ -133,7 +132,7 @@ impl SocketWorker {
for (addr, response) in resend_buffer.drain(..) {
Self::send_response(
&self.config,
&self.shared_state,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut None,
@ -159,7 +158,7 @@ impl SocketWorker {
// Run periodic ValidUntil updates and state cleaning
if iter_counter % 256 == 0 {
let seconds_since_start = self.server_start_instant.seconds_elapsed();
let seconds_since_start = self.shared_state.server_start_instant.seconds_elapsed();
pending_scrape_valid_until = ValidUntil::new_with_now(
seconds_since_start,
@ -180,26 +179,49 @@ impl SocketWorker {
}
fn read_and_handle_requests(&mut self, pending_scrape_valid_until: ValidUntil) {
let mut requests_received_ipv4: usize = 0;
let mut requests_received_ipv6: usize = 0;
let mut bytes_received_ipv4: usize = 0;
let mut bytes_received_ipv6 = 0;
let max_scrape_torrents = self.config.protocol.max_scrape_torrents;
loop {
match self.socket.recv_from(&mut self.buffer[..]) {
Ok((bytes_read, src)) => {
if src.port() == 0 {
::log::debug!("Ignored request from {} because source port is zero", src);
let src_port = src.port();
let src = CanonicalSocketAddr::new(src);
// Use canonical address for statistics
let opt_statistics = if self.config.statistics.active() {
if src.is_ipv4() {
let statistics = &self.statistics.ipv4;
statistics
.bytes_received
.fetch_add(bytes_read + EXTRA_PACKET_SIZE_IPV4, Ordering::Relaxed);
Some(statistics)
} else {
let statistics = &self.statistics.ipv6;
statistics
.bytes_received
.fetch_add(bytes_read + EXTRA_PACKET_SIZE_IPV6, Ordering::Relaxed);
Some(statistics)
}
} else {
None
};
if src_port == 0 {
::log::debug!("Ignored request because source port is zero");
continue;
}
let src = CanonicalSocketAddr::new(src);
let request_parsable = match Request::parse_bytes(
&self.buffer[..bytes_read],
self.config.protocol.max_scrape_torrents,
) {
match Request::parse_bytes(&self.buffer[..bytes_read], max_scrape_torrents) {
Ok(request) => {
if let Some(statistics) = opt_statistics {
statistics.requests.fetch_add(1, Ordering::Relaxed);
}
if let Err(HandleRequestError::RequestChannelFull(failed_requests)) =
self.handle_request(pending_scrape_valid_until, request, src)
{
@ -208,52 +230,36 @@ impl SocketWorker {
break;
}
}
Err(RequestParseError::Sendable {
connection_id,
transaction_id,
err,
}) if self.validator.connection_id_valid(src, connection_id) => {
let response = ErrorResponse {
transaction_id,
message: err.into(),
};
true
Self::send_response(
&self.config,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
Response::Error(response),
src,
);
::log::debug!("request parse error (sent error response): {:?}", err);
}
Err(err) => {
::log::debug!("Request::from_bytes error: {:?}", err);
if let RequestParseError::Sendable {
connection_id,
transaction_id,
err,
} = err
{
if self.validator.connection_id_valid(src, connection_id) {
let response = ErrorResponse {
transaction_id,
message: err.into(),
};
Self::send_response(
&self.config,
&self.shared_state,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
Response::Error(response),
src,
);
}
}
false
::log::debug!(
"request parse error (didn't send error response): {:?}",
err
);
}
};
// Update statistics for converted address
if src.is_ipv4() {
if request_parsable {
requests_received_ipv4 += 1;
}
bytes_received_ipv4 += bytes_read + EXTRA_PACKET_SIZE_IPV4;
} else {
if request_parsable {
requests_received_ipv6 += 1;
}
bytes_received_ipv6 += bytes_read + EXTRA_PACKET_SIZE_IPV6;
}
}
Err(err) if err.kind() == ErrorKind::WouldBlock => {
break;
@ -263,25 +269,6 @@ impl SocketWorker {
}
}
}
if self.config.statistics.active() {
self.shared_state
.statistics_ipv4
.requests_received
.fetch_add(requests_received_ipv4, Ordering::Relaxed);
self.shared_state
.statistics_ipv6
.requests_received
.fetch_add(requests_received_ipv6, Ordering::Relaxed);
self.shared_state
.statistics_ipv4
.bytes_received
.fetch_add(bytes_received_ipv4, Ordering::Relaxed);
self.shared_state
.statistics_ipv6
.bytes_received
.fetch_add(bytes_received_ipv6, Ordering::Relaxed);
}
}
fn handle_request(
@ -303,7 +290,7 @@ impl SocketWorker {
Self::send_response(
&self.config,
&self.shared_state,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
@ -339,7 +326,7 @@ impl SocketWorker {
Self::send_response(
&self.config,
&self.shared_state,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
@ -407,7 +394,7 @@ impl SocketWorker {
Self::send_response(
&self.config,
&self.shared_state,
&self.statistics,
&mut self.socket,
&mut self.buffer,
&mut self.opt_resend_buffer,
@ -419,7 +406,7 @@ impl SocketWorker {
fn send_response(
config: &Config,
shared_state: &State,
statistics: &CachePaddedArc<IpVersionStatistics<SocketWorkerStatistics>>,
socket: &mut UdpSocket,
buffer: &mut [u8],
opt_resend_buffer: &mut Option<Vec<(CanonicalSocketAddr, Response)>>,
@ -447,7 +434,7 @@ impl SocketWorker {
match socket.send_to(&buffer.into_inner()[..bytes_written], addr) {
Ok(amt) if config.statistics.active() => {
let stats = if canonical_addr.is_ipv4() {
let stats = &shared_state.statistics_ipv4;
let stats = &statistics.ipv4;
stats
.bytes_sent
@ -455,7 +442,7 @@ impl SocketWorker {
stats
} else {
let stats = &shared_state.statistics_ipv6;
let stats = &statistics.ipv6;
stats
.bytes_sent
@ -466,18 +453,16 @@ impl SocketWorker {
match response {
Response::Connect(_) => {
stats.responses_sent_connect.fetch_add(1, Ordering::Relaxed);
stats.responses_connect.fetch_add(1, Ordering::Relaxed);
}
Response::AnnounceIpv4(_) | Response::AnnounceIpv6(_) => {
stats
.responses_sent_announce
.fetch_add(1, Ordering::Relaxed);
stats.responses_announce.fetch_add(1, Ordering::Relaxed);
}
Response::Scrape(_) => {
stats.responses_sent_scrape.fetch_add(1, Ordering::Relaxed);
stats.responses_scrape.fetch_add(1, Ordering::Relaxed);
}
Response::Error(_) => {
stats.responses_sent_error.fetch_add(1, Ordering::Relaxed);
stats.responses_error.fetch_add(1, Ordering::Relaxed);
}
}
}