aquatic_http: move protocol module to new crate aquatic_http_protocol

This commit is contained in:
Joakim Frostegård 2020-07-19 21:59:31 +02:00
parent 4caf174da5
commit 4ac2012a2a
36 changed files with 65 additions and 24 deletions

View file

@ -12,9 +12,9 @@ use smartstring::{SmartString, LazyCompact};
pub use aquatic_common::{ValidUntil, convert_ipv4_mapped_ipv4};
use crate::protocol::common::*;
use crate::protocol::request::Request;
use crate::protocol::response::{Response, ResponsePeer};
use aquatic_http_protocol::common::*;
use aquatic_http_protocol::request::Request;
use aquatic_http_protocol::response::{Response, ResponsePeer};
pub trait Ip: Copy + Eq + ::std::hash::Hash {}

View file

@ -8,12 +8,12 @@ use parking_lot::MutexGuard;
use rand::{Rng, SeedableRng, rngs::SmallRng};
use aquatic_common::extract_response_peers;
use aquatic_http_protocol::request::*;
use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
use crate::protocol::request::*;
use crate::protocol::response::*;
pub fn run_request_worker(

View file

@ -12,7 +12,6 @@ pub mod common;
pub mod config;
pub mod handler;
pub mod network;
pub mod protocol;
pub mod tasks;
use common::*;

View file

@ -9,9 +9,9 @@ use mio::net::TcpStream;
use native_tls::{TlsAcceptor, MidHandshakeTlsStream};
use aquatic_common_tcp::network::stream::Stream;
use aquatic_http_protocol::request::Request;
use crate::common::*;
use crate::protocol::request::Request;
#[derive(Debug)]

View file

@ -12,10 +12,10 @@ use mio::{Events, Poll, Interest, Token};
use mio::net::TcpListener;
use aquatic_common_tcp::network::utils::create_listener;
use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
use crate::protocol::response::*;
use connection::*;

View file

@ -1,71 +0,0 @@
use std::str::FromStr;
use serde::Serialize;
use super::utils::*;
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, Serialize)]
#[serde(transparent)]
pub struct PeerId(
#[serde(
serialize_with = "serialize_20_bytes",
)]
pub [u8; 20]
);
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize)]
#[serde(transparent)]
pub struct InfoHash(
#[serde(
serialize_with = "serialize_20_bytes",
)]
pub [u8; 20]
);
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AnnounceEvent {
Started,
Stopped,
Completed,
Empty
}
impl Default for AnnounceEvent {
fn default() -> Self {
Self::Empty
}
}
impl FromStr for AnnounceEvent {
type Err = String;
fn from_str(value: &str) -> std::result::Result<Self, String> {
match value {
"started" => Ok(Self::Started),
"stopped" => Ok(Self::Stopped),
"completed" => Ok(Self::Completed),
"empty" => Ok(Self::Empty),
value => Err(format!("Unknown value: {}", value))
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for InfoHash {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
let mut arr = [b'x'; 20];
arr[0] = u8::arbitrary(g);
arr[1] = u8::arbitrary(g);
arr[18] = u8::arbitrary(g);
arr[19] = u8::arbitrary(g);
Self(arr)
}
}

View file

@ -1,4 +0,0 @@
pub mod common;
pub mod request;
pub mod response;
mod utils;

View file

@ -1,323 +0,0 @@
use anyhow::Context;
use hashbrown::HashMap;
use smartstring::{SmartString, LazyCompact};
use super::common::*;
use super::utils::*;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AnnounceRequest {
pub info_hash: InfoHash,
pub peer_id: PeerId,
pub port: u16,
pub bytes_left: usize,
pub event: AnnounceEvent,
pub compact: bool,
/// Number of response peers wanted
pub numwant: Option<usize>,
pub key: Option<SmartString<LazyCompact>>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ScrapeRequest {
pub info_hashes: Vec<InfoHash>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Request {
Announce(AnnounceRequest),
Scrape(ScrapeRequest),
}
impl Request {
/// Parse Request from http path (GET `/announce?info_hash=...`)
///
/// Existing serde-url decode crates were insufficient, so the decision was
/// made to create a custom parser. serde_urlencoded doesn't support multiple
/// values with same key, and serde_qs pulls in lots of dependencies. Both
/// would need preprocessing for the binary format used for info_hash and
/// peer_id.
pub fn from_http_get_path(path: &str) -> anyhow::Result<Self> {
::log::debug!("request GET path: {}", path);
let mut split_parts= path.splitn(2, '?');
let location = split_parts.next()
.with_context(|| "no location")?;
let query_string = split_parts.next()
.with_context(|| "no query string")?;
let mut info_hashes = Vec::new();
let mut data = HashMap::new();
Self::parse_key_value_pairs_memchr(
&mut info_hashes,
&mut data,
query_string
)?;
if location == "/announce" {
let numwant = if let Some(s) = data.remove("numwant"){
let numwant = s.parse::<usize>()
.map_err(|err|
anyhow::anyhow!("parse 'numwant': {}", err)
)?;
Some(numwant)
} else {
None
};
let key = if let Some(s) = data.remove("key"){
if s.len() > 100 {
return Err(anyhow::anyhow!("'key' is too long"))
}
Some(s)
} else {
None
};
let port = if let Some(port) = data.remove("port"){
port.parse().with_context(|| "parse port")?
} else {
return Err(anyhow::anyhow!("no port"));
};
let bytes_left = if let Some(left) = data.remove("left"){
left.parse().with_context(|| "parse bytes left")?
} else {
return Err(anyhow::anyhow!("no left"));
};
let event = if let Some(event) = data.remove("event"){
if let Ok(event) = event.parse(){
event
} else {
return Err(anyhow::anyhow!("invalid event: {}", event));
}
} else {
AnnounceEvent::default()
};
let compact = if let Some(compact) = data.remove("compact"){
if compact.as_str() == "1" {
true
} else {
return Err(anyhow::anyhow!("compact set, but not to 1"));
}
} else {
true
};
let request = AnnounceRequest {
info_hash: info_hashes.pop()
.with_context(|| "no info_hash")
.and_then(deserialize_20_bytes)
.map(InfoHash)?,
peer_id: data.remove("peer_id")
.with_context(|| "no peer_id")
.and_then(deserialize_20_bytes)
.map(PeerId)?,
port,
bytes_left,
event,
compact,
numwant,
key,
};
Ok(Request::Announce(request))
} else {
let mut parsed_info_hashes = Vec::with_capacity(info_hashes.len());
for info_hash in info_hashes {
parsed_info_hashes.push(InfoHash(deserialize_20_bytes(info_hash)?));
}
let request = ScrapeRequest {
info_hashes: parsed_info_hashes,
};
Ok(Request::Scrape(request))
}
}
/// Seems to be somewhat faster than non-memchr version
fn parse_key_value_pairs_memchr<'a>(
info_hashes: &mut Vec<SmartString<LazyCompact>>,
data: &mut HashMap<&'a str, SmartString<LazyCompact>>,
query_string: &'a str,
) -> anyhow::Result<()> {
let query_string_bytes = query_string.as_bytes();
let mut ampersand_iter = ::memchr::memchr_iter(b'&', query_string_bytes);
let mut position = 0usize;
for equal_sign_index in ::memchr::memchr_iter(b'=', query_string_bytes){
let segment_end = ampersand_iter.next()
.unwrap_or(query_string.len());
let key = query_string.get(position..equal_sign_index)
.with_context(|| format!("no key at {}..{}", position, equal_sign_index))?;
let value = query_string.get(equal_sign_index + 1..segment_end)
.with_context(|| format!("no value at {}..{}", equal_sign_index + 1, segment_end))?;
// whitelist keys to avoid having to use ddos-resistant hashmap
match key {
"info_hash" => {
let value = Self::urldecode_memchr(value)?;
info_hashes.push(value);
},
"peer_id" | "port" | "left" | "event" | "compact" | "numwant" | "key" => {
let value = Self::urldecode_memchr(value)?;
data.insert(key, value);
},
k => {
::log::info!("ignored unrecognized key: {}", k)
}
}
if segment_end == query_string.len(){
break
} else {
position = segment_end + 1;
}
}
Ok(())
}
/// The info hashes and peer id's that are received are url-encoded byte
/// by byte, e.g., %fa for byte 0xfa. However, they need to be parsed as
/// UTF-8 string, meaning that non-ascii bytes are invalid characters.
/// Therefore, these bytes must be converted to their equivalent multi-byte
/// UTF-8 encodings.
fn urldecode(value: &str) -> anyhow::Result<String> {
let mut processed = String::new();
for (i, part) in value.split('%').enumerate(){
if i == 0 {
processed.push_str(part);
} else if part.len() >= 2 {
let mut two_first = String::with_capacity(2);
for (j, c) in part.chars().enumerate(){
if j == 0 {
two_first.push(c);
} else if j == 1 {
two_first.push(c);
let byte = u8::from_str_radix(&two_first, 16)?;
processed.push(byte as char);
} else {
processed.push(c);
}
}
} else {
return Err(anyhow::anyhow!(
"url decode: too few characters in '%{}'", part
))
}
}
Ok(processed)
}
/// Quite a bit faster than non-memchr version
fn urldecode_memchr(value: &str) -> anyhow::Result<SmartString<LazyCompact>> {
let mut processed = SmartString::new();
let bytes = value.as_bytes();
let iter = ::memchr::memchr_iter(b'%', bytes);
let mut str_index_after_hex = 0usize;
for i in iter {
match (bytes.get(i), bytes.get(i + 1), bytes.get(i + 2)){
(Some(0..=127), Some(0..=127), Some(0..=127)) => {
if i > 0 {
processed.push_str(&value[str_index_after_hex..i]);
}
str_index_after_hex = i + 3;
let hex = &value[i + 1..i + 3];
let byte = u8::from_str_radix(&hex, 16)?;
processed.push(byte as char);
},
_ => {
return Err(anyhow::anyhow!(
"invalid urlencoded segment at byte {} in {}", i, value
));
}
}
}
if let Some(rest_of_str) = value.get(str_index_after_hex..){
processed.push_str(rest_of_str);
}
Ok(processed)
}
}
#[cfg(test)]
mod tests {
use super::*;
static ANNOUNCE_REQUEST_PATH: &str = "/announce?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9&peer_id=-ABC940-5ert69muw5t8&port=12345&uploaded=0&downloaded=0&left=1&numwant=0&key=4ab4b877&compact=1&supportcrypto=1&event=started";
static SCRAPE_REQUEST_PATH: &str = "/scrape?info_hash=%04%0bkV%3f%5cr%14%a6%b7%98%adC%c3%c9.%40%24%00%b9";
static REFERENCE_INFO_HASH: [u8; 20] = [0x04, 0x0b, b'k', b'V', 0x3f, 0x5c, b'r', 0x14, 0xa6, 0xb7, 0x98, 0xad, b'C', 0xc3, 0xc9, b'.', 0x40, 0x24, 0x00, 0xb9];
static REFERENCE_PEER_ID: [u8; 20] = [b'-', b'A', b'B', b'C', b'9', b'4', b'0', b'-', b'5', b'e', b'r', b't', b'6', b'9', b'm', b'u', b'w', b'5', b't', b'8'];
#[test]
fn test_urldecode(){
let f = Request::urldecode_memchr;
assert_eq!(f("").unwrap(), "".to_string());
assert_eq!(f("abc").unwrap(), "abc".to_string());
assert_eq!(f("%21").unwrap(), "!".to_string());
assert_eq!(f("%21%3D").unwrap(), "!=".to_string());
assert_eq!(f("abc%21def%3Dghi").unwrap(), "abc!def=ghi".to_string());
assert!(f("%").is_err());
assert!(f("%å7").is_err());
}
#[test]
fn test_announce_request_from_path(){
let parsed_request = Request::from_http_get_path(
ANNOUNCE_REQUEST_PATH
).unwrap();
let reference_request = Request::Announce(AnnounceRequest {
info_hash: InfoHash(REFERENCE_INFO_HASH),
peer_id: PeerId(REFERENCE_PEER_ID),
port: 12345,
bytes_left: 1,
event: AnnounceEvent::Started,
compact: true,
numwant: Some(0),
key: Some("4ab4b877".into())
});
assert_eq!(parsed_request, reference_request);
}
#[test]
fn test_scrape_request_from_path(){
let parsed_request = Request::from_http_get_path(
SCRAPE_REQUEST_PATH
).unwrap();
let reference_request = Request::Scrape(ScrapeRequest {
info_hashes: vec![InfoHash(REFERENCE_INFO_HASH)],
});
assert_eq!(parsed_request, reference_request);
}
}

View file

@ -1,317 +0,0 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use std::collections::BTreeMap;
use serde::Serialize;
use super::common::*;
use super::utils::*;
#[derive(Debug, Clone, Serialize)]
pub struct ResponsePeer<I>{
pub ip_address: I,
pub port: u16
}
#[derive(Debug, Clone, Serialize)]
#[serde(transparent)]
pub struct ResponsePeerListV4(
#[serde(serialize_with = "serialize_response_peers_ipv4")]
pub Vec<ResponsePeer<Ipv4Addr>>
);
#[derive(Debug, Clone, Serialize)]
#[serde(transparent)]
pub struct ResponsePeerListV6(
#[serde(serialize_with = "serialize_response_peers_ipv6")]
pub Vec<ResponsePeer<Ipv6Addr>>
);
#[derive(Debug, Clone, Serialize)]
pub struct ScrapeStatistics {
pub complete: usize,
pub incomplete: usize,
pub downloaded: usize,
}
#[derive(Debug, Clone, Serialize)]
pub struct AnnounceResponse {
#[serde(rename = "interval")]
pub announce_interval: usize,
pub complete: usize,
pub incomplete: usize,
pub peers: ResponsePeerListV4,
pub peers6: ResponsePeerListV6,
}
impl AnnounceResponse {
fn to_bytes(&self) -> Vec<u8> {
let peers_bytes_len = self.peers.0.len() * 6;
let peers6_bytes_len = self.peers6.0.len() * 18;
let mut bytes = Vec::with_capacity(
12 +
5 + // Upper estimate
15 +
5 + // Upper estimate
12 +
5 + // Upper estimate
8 +
peers_bytes_len +
8 +
peers6_bytes_len +
1
);
bytes.extend_from_slice(b"d8:completei");
let _ = itoa::write(&mut bytes, self.complete);
bytes.extend_from_slice(b"e10:incompletei");
let _ = itoa::write(&mut bytes, self.incomplete);
bytes.extend_from_slice(b"e8:intervali");
let _ = itoa::write(&mut bytes, self.announce_interval);
bytes.extend_from_slice(b"e5:peers");
let _ = itoa::write(&mut bytes, peers_bytes_len);
bytes.push(b':');
for peer in self.peers.0.iter() {
bytes.extend_from_slice(&u32::from(peer.ip_address).to_be_bytes());
bytes.extend_from_slice(&peer.port.to_be_bytes())
}
bytes.extend_from_slice(b"6:peers6");
let _ = itoa::write(&mut bytes, peers6_bytes_len);
bytes.push(b':');
for peer in self.peers6.0.iter() {
bytes.extend_from_slice(&u128::from(peer.ip_address).to_be_bytes());
bytes.extend_from_slice(&peer.port.to_be_bytes())
}
bytes.push(b'e');
bytes
}
}
#[derive(Debug, Clone, Serialize)]
pub struct ScrapeResponse {
/// BTreeMap instead of HashMap since keys need to be serialized in order
pub files: BTreeMap<InfoHash, ScrapeStatistics>,
}
impl ScrapeResponse {
fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::with_capacity(
9 +
self.files.len() * (
3 +
20 +
12 +
5 + // Upper estimate
31 +
5 + // Upper estimate
2
) +
2
);
bytes.extend_from_slice(b"d5:filesd");
for (info_hash, statistics) in self.files.iter(){
bytes.extend_from_slice(b"20:");
bytes.extend_from_slice(&info_hash.0);
bytes.extend_from_slice(b"d8:completei");
let _ = itoa::write(&mut bytes, statistics.complete);
bytes.extend_from_slice(b"e10:downloadedi0e10:incompletei");
let _ = itoa::write(&mut bytes, statistics.incomplete);
bytes.extend_from_slice(b"ee");
}
bytes.extend_from_slice(b"ee");
bytes
}
}
#[derive(Debug, Clone, Serialize)]
pub struct FailureResponse {
pub failure_reason: String,
}
impl FailureResponse {
fn to_bytes(&self) -> Vec<u8> {
let reason_bytes = self.failure_reason.as_bytes();
let mut bytes = Vec::with_capacity(
18 +
3 + // Upper estimate
1 +
reason_bytes.len() +
1
);
bytes.extend_from_slice(b"d14:failure_reason");
let _ = itoa::write(&mut bytes, reason_bytes.len());
bytes.push(b':');
bytes.extend_from_slice(reason_bytes);
bytes.push(b'e');
bytes
}
}
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum Response {
Announce(AnnounceResponse),
Scrape(ScrapeResponse),
Failure(FailureResponse),
}
impl Response {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Response::Announce(r) => r.to_bytes(),
Response::Failure(r) => r.to_bytes(),
Response::Scrape(r) => r.to_bytes(),
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeer<Ipv4Addr> {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
ip_address: Ipv4Addr::arbitrary(g),
port: u16::arbitrary(g)
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeer<Ipv6Addr> {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
ip_address: Ipv6Addr::arbitrary(g),
port: u16::arbitrary(g)
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeerListV4 {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self(Vec::arbitrary(g))
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ResponsePeerListV6 {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self(Vec::arbitrary(g))
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ScrapeStatistics {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
complete: usize::arbitrary(g),
incomplete: usize::arbitrary(g),
downloaded: 0,
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for AnnounceResponse {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
announce_interval: usize::arbitrary(g),
complete: usize::arbitrary(g),
incomplete: usize::arbitrary(g),
peers: ResponsePeerListV4::arbitrary(g),
peers6: ResponsePeerListV6::arbitrary(g),
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for ScrapeResponse {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
files: BTreeMap::arbitrary(g),
}
}
}
#[cfg(test)]
impl quickcheck::Arbitrary for FailureResponse {
fn arbitrary<G: quickcheck::Gen>(g: &mut G) -> Self {
Self {
failure_reason: String::arbitrary(g),
}
}
}
#[cfg(test)]
mod tests {
use quickcheck_macros::*;
use super::*;
#[quickcheck]
fn test_announce_response_to_bytes(response: AnnounceResponse) -> bool {
let reference = bendy::serde::to_bytes(
&Response::Announce(response.clone())
).unwrap();
response.to_bytes() == reference
}
#[quickcheck]
fn test_scrape_response_to_bytes(response: ScrapeResponse) -> bool {
let reference = bendy::serde::to_bytes(
&Response::Scrape(response.clone())
).unwrap();
let hand_written = response.to_bytes();
let success = hand_written == reference;
if !success {
println!("reference: {}", String::from_utf8_lossy(&reference));
println!("hand_written: {}", String::from_utf8_lossy(&hand_written));
}
success
}
#[quickcheck]
fn test_failure_response_to_bytes(response: FailureResponse) -> bool {
let reference = bendy::serde::to_bytes(
&Response::Failure(response.clone())
).unwrap();
response.to_bytes() == reference
}
}

View file

@ -1,73 +0,0 @@
use std::net::{Ipv4Addr, Ipv6Addr};
use serde::Serializer;
use smartstring::{SmartString, LazyCompact};
use super::response::ResponsePeer;
/// Not for serde
pub fn deserialize_20_bytes(value: SmartString<LazyCompact>) -> anyhow::Result<[u8; 20]> {
let mut arr = [0u8; 20];
let mut char_iter = value.chars();
for a in arr.iter_mut(){
if let Some(c) = char_iter.next(){
if c as u32 > 255 {
return Err(anyhow::anyhow!(
"character not in single byte range: {:#?}",
c
));
}
*a = c as u8;
} else {
return Err(anyhow::anyhow!("less than 20 bytes: {:#?}", value));
}
}
if char_iter.next().is_some(){
Err(anyhow::anyhow!("more than 20 bytes: {:#?}", value))
} else {
Ok(arr)
}
}
#[inline]
pub fn serialize_20_bytes<S>(
bytes: &[u8; 20],
serializer: S
) -> Result<S::Ok, S::Error> where S: Serializer {
serializer.serialize_bytes(bytes)
}
pub fn serialize_response_peers_ipv4<S>(
response_peers: &[ResponsePeer<Ipv4Addr>],
serializer: S
) -> Result<S::Ok, S::Error> where S: Serializer {
let mut bytes = Vec::with_capacity(response_peers.len() * 6);
for peer in response_peers {
bytes.extend_from_slice(&u32::from(peer.ip_address).to_be_bytes());
bytes.extend_from_slice(&peer.port.to_be_bytes())
}
serializer.serialize_bytes(&bytes)
}
pub fn serialize_response_peers_ipv6<S>(
response_peers: &[ResponsePeer<Ipv6Addr>],
serializer: S
) -> Result<S::Ok, S::Error> where S: Serializer {
let mut bytes = Vec::with_capacity(response_peers.len() * 6);
for peer in response_peers {
bytes.extend_from_slice(&u128::from(peer.ip_address).to_be_bytes());
bytes.extend_from_slice(&peer.port.to_be_bytes())
}
serializer.serialize_bytes(&bytes)
}