From 4067e420c31fb7397763e6e7e64fd6a1f738e34c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Joakim=20Frosteg=C3=A5rd?= Date: Fri, 31 Jul 2020 06:27:05 +0200 Subject: [PATCH] udp: response parsing: parse ipv6 peers on action 4 --- aquatic_udp_load_test/src/network.rs | 9 +-- .../src/converters/responses.rs | 67 +++++++++++-------- 2 files changed, 40 insertions(+), 36 deletions(-) diff --git a/aquatic_udp_load_test/src/network.rs b/aquatic_udp_load_test/src/network.rs index fc1c60f..8fdde03 100644 --- a/aquatic_udp_load_test/src/network.rs +++ b/aquatic_udp_load_test/src/network.rs @@ -60,11 +60,6 @@ pub fn run_socket_thread( let mut socket = UdpSocket::from_std(create_socket(config, addr)); let mut buffer = [0u8; MAX_PACKET_SIZE]; - let ip_version = match config.server_address { - SocketAddr::V4(_) => IpVersion::IPv4, - SocketAddr::V6(_) => IpVersion::IPv6, - }; - let token = Token(thread_id.0 as usize); let interests = Interest::READABLE; let timeout = Duration::from_micros(config.network.poll_timeout); @@ -89,7 +84,6 @@ pub fn run_socket_thread( if event.is_readable(){ read_responses( thread_id, - ip_version, &socket, &mut buffer, &mut local_state, @@ -132,14 +126,13 @@ pub fn run_socket_thread( fn read_responses( thread_id: ThreadId, - ip_version: IpVersion, socket: &UdpSocket, buffer: &mut [u8], ls: &mut SocketWorkerLocalStatistics, responses: &mut Vec<(ThreadId, Response)>, ){ while let Ok(amt) = socket.recv(buffer) { - match response_from_bytes(&buffer[0..amt], ip_version){ + match response_from_bytes(&buffer[0..amt]){ Ok(response) => { match response { Response::Announce(ref r) => { diff --git a/aquatic_udp_protocol/src/converters/responses.rs b/aquatic_udp_protocol/src/converters/responses.rs index 322bf3f..f80caaa 100644 --- a/aquatic_udp_protocol/src/converters/responses.rs +++ b/aquatic_udp_protocol/src/converters/responses.rs @@ -79,10 +79,7 @@ pub fn response_to_bytes( #[inline] -pub fn response_from_bytes( - bytes: &[u8], - ip_version: IpVersion, -) -> Result { +pub fn response_from_bytes(bytes: &[u8]) -> Result { let mut cursor = Cursor::new(bytes); let action = cursor.read_i32::()?; @@ -107,29 +104,16 @@ pub fn response_from_bytes( let position = cursor.position() as usize; let inner = cursor.into_inner(); - let peers = if ip_version == IpVersion::IPv4 { - inner[position..].chunks_exact(6).map(|chunk| { - let ip_bytes: [u8; 4] = (&chunk[..4]).try_into().unwrap(); - let ip_address = IpAddr::V4(Ipv4Addr::from(ip_bytes)); - let port = (&chunk[4..]).read_u16::().unwrap(); + let peers = inner[position..].chunks_exact(6).map(|chunk| { + let ip_bytes: [u8; 4] = (&chunk[..4]).try_into().unwrap(); + let ip_address = IpAddr::V4(Ipv4Addr::from(ip_bytes)); + let port = (&chunk[4..]).read_u16::().unwrap(); - ResponsePeer { - ip_address, - port: Port(port), - } - }).collect() - } else { - inner[position..].chunks_exact(18).map(|chunk| { - let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); - let ip_address = IpAddr::V6(Ipv6Addr::from(ip_bytes)); - let port = (&chunk[16..]).read_u16::().unwrap(); - - ResponsePeer { - ip_address, - port: Port(port), - } - }).collect() - }; + ResponsePeer { + ip_address, + port: Port(port), + } + }).collect(); Ok((AnnounceResponse { transaction_id: TransactionId(transaction_id), @@ -138,7 +122,6 @@ pub fn response_from_bytes( seeders: NumberOfPeers(seeders), peers }).into()) - }, // Scrape 2 => { @@ -174,6 +157,34 @@ pub fn response_from_bytes( message: String::from_utf8_lossy(&inner[position..]).into() }).into()) }, + // IPv6 announce + 4 => { + let announce_interval = cursor.read_i32::()?; + let leechers = cursor.read_i32::()?; + let seeders = cursor.read_i32::()?; + + let position = cursor.position() as usize; + let inner = cursor.into_inner(); + + let peers = inner[position..].chunks_exact(18).map(|chunk| { + let ip_bytes: [u8; 16] = (&chunk[..16]).try_into().unwrap(); + let ip_address = IpAddr::V6(Ipv6Addr::from(ip_bytes)); + let port = (&chunk[16..]).read_u16::().unwrap(); + + ResponsePeer { + ip_address, + port: Port(port), + } + }).collect(); + + Ok((AnnounceResponse { + transaction_id: TransactionId(transaction_id), + announce_interval: AnnounceInterval(announce_interval), + leechers: NumberOfPeers(leechers), + seeders: NumberOfPeers(seeders), + peers + }).into()) + }, _ => { Ok((ErrorResponse { transaction_id: TransactionId(transaction_id), @@ -195,7 +206,7 @@ mod tests { let mut buf = Vec::new(); response_to_bytes(&mut buf, response.clone(), ip_version).unwrap(); - let r2 = response_from_bytes(&buf[..], ip_version).unwrap(); + let r2 = response_from_bytes(&buf[..]).unwrap(); let success = response == r2;