Initial commit: aquatic, udp bittorrent tracker

This commit is contained in:
Joakim Frostegård 2020-04-04 22:08:32 +02:00
commit ace2e1a296
18 changed files with 1386 additions and 0 deletions

1
.gitignore vendored Normal file
View file

@ -0,0 +1 @@
/target

291
Cargo.lock generated Normal file
View file

@ -0,0 +1,291 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
[[package]]
name = "ahash"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0989268a37e128d4d7a8028f1c60099430113fdbc70419010601ce51a228e4fe"
dependencies = [
"const-random",
]
[[package]]
name = "aquatic"
version = "0.1.0"
dependencies = [
"bittorrent_udp",
"dashmap",
"indexmap",
"mio",
"net2",
"rand",
]
[[package]]
name = "autocfg"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8aac770f1885fd7e387acedd76065302551364496e46b3dd00860b2f8359b9d"
[[package]]
name = "bittorrent_udp"
version = "0.1.0"
dependencies = [
"byteorder",
]
[[package]]
name = "byteorder"
version = "1.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "const-random"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f1af9ac737b2dd2d577701e59fd09ba34822f6f2ebdb30a7647405d9e55e16a"
dependencies = [
"const-random-macro",
"proc-macro-hack",
]
[[package]]
name = "const-random-macro"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25e4c606eb459dd29f7c57b2e0879f2b6f14ee130918c2b78ccb58a9624e6c7a"
dependencies = [
"getrandom",
"proc-macro-hack",
]
[[package]]
name = "dashmap"
version = "3.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0899c830f359a74834c84ed43b4c0cb6fd714a6fa64e20ab78c78f8cf86d8fc0"
dependencies = [
"ahash",
"cfg-if",
"num_cpus",
]
[[package]]
name = "getrandom"
version = "0.1.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7abc8dd8451921606d809ba32e95b6111925cd2906060d2dcc29c070220503eb"
dependencies = [
"cfg-if",
"libc",
"wasi",
]
[[package]]
name = "hermit-abi"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e"
dependencies = [
"libc",
]
[[package]]
name = "indexmap"
version = "1.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "076f042c5b7b98f31d205f1249267e12a6518c1481e9dae9764af19b707d2292"
dependencies = [
"autocfg",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dea0c0405123bba743ee3f91f49b1c7cfb684eef0da0a50110f758ccf24cdff0"
[[package]]
name = "log"
version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14b6052be84e6b71ab17edffc2eeabf5c2c3ae1fdb464aae35ac50c67a44e1f7"
dependencies = [
"cfg-if",
]
[[package]]
name = "mio"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e9971bc8349a361217a8f2a41f5d011274686bd4436465ba51730921039d7fb"
dependencies = [
"lazy_static",
"libc",
"log",
"miow",
"ntapi",
"winapi",
]
[[package]]
name = "miow"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "396aa0f2003d7df8395cb93e09871561ccc3e785f0acb369170e8cc74ddf9226"
dependencies = [
"socket2",
"winapi",
]
[[package]]
name = "net2"
version = "0.2.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42550d9fb7b6684a6d404d9fa7250c2eb2646df731d1c06afc06dcee9e1bcf88"
dependencies = [
"cfg-if",
"libc",
"winapi",
]
[[package]]
name = "ntapi"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26e041cd983acbc087e30fcba770380cfa352d0e392e175b2344ebaf7ea0602"
dependencies = [
"winapi",
]
[[package]]
name = "num_cpus"
version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46203554f085ff89c235cd12f7075f3233af9b11ed7c9e16dfe2560d03313ce6"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "ppv-lite86"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74490b50b9fbe561ac330df47c08f3f33073d2d00c150f719147d7c54522fa1b"
[[package]]
name = "proc-macro-hack"
version = "0.5.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d659fe7c6d27f25e9d80a1a094c223f5246f6a6596453e09d7229bf42750b63"
[[package]]
name = "rand"
version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03"
dependencies = [
"getrandom",
"libc",
"rand_chacha",
"rand_core",
"rand_hc",
"rand_pcg",
]
[[package]]
name = "rand_chacha"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402"
dependencies = [
"ppv-lite86",
"rand_core",
]
[[package]]
name = "rand_core"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19"
dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c"
dependencies = [
"rand_core",
]
[[package]]
name = "rand_pcg"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16abd0c1b639e9eb4d7c50c0b8100b0d0f849be2349829c740fe8e6eb4816429"
dependencies = [
"rand_core",
]
[[package]]
name = "redox_syscall"
version = "0.1.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2439c63f3f6139d1b57529d16bc3b8bb855230c8efcc5d3a896c8bea7c3b1e84"
[[package]]
name = "socket2"
version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03088793f677dce356f3ccc2edb1b314ad191ab702a5de3faf49304f7e104918"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"winapi",
]
[[package]]
name = "wasi"
version = "0.9.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519"
[[package]]
name = "winapi"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8093091eeb260906a183e6ae1abdba2ef5ef2257a21801128899c3fc699229c6"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

