From 496e9811389c2f3993c6a4ccc00e0be50cec2c81 Mon Sep 17 00:00:00 2001 From: yggverse Date: Thu, 24 Oct 2024 20:22:44 +0300 Subject: [PATCH] delegate buffer features to input level, delete byte_buffer, input_stream mods as deprecated --- src/client/connection.rs | 4 +- src/client/connection/input.rs | 156 +++++++++++++++++ src/client/connection/input/buffer.rs | 87 ++++++++++ .../byte_buffer => input/buffer}/error.rs | 0 src/client/connection/input/error.rs | 6 + src/client/connection/input_stream.rs | 4 - .../connection/input_stream/byte_buffer.rs | 158 ------------------ 7 files changed, 252 insertions(+), 163 deletions(-) create mode 100644 src/client/connection/input.rs create mode 100644 src/client/connection/input/buffer.rs rename src/client/connection/{input_stream/byte_buffer => input/buffer}/error.rs (100%) create mode 100644 src/client/connection/input/error.rs delete mode 100644 src/client/connection/input_stream.rs delete mode 100644 src/client/connection/input_stream/byte_buffer.rs diff --git a/src/client/connection.rs b/src/client/connection.rs index 5764660..7c2e44f 100644 --- a/src/client/connection.rs +++ b/src/client/connection.rs @@ -1,3 +1,5 @@ -pub mod input_stream; +pub mod input; + +pub use input::Input; // @TODO diff --git a/src/client/connection/input.rs b/src/client/connection/input.rs new file mode 100644 index 0000000..288d4e1 --- /dev/null +++ b/src/client/connection/input.rs @@ -0,0 +1,156 @@ +pub mod buffer; +pub mod error; + +pub use buffer::Buffer; +pub use error::Error; + +use gio::{prelude::InputStreamExt, Cancellable, InputStream}; +use glib::Priority; + +// Defaults +pub const DEFAULT_READ_CHUNK: usize = 0x100; + +pub struct Input { + buffer: Buffer, + stream: InputStream, +} + +impl Input { + // Constructors + + /// Create new `Input` from `gio::InputStream` + /// + /// https://docs.gtk.org/gio/class.InputStream.html + pub fn new_from_stream(stream: InputStream) -> Self { + Self { + buffer: Buffer::new(), + stream, + } + } + + // Actions + + /// Synchronously read all bytes from `gio::InputStream` to `input::Buffer` + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html + /// * `chunk` max bytes to read per chunk (256 by default) + pub fn read_all( + mut self, + cancelable: Option, + chunk: Option, + ) -> Result { + loop { + // Continue bytes reading + match self.stream.read_bytes( + match chunk { + Some(value) => value, + None => DEFAULT_READ_CHUNK, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + ) { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return Ok(self.buffer); + } + + // Save chunk to buffer + match self.buffer.push(bytes) { + Ok(_) => continue, + Err(buffer::Error::Overflow) => return Err(Error::BufferOverflow), + Err(_) => return Err(Error::BufferWrite), + }; + } + Err(_) => return Err(Error::StreamChunkRead), + }; + } + } + + /// Asynchronously read all bytes from `gio::InputStream` to `input::Buffer`, + /// + /// applies `callback` function on last byte reading complete. + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) + /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) + /// * `chunk` optional max bytes to read per chunk (`DEFAULT_READ_CHUNK` by default) + /// * `callback` user function to apply on async iteration complete + pub fn read_all_async( + mut self, + cancelable: Option, + priority: Option, + chunk: Option, + callback: impl FnOnce(Result) + 'static, + ) { + // Continue bytes reading + self.stream.clone().read_bytes_async( + match chunk { + Some(value) => value, + None => DEFAULT_READ_CHUNK, + }, + match priority { + Some(value) => value, + None => Priority::DEFAULT, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + move |result| { + match result { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return callback(Ok(self.buffer)); + } + + // Save chunk to buffer + match self.buffer.push(bytes) { + Err(buffer::Error::Overflow) => { + return callback(Err(Error::BufferOverflow)) + } + + // Other errors related to write issues @TODO test + Err(_) => return callback(Err(Error::BufferWrite)), + + // Async function, nothing to return yet + _ => (), + }; + + // Continue bytes reading... + self.read_all_async(cancelable, priority, chunk, callback); + } + Err(_) => callback(Err(Error::StreamChunkReadAsync)), + } + }, + ); + } + + // Setters + + pub fn set_buffer(&mut self, buffer: Buffer) { + self.buffer = buffer; + } + + pub fn set_stream(&mut self, stream: InputStream) { + self.stream = stream; + } + + // Getters + + /// Get reference to `Buffer` + pub fn buffer(&self) -> &Buffer { + &self.buffer + } + + /// Get reference to `gio::InputStream` + pub fn stream(&self) -> &InputStream { + &self.stream + } +} diff --git a/src/client/connection/input/buffer.rs b/src/client/connection/input/buffer.rs new file mode 100644 index 0000000..b9264c7 --- /dev/null +++ b/src/client/connection/input/buffer.rs @@ -0,0 +1,87 @@ +pub mod error; +pub use error::Error; + +use glib::Bytes; + +pub const DEFAULT_CAPACITY: usize = 0x400; +pub const DEFAULT_MAX_SIZE: usize = 0xfffff; + +pub struct Buffer { + bytes: Vec, + max_size: usize, +} + +impl Buffer { + // Constructors + + /// Create new dynamically allocated `Buffer` with default `capacity` and `max_size` limit + pub fn new() -> Self { + Self::new_with_options(Some(DEFAULT_CAPACITY), Some(DEFAULT_MAX_SIZE)) + } + + /// Create new dynamically allocated `Buffer` with options + /// + /// Options: + /// * `capacity` initial bytes request to reduce extra memory overwrites (1024 by default) + /// * `max_size` max bytes to prevent memory overflow (1M by default) + pub fn new_with_options(capacity: Option, max_size: Option) -> Self { + Self { + bytes: Vec::with_capacity(match capacity { + Some(value) => value, + None => DEFAULT_CAPACITY, + }), + max_size: match max_size { + Some(value) => value, + None => DEFAULT_MAX_SIZE, + }, + } + } + + // Setters + + /// Set new `Buffer.max_size` value to prevent memory overflow + /// + /// Use `DEFAULT_MAX_SIZE` if `None` given. + pub fn set_max_size(&mut self, value: Option) { + self.max_size = match value { + Some(size) => size, + None => DEFAULT_MAX_SIZE, + } + } + + // Actions + + /// Push `glib::Bytes` to `Buffer.bytes` + /// + /// Return `Error::Overflow` on `Buffer.max_size` reached. + pub fn push(&mut self, bytes: Bytes) -> Result { + // Calculate new size value + let total = self.bytes.len() + bytes.len(); + + // Validate overflow + if total > self.max_size { + return Err(Error::Overflow); + } + + // Success + self.bytes.push(bytes); + + Ok(total) + } + + // Getters + + /// Get reference to bytes collected + pub fn bytes(&self) -> &Vec { + &self.bytes + } + + /// Return copy of bytes as UTF-8 vector + pub fn to_utf8(&self) -> Vec { + self.bytes + .iter() + .flat_map(|byte| byte.iter()) + .cloned() + .collect() + } +} diff --git a/src/client/connection/input_stream/byte_buffer/error.rs b/src/client/connection/input/buffer/error.rs similarity index 100% rename from src/client/connection/input_stream/byte_buffer/error.rs rename to src/client/connection/input/buffer/error.rs diff --git a/src/client/connection/input/error.rs b/src/client/connection/input/error.rs new file mode 100644 index 0000000..4bcff86 --- /dev/null +++ b/src/client/connection/input/error.rs @@ -0,0 +1,6 @@ +pub enum Error { + BufferOverflow, + BufferWrite, + StreamChunkRead, + StreamChunkReadAsync, +} diff --git a/src/client/connection/input_stream.rs b/src/client/connection/input_stream.rs deleted file mode 100644 index faef427..0000000 --- a/src/client/connection/input_stream.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod byte_buffer; -pub use byte_buffer::ByteBuffer; - -// @TODO diff --git a/src/client/connection/input_stream/byte_buffer.rs b/src/client/connection/input_stream/byte_buffer.rs deleted file mode 100644 index 2e23b5d..0000000 --- a/src/client/connection/input_stream/byte_buffer.rs +++ /dev/null @@ -1,158 +0,0 @@ -pub mod error; - -pub use error::Error; - -use gio::{prelude::InputStreamExt, Cancellable, InputStream}; -use glib::{object::IsA, Bytes, Priority}; - -pub const DEFAULT_CAPACITY: usize = 0x400; -pub const DEFAULT_CHUNK_SIZE: usize = 0x100; -pub const DEFAULT_MAX_SIZE: usize = 0xfffff; - -pub struct ByteBuffer { - bytes: Vec, -} - -impl ByteBuffer { - // Constructors - - /// Create new dynamically allocated bytes buffer with default capacity - pub fn new() -> Self { - Self::with_capacity(Some(DEFAULT_CAPACITY)) - } - - /// Create new dynamically allocated bytes buffer with initial capacity - /// - /// Options: - /// * initial bytes request to reduce extra memory overwrites (1024 by default) - pub fn with_capacity(value: Option) -> Self { - Self { - bytes: Vec::with_capacity(match value { - Some(capacity) => capacity, - None => DEFAULT_CAPACITY, - }), - } - } - - // Readers - - /// Populate bytes buffer synchronously from `gio::InputStream` - /// - /// Options: - /// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html - /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html - /// * `chunk_size` bytes limit to read per iter (256 by default) - /// * `max_size` bytes limit to prevent memory overflow (1M by default) - pub fn read_input_stream( - mut self, - input_stream: InputStream, - cancellable: Option<&impl IsA>, - chunk_size: Option, - max_size: Option, - ) -> Result { - // Disallow unlimited buffer, use defaults on None - let limit = match max_size { - Some(value) => value, - None => DEFAULT_MAX_SIZE, - }; - - loop { - // Check buffer size to prevent memory overflow - if self.bytes.len() > limit { - return Err(Error::Overflow); - } - - // Continue bytes reading - match input_stream.read_bytes( - match chunk_size { - Some(value) => value, - None => DEFAULT_CHUNK_SIZE, - }, - cancellable, - ) { - Ok(bytes) => { - // No bytes were read, end of stream - if bytes.len() == 0 { - return Ok(self); - } - - // Save chunk to buffer - self.bytes.push(bytes); - } - Err(_) => return Err(Error::StreamChunkRead), - }; - } - } - - /// Populate bytes buffer asynchronously from `gio::InputStream`, - /// apply callback function to `Ok(Self)` on success - /// - /// Options: - /// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html - /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html - /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html - /// * `chunk_size` optional bytes limit to read per iter (256 by default) - /// * `max_size` optional bytes limit to prevent memory overflow (1M by default) - /// * `callback` user function to apply on complete - pub fn read_input_stream_async( - mut self, - input_stream: InputStream, - cancellable: Cancellable, - priority: Priority, - chunk_size: Option, - max_size: Option, - callback: impl FnOnce(Result) + 'static, - ) { - // Clone reference counted chunk dependencies - let _input_stream = input_stream.clone(); - let _cancellable = cancellable.clone(); - - // Continue bytes reading - input_stream.read_bytes_async( - match max_size { - Some(value) => value, - None => DEFAULT_MAX_SIZE, - }, - priority, - Some(&cancellable), - move |result| -> () { - match result { - Ok(bytes) => { - // No bytes were read, end of stream - if bytes.len() == 0 { - return callback(Ok(self)); - } - - // Save chunk to buffer - self.bytes.push(bytes); - - // Continue bytes reading... - self.read_input_stream_async( - _input_stream, - _cancellable, - priority, - chunk_size, - max_size, - callback, - ); - } - Err(_) => callback(Err(Error::StreamChunkReadAsync)), - } - }, - ); - } - - /// Get link to bytes collected - pub fn bytes(&self) -> &Vec { - &self.bytes - } - - /// Return a copy of the bytes in UTF-8 - pub fn to_utf8(&self) -> Vec { - self.bytes - .iter() - .flat_map(|byte| byte.iter()) - .cloned() - .collect() - } -}