diff --git a/aquatic_udp/tests/integration.rs b/aquatic_udp/tests/integration.rs index a413d88..147294a 100644 --- a/aquatic_udp/tests/integration.rs +++ b/aquatic_udp/tests/integration.rs @@ -8,19 +8,26 @@ use std::{ use anyhow::Context; use aquatic_udp::{common::BUFFER_SIZE, config::Config}; use aquatic_udp_protocol::{ - common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, InfoHash, - NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, TransactionId, + common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectionId, + InfoHash, NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest, + ScrapeResponse, TransactionId, }; -const PEER_PORT_START: u16 = 30_000; -const TRACKER_PORT: u16 = 40_111; const PEERS_WANTED: usize = 10; #[test] -fn test_multiple_connect_and_announce() { - let config = Config::default(); +fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> { + const TRACKER_PORT: u16 = 40_111; + const PEER_PORT_START: u16 = 30_000; - let tracker_port = run_tracker(config); + let mut config = Config::default(); + + config.network.address.set_port(TRACKER_PORT); + + run_tracker(config); + + let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT)); + let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); let info_hash = InfoHash([0; 20]); @@ -36,25 +43,28 @@ fn test_multiple_connect_and_announce() { num_leechers += 1; } - let response = match connect_and_announce( - tracker_port, + let socket = UdpSocket::bind(peer_addr)?; + socket.set_read_timeout(Some(Duration::from_secs(1)))?; + + let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?; + + let announce_response = announce( + &socket, + tracker_addr, + connection_id, PEER_PORT_START + i as u16, info_hash, is_seeder, - ) { - Ok(response) => response, - Err(err) => { - panic!("connect_and_announce failed: {:#}", err); - } - }; + ) + .with_context(|| "announce")?; - assert_eq!(response.peers.len(), i.min(PEERS_WANTED)); + assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED)); - assert_eq!(response.seeders.0, num_seeders); - assert_eq!(response.leechers.0, num_leechers); + assert_eq!(announce_response.seeders.0, num_seeders); + assert_eq!(announce_response.leechers.0, num_leechers); let response_peer_ports: HashSet = - HashSet::from_iter(response.peers.iter().map(|p| p.port.0)); + HashSet::from_iter(announce_response.peers.iter().map(|p| p.port.0)); let expected_peer_ports: HashSet = HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); @@ -63,113 +73,133 @@ fn test_multiple_connect_and_announce() { } else { assert_eq!(response_peer_ports, expected_peer_ports); } + + let scrape_response = scrape( + &socket, + tracker_addr, + connection_id, + vec![info_hash, InfoHash([1; 20])], + ) + .with_context(|| "scrape")?; + + assert_eq!(scrape_response.torrent_stats[0].seeders.0, num_seeders); + assert_eq!(scrape_response.torrent_stats[0].leechers.0, num_leechers); + assert_eq!(scrape_response.torrent_stats[1].seeders.0, 0); + assert_eq!(scrape_response.torrent_stats[1].leechers.0, 0); } + + Ok(()) } // FIXME: should ideally try different ports and use sync primitives to find // out if tracker was successfully started -fn run_tracker(mut config: Config) -> u16 { - let port = TRACKER_PORT; - - config.network.address.set_port(port); - +fn run_tracker(config: Config) { ::std::thread::spawn(move || { aquatic_udp::run(config).unwrap(); }); ::std::thread::sleep(Duration::from_secs(1)); - - port } -fn connect_and_announce( - tracker_port: u16, +fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result { + let request = Request::Connect(ConnectRequest { + transaction_id: TransactionId(0), + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Connect(response) = response { + Ok(response.connection_id) + } else { + Err(anyhow::anyhow!("not connect response: {:?}", response)) + } +} + +fn announce( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, peer_port: u16, info_hash: InfoHash, seeder: bool, ) -> anyhow::Result> { - let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); - let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port)); + let mut peer_id = PeerId([0; 20]); - let socket = UdpSocket::bind(peer_addr)?; - let mut buffer = [0u8; BUFFER_SIZE]; - - socket.set_read_timeout(Some(Duration::from_secs(1)))?; - - { - let mut buffer = Cursor::new(&mut buffer[..]); - - Request::Connect(ConnectRequest { - transaction_id: TransactionId(0), - }) - .write(&mut buffer) - .with_context(|| "write connect request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send connect request")?; + for chunk in peer_id.0.chunks_exact_mut(2) { + chunk.copy_from_slice(&peer_port.to_ne_bytes()); } - let connection_id = { - let (bytes_read, _) = socket - .recv_from(&mut buffer) - .with_context(|| "recv connect response")?; + let request = Request::Announce(AnnounceRequest { + connection_id, + transaction_id: TransactionId(0), + info_hash, + peer_id, + bytes_downloaded: NumberOfBytes(0), + bytes_uploaded: NumberOfBytes(0), + bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), + event: AnnounceEvent::Started, + ip_address: None, + key: PeerKey(0), + peers_wanted: NumberOfPeers(PEERS_WANTED as i32), + port: Port(peer_port), + }); - if let Response::Connect(response) = Response::from_bytes(&buffer[..bytes_read], true) - .with_context(|| "parse connect response")? - { - response.connection_id - } else { - panic!("Not connect response"); - } - }; + let response = request_and_response(&socket, tracker_addr, request)?; - { - let mut buffer = Cursor::new(&mut buffer[..]); - - let mut peer_id = PeerId([0; 20]); - - for chunk in peer_id.0.chunks_exact_mut(2) { - chunk.copy_from_slice(&peer_port.to_ne_bytes()); - } - - Request::Announce(AnnounceRequest { - connection_id, - transaction_id: TransactionId(0), - info_hash, - peer_id, - bytes_downloaded: NumberOfBytes(0), - bytes_uploaded: NumberOfBytes(0), - bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), - event: AnnounceEvent::Started, - ip_address: None, - key: PeerKey(0), - peers_wanted: NumberOfPeers(PEERS_WANTED as i32), - port: Port(peer_port), - }) - .write(&mut buffer) - .with_context(|| "write announce request")?; - - let bytes_written = buffer.position() as usize; - - socket - .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) - .with_context(|| "send announce request")?; - } - - { - let (bytes_read, _) = socket - .recv_from(&mut buffer) - .with_context(|| "recv announce response")?; - - if let Response::AnnounceIpv4(response) = Response::from_bytes(&buffer[..bytes_read], true) - .with_context(|| "parse announce response")? - { - Ok(response) - } else { - panic!("Not announce response"); - } + if let Response::AnnounceIpv4(response) = response { + Ok(response) + } else { + return Err(anyhow::anyhow!("not announce response: {:?}", response)); + } +} + +fn scrape( + socket: &UdpSocket, + tracker_addr: SocketAddr, + connection_id: ConnectionId, + info_hashes: Vec, +) -> anyhow::Result { + let request = Request::Scrape(ScrapeRequest { + connection_id, + transaction_id: TransactionId(0), + info_hashes, + }); + + let response = request_and_response(&socket, tracker_addr, request)?; + + if let Response::Scrape(response) = response { + Ok(response) + } else { + return Err(anyhow::anyhow!("not scrape response: {:?}", response)); + } +} + +fn request_and_response( + socket: &UdpSocket, + tracker_addr: SocketAddr, + request: Request, +) -> anyhow::Result { + let mut buffer = [0u8; BUFFER_SIZE]; + + { + let mut buffer = Cursor::new(&mut buffer[..]); + + request + .write(&mut buffer) + .with_context(|| "write request")?; + + let bytes_written = buffer.position() as usize; + + socket + .send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) + .with_context(|| "send request")?; + } + + { + let (bytes_read, _) = socket + .recv_from(&mut buffer) + .with_context(|| "recv response")?; + + Ok(Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response")?) } }