17
Cargo.toml Normal file
View file

@ -0,0 +1,17 @@
[workspace]
members = [
"bittorrent_udp",
"aquatic",
]
[profile.dev]
debug = true
[profile.release]
debug = true
lto = true
[profile.bench]
opt-level = 3
lto = true

23
aquatic/Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
[package]
name = "aquatic"
version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2018"
[[bin]]
name = "aquatic"
path = "src/main.rs"
[dependencies]
bittorrent_udp = { path = "../bittorrent_udp" }
dashmap = "3"
indexmap = "1"
net2 = "0.2"
[dependencies.rand]
version = "0.7"
features = ["small_rng"]
[dependencies.mio]
version = "0.7"
features = ["udp", "os-poll", "os-util"]

134
aquatic/src/handler.rs Normal file
View file

@ -0,0 +1,134 @@
use std::net::SocketAddr;
use std::sync::atomic::Ordering;
use std::time::Instant;
use rand::{self, SeedableRng, rngs::SmallRng, thread_rng};
use rand::seq::IteratorRandom;
use bittorrent_udp::types::*;
use crate::types::*;
pub fn gen_responses(
state: &State,
connect_requests: Vec<(ConnectRequest, SocketAddr)>,
announce_requests: Vec<(AnnounceRequest, SocketAddr)>
)-> Vec<(Response, SocketAddr)> {
let mut responses = Vec::new();
let now = Time(Instant::now());
for (request, src) in connect_requests {
let connection_id = ConnectionId(rand::random());
let key = ConnectionKey {
connection_id,
socket_addr: src,
};
state.connections.insert(key, now);
responses.push((Response::Connect(
ConnectResponse {
connection_id,
transaction_id: request.transaction_id,
}
), src));
}
for (request, src) in announce_requests {
let connection_key = ConnectionKey {
connection_id: request.connection_id,
socket_addr: src,
};
if !state.connections.contains_key(&connection_key){
continue;
}
let mut torrent_data = state.torrents
.entry(request.info_hash)
.or_insert_with(|| TorrentData::default());
let peer_key = PeerMapKey {
ip: src.ip(),
peer_id: request.peer_id,
};
let peer = Peer::from_announce_and_ip(&request, src.ip());
let peer_status = peer.status;
let opt_removed_peer = if peer.status == PeerStatus::Stopped {
torrent_data.peers.remove(&peer_key)
} else {
torrent_data.peers.insert(peer_key, peer)
};
match peer_status {
PeerStatus::Leeching => {
torrent_data.num_leechers.fetch_add(1, Ordering::SeqCst);
},
PeerStatus::Seeding => {
torrent_data.num_seeders.fetch_add(1, Ordering::SeqCst);
},
PeerStatus::Stopped => {}
};
if let Some(removed_peer) = opt_removed_peer {
match removed_peer.status {
PeerStatus::Leeching => {
torrent_data.num_leechers.fetch_sub(1, Ordering::SeqCst);
},
PeerStatus::Seeding => {
torrent_data.num_seeders.fetch_sub(1, Ordering::SeqCst);
},
PeerStatus::Stopped => {}
}
}
let response_peers = extract_response_peers(&torrent_data.peers, 100); // FIXME num peers
let response = Response::Announce(AnnounceResponse {
transaction_id: request.transaction_id,
announce_interval: AnnounceInterval(
600 // config.announce_interval as i32
),
leechers: NumberOfPeers(torrent_data.num_leechers.load(Ordering::SeqCst) as i32),
seeders: NumberOfPeers(torrent_data.num_seeders.load(Ordering::SeqCst) as i32),
peers: response_peers
});
responses.push((response, src));
}
responses
}
/// Extract response peers
///
/// Do a random selection of peers if there are more than
/// `number_of_peers_to_take`. I tried out just selecting a random range,
/// but this might cause issues with the announcing peer getting back too
/// homogenous peers (based on when they were inserted into the map.)
///
/// Don't care if we send back announcing peer.
pub fn extract_response_peers(
peer_map: &PeerMap,
number_of_peers_to_take: usize,
) -> Vec<ResponsePeer> {
let peer_map_len = peer_map.len();
if peer_map_len <= number_of_peers_to_take {
peer_map.values()
.map(Peer::to_response_peer)
.collect()
} else {
let mut rng = SmallRng::from_rng(thread_rng()).unwrap();
peer_map.values()
.map(Peer::to_response_peer)
.choose_multiple(&mut rng, number_of_peers_to_take)
}
}

