implement separated mod for the Ban feature

This commit is contained in:
yggverse 2025-08-13 19:24:01 +03:00
parent 9cd28eaa3b
commit bf35e8e361
2 changed files with 64 additions and 41 deletions

45
src/ban.rs Normal file
View file

@ -0,0 +1,45 @@
use chrono::{DateTime, Local};
use librqbit::dht::Id20;
use std::{collections::HashMap, time::Duration};
pub struct Ban {
index: HashMap<Id20, DateTime<Local>>,
timeout: u64,
}
impl Ban {
pub fn init(timeout: u64, capacity: usize) -> Self {
Self {
index: HashMap::with_capacity(capacity),
timeout,
}
}
pub fn has(&self, key: &Id20) -> bool {
self.index.contains_key(key)
}
pub fn total(&self) -> usize {
self.index.len()
}
pub fn update(&mut self, time: DateTime<Local>) {
self.index.retain(|i, &mut expires| {
if time > expires {
log::debug!(
"remove ban for `{}` by the timeout expiration ({expires})",
i.as_string()
);
false
} else {
true
}
})
}
pub fn add(&mut self, key: Id20) -> DateTime<Local> {
let t = Local::now() + Duration::from_secs(self.index.len() as u64 * self.timeout);
assert!(self.index.insert(key, t).is_none());
t
}
}

View file

@ -1,19 +1,16 @@
mod api; mod api;
mod ban;
mod config; mod config;
mod preload; mod preload;
use anyhow::Result; use anyhow::Result;
use chrono::DateTime; use ban::Ban;
use config::Config; use config::Config;
use librqbit::{ use librqbit::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions, AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions,
}; };
use preload::Preload; use preload::Preload;
use std::{ use std::{collections::HashSet, num::NonZero, time::Duration};
collections::{HashMap, HashSet},
num::NonZero,
time::Duration,
};
use url::Url; use url::Url;
#[tokio::main] #[tokio::main]
@ -59,23 +56,12 @@ async fn main() -> Result<()> {
}, },
) )
.await?; .await?;
let mut ban: HashMap<librqbit::dht::Id20, DateTime<Local>> = let mut ban = Ban::init(config.timeout, config.index_capacity);
HashMap::with_capacity(config.index_capacity);
log::info!("crawler started at {time_init}"); log::info!("crawler started at {time_init}");
loop { loop {
let time_queue = Local::now(); let time_queue = Local::now();
log::debug!("queue crawl begin at {time_queue}..."); log::debug!("queue crawl begin at {time_queue}...");
ban.retain(|i, &mut expires| { ban.update(time_queue);
if time_queue > expires {
log::debug!(
"remove ban for `{}` by the timeout expiration ({expires})",
i.as_string()
);
false
} else {
true
}
});
for source in &config.infohash { for source in &config.infohash {
log::debug!("index source `{source}`..."); log::debug!("index source `{source}`...");
// grab latest info-hashes from this source // grab latest info-hashes from this source
@ -99,7 +85,7 @@ async fn main() -> Result<()> {
log::debug!("torrent `{h}` exists, skip."); log::debug!("torrent `{h}` exists, skip.");
continue; continue;
} }
if ban.contains_key(&i) { if ban.has(&i) {
log::debug!("torrent `{h}` is temporary banned, skip for this queue."); log::debug!("torrent `{h}` is temporary banned, skip for this queue.");
continue; continue;
} }
@ -194,15 +180,13 @@ async fn main() -> Result<()> {
) )
.await .await
{ {
let t = Local::now()
+ Duration::from_secs(ban.len() as u64 * config.timeout);
log::debug!(
"skip awaiting the completion of preload `{h}` data (`{e}`), ban until {t}."
);
assert!(ban.insert(i, t).is_none());
session session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false) .delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // * do not collect billions of slow torrents in the session pool .await?; // * do not collect billions of slow torrents in the session pool
log::debug!(
"skip awaiting the completion of preload `{h}` data (`{e}`), ban until {}.",
ban.add(i)
);
continue; continue;
} }
log::debug!("torrent `{h}` preload completed."); log::debug!("torrent `{h}` preload completed.");
@ -219,21 +203,15 @@ async fn main() -> Result<()> {
log::debug!("torrent `{h}` resolved.") log::debug!("torrent `{h}` resolved.")
} }
Ok(_) => panic!(), Ok(_) => panic!(),
Err(e) => { Err(e) => log::debug!(
let t = Local::now() "failed to resolve torrent `{h}`: `{e}`, ban until {}.",
+ Duration::from_secs(ban.len() as u64 * config.timeout); ban.add(i)
log::debug!("failed to resolve torrent `{h}`: `{e}`, ban until {t}."); ),
assert!(ban.insert(i, t).is_none())
}
}, },
Err(e) => { Err(e) => log::debug!(
let t = "skip awaiting the completion of adding torrent `{h}` data (`{e}`), ban until {}.",
Local::now() + Duration::from_secs(ban.len() as u64 * config.timeout); ban.add(i)
log::debug!( ),
"skip awaiting the completion of adding torrent `{h}` data (`{e}`), ban until {t}."
);
assert!(ban.insert(i, t).is_none())
}
} }
} }
} }
@ -245,7 +223,7 @@ async fn main() -> Result<()> {
Local::now() Local::now()
.signed_duration_since(time_init) .signed_duration_since(time_init)
.as_seconds_f32(), .as_seconds_f32(),
ban.len(), ban.total(),
config.sleep config.sleep
); );
std::thread::sleep(Duration::from_secs(config.sleep)) std::thread::sleep(Duration::from_secs(config.sleep))