pub mod error; pub use error::Error; use gio::{ prelude::{IOStreamExt, InputStreamExt, OutputStreamExtManual}, Cancellable, FileOutputStream, IOStream, }; use glib::{object::IsA, Bytes, Priority}; /// Asynchronously move all bytes from [IOStream](https://docs.gtk.org/gio/class.IOStream.html) /// to [FileOutputStream](https://docs.gtk.org/gio/class.FileOutputStream.html) /// * require `IOStream` reference to keep `Connection` active in async thread pub fn from_stream_async( io_stream: impl IsA, file_output_stream: FileOutputStream, cancellable: Cancellable, priority: Priority, (chunk, limit, mut total): ( usize, // bytes_in_chunk Option, // bytes_total_limit, `None` to unlimited usize, // bytes_total ), (on_chunk, on_complete): ( impl Fn(Bytes, usize) + 'static, // on_chunk impl FnOnce(Result<(FileOutputStream, usize), Error>) + 'static, // on_complete ), ) { io_stream.input_stream().read_bytes_async( chunk, priority, Some(&cancellable.clone()), move |result| match result { Ok(bytes) => { total += bytes.len(); on_chunk(bytes.clone(), total); if let Some(limit) = limit { if total > limit { return on_complete(Err(Error::BytesTotal(total, limit))); } } if bytes.len() == 0 { return on_complete(Ok((file_output_stream, total))); } // Make sure **all bytes** sent to the destination // > A partial write is performed with the size of a message block, which is 16kB // > https://docs.openssl.org/3.0/man3/SSL_write/#notes file_output_stream.clone().write_all_async( bytes.clone(), priority, Some(&cancellable.clone()), move |result| { match result { Ok(_) => { // continue read.. from_stream_async( io_stream, file_output_stream, cancellable, priority, (chunk, limit, total), (on_chunk, on_complete), ); } Err((b, e)) => on_complete(Err(Error::OutputStream(b, e))), } }, ); } Err(e) => on_complete(Err(Error::InputStream(e))), }, ) }