mirror of
https://github.com/YGGverse/aquatic-crawler.git
synced 2026-03-31 17:15:35 +00:00
remove extra features
This commit is contained in:
parent
1395257882
commit
8cbae5019d
12 changed files with 153 additions and 972 deletions
351
src/main.rs
351
src/main.rs
|
|
@ -1,25 +1,15 @@
|
|||
mod api;
|
||||
mod config;
|
||||
mod format;
|
||||
mod index;
|
||||
mod peers;
|
||||
mod preload;
|
||||
mod rss;
|
||||
mod torrent;
|
||||
mod trackers;
|
||||
|
||||
use anyhow::Result;
|
||||
use config::Config;
|
||||
use index::Index;
|
||||
use librqbit::{
|
||||
AddTorrent, AddTorrentOptions, AddTorrentResponse, ByteBufOwned, ConnectionOptions,
|
||||
ListenerOptions, PeerConnectionOptions, SessionOptions, ValidatedTorrentMetaV1Info,
|
||||
AddTorrent, AddTorrentOptions, AddTorrentResponse, ConnectionOptions, ListenerOptions,
|
||||
PeerConnectionOptions, SessionOptions,
|
||||
};
|
||||
use peers::Peers;
|
||||
use rss::Rss;
|
||||
use std::{collections::HashSet, num::NonZero, path::PathBuf, str::FromStr, time::Duration};
|
||||
use torrent::Torrent;
|
||||
use trackers::Trackers;
|
||||
use preload::Preload;
|
||||
use std::{collections::HashSet, num::NonZero, str::FromStr, time::Duration};
|
||||
use url::Url;
|
||||
|
||||
#[tokio::main]
|
||||
|
|
@ -27,29 +17,21 @@ async fn main() -> Result<()> {
|
|||
use chrono::Local;
|
||||
use clap::Parser;
|
||||
use tokio::time;
|
||||
|
||||
// init debug
|
||||
if std::env::var("RUST_LOG").is_ok() {
|
||||
tracing_subscriber::fmt::init()
|
||||
} // librqbit
|
||||
// init components
|
||||
let time_init = Local::now();
|
||||
let config = Config::parse();
|
||||
if std::env::var("RUST_LOG").is_ok() {
|
||||
tracing_subscriber::fmt::init()
|
||||
}
|
||||
let peers = Peers::init(&config.initial_peer)?;
|
||||
let preload = preload::init(
|
||||
let preload = Preload::init(
|
||||
config.preload,
|
||||
config.preload_regex,
|
||||
config.preload_max_filecount,
|
||||
config.preload_max_filesize,
|
||||
config.preload_total_size,
|
||||
config.preload_clear,
|
||||
)?;
|
||||
let trackers = Trackers::init(&config.tracker)?;
|
||||
let torrent = config.export_torrents.map(|p| Torrent::init(&p).unwrap());
|
||||
let session = librqbit::Session::new_with_opts(
|
||||
match preload {
|
||||
Some(ref p) => p.path(),
|
||||
None => PathBuf::new(),
|
||||
},
|
||||
preload.root().clone(),
|
||||
SessionOptions {
|
||||
bind_device_name: config.bind,
|
||||
listen: match config.listen {
|
||||
|
|
@ -66,8 +48,8 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
},
|
||||
connect: Some(ConnectionOptions {
|
||||
enable_tcp: config.enable_tcp,
|
||||
proxy_url: config.proxy_url,
|
||||
enable_tcp: !config.disable_tcp,
|
||||
proxy_url: config.proxy_url.map(|u| u.to_string()),
|
||||
peer_opts: Some(PeerConnectionOptions {
|
||||
connect_timeout: config.peer_connect_timeout.map(Duration::from_secs),
|
||||
read_write_timeout: config.peer_read_write_timeout.map(Duration::from_secs),
|
||||
|
|
@ -82,54 +64,35 @@ async fn main() -> Result<()> {
|
|||
upload_bps: config.upload_limit.and_then(NonZero::new),
|
||||
download_bps: config.download_limit.and_then(NonZero::new),
|
||||
},
|
||||
trackers: trackers.list().clone(),
|
||||
trackers: config.tracker.iter().cloned().collect(),
|
||||
..SessionOptions::default()
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
// begin
|
||||
println!("Crawler started on {time_init}");
|
||||
let mut index = Index::init(
|
||||
config.index_capacity,
|
||||
config.index_timeout,
|
||||
config.export_rss.is_some(),
|
||||
config.export_rss.is_some(),
|
||||
config.export_rss.is_some() && config.index_list,
|
||||
);
|
||||
log::info!("Crawler started on {time_init}");
|
||||
loop {
|
||||
let time_queue = Local::now();
|
||||
if config.debug {
|
||||
println!("\tQueue crawl begin on {time_queue}...")
|
||||
}
|
||||
index.refresh();
|
||||
log::debug!("Queue crawl begin on {time_queue}...");
|
||||
for source in &config.infohash {
|
||||
if config.debug {
|
||||
println!("\tIndex source `{source}`...")
|
||||
}
|
||||
log::debug!("Index source `{source}`...");
|
||||
// grab latest info-hashes from this source
|
||||
// * aquatic server may update the stats at this moment, handle result manually
|
||||
for i in match api::get(source, config.index_capacity) {
|
||||
Some(i) => i,
|
||||
None => {
|
||||
// skip without panic
|
||||
if config.debug {
|
||||
eprintln!(
|
||||
"The feed `{source}` has an incomplete format (or is still updating); skip."
|
||||
)
|
||||
}
|
||||
log::warn!(
|
||||
"The feed `{source}` has an incomplete format (or is still updating); skip."
|
||||
);
|
||||
continue;
|
||||
}
|
||||
} {
|
||||
// convert to string once
|
||||
let i = i.to_string();
|
||||
// is already indexed?
|
||||
if index.has(&i) {
|
||||
if preload.contains_torrent(&i)? {
|
||||
continue;
|
||||
}
|
||||
if config.debug {
|
||||
println!("\t\tIndex `{i}`...")
|
||||
}
|
||||
log::debug!("Index `{i}`...");
|
||||
// run the crawler in single thread for performance reasons,
|
||||
// use `timeout` argument option to skip the dead connections.
|
||||
match time::timeout(
|
||||
|
|
@ -137,18 +100,18 @@ async fn main() -> Result<()> {
|
|||
session.add_torrent(
|
||||
AddTorrent::from_url(magnet(
|
||||
&i,
|
||||
if config.export_trackers && !trackers.is_empty() {
|
||||
Some(trackers.list())
|
||||
} else {
|
||||
if config.tracker.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(config.tracker.as_ref())
|
||||
},
|
||||
)),
|
||||
Some(AddTorrentOptions {
|
||||
paused: true, // continue after `only_files` init
|
||||
overwrite: true,
|
||||
disable_trackers: trackers.is_empty(),
|
||||
initial_peers: peers.initial_peers(),
|
||||
list_only: preload.as_ref().is_none_or(|p| p.regex.is_none()),
|
||||
disable_trackers: config.tracker.is_empty(),
|
||||
initial_peers: config.initial_peer.clone(),
|
||||
list_only: false,
|
||||
// it is important to blacklist all files preload until initiation
|
||||
only_files: Some(Vec::with_capacity(
|
||||
config.preload_max_filecount.unwrap_or_default(),
|
||||
|
|
@ -156,8 +119,9 @@ async fn main() -> Result<()> {
|
|||
// the destination folder to preload files match `only_files_regex`
|
||||
// * e.g. images for audio albums
|
||||
output_folder: preload
|
||||
.as_ref()
|
||||
.map(|p| p.output_folder(&i, true).unwrap()),
|
||||
.output_folder(&i)?
|
||||
.to_str()
|
||||
.map(|s| s.to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
),
|
||||
|
|
@ -167,183 +131,79 @@ async fn main() -> Result<()> {
|
|||
Ok(r) => match r {
|
||||
// on `preload_regex` case only
|
||||
Ok(AddTorrentResponse::Added(id, mt)) => {
|
||||
let mut only_files_size = 0;
|
||||
let mut only_files_keep = Vec::with_capacity(
|
||||
let mut keep_files = HashSet::with_capacity(
|
||||
config.preload_max_filecount.unwrap_or_default(),
|
||||
);
|
||||
let mut only_files = HashSet::with_capacity(
|
||||
config.preload_max_filecount.unwrap_or_default(),
|
||||
);
|
||||
mt.wait_until_initialized().await?;
|
||||
let (name, size, list) = mt.with_metadata(|m| {
|
||||
// init preload files list
|
||||
if let Some(ref p) = preload {
|
||||
for (id, info) in m.file_infos.iter().enumerate() {
|
||||
if p.matches(info.relative_filename.to_str().unwrap()) {
|
||||
if p.max_filesize.is_some_and(|limit| {
|
||||
only_files_size + info.len > limit
|
||||
}) {
|
||||
if config.debug {
|
||||
println!(
|
||||
"\t\t\ttotal files size limit `{i}` reached!"
|
||||
)
|
||||
}
|
||||
break;
|
||||
}
|
||||
if p.max_filecount
|
||||
.is_some_and(|limit| only_files.len() + 1 > limit)
|
||||
{
|
||||
if config.debug {
|
||||
println!(
|
||||
"\t\t\ttotal files count limit for `{i}` reached!"
|
||||
)
|
||||
}
|
||||
break;
|
||||
}
|
||||
only_files_size += info.len;
|
||||
if let Some(ref p) = preload {
|
||||
only_files_keep
|
||||
.push(p.absolute(&i, &info.relative_filename))
|
||||
}
|
||||
only_files.insert(id);
|
||||
}
|
||||
let bytes = mt.with_metadata(|m| {
|
||||
for (id, info) in m.file_infos.iter().enumerate() {
|
||||
if preload
|
||||
.max_filecount
|
||||
.is_some_and(|limit| only_files.len() + 1 > limit)
|
||||
{
|
||||
log::debug!(
|
||||
"file count limit reached, skip `{id}` for `{i}`"
|
||||
);
|
||||
break;
|
||||
}
|
||||
if preload.max_filesize.is_some_and(|limit| info.len > limit) {
|
||||
log::debug!(
|
||||
"file size limit reached, skip `{id}` for `{i}`"
|
||||
);
|
||||
continue;
|
||||
}
|
||||
if preload.regex.as_ref().is_some_and(|r| {
|
||||
!r.is_match(&info.relative_filename.to_string_lossy())
|
||||
}) {
|
||||
log::debug!("regex filter, skip `{id}` for `{i}`");
|
||||
continue;
|
||||
}
|
||||
assert!(keep_files.insert(info.relative_filename.clone()));
|
||||
assert!(only_files.insert(id));
|
||||
}
|
||||
if let Some(ref t) = torrent {
|
||||
save_torrent_file(t, &i, &m.torrent_bytes, config.debug)
|
||||
}
|
||||
|
||||
(
|
||||
m.info.name().as_ref().map(|n| n.to_string()),
|
||||
size(&m.info),
|
||||
list(&m.info, config.index_list_limit),
|
||||
)
|
||||
m.torrent_bytes.to_vec()
|
||||
})?;
|
||||
session.update_only_files(&mt, &only_files).await?;
|
||||
session.unpause(&mt).await?;
|
||||
// await for `preload_regex` files download to continue
|
||||
mt.wait_until_completed().await?;
|
||||
// cleanup irrelevant files (see rqbit#408)
|
||||
preload.cleanup(&i, Some(keep_files))?;
|
||||
preload.persist_torrent_bytes(&i, &bytes)?;
|
||||
// remove torrent from session as indexed
|
||||
session
|
||||
.delete(librqbit::api::TorrentIdOrHash::Id(id), false)
|
||||
.await?;
|
||||
// cleanup irrelevant files (see rqbit#408)
|
||||
if let Some(p) = &preload {
|
||||
p.cleanup(&i, Some(only_files_keep))?
|
||||
}
|
||||
|
||||
if config.debug {
|
||||
println!("\t\t\tadd `{i}` to index.")
|
||||
}
|
||||
|
||||
index.insert(
|
||||
i,
|
||||
only_files_size,
|
||||
size,
|
||||
list,
|
||||
name.map(|n| n.to_string()),
|
||||
)
|
||||
log::debug!("torrent `{i}` indexed.")
|
||||
}
|
||||
Ok(AddTorrentResponse::ListOnly(r)) => {
|
||||
if let Some(ref t) = torrent {
|
||||
save_torrent_file(t, &i, &r.torrent_bytes, config.debug)
|
||||
}
|
||||
|
||||
// @TODO
|
||||
// use `r.info` for Memory, SQLite,
|
||||
// Manticore and other alternative storage type
|
||||
|
||||
if config.debug {
|
||||
println!("\t\t\tadd `{i}` to index.")
|
||||
}
|
||||
|
||||
index.insert(
|
||||
i,
|
||||
0,
|
||||
size(&r.info),
|
||||
list(&r.info, config.index_list_limit),
|
||||
r.info.name().map(|n| n.to_string()),
|
||||
)
|
||||
}
|
||||
// unexpected as should be deleted
|
||||
Ok(AddTorrentResponse::AlreadyManaged(..)) => panic!(),
|
||||
Err(e) => eprintln!("Failed to resolve `{i}`: `{e}`."),
|
||||
Ok(_) => panic!(),
|
||||
Err(e) => log::debug!("Failed to resolve `{i}`: `{e}`."),
|
||||
},
|
||||
Err(e) => {
|
||||
if config.debug {
|
||||
println!("\t\t\tfailed to resolve `{i}`: `{e}`")
|
||||
}
|
||||
}
|
||||
Err(e) => log::debug!("failed to resolve `{i}`: `{e}`"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(ref export_rss) = config.export_rss
|
||||
&& index.is_changed()
|
||||
{
|
||||
let mut rss = Rss::new(
|
||||
export_rss,
|
||||
&config.export_rss_title,
|
||||
&config.export_rss_link,
|
||||
&config.export_rss_description,
|
||||
if config.export_trackers && !trackers.is_empty() {
|
||||
Some(trackers.list().clone())
|
||||
} else {
|
||||
None
|
||||
},
|
||||
)?;
|
||||
for (k, v) in index.list() {
|
||||
rss.push(
|
||||
k,
|
||||
v.name().unwrap_or(k),
|
||||
rss::item_description(v.size(), v.list()),
|
||||
Some(&v.time.to_rfc2822()),
|
||||
)?
|
||||
}
|
||||
rss.commit()?
|
||||
}
|
||||
if preload
|
||||
.as_ref()
|
||||
.is_some_and(|p| p.total_size.is_some_and(|s| index.nodes() > s))
|
||||
{
|
||||
panic!("Preload content size {} bytes reached!", 0)
|
||||
}
|
||||
if config.debug {
|
||||
println!(
|
||||
"Queue completed on {time_queue}\n\ttotal: {}\n\ttime: {} s\n\tuptime: {} s\n\tawait {} seconds to continue...",
|
||||
index.len(),
|
||||
Local::now()
|
||||
.signed_duration_since(time_queue)
|
||||
.as_seconds_f32(),
|
||||
Local::now()
|
||||
.signed_duration_since(time_init)
|
||||
.as_seconds_f32(),
|
||||
config.sleep,
|
||||
)
|
||||
}
|
||||
log::debug!(
|
||||
"Queue completed at {time_queue} (time: {} / uptime: {}) await {} seconds to continue...",
|
||||
Local::now()
|
||||
.signed_duration_since(time_queue)
|
||||
.as_seconds_f32(),
|
||||
Local::now()
|
||||
.signed_duration_since(time_init)
|
||||
.as_seconds_f32(),
|
||||
config.sleep,
|
||||
);
|
||||
std::thread::sleep(Duration::from_secs(config.sleep))
|
||||
}
|
||||
}
|
||||
|
||||
/// Shared handler function to save resolved torrents as file
|
||||
fn save_torrent_file(t: &Torrent, i: &str, b: &[u8], d: bool) {
|
||||
match t.persist(i, b) {
|
||||
Ok(r) => {
|
||||
if d {
|
||||
match r {
|
||||
Some(p) => println!("\t\t\tadd torrent file `{}`", p.to_string_lossy()),
|
||||
None => println!("\t\t\ttorrent file `{i}` already exists"),
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => eprintln!("Error on save torrent file `{i}`: `{e}`"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Build magnet URI
|
||||
fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
|
||||
let mut m = if infohash.len() == 40 {
|
||||
format!("magnet:?xt=urn:btih:{infohash}")
|
||||
fn magnet(info_hash: &str, trackers: Option<&Vec<Url>>) -> String {
|
||||
let mut m = if info_hash.len() == 40 {
|
||||
format!("magnet:?xt=urn:btih:{info_hash}")
|
||||
} else {
|
||||
todo!("infohash v2 is not supported by librqbit")
|
||||
};
|
||||
|
|
@ -355,62 +215,3 @@ fn magnet(infohash: &str, trackers: Option<&HashSet<Url>>) -> String {
|
|||
}
|
||||
m
|
||||
}
|
||||
|
||||
/// Count total size, including torrent files
|
||||
fn size(meta: &ValidatedTorrentMetaV1Info<ByteBufOwned>) -> u64 {
|
||||
let mut t = 0;
|
||||
if let Some(l) = meta.info().length {
|
||||
t += l
|
||||
}
|
||||
if let Some(ref files) = meta.info().files {
|
||||
for f in files {
|
||||
t += f.length
|
||||
}
|
||||
}
|
||||
t
|
||||
}
|
||||
|
||||
fn list(
|
||||
meta: &ValidatedTorrentMetaV1Info<ByteBufOwned>,
|
||||
limit: usize,
|
||||
) -> Option<Vec<(Option<String>, u64)>> {
|
||||
meta.info().files.as_ref().map(|files| {
|
||||
let mut b = Vec::with_capacity(files.len());
|
||||
let mut i = files.iter();
|
||||
let mut t = 0;
|
||||
for f in i.by_ref() {
|
||||
if t < limit {
|
||||
t += 1;
|
||||
b.push((
|
||||
String::from_utf8(
|
||||
f.path
|
||||
.iter()
|
||||
.enumerate()
|
||||
.flat_map(|(n, b)| {
|
||||
if n == 0 {
|
||||
b.0.to_vec()
|
||||
} else {
|
||||
let mut p = vec![b'/'];
|
||||
p.extend(b.0.to_vec());
|
||||
p
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
)
|
||||
.ok(),
|
||||
f.length,
|
||||
));
|
||||
continue;
|
||||
}
|
||||
// limit reached: count sizes left and use placeholder as the last item name
|
||||
let mut l = 0;
|
||||
for f in i.by_ref() {
|
||||
l += f.length
|
||||
}
|
||||
b.push((Some("...".to_string()), l));
|
||||
break;
|
||||
}
|
||||
b[..t].sort_by(|a, b| a.0.cmp(&b.0)); // @TODO optional
|
||||
b
|
||||
})
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue