optimize ban logic: remove timeouts, ban for next queue only, then retry

This commit is contained in:
yggverse 2025-08-18 20:45:15 +03:00
parent 8f0b6790cd
commit 3643d456d0
2 changed files with 19 additions and 108 deletions

View file

@ -1,70 +0,0 @@
use chrono::{DateTime, Local};
use librqbit::dht::Id20;
use std::{collections::HashMap, time::Duration};
/// Details of a ban record that `Ban::update` removed from the index.
pub struct Item {
/// Expiration time the removed record carried.
pub expires: DateTime<Local>,
/// String form of the removed key, as produced by `Id20::as_string`.
pub info_hash: String,
}
/// Ban list keyed by `Id20`.
pub struct Ban {
/// Ban records: `None` marks a permanent ban, `Some(t)` a ban that expires at `t`.
index: HashMap<Id20, Option<DateTime<Local>>>,
/// Offset used by `Ban::add` when it schedules the expiration of a temporary ban.
timeout: Duration,
}
impl Ban {
    /// Create an empty ban list.
    ///
    /// * `timeout` - offset, in seconds, applied when scheduling temporary bans
    /// * `capacity` - number of records to preallocate in the index
    pub fn init(timeout: u64, capacity: usize) -> Self {
        Self {
            index: HashMap::with_capacity(capacity),
            timeout: Duration::from_secs(timeout),
        }
    }

    /// Look up the ban record stored for `key`.
    ///
    /// * `None` - the key is not banned
    /// * `Some(None)` - the key is banned permanently
    /// * `Some(Some(t))` - the key is banned until `t`
    pub fn get(&self, key: &Id20) -> Option<&Option<DateTime<Local>>> {
        self.index.get(key)
    }

    /// Number of records (temporary and permanent) currently in the index.
    pub fn total(&self) -> usize {
        self.index.len()
    }

    /// Drop every temporary ban whose expiration time is strictly before `time`.
    ///
    /// Permanent bans are never removed.
    ///
    /// * return removed `Item` details
    pub fn update(&mut self, time: DateTime<Local>) -> Vec<Item> {
        // Start empty: `Vec::new` does not allocate, and a typical pass expires
        // few or no records.
        let mut removed = Vec::new();
        self.index.retain(|key, record| match *record {
            Some(expires) if time > expires => {
                removed.push(Item {
                    expires,
                    info_hash: key.as_string(),
                });
                false
            }
            _ => true,
        });
        removed
    }

    /// Add torrent to the ban list
    ///
    /// If the `is_permanent` option is `true` - ban permanently,
    /// or **automatically** count the ban time as the latest expiration time
    /// currently in the index (or the current time, when the index holds no
    /// temporary bans) plus the timeout offset value.
    ///
    /// * return expiration time or `None` if the ban is permanent
    ///
    /// # Panics
    ///
    /// Panics if `key` is already present in the index.
    pub fn add(&mut self, key: Id20, is_permanent: bool) -> Option<DateTime<Local>> {
        let expires = if is_permanent {
            None
        } else {
            // `flatten` skips permanent (`None`) records, so `max` yields the
            // latest temporary expiration. The timeout is applied in every case,
            // including an empty index, so the very first ban does not expire
            // immediately.
            let base = self
                .index
                .values()
                .flatten()
                .max()
                .copied()
                .unwrap_or_else(Local::now);
            Some(base + self.timeout)
        };
        // A key must not be banned twice; callers are expected to check `get` first.
        assert!(self.index.insert(key, expires).is_none());
        expires
    }
}

View file

@ -1,10 +1,8 @@
mod api;
mod ban;
mod config;
mod preload;
use anyhow::Result;
use ban::Ban;
use config::Config;
use librqbit::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions,
@ -63,18 +61,11 @@ async fn main() -> Result<()> {
},
)
.await?;
let mut ban = Ban::init(config.timeout, config.index_capacity);
let mut ban = HashSet::with_capacity(config.index_capacity);
log::info!("crawler started at {time_init}");
loop {
let time_queue = Local::now();
log::debug!("queue crawl begin at {time_queue}...");
for r in ban.update(time_queue) {
log::debug!(
"remove ban for `{}` as expired on {}",
r.info_hash,
r.expires
)
}
for source in &config.infohash {
log::debug!("index source `{source}`...");
// grab latest info-hashes from this source
@ -98,14 +89,9 @@ async fn main() -> Result<()> {
log::debug!("torrent `{h}` exists, skip.");
continue;
}
if let Some(t) = ban.get(&i) {
log::debug!(
"torrent `{h}` banned {}, skip for this queue.",
match t {
Some(v) => format!("until {v}"),
None => "permanently".into(),
}
);
// skip banned entry, remove it from the ban list to retry on the next iteration
if ban.remove(&i) {
log::debug!("torrent `{h}` is banned, skip for this queue.");
continue;
}
log::info!("resolve `{h}`...");
@ -197,12 +183,9 @@ async fn main() -> Result<()> {
.await
{
log::info!(
"skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban {}.",
match ban.add(i, false) {
Some(t) => format!("until {t}"),
None => "permanently".into(), // @TODO feature, do not unwrap
}
"skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.",
);
assert!(ban.insert(i));
session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // * do not collect billions of slow torrents in the session pool
@ -222,21 +205,19 @@ async fn main() -> Result<()> {
log::info!("torrent `{h}` resolved.")
}
Ok(_) => panic!(),
Err(e) => log::warn!(
"failed to resolve torrent `{h}`: `{e}`, ban {}.",
match ban.add(i, false) {
Some(t) => format!("until {t}"),
None => "permanently".into(), // @TODO feature, do not unwrap
}
),
},
Err(e) => log::info!(
"skip awaiting the completion of adding torrent `{h}` (`{e}`), ban {}.",
match ban.add(i, false) {
Some(t) => format!("until {t}"),
None => "permanently".into(), // @TODO feature, do not unwrap
Err(e) => {
log::warn!(
"failed to resolve torrent `{h}`: `{e}`, ban for the next queue."
);
assert!(ban.insert(i))
}
),
},
Err(e) => {
log::info!(
"skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue."
);
assert!(ban.insert(i))
}
}
}
}
@ -248,7 +229,7 @@ async fn main() -> Result<()> {
Local::now()
.signed_duration_since(time_init)
.as_seconds_f32(),
ban.total(),
ban.len(),
config.sleep
);
std::thread::sleep(Duration::from_secs(config.sleep))