diff --git a/src/ban.rs b/src/ban.rs deleted file mode 100644 index c455ccd..0000000 --- a/src/ban.rs +++ /dev/null @@ -1,70 +0,0 @@ -use chrono::{DateTime, Local}; -use librqbit::dht::Id20; -use std::{collections::HashMap, time::Duration}; - -pub struct Item { - pub expires: DateTime, - pub info_hash: String, -} - -pub struct Ban { - index: HashMap>>, - timeout: Duration, -} - -impl Ban { - pub fn init(timeout: u64, capacity: usize) -> Self { - Self { - index: HashMap::with_capacity(capacity), - timeout: Duration::from_secs(timeout), - } - } - - pub fn get(&self, key: &Id20) -> Option<&Option>> { - self.index.get(key) - } - - pub fn total(&self) -> usize { - self.index.len() - } - - /// * return removed `Item` details - pub fn update(&mut self, time: DateTime) -> Vec { - let mut b = Vec::with_capacity(self.index.len()); - self.index.retain(|i, &mut e| { - if let Some(expires) = e - && time > expires - { - b.push(Item { - expires, - info_hash: i.as_string(), - }); - false - } else { - true - } - }); - b - } - - /// Add torrent to the ban list - /// - /// If the `is_permanent` option is `true` - ban permanently, - /// or **automatically** count optimal ban time based on the current index max time and timeout offset value. - /// - /// * return expiration time or `None` if the ban is permanent - pub fn add(&mut self, key: Id20, is_permanent: bool) -> Option> { - let t = if is_permanent { - None - } else { - Some( - self.index - .values() - .max() - .map_or(Local::now(), |t| t.unwrap_or(Local::now()) + self.timeout), - ) - }; - assert!(self.index.insert(key, t).is_none()); - t - } -} diff --git a/src/main.rs b/src/main.rs index 68439c5..01c03a4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,10 +1,8 @@ mod api; -mod ban; mod config; mod preload; use anyhow::Result; -use ban::Ban; use config::Config; use librqbit::{ AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions, @@ -63,18 +61,11 @@ async fn main() -> Result<()> { }, ) .await?; - let mut ban = Ban::init(config.timeout, config.index_capacity); + let mut ban = HashSet::with_capacity(config.index_capacity); log::info!("crawler started at {time_init}"); loop { let time_queue = Local::now(); log::debug!("queue crawl begin at {time_queue}..."); - for r in ban.update(time_queue) { - log::debug!( - "remove ban for `{}` as expired on {}", - r.info_hash, - r.expires - ) - } for source in &config.infohash { log::debug!("index source `{source}`..."); // grab latest info-hashes from this source @@ -98,14 +89,9 @@ async fn main() -> Result<()> { log::debug!("torrent `{h}` exists, skip."); continue; } - if let Some(t) = ban.get(&i) { - log::debug!( - "torrent `{h}` banned {}, skip for this queue.", - match t { - Some(v) => format!("until {v}"), - None => "permanently".into(), - } - ); + // skip banned entry, remove it from the ban list to retry on the next iteration + if ban.remove(&i) { + log::debug!("torrent `{h}` is banned, skip for this queue."); continue; } log::info!("resolve `{h}`..."); @@ -197,12 +183,9 @@ async fn main() -> Result<()> { .await { log::info!( - "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban {}.", - match ban.add(i, false) { - Some(t) => format!("until {t}"), - None => "permanently".into(), // @TODO feature, do not unwrap - } + "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.", ); + assert!(ban.insert(i)); session .delete(librqbit::api::TorrentIdOrHash::Id(id), false) .await?; // * do not collect billions of slow torrents in the session pool @@ -222,21 +205,19 @@ async fn main() -> Result<()> { log::info!("torrent `{h}` resolved.") } Ok(_) => panic!(), - Err(e) => log::warn!( - "failed to resolve torrent `{h}`: `{e}`, ban {}.", - match ban.add(i, false) { - Some(t) => format!("until {t}"), - None => "permanently".into(), // @TODO feature, do not unwrap - } - ), - }, - Err(e) => log::info!( - "skip awaiting the completion of adding torrent `{h}` (`{e}`), ban {}.", - match ban.add(i, false) { - Some(t) => format!("until {t}"), - None => "permanently".into(), // @TODO feature, do not unwrap + Err(e) => { + log::warn!( + "failed to resolve torrent `{h}`: `{e}`, ban for the next queue." + ); + assert!(ban.insert(i)) } - ), + }, + Err(e) => { + log::info!( + "skip awaiting the completion of adding torrent `{h}` (`{e}`), ban for the next queue." + ); + assert!(ban.insert(i)) + } } } } @@ -248,7 +229,7 @@ async fn main() -> Result<()> { Local::now() .signed_duration_since(time_init) .as_seconds_f32(), - ban.total(), + ban.len(), config.sleep ); std::thread::sleep(Duration::from_secs(config.sleep))