Merge pull request #3 from YGGverse/restart-session

Restart session
This commit is contained in:
oooo-ps 2025-09-09 11:37:15 +03:00 committed by GitHub
commit e3e1dfd4c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -5,7 +5,7 @@ mod preload;
use anyhow::Result;
use config::Config;
use librqbit::{
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, SessionOptions,
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, Session, SessionOptions,
};
use preload::Preload;
use std::{collections::HashSet, num::NonZero, time::Duration};
@ -36,37 +36,45 @@ async fn main() -> Result<()> {
config.preload_max_filecount,
config.preload_max_filesize,
)?;
let session = librqbit::Session::new_with_opts(
preload.root().clone(),
SessionOptions {
bind_device_name: config.bind,
blocklist_url: config.blocklist.map(|b| b.into()),
listen: None,
connect: Some(ConnectionOptions {
enable_tcp: !config.disable_tcp,
proxy_url: config.proxy_url.map(|u| u.to_string()),
..ConnectionOptions::default()
}),
disable_dht_persistence: true,
disable_dht: !config.enable_dht,
disable_local_service_discovery: !config.enable_lsd,
disable_upload: true,
persistence: None,
ratelimits: librqbit::limits::LimitsConfig {
download_bps: config.download_limit.and_then(NonZero::new),
..librqbit::limits::LimitsConfig::default()
},
trackers: config.tracker.iter().cloned().collect(),
..SessionOptions::default()
},
)
.await?;
let mut ban = HashSet::with_capacity(config.index_capacity);
log::info!("crawler started at {time_init}");
loop {
let time_queue = Local::now();
log::debug!("queue crawl begin at {time_queue}...");
// Please note:
// * it is important to create a new `Session` inside the crawler loop:
//   https://github.com/ikatson/rqbit/issues/481
// * once that issue is fixed and the `Session` is created only once (outside the loop),
//   also remove each torrent after resolving it with `session.delete`, to prevent an impl panic (see `single-session` branch)
let session = Session::new_with_opts(
preload.root().clone(),
SessionOptions {
bind_device_name: config.bind.clone(),
blocklist_url: config.blocklist.as_ref().map(|b| b.to_string()),
listen: None,
connect: Some(ConnectionOptions {
enable_tcp: !config.disable_tcp,
proxy_url: config.proxy_url.as_ref().map(|u| u.to_string()),
..ConnectionOptions::default()
}),
disable_dht_persistence: true,
disable_dht: !config.enable_dht,
disable_local_service_discovery: !config.enable_lsd,
disable_upload: true,
fastresume: false,
persistence: None,
ratelimits: librqbit::limits::LimitsConfig {
download_bps: config.download_limit.and_then(NonZero::new),
..librqbit::limits::LimitsConfig::default()
},
trackers: config.tracker.iter().cloned().collect(),
..SessionOptions::default()
},
)
.await?;
// build unique ID index from the multiple info-hash sources
let mut queue = HashSet::with_capacity(config.index_capacity);
for source in &config.infohash {
@ -150,7 +158,7 @@ async fn main() -> Result<()> {
.await
{
Ok(r) => match r {
Ok(AddTorrentResponse::Added(id, mt)) => {
Ok(AddTorrentResponse::Added(_, mt)) => {
assert!(mt.is_paused());
let mut keep_files = HashSet::with_capacity(
config.preload_max_filecount.unwrap_or_default(),
@ -209,9 +217,6 @@ async fn main() -> Result<()> {
"skip awaiting the completion of preload torrent data for `{h}` (`{e}`), ban for the next queue.",
);
assert!(ban.insert(i));
session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // * do not collect billions of slow torrents in the session pool
continue;
}
log::debug!("torrent `{h}` preload completed.");
@ -219,9 +224,6 @@ async fn main() -> Result<()> {
// cleanup tmp (see rqbit#408)
log::debug!("persist torrent `{h}` with `{}` files...", keep_files.len());
preload.commit(&h, bytes, Some(keep_files))?;
session
.delete(librqbit::api::TorrentIdOrHash::Id(id), false)
.await?; // torrent data was moved on commit; there's no sense in keeping it
log::info!("torrent `{h}` resolved.")
}
Ok(_) => panic!(),
@ -240,6 +242,9 @@ async fn main() -> Result<()> {
}
}
}
session.stop().await;
log::info!(
"queue completed at {time_queue} (time: {} / uptime: {} / banned: {}) await {} seconds to continue...",
Local::now()