diff --git a/src/ban.rs b/src/ban.rs new file mode 100644 index 0000000..937e471 --- /dev/null +++ b/src/ban.rs @@ -0,0 +1,45 @@ +use chrono::{DateTime, Local}; +use librqbit::dht::Id20; +use std::{collections::HashMap, time::Duration}; + +pub struct Ban { + index: HashMap>, + timeout: u64, +} + +impl Ban { + pub fn init(timeout: u64, capacity: usize) -> Self { + Self { + index: HashMap::with_capacity(capacity), + timeout, + } + } + + pub fn has(&self, key: &Id20) -> bool { + self.index.contains_key(key) + } + + pub fn total(&self) -> usize { + self.index.len() + } + + pub fn update(&mut self, time: DateTime) { + self.index.retain(|i, &mut expires| { + if time > expires { + log::debug!( + "remove ban for `{}` by the timeout expiration ({expires})", + i.as_string() + ); + false + } else { + true + } + }) + } + + pub fn add(&mut self, key: Id20) -> DateTime { + let t = Local::now() + Duration::from_secs(self.index.len() as u64 * self.timeout); + assert!(self.index.insert(key, t).is_none()); + t + } +} diff --git a/src/main.rs b/src/main.rs index c419e72..556ad7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,19 +1,16 @@ mod api; +mod ban; mod config; mod preload; use anyhow::Result; -use chrono::DateTime; +use ban::Ban; use config::Config; use librqbit::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions, }; use preload::Preload; -use std::{ - collections::{HashMap, HashSet}, - num::NonZero, - time::Duration, -}; +use std::{collections::HashSet, num::NonZero, time::Duration}; use url::Url; #[tokio::main] @@ -59,23 +56,12 @@ async fn main() -> Result<()> { }, ) .await?; - let mut ban: HashMap> = - HashMap::with_capacity(config.index_capacity); + let mut ban = Ban::init(config.timeout, config.index_capacity); log::info!("crawler started at {time_init}"); loop { let time_queue = Local::now(); log::debug!("queue crawl begin at {time_queue}..."); - ban.retain(|i, &mut expires| { - if time_queue > expires { - log::debug!( - "remove ban for `{}` by the timeout expiration ({expires})", - i.as_string() - ); - false - } else { - true - } - }); + ban.update(time_queue); for source in &config.infohash { log::debug!("index source `{source}`..."); // grab latest info-hashes from this source @@ -99,7 +85,7 @@ async fn main() -> Result<()> { log::debug!("torrent `{h}` exists, skip."); continue; } - if ban.contains_key(&i) { + if ban.has(&i) { log::debug!("torrent `{h}` is temporary banned, skip for this queue."); continue; } @@ -194,15 +180,13 @@ async fn main() -> Result<()> { ) .await { - let t = Local::now() - + Duration::from_secs(ban.len() as u64 * config.timeout); - log::debug!( - "skip awaiting the completion of preload `{h}` data (`{e}`), ban until {t}." - ); - assert!(ban.insert(i, t).is_none()); session .delete(librqbit::api::TorrentIdOrHash::Id(id), false) .await?; // * do not collect billions of slow torrents in the session pool + log::debug!( + "skip awaiting the completion of preload `{h}` data (`{e}`), ban until {}.", + ban.add(i) + ); continue; } log::debug!("torrent `{h}` preload completed."); @@ -219,21 +203,15 @@ async fn main() -> Result<()> { log::debug!("torrent `{h}` resolved.") } Ok(_) => panic!(), - Err(e) => { - let t = Local::now() - + Duration::from_secs(ban.len() as u64 * config.timeout); - log::debug!("failed to resolve torrent `{h}`: `{e}`, ban until {t}."); - assert!(ban.insert(i, t).is_none()) - } + Err(e) => log::debug!( + "failed to resolve torrent `{h}`: `{e}`, ban until {}.", + ban.add(i) + ), }, - Err(e) => { - let t = - Local::now() + Duration::from_secs(ban.len() as u64 * config.timeout); - log::debug!( - "skip awaiting the completion of adding torrent `{h}` data (`{e}`), ban until {t}." - ); - assert!(ban.insert(i, t).is_none()) - } + Err(e) => log::debug!( + "skip awaiting the completion of adding torrent `{h}` data (`{e}`), ban until {}.", + ban.add(i) + ), } } } @@ -245,7 +223,7 @@ async fn main() -> Result<()> { Local::now() .signed_duration_since(time_init) .as_seconds_f32(), - ban.len(), + ban.total(), config.sleep ); std::thread::sleep(Duration::from_secs(config.sleep))