diff --git a/aquatic_udp/src/lib/glommio/common.rs b/aquatic_udp/src/lib/glommio/common.rs index 3ab9210..d1d398c 100644 --- a/aquatic_udp/src/lib/glommio/common.rs +++ b/aquatic_udp/src/lib/glommio/common.rs @@ -20,7 +20,11 @@ pub async fn update_access_list(config: Config, access_list: Rc { if let Err(err) = access_list.borrow_mut().insert_from_line(&buf) { - ::log::error!("Couln't parse access list line '{}': {:?}", buf, err); + ::log::error!( + "Couln't parse access list line '{}': {:?}", + buf, + err + ); } } Err(err) => { @@ -32,7 +36,7 @@ pub async fn update_access_list(config: Config, access_list: Rc { ::log::error!("Couldn't open access list file: {:?}", err) } diff --git a/aquatic_udp/src/lib/glommio/handlers.rs b/aquatic_udp/src/lib/glommio/handlers.rs index adeefaa..8ccc0c9 100644 --- a/aquatic_udp/src/lib/glommio/handlers.rs +++ b/aquatic_udp/src/lib/glommio/handlers.rs @@ -19,6 +19,7 @@ pub async fn run_request_worker( config: Config, request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, + access_list: AccessList, ) { let (_, mut request_receivers) = request_mesh_builder.join(Role::Consumer).await.unwrap(); let (response_senders, _) = response_mesh_builder.join(Role::Producer).await.unwrap(); @@ -26,7 +27,7 @@ pub async fn run_request_worker( let response_senders = Rc::new(response_senders); let torrents = Rc::new(RefCell::new(TorrentMaps::default())); - let access_list = Rc::new(RefCell::new(AccessList::default())); + let access_list = Rc::new(RefCell::new(access_list)); // Periodically clean torrents and update access list TimerActionRepeat::repeat(enclose!((config, torrents, access_list) move || { diff --git a/aquatic_udp/src/lib/glommio/mod.rs b/aquatic_udp/src/lib/glommio/mod.rs index 78c6e5a..1064fd7 100644 --- a/aquatic_udp/src/lib/glommio/mod.rs +++ b/aquatic_udp/src/lib/glommio/mod.rs @@ -5,6 +5,7 @@ use std::sync::{atomic::AtomicUsize, Arc}; +use aquatic_common::access_list::AccessList; use glommio::channels::channel_mesh::MeshBuilder; use glommio::prelude::*; @@ -23,6 +24,12 @@ pub fn run(config: Config) -> anyhow::Result<()> { }); } + let access_list = if config.access_list.mode.is_on() { + AccessList::create_from_path(&config.access_list.path).expect("Load access list") + } else { + AccessList::default() + }; + let num_peers = config.socket_workers + config.request_workers; let request_mesh_builder = MeshBuilder::partial(num_peers, SHARED_CHANNEL_SIZE); @@ -37,6 +44,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); let num_bound_sockets = num_bound_sockets.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -50,6 +58,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { request_mesh_builder, response_mesh_builder, num_bound_sockets, + access_list, ) .await }); @@ -61,6 +70,7 @@ pub fn run(config: Config) -> anyhow::Result<()> { let config = config.clone(); let request_mesh_builder = request_mesh_builder.clone(); let response_mesh_builder = response_mesh_builder.clone(); + let access_list = access_list.clone(); let mut builder = LocalExecutorBuilder::default(); @@ -70,7 +80,13 @@ pub fn run(config: Config) -> anyhow::Result<()> { } let executor = builder.spawn(|| async move { - handlers::run_request_worker(config, request_mesh_builder, response_mesh_builder).await + handlers::run_request_worker( + config, + request_mesh_builder, + response_mesh_builder, + access_list, + ) + .await }); executors.push(executor); diff --git a/aquatic_udp/src/lib/glommio/network.rs b/aquatic_udp/src/lib/glommio/network.rs index 747e094..d66db0d 100644 --- a/aquatic_udp/src/lib/glommio/network.rs +++ b/aquatic_udp/src/lib/glommio/network.rs @@ -30,6 +30,7 @@ pub async fn run_socket_worker( request_mesh_builder: MeshBuilder<(usize, AnnounceRequest, SocketAddr), Partial>, response_mesh_builder: MeshBuilder<(AnnounceResponse, SocketAddr), Partial>, num_bound_sockets: Arc, + access_list: AccessList, ) { let (local_sender, local_receiver) = new_unbounded(); @@ -57,6 +58,7 @@ pub async fn run_socket_worker( response_consumer_index, local_sender, socket.clone(), + access_list, )) .detach(); @@ -77,6 +79,7 @@ async fn read_requests( response_consumer_index: usize, local_sender: LocalSender<(Response, SocketAddr)>, socket: Rc, + access_list: AccessList, ) { let mut rng = StdRng::from_entropy(); @@ -84,7 +87,7 @@ async fn read_requests( let max_connection_age = config.cleaning.max_connection_age; let connection_valid_until = Rc::new(RefCell::new(ValidUntil::new(max_connection_age))); - let access_list = Rc::new(RefCell::new(AccessList::default())); + let access_list = Rc::new(RefCell::new(access_list)); let connections = Rc::new(RefCell::new(ConnectionMap::default())); // Periodically update connection_valid_until