Merge pull request #80 from greatest-ape/work-2022-07-05

Improve aquatic_ws; add cli flag for printing parsed config; don't allow unrecognized config keys
This commit is contained in:
Joakim Frostegård 2022-07-05 14:18:26 +02:00 committed by GitHub
commit 1c72d7ce01
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 237 additions and 71 deletions

View file

@ -82,15 +82,8 @@ echo "log_level = 'trace'
[network]
address = '127.0.0.1:3002'
# glommio
tls_certificate_path = './cert.crt'
tls_private_key_path = './key.pk8'
# mio
use_tls = true
tls_pkcs12_path = './identity.pfx'
tls_pkcs12_password = 'p'
" > ws.toml
./target/debug/aquatic ws -c ws.toml > "$HOME/wss.log" 2>&1 &

8
Cargo.lock generated
View file

@ -1834,9 +1834,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.12.0"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7709cef83f0c1f58f666e746a08b21e0085f7440fa6a29cc194d68aac97a4225"
checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1"
[[package]]
name = "oorandom"
@ -2457,9 +2457,9 @@ checksum = "f27f6278552951f1f2b8cf9da965d10969b2efdea95a6ec47987ab46edfe263a"
[[package]]
name = "simple_logger"
version = "2.1.0"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c75a9723083573ace81ad0cdfc50b858aa3c366c48636edb4109d73122a0c0ea"
checksum = "166fea527c36d9b8a0a88c0c6d4c5077c699d9ffb5cf890b231a3f08c35f3d40"
dependencies = [
"atty",
"colored",

View file

@ -30,7 +30,7 @@ Features at a glance:
Known users:
- [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically serving ~100,000 requests per second
- [explodie.org public tracker](https://explodie.org/opentracker.html) (`udp://explodie.org:6969`), typically [serving ~80,000 requests per second](https://explodie.org/tracker-stats.html)
## Usage

24
TODO.md
View file

@ -2,9 +2,6 @@
## High priority
* udp: add IP blocklist, which would be more flexible than just adding option
for disallowing requests (claiming to be) from localhost
## Medium priority
* quit whole program if any thread panics
@ -14,17 +11,18 @@
* Save JoinHandles
* When preparing to quit because of PanicSentinel sending SIGTERM, loop
through them, extract error and log it
* config: fail on unrecognized keys?
* Run cargo-deny in CI
* udp: add IP blocklist, which would be more flexible than just adding option
for disallowing requests (claiming to be) from localhost
* stagger cleaning tasks?
* aquatic_ws
* remove peer from all torrent maps when connection is closed
* store `Vec<InfoHash>` in ConnectionReference, containing all used
info hashes. When connection is closed, send
InMessage::ConnectionClosed or similar to request workers.
Storing PeerId in ConnectionReference will also be necessary, as
well as making sure clients only use a single one. Alternatively,
a HashMap<PeerId, Vec<InfoHash>> can be used for storage.
* Can peer IP address change after connection has been established
due to some kind of renegotiation? It would cause issues.
* Add cleaning task for ConnectionHandle.announced_info_hashes?
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* replacing indexmap_amortized / simd_json with equivalents doesn't help
* SinkExt::send maybe doesn't wake up properly?
@ -43,12 +41,8 @@
## Low priority
* config
* add flag to print parsed config when starting
* aquatic_udp
* what poll event capacity is actually needed?
* stagger connection cleaning intervals?
* load test
* move additional request sending to for each received response, maybe
with probability 0.2

View file

@ -26,7 +26,7 @@ impl Default for LogLevel {
}
}
pub trait Config: Default + TomlConfig + DeserializeOwned {
pub trait Config: Default + TomlConfig + DeserializeOwned + std::fmt::Debug {
fn get_log_level(&self) -> Option<LogLevel> {
None
}
@ -36,6 +36,7 @@ pub trait Config: Default + TomlConfig + DeserializeOwned {
pub struct Options {
config_file: Option<String>,
print_config: bool,
print_parsed_config: bool,
print_version: bool,
}
@ -59,6 +60,9 @@ impl Options {
"-p" | "--print-config" => {
options.print_config = true;
}
"-P" => {
options.print_parsed_config = true;
}
"-v" | "--version" => {
options.print_version = true;
}
@ -148,6 +152,10 @@ where
start_logger(log_level)?;
}
if options.print_parsed_config {
println!("Running with configuration: {:#?}", config);
}
app_fn(config)
}
}
@ -162,6 +170,7 @@ where
println!(" -c, --config-file Load config from this path");
println!(" -h, --help Print this help message");
println!(" -p, --print-config Print default config");
println!(" -P Print parsed config");
println!(" -v, --version Print version information");
if let Some(error) = opt_error {

View file

@ -10,7 +10,7 @@ use serde::Deserialize;
use aquatic_toml_config::TomlConfig;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct PrivilegeConfig {
/// Chroot and switch group and user after binding to sockets
pub drop_privileges: bool,

View file

@ -11,7 +11,7 @@ use aquatic_common::cli::LogLevel;
/// aquatic_http configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the swarm workers. They then receive responses from the
@ -52,7 +52,7 @@ impl aquatic_common::cli::Config for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
@ -82,7 +82,7 @@ impl Default for NetworkConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
@ -103,7 +103,7 @@ impl Default for ProtocolConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,

View file

@ -7,7 +7,7 @@ use serde::Deserialize;
/// aquatic_http_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
pub server_address: SocketAddr,
pub log_level: LogLevel,
@ -34,7 +34,7 @@ impl aquatic_common::cli::Config for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct TorrentConfig {
pub number_of_torrents: usize,
/// Pareto shape

View file

@ -8,7 +8,7 @@ use aquatic_common::cli::LogLevel;
/// aquatic_http_private configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the swarm workers. They then receive responses from the
@ -50,7 +50,7 @@ impl aquatic_common::cli::Config for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
@ -73,7 +73,7 @@ impl Default for NetworkConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
@ -94,7 +94,7 @@ impl Default for ProtocolConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,

View file

@ -8,7 +8,7 @@ use aquatic_toml_config::TomlConfig;
/// aquatic_udp configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the swarm workers. They then receive responses from the
@ -65,7 +65,7 @@ impl aquatic_common::cli::Config for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
@ -117,7 +117,7 @@ impl Default for NetworkConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: u8,
@ -138,7 +138,7 @@ impl Default for ProtocolConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct StatisticsConfig {
/// Collect and print/write statistics this often (seconds)
pub interval: u64,
@ -168,7 +168,7 @@ impl Default for StatisticsConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean torrents this often (seconds)
pub torrent_cleaning_interval: u64,

View file

@ -2,6 +2,7 @@ use aquatic_toml_config::TomlConfig;
use serde::Deserialize;
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct BenchConfig {
pub num_rounds: usize,
pub num_threads: usize,

View file

@ -9,7 +9,7 @@ use aquatic_toml_config::TomlConfig;
/// aquatic_udp_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Server address
///
@ -43,7 +43,7 @@ impl Default for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// True means bind to one localhost IP per socket.
///
@ -86,7 +86,7 @@ impl Default for NetworkConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct RequestConfig {
/// Number of torrents to simulate
pub number_of_torrents: usize,

View file

@ -4,6 +4,7 @@ use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr;
pub use aquatic_common::ValidUntil;
use aquatic_ws_protocol::{InfoHash, PeerId};
#[derive(Default, Clone)]
pub struct State {
@ -28,3 +29,12 @@ pub struct ConnectionMeta {
pub peer_addr: CanonicalSocketAddr,
pub pending_scrape_id: Option<PendingScrapeId>,
}
/// Control message sent from socket workers to swarm workers so that swarm
/// state stays in sync with connection lifecycle events.
#[derive(Clone, Copy, Debug)]
pub enum SwarmControlMessage {
    /// Sent when a peer's connection is closed, so the swarm worker can
    /// remove the peer from the torrent it had announced to.
    ConnectionClosed {
        info_hash: InfoHash,
        peer_id: PeerId,
        // Used to pick the IPv4 or IPv6 torrent map on the receiving side.
        peer_addr: CanonicalSocketAddr,
    },
}

View file

@ -10,7 +10,7 @@ use aquatic_toml_config::TomlConfig;
/// aquatic_ws configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
/// Socket workers receive requests from the socket, parse them and send
/// them on to the swarm workers. They then receive responses from the
@ -51,7 +51,7 @@ impl aquatic_common::cli::Config for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct NetworkConfig {
/// Bind to this address
pub address: SocketAddr,
@ -86,7 +86,7 @@ impl Default for NetworkConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct ProtocolConfig {
/// Maximum number of torrents to accept in scrape request
pub max_scrape_torrents: usize,
@ -107,7 +107,7 @@ impl Default for ProtocolConfig {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub torrent_cleaning_interval: u64,

View file

@ -36,6 +36,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let control_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
@ -52,6 +53,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let config = config.clone();
let state = state.clone();
let tls_config = tls_config.clone();
let control_mesh_builder = control_mesh_builder.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
let priv_dropper = priv_dropper.clone();
@ -71,6 +73,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
config,
state,
tls_config,
control_mesh_builder,
request_mesh_builder,
response_mesh_builder,
priv_dropper,
@ -86,6 +89,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let sentinel = sentinel.clone();
let config = config.clone();
let state = state.clone();
let control_mesh_builder = control_mesh_builder.clone();
let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone();
@ -103,6 +107,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
sentinel,
config,
state,
control_mesh_builder,
request_mesh_builder,
response_mesh_builder,
)

View file

@ -25,7 +25,7 @@ use glommio::net::{TcpListener, TcpStream};
use glommio::task::JoinHandle;
use glommio::timer::{sleep, timeout, TimerActionRepeat};
use glommio::{enclose, prelude::*};
use hashbrown::HashMap;
use hashbrown::{HashMap, HashSet};
use slab::Slab;
use crate::config::Config;
@ -46,6 +46,9 @@ struct ConnectionReference {
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
/// Updated after sending message to peer
valid_until: ValidUntil,
peer_id: Option<PeerId>,
announced_info_hashes: HashSet<InfoHash>,
peer_addr: CanonicalSocketAddr,
}
pub async fn run_socket_worker(
@ -53,6 +56,7 @@ pub async fn run_socket_worker(
config: Config,
state: State,
tls_config: Arc<RustlsConfig>,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
priv_dropper: PrivilegeDropper,
@ -62,6 +66,12 @@ pub async fn run_socket_worker(
let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");
let (control_message_senders, _) = control_message_mesh_builder
.join(Role::Producer)
.await
.unwrap();
let control_message_senders = Rc::new(control_message_senders);
let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap();
let in_message_senders = Rc::new(in_message_senders);
@ -105,6 +115,18 @@ pub async fn run_socket_worker(
while let Some(stream) = incoming.next().await {
match stream {
Ok(stream) => {
let peer_addr = match stream.peer_addr() {
Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr),
Err(err) => {
::log::info!(
"could not extract peer address, closing connection: {:#}",
err
);
continue;
}
};
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
let out_message_sender = Rc::new(out_message_sender);
@ -112,13 +134,16 @@ pub async fn run_socket_worker(
task_handle: None,
out_message_sender: out_message_sender.clone(),
valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
peer_id: None,
announced_info_hashes: Default::default(),
peer_addr,
});
::log::info!("accepting stream: {}", key);
let task_handle = spawn_local_into(enclose!((config, access_list, in_message_senders, connection_slab, tls_config) async move {
let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move {
if let Err(err) = run_connection(
config,
config.clone(),
access_list,
in_message_senders,
tq_prioritized,
@ -129,12 +154,40 @@ pub async fn run_socket_worker(
out_message_consumer_id,
ConnectionId(key),
tls_config,
stream
stream,
peer_addr,
).await {
::log::debug!("Connection::run() error: {:?}", err);
}
connection_slab.borrow_mut().try_remove(key);
// Remove reference in separate statement to avoid
// multiple RefCell borrows
let opt_reference = connection_slab.borrow_mut().try_remove(key);
// Tell swarm workers to remove peer
if let Some(reference) = opt_reference {
if let Some(peer_id) = reference.peer_id {
for info_hash in reference.announced_info_hashes {
let message = SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
peer_addr: reference.peer_addr,
};
let consumer_index =
calculate_in_message_consumer_index(&config, info_hash);
// Only fails when receiver is closed
control_message_senders
.send_to(
consumer_index,
message
)
.await
.unwrap();
}
}
}
}), tq_regular)
.unwrap()
.detach();
@ -144,7 +197,7 @@ pub async fn run_socket_worker(
}
}
Err(err) => {
::log::error!("accept connection: {:?}", err);
::log::error!("accept connection: {:#}", err);
}
}
}
@ -217,12 +270,8 @@ async fn run_connection(
connection_id: ConnectionId,
tls_config: Arc<RustlsConfig>,
stream: TcpStream,
peer_addr: CanonicalSocketAddr,
) -> anyhow::Result<()> {
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
@ -240,10 +289,11 @@ async fn run_connection(
let access_list_cache = create_access_list_cache(&access_list);
let reader_handle = spawn_local_into(
enclose!((config, pending_scrape_slab) async move {
enclose!((config, connection_slab, pending_scrape_slab) async move {
let mut reader = ConnectionReader {
config,
access_list_cache,
connection_slab,
in_message_senders,
out_message_sender,
pending_scrape_slab,
@ -289,6 +339,7 @@ async fn run_connection(
struct ConnectionReader {
config: Rc<Config>,
access_list_cache: AccessListCache,
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
in_message_senders: Rc<Senders<(ConnectionMeta, InMessage)>>,
out_message_sender: Rc<LocalSender<(ConnectionMeta, OutMessage)>>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
@ -320,7 +371,7 @@ impl ConnectionReader {
Err(err) => {
::log::debug!("Couldn't parse in_message: {:?}", err);
self.send_error_response("Invalid request".into(), None)
self.send_error_response("Invalid request".into(), None, None)
.await?;
}
}
@ -339,6 +390,41 @@ impl ConnectionReader {
.load()
.allows(self.config.access_list.mode, &info_hash.0)
{
{
let mut connection_slab = self.connection_slab.borrow_mut();
let connection_reference = connection_slab
.get_mut(self.connection_id.0)
.ok_or_else(|| {
anyhow::anyhow!(
"connection reference {} not found in slab",
self.connection_id.0
)
})?;
// Store peer id / check if stored peer id matches
match &mut connection_reference.peer_id {
Some(peer_id) if *peer_id != announce_request.peer_id => {
self.send_error_response(
"Only one peer id can be used per connection".into(),
Some(ErrorResponseAction::Announce),
Some(info_hash),
)
.await?;
return Err(anyhow::anyhow!("Peer used more than one PeerId"));
}
Some(_) => (),
opt_peer_id @ None => {
*opt_peer_id = Some(announce_request.peer_id);
}
}
// Remember info hash for later
connection_reference
.announced_info_hashes
.insert(announce_request.info_hash);
}
let in_message = InMessage::AnnounceRequest(announce_request);
let consumer_index =
@ -354,8 +440,12 @@ impl ConnectionReader {
.unwrap();
::log::info!("sent message to swarm worker");
} else {
self.send_error_response("Info hash not allowed".into(), Some(info_hash))
.await?;
self.send_error_response(
"Info hash not allowed".into(),
Some(ErrorResponseAction::Announce),
Some(info_hash),
)
.await?;
}
}
InMessage::ScrapeRequest(ScrapeRequest { info_hashes, .. }) => {
@ -364,8 +454,12 @@ impl ConnectionReader {
} else {
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.
self.send_error_response("Full scrapes are not allowed".into(), None)
.await?;
self.send_error_response(
"Full scrapes are not allowed".into(),
Some(ErrorResponseAction::Scrape),
None,
)
.await?;
return Ok(());
};
@ -415,10 +509,11 @@ impl ConnectionReader {
async fn send_error_response(
&self,
failure_reason: Cow<'static, str>,
action: Option<ErrorResponseAction>,
info_hash: Option<InfoHash>,
) -> anyhow::Result<()> {
let out_message = OutMessage::ErrorResponse(ErrorResponse {
action: Some(ErrorResponseAction::Scrape),
action,
failure_reason,
info_hash,
});

View file

@ -68,6 +68,22 @@ impl Default for TorrentData {
}
}
impl TorrentData {
    /// Remove the peer with the given id from this torrent, decrementing
    /// the matching seeder/leecher counter so swarm statistics stay
    /// consistent. Does nothing if the peer is not present.
    pub fn remove_peer(&mut self, peer_id: PeerId) {
        let removed_peer = match self.peers.remove(&peer_id) {
            Some(peer) => peer,
            None => return,
        };

        match removed_peer.status {
            PeerStatus::Seeding => self.num_seeders -= 1,
            PeerStatus::Leeching => self.num_leechers -= 1,
            // Stopped peers were never counted, so nothing to adjust
            PeerStatus::Stopped => (),
        }
    }
}
type TorrentMap = AmortizedIndexMap<InfoHash, TorrentData>;
#[derive(Default)]
@ -131,9 +147,15 @@ pub async fn run_swarm_worker(
_sentinel: PanicSentinel,
config: Config,
state: State,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
) {
let (_, mut control_message_receivers) = control_message_mesh_builder
.join(Role::Consumer)
.await
.unwrap();
let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap();
let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap();
@ -153,6 +175,13 @@ pub async fn run_swarm_worker(
let mut handles = Vec::new();
for (_, receiver) in control_message_receivers.streams() {
let handle =
spawn_local(handle_control_message_stream(torrents.clone(), receiver)).detach();
handles.push(handle);
}
for (_, receiver) in in_message_receivers.streams() {
let handle = spawn_local(handle_request_stream(
config.clone(),
@ -170,6 +199,36 @@ pub async fn run_swarm_worker(
}
}
/// Consume connection lifecycle messages from socket workers, removing
/// closed peers from the relevant torrent map.
async fn handle_control_message_stream<S>(torrents: Rc<RefCell<TorrentMaps>>, mut stream: S)
where
    S: futures_lite::Stream<Item = SwarmControlMessage> + ::std::marker::Unpin,
{
    while let Some(message) = stream.next().await {
        match message {
            SwarmControlMessage::ConnectionClosed {
                info_hash,
                peer_id,
                peer_addr,
            } => {
                ::log::debug!(
                    "Removing peer {} from torrents because connection was closed",
                    peer_addr.get()
                );

                let mut torrent_maps = torrents.borrow_mut();

                // Pick the map matching the peer's address family
                let torrent_map = if peer_addr.is_ipv4() {
                    &mut torrent_maps.ipv4
                } else {
                    &mut torrent_maps.ipv6
                };

                if let Some(torrent_data) = torrent_map.get_mut(&info_hash) {
                    torrent_data.remove_peer(peer_id);
                }
            }
        }
    }
}
async fn handle_request_stream<S>(
config: Config,
torrents: Rc<RefCell<TorrentMaps>>,

View file

@ -7,7 +7,7 @@ use serde::Deserialize;
/// aquatic_ws_load_test configuration
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct Config {
pub server_address: SocketAddr,
pub log_level: LogLevel,
@ -41,7 +41,7 @@ impl Default for Config {
}
#[derive(Clone, Debug, PartialEq, TomlConfig, Deserialize)]
#[serde(default)]
#[serde(default, deny_unknown_fields)]
pub struct TorrentConfig {
pub offers_per_request: usize,
pub number_of_torrents: usize,