From ace2e1a296bc9e3cb27b0c62f00f2b5854414221 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sat, 4 Apr 2020 22:08:32 +0200 Subject: [PATCH] Initial commit: aquatic, udp bittorrent tracker --- .gitignore | 1 + Cargo.lock | 291 +++++++++++++++++++++ Cargo.toml | 17 ++ aquatic/Cargo.toml | 23 ++ aquatic/src/handler.rs | 134 ++++++++++ aquatic/src/main.rs | 25 ++ aquatic/src/network.rs | 150 +++++++++++ aquatic/src/types.rs | 129 +++++++++ bittorrent_udp/Cargo.toml | 8 + bittorrent_udp/src/converters/common.rs | 21 ++ bittorrent_udp/src/converters/mod.rs | 6 + bittorrent_udp/src/converters/requests.rs | 187 +++++++++++++ bittorrent_udp/src/converters/responses.rs | 231 ++++++++++++++++ bittorrent_udp/src/lib.rs | 2 + bittorrent_udp/src/types/common.rs | 50 ++++ bittorrent_udp/src/types/mod.rs | 7 + bittorrent_udp/src/types/request.rs | 59 +++++ bittorrent_udp/src/types/response.rs | 45 ++++ 18 files changed, 1386 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 aquatic/Cargo.toml create mode 100644 aquatic/src/handler.rs create mode 100644 aquatic/src/main.rs create mode 100644 aquatic/src/network.rs create mode 100644 aquatic/src/types.rs create mode 100644 bittorrent_udp/Cargo.toml create mode 100644 bittorrent_udp/src/converters/common.rs create mode 100644 bittorrent_udp/src/converters/mod.rs create mode 100644 bittorrent_udp/src/converters/requests.rs create mode 100644 bittorrent_udp/src/converters/responses.rs create mode 100644 bittorrent_udp/src/lib.rs create mode 100644 bittorrent_udp/src/types/common.rs create mode 100644 bittorrent_udp/src/types/mod.rs create mode 100644 bittorrent_udp/src/types/request.rs create mode 100644 bittorrent_udp/src/types/response.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..94f3b2a --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,291 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "ahash" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0989268a37e128d4d7a8028f1c60099430113fdbc70419010601ce51a228e4fe" +dependencies = [ + "const-random", +] + +[[package]] +name = "aquatic" +version = "0.1.0" +dependencies = [ + "bittorrent_udp", + "dashmap", + "indexmap", + "mio", + "net2", + "rand", +] + +[[package]] +name = "autocfg" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d" + +[[package]] +name = "bittorrent_udp" +version = "0.1.0" +dependencies = [ + "byteorder", +] + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "const-random" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a" +dependencies = [ + "const-random-macro", + "proc-macro-hack", +] + +[[package]] +name = "const-random-macro" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a" +dependencies = [ + "getrandom", + "proc-macro-hack", +] + +[[package]] +name = "dashmap" +version = "3.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0899c830f359a74834c84ed43b4c0cb6fd714a6fa64e20ab78c78f8cf86d8fc0" +dependencies = [ + "ahash", + "cfg-if", + "num_cpus", +] + +[[package]] +name = "getrandom" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "hermit-abi" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" +dependencies = [ + "libc", +] + +[[package]] +name = "indexmap" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292" +dependencies = [ + "autocfg", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dea0c0405123bba743ee3f91f49b1c7cfb684eef0da0a50110f758ccf24cdff0" + +[[package]] +name = "log" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "mio" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9971bc8349a361217a8f2a41f5d011274686bd4436465ba51730921039d7fb" +dependencies = [ + "lazy_static", + "libc", + "log", + "miow", + "ntapi", + "winapi", +] + +[[package]] +name = "miow" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226" +dependencies = [ + "socket2", + "winapi", +] + +[[package]] +name = "net2" +version = "0.2.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88" +dependencies = [ + "cfg-if", + "libc", + "winapi", +] + +[[package]] +name = "ntapi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26e041cd983acbc087e30fcba770380cfa352d0e392e175b2344ebaf7ea0602" +dependencies = [ + "winapi", +] + +[[package]] +name = "num_cpus" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b" + +[[package]] +name = "proc-macro-hack" +version = "0.5.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63" + +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom", + "libc", + "rand_chacha", + "rand_core", + "rand_hc", + "rand_pcg", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core", +] + +[[package]] +name = "rand_pcg" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16abd0c1b639e9eb4d7c50c0b8100b0d0f849be2349829c740fe8e6eb4816429" +dependencies = [ + "rand_core", +] + +[[package]] +name = "redox_syscall" +version = "0.1.56" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84" + +[[package]] +name = "socket2" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "winapi", +] + +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + +[[package]] +name = "winapi" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..f3b691a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,17 @@ +[workspace] + +members = [ + "bittorrent_udp", + "aquatic", +] + +[profile.dev] +debug = true + +[profile.release] +debug = true +lto = true + +[profile.bench] +opt-level = 3 +lto = true \ No newline at end of file diff --git a/aquatic/Cargo.toml b/aquatic/Cargo.toml new file mode 100644 index 0000000..cf5a09d --- /dev/null +++ b/aquatic/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "aquatic" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" + +[[bin]] +name = "aquatic" +path = "src/main.rs" + +[dependencies] +bittorrent_udp = { path = "../bittorrent_udp" } +dashmap = "3" +indexmap = "1" +net2 = "0.2" + +[dependencies.rand] +version = "0.7" +features = ["small_rng"] + +[dependencies.mio] +version = "0.7" +features = ["udp", "os-poll", "os-util"] \ No newline at end of file diff --git a/aquatic/src/handler.rs b/aquatic/src/handler.rs new file mode 100644 index 0000000..6c786b9 --- /dev/null +++ b/aquatic/src/handler.rs @@ -0,0 +1,134 @@ +use std::net::SocketAddr; +use std::sync::atomic::Ordering; +use std::time::Instant; + +use rand::{self, SeedableRng, rngs::SmallRng, thread_rng}; +use rand::seq::IteratorRandom; + +use bittorrent_udp::types::*; + +use crate::types::*; + + +pub fn gen_responses( + state: &State, + connect_requests: Vec<(ConnectRequest, SocketAddr)>, + announce_requests: Vec<(AnnounceRequest, SocketAddr)> +)-> Vec<(Response, SocketAddr)> { + let mut responses = Vec::new(); + + let now = Time(Instant::now()); + + for (request, src) in connect_requests { + let connection_id = ConnectionId(rand::random()); + + let key = ConnectionKey { + connection_id, + socket_addr: src, + }; + + state.connections.insert(key, now); + + responses.push((Response::Connect( + ConnectResponse { + connection_id, + transaction_id: request.transaction_id, + } + ), src)); + } + + for (request, src) in announce_requests { + let connection_key = ConnectionKey { + connection_id: request.connection_id, + socket_addr: src, + }; + + if !state.connections.contains_key(&connection_key){ + continue; + } + + let mut torrent_data = state.torrents + .entry(request.info_hash) + .or_insert_with(|| TorrentData::default()); + + let peer_key = PeerMapKey { + ip: src.ip(), + peer_id: request.peer_id, + }; + + let peer = Peer::from_announce_and_ip(&request, src.ip()); + let peer_status = peer.status; + + let opt_removed_peer = if peer.status == PeerStatus::Stopped { + torrent_data.peers.remove(&peer_key) + } else { + torrent_data.peers.insert(peer_key, peer) + }; + + match peer_status { + PeerStatus::Leeching => { + torrent_data.num_leechers.fetch_add(1, Ordering::SeqCst); + }, + PeerStatus::Seeding => { + torrent_data.num_seeders.fetch_add(1, Ordering::SeqCst); + }, + PeerStatus::Stopped => {} + }; + + if let Some(removed_peer) = opt_removed_peer { + match removed_peer.status { + PeerStatus::Leeching => { + torrent_data.num_leechers.fetch_sub(1, Ordering::SeqCst); + }, + PeerStatus::Seeding => { + torrent_data.num_seeders.fetch_sub(1, Ordering::SeqCst); + }, + PeerStatus::Stopped => {} + } + } + + let response_peers = extract_response_peers(&torrent_data.peers, 100); // FIXME num peers + + let response = Response::Announce(AnnounceResponse { + transaction_id: request.transaction_id, + announce_interval: AnnounceInterval( + 600 // config.announce_interval as i32 + ), + leechers: NumberOfPeers(torrent_data.num_leechers.load(Ordering::SeqCst) as i32), + seeders: NumberOfPeers(torrent_data.num_seeders.load(Ordering::SeqCst) as i32), + peers: response_peers + }); + + responses.push((response, src)); + } + + responses +} + + +/// Extract response peers +/// +/// Do a random selection of peers if there are more than +/// `number_of_peers_to_take`. I tried out just selecting a random range, +/// but this might cause issues with the announcing peer getting back too +/// homogenous peers (based on when they were inserted into the map.) +/// +/// Don't care if we send back announcing peer. +pub fn extract_response_peers( + peer_map: &PeerMap, + number_of_peers_to_take: usize, +) -> Vec { + let peer_map_len = peer_map.len(); + + if peer_map_len <= number_of_peers_to_take { + peer_map.values() + .map(Peer::to_response_peer) + .collect() + } else { + let mut rng = SmallRng::from_rng(thread_rng()).unwrap(); + + peer_map.values() + .map(Peer::to_response_peer) + .choose_multiple(&mut rng, number_of_peers_to_take) + } +} \ No newline at end of file diff --git a/aquatic/src/main.rs b/aquatic/src/main.rs new file mode 100644 index 0000000..613186f --- /dev/null +++ b/aquatic/src/main.rs @@ -0,0 +1,25 @@ +use std::time::Duration; + +mod handler; +mod network; +mod types; + +use types::State; + + +fn main(){ + let addr = ([127, 0, 0, 1], 3000).into(); + let socket = network::create_socket(addr, 4096 * 8); + let state = State::new(); + + for i in 1..4 { + let socket = socket.try_clone().unwrap(); + let state = state.clone(); + + ::std::thread::spawn(move || { + network::run_event_loop(state, socket, i, 4096, Duration::from_millis(1000)); + }); + } + + network::run_event_loop(state, socket, 0, 4096, Duration::from_millis(1000)); +} diff --git a/aquatic/src/network.rs b/aquatic/src/network.rs new file mode 100644 index 0000000..28680d3 --- /dev/null +++ b/aquatic/src/network.rs @@ -0,0 +1,150 @@ +use std::net::SocketAddr; +use std::time::Duration; +use std::io::ErrorKind; + +use mio::{Events, Poll, Interest, Token}; +use mio::net::UdpSocket; +use net2::{UdpSocketExt, UdpBuilder}; +use net2::unix::UnixUdpBuilderExt; + +use bittorrent_udp::types::IpVersion; +use bittorrent_udp::converters::{response_to_bytes, request_from_bytes}; + +use crate::types::*; +use crate::handler::*; + + +pub fn create_socket( + addr: SocketAddr, + recv_buffer_size: usize, +) -> ::std::net::UdpSocket { + + let mut builder = &{ + if addr.is_ipv4(){ + UdpBuilder::new_v4().expect("socket: build") + } else { + UdpBuilder::new_v6().expect("socket: build") + } + }; + + builder = builder.reuse_port(true) + .expect("socket: set reuse port"); + + let socket = builder.bind(&addr) + .expect(&format!("socket: bind to {}", addr)); + + socket.set_nonblocking(true) + .expect("socket: set nonblocking"); + + if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size){ + eprintln!( + "socket: failed setting recv buffer to {}: {:?}", + recv_buffer_size, + err + ); + } + + socket +} + + +pub fn run_event_loop( + state: State, + socket: ::std::net::UdpSocket, + token_num: usize, + event_capacity: usize, + poll_timeout: Duration, +){ + let mut buffer = [0u8; 4096]; + + let mut socket = UdpSocket::from_std(socket); + let mut poll = Poll::new().expect("create poll"); + + let interests = Interest::READABLE | Interest::WRITABLE; + + poll.registry() + .register(&mut socket, Token(token_num), interests) + .unwrap(); + + let mut events = Events::with_capacity(event_capacity); + + loop { + poll.poll(&mut events, Some(poll_timeout)) + .expect("failed polling"); + + for event in events.iter(){ + let token = event.token(); + + if token.0 == token_num { + if event.is_readable(){ + let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::with_capacity(event_capacity); + let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::with_capacity(event_capacity); + + loop { + match socket.recv_from(&mut buffer) { + Ok((amt, src)) => { + let request = request_from_bytes( + &buffer[..amt], + 255u8 + ); + + match request { + Request::Connect(r) => { + connect_requests.push((r, src)); + }, + Request::Announce(r) => { + announce_requests.push((r, src)); + }, + _ => { + // FIXME + } + } + }, + Err(err) => { + match err.kind() { + ErrorKind::WouldBlock => { + break; + }, + err => { + eprintln!("recv_from error: {:?}", err); + + break; + } + } + } + } + } + + let responses = gen_responses( + &state, + connect_requests, + announce_requests + ); + + for (response, src) in responses { + let bytes = response_to_bytes(&response, IpVersion::IPv4); + + match socket.send_to(&bytes[..], src){ + Ok(_bytes_sent) => { + }, + Err(err) => { + match err.kind(){ + ErrorKind::WouldBlock => { + break; + }, + err => { + eprintln!("send_to error: {:?}", err); + + break; + } + } + } + } + } + + poll.registry().reregister(&mut socket, token, interests).unwrap(); + } + } + } + } +} \ No newline at end of file diff --git a/aquatic/src/types.rs b/aquatic/src/types.rs new file mode 100644 index 0000000..14a1f6a --- /dev/null +++ b/aquatic/src/types.rs @@ -0,0 +1,129 @@ +use std::sync::Arc; +use std::sync::atomic::AtomicUsize; +use std::net::{SocketAddr, IpAddr}; +use std::time::Instant; + +use dashmap::DashMap; +use indexmap::IndexMap; + +pub use bittorrent_udp::types::*; + + +#[derive(Debug, Clone, Copy)] +pub struct Time(pub Instant); + + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ConnectionKey { + pub connection_id: ConnectionId, + pub socket_addr: SocketAddr +} + +pub type ConnectionMap = DashMap; + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub enum PeerStatus { + Seeding, + Leeching, + Stopped +} + +impl PeerStatus { + /// Determine peer status from announce event and number of bytes left. + /// + /// Likely, the last branch will be taken most of the time. + pub fn from_event_and_bytes_left( + event: AnnounceEvent, + bytes_left: NumberOfBytes + ) -> Self { + if event == AnnounceEvent::Stopped { + Self::Stopped + } else if bytes_left.0 == 0 { + Self::Seeding + } else { + Self::Leeching + } + } +} + +#[derive(Clone, Debug)] +pub struct Peer { + pub id: PeerId, + pub connection_id: ConnectionId, + pub ip_address: IpAddr, + pub port: Port, + pub status: PeerStatus, + pub last_announce: Time +} + + +impl Peer { + pub fn to_response_peer(&self) -> ResponsePeer { + ResponsePeer { + ip_address: self.ip_address, + port: self.port + } + } + pub fn from_announce_and_ip( + announce_request: &AnnounceRequest, + ip_address: IpAddr + ) -> Self { + Self { + id: announce_request.peer_id, + connection_id: announce_request.connection_id, + ip_address, + port: announce_request.port, + status: PeerStatus::from_event_and_bytes_left( + announce_request.event, + announce_request.bytes_left + ), + last_announce: Time(Instant::now()) + } + } +} + +#[derive(PartialEq, Eq, Hash, Clone)] +pub struct PeerMapKey { + pub ip: IpAddr, + pub peer_id: PeerId +} + + +pub type PeerMap = IndexMap; + +pub struct TorrentData { + pub peers: PeerMap, + pub num_seeders: AtomicUsize, + pub num_leechers: AtomicUsize, +} + + +impl Default for TorrentData { + fn default() -> Self { + Self { + peers: IndexMap::new(), + num_seeders: AtomicUsize::new(0), + num_leechers: AtomicUsize::new(0), + } + } +} + + +pub type TorrentMap = DashMap; + + +#[derive(Clone)] +pub struct State { + pub connections: Arc, + pub torrents: Arc, +} + +impl State { + pub fn new() -> Self { + Self { + connections: Arc::new(DashMap::new()), + torrents: Arc::new(DashMap::new()), + } + } +} \ No newline at end of file diff --git a/bittorrent_udp/Cargo.toml b/bittorrent_udp/Cargo.toml new file mode 100644 index 0000000..6d8f756 --- /dev/null +++ b/bittorrent_udp/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "bittorrent_udp" +version = "0.1.0" +authors = ["Joakim FrostegÄrd "] +edition = "2018" + +[dependencies] +byteorder = "1" \ No newline at end of file diff --git a/bittorrent_udp/src/converters/common.rs b/bittorrent_udp/src/converters/common.rs new file mode 100644 index 0000000..8ae18d4 --- /dev/null +++ b/bittorrent_udp/src/converters/common.rs @@ -0,0 +1,21 @@ +use crate::types; + + +pub fn event_from_i32(i: i32) -> types::AnnounceEvent { + match i { + 1 => types::AnnounceEvent::Completed, + 2 => types::AnnounceEvent::Started, + 3 => types::AnnounceEvent::Stopped, + _ => types::AnnounceEvent::None + } +} + + +pub fn event_to_i32(event: types::AnnounceEvent) -> i32 { + match event { + types::AnnounceEvent::None => 0, + types::AnnounceEvent::Completed => 1, + types::AnnounceEvent::Started => 2, + types::AnnounceEvent::Stopped => 3 + } +} \ No newline at end of file diff --git a/bittorrent_udp/src/converters/mod.rs b/bittorrent_udp/src/converters/mod.rs new file mode 100644 index 0000000..52c9219 --- /dev/null +++ b/bittorrent_udp/src/converters/mod.rs @@ -0,0 +1,6 @@ +pub mod common; +pub mod requests; +pub mod responses; + +pub use self::requests::*; +pub use self::responses::*; \ No newline at end of file diff --git a/bittorrent_udp/src/converters/requests.rs b/bittorrent_udp/src/converters/requests.rs new file mode 100644 index 0000000..8801914 --- /dev/null +++ b/bittorrent_udp/src/converters/requests.rs @@ -0,0 +1,187 @@ +use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian}; + +use std::io; +use std::io::Read; +use std::net::Ipv4Addr; + +use crate::types; + +use super::common::*; + + +const MAGIC_NUMBER: i64 = 4_497_486_125_440; + + +pub fn request_to_bytes(request: &types::Request) -> Vec { + let mut bytes = Vec::new(); + + match request { + types::Request::Connect(r) => { + bytes.write_i64::(MAGIC_NUMBER).unwrap(); + bytes.write_i32::(0).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + }, + + types::Request::Announce(r) => { + bytes.write_i64::(r.connection_id.0).unwrap(); + bytes.write_i32::(1).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + + bytes.extend(r.info_hash.0.iter()); + bytes.extend(r.peer_id.0.iter()); + + bytes.write_i64::(r.bytes_downloaded.0).unwrap(); + bytes.write_i64::(r.bytes_left.0).unwrap(); + bytes.write_i64::(r.bytes_uploaded.0).unwrap(); + + bytes.write_i32::(event_to_i32(r.event)).unwrap(); + + bytes.extend(&r.ip_address.map_or([0; 4], |ip| ip.octets())); + + bytes.write_u32::(0).unwrap(); // IP + bytes.write_u32::(r.key.0).unwrap(); + bytes.write_i32::(r.peers_wanted.0).unwrap(); + bytes.write_u16::(r.port.0).unwrap(); + }, + + types::Request::Scrape(r) => { + bytes.write_i64::(r.connection_id.0).unwrap(); + bytes.write_i32::(2).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + + for info_hash in &r.info_hashes { + bytes.extend(info_hash.0.iter()); + } + } + + _ => () // Invalid requests should never happen + } + + bytes +} + + +pub fn request_from_bytes( + bytes: &[u8], + max_scrape_torrents: u8, +) -> types::Request { + + match try_request_from_bytes(bytes, max_scrape_torrents){ + Ok(request) => request, + Err(_) => types::Request::Error + } +} + + +fn try_request_from_bytes( + bytes: &[u8], + max_scrape_torrents: u8, +) -> Result { + + let mut bytes = io::Cursor::new(bytes); + + let connection_id = bytes.read_i64::()?; + let action = bytes.read_i32::()?; + let transaction_id = bytes.read_i32::()?; + + match action { + // Connect + 0 => { + if connection_id == MAGIC_NUMBER { + Ok(types::Request::Connect(types::ConnectRequest { + transaction_id:types::TransactionId(transaction_id) + })) + } + else { + Ok(types::Request::Invalid(types::InvalidRequest { + transaction_id:types::TransactionId(transaction_id), + message: + "Please send protocol identifier in connect request" + .to_string() + })) + } + }, + + // Announce + 1 => { + let mut info_hash = [0; 20]; + let mut peer_id = [0; 20]; + let mut ip = [0; 4]; + + bytes.read_exact(&mut info_hash)?; + bytes.read_exact(&mut peer_id)?; + + let bytes_downloaded = bytes.read_i64::()?; + let bytes_left = bytes.read_i64::()?; + let bytes_uploaded = bytes.read_i64::()?; + let event = bytes.read_i32::()?; + + bytes.read_exact(&mut ip)?; + + let key = bytes.read_u32::()?; + let peers_wanted = bytes.read_i32::()?; + let port = bytes.read_u16::()?; + + let opt_ip = if ip == [0; 4] { + None + } + else { + Some(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3])) + }; + + Ok(types::Request::Announce(types::AnnounceRequest { + connection_id: types::ConnectionId(connection_id), + transaction_id: types::TransactionId(transaction_id), + info_hash: types::InfoHash(info_hash), + peer_id: types::PeerId(peer_id), + bytes_downloaded: types::NumberOfBytes(bytes_downloaded), + bytes_uploaded: types::NumberOfBytes(bytes_uploaded), + bytes_left: types::NumberOfBytes(bytes_left), + event: event_from_i32(event), + ip_address: opt_ip, + key: types::PeerKey(key), + peers_wanted: types::NumberOfPeers(peers_wanted), + port: types::Port(port) + })) + }, + + // Scrape + 2 => { + let mut info_hashes = Vec::new(); + let mut info_hash = [0; 20]; + + let mut i = 0; + + loop { + if i > max_scrape_torrents { + return Ok(types::Request::Invalid(types::InvalidRequest { + transaction_id: types::TransactionId(transaction_id), + message: format!( + "Too many torrents. Maximum is {}", + max_scrape_torrents + ) + })); + } + + if bytes.read_exact(&mut info_hash).is_err(){ + break + } + + info_hashes.push(types::InfoHash(info_hash)); + + i += 1; + } + + Ok(types::Request::Scrape(types::ScrapeRequest { + connection_id: types::ConnectionId(connection_id), + transaction_id: types::TransactionId(transaction_id), + info_hashes + })) + } + + _ => Ok(types::Request::Invalid(types::InvalidRequest { + transaction_id: types::TransactionId(transaction_id), + message: "Invalid action".to_string() + })) + } +} diff --git a/bittorrent_udp/src/converters/responses.rs b/bittorrent_udp/src/converters/responses.rs new file mode 100644 index 0000000..12727b6 --- /dev/null +++ b/bittorrent_udp/src/converters/responses.rs @@ -0,0 +1,231 @@ +use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian}; + +use std::io; +use std::io::Read; +use std::net::{IpAddr, Ipv6Addr, Ipv4Addr}; + +use crate::types; + + +pub fn response_to_bytes( + response: &types::Response, + ip_version: types::IpVersion +) -> Vec { + let mut bytes = Vec::new(); + + match response { + types::Response::Connect(r) => { + bytes.write_i32::(0).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + bytes.write_i64::(r.connection_id.0).unwrap(); + }, + + types::Response::Announce(r) => { + bytes.write_i32::(1).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + bytes.write_i32::(r.announce_interval.0).unwrap(); + bytes.write_i32::(r.leechers.0).unwrap(); + bytes.write_i32::(r.seeders.0).unwrap(); + + // Write peer IPs and ports. Silently ignore peers with wrong + // IP version + for peer in r.peers.iter(){ + let mut correct = false; + + match peer.ip_address { + IpAddr::V4(ip) => { + if let types::IpVersion::IPv4 = ip_version { + bytes.extend(&ip.octets()); + correct = true; + } + }, + IpAddr::V6(ip) => { + if let types::IpVersion::IPv6 = ip_version { + bytes.extend(&ip.octets()); + correct = true; + } + } + } + + if correct { + bytes.write_u16::(peer.port.0).unwrap(); + } + } + }, + + types::Response::Scrape(r) => { + bytes.write_i32::(2).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + + for torrent_stat in r.torrent_stats.iter(){ + bytes.write_i32::(torrent_stat.seeders.0) + .unwrap(); + bytes.write_i32::(torrent_stat.completed.0) + .unwrap(); + bytes.write_i32::(torrent_stat.leechers.0) + .unwrap(); + } + }, + + types::Response::Error(r) => { + bytes.write_i32::(3).unwrap(); + bytes.write_i32::(r.transaction_id.0).unwrap(); + + bytes.extend(r.message.as_bytes().iter()); + }, + } + + bytes +} + + +pub fn response_from_bytes( + bytes: &[u8], + ip_version: types::IpVersion, +) -> Result { + + let mut bytes = io::Cursor::new(bytes); + + let action = bytes.read_i32::()?; + let transaction_id = bytes.read_i32::()?; + + match action { + + // Connect + 0 => { + let connection_id = bytes.read_i64::()?; + + Ok(types::Response::Connect(types::ConnectResponse { + connection_id: types::ConnectionId(connection_id), + transaction_id: types::TransactionId(transaction_id) + })) + }, + + // Announce + 1 => { + let announce_interval = bytes.read_i32::()?; + let leechers = bytes.read_i32::()?; + let seeders = bytes.read_i32::()?; + + let mut peers = Vec::new(); + + loop { + let mut opt_ip_address = None; + + match ip_version { + types::IpVersion::IPv4 => { + let mut ip_bytes = [0; 4]; + + if bytes.read_exact(&mut ip_bytes).is_ok() { + opt_ip_address = Some(IpAddr::V4(Ipv4Addr::new( + ip_bytes[0], + ip_bytes[1], + ip_bytes[2], + ip_bytes[3], + ))); + } + }, + types::IpVersion::IPv6 => { + let mut ip_bytes = [0; 16]; + + if bytes.read_exact(&mut ip_bytes).is_ok() { + let mut ip_bytes_ref = &ip_bytes[..]; + + opt_ip_address = Some(IpAddr::V6(Ipv6Addr::new( + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ip_bytes_ref.read_u16::()?, + ))); + } + }, + } + if let Some(ip_address) = opt_ip_address { + if let Ok(port) = bytes.read_u16::() { + peers.push(types::ResponsePeer { + ip_address, + port: types::Port(port), + }); + } + else { + break; + } + } + else { + break; + } + } + + Ok(types::Response::Announce(types::AnnounceResponse { + transaction_id: types::TransactionId(transaction_id), + announce_interval: types::AnnounceInterval(announce_interval), + leechers: types::NumberOfPeers(leechers), + seeders: types::NumberOfPeers(seeders), + peers + })) + + }, + + // Scrape + 2 => { + let mut stats = Vec::new(); + + // TODO: transition to while let && when available + loop { + if let Ok(seeders) = bytes.read_i32::() { + if let Ok(downloaded) = bytes.read_i32::() { + if let Ok(leechers) = bytes.read_i32::() { + stats.push(types::TorrentScrapeStatistics { + seeders: types::NumberOfPeers(seeders), + completed: types::NumberOfDownloads(downloaded), + leechers: types::NumberOfPeers(leechers) + }); + } + else { + break; + } + } + else { + break; + } + } + else { + break; + } + } + + Ok(types::Response::Scrape(types::ScrapeResponse { + transaction_id: types::TransactionId(transaction_id), + torrent_stats: stats + })) + }, + + // Error + 3 => { + let mut message_bytes = Vec::new(); + + bytes.read_to_end(&mut message_bytes)?; + + let message = match String::from_utf8(message_bytes) { + Ok(message) => message, + Err(_) => "".to_string() + }; + + Ok(types::Response::Error(types::ErrorResponse { + transaction_id: types::TransactionId(transaction_id), + message + })) + }, + + _ => { + Ok(types::Response::Error(types::ErrorResponse { + transaction_id: types::TransactionId(transaction_id), + message: "Invalid action".to_string() + })) + } + } +} \ No newline at end of file diff --git a/bittorrent_udp/src/lib.rs b/bittorrent_udp/src/lib.rs new file mode 100644 index 0000000..1cad8f7 --- /dev/null +++ b/bittorrent_udp/src/lib.rs @@ -0,0 +1,2 @@ +pub mod converters; +pub mod types; \ No newline at end of file diff --git a/bittorrent_udp/src/types/common.rs b/bittorrent_udp/src/types/common.rs new file mode 100644 index 0000000..7bfd66d --- /dev/null +++ b/bittorrent_udp/src/types/common.rs @@ -0,0 +1,50 @@ +use std::net; + + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub enum IpVersion { + IPv4, + IPv6 +} + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct AnnounceInterval (pub i32); + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct InfoHash (pub [u8; 20]); + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct ConnectionId (pub i64); + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct TransactionId (pub i32); + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct NumberOfBytes (pub i64); + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct NumberOfPeers (pub i32); + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct NumberOfDownloads (pub i32); + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct Port (pub u16); + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord)] +pub struct PeerId (pub [u8; 20]); + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub struct PeerKey (pub u32); + + +#[derive(Hash, PartialEq, Eq, Clone, Debug)] +pub struct ResponsePeer { + pub ip_address: net::IpAddr, + pub port: Port, +} \ No newline at end of file diff --git a/bittorrent_udp/src/types/mod.rs b/bittorrent_udp/src/types/mod.rs new file mode 100644 index 0000000..bf686fa --- /dev/null +++ b/bittorrent_udp/src/types/mod.rs @@ -0,0 +1,7 @@ +pub mod common; +pub mod request; +pub mod response; + +pub use self::common::*; +pub use self::request::*; +pub use self::response::*; diff --git a/bittorrent_udp/src/types/request.rs b/bittorrent_udp/src/types/request.rs new file mode 100644 index 0000000..7d63348 --- /dev/null +++ b/bittorrent_udp/src/types/request.rs @@ -0,0 +1,59 @@ +use std::net::Ipv4Addr; + +use super::common::*; + + +#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)] +pub enum AnnounceEvent { + Started, + Stopped, + Completed, + None +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ConnectRequest { + pub transaction_id: TransactionId +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct AnnounceRequest { + pub connection_id: ConnectionId, + pub transaction_id: TransactionId, + pub info_hash: InfoHash, + pub peer_id: PeerId, + pub bytes_downloaded: NumberOfBytes, + pub bytes_uploaded: NumberOfBytes, + pub bytes_left: NumberOfBytes, + pub event: AnnounceEvent, + pub ip_address: Option, + pub key: PeerKey, + pub peers_wanted: NumberOfPeers, + pub port: Port +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ScrapeRequest { + pub connection_id: ConnectionId, + pub transaction_id: TransactionId, + pub info_hashes: Vec +} + +/// This is used for returning specific errors from the parser +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct InvalidRequest { + pub transaction_id: TransactionId, + pub message: String +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub enum Request { + Connect(ConnectRequest), + Announce(AnnounceRequest), + Scrape(ScrapeRequest), + Invalid(InvalidRequest), + + /// Should ideally only be used when no transaction id can be parsed, + /// but is currently also used as a catch-all for non-specific errors + Error, +} \ No newline at end of file diff --git a/bittorrent_udp/src/types/response.rs b/bittorrent_udp/src/types/response.rs new file mode 100644 index 0000000..196647b --- /dev/null +++ b/bittorrent_udp/src/types/response.rs @@ -0,0 +1,45 @@ +use super::common::*; + + + +#[derive(PartialEq, Eq, Debug, Copy, Clone)] +pub struct TorrentScrapeStatistics { + pub seeders: NumberOfPeers, + pub completed: NumberOfDownloads, + pub leechers: NumberOfPeers +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ConnectResponse { + pub connection_id: ConnectionId, + pub transaction_id: TransactionId +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct AnnounceResponse { + pub transaction_id: TransactionId, + pub announce_interval: AnnounceInterval, + pub leechers: NumberOfPeers, + pub seeders: NumberOfPeers, + pub peers: Vec +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ScrapeResponse { + pub transaction_id: TransactionId, + pub torrent_stats: Vec +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct ErrorResponse { + pub transaction_id: TransactionId, + pub message: String +} + +#[derive(PartialEq, Eq, Clone, Debug)] +pub enum Response { + Connect(ConnectResponse), + Announce(AnnounceResponse), + Scrape(ScrapeResponse), + Error(ErrorResponse), +}