mirror of
https://github.com/YGGverse/aquatic.git
synced 2026-04-02 10:45:30 +00:00
udp: reorder code in swarm storage
This commit is contained in:
parent
a873dddb89
commit
cb39eb69c8
1 changed files with 143 additions and 144 deletions
|
|
@ -18,27 +18,146 @@ use rand::Rng;
|
||||||
use crate::common::*;
|
use crate::common::*;
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
pub struct TorrentMaps {
|
||||||
struct Peer<I: Ip> {
|
pub ipv4: TorrentMap<Ipv4AddrBytes>,
|
||||||
ip_address: I,
|
pub ipv6: TorrentMap<Ipv6AddrBytes>,
|
||||||
port: Port,
|
|
||||||
is_seeder: bool,
|
|
||||||
valid_until: ValidUntil,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<I: Ip> Peer<I> {
|
impl Default for TorrentMaps {
|
||||||
fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer<I> {
|
fn default() -> Self {
|
||||||
ResponsePeer {
|
Self {
|
||||||
ip_address: peer.ip_address,
|
ipv4: TorrentMap(Default::default()),
|
||||||
port: peer.port,
|
ipv6: TorrentMap(Default::default()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type PeerMap<I> = IndexMap<PeerId, Peer<I>>;
|
impl TorrentMaps {
|
||||||
|
/// Remove forbidden or inactive torrents, reclaim space and update statistics
|
||||||
|
pub fn clean_and_update_statistics(
|
||||||
|
&mut self,
|
||||||
|
config: &Config,
|
||||||
|
state: &State,
|
||||||
|
statistics_sender: &Sender<StatisticsMessage>,
|
||||||
|
access_list: &Arc<AccessListArcSwap>,
|
||||||
|
server_start_instant: ServerStartInstant,
|
||||||
|
worker_index: SwarmWorkerIndex,
|
||||||
|
) {
|
||||||
|
let mut cache = create_access_list_cache(access_list);
|
||||||
|
let mode = config.access_list.mode;
|
||||||
|
let now = server_start_instant.seconds_elapsed();
|
||||||
|
|
||||||
|
let ipv4 =
|
||||||
|
self.ipv4
|
||||||
|
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
||||||
|
let ipv6 =
|
||||||
|
self.ipv6
|
||||||
|
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
||||||
|
|
||||||
|
if config.statistics.active() {
|
||||||
|
state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release);
|
||||||
|
state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release);
|
||||||
|
|
||||||
|
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
|
||||||
|
if let Err(err) = statistics_sender.try_send(message) {
|
||||||
|
::log::error!("couldn't send statistics message: {:#}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
|
||||||
|
if let Err(err) = statistics_sender.try_send(message) {
|
||||||
|
::log::error!("couldn't send statistics message: {:#}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
pub struct TorrentMap<I: Ip>(pub IndexMap<InfoHash, TorrentData<I>>);
|
||||||
|
|
||||||
|
impl<I: Ip> TorrentMap<I> {
|
||||||
|
pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) {
|
||||||
|
response.slab_key = request.slab_key;
|
||||||
|
|
||||||
|
let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| {
|
||||||
|
let stats = self
|
||||||
|
.0
|
||||||
|
.get(&info_hash)
|
||||||
|
.map(|torrent_data| torrent_data.scrape_statistics())
|
||||||
|
.unwrap_or_else(|| create_torrent_scrape_statistics(0, 0));
|
||||||
|
|
||||||
|
(i, stats)
|
||||||
|
});
|
||||||
|
|
||||||
|
response.torrent_stats.extend(torrent_stats);
|
||||||
|
}
|
||||||
|
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
||||||
|
fn clean_and_get_statistics(
|
||||||
|
&mut self,
|
||||||
|
config: &Config,
|
||||||
|
statistics_sender: &Sender<StatisticsMessage>,
|
||||||
|
access_list_cache: &mut AccessListCache,
|
||||||
|
access_list_mode: AccessListMode,
|
||||||
|
now: SecondsSinceServerStart,
|
||||||
|
) -> (usize, Option<Histogram<u64>>) {
|
||||||
|
let mut num_peers = 0;
|
||||||
|
|
||||||
|
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
|
||||||
|
{
|
||||||
|
match Histogram::new(3) {
|
||||||
|
Ok(histogram) => Some(histogram),
|
||||||
|
Err(err) => {
|
||||||
|
::log::error!("Couldn't create peer histogram: {:#}", err);
|
||||||
|
|
||||||
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
self.0.retain(|info_hash, torrent| {
|
||||||
|
if !access_list_cache
|
||||||
|
.load()
|
||||||
|
.allows(access_list_mode, &info_hash.0)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
torrent.clean(config, statistics_sender, now);
|
||||||
|
|
||||||
|
num_peers += torrent.peers.len();
|
||||||
|
|
||||||
|
match opt_histogram {
|
||||||
|
Some(ref mut histogram) if torrent.peers.len() != 0 => {
|
||||||
|
let n = torrent
|
||||||
|
.peers
|
||||||
|
.len()
|
||||||
|
.try_into()
|
||||||
|
.expect("Couldn't fit usize into u64");
|
||||||
|
|
||||||
|
if let Err(err) = histogram.record(n) {
|
||||||
|
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
|
||||||
|
!torrent.peers.is_empty()
|
||||||
|
});
|
||||||
|
|
||||||
|
self.0.shrink_to_fit();
|
||||||
|
|
||||||
|
(num_peers, opt_histogram)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn num_torrents(&self) -> usize {
|
||||||
|
self.0.len()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct TorrentData<I: Ip> {
|
pub struct TorrentData<I: Ip> {
|
||||||
peers: PeerMap<I>,
|
peers: IndexMap<PeerId, Peer<I>>,
|
||||||
num_seeders: usize,
|
num_seeders: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -195,143 +314,23 @@ impl<I: Ip> Default for TorrentData<I> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct TorrentMap<I: Ip>(pub IndexMap<InfoHash, TorrentData<I>>);
|
struct Peer<I: Ip> {
|
||||||
|
ip_address: I,
|
||||||
impl<I: Ip> TorrentMap<I> {
|
port: Port,
|
||||||
pub fn scrape(&mut self, request: PendingScrapeRequest, response: &mut PendingScrapeResponse) {
|
is_seeder: bool,
|
||||||
response.slab_key = request.slab_key;
|
valid_until: ValidUntil,
|
||||||
|
|
||||||
let torrent_stats = request.info_hashes.into_iter().map(|(i, info_hash)| {
|
|
||||||
let stats = self
|
|
||||||
.0
|
|
||||||
.get(&info_hash)
|
|
||||||
.map(|torrent_data| torrent_data.scrape_statistics())
|
|
||||||
.unwrap_or_else(|| create_torrent_scrape_statistics(0, 0));
|
|
||||||
|
|
||||||
(i, stats)
|
|
||||||
});
|
|
||||||
|
|
||||||
response.torrent_stats.extend(torrent_stats);
|
|
||||||
}
|
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and return number of remaining peers
|
|
||||||
fn clean_and_get_statistics(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
access_list_cache: &mut AccessListCache,
|
|
||||||
access_list_mode: AccessListMode,
|
|
||||||
now: SecondsSinceServerStart,
|
|
||||||
) -> (usize, Option<Histogram<u64>>) {
|
|
||||||
let mut num_peers = 0;
|
|
||||||
|
|
||||||
let mut opt_histogram: Option<Histogram<u64>> = if config.statistics.torrent_peer_histograms
|
|
||||||
{
|
|
||||||
match Histogram::new(3) {
|
|
||||||
Ok(histogram) => Some(histogram),
|
|
||||||
Err(err) => {
|
|
||||||
::log::error!("Couldn't create peer histogram: {:#}", err);
|
|
||||||
|
|
||||||
None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
self.0.retain(|info_hash, torrent| {
|
|
||||||
if !access_list_cache
|
|
||||||
.load()
|
|
||||||
.allows(access_list_mode, &info_hash.0)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
torrent.clean(config, statistics_sender, now);
|
|
||||||
|
|
||||||
num_peers += torrent.peers.len();
|
|
||||||
|
|
||||||
match opt_histogram {
|
|
||||||
Some(ref mut histogram) if torrent.peers.len() != 0 => {
|
|
||||||
let n = torrent
|
|
||||||
.peers
|
|
||||||
.len()
|
|
||||||
.try_into()
|
|
||||||
.expect("Couldn't fit usize into u64");
|
|
||||||
|
|
||||||
if let Err(err) = histogram.record(n) {
|
|
||||||
::log::error!("Couldn't record {} to histogram: {:#}", n, err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => (),
|
|
||||||
}
|
|
||||||
|
|
||||||
!torrent.peers.is_empty()
|
|
||||||
});
|
|
||||||
|
|
||||||
self.0.shrink_to_fit();
|
|
||||||
|
|
||||||
(num_peers, opt_histogram)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn num_torrents(&self) -> usize {
|
|
||||||
self.0.len()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TorrentMaps {
|
impl<I: Ip> Peer<I> {
|
||||||
pub ipv4: TorrentMap<Ipv4AddrBytes>,
|
fn to_response_peer(_: &PeerId, peer: &Self) -> ResponsePeer<I> {
|
||||||
pub ipv6: TorrentMap<Ipv6AddrBytes>,
|
ResponsePeer {
|
||||||
}
|
ip_address: peer.ip_address,
|
||||||
|
port: peer.port,
|
||||||
impl Default for TorrentMaps {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
ipv4: TorrentMap(Default::default()),
|
|
||||||
ipv6: TorrentMap(Default::default()),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TorrentMaps {
|
|
||||||
/// Remove forbidden or inactive torrents, reclaim space and update statistics
|
|
||||||
pub fn clean_and_update_statistics(
|
|
||||||
&mut self,
|
|
||||||
config: &Config,
|
|
||||||
state: &State,
|
|
||||||
statistics_sender: &Sender<StatisticsMessage>,
|
|
||||||
access_list: &Arc<AccessListArcSwap>,
|
|
||||||
server_start_instant: ServerStartInstant,
|
|
||||||
worker_index: SwarmWorkerIndex,
|
|
||||||
) {
|
|
||||||
let mut cache = create_access_list_cache(access_list);
|
|
||||||
let mode = config.access_list.mode;
|
|
||||||
let now = server_start_instant.seconds_elapsed();
|
|
||||||
|
|
||||||
let ipv4 =
|
|
||||||
self.ipv4
|
|
||||||
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
|
||||||
let ipv6 =
|
|
||||||
self.ipv6
|
|
||||||
.clean_and_get_statistics(config, statistics_sender, &mut cache, mode, now);
|
|
||||||
|
|
||||||
if config.statistics.active() {
|
|
||||||
state.statistics_ipv4.peers[worker_index.0].store(ipv4.0, Ordering::Release);
|
|
||||||
state.statistics_ipv6.peers[worker_index.0].store(ipv6.0, Ordering::Release);
|
|
||||||
|
|
||||||
if let Some(message) = ipv4.1.map(StatisticsMessage::Ipv4PeerHistogram) {
|
|
||||||
if let Err(err) = statistics_sender.try_send(message) {
|
|
||||||
::log::error!("couldn't send statistics message: {:#}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if let Some(message) = ipv6.1.map(StatisticsMessage::Ipv6PeerHistogram) {
|
|
||||||
if let Err(err) = statistics_sender.try_send(message) {
|
|
||||||
::log::error!("couldn't send statistics message: {:#}", err);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/// Extract response peers
|
/// Extract response peers
|
||||||
///
|
///
|
||||||
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
/// If there are more peers in map than `max_num_peers_to_take`, do a random
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue