udp: integration test: test scrape too, refactor

This commit is contained in:
Joakim Frostegård 2023-08-27 17:54:26 +02:00
parent 20eef8677d
commit 589d45a05d

View file

@ -8,19 +8,26 @@ use std::{
use anyhow::Context; use anyhow::Context;
use aquatic_udp::{common::BUFFER_SIZE, config::Config}; use aquatic_udp::{common::BUFFER_SIZE, config::Config};
use aquatic_udp_protocol::{ use aquatic_udp_protocol::{
common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, InfoHash, common::PeerId, AnnounceEvent, AnnounceRequest, AnnounceResponse, ConnectRequest, ConnectionId,
NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, TransactionId, InfoHash, NumberOfBytes, NumberOfPeers, PeerKey, Port, Request, Response, ScrapeRequest,
ScrapeResponse, TransactionId,
}; };
const PEER_PORT_START: u16 = 30_000;
const TRACKER_PORT: u16 = 40_111;
const PEERS_WANTED: usize = 10; const PEERS_WANTED: usize = 10;
#[test] #[test]
fn test_multiple_connect_and_announce() { fn test_multiple_connect_announce_scrape() -> anyhow::Result<()> {
let config = Config::default(); const TRACKER_PORT: u16 = 40_111;
const PEER_PORT_START: u16 = 30_000;
let tracker_port = run_tracker(config); let mut config = Config::default();
config.network.address.set_port(TRACKER_PORT);
run_tracker(config);
let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, TRACKER_PORT));
let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
let info_hash = InfoHash([0; 20]); let info_hash = InfoHash([0; 20]);
@ -36,25 +43,28 @@ fn test_multiple_connect_and_announce() {
num_leechers += 1; num_leechers += 1;
} }
let response = match connect_and_announce( let socket = UdpSocket::bind(peer_addr)?;
tracker_port, socket.set_read_timeout(Some(Duration::from_secs(1)))?;
let connection_id = connect(&socket, tracker_addr).with_context(|| "connect")?;
let announce_response = announce(
&socket,
tracker_addr,
connection_id,
PEER_PORT_START + i as u16, PEER_PORT_START + i as u16,
info_hash, info_hash,
is_seeder, is_seeder,
) { )
Ok(response) => response, .with_context(|| "announce")?;
Err(err) => {
panic!("connect_and_announce failed: {:#}", err);
}
};
assert_eq!(response.peers.len(), i.min(PEERS_WANTED)); assert_eq!(announce_response.peers.len(), i.min(PEERS_WANTED));
assert_eq!(response.seeders.0, num_seeders); assert_eq!(announce_response.seeders.0, num_seeders);
assert_eq!(response.leechers.0, num_leechers); assert_eq!(announce_response.leechers.0, num_leechers);
let response_peer_ports: HashSet<u16, RandomState> = let response_peer_ports: HashSet<u16, RandomState> =
HashSet::from_iter(response.peers.iter().map(|p| p.port.0)); HashSet::from_iter(announce_response.peers.iter().map(|p| p.port.0));
let expected_peer_ports: HashSet<u16, RandomState> = let expected_peer_ports: HashSet<u16, RandomState> =
HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16)); HashSet::from_iter((0..i).map(|i| PEER_PORT_START + i as u16));
@ -63,113 +73,133 @@ fn test_multiple_connect_and_announce() {
} else { } else {
assert_eq!(response_peer_ports, expected_peer_ports); assert_eq!(response_peer_ports, expected_peer_ports);
} }
let scrape_response = scrape(
&socket,
tracker_addr,
connection_id,
vec![info_hash, InfoHash([1; 20])],
)
.with_context(|| "scrape")?;
assert_eq!(scrape_response.torrent_stats[0].seeders.0, num_seeders);
assert_eq!(scrape_response.torrent_stats[0].leechers.0, num_leechers);
assert_eq!(scrape_response.torrent_stats[1].seeders.0, 0);
assert_eq!(scrape_response.torrent_stats[1].leechers.0, 0);
} }
Ok(())
} }
// FIXME: should ideally try different ports and use sync primitives to find // FIXME: should ideally try different ports and use sync primitives to find
// out if tracker was successfully started // out if tracker was successfully started
fn run_tracker(mut config: Config) -> u16 { fn run_tracker(config: Config) {
let port = TRACKER_PORT;
config.network.address.set_port(port);
::std::thread::spawn(move || { ::std::thread::spawn(move || {
aquatic_udp::run(config).unwrap(); aquatic_udp::run(config).unwrap();
}); });
::std::thread::sleep(Duration::from_secs(1)); ::std::thread::sleep(Duration::from_secs(1));
port
} }
fn connect_and_announce( fn connect(socket: &UdpSocket, tracker_addr: SocketAddr) -> anyhow::Result<ConnectionId> {
tracker_port: u16, let request = Request::Connect(ConnectRequest {
transaction_id: TransactionId(0),
});
let response = request_and_response(&socket, tracker_addr, request)?;
if let Response::Connect(response) = response {
Ok(response.connection_id)
} else {
Err(anyhow::anyhow!("not connect response: {:?}", response))
}
}
fn announce(
socket: &UdpSocket,
tracker_addr: SocketAddr,
connection_id: ConnectionId,
peer_port: u16, peer_port: u16,
info_hash: InfoHash, info_hash: InfoHash,
seeder: bool, seeder: bool,
) -> anyhow::Result<AnnounceResponse<Ipv4Addr>> { ) -> anyhow::Result<AnnounceResponse<Ipv4Addr>> {
let peer_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)); let mut peer_id = PeerId([0; 20]);
let tracker_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, tracker_port));
let socket = UdpSocket::bind(peer_addr)?; for chunk in peer_id.0.chunks_exact_mut(2) {
let mut buffer = [0u8; BUFFER_SIZE]; chunk.copy_from_slice(&peer_port.to_ne_bytes());
socket.set_read_timeout(Some(Duration::from_secs(1)))?;
{
let mut buffer = Cursor::new(&mut buffer[..]);
Request::Connect(ConnectRequest {
transaction_id: TransactionId(0),
})
.write(&mut buffer)
.with_context(|| "write connect request")?;
let bytes_written = buffer.position() as usize;
socket
.send_to(&(buffer.into_inner())[..bytes_written], tracker_addr)
.with_context(|| "send connect request")?;
} }
let connection_id = { let request = Request::Announce(AnnounceRequest {
let (bytes_read, _) = socket connection_id,
.recv_from(&mut buffer) transaction_id: TransactionId(0),
.with_context(|| "recv connect response")?; info_hash,
peer_id,
bytes_downloaded: NumberOfBytes(0),
bytes_uploaded: NumberOfBytes(0),
bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }),
event: AnnounceEvent::Started,
ip_address: None,
key: PeerKey(0),
peers_wanted: NumberOfPeers(PEERS_WANTED as i32),
port: Port(peer_port),
});
if let Response::Connect(response) = Response::from_bytes(&buffer[..bytes_read], true) let response = request_and_response(&socket, tracker_addr, request)?;
.with_context(|| "parse connect response")?
{
response.connection_id
} else {
panic!("Not connect response");
}
};
{ if let Response::AnnounceIpv4(response) = response {
let mut buffer = Cursor::new(&mut buffer[..]); Ok(response)
} else {
let mut peer_id = PeerId([0; 20]); return Err(anyhow::anyhow!("not announce response: {:?}", response));
}
for chunk in peer_id.0.chunks_exact_mut(2) { }
chunk.copy_from_slice(&peer_port.to_ne_bytes());
} fn scrape(
socket: &UdpSocket,
Request::Announce(AnnounceRequest { tracker_addr: SocketAddr,
connection_id, connection_id: ConnectionId,
transaction_id: TransactionId(0), info_hashes: Vec<InfoHash>,
info_hash, ) -> anyhow::Result<ScrapeResponse> {
peer_id, let request = Request::Scrape(ScrapeRequest {
bytes_downloaded: NumberOfBytes(0), connection_id,
bytes_uploaded: NumberOfBytes(0), transaction_id: TransactionId(0),
bytes_left: NumberOfBytes(if seeder { 0 } else { 1 }), info_hashes,
event: AnnounceEvent::Started, });
ip_address: None,
key: PeerKey(0), let response = request_and_response(&socket, tracker_addr, request)?;
peers_wanted: NumberOfPeers(PEERS_WANTED as i32),
port: Port(peer_port), if let Response::Scrape(response) = response {
}) Ok(response)
.write(&mut buffer) } else {
.with_context(|| "write announce request")?; return Err(anyhow::anyhow!("not scrape response: {:?}", response));
}
let bytes_written = buffer.position() as usize; }
socket fn request_and_response(
.send_to(&(buffer.into_inner())[..bytes_written], tracker_addr) socket: &UdpSocket,
.with_context(|| "send announce request")?; tracker_addr: SocketAddr,
} request: Request,
) -> anyhow::Result<Response> {
{ let mut buffer = [0u8; BUFFER_SIZE];
let (bytes_read, _) = socket
.recv_from(&mut buffer) {
.with_context(|| "recv announce response")?; let mut buffer = Cursor::new(&mut buffer[..]);
if let Response::AnnounceIpv4(response) = Response::from_bytes(&buffer[..bytes_read], true) request
.with_context(|| "parse announce response")? .write(&mut buffer)
{ .with_context(|| "write request")?;
Ok(response)
} else { let bytes_written = buffer.position() as usize;
panic!("Not announce response");
} socket
.send_to(&(buffer.into_inner())[..bytes_written], tracker_addr)
.with_context(|| "send request")?;
}
{
let (bytes_read, _) = socket
.recv_from(&mut buffer)
.with_context(|| "recv response")?;
Ok(Response::from_bytes(&buffer[..bytes_read], true).with_context(|| "parse response")?)
} }
} }