25
aquatic/src/main.rs Normal file
View file

@ -0,0 +1,25 @@
use std::time::Duration;
mod handler;
mod network;
mod types;
use types::State;
fn main(){
let addr = ([127, 0, 0, 1], 3000).into();
let socket = network::create_socket(addr, 4096 * 8);
let state = State::new();
for i in 1..4 {
let socket = socket.try_clone().unwrap();
let state = state.clone();
::std::thread::spawn(move || {
network::run_event_loop(state, socket, i, 4096, Duration::from_millis(1000));
});
}
network::run_event_loop(state, socket, 0, 4096, Duration::from_millis(1000));
}

150
aquatic/src/network.rs Normal file
View file

@ -0,0 +1,150 @@
use std::net::SocketAddr;
use std::time::Duration;
use std::io::ErrorKind;
use mio::{Events, Poll, Interest, Token};
use mio::net::UdpSocket;
use net2::{UdpSocketExt, UdpBuilder};
use net2::unix::UnixUdpBuilderExt;
use bittorrent_udp::types::IpVersion;
use bittorrent_udp::converters::{response_to_bytes, request_from_bytes};
use crate::types::*;
use crate::handler::*;
pub fn create_socket(
addr: SocketAddr,
recv_buffer_size: usize,
) -> ::std::net::UdpSocket {
let mut builder = &{
if addr.is_ipv4(){
UdpBuilder::new_v4().expect("socket: build")
} else {
UdpBuilder::new_v6().expect("socket: build")
}
};
builder = builder.reuse_port(true)
.expect("socket: set reuse port");
let socket = builder.bind(&addr)
.expect(&format!("socket: bind to {}", addr));
socket.set_nonblocking(true)
.expect("socket: set nonblocking");
if let Err(err) = socket.set_recv_buffer_size(recv_buffer_size){
eprintln!(
"socket: failed setting recv buffer to {}: {:?}",
recv_buffer_size,
err
);
}
socket
}
pub fn run_event_loop(
state: State,
socket: ::std::net::UdpSocket,
token_num: usize,
event_capacity: usize,
poll_timeout: Duration,
){
let mut buffer = [0u8; 4096];
let mut socket = UdpSocket::from_std(socket);
let mut poll = Poll::new().expect("create poll");
let interests = Interest::READABLE | Interest::WRITABLE;
poll.registry()
.register(&mut socket, Token(token_num), interests)
.unwrap();
let mut events = Events::with_capacity(event_capacity);
loop {
poll.poll(&mut events, Some(poll_timeout))
.expect("failed polling");
for event in events.iter(){
let token = event.token();
if token.0 == token_num {
if event.is_readable(){
let mut connect_requests: Vec<(ConnectRequest, SocketAddr)> = Vec::with_capacity(event_capacity);
let mut announce_requests: Vec<(AnnounceRequest, SocketAddr)> = Vec::with_capacity(event_capacity);
loop {
match socket.recv_from(&mut buffer) {
Ok((amt, src)) => {
let request = request_from_bytes(
&buffer[..amt],
255u8
);
match request {
Request::Connect(r) => {
connect_requests.push((r, src));
},
Request::Announce(r) => {
announce_requests.push((r, src));
},
_ => {
// FIXME
}
}
},
Err(err) => {
match err.kind() {
ErrorKind::WouldBlock => {
break;
},
err => {
eprintln!("recv_from error: {:?}", err);
break;
}
}
}
}
}
let responses = gen_responses(
&state,
connect_requests,
announce_requests
);
for (response, src) in responses {
let bytes = response_to_bytes(&response, IpVersion::IPv4);
match socket.send_to(&bytes[..], src){
Ok(_bytes_sent) => {
},
Err(err) => {
match err.kind(){
ErrorKind::WouldBlock => {
break;
},
err => {
eprintln!("send_to error: {:?}", err);
break;
}
}
}
}
}
poll.registry().reregister(&mut socket, token, interests).unwrap();
}
}
}
}
}

