diff --git a/src/main.rs b/src/main.rs index 965cf58..08aa225 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ mod preload; use anyhow::Result; use config::Config; use librqbit::{ - AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions, + AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, Session, SessionOptions, }; use preload::Preload; use std::{collections::HashSet, num::NonZero, time::Duration}; @@ -36,37 +36,45 @@ async fn main() -> Result<()> { config.preload_max_filecount, config.preload_max_filesize, )?; - let session = librqbit::Session::new_with_opts( - preload.root().clone(), - SessionOptions { - bind_device_name: config.bind, - blocklist_url: config.blocklist.map(|b| b.into()), - listen: None, - connect: Some(ConnectionOptions { - enable_tcp: !config.disable_tcp, - proxy_url: config.proxy_url.map(|u| u.to_string()), - ..ConnectionOptions::default() - }), - disable_dht_persistence: true, - disable_dht: !config.enable_dht, - disable_local_service_discovery: !config.enable_lsd, - disable_upload: true, - persistence: None, - ratelimits: librqbit::limits::LimitsConfig { - download_bps: config.download_limit.and_then(NonZero::new), - ..librqbit::limits::LimitsConfig::default() - }, - trackers: config.tracker.iter().cloned().collect(), - ..SessionOptions::default() - }, - ) - .await?; + let mut ban = HashSet::with_capacity(config.index_capacity); log::info!("crawler started at {time_init}"); loop { let time_queue = Local::now(); log::debug!("queue crawl begin at {time_queue}..."); + // Please, note: + // * it's important to start new `Session` inside the crawler loop: + // https://github.com/ikatson/rqbit/issues/481 + // * when fix and after starting it once (outside the loop), + // remove also each torrent after resolve it with `session.delete`, to prevent impl panic (see `single-session` branch) + let session = Session::new_with_opts( + preload.root().clone(), + SessionOptions { + bind_device_name: config.bind.clone(), + blocklist_url: config.blocklist.as_ref().map(|b| b.to_string()), + listen: None, + connect: Some(ConnectionOptions { + enable_tcp: !config.disable_tcp, + proxy_url: config.proxy_url.as_ref().map(|u| u.to_string()), + ..ConnectionOptions::default() + }), + disable_dht_persistence: true, + disable_dht: !config.enable_dht, + disable_local_service_discovery: !config.enable_lsd, + disable_upload: true, + fastresume: false, + persistence: None, + ratelimits: librqbit::limits::LimitsConfig { + download_bps: config.download_limit.and_then(NonZero::new), + ..librqbit::limits::LimitsConfig::default() + }, + trackers: config.tracker.iter().cloned().collect(), + ..SessionOptions::default() + }, + ) + .await?; + // build unique ID index from the multiple info-hash sources let mut queue = HashSet::with_capacity(config.index_capacity); for source in &config.infohash { @@ -150,7 +158,7 @@ async fn main() -> Result<()> { .await { Ok(r) => match r { - Ok(AddTorrentResponse::Added(id, mt)) => { + Ok(AddTorrentResponse::Added(_, mt)) => { assert!(mt.is_paused()); let mut keep_files = HashSet::with_capacity( config.preload_max_filecount.unwrap_or_default(), @@ -209,9 +217,6 @@ async fn main() -> Result<()> { "skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.", ); assert!(ban.insert(i)); - session - .delete(librqbit::api::TorrentIdOrHash::Id(id), false) - .await?; // * do not collect billions of slow torrents in the session pool continue; } log::debug!("torrent `{h}` preload completed."); @@ -219,9 +224,6 @@ async fn main() -> Result<()> { // cleanup tmp (see rqbit#408) log::debug!("persist torrent `{h}` with `{}` files...", keep_files.len()); preload.commit(&h, bytes, Some(keep_files))?; - session - .delete(librqbit::api::TorrentIdOrHash::Id(id), false) - .await?; // torrent data was moved on commit; there's no sense in keeping it log::info!("torrent `{h}` resolved.") } Ok(_) => panic!(), @@ -240,6 +242,9 @@ async fn main() -> Result<()> { } } } + + session.stop().await; + log::info!( "queue completed at {time_queue} (time: {} / uptime: {} / banned: {}) await {} seconds to continue...", Local::now()