ws: refactor, bug fixes, improvements (#155)

- split swarm worker into two modules
- split socket worker into two modules
- keep track of which offers peers have sent and only allow matching
answers
- always clean up after closing connection
- use channel for telling connections to close
- move some logic into new ConnectionRunner struct
- use slotmap for connection reference storage
- fix double counting of error responses
- actually close connections that take too long to send responses to
- remove announced_info_hashes entry on AnnounceEvent::Stopped
This commit is contained in:
Joakim Frostegård 2023-11-09 18:06:21 +01:00 committed by GitHub
parent af9d5a55f6
commit fe5ccf6646
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1770 additions and 1583 deletions

View file

@ -125,6 +125,7 @@ impl Connection {
&self.load_test_state,
&mut self.rng,
self.peer_id,
self.send_answer.is_none(),
);
// If self.send_answer is set and request is announce request, make

View file

@ -12,6 +12,7 @@ pub fn create_random_request(
state: &LoadTestState,
rng: &mut impl Rng,
peer_id: PeerId,
announce_gen_offers: bool,
) -> InMessage {
let weights = [
config.torrents.weight_announce as u32,
@ -23,7 +24,9 @@ pub fn create_random_request(
let dist = WeightedIndex::new(&weights).expect("random request weighted index");
match items[dist.sample(rng)] {
RequestType::Announce => create_announce_request(config, state, rng, peer_id),
RequestType::Announce => {
create_announce_request(config, state, rng, peer_id, announce_gen_offers)
}
RequestType::Scrape => create_scrape_request(config, state, rng),
}
}
@ -34,6 +37,7 @@ fn create_announce_request(
state: &LoadTestState,
rng: &mut impl Rng,
peer_id: PeerId,
gen_offers: bool,
) -> InMessage {
let (event, bytes_left) = {
if rng.gen_bool(config.torrents.peer_seeder_probability) {
@ -45,17 +49,23 @@ fn create_announce_request(
let info_hash_index = select_info_hash_index(config, &state, rng);
let mut offers = Vec::with_capacity(config.torrents.offers_per_request);
let offers = if gen_offers {
let mut offers = Vec::with_capacity(config.torrents.offers_per_request);
for _ in 0..config.torrents.offers_per_request {
offers.push(AnnounceRequestOffer {
offer_id: OfferId(rng.gen()),
offer: RtcOffer {
t: RtcOfferType::Offer,
sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into()
},
})
}
for _ in 0..config.torrents.offers_per_request {
offers.push(AnnounceRequestOffer {
offer_id: OfferId(rng.gen()),
offer: RtcOffer {
t: RtcOfferType::Offer,
sdp: "abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-abcdefg-".into()
},
})
}
offers
} else {
Vec::new()
};
InMessage::AnnounceRequest(AnnounceRequest {
action: AnnounceAction::Announce,