129
aquatic/src/types.rs Normal file
View file

@ -0,0 +1,129 @@
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use std::net::{SocketAddr, IpAddr};
use std::time::Instant;
use dashmap::DashMap;
use indexmap::IndexMap;
pub use bittorrent_udp::types::*;
#[derive(Debug, Clone, Copy)]
pub struct Time(pub Instant);
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConnectionKey {
pub connection_id: ConnectionId,
pub socket_addr: SocketAddr
}
pub type ConnectionMap = DashMap<ConnectionKey, Time>;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum PeerStatus {
Seeding,
Leeching,
Stopped
}
impl PeerStatus {
/// Determine peer status from announce event and number of bytes left.
///
/// Likely, the last branch will be taken most of the time.
pub fn from_event_and_bytes_left(
event: AnnounceEvent,
bytes_left: NumberOfBytes
) -> Self {
if event == AnnounceEvent::Stopped {
Self::Stopped
} else if bytes_left.0 == 0 {
Self::Seeding
} else {
Self::Leeching
}
}
}
#[derive(Clone, Debug)]
pub struct Peer {
pub id: PeerId,
pub connection_id: ConnectionId,
pub ip_address: IpAddr,
pub port: Port,
pub status: PeerStatus,
pub last_announce: Time
}
impl Peer {
pub fn to_response_peer(&self) -> ResponsePeer {
ResponsePeer {
ip_address: self.ip_address,
port: self.port
}
}
pub fn from_announce_and_ip(
announce_request: &AnnounceRequest,
ip_address: IpAddr
) -> Self {
Self {
id: announce_request.peer_id,
connection_id: announce_request.connection_id,
ip_address,
port: announce_request.port,
status: PeerStatus::from_event_and_bytes_left(
announce_request.event,
announce_request.bytes_left
),
last_announce: Time(Instant::now())
}
}
}
#[derive(PartialEq, Eq, Hash, Clone)]
pub struct PeerMapKey {
pub ip: IpAddr,
pub peer_id: PeerId
}
pub type PeerMap = IndexMap<PeerMapKey, Peer>;
pub struct TorrentData {
pub peers: PeerMap,
pub num_seeders: AtomicUsize,
pub num_leechers: AtomicUsize,
}
impl Default for TorrentData {
fn default() -> Self {
Self {
peers: IndexMap::new(),
num_seeders: AtomicUsize::new(0),
num_leechers: AtomicUsize::new(0),
}
}
}
pub type TorrentMap = DashMap<InfoHash, TorrentData>;
#[derive(Clone)]
pub struct State {
pub connections: Arc<ConnectionMap>,
pub torrents: Arc<TorrentMap>,
}
impl State {
pub fn new() -> Self {
Self {
connections: Arc::new(DashMap::new()),
torrents: Arc::new(DashMap::new()),
}
}
}

