diff --git a/src/gio.rs b/src/gio.rs index 8206018..72a89d8 100644 --- a/src/gio.rs +++ b/src/gio.rs @@ -1,2 +1,3 @@ +pub mod file_output_stream; pub mod memory_input_stream; pub mod network_address; diff --git a/src/gio/file_output_stream.rs b/src/gio/file_output_stream.rs new file mode 100644 index 0000000..cfb8f78 --- /dev/null +++ b/src/gio/file_output_stream.rs @@ -0,0 +1,83 @@ +pub mod error; +pub use error::Error; + +use gio::{ + prelude::{IOStreamExt, InputStreamExt, OutputStreamExtManual}, + Cancellable, FileOutputStream, IOStream, +}; +use glib::{object::IsA, Bytes, Priority}; + +/// Asynchronously move all bytes from [IOStream](https://docs.gtk.org/gio/class.IOStream.html) +/// to [FileOutputStream](https://docs.gtk.org/gio/class.FileOutputStream.html) +/// * require `IOStream` reference to keep `Connection` active in async thread +pub fn move_all_from_stream_async( + base_io_stream: impl IsA, + file_output_stream: FileOutputStream, + cancellable: Cancellable, + priority: Priority, + bytes: ( + usize, // bytes_in_chunk + usize, // bytes_total_limit + usize, // bytes_total + ), + callback: ( + impl Fn((Bytes, usize)) + 'static, // on_chunk + impl FnOnce(Result) + 'static, // on_complete + ), +) { + let (on_chunk, on_complete) = callback; + let (bytes_in_chunk, bytes_total_limit, bytes_total) = bytes; + + base_io_stream.input_stream().read_bytes_async( + bytes_in_chunk, + priority, + Some(&cancellable.clone()), + move |result| match result { + Ok(bytes) => { + // Update bytes total + let bytes_total = bytes_total + bytes.len(); + + // Callback chunk function + on_chunk((bytes.clone(), bytes_total)); + + // Validate max size + if bytes_total > bytes_total_limit { + return on_complete(Err(Error::BytesTotal(bytes_total, bytes_total_limit))); + } + + // No bytes were read, end of stream + if bytes.len() == 0 { + return on_complete(Ok(file_output_stream)); + } + + // Write chunk bytes + file_output_stream.clone().write_async( + bytes.clone(), + priority, + Some(&cancellable.clone()), + move |result| { + match result { + Ok(_) => { + // Continue + move_all_from_stream_async( + base_io_stream, + file_output_stream, + cancellable, + priority, + (bytes_in_chunk, bytes_total_limit, bytes_total), + (on_chunk, on_complete), + ); + } + Err((bytes, e)) => { + on_complete(Err(Error::OutputStream(bytes.clone(), e))) + } + } + }, + ); + } + Err(e) => { + on_complete(Err(Error::InputStream(e))); + } + }, + ); +} diff --git a/src/gio/file_output_stream/error.rs b/src/gio/file_output_stream/error.rs new file mode 100644 index 0000000..6b1ee76 --- /dev/null +++ b/src/gio/file_output_stream/error.rs @@ -0,0 +1,24 @@ +use std::fmt::{Display, Formatter, Result}; + +#[derive(Debug)] +pub enum Error { + BytesTotal(usize, usize), + InputStream(glib::Error), + OutputStream(glib::Bytes, glib::Error), +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter) -> Result { + match self { + Self::BytesTotal(total, limit) => { + write!(f, "Bytes total limit reached: {total} / {limit}") + } + Self::InputStream(e) => { + write!(f, "Input stream error: {e}") + } + Self::OutputStream(_, e) => { + write!(f, "Output stream error: {e}") + } + } + } +}