From 62c0f5ea83a43f62c5076cb11abbc30427e9b9ee Mon Sep 17 00:00:00 2001 From: yggverse Date: Thu, 24 Oct 2024 15:36:47 +0300 Subject: [PATCH] add new constructors, add read_input_stream_async method, change read_input_stream api --- .../connection/input_stream/byte_buffer.rs | 108 +++++++++++++++--- .../input_stream/byte_buffer/error.rs | 3 +- 2 files changed, 92 insertions(+), 19 deletions(-) diff --git a/src/client/connection/input_stream/byte_buffer.rs b/src/client/connection/input_stream/byte_buffer.rs index fc7906e..2e23b5d 100644 --- a/src/client/connection/input_stream/byte_buffer.rs +++ b/src/client/connection/input_stream/byte_buffer.rs @@ -3,7 +3,7 @@ pub mod error; pub use error::Error; use gio::{prelude::InputStreamExt, Cancellable, InputStream}; -use glib::{object::IsA, Bytes}; +use glib::{object::IsA, Bytes, Priority}; pub const DEFAULT_CAPACITY: usize = 0x400; pub const DEFAULT_CHUNK_SIZE: usize = 0x100; @@ -14,25 +14,42 @@ pub struct ByteBuffer { } impl ByteBuffer { - /// Create dynamically allocated bytes buffer from `gio::InputStream` + // Constructors + + /// Create new dynamically allocated bytes buffer with default capacity + pub fn new() -> Self { + Self::with_capacity(Some(DEFAULT_CAPACITY)) + } + + /// Create new dynamically allocated bytes buffer with initial capacity /// /// Options: - /// * `capacity` bytes request to reduce extra memory overwrites (1024 by default) + /// * initial bytes request to reduce extra memory overwrites (1024 by default) + pub fn with_capacity(value: Option) -> Self { + Self { + bytes: Vec::with_capacity(match value { + Some(capacity) => capacity, + None => DEFAULT_CAPACITY, + }), + } + } + + // Readers + + /// Populate bytes buffer synchronously from `gio::InputStream` + /// + /// Options: + /// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html /// * `chunk_size` bytes limit to read per iter (256 by default) /// * `max_size` bytes limit to prevent memory overflow (1M by default) - pub fn from_input_stream( - input_stream: &InputStream, // @TODO + pub fn read_input_stream( + mut self, + input_stream: InputStream, cancellable: Option<&impl IsA>, - capacity: Option, chunk_size: Option, max_size: Option, ) -> Result { - // Create buffer with initial capacity - let mut buffer: Vec = Vec::with_capacity(match capacity { - Some(value) => value, - None => DEFAULT_CAPACITY, - }); - // Disallow unlimited buffer, use defaults on None let limit = match max_size { Some(value) => value, @@ -41,7 +58,7 @@ impl ByteBuffer { loop { // Check buffer size to prevent memory overflow - if buffer.len() > limit { + if self.bytes.len() > limit { return Err(Error::Overflow); } @@ -56,18 +73,73 @@ impl ByteBuffer { Ok(bytes) => { // No bytes were read, end of stream if bytes.len() == 0 { - break; + return Ok(self); } // Save chunk to buffer - buffer.push(bytes); + self.bytes.push(bytes); } - Err(_) => return Err(Error::Stream), + Err(_) => return Err(Error::StreamChunkRead), }; } + } - // Done - Ok(Self { bytes: buffer }) + /// Populate bytes buffer asynchronously from `gio::InputStream`, + /// apply callback function to `Ok(Self)` on success + /// + /// Options: + /// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html + /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html + /// * `chunk_size` optional bytes limit to read per iter (256 by default) + /// * `max_size` optional bytes limit to prevent memory overflow (1M by default) + /// * `callback` user function to apply on complete + pub fn read_input_stream_async( + mut self, + input_stream: InputStream, + cancellable: Cancellable, + priority: Priority, + chunk_size: Option, + max_size: Option, + callback: impl FnOnce(Result) + 'static, + ) { + // Clone reference counted chunk dependencies + let _input_stream = input_stream.clone(); + let _cancellable = cancellable.clone(); + + // Continue bytes reading + input_stream.read_bytes_async( + match max_size { + Some(value) => value, + None => DEFAULT_MAX_SIZE, + }, + priority, + Some(&cancellable), + move |result| -> () { + match result { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return callback(Ok(self)); + } + + // Save chunk to buffer + self.bytes.push(bytes); + + // Continue bytes reading... + self.read_input_stream_async( + _input_stream, + _cancellable, + priority, + chunk_size, + max_size, + callback, + ); + } + Err(_) => callback(Err(Error::StreamChunkReadAsync)), + } + }, + ); } /// Get link to bytes collected diff --git a/src/client/connection/input_stream/byte_buffer/error.rs b/src/client/connection/input_stream/byte_buffer/error.rs index 3a6960e..bab66ff 100644 --- a/src/client/connection/input_stream/byte_buffer/error.rs +++ b/src/client/connection/input_stream/byte_buffer/error.rs @@ -1,4 +1,5 @@ pub enum Error { Overflow, - Stream, + StreamChunkRead, + StreamChunkReadAsync, }