View file

@ -0,0 +1,8 @@
[package]
name = "bittorrent_udp"
version = "0.1.0"
authors = ["Joakim Frostegård <joakim.frostegard@gmail.com>"]
edition = "2018"
[dependencies]
byteorder = "1"

View file

@ -0,0 +1,21 @@
use crate::types;
pub fn event_from_i32(i: i32) -> types::AnnounceEvent {
match i {
1 => types::AnnounceEvent::Completed,
2 => types::AnnounceEvent::Started,
3 => types::AnnounceEvent::Stopped,
_ => types::AnnounceEvent::None
}
}
pub fn event_to_i32(event: types::AnnounceEvent) -> i32 {
match event {
types::AnnounceEvent::None => 0,
types::AnnounceEvent::Completed => 1,
types::AnnounceEvent::Started => 2,
types::AnnounceEvent::Stopped => 3
}
}

View file

@ -0,0 +1,6 @@
pub mod common;
pub mod requests;
pub mod responses;
pub use self::requests::*;
pub use self::responses::*;

View file

@ -0,0 +1,187 @@
use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
use std::io;
use std::io::Read;
use std::net::Ipv4Addr;
use crate::types;
use super::common::*;
const MAGIC_NUMBER: i64 = 4_497_486_125_440;
pub fn request_to_bytes(request: &types::Request) -> Vec<u8> {
let mut bytes = Vec::new();
match request {
types::Request::Connect(r) => {
bytes.write_i64::<NetworkEndian>(MAGIC_NUMBER).unwrap();
bytes.write_i32::<NetworkEndian>(0).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
},
types::Request::Announce(r) => {
bytes.write_i64::<NetworkEndian>(r.connection_id.0).unwrap();
bytes.write_i32::<NetworkEndian>(1).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
bytes.extend(r.info_hash.0.iter());
bytes.extend(r.peer_id.0.iter());
bytes.write_i64::<NetworkEndian>(r.bytes_downloaded.0).unwrap();
bytes.write_i64::<NetworkEndian>(r.bytes_left.0).unwrap();
bytes.write_i64::<NetworkEndian>(r.bytes_uploaded.0).unwrap();
bytes.write_i32::<NetworkEndian>(event_to_i32(r.event)).unwrap();
bytes.extend(&r.ip_address.map_or([0; 4], |ip| ip.octets()));
bytes.write_u32::<NetworkEndian>(0).unwrap(); // IP
bytes.write_u32::<NetworkEndian>(r.key.0).unwrap();
bytes.write_i32::<NetworkEndian>(r.peers_wanted.0).unwrap();
bytes.write_u16::<NetworkEndian>(r.port.0).unwrap();
},
types::Request::Scrape(r) => {
bytes.write_i64::<NetworkEndian>(r.connection_id.0).unwrap();
bytes.write_i32::<NetworkEndian>(2).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
for info_hash in &r.info_hashes {
bytes.extend(info_hash.0.iter());
}
}
_ => () // Invalid requests should never happen
}
bytes
}
pub fn request_from_bytes(
bytes: &[u8],
max_scrape_torrents: u8,
) -> types::Request {
match try_request_from_bytes(bytes, max_scrape_torrents){
Ok(request) => request,
Err(_) => types::Request::Error
}
}
fn try_request_from_bytes(
bytes: &[u8],
max_scrape_torrents: u8,
) -> Result<types::Request,io::Error> {
let mut bytes = io::Cursor::new(bytes);
let connection_id = bytes.read_i64::<NetworkEndian>()?;
let action = bytes.read_i32::<NetworkEndian>()?;
let transaction_id = bytes.read_i32::<NetworkEndian>()?;
match action {
// Connect
0 => {
if connection_id == MAGIC_NUMBER {
Ok(types::Request::Connect(types::ConnectRequest {
transaction_id:types::TransactionId(transaction_id)
}))
}
else {
Ok(types::Request::Invalid(types::InvalidRequest {
transaction_id:types::TransactionId(transaction_id),
message:
"Please send protocol identifier in connect request"
.to_string()
}))
}
},
// Announce
1 => {
let mut info_hash = [0; 20];
let mut peer_id = [0; 20];
let mut ip = [0; 4];
bytes.read_exact(&mut info_hash)?;
bytes.read_exact(&mut peer_id)?;
let bytes_downloaded = bytes.read_i64::<NetworkEndian>()?;
let bytes_left = bytes.read_i64::<NetworkEndian>()?;
let bytes_uploaded = bytes.read_i64::<NetworkEndian>()?;
let event = bytes.read_i32::<NetworkEndian>()?;
bytes.read_exact(&mut ip)?;
let key = bytes.read_u32::<NetworkEndian>()?;
let peers_wanted = bytes.read_i32::<NetworkEndian>()?;
let port = bytes.read_u16::<NetworkEndian>()?;
let opt_ip = if ip == [0; 4] {
None
}
else {
Some(Ipv4Addr::new(ip[0], ip[1], ip[2], ip[3]))
};
Ok(types::Request::Announce(types::AnnounceRequest {
connection_id: types::ConnectionId(connection_id),
transaction_id: types::TransactionId(transaction_id),
info_hash: types::InfoHash(info_hash),
peer_id: types::PeerId(peer_id),
bytes_downloaded: types::NumberOfBytes(bytes_downloaded),
bytes_uploaded: types::NumberOfBytes(bytes_uploaded),
bytes_left: types::NumberOfBytes(bytes_left),
event: event_from_i32(event),
ip_address: opt_ip,
key: types::PeerKey(key),
peers_wanted: types::NumberOfPeers(peers_wanted),
port: types::Port(port)
}))
},
// Scrape
2 => {
let mut info_hashes = Vec::new();
let mut info_hash = [0; 20];
let mut i = 0;
loop {
if i > max_scrape_torrents {
return Ok(types::Request::Invalid(types::InvalidRequest {
transaction_id: types::TransactionId(transaction_id),
message: format!(
"Too many torrents. Maximum is {}",
max_scrape_torrents
)
}));
}
if bytes.read_exact(&mut info_hash).is_err(){
break
}
info_hashes.push(types::InfoHash(info_hash));
i += 1;
}
Ok(types::Request::Scrape(types::ScrapeRequest {
connection_id: types::ConnectionId(connection_id),
transaction_id: types::TransactionId(transaction_id),
info_hashes
}))
}
_ => Ok(types::Request::Invalid(types::InvalidRequest {
transaction_id: types::TransactionId(transaction_id),
message: "Invalid action".to_string()
}))
}
}

