Merge pull request #50 from greatest-ape/work-2022-02-03

Simplify http request buffering, use CanonicalSocketAddr in http and ws, update deps and TODO
This commit is contained in:
Joakim Frostegård 2022-02-04 10:42:59 +01:00 committed by GitHub
commit 7c57548565
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 142 additions and 177 deletions

144
Cargo.lock generated
View file

@ -39,9 +39,9 @@ dependencies = [
[[package]]
name = "anyhow"
version = "1.0.51"
version = "1.0.53"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b26702f315f53b6071259e15dd9d64528213b44d61de1ec926eca7715d62203"
checksum = "94a45b455c14666b85fc40a019e8ab9eb75e3a124e05494f5397122bc9eb06e0"
[[package]]
name = "aquatic"
@ -182,7 +182,7 @@ dependencies = [
"serde",
"signal-hook",
"slab",
"socket2 0.4.2",
"socket2 0.4.4",
"tinytemplate",
"toml_config",
]
@ -222,7 +222,7 @@ dependencies = [
"rand",
"rand_distr",
"serde",
"socket2 0.4.2",
"socket2 0.4.4",
"toml_config",
]
@ -267,7 +267,7 @@ dependencies = [
"serde",
"signal-hook",
"slab",
"socket2 0.4.2",
"socket2 0.4.4",
"toml_config",
"tungstenite",
]
@ -366,9 +366,9 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
[[package]]
name = "backtrace"
version = "0.3.63"
version = "0.3.64"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "321629d8ba6513061f26707241fa9bc89524ff1cd7a915a97ef0c62c666ce1b6"
checksum = "5e121dee8023ce33ab248d9ce1493df03c3b38a659b240096fcbd7048ff9c31f"
dependencies = [
"addr2line",
"cc",
@ -443,9 +443,9 @@ checksum = "8ff9f338986406db85e2b5deb40a9255b796ca03a194c7457403d215173f3fd5"
[[package]]
name = "bumpalo"
version = "3.8.0"
version = "3.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1e260c3a9040a7c19a12468758f4c16f31a81a1fe087482be9570ec864bb6c"
checksum = "a4a45a46ab1f2412e53d3a0ade76ffad2025804294569aae387231a0cd6e0899"
[[package]]
name = "byteorder"
@ -593,9 +593,9 @@ dependencies = [
[[package]]
name = "crossbeam-channel"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06ed27e177f16d65f0f0c22a213e17c696ace5dd64b14258b52f9417ccb52db4"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa"
dependencies = [
"cfg-if",
"crossbeam-utils",
@ -614,9 +614,9 @@ dependencies = [
[[package]]
name = "crossbeam-epoch"
version = "0.9.5"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ec02e091aa634e2c3ada4a392989e7c3116673ef0ac5b72232439094d73b7fd"
checksum = "97242a70df9b89a65d0b6df3c4bf5b9ce03c5b7309019777fbde37e7537f8762"
dependencies = [
"cfg-if",
"crossbeam-utils",
@ -627,9 +627,9 @@ dependencies = [
[[package]]
name = "crossbeam-queue"
version = "0.3.2"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b10ddc024425c88c2ad148c1b0fd53f4c6d38db9697c9f1588381212fa657c9"
checksum = "b979d76c9fcb84dffc80a73f7290da0f83e4c95773494674cb44b76d13a7a110"
dependencies = [
"cfg-if",
"crossbeam-utils",
@ -637,9 +637,9 @@ dependencies = [
[[package]]
name = "crossbeam-utils"
version = "0.8.5"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82cfc11ce7f2c3faef78d8a684447b40d503d9681acebed6cb728d45940c4db"
checksum = "cfcae03edb34f947e64acdb1c33ec169824e20657e9ecb61cef6c8c74dcb8120"
dependencies = [
"cfg-if",
"lazy_static",
@ -749,9 +749,9 @@ dependencies = [
[[package]]
name = "fastrand"
version = "1.6.0"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "779d043b6a0b90cc4c0ed7ee380a6504394cee7efd7db050e3774eee387324b2"
checksum = "c3fcf0cee53519c866c09b5de1f6c56ff9d647101f81c1964fa632e148896cdf"
dependencies = [
"instant",
]
@ -898,9 +898,9 @@ dependencies = [
[[package]]
name = "generic-array"
version = "0.14.4"
version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "501466ecc8a30d1d3b7fc9229b122b2ce8ed6e9d9223f1138d4babb253e51817"
checksum = "fd48d33ec7f05fbfa152300fdad764757cbded343c1aa1cff2fbaf4134851803"
dependencies = [
"typenum",
"version_check",
@ -908,9 +908,9 @@ dependencies = [
[[package]]
name = "getrandom"
version = "0.2.3"
version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [
"cfg-if",
"libc",
@ -1049,13 +1049,13 @@ checksum = "12cb882ccb290b8646e554b157ab0b71e64e8d5bef775cd66b6531e52d302669"
[[package]]
name = "http"
version = "0.2.5"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1323096b05d41827dadeaee54c9981958c0f94e670bc94ed80037d1a7b8b186b"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
dependencies = [
"bytes",
"fnv",
"itoa 0.4.8",
"itoa 1.0.1",
]
[[package]]
@ -1154,9 +1154,9 @@ checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35"
[[package]]
name = "js-sys"
version = "0.3.55"
version = "0.3.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84"
checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04"
dependencies = [
"wasm-bindgen",
]
@ -1179,9 +1179,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]]
name = "libc"
version = "0.2.112"
version = "0.2.117"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125"
checksum = "e74d72e0f9b65b5b4ca49a346af3976df0f9c61d550727f349ecd559f251a26c"
[[package]]
name = "libm"
@ -1200,9 +1200,9 @@ dependencies = [
[[package]]
name = "lock_api"
version = "0.4.5"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
checksum = "88943dd7ef4a2e5a4bfa2753aaab3013e34ce2533d1996fb18ef591e315e2b3b"
dependencies = [
"scopeguard",
]
@ -1476,9 +1476,9 @@ checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project-lite"
version = "0.2.7"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d31d11c69a6b52a174b42bdc0c30e5e11670f90788b2c471c31c1d17d449443"
checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c"
[[package]]
name = "pin-utils"
@ -1522,9 +1522,9 @@ dependencies = [
[[package]]
name = "ppv-lite86"
version = "0.2.15"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba"
checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "privdrop"
@ -1538,9 +1538,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
version = "1.0.34"
version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1"
checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
dependencies = [
"unicode-xid",
]
@ -1569,9 +1569,9 @@ dependencies = [
[[package]]
name = "quote"
version = "1.0.10"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145"
dependencies = [
"proc-macro2",
]
@ -1609,9 +1609,9 @@ dependencies = [
[[package]]
name = "rand_distr"
version = "0.4.2"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "964d548f8e7d12e102ef183a0de7e98180c9f8729f555897a857b96e48122d2f"
checksum = "32cb0b9bc82b0a0876c2dd994a7e7a2683d3e7390ca40e6886785ef0c7e3ee31"
dependencies = [
"num-traits",
"rand",
@ -1788,9 +1788,9 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012"
[[package]]
name = "serde"
version = "1.0.132"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9875c23cf305cd1fd7eb77234cbb705f21ea6a72c637a5c6db5fe4b8e7f008"
checksum = "ce31e24b01e1e524df96f1c2fdd054405f8d7376249a5110886fb4b658484789"
dependencies = [
"serde_derive",
]
@ -1826,9 +1826,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.132"
version = "1.0.136"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecc0db5cb2556c0e558887d9bbdcf6ac4471e83ff66cf696e5419024d1606276"
checksum = "08597e7152fcd306f41838ed3e37be9eaeed2b61c42e2117266a554fab4662f9"
dependencies = [
"proc-macro2",
"quote",
@ -1837,9 +1837,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.73"
version = "1.0.78"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bcbd0344bc6533bc7ec56df11d42fb70f1b912351c0825ccb7211b59d8af7cf5"
checksum = "d23c1ba4cf0efd44be32017709280b32d1cea5c3f1275c3b6d9e8bc54f758085"
dependencies = [
"itoa 1.0.1",
"ryu",
@ -1899,9 +1899,9 @@ checksum = "c970da16e7c682fa90a261cf0724dee241c9f7831635ecc4e988ae8f3b505559"
[[package]]
name = "simplelog"
version = "0.11.1"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ecabc0118918611790b8615670ab79296272cbe09496b6884b02b1e929c20886"
checksum = "c1348164456f72ca0116e4538bdaabb0ddb622c7d9f16387c725af3e96d6001c"
dependencies = [
"chrono",
"log",
@ -1916,9 +1916,9 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "smallvec"
version = "1.7.0"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "smartstring"
@ -1942,9 +1942,9 @@ dependencies = [
[[package]]
name = "socket2"
version = "0.4.2"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516"
checksum = "66d72b759436ae32898a2af0a14218dbf55efde3feeb170eb623637db85ee1e0"
dependencies = [
"libc",
"winapi 0.3.9",
@ -1964,9 +1964,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "syn"
version = "1.0.82"
version = "1.0.86"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b"
dependencies = [
"proc-macro2",
"quote",
@ -2150,9 +2150,9 @@ dependencies = [
[[package]]
name = "typenum"
version = "1.14.0"
version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-bidi"
@ -2225,9 +2225,9 @@ dependencies = [
[[package]]
name = "version_check"
version = "0.9.3"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fecdca9a5291cc2b8dcf7dc02453fee791a280f3743cb0905f8822ae463b3fe"
checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
[[package]]
name = "waker-fn"
@ -2254,9 +2254,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6"
[[package]]
name = "wasm-bindgen"
version = "0.2.78"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce"
checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06"
dependencies = [
"cfg-if",
"wasm-bindgen-macro",
@ -2264,9 +2264,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.78"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b"
checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca"
dependencies = [
"bumpalo",
"lazy_static",
@ -2279,9 +2279,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.78"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9"
checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@ -2289,9 +2289,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.78"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab"
checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc"
dependencies = [
"proc-macro2",
"quote",
@ -2302,15 +2302,15 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.78"
version = "0.2.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc"
checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2"
[[package]]
name = "web-sys"
version = "0.3.55"
version = "0.3.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb"
checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb"
dependencies = [
"js-sys",
"wasm-bindgen",

View file

@ -11,6 +11,8 @@
* implement socket_recv_size and ipv6_only in glommio implementations
* newer glommio versions might use SIGUSR1 internally, see glommio fe33e30
* CI
* file transfer CI for all implementations
* test access lists?

View file

@ -1,4 +1,4 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::time::{Duration, Instant};
use ahash::RandomState;
@ -92,19 +92,6 @@ where
}
}
#[inline]
pub fn convert_ipv4_mapped_ipv6(ip_address: IpAddr) -> IpAddr {
if let IpAddr::V6(ip) = ip_address {
if let [0, 0, 0, 0, 0, 0xffff, ..] = ip.segments() {
ip.to_ipv4().expect("convert ipv4-mapped ip").into()
} else {
ip_address
}
} else {
ip_address
}
}
/// SocketAddr that is not an IPv6-mapped IPv4 address
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct CanonicalSocketAddr(SocketAddr);

View file

@ -1,13 +1,13 @@
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::AHashIndexMap;
use aquatic_common::{AHashIndexMap, CanonicalSocketAddr};
use either::Either;
use smartstring::{LazyCompact, SmartString};
pub use aquatic_common::{convert_ipv4_mapped_ipv6, ValidUntil};
pub use aquatic_common::ValidUntil;
use aquatic_http_protocol::common::*;
use aquatic_http_protocol::response::ResponsePeer;
@ -31,13 +31,13 @@ pub struct ConnectionId(pub usize);
pub enum ChannelRequest {
Announce {
request: AnnounceRequest,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
response_consumer_id: ConsumerId,
},
Scrape {
request: ScrapeRequest,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
response_consumer_id: ConsumerId,
},
@ -47,12 +47,12 @@ pub enum ChannelRequest {
pub enum ChannelResponse {
Announce {
response: AnnounceResponse,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
},
Scrape {
response: ScrapeResponse,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
},
}
@ -64,7 +64,7 @@ impl ChannelResponse {
Self::Scrape { connection_id, .. } => *connection_id,
}
}
pub fn get_peer_addr(&self) -> SocketAddr {
pub fn get_peer_addr(&self) -> CanonicalSocketAddr {
match self {
Self::Announce { peer_addr, .. } => *peer_addr,
Self::Scrape { peer_addr, .. } => *peer_addr,
@ -82,7 +82,7 @@ pub struct ConnectionMeta {
/// Index of socket worker responsible for this connection. Required for
/// sending back response through correct channel to correct worker.
pub response_consumer_id: ConsumerId,
pub peer_addr: SocketAddr,
pub peer_addr: CanonicalSocketAddr,
/// Connection id local to socket worker
pub connection_id: ConnectionId,
}

View file

@ -159,11 +159,7 @@ pub fn handle_announce_request(
meta: ConnectionMeta,
request: AnnounceRequest,
) -> AnnounceResponse {
let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());
::log::debug!("peer ip: {:?}", peer_ip);
match peer_ip {
match meta.peer_addr.get().ip() {
IpAddr::V4(peer_ip_address) => {
let torrent_data: &mut TorrentData<Ipv4Addr> =
torrent_maps.ipv4.entry(request.info_hash).or_default();
@ -323,7 +319,7 @@ pub fn handle_scrape_request(
files: BTreeMap::new(),
};
let peer_ip = convert_ipv4_mapped_ipv6(meta.peer_addr.ip());
let peer_ip = meta.peer_addr.get().ip();
// If request.info_hashes is empty, don't return scrape for all
// torrents, even though reference server does it. It is too expensive.

View file

@ -1,12 +1,12 @@
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::CanonicalSocketAddr;
use aquatic_http_protocol::common::InfoHash;
use aquatic_http_protocol::request::{Request, RequestParseError, ScrapeRequest};
use aquatic_http_protocol::response::{
@ -28,9 +28,8 @@ use slab::Slab;
use crate::common::*;
use crate::config::Config;
const INTERMEDIATE_BUFFER_SIZE: usize = 1024;
const MAX_REQUEST_SIZE: usize = 2048;
const MAX_RESPONSE_SIZE: usize = 4096;
const REQUEST_BUFFER_SIZE: usize = 2048;
const RESPONSE_BUFFER_SIZE: usize = 4096;
const RESPONSE_HEADER_A: &[u8] = b"HTTP/1.1 200 OK\r\nContent-Length: ";
const RESPONSE_HEADER_B: &[u8] = b" ";
@ -171,11 +170,11 @@ struct Connection {
response_receiver: LocalReceiver<ChannelResponse>,
response_consumer_id: ConsumerId,
stream: TlsStream<TcpStream>,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
request_buffer: [u8; MAX_REQUEST_SIZE],
request_buffer: [u8; REQUEST_BUFFER_SIZE],
request_buffer_position: usize,
response_buffer: [u8; MAX_RESPONSE_SIZE],
response_buffer: [u8; RESPONSE_BUFFER_SIZE],
}
impl Connection {
@ -192,11 +191,12 @@ impl Connection {
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
let mut response_buffer = [0; MAX_RESPONSE_SIZE];
let mut response_buffer = [0; RESPONSE_BUFFER_SIZE];
response_buffer[..RESPONSE_HEADER.len()].copy_from_slice(&RESPONSE_HEADER);
@ -209,7 +209,7 @@ impl Connection {
stream,
peer_addr,
connection_id,
request_buffer: [0; MAX_REQUEST_SIZE],
request_buffer: [0; REQUEST_BUFFER_SIZE],
request_buffer_position: 0,
response_buffer,
};
@ -244,54 +244,44 @@ impl Connection {
}
async fn read_request(&mut self) -> anyhow::Result<Either<FailureResponse, Request>> {
let mut buf = [0u8; INTERMEDIATE_BUFFER_SIZE];
self.request_buffer_position = 0;
loop {
::log::debug!("read");
if self.request_buffer_position == self.request_buffer.len() {
return Err(anyhow::anyhow!("request buffer is full"));
}
let bytes_read = self.stream.read(&mut buf).await?;
let bytes_read = self
.stream
.read(&mut self.request_buffer[self.request_buffer_position..])
.await?;
if bytes_read == 0 {
return Err(anyhow::anyhow!("peer closed connection"));
}
let request_buffer_end = self.request_buffer_position + bytes_read;
self.request_buffer_position += bytes_read;
if request_buffer_end > self.request_buffer.len() {
return Err(anyhow::anyhow!("request too large"));
} else {
let request_buffer_slice =
&mut self.request_buffer[self.request_buffer_position..request_buffer_end];
match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
Ok(request) => {
::log::debug!("received request: {:?}", request);
request_buffer_slice.copy_from_slice(&buf[..bytes_read]);
return Ok(Either::Right(request));
}
Err(RequestParseError::Invalid(err)) => {
::log::debug!("invalid request: {:?}", err);
self.request_buffer_position = request_buffer_end;
let response = FailureResponse {
failure_reason: "Invalid request".into(),
};
match Request::from_bytes(&self.request_buffer[..self.request_buffer_position]) {
Ok(request) => {
::log::debug!("received request: {:?}", request);
self.request_buffer_position = 0;
return Ok(Either::Right(request));
}
Err(RequestParseError::Invalid(err)) => {
::log::debug!("invalid request: {:?}", err);
let response = FailureResponse {
failure_reason: "Invalid request".into(),
};
return Ok(Either::Left(response));
}
Err(RequestParseError::NeedMoreData) => {
::log::debug!(
"need more request data. current data: {:?}",
std::str::from_utf8(
&self.request_buffer[..self.request_buffer_position]
)
);
}
return Ok(Either::Left(response));
}
Err(RequestParseError::NeedMoreData) => {
::log::debug!(
"need more request data. current data: {:?}",
std::str::from_utf8(&self.request_buffer[..self.request_buffer_position])
);
}
}
}

View file

@ -16,7 +16,7 @@ pub fn handle_announce_request(
request_sender_meta: ConnectionMeta,
request: AnnounceRequest,
) {
let torrent_data: &mut TorrentData = if request_sender_meta.converted_peer_ip.is_ipv4() {
let torrent_data: &mut TorrentData = if request_sender_meta.peer_addr.is_ipv4() {
torrent_maps.ipv4.entry(request.info_hash).or_default()
} else {
torrent_maps.ipv6.entry(request.info_hash).or_default()
@ -25,11 +25,9 @@ pub fn handle_announce_request(
// If there is already a peer with this peer_id, check that socket
// addr is same as that of request sender. Otherwise, ignore request.
// Since peers have access to each others peer_id's, they could send
// requests using them, causing all sorts of issues. Checking naive
// (non-converted) socket addresses is enough, since state is split
// on converted peer ip.
// requests using them, causing all sorts of issues.
if let Some(previous_peer) = torrent_data.peers.get(&request.peer_id) {
if request_sender_meta.naive_peer_addr != previous_peer.connection_meta.naive_peer_addr {
if request_sender_meta.peer_addr != previous_peer.connection_meta.peer_addr {
return;
}
}
@ -167,7 +165,7 @@ pub fn handle_scrape_request(
files: HashMap::with_capacity(num_to_take),
};
let torrent_map: &mut TorrentMap = if meta.converted_peer_ip.is_ipv4() {
let torrent_map: &mut TorrentMap = if meta.peer_addr.is_ipv4() {
&mut torrent_maps.ipv4
} else {
&mut torrent_maps.ipv6

View file

@ -2,12 +2,11 @@ pub mod handlers;
use std::fs::File;
use std::io::BufReader;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Instant;
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::AHashIndexMap;
use aquatic_common::{AHashIndexMap, CanonicalSocketAddr};
pub use aquatic_common::ValidUntil;
@ -30,10 +29,7 @@ pub struct ConnectionMeta {
/// sending back response through correct channel to correct worker.
pub out_message_consumer_id: ConsumerId,
pub connection_id: ConnectionId,
/// Peer address as received from socket, meaning it wasn't converted to
/// an IPv4 address if it was a IPv4-mapped IPv6 address
pub naive_peer_addr: SocketAddr,
pub converted_peer_ip: IpAddr,
pub peer_addr: CanonicalSocketAddr,
pub pending_scrape_id: Option<PendingScrapeId>,
}

View file

@ -1,14 +1,13 @@
use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::net::SocketAddr;
use std::rc::Rc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use aquatic_common::access_list::{create_access_list_cache, AccessListArcSwap, AccessListCache};
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_common::CanonicalSocketAddr;
use aquatic_ws_protocol::*;
use async_tungstenite::WebSocketStream;
use futures::stream::{SplitSink, SplitStream};
@ -221,6 +220,7 @@ async fn run_connection(
let peer_addr = stream
.peer_addr()
.map_err(|err| anyhow::anyhow!("Couldn't get peer addr: {:?}", err))?;
let peer_addr = CanonicalSocketAddr::new(peer_addr);
let tls_acceptor: TlsAcceptor = tls_config.into();
let stream = tls_acceptor.accept(stream).await?;
@ -293,7 +293,7 @@ struct ConnectionReader {
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
out_message_consumer_id: ConsumerId,
ws_in: SplitStream<WebSocketStream<TlsStream<TcpStream>>>,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
}
@ -432,8 +432,7 @@ impl ConnectionReader {
ConnectionMeta {
connection_id: self.connection_id,
out_message_consumer_id: self.out_message_consumer_id,
naive_peer_addr: self.peer_addr,
converted_peer_ip: convert_ipv4_mapped_ipv6(self.peer_addr.ip()),
peer_addr: self.peer_addr,
pending_scrape_id,
}
}
@ -445,7 +444,7 @@ struct ConnectionWriter {
connection_slab: Rc<RefCell<Slab<ConnectionReference>>>,
ws_out: SplitSink<WebSocketStream<TlsStream<TcpStream>>, tungstenite::Message>,
pending_scrape_slab: Rc<RefCell<Slab<PendingScrapeResponse>>>,
peer_addr: SocketAddr,
peer_addr: CanonicalSocketAddr,
connection_id: ConnectionId,
}
@ -456,7 +455,7 @@ impl ConnectionWriter {
anyhow::anyhow!("ConnectionWriter couldn't receive message, sender is closed")
})?;
if meta.naive_peer_addr != self.peer_addr {
if meta.peer_addr != self.peer_addr {
return Err(anyhow::anyhow!("peer addresses didn't match"));
}
@ -530,7 +529,7 @@ impl ConnectionWriter {
Err(err) => {
::log::info!(
"send_out_message: send to {} took to long: {}",
self.peer_addr,
self.peer_addr.get(),
err
);

View file

@ -153,7 +153,7 @@ impl Connection<NotRegistered> {
}
pub fn close(self) {
::log::debug!("will close connection to {}", self.meta.naive_peer_addr);
::log::debug!("will close connection to {}", self.meta.peer_addr.get());
match self.state {
ConnectionState::TlsHandshaking(inner) => inner.close(),

View file

@ -4,13 +4,13 @@ use std::time::{Duration, Instant};
use anyhow::Context;
use aquatic_common::access_list::AccessListQuery;
use aquatic_common::CanonicalSocketAddr;
use hashbrown::HashMap;
use mio::net::TcpListener;
use mio::{Events, Interest, Poll, Token};
use socket2::{Domain, Protocol, Socket, Type};
use tungstenite::protocol::WebSocketConfig;
use aquatic_common::convert_ipv4_mapped_ipv6;
use aquatic_ws_protocol::*;
use crate::common::*;
@ -263,20 +263,17 @@ fn accept_new_streams(
loop {
match listener.accept() {
Ok((stream, _)) => {
let naive_peer_addr = if let Ok(peer_addr) = stream.peer_addr() {
peer_addr
let peer_addr = if let Ok(peer_addr) = stream.peer_addr() {
CanonicalSocketAddr::new(peer_addr)
} else {
continue;
};
connections.insert_and_register_new(poll, move |token| {
let converted_peer_ip = convert_ipv4_mapped_ipv6(naive_peer_addr.ip());
let meta = ConnectionMeta {
out_message_consumer_id: ConsumerId(socket_worker_index),
connection_id: ConnectionId(token.0),
naive_peer_addr,
converted_peer_ip,
peer_addr,
pending_scrape_id: None, // FIXME
};
@ -348,11 +345,11 @@ where
let mut remove_connection = false;
if let Some(connection) = connections.get_mut(&token) {
if connection.get_meta().naive_peer_addr != meta.naive_peer_addr {
if connection.get_meta().peer_addr != meta.peer_addr {
::log::warn!(
"socket worker error: connection socket addr {} didn't match channel {}. Token: {}.",
connection.get_meta().naive_peer_addr,
meta.naive_peer_addr,
connection.get_meta().peer_addr.get(),
meta.peer_addr.get(),
token.0
);