From 20eef8677d718b183ac80ce64cdb371a488154d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 12:17:51 +0200 Subject: [PATCH 1/6] udp: add integration test --- aquatic_udp/tests/integration.rs | 175 +++++++++++++++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 aquatic_udp/tests/integration.rs diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs new file mode 100644 index 0000000..a413d88 --- /dev/null +++ b/aquatic_udp/tests/integration.rs @@ -0,0 +1,175 @@ +use std::{ + collections::{hash_map::RandomState, HashSet}, + io::Cursor, + net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, + time::Duration, +}; + +use anyhow::Context; +use aquatic_udp::{common::BUFFER_SIZE, config::Config}; +use aquatic_udp_protocol::{ + common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, InfoHash, + NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, TransactionId, +}; + +const PEER_PORT_START: u16 = 30_000; +const TRACKER_PORT: u16 = 40_111; +const PEERS_WANTED: usize = 10; + +#[test] +fn test_multiple_connect_and_announce() { + let config = Config::default(); + + let tracker_port = run_tracker(config); + + let info_hash = InfoHash([0; 20]); + + let mut num_seeders = 0; + let mut num_leechers = 0; + + for i in 0..20 { + let is_seeder = i % 3 == 0; + + if is_seeder { + num_seeders += 1; + } else { + num_leechers += 1; + } + + let response = match connect_and_announce( + tracker_port, + PEER_PORT_START + i as u16, + info_hash, + is_seeder, + ) { + Ok(response) => response, + Err(err) => { + panic!("connect_and_announce failed: {:#}", err); + } + }; + + assert_eq!(response.peers.len(), i.min(PEERS_WANTED)); + + assert_eq!(response.seeders.0, num_seeders); + assert_eq!(response.leechers.0, num_leechers); + + let response_peer_ports: HashSet = + HashSet::from_iter(response.peers.iter().map(|p| p.port.0)); + let expected_peer_ports: HashSet = + HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); + + if i > PEERS_WANTED { + assert!(response_peer_ports.is_subset(&expected_peer_ports)); + } else { + assert_eq!(response_peer_ports, expected_peer_ports); + } + } +} + +// FIXME: should ideally try different ports and use sync primitives to find +// out if tracker was successfully started +fn run_tracker(mut config: Config) -> u16 { + let port = TRACKER_PORT; + + config.network.address.set_port(port); + + ::std::thread::spawn(move || { + aquatic_udp::run(config).unwrap(); + }); + + ::std::thread::sleep(Duration::from_secs(1)); + + port +} + +fn connect_and_announce( + tracker_port: u16, + peer_port: u16, + info_hash: InfoHash, + seeder: bool, +) -> anyhow::Result> { + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); + + let socket = UdpSocket::bind(peer_addr)?; + let mut buffer = [0u8; BUFFER_SIZE]; + + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + Request::Connect(ConnectRequest { + transaction_id: TransactionId(0), + }) + .write(&mut buffer) + .with_context(|| "write connect request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send connect request")?; + } + + let connection_id = { + let (bytes_read, _) = socket + .recv_from(&mut buffer) + .with_context(|| "recv connect response")?; + + if let Response::Connect(response) = Response::from_bytes(&buffer[..bytes_read], true) + .with_context(|| "parse connect response")? + { + response.connection_id + } else { + panic!("Not connect response"); + } + }; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + let mut peer_id = PeerId([0; 20]); + + for chunk in peer_id.0.chunks_exact_mut(2) { + chunk.copy_from_slice(&peer_port.to_ne_bytes()); + } + + Request::Announce(AnnounceRequest { + connection_id, + transaction_id: TransactionId(0), + info_hash, + peer_id, + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(PEERS_WANTED as i32), + port: Port(peer_port), + }) + .write(&mut buffer) + .with_context(|| "write announce request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send announce request")?; + } + + { + let (bytes_read, _) = socket + .recv_from(&mut buffer) + .with_context(|| "recv announce response")?; + + if let Response::AnnounceIpv4(response) = Response::from_bytes(&buffer[..bytes_read], true) + .with_context(|| "parse announce response")? + { + Ok(response) + } else { + panic!("Not announce response"); + } + } +} From 589d45a05d4508c9a32a8b46e27b53611d70d2cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 17:54:26 +0200 Subject: [PATCH 2/6] udp: integration test: test scrape too, refactor --- aquatic_udp/tests/integration.rs | 242 +++++++++++++++++-------------- 1 file changed, 136 insertions(+), 106 deletions(-) diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs index a413d88..147294a 100644 --- a/aquatic_udp/tests/integration.rs +++ b/aquatic_udp/tests/integration.rs @@ -8,19 +8,26 @@ use std::{ use anyhow::Context; use aquatic_udp::{common::BUFFER_SIZE, config::Config}; use aquatic_udp_protocol::{ - common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, InfoHash, - NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, TransactionId, + common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectionId, + InfoHash, NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, + ScrapeResponse, TransactionId, }; -const PEER_PORT_START: u16 = 30_000; -const TRACKER_PORT: u16 = 40_111; const PEERS_WANTED: usize = 10; #[test] -fn test_multiple_connect_and_announce() { - let config = Config::default(); +fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_111; + const PEER_PORT_START: u16 = 30_000; - let tracker_port = run_tracker(config); + let mut config = Config::default(); + + config.network.address.set_port(TRACKER_PORT); + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); let info_hash = InfoHash([0; 20]); @@ -36,25 +43,28 @@ fn test_multiple_connect_and_announce() { num_leechers += 1; } - let response = match connect_and_announce( - tracker_port, + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let announce_response = announce( + &socket, + tracker_addr, + connection_id, PEER_PORT_START + i as u16, info_hash, is_seeder, - ) { - Ok(response) => response, - Err(err) => { - panic!("connect_and_announce failed: {:#}", err); - } - }; + ) + .with_context(|| "announce")?; - assert_eq!(response.peers.len(), i.min(PEERS_WANTED)); + assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED)); - assert_eq!(response.seeders.0, num_seeders); - assert_eq!(response.leechers.0, num_leechers); + assert_eq!(announce_response.seeders.0, num_seeders); + assert_eq!(announce_response.leechers.0, num_leechers); let response_peer_ports: HashSet = - HashSet::from_iter(response.peers.iter().map(|p| p.port.0)); + HashSet::from_iter(announce_response.peers.iter().map(|p| p.port.0)); let expected_peer_ports: HashSet = HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); @@ -63,113 +73,133 @@ fn test_multiple_connect_and_announce() { } else { assert_eq!(response_peer_ports, expected_peer_ports); } + + let scrape_response = scrape( + &socket, + tracker_addr, + connection_id, + vec![info_hash, InfoHash([1; 20])], + ) + .with_context(|| "scrape")?; + + assert_eq!(scrape_response.torrent_stats[0].seeders.0, num_seeders); + assert_eq!(scrape_response.torrent_stats[0].leechers.0, num_leechers); + assert_eq!(scrape_response.torrent_stats[1].seeders.0, 0); + assert_eq!(scrape_response.torrent_stats[1].leechers.0, 0); } + + Ok(()) } // FIXME: should ideally try different ports and use sync primitives to find // out if tracker was successfully started -fn run_tracker(mut config: Config) -> u16 { - let port = TRACKER_PORT; - - config.network.address.set_port(port); - +fn run_tracker(config: Config) { ::std::thread::spawn(move || { aquatic_udp::run(config).unwrap(); }); ::std::thread::sleep(Duration::from_secs(1)); - - port } -fn connect_and_announce( - tracker_port: u16, +fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result { + let request = Request::Connect(ConnectRequest { + transaction_id: TransactionId(0), + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Connect(response) = response { + Ok(response.connection_id) + } else { + Err(anyhow::anyhow!("not connect response: {:?}", response)) + } +} + +fn announce( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, peer_port: u16, info_hash: InfoHash, seeder: bool, ) -> anyhow::Result> { - let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); + let mut peer_id = PeerId([0; 20]); - let socket = UdpSocket::bind(peer_addr)?; - let mut buffer = [0u8; BUFFER_SIZE]; - - socket.set_read_timeout(Some(Duration::from_secs(1)))?; - - { - let mut buffer = Cursor::new(&mut buffer[..]); - - Request::Connect(ConnectRequest { - transaction_id: TransactionId(0), - }) - .write(&mut buffer) - .with_context(|| "write connect request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send connect request")?; + for chunk in peer_id.0.chunks_exact_mut(2) { + chunk.copy_from_slice(&peer_port.to_ne_bytes()); } - let connection_id = { - let (bytes_read, _) = socket - .recv_from(&mut buffer) - .with_context(|| "recv connect response")?; + let request = Request::Announce(AnnounceRequest { + connection_id, + transaction_id: TransactionId(0), + info_hash, + peer_id, + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(PEERS_WANTED as i32), + port: Port(peer_port), + }); - if let Response::Connect(response) = Response::from_bytes(&buffer[..bytes_read], true) - .with_context(|| "parse connect response")? - { - response.connection_id - } else { - panic!("Not connect response"); - } - }; + let response = request_and_response(&socket, tracker_addr, request)?; - { - let mut buffer = Cursor::new(&mut buffer[..]); - - let mut peer_id = PeerId([0; 20]); - - for chunk in peer_id.0.chunks_exact_mut(2) { - chunk.copy_from_slice(&peer_port.to_ne_bytes()); - } - - Request::Announce(AnnounceRequest { - connection_id, - transaction_id: TransactionId(0), - info_hash, - peer_id, - bytes_downloaded: NumberOfBytes(0), - bytes_uploaded: NumberOfBytes(0), - bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), - event: AnnounceEvent::Started, - ip_address: None, - key: PeerKey(0), - peers_wanted: NumberOfPeers(PEERS_WANTED as i32), - port: Port(peer_port), - }) - .write(&mut buffer) - .with_context(|| "write announce request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send announce request")?; - } - - { - let (bytes_read, _) = socket - .recv_from(&mut buffer) - .with_context(|| "recv announce response")?; - - if let Response::AnnounceIpv4(response) = Response::from_bytes(&buffer[..bytes_read], true) - .with_context(|| "parse announce response")? - { - Ok(response) - } else { - panic!("Not announce response"); - } + if let Response::AnnounceIpv4(response) = response { + Ok(response) + } else { + return Err(anyhow::anyhow!("not announce response: {:?}", response)); + } +} + +fn scrape( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, + info_hashes: Vec, +) -> anyhow::Result { + let request = Request::Scrape(ScrapeRequest { + connection_id, + transaction_id: TransactionId(0), + info_hashes, + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Scrape(response) = response { + Ok(response) + } else { + return Err(anyhow::anyhow!("not scrape response: {:?}", response)); + } +} + +fn request_and_response( + socket: &UdpSocket, + tracker_addr: SocketAddr, + request: Request, +) -> anyhow::Result { + let mut buffer = [0u8; BUFFER_SIZE]; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + request + .write(&mut buffer) + .with_context(|| "write request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send request")?; + } + + { + let (bytes_read, _) = socket + .recv_from(&mut buffer) + .with_context(|| "recv response")?; + + Ok(Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response")?) } } From 48e383b6a9e8b5d2cc10bb5eeb0f4655b58787c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 18:20:38 +0200 Subject: [PATCH 3/6] udp: integration: add test for invalid connection id, refactor --- aquatic_udp/tests/integration.rs | 82 +++++++++++++++++++++++--------- 1 file changed, 59 insertions(+), 23 deletions(-) diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs index 147294a..3903b43 100644 --- a/aquatic_udp/tests/integration.rs +++ b/aquatic_udp/tests/integration.rs @@ -8,17 +8,16 @@ use std::{ use anyhow::Context; use aquatic_udp::{common::BUFFER_SIZE, config::Config}; use aquatic_udp_protocol::{ - common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectionId, - InfoHash, NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, - ScrapeResponse, TransactionId, + common::PeerId, AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, InfoHash, + NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, ScrapeResponse, + TransactionId, }; -const PEERS_WANTED: usize = 10; - #[test] fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { const TRACKER_PORT: u16 = 40_111; const PEER_PORT_START: u16 = 30_000; + const PEERS_WANTED: usize = 10; let mut config = Config::default(); @@ -48,15 +47,24 @@ fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; - let announce_response = announce( - &socket, - tracker_addr, - connection_id, - PEER_PORT_START + i as u16, - info_hash, - is_seeder, - ) - .with_context(|| "announce")?; + let announce_response = { + let response = announce( + &socket, + tracker_addr, + connection_id, + PEER_PORT_START + i as u16, + info_hash, + PEERS_WANTED, + is_seeder, + ) + .with_context(|| "announce")?; + + if let Response::AnnounceIpv4(response) = response { + response + } else { + return Err(anyhow::anyhow!("not announce response: {:?}", response)); + } + }; assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED)); @@ -91,6 +99,39 @@ fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { Ok(()) } +#[test] +fn test_announce_with_invalid_connection_id() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_112; + + let mut config = Config::default(); + + config.network.address.set_port(TRACKER_PORT); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + + run_tracker(config); + + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let res_response = announce( + &socket, + tracker_addr, + ConnectionId(0), + 1, + InfoHash([0; 20]), + 100, + false, + ); + + // No response should be sent by tracker. Ideally, we would like to test + // that the error is in fact a would-block one on the socket. + assert!(matches!(res_response, Err(_))); + + Ok(()) +} + // FIXME: should ideally try different ports and use sync primitives to find // out if tracker was successfully started fn run_tracker(config: Config) { @@ -121,8 +162,9 @@ fn announce( connection_id: ConnectionId, peer_port: u16, info_hash: InfoHash, + peers_wanted: usize, seeder: bool, -) -> anyhow::Result> { +) -> anyhow::Result { let mut peer_id = PeerId([0; 20]); for chunk in peer_id.0.chunks_exact_mut(2) { @@ -140,17 +182,11 @@ fn announce( event: AnnounceEvent::Started, ip_address: None, key: PeerKey(0), - peers_wanted: NumberOfPeers(PEERS_WANTED as i32), + peers_wanted: NumberOfPeers(peers_wanted as i32), port: Port(peer_port), }); - let response = request_and_response(&socket, tracker_addr, request)?; - - if let Response::AnnounceIpv4(response) = response { - Ok(response) - } else { - return Err(anyhow::anyhow!("not announce response: {:?}", response)); - } + Ok(request_and_response(&socket, tracker_addr, request)?) } fn scrape( From b2f2ecf5ef45ef604cdf1a4449ca9cb6ec9a2794 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 18:36:54 +0200 Subject: [PATCH 4/6] udp: integration: improve invalid connection id test --- aquatic_udp/tests/integration.rs | 57 +++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 16 deletions(-) diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs index 3903b43..602b2f4 100644 --- a/aquatic_udp/tests/integration.rs +++ b/aquatic_udp/tests/integration.rs @@ -1,6 +1,6 @@ use std::{ collections::{hash_map::RandomState, HashSet}, - io::Cursor, + io::{Cursor, ErrorKind}, net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, time::Duration, }; @@ -107,29 +107,54 @@ fn test_announce_with_invalid_connection_id() -> anyhow::Result<()> { config.network.address.set_port(TRACKER_PORT); + run_tracker(config); + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - run_tracker(config); - let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; - let res_response = announce( - &socket, - tracker_addr, - ConnectionId(0), - 1, - InfoHash([0; 20]), - 100, - false, - ); + // Make sure that the tracker in fact responds to requests + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; - // No response should be sent by tracker. Ideally, we would like to test - // that the error is in fact a would-block one on the socket. - assert!(matches!(res_response, Err(_))); + let mut buffer = [0u8; BUFFER_SIZE]; - Ok(()) + { + let mut buffer = Cursor::new(&mut buffer[..]); + + let request = Request::Announce(AnnounceRequest { + connection_id: ConnectionId(!connection_id.0), + transaction_id: TransactionId(0), + info_hash: InfoHash([0; 20]), + peer_id: PeerId([0; 20]), + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(0), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(-1), + port: Port(1), + }); + + request + .write(&mut buffer) + .with_context(|| "write request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send request")?; + } + + match socket.recv_from(&mut buffer) { + Ok(_) => Err(anyhow::anyhow!("received response")), + Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(()), + Err(err) => Err(err.into()), + } } // FIXME: should ideally try different ports and use sync primitives to find From 10cd6f9a387b6b8aacfed8931feffec95c06a355 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 19:04:43 +0200 Subject: [PATCH 5/6] udp: integration: add access list tests --- Cargo.lock | 31 +++++++++- aquatic_udp/Cargo.toml | 2 + aquatic_udp/tests/integration.rs | 97 +++++++++++++++++++++++++++++++- 3 files changed, 128 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9874064..5150048 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -249,6 +249,7 @@ dependencies = [ "signal-hook", "slab", "socket2 0.5.3", + "tempfile", "time", "tinytemplate", ] @@ -978,6 +979,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6999dc1837253364c2ebb0704ba97994bd874e8f195d665c50b7548f6ea92764" + [[package]] name = "flate2" version = "1.0.27" @@ -1079,7 +1086,7 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" dependencies = [ - "fastrand", + "fastrand 1.9.0", "futures-core", "futures-io", "memchr", @@ -2203,6 +2210,15 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "redox_syscall" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "567664f262709473930a4bf9e51bf2ebf3348f2e748ccc50dea20646858f8f29" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "regex" version = "1.9.3" @@ -2579,6 +2595,19 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "tempfile" +version = "3.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb94d2f3cc536af71caac6b6fcebf65860b347e7ce0cc9ebe8f70d3e521054ef" +dependencies = [ + "cfg-if", + "fastrand 2.0.0", + "redox_syscall", + "rustix", + "windows-sys 0.48.0", +] + [[package]] name = "textwrap" version = "0.16.0" diff --git a/aquatic_udp/Cargo.toml b/aquatic_udp/Cargo.toml index 59d0d0e..474ef97 100644 --- a/aquatic_udp/Cargo.toml +++ b/aquatic_udp/Cargo.toml @@ -55,5 +55,7 @@ time = { version = "0.3", features = ["formatting"] } tinytemplate = "1" [dev-dependencies] +hex = "0.4" +tempfile = "3" quickcheck = "1" quickcheck_macros = "1" diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs index 602b2f4..947c645 100644 --- a/aquatic_udp/tests/integration.rs +++ b/aquatic_udp/tests/integration.rs @@ -1,11 +1,13 @@ use std::{ collections::{hash_map::RandomState, HashSet}, - io::{Cursor, ErrorKind}, + fs::File, + io::{Cursor, ErrorKind, Write}, net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, time::Duration, }; use anyhow::Context; +use aquatic_common::access_list::AccessListMode; use aquatic_udp::{common::BUFFER_SIZE, config::Config}; use aquatic_udp_protocol::{ common::PeerId, AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, InfoHash, @@ -157,6 +159,99 @@ fn test_announce_with_invalid_connection_id() -> anyhow::Result<()> { } } +#[test] +fn test_access_list_deny() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_113; + + let deny = InfoHash([0; 20]); + let allow = InfoHash([1; 20]); + + test_access_list(TRACKER_PORT, allow, deny, deny, AccessListMode::Deny)?; + + Ok(()) +} + +#[test] +fn test_access_list_allow() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_114; + + let allow = InfoHash([0; 20]); + let deny = InfoHash([1; 20]); + + test_access_list(TRACKER_PORT, allow, deny, allow, AccessListMode::Allow)?; + + Ok(()) +} + +fn test_access_list( + tracker_port: u16, + info_hash_success: InfoHash, + info_hash_fail: InfoHash, + info_hash_in_list: InfoHash, + mode: AccessListMode, +) -> anyhow::Result<()> { + let access_list_dir = tempfile::tempdir().with_context(|| "get temporary directory")?; + let access_list_path = access_list_dir.path().join("access-list.txt"); + + let mut access_list_file = + File::create(&access_list_path).with_context(|| "create access list file")?; + writeln!( + access_list_file, + "{}", + hex::encode_upper(info_hash_in_list.0) + ) + .with_context(|| "write to access list file")?; + + let mut config = Config::default(); + + config.network.address.set_port(tracker_port); + + config.access_list.mode = mode; + config.access_list.path = access_list_path; + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let response = announce( + &socket, + tracker_addr, + connection_id, + 1, + info_hash_fail, + 10, + false, + ) + .with_context(|| "announce")?; + + assert!( + matches!(response, Response::Error(_)), + "response should be error but is {:?}", + response + ); + + let response = announce( + &socket, + tracker_addr, + connection_id, + 1, + info_hash_success, + 10, + false, + ) + .with_context(|| "announce")?; + + assert!(matches!(response, Response::AnnounceIpv4(_))); + + Ok(()) +} + // FIXME: should ideally try different ports and use sync primitives to find // out if tracker was successfully started fn run_tracker(config: Config) { From e5a986eeec74ca4e23421e7d5f823babbbb4e769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Sun, 27 Aug 2023 19:17:09 +0200 Subject: [PATCH 6/6] udp: split integration tests into separate files --- aquatic_udp/tests/access_list.rs | 108 ++++++ aquatic_udp/tests/common/mod.rs | 123 +++++++ aquatic_udp/tests/integration.rs | 361 --------------------- aquatic_udp/tests/invalid_connection_id.rs | 74 +++++ aquatic_udp/tests/requests_responses.rs | 99 ++++++ 5 files changed, 404 insertions(+), 361 deletions(-) create mode 100644 aquatic_udp/tests/access_list.rs create mode 100644 aquatic_udp/tests/common/mod.rs delete mode 100644 aquatic_udp/tests/integration.rs create mode 100644 aquatic_udp/tests/invalid_connection_id.rs create mode 100644 aquatic_udp/tests/requests_responses.rs diff --git a/aquatic_udp/tests/access_list.rs b/aquatic_udp/tests/access_list.rs new file mode 100644 index 0000000..cd0ec79 --- /dev/null +++ b/aquatic_udp/tests/access_list.rs @@ -0,0 +1,108 @@ +mod common; + +use common::*; + +use std::{ + fs::File, + io::Write, + net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, + time::Duration, +}; + +use anyhow::Context; +use aquatic_common::access_list::AccessListMode; +use aquatic_udp::config::Config; +use aquatic_udp_protocol::{InfoHash, Response}; + +#[test] +fn test_access_list_deny() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_113; + + let deny = InfoHash([0; 20]); + let allow = InfoHash([1; 20]); + + test_access_list(TRACKER_PORT, allow, deny, deny, AccessListMode::Deny)?; + + Ok(()) +} + +#[test] +fn test_access_list_allow() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_114; + + let allow = InfoHash([0; 20]); + let deny = InfoHash([1; 20]); + + test_access_list(TRACKER_PORT, allow, deny, allow, AccessListMode::Allow)?; + + Ok(()) +} + +fn test_access_list( + tracker_port: u16, + info_hash_success: InfoHash, + info_hash_fail: InfoHash, + info_hash_in_list: InfoHash, + mode: AccessListMode, +) -> anyhow::Result<()> { + let access_list_dir = tempfile::tempdir().with_context(|| "get temporary directory")?; + let access_list_path = access_list_dir.path().join("access-list.txt"); + + let mut access_list_file = + File::create(&access_list_path).with_context(|| "create access list file")?; + writeln!( + access_list_file, + "{}", + hex::encode_upper(info_hash_in_list.0) + ) + .with_context(|| "write to access list file")?; + + let mut config = Config::default(); + + config.network.address.set_port(tracker_port); + + config.access_list.mode = mode; + config.access_list.path = access_list_path; + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let response = announce( + &socket, + tracker_addr, + connection_id, + 1, + info_hash_fail, + 10, + false, + ) + .with_context(|| "announce")?; + + assert!( + matches!(response, Response::Error(_)), + "response should be error but is {:?}", + response + ); + + let response = announce( + &socket, + tracker_addr, + connection_id, + 1, + info_hash_success, + 10, + false, + ) + .with_context(|| "announce")?; + + assert!(matches!(response, Response::AnnounceIpv4(_))); + + Ok(()) +} diff --git a/aquatic_udp/tests/common/mod.rs b/aquatic_udp/tests/common/mod.rs new file mode 100644 index 0000000..f981e56 --- /dev/null +++ b/aquatic_udp/tests/common/mod.rs @@ -0,0 +1,123 @@ +#![allow(dead_code)] + +use std::{ + io::Cursor, + net::{SocketAddr, UdpSocket}, + time::Duration, +}; + +use anyhow::Context; +use aquatic_udp::{common::BUFFER_SIZE, config::Config}; +use aquatic_udp_protocol::{ + common::PeerId, AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, InfoHash, + NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, ScrapeResponse, + TransactionId, +}; + +// FIXME: should ideally try different ports and use sync primitives to find +// out if tracker was successfully started +pub fn run_tracker(config: Config) { + ::std::thread::spawn(move || { + aquatic_udp::run(config).unwrap(); + }); + + ::std::thread::sleep(Duration::from_secs(1)); +} + +pub fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result { + let request = Request::Connect(ConnectRequest { + transaction_id: TransactionId(0), + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Connect(response) = response { + Ok(response.connection_id) + } else { + Err(anyhow::anyhow!("not connect response: {:?}", response)) + } +} + +pub fn announce( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, + peer_port: u16, + info_hash: InfoHash, + peers_wanted: usize, + seeder: bool, +) -> anyhow::Result { + let mut peer_id = PeerId([0; 20]); + + for chunk in peer_id.0.chunks_exact_mut(2) { + chunk.copy_from_slice(&peer_port.to_ne_bytes()); + } + + let request = Request::Announce(AnnounceRequest { + connection_id, + transaction_id: TransactionId(0), + info_hash, + peer_id, + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(peers_wanted as i32), + port: Port(peer_port), + }); + + Ok(request_and_response(&socket, tracker_addr, request)?) +} + +pub fn scrape( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, + info_hashes: Vec, +) -> anyhow::Result { + let request = Request::Scrape(ScrapeRequest { + connection_id, + transaction_id: TransactionId(0), + info_hashes, + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Scrape(response) = response { + Ok(response) + } else { + return Err(anyhow::anyhow!("not scrape response: {:?}", response)); + } +} + +pub fn request_and_response( + socket: &UdpSocket, + tracker_addr: SocketAddr, + request: Request, +) -> anyhow::Result { + let mut buffer = [0u8; BUFFER_SIZE]; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + request + .write(&mut buffer) + .with_context(|| "write request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send request")?; + } + + { + let (bytes_read, _) = socket + .recv_from(&mut buffer) + .with_context(|| "recv response")?; + + Ok(Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response")?) + } +} diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs deleted file mode 100644 index 947c645..0000000 --- a/aquatic_udp/tests/integration.rs +++ /dev/null @@ -1,361 +0,0 @@ -use std::{ - collections::{hash_map::RandomState, HashSet}, - fs::File, - io::{Cursor, ErrorKind, Write}, - net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, - time::Duration, -}; - -use anyhow::Context; -use aquatic_common::access_list::AccessListMode; -use aquatic_udp::{common::BUFFER_SIZE, config::Config}; -use aquatic_udp_protocol::{ - common::PeerId, AnnounceEvent, AnnounceRequest, ConnectRequest, ConnectionId, InfoHash, - NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, ScrapeResponse, - TransactionId, -}; - -#[test] -fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { - const TRACKER_PORT: u16 = 40_111; - const PEER_PORT_START: u16 = 30_000; - const PEERS_WANTED: usize = 10; - - let mut config = Config::default(); - - config.network.address.set_port(TRACKER_PORT); - - run_tracker(config); - - let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); - let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - - let info_hash = InfoHash([0; 20]); - - let mut num_seeders = 0; - let mut num_leechers = 0; - - for i in 0..20 { - let is_seeder = i % 3 == 0; - - if is_seeder { - num_seeders += 1; - } else { - num_leechers += 1; - } - - let socket = UdpSocket::bind(peer_addr)?; - socket.set_read_timeout(Some(Duration::from_secs(1)))?; - - let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; - - let announce_response = { - let response = announce( - &socket, - tracker_addr, - connection_id, - PEER_PORT_START + i as u16, - info_hash, - PEERS_WANTED, - is_seeder, - ) - .with_context(|| "announce")?; - - if let Response::AnnounceIpv4(response) = response { - response - } else { - return Err(anyhow::anyhow!("not announce response: {:?}", response)); - } - }; - - assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED)); - - assert_eq!(announce_response.seeders.0, num_seeders); - assert_eq!(announce_response.leechers.0, num_leechers); - - let response_peer_ports: HashSet = - HashSet::from_iter(announce_response.peers.iter().map(|p| p.port.0)); - let expected_peer_ports: HashSet = - HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); - - if i > PEERS_WANTED { - assert!(response_peer_ports.is_subset(&expected_peer_ports)); - } else { - assert_eq!(response_peer_ports, expected_peer_ports); - } - - let scrape_response = scrape( - &socket, - tracker_addr, - connection_id, - vec![info_hash, InfoHash([1; 20])], - ) - .with_context(|| "scrape")?; - - assert_eq!(scrape_response.torrent_stats[0].seeders.0, num_seeders); - assert_eq!(scrape_response.torrent_stats[0].leechers.0, num_leechers); - assert_eq!(scrape_response.torrent_stats[1].seeders.0, 0); - assert_eq!(scrape_response.torrent_stats[1].leechers.0, 0); - } - - Ok(()) -} - -#[test] -fn test_announce_with_invalid_connection_id() -> anyhow::Result<()> { - const TRACKER_PORT: u16 = 40_112; - - let mut config = Config::default(); - - config.network.address.set_port(TRACKER_PORT); - - run_tracker(config); - - let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); - let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - - let socket = UdpSocket::bind(peer_addr)?; - - socket.set_read_timeout(Some(Duration::from_secs(1)))?; - - // Make sure that the tracker in fact responds to requests - let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; - - let mut buffer = [0u8; BUFFER_SIZE]; - - { - let mut buffer = Cursor::new(&mut buffer[..]); - - let request = Request::Announce(AnnounceRequest { - connection_id: ConnectionId(!connection_id.0), - transaction_id: TransactionId(0), - info_hash: InfoHash([0; 20]), - peer_id: PeerId([0; 20]), - bytes_downloaded: NumberOfBytes(0), - bytes_uploaded: NumberOfBytes(0), - bytes_left: NumberOfBytes(0), - event: AnnounceEvent::Started, - ip_address: None, - key: PeerKey(0), - peers_wanted: NumberOfPeers(-1), - port: Port(1), - }); - - request - .write(&mut buffer) - .with_context(|| "write request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send request")?; - } - - match socket.recv_from(&mut buffer) { - Ok(_) => Err(anyhow::anyhow!("received response")), - Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(()), - Err(err) => Err(err.into()), - } -} - -#[test] -fn test_access_list_deny() -> anyhow::Result<()> { - const TRACKER_PORT: u16 = 40_113; - - let deny = InfoHash([0; 20]); - let allow = InfoHash([1; 20]); - - test_access_list(TRACKER_PORT, allow, deny, deny, AccessListMode::Deny)?; - - Ok(()) -} - -#[test] -fn test_access_list_allow() -> anyhow::Result<()> { - const TRACKER_PORT: u16 = 40_114; - - let allow = InfoHash([0; 20]); - let deny = InfoHash([1; 20]); - - test_access_list(TRACKER_PORT, allow, deny, allow, AccessListMode::Allow)?; - - Ok(()) -} - -fn test_access_list( - tracker_port: u16, - info_hash_success: InfoHash, - info_hash_fail: InfoHash, - info_hash_in_list: InfoHash, - mode: AccessListMode, -) -> anyhow::Result<()> { - let access_list_dir = tempfile::tempdir().with_context(|| "get temporary directory")?; - let access_list_path = access_list_dir.path().join("access-list.txt"); - - let mut access_list_file = - File::create(&access_list_path).with_context(|| "create access list file")?; - writeln!( - access_list_file, - "{}", - hex::encode_upper(info_hash_in_list.0) - ) - .with_context(|| "write to access list file")?; - - let mut config = Config::default(); - - config.network.address.set_port(tracker_port); - - config.access_list.mode = mode; - config.access_list.path = access_list_path; - - run_tracker(config); - - let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); - let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - - let socket = UdpSocket::bind(peer_addr)?; - socket.set_read_timeout(Some(Duration::from_secs(1)))?; - - let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; - - let response = announce( - &socket, - tracker_addr, - connection_id, - 1, - info_hash_fail, - 10, - false, - ) - .with_context(|| "announce")?; - - assert!( - matches!(response, Response::Error(_)), - "response should be error but is {:?}", - response - ); - - let response = announce( - &socket, - tracker_addr, - connection_id, - 1, - info_hash_success, - 10, - false, - ) - .with_context(|| "announce")?; - - assert!(matches!(response, Response::AnnounceIpv4(_))); - - Ok(()) -} - -// FIXME: should ideally try different ports and use sync primitives to find -// out if tracker was successfully started -fn run_tracker(config: Config) { - ::std::thread::spawn(move || { - aquatic_udp::run(config).unwrap(); - }); - - ::std::thread::sleep(Duration::from_secs(1)); -} - -fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result { - let request = Request::Connect(ConnectRequest { - transaction_id: TransactionId(0), - }); - - let response = request_and_response(&socket, tracker_addr, request)?; - - if let Response::Connect(response) = response { - Ok(response.connection_id) - } else { - Err(anyhow::anyhow!("not connect response: {:?}", response)) - } -} - -fn announce( - socket: &UdpSocket, - tracker_addr: SocketAddr, - connection_id: ConnectionId, - peer_port: u16, - info_hash: InfoHash, - peers_wanted: usize, - seeder: bool, -) -> anyhow::Result { - let mut peer_id = PeerId([0; 20]); - - for chunk in peer_id.0.chunks_exact_mut(2) { - chunk.copy_from_slice(&peer_port.to_ne_bytes()); - } - - let request = Request::Announce(AnnounceRequest { - connection_id, - transaction_id: TransactionId(0), - info_hash, - peer_id, - bytes_downloaded: NumberOfBytes(0), - bytes_uploaded: NumberOfBytes(0), - bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), - event: AnnounceEvent::Started, - ip_address: None, - key: PeerKey(0), - peers_wanted: NumberOfPeers(peers_wanted as i32), - port: Port(peer_port), - }); - - Ok(request_and_response(&socket, tracker_addr, request)?) -} - -fn scrape( - socket: &UdpSocket, - tracker_addr: SocketAddr, - connection_id: ConnectionId, - info_hashes: Vec, -) -> anyhow::Result { - let request = Request::Scrape(ScrapeRequest { - connection_id, - transaction_id: TransactionId(0), - info_hashes, - }); - - let response = request_and_response(&socket, tracker_addr, request)?; - - if let Response::Scrape(response) = response { - Ok(response) - } else { - return Err(anyhow::anyhow!("not scrape response: {:?}", response)); - } -} - -fn request_and_response( - socket: &UdpSocket, - tracker_addr: SocketAddr, - request: Request, -) -> anyhow::Result { - let mut buffer = [0u8; BUFFER_SIZE]; - - { - let mut buffer = Cursor::new(&mut buffer[..]); - - request - .write(&mut buffer) - .with_context(|| "write request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send request")?; - } - - { - let (bytes_read, _) = socket - .recv_from(&mut buffer) - .with_context(|| "recv response")?; - - Ok(Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response")?) - } -} diff --git a/aquatic_udp/tests/invalid_connection_id.rs b/aquatic_udp/tests/invalid_connection_id.rs new file mode 100644 index 0000000..e2d211c --- /dev/null +++ b/aquatic_udp/tests/invalid_connection_id.rs @@ -0,0 +1,74 @@ +mod common; + +use common::*; + +use std::{ + io::{Cursor, ErrorKind}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, + time::Duration, +}; + +use anyhow::Context; +use aquatic_udp::{common::BUFFER_SIZE, config::Config}; +use aquatic_udp_protocol::{ + common::PeerId, AnnounceEvent, AnnounceRequest, ConnectionId, InfoHash, NumberOfBytes, + NumberOfPeers, PeerKey, Port, Request, TransactionId, +}; + +#[test] +fn test_announce_with_invalid_connection_id() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_112; + + let mut config = Config::default(); + + config.network.address.set_port(TRACKER_PORT); + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + + let socket = UdpSocket::bind(peer_addr)?; + + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + // Make sure that the tracker in fact responds to requests + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let mut buffer = [0u8; BUFFER_SIZE]; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + let request = Request::Announce(AnnounceRequest { + connection_id: ConnectionId(!connection_id.0), + transaction_id: TransactionId(0), + info_hash: InfoHash([0; 20]), + peer_id: PeerId([0; 20]), + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(0), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(-1), + port: Port(1), + }); + + request + .write(&mut buffer) + .with_context(|| "write request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send request")?; + } + + match socket.recv_from(&mut buffer) { + Ok(_) => Err(anyhow::anyhow!("received response")), + Err(err) if err.kind() == ErrorKind::WouldBlock => Ok(()), + Err(err) => Err(err.into()), + } +} diff --git a/aquatic_udp/tests/requests_responses.rs b/aquatic_udp/tests/requests_responses.rs new file mode 100644 index 0000000..c84f0f8 --- /dev/null +++ b/aquatic_udp/tests/requests_responses.rs @@ -0,0 +1,99 @@ +mod common; + +use common::*; + +use std::{ + collections::{hash_map::RandomState, HashSet}, + net::{Ipv4Addr, SocketAddr, SocketAddrV4, UdpSocket}, + time::Duration, +}; + +use anyhow::Context; +use aquatic_udp::config::Config; +use aquatic_udp_protocol::{InfoHash, Response}; + +#[test] +fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_111; + const PEER_PORT_START: u16 = 30_000; + const PEERS_WANTED: usize = 10; + + let mut config = Config::default(); + + config.network.address.set_port(TRACKER_PORT); + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); + + let info_hash = InfoHash([0; 20]); + + let mut num_seeders = 0; + let mut num_leechers = 0; + + for i in 0..20 { + let is_seeder = i % 3 == 0; + + if is_seeder { + num_seeders += 1; + } else { + num_leechers += 1; + } + + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let announce_response = { + let response = announce( + &socket, + tracker_addr, + connection_id, + PEER_PORT_START + i as u16, + info_hash, + PEERS_WANTED, + is_seeder, + ) + .with_context(|| "announce")?; + + if let Response::AnnounceIpv4(response) = response { + response + } else { + return Err(anyhow::anyhow!("not announce response: {:?}", response)); + } + }; + + assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED)); + + assert_eq!(announce_response.seeders.0, num_seeders); + assert_eq!(announce_response.leechers.0, num_leechers); + + let response_peer_ports: HashSet = + HashSet::from_iter(announce_response.peers.iter().map(|p| p.port.0)); + let expected_peer_ports: HashSet = + HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); + + if i > PEERS_WANTED { + assert!(response_peer_ports.is_subset(&expected_peer_ports)); + } else { + assert_eq!(response_peer_ports, expected_peer_ports); + } + + let scrape_response = scrape( + &socket, + tracker_addr, + connection_id, + vec![info_hash, InfoHash([1; 20])], + ) + .with_context(|| "scrape")?; + + assert_eq!(scrape_response.torrent_stats[0].seeders.0, num_seeders); + assert_eq!(scrape_response.torrent_stats[0].leechers.0, num_leechers); + assert_eq!(scrape_response.torrent_stats[1].seeders.0, 0); + assert_eq!(scrape_response.torrent_stats[1].leechers.0, 0); + } + + Ok(()) +}