View file

@ -0,0 +1,231 @@
use byteorder::{ReadBytesExt, WriteBytesExt, NetworkEndian};
use std::io;
use std::io::Read;
use std::net::{IpAddr, Ipv6Addr, Ipv4Addr};
use crate::types;
pub fn response_to_bytes(
response: &types::Response,
ip_version: types::IpVersion
) -> Vec<u8> {
let mut bytes = Vec::new();
match response {
types::Response::Connect(r) => {
bytes.write_i32::<NetworkEndian>(0).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
bytes.write_i64::<NetworkEndian>(r.connection_id.0).unwrap();
},
types::Response::Announce(r) => {
bytes.write_i32::<NetworkEndian>(1).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
bytes.write_i32::<NetworkEndian>(r.announce_interval.0).unwrap();
bytes.write_i32::<NetworkEndian>(r.leechers.0).unwrap();
bytes.write_i32::<NetworkEndian>(r.seeders.0).unwrap();
// Write peer IPs and ports. Silently ignore peers with wrong
// IP version
for peer in r.peers.iter(){
let mut correct = false;
match peer.ip_address {
IpAddr::V4(ip) => {
if let types::IpVersion::IPv4 = ip_version {
bytes.extend(&ip.octets());
correct = true;
}
},
IpAddr::V6(ip) => {
if let types::IpVersion::IPv6 = ip_version {
bytes.extend(&ip.octets());
correct = true;
}
}
}
if correct {
bytes.write_u16::<NetworkEndian>(peer.port.0).unwrap();
}
}
},
types::Response::Scrape(r) => {
bytes.write_i32::<NetworkEndian>(2).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
for torrent_stat in r.torrent_stats.iter(){
bytes.write_i32::<NetworkEndian>(torrent_stat.seeders.0)
.unwrap();
bytes.write_i32::<NetworkEndian>(torrent_stat.completed.0)
.unwrap();
bytes.write_i32::<NetworkEndian>(torrent_stat.leechers.0)
.unwrap();
}
},
types::Response::Error(r) => {
bytes.write_i32::<NetworkEndian>(3).unwrap();
bytes.write_i32::<NetworkEndian>(r.transaction_id.0).unwrap();
bytes.extend(r.message.as_bytes().iter());
},
}
bytes
}
pub fn response_from_bytes(
bytes: &[u8],
ip_version: types::IpVersion,
) -> Result<types::Response, io::Error> {
let mut bytes = io::Cursor::new(bytes);
let action = bytes.read_i32::<NetworkEndian>()?;
let transaction_id = bytes.read_i32::<NetworkEndian>()?;
match action {
// Connect
0 => {
let connection_id = bytes.read_i64::<NetworkEndian>()?;
Ok(types::Response::Connect(types::ConnectResponse {
connection_id: types::ConnectionId(connection_id),
transaction_id: types::TransactionId(transaction_id)
}))
},
// Announce
1 => {
let announce_interval = bytes.read_i32::<NetworkEndian>()?;
let leechers = bytes.read_i32::<NetworkEndian>()?;
let seeders = bytes.read_i32::<NetworkEndian>()?;
let mut peers = Vec::new();
loop {
let mut opt_ip_address = None;
match ip_version {
types::IpVersion::IPv4 => {
let mut ip_bytes = [0; 4];
if bytes.read_exact(&mut ip_bytes).is_ok() {
opt_ip_address = Some(IpAddr::V4(Ipv4Addr::new(
ip_bytes[0],
ip_bytes[1],
ip_bytes[2],
ip_bytes[3],
)));
}
},
types::IpVersion::IPv6 => {
let mut ip_bytes = [0; 16];
if bytes.read_exact(&mut ip_bytes).is_ok() {
let mut ip_bytes_ref = &ip_bytes[..];
opt_ip_address = Some(IpAddr::V6(Ipv6Addr::new(
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
ip_bytes_ref.read_u16::<NetworkEndian>()?,
)));
}
},
}
if let Some(ip_address) = opt_ip_address {
if let Ok(port) = bytes.read_u16::<NetworkEndian>() {
peers.push(types::ResponsePeer {
ip_address,
port: types::Port(port),
});
}
else {
break;
}
}
else {
break;
}
}
Ok(types::Response::Announce(types::AnnounceResponse {
transaction_id: types::TransactionId(transaction_id),
announce_interval: types::AnnounceInterval(announce_interval),
leechers: types::NumberOfPeers(leechers),
seeders: types::NumberOfPeers(seeders),
peers
}))
},
// Scrape
2 => {
let mut stats = Vec::new();
// TODO: transition to while let && when available
loop {
if let Ok(seeders) = bytes.read_i32::<NetworkEndian>() {
if let Ok(downloaded) = bytes.read_i32::<NetworkEndian>() {
if let Ok(leechers) = bytes.read_i32::<NetworkEndian>() {
stats.push(types::TorrentScrapeStatistics {
seeders: types::NumberOfPeers(seeders),
completed: types::NumberOfDownloads(downloaded),
leechers: types::NumberOfPeers(leechers)
});
}
else {
break;
}
}
else {
break;
}
}
else {
break;
}
}
Ok(types::Response::Scrape(types::ScrapeResponse {
transaction_id: types::TransactionId(transaction_id),
torrent_stats: stats
}))
},
// Error
3 => {
let mut message_bytes = Vec::new();
bytes.read_to_end(&mut message_bytes)?;
let message = match String::from_utf8(message_bytes) {
Ok(message) => message,
Err(_) => "".to_string()
};
Ok(types::Response::Error(types::ErrorResponse {
transaction_id: types::TransactionId(transaction_id),
message
}))
},
_ => {
Ok(types::Response::Error(types::ErrorResponse {
transaction_id: types::TransactionId(transaction_id),
message: "Invalid action".to_string()
}))
}
}
}

View file

@ -0,0 +1,2 @@
pub mod converters;
pub mod types;

View file

@ -0,0 +1,50 @@
use std::net;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum IpVersion {
IPv4,
IPv6
}
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct AnnounceInterval (pub i32);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct InfoHash (pub [u8; 20]);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct ConnectionId (pub i64);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct TransactionId (pub i32);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct NumberOfBytes (pub i64);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct NumberOfPeers (pub i32);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct NumberOfDownloads (pub i32);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct Port (pub u16);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord)]
pub struct PeerId (pub [u8; 20]);
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub struct PeerKey (pub u32);
#[derive(Hash, PartialEq, Eq, Clone, Debug)]
pub struct ResponsePeer {
pub ip_address: net::IpAddr,
pub port: Port,
}

View file

@ -0,0 +1,7 @@
pub mod common;
pub mod request;
pub mod response;
pub use self::common::*;
pub use self::request::*;
pub use self::response::*;

View file

@ -0,0 +1,59 @@
use std::net::Ipv4Addr;
use super::common::*;
#[derive(PartialEq, Eq, Hash, Clone, Copy, Debug)]
pub enum AnnounceEvent {
Started,
Stopped,
Completed,
None
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ConnectRequest {
pub transaction_id: TransactionId
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct AnnounceRequest {
pub connection_id: ConnectionId,
pub transaction_id: TransactionId,
pub info_hash: InfoHash,
pub peer_id: PeerId,
pub bytes_downloaded: NumberOfBytes,
pub bytes_uploaded: NumberOfBytes,
pub bytes_left: NumberOfBytes,
pub event: AnnounceEvent,
pub ip_address: Option<Ipv4Addr>,
pub key: PeerKey,
pub peers_wanted: NumberOfPeers,
pub port: Port
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ScrapeRequest {
pub connection_id: ConnectionId,
pub transaction_id: TransactionId,
pub info_hashes: Vec<InfoHash>
}
/// This is used for returning specific errors from the parser
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct InvalidRequest {
pub transaction_id: TransactionId,
pub message: String
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Request {
Connect(ConnectRequest),
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
Invalid(InvalidRequest),
/// Should ideally only be used when no transaction id can be parsed,
/// but is currently also used as a catch-all for non-specific errors
Error,
}

View file

@ -0,0 +1,45 @@
use super::common::*;
#[derive(PartialEq, Eq, Debug, Copy, Clone)]
pub struct TorrentScrapeStatistics {
pub seeders: NumberOfPeers,
pub completed: NumberOfDownloads,
pub leechers: NumberOfPeers
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ConnectResponse {
pub connection_id: ConnectionId,
pub transaction_id: TransactionId
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct AnnounceResponse {
pub transaction_id: TransactionId,
pub announce_interval: AnnounceInterval,
pub leechers: NumberOfPeers,
pub seeders: NumberOfPeers,
pub peers: Vec<ResponsePeer>
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ScrapeResponse {
pub transaction_id: TransactionId,
pub torrent_stats: Vec<TorrentScrapeStatistics>
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub struct ErrorResponse {
pub transaction_id: TransactionId,
pub message: String
}
#[derive(PartialEq, Eq, Clone, Debug)]
pub enum Response {
Connect(ConnectResponse),
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
Error(ErrorResponse),
}