ws: remove peer from all torrent maps when connection is closed

This commit is contained in:
Joakim Frostegård 2022-07-05 13:13:53 +02:00
parent b30da1a930
commit 720ceacf99
5 changed files with 134 additions and 17 deletions

10
TODO.md
View file

@ -20,13 +20,9 @@
* stagger cleaning tasks? * stagger cleaning tasks?
* aquatic_ws * aquatic_ws
* remove peer from all torrent maps when connection is closed * Can peer IP address change after connection has been established
* store `Vec<InfoHash>` in ConnectionReference, containing all used due to some kind of renegotition? It would cause issues.
info hashes. When connection is closed, send * Add cleaning task for ConnectionHandle.announced_info_hashes?
InMessage::ConnectionClosed or similar to request workers.
Storing PeerId in ConnectionReference will also be necessary, as
well as making sure clients only use a single one. Alternatively,
a HashMap<PeerId, Vec<InfoHash>> can be used for storage.
* RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity * RES memory still high after traffic stops, even if torrent maps and connection slabs go down to 0 len and capacity
* replacing indexmap_amortized / simd_json with equivalents doesn't help * replacing indexmap_amortized / simd_json with equivalents doesn't help
* SinkExt::send maybe doesn't wake up properly? * SinkExt::send maybe doesn't wake up properly?

View file

@ -4,6 +4,7 @@ use aquatic_common::access_list::AccessListArcSwap;
use aquatic_common::CanonicalSocketAddr; use aquatic_common::CanonicalSocketAddr;
pub use aquatic_common::ValidUntil; pub use aquatic_common::ValidUntil;
use aquatic_ws_protocol::{InfoHash, PeerId};
#[derive(Default, Clone)] #[derive(Default, Clone)]
pub struct State { pub struct State {
@ -28,3 +29,12 @@ pub struct ConnectionMeta {
pub peer_addr: CanonicalSocketAddr, pub peer_addr: CanonicalSocketAddr,
pub pending_scrape_id: Option<PendingScrapeId>, pub pending_scrape_id: Option<PendingScrapeId>,
} }
#[derive(Clone, Copy, Debug)]
pub enum SwarmControlMessage {
ConnectionClosed {
info_hash: InfoHash,
peer_id: PeerId,
peer_addr: CanonicalSocketAddr,
},
}

View file

@ -36,6 +36,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE); let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16); let response_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE * 16);
let control_mesh_builder = MeshBuilder::partial(num_peers, SHARED_IN_CHANNEL_SIZE);
let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel(); let (sentinel_watcher, sentinel) = PanicSentinelWatcher::create_with_sentinel();
let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers); let priv_dropper = PrivilegeDropper::new(config.privileges.clone(), config.socket_workers);
@ -52,6 +53,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
let tls_config = tls_config.clone(); let tls_config = tls_config.clone();
let control_mesh_builder = control_mesh_builder.clone();
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
let priv_dropper = priv_dropper.clone(); let priv_dropper = priv_dropper.clone();
@ -71,6 +73,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
config, config,
state, state,
tls_config, tls_config,
control_mesh_builder,
request_mesh_builder, request_mesh_builder,
response_mesh_builder, response_mesh_builder,
priv_dropper, priv_dropper,
@ -86,6 +89,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
let sentinel = sentinel.clone(); let sentinel = sentinel.clone();
let config = config.clone(); let config = config.clone();
let state = state.clone(); let state = state.clone();
let control_mesh_builder = control_mesh_builder.clone();
let request_mesh_builder = request_mesh_builder.clone(); let request_mesh_builder = request_mesh_builder.clone();
let response_mesh_builder = response_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone();
@ -103,6 +107,7 @@ pub fn run(config: Config) -> ::anyhow::Result<()> {
sentinel, sentinel,
config, config,
state, state,
control_mesh_builder,
request_mesh_builder, request_mesh_builder,
response_mesh_builder, response_mesh_builder,
) )

View file

@ -48,6 +48,7 @@ struct ConnectionReference {
valid_until: ValidUntil, valid_until: ValidUntil,
peer_id: Option<PeerId>, peer_id: Option<PeerId>,
announced_info_hashes: HashSet<InfoHash>, announced_info_hashes: HashSet<InfoHash>,
peer_addr: CanonicalSocketAddr,
} }
pub async fn run_socket_worker( pub async fn run_socket_worker(
@ -55,6 +56,7 @@ pub async fn run_socket_worker(
config: Config, config: Config,
state: State, state: State,
tls_config: Arc<RustlsConfig>, tls_config: Arc<RustlsConfig>,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
priv_dropper: PrivilegeDropper, priv_dropper: PrivilegeDropper,
@ -64,6 +66,12 @@ pub async fn run_socket_worker(
let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener"); let listener = create_tcp_listener(&config, priv_dropper).expect("create tcp listener");
let (control_message_senders, _) = control_message_mesh_builder
.join(Role::Producer)
.await
.unwrap();
let control_message_senders = Rc::new(control_message_senders);
let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap(); let (in_message_senders, _) = in_message_mesh_builder.join(Role::Producer).await.unwrap();
let in_message_senders = Rc::new(in_message_senders); let in_message_senders = Rc::new(in_message_senders);
@ -107,6 +115,18 @@ pub async fn run_socket_worker(
while let Some(stream) = incoming.next().await { while let Some(stream) = incoming.next().await {
match stream { match stream {
Ok(stream) => { Ok(stream) => {
let peer_addr = match stream.peer_addr() {
Ok(peer_addr) => CanonicalSocketAddr::new(peer_addr),
Err(err) => {
::log::info!(
"could not extract peer address, closing connection: {:#}",
err
);
continue;
}
};
let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE); let (out_message_sender, out_message_receiver) = new_bounded(LOCAL_CHANNEL_SIZE);
let out_message_sender = Rc::new(out_message_sender); let out_message_sender = Rc::new(out_message_sender);
@ -116,13 +136,14 @@ pub async fn run_socket_worker(
valid_until: ValidUntil::new(config.cleaning.max_connection_idle), valid_until: ValidUntil::new(config.cleaning.max_connection_idle),
peer_id: None, peer_id: None,
announced_info_hashes: Default::default(), announced_info_hashes: Default::default(),
peer_addr,
}); });
::log::info!("accepting stream: {}", key); ::log::info!("accepting stream: {}", key);
let task_handle = spawn_local_into(enclose!((config, access_list, in_message_senders, connection_slab, tls_config) async move { let task_handle = spawn_local_into(enclose!((config, access_list, control_message_senders, in_message_senders, connection_slab, tls_config) async move {
if let Err(err) = run_connection( if let Err(err) = run_connection(
config, config.clone(),
access_list, access_list,
in_message_senders, in_message_senders,
tq_prioritized, tq_prioritized,
@ -133,12 +154,40 @@ pub async fn run_socket_worker(
out_message_consumer_id, out_message_consumer_id,
ConnectionId(key), ConnectionId(key),
tls_config, tls_config,
stream stream,
peer_addr,
).await { ).await {
::log::debug!("Connection::run() error: {:?}", err); ::log::debug!("Connection::run() error: {:?}", err);
} }
connection_slab.borrow_mut().try_remove(key); // Remove reference in separate statement to avoid
// multiple RefCell borrows
let opt_reference = connection_slab.borrow_mut().try_remove(key);
// Tell swarm workers to remove peer
if let Some(reference) = opt_reference {
if let Some(peer_id) = reference.peer_id {
for info_hash in reference.announced_info_hashes {
let message = SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
peer_addr: reference.peer_addr,
};
let consumer_index =
calculate_in_message_consumer_index(&config, info_hash);
// Only fails when receiver is closed
control_message_senders
.send_to(
consumer_index,
message
)
.await
.unwrap();
}
}
}
}), tq_regular) }), tq_regular)
.unwrap() .unwrap()
.detach(); .detach();
@ -148,7 +197,7 @@ pub async fn run_socket_worker(
} }
} }
Err(err) => { Err(err) => {
::log::error!("accept connection: {:?}", err); ::log::error!("accept connection: {:#}", err);
} }
} }
} }
@ -221,12 +270,8 @@ async fn run_connection(
connection_id: ConnectionId, connection_id: ConnectionId,
tls_config: Arc<RustlsConfig>, tls_config: Arc<RustlsConfig>,
stream: TcpStream, stream: TcpStream,
peer_addr: CanonicalSocketAddr,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into(); let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?; let stream = tls_acceptor.accept(stream).await?;
@ -357,6 +402,7 @@ impl ConnectionReader {
) )
})?; })?;
// Store peer id / check if stored peer id matches
match &mut connection_reference.peer_id { match &mut connection_reference.peer_id {
Some(peer_id) if *peer_id != announce_request.peer_id => { Some(peer_id) if *peer_id != announce_request.peer_id => {
self.send_error_response( self.send_error_response(
@ -373,6 +419,7 @@ impl ConnectionReader {
} }
} }
// Remember info hash for later
connection_reference connection_reference
.announced_info_hashes .announced_info_hashes
.insert(announce_request.info_hash); .insert(announce_request.info_hash);

View file

@ -68,6 +68,22 @@ impl Default for TorrentData {
} }
} }
impl TorrentData {
pub fn remove_peer(&mut self, peer_id: PeerId) {
if let Some(peer) = self.peers.remove(&peer_id) {
match peer.status {
PeerStatus::Leeching => {
self.num_leechers -= 1;
}
PeerStatus::Seeding => {
self.num_seeders -= 1;
}
PeerStatus::Stopped => (),
}
}
}
}
type TorrentMap = AmortizedIndexMap<InfoHash, TorrentData>; type TorrentMap = AmortizedIndexMap<InfoHash, TorrentData>;
#[derive(Default)] #[derive(Default)]
@ -131,9 +147,15 @@ pub async fn run_swarm_worker(
_sentinel: PanicSentinel, _sentinel: PanicSentinel,
config: Config, config: Config,
state: State, state: State,
control_message_mesh_builder: MeshBuilder<SwarmControlMessage, Partial>,
in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>, in_message_mesh_builder: MeshBuilder<(ConnectionMeta, InMessage), Partial>,
out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>, out_message_mesh_builder: MeshBuilder<(ConnectionMeta, OutMessage), Partial>,
) { ) {
let (_, mut control_message_receivers) = control_message_mesh_builder
.join(Role::Consumer)
.await
.unwrap();
let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap(); let (_, mut in_message_receivers) = in_message_mesh_builder.join(Role::Consumer).await.unwrap();
let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap(); let (out_message_senders, _) = out_message_mesh_builder.join(Role::Producer).await.unwrap();
@ -153,6 +175,13 @@ pub async fn run_swarm_worker(
let mut handles = Vec::new(); let mut handles = Vec::new();
for (_, receiver) in control_message_receivers.streams() {
let handle =
spawn_local(handle_control_message_stream(torrents.clone(), receiver)).detach();
handles.push(handle);
}
for (_, receiver) in in_message_receivers.streams() { for (_, receiver) in in_message_receivers.streams() {
let handle = spawn_local(handle_request_stream( let handle = spawn_local(handle_request_stream(
config.clone(), config.clone(),
@ -170,6 +199,36 @@ pub async fn run_swarm_worker(
} }
} }
async fn handle_control_message_stream<S>(torrents: Rc<RefCell<TorrentMaps>>, mut stream: S)
where
S: futures_lite::Stream<Item = SwarmControlMessage> + ::std::marker::Unpin,
{
while let Some(message) = stream.next().await {
match message {
SwarmControlMessage::ConnectionClosed {
info_hash,
peer_id,
peer_addr,
} => {
::log::debug!(
"Removing peer {} from torrents because connection was closed",
peer_addr.get()
);
if peer_addr.is_ipv4() {
if let Some(torrent_data) = torrents.borrow_mut().ipv4.get_mut(&info_hash) {
torrent_data.remove_peer(peer_id);
}
} else {
if let Some(torrent_data) = torrents.borrow_mut().ipv6.get_mut(&info_hash) {
torrent_data.remove_peer(peer_id);
}
}
}
}
}
}
async fn handle_request_stream<S>( async fn handle_request_stream<S>(
config: Config, config: Config,
torrents: Rc<RefCell<TorrentMaps>>, torrents: Rc<RefCell<TorrentMaps>>,