From 20270d6b308c72a1684935cfc5ecfd904eefa6cd Mon Sep 17 00:00:00 2001 From: yggverse Date: Thu, 31 Oct 2024 05:17:02 +0200 Subject: [PATCH] move body component to data::text namespace --- src/client/response.rs | 4 +- src/client/response/body.rs | 200 ------------------------- src/client/response/body/error.rs | 5 - src/client/response/data.rs | 3 +- src/client/response/data/text.rs | 121 +++++++++++++++ src/client/response/data/text/error.rs | 4 +- 6 files changed, 127 insertions(+), 210 deletions(-) delete mode 100644 src/client/response/body.rs delete mode 100644 src/client/response/body/error.rs create mode 100644 src/client/response/data/text.rs diff --git a/src/client/response.rs b/src/client/response.rs index fdfb6a8..e9ecdf2 100644 --- a/src/client/response.rs +++ b/src/client/response.rs @@ -1,8 +1,6 @@ -//! Read and parse Gemini response as GObject +//! Read and parse Gemini response as Object -pub mod body; pub mod data; pub mod meta; -pub use body::Body; pub use meta::Meta; diff --git a/src/client/response/body.rs b/src/client/response/body.rs deleted file mode 100644 index a00eda3..0000000 --- a/src/client/response/body.rs +++ /dev/null @@ -1,200 +0,0 @@ -pub mod error; -pub use error::Error; - -use gio::{ - prelude::{IOStreamExt, InputStreamExt}, - Cancellable, SocketConnection, -}; -use glib::{Bytes, GString, Priority}; - -pub const DEFAULT_CAPACITY: usize = 0x400; -pub const DEFAULT_MAX_SIZE: usize = 0xfffff; - -/// Body container with memory-overflow-safe, dynamically allocated [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) buffer -/// -/// **Features** -/// -/// * configurable `capacity` and `max_size` options -/// * build-in [InputStream](https://docs.gtk.org/gio/class.InputStream.html) parser -/// -/// **Notice** -/// -/// * Recommended for gemtext documents -/// * For media types, use native stream processors (e.g. [Pixbuf](https://docs.gtk.org/gdk-pixbuf/ctor.Pixbuf.new_from_stream.html) for images) -pub struct Body { - buffer: Vec, - max_size: usize, -} - -impl Body { - // Constructors - - /// Create new empty `Self` with default `capacity` and `max_size` preset - pub fn new() -> Self { - Self::new_with_options(Some(DEFAULT_CAPACITY), Some(DEFAULT_MAX_SIZE)) - } - - /// Create new new `Self` with options - /// - /// Options: - /// * `capacity` initial bytes request to reduce extra memory reallocation (`DEFAULT_CAPACITY` if `None`) - /// * `max_size` max bytes to prevent memory overflow by unknown stream source (`DEFAULT_MAX_SIZE` if `None`) - pub fn new_with_options(capacity: Option, max_size: Option) -> Self { - Self { - buffer: Vec::with_capacity(match capacity { - Some(value) => value, - None => DEFAULT_CAPACITY, - }), - max_size: match max_size { - Some(value) => value, - None => DEFAULT_MAX_SIZE, - }, - } - } - - /// Simple way to create `Self` buffer from active [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) - /// - /// **Options** - /// * `socket_connection` - [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) to read bytes from - /// * `callback` function to apply on async operations complete, return `Result)>` - /// - /// **Notes** - /// - /// * method requires entire [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html), - /// not just [InputStream](https://docs.gtk.org/gio/class.InputStream.html) because of async features; - /// * use this method after `Header` bytes taken from input stream connected (otherwise, take a look on high-level `Response` parser) - pub fn from_socket_connection_async( - socket_connection: SocketConnection, - callback: impl FnOnce(Result)>) + 'static, - ) { - Self::read_all_from_socket_connection_async( - Self::new(), - socket_connection, - None, - None, - None, - callback, - ); - } - - // Actions - - /// Asynchronously read all [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) - /// from [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) to `Self.buffer` by `chunk` - /// - /// Useful to grab entire stream without risk of memory overflow (according to `Self.max_size`), - /// reduce extra memory reallocations by `capacity` option. - /// - /// **Notes** - /// - /// We are using entire [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) reference - /// instead of [InputStream](https://docs.gtk.org/gio/class.InputStream.html) just to keep main connection alive in the async chunks context - /// - /// **Options** - /// * `socket_connection` - [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) to read bytes from - /// * `cancellable` - [Cancellable](https://docs.gtk.org/gio/class.Cancellable.html) or `None::<&Cancellable>` by default - /// * `priority` - [Priority::DEFAULT](https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html) by default - /// * `chunk` optional bytes count to read per chunk (`0x100` by default) - /// * `callback` function to apply on all async operations complete, return `Result)>` - pub fn read_all_from_socket_connection_async( - mut self, - socket_connection: SocketConnection, - cancelable: Option, - priority: Option, - chunk: Option, - callback: impl FnOnce(Result)>) + 'static, - ) { - socket_connection.input_stream().read_bytes_async( - match chunk { - Some(value) => value, - None => 0x100, - }, - match priority { - Some(value) => value, - None => Priority::DEFAULT, - }, - match cancelable.clone() { - Some(value) => Some(value), - None => None::, - } - .as_ref(), - move |result| match result { - Ok(bytes) => { - // No bytes were read, end of stream - if bytes.len() == 0 { - return callback(Ok(self)); - } - - // Save chunk to buffer - if let Err(reason) = self.push(bytes) { - return callback(Err((reason, None))); - }; - - // Continue bytes read.. - self.read_all_from_socket_connection_async( - socket_connection, - cancelable, - priority, - chunk, - callback, - ); - } - Err(reason) => callback(Err((Error::InputStreamRead, Some(reason.message())))), - }, - ); - } - - /// Push [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) to `Self.buffer` - /// - /// Return `Error::Overflow` on `max_size` reached - pub fn push(&mut self, bytes: Bytes) -> Result { - // Calculate new size value - let total = self.buffer.len() + bytes.len(); - - // Validate overflow - if total > self.max_size { - return Err(Error::BufferOverflow); - } - - // Success - self.buffer.push(bytes); - - Ok(total) - } - - // Setters - - /// Set new `max_size` value, `DEFAULT_MAX_SIZE` if `None` - pub fn set_max_size(&mut self, value: Option) { - self.max_size = match value { - Some(size) => size, - None => DEFAULT_MAX_SIZE, - } - } - - // Getters - - /// Get reference to `Self.buffer` [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) collected - pub fn buffer(&self) -> &Vec { - &self.buffer - } - - /// Return copy of `Self.buffer` [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) as UTF-8 vector - pub fn to_utf8(&self) -> Vec { - self.buffer - .iter() - .flat_map(|byte| byte.iter()) - .cloned() - .collect() - } - - // Intentable getters - - /// Try convert `Self.buffer` [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) to GString - pub fn to_gstring(&self) -> Result { - match GString::from_utf8(self.to_utf8()) { - Ok(result) => Ok(result), - Err(_) => Err(Error::Decode), - } - } -} diff --git a/src/client/response/body/error.rs b/src/client/response/body/error.rs deleted file mode 100644 index b90b9c2..0000000 --- a/src/client/response/body/error.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub enum Error { - Decode, - InputStreamRead, - BufferOverflow, -} diff --git a/src/client/response/data.rs b/src/client/response/data.rs index ca43b82..0009fe8 100644 --- a/src/client/response/data.rs +++ b/src/client/response/data.rs @@ -1,6 +1,7 @@ //! Gemini response could have different MIME type for data. -//! Use one of these components to parse response according to content type expected. +//! Use one of components below to parse response according to content type expected. //! //! * MIME type could be detected using `client::response::Meta` parser pub mod text; +pub use text::Text; diff --git a/src/client/response/data/text.rs b/src/client/response/data/text.rs new file mode 100644 index 0000000..10ed150 --- /dev/null +++ b/src/client/response/data/text.rs @@ -0,0 +1,121 @@ +//! Tools for Text-based response + +pub mod error; +pub use error::Error; + +// Local dependencies +use gio::{ + prelude::{IOStreamExt, InputStreamExt}, + Cancellable, SocketConnection, +}; +use glib::{GString, Priority}; + +// Default limits +pub const BUFFER_CAPACITY: usize = 0x400; // 1024 +pub const BUFFER_MAX_SIZE: usize = 0xfffff; // 1M + +/// Container for text-based response data +pub struct Text { + data: GString, +} + +impl Text { + // Constructors + + /// Create new `Self` with options + pub fn new(data: GString) -> Self { + Self { data } + } + + /// Create new `Self` from UTF-8 buffer + pub fn from_utf8(buffer: &[u8]) -> Result)> { + match GString::from_utf8(buffer.into()) { + Ok(data) => Ok(Self::new(data)), + Err(_) => Err((Error::Decode, None)), + } + } + + /// Asynchronously create new `Self` from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) + /// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) + pub fn from_socket_connection_async( + socket_connection: SocketConnection, + priority: Option, + cancellable: Option, + on_complete: impl FnOnce(Result)>) + 'static, + ) { + read_all_from_socket_connection_async( + Vec::with_capacity(BUFFER_CAPACITY), + socket_connection, + match cancellable { + Some(value) => Some(value), + None => None::, + }, + match priority { + Some(value) => value, + None => Priority::DEFAULT, + }, + |result| match result { + Ok(buffer) => on_complete(Self::from_utf8(&buffer)), + Err(reason) => on_complete(Err(reason)), + }, + ); + } + + // Getters + + /// Get reference to `Self` data + pub fn data(&self) -> &GString { + &self.data + } +} + +// Tools + +/// Asynchronously read all bytes from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) +/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) +/// +/// Return UTF-8 buffer collected. +/// +/// * this function implements low-level helper for `Text::from_socket_connection_async`, also provides public API for external integrations +/// * requires `SocketConnection` instead of `InputStream` to keep connection alive (by increasing reference count in async context) @TODO +pub fn read_all_from_socket_connection_async( + mut buffer: Vec, + socket_connection: SocketConnection, + cancelable: Option, + priority: Priority, + callback: impl FnOnce(Result, (Error, Option<&str>)>) + 'static, +) { + socket_connection.input_stream().read_bytes_async( + BUFFER_CAPACITY, + priority, + cancelable.clone().as_ref(), + move |result| match result { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return callback(Ok(buffer)); + } + + // Validate overflow + if buffer.len() + bytes.len() > BUFFER_MAX_SIZE { + return callback(Err((Error::BufferOverflow, None))); + } + + // Save chunks to buffer + for &byte in bytes.iter() { + buffer.push(byte); + } + + // Continue bytes reading + read_all_from_socket_connection_async( + buffer, + socket_connection, + cancelable, + priority, + callback, + ); + } + Err(reason) => callback(Err((Error::InputStream, Some(reason.message())))), + }, + ); +} diff --git a/src/client/response/data/text/error.rs b/src/client/response/data/text/error.rs index 8b32c89..3007499 100644 --- a/src/client/response/data/text/error.rs +++ b/src/client/response/data/text/error.rs @@ -1,4 +1,6 @@ #[derive(Debug)] pub enum Error { - // nothing yet.. + BufferOverflow, + Decode, + InputStream, }