replace SocketConnection with IOStream implementation (for future client certificate support)

This commit is contained in:
yggverse 2024-11-15 21:46:52 +02:00
parent c1c06667c9
commit d6dc4d6870
4 changed files with 49 additions and 104 deletions

View file

@ -1,6 +1,6 @@
[package] [package]
name = "ggemini" name = "ggemini"
version = "0.9.0" version = "0.10.0"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
readme = "README.md" readme = "README.md"

View file

@ -6,9 +6,9 @@ pub use error::Error;
// Local dependencies // Local dependencies
use gio::{ use gio::{
prelude::{IOStreamExt, InputStreamExt}, prelude::{IOStreamExt, InputStreamExt},
Cancellable, SocketConnection, Cancellable, IOStream,
}; };
use glib::{GString, Priority}; use glib::{object::IsA, GString, Priority};
// Default limits // Default limits
pub const BUFFER_CAPACITY: usize = 0x400; // 1024 pub const BUFFER_CAPACITY: usize = 0x400; // 1024
@ -42,17 +42,16 @@ impl Text {
} }
} }
/// Asynchronously create new `Self` from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) /// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) pub fn from_stream_async(
pub fn from_socket_connection_async( stream: impl IsA<IOStream>,
socket_connection: SocketConnection,
priority: Option<Priority>, priority: Option<Priority>,
cancellable: Option<Cancellable>, cancellable: Option<Cancellable>,
on_complete: impl FnOnce(Result<Self, (Error, Option<&str>)>) + 'static, on_complete: impl FnOnce(Result<Self, (Error, Option<&str>)>) + 'static,
) { ) {
read_all_from_socket_connection_async( read_all_from_stream_async(
Vec::with_capacity(BUFFER_CAPACITY), Vec::with_capacity(BUFFER_CAPACITY),
socket_connection, stream,
match cancellable { match cancellable {
Some(value) => Some(value), Some(value) => Some(value),
None => None::<Cancellable>, None => None::<Cancellable>,
@ -78,21 +77,18 @@ impl Text {
// Tools // Tools
/// Asynchronously read all bytes from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) /// Asynchronously read all bytes from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// ///
/// Return UTF-8 buffer collected. /// Return UTF-8 buffer collected
/// /// * require `IOStream` reference to keep `Connection` active in async thread
/// * this function implements low-level helper for `Text::from_socket_connection_async`, also provides public API for external integrations pub fn read_all_from_stream_async(
/// * requires `SocketConnection` instead of `InputStream` to keep connection alive (by increasing reference count in async context) @TODO
pub fn read_all_from_socket_connection_async(
mut buffer: Vec<u8>, mut buffer: Vec<u8>,
socket_connection: SocketConnection, stream: impl IsA<IOStream>,
cancelable: Option<Cancellable>, cancelable: Option<Cancellable>,
priority: Priority, priority: Priority,
callback: impl FnOnce(Result<Vec<u8>, (Error, Option<&str>)>) + 'static, callback: impl FnOnce(Result<Vec<u8>, (Error, Option<&str>)>) + 'static,
) { ) {
socket_connection.input_stream().read_bytes_async( stream.input_stream().read_bytes_async(
BUFFER_CAPACITY, BUFFER_CAPACITY,
priority, priority,
cancelable.clone().as_ref(), cancelable.clone().as_ref(),
@ -114,13 +110,7 @@ pub fn read_all_from_socket_connection_async(
} }
// Continue bytes reading // Continue bytes reading
read_all_from_socket_connection_async( read_all_from_stream_async(buffer, stream, cancelable, priority, callback);
buffer,
socket_connection,
cancelable,
priority,
callback,
);
} }
Err(reason) => callback(Err((Error::InputStream, Some(reason.message())))), Err(reason) => callback(Err((Error::InputStream, Some(reason.message())))),
}, },

View file

@ -15,9 +15,9 @@ pub use status::Status;
use gio::{ use gio::{
prelude::{IOStreamExt, InputStreamExtManual}, prelude::{IOStreamExt, InputStreamExtManual},
Cancellable, SocketConnection, Cancellable, IOStream,
}; };
use glib::Priority; use glib::{object::IsA, Priority};
pub const MAX_LEN: usize = 0x400; // 1024 pub const MAX_LEN: usize = 0x400; // 1024
@ -95,17 +95,16 @@ impl Meta {
} }
} }
/// Asynchronously create new `Self` from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) /// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) pub fn from_stream_async(
pub fn from_socket_connection_async( stream: impl IsA<IOStream>,
socket_connection: SocketConnection,
priority: Option<Priority>, priority: Option<Priority>,
cancellable: Option<Cancellable>, cancellable: Option<Cancellable>,
on_complete: impl FnOnce(Result<Self, (Error, Option<&str>)>) + 'static, on_complete: impl FnOnce(Result<Self, (Error, Option<&str>)>) + 'static,
) { ) {
read_from_socket_connection_async( read_from_stream_async(
Vec::with_capacity(MAX_LEN), Vec::with_capacity(MAX_LEN),
socket_connection, stream,
match cancellable { match cancellable {
Some(value) => Some(value), Some(value) => Some(value),
None => None::<Cancellable>, None => None::<Cancellable>,
@ -138,21 +137,18 @@ impl Meta {
// Tools // Tools
/// Asynchronously read all meta bytes from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) /// Asynchronously read all meta bytes from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// ///
/// Return UTF-8 buffer collected. /// Return UTF-8 buffer collected
/// /// * require `IOStream` reference to keep `Connection` active in async thread
/// * this function implements low-level helper for `Meta::from_socket_connection_async`, also provides public API for external integrations pub fn read_from_stream_async(
/// * requires `SocketConnection` instead of `InputStream` to keep connection alive (by increasing reference count in async context) @TODO
pub fn read_from_socket_connection_async(
mut buffer: Vec<u8>, mut buffer: Vec<u8>,
connection: SocketConnection, stream: impl IsA<IOStream>,
cancellable: Option<Cancellable>, cancellable: Option<Cancellable>,
priority: Priority, priority: Priority,
on_complete: impl FnOnce(Result<Vec<u8>, (Error, Option<&str>)>) + 'static, on_complete: impl FnOnce(Result<Vec<u8>, (Error, Option<&str>)>) + 'static,
) { ) {
connection.input_stream().read_async( stream.input_stream().read_async(
vec![0], vec![0],
priority, priority,
cancellable.clone().as_ref(), cancellable.clone().as_ref(),
@ -165,9 +161,9 @@ pub fn read_from_socket_connection_async(
// Read next byte without record // Read next byte without record
if bytes.contains(&b'\r') { if bytes.contains(&b'\r') {
return read_from_socket_connection_async( return read_from_stream_async(
buffer, buffer,
connection, stream,
cancellable, cancellable,
priority, priority,
on_complete, on_complete,
@ -183,13 +179,7 @@ pub fn read_from_socket_connection_async(
buffer.append(&mut bytes); buffer.append(&mut bytes);
// Continue // Continue
read_from_socket_connection_async( read_from_stream_async(buffer, stream, cancellable, priority, on_complete);
buffer,
connection,
cancellable,
priority,
on_complete,
);
} }
Err((_, reason)) => on_complete(Err((Error::InputStream, Some(reason.message())))), Err((_, reason)) => on_complete(Err((Error::InputStream, Some(reason.message())))),
}, },

View file

@ -3,33 +3,18 @@ pub use error::Error;
use gio::{ use gio::{
prelude::{IOStreamExt, InputStreamExt, MemoryInputStreamExt}, prelude::{IOStreamExt, InputStreamExt, MemoryInputStreamExt},
Cancellable, MemoryInputStream, SocketConnection, Cancellable, IOStream, MemoryInputStream,
}; };
use glib::{Bytes, Priority}; use glib::{object::IsA, Bytes, Priority};
/// Asynchronously create new [MemoryInputStream](https://docs.gtk.org/gio/class.MemoryInputStream.html) /// Asynchronously create new [MemoryInputStream](https://docs.gtk.org/gio/class.MemoryInputStream.html)
/// from [InputStream](https://docs.gtk.org/gio/class.InputStream.html) /// from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for given [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// ///
/// Useful to create dynamically allocated, memory-safe buffer /// **Useful for**
/// from remote connections, where final size of target data could not be known by Gemini protocol restrictions. /// * safe read (of memory overflow) to dynamically allocated buffer, where final size of target data unknown
/// Also, could be useful for [Pixbuf](https://docs.gtk.org/gdk-pixbuf/class.Pixbuf.html) or /// * calculate bytes processed on chunk load
/// loading widgets like [Spinner](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/class.Spinner.html) pub fn from_stream_async(
/// to display bytes on async data loading. base_io_stream: impl IsA<IOStream>,
///
/// * this function takes entire `SocketConnection` reference (not `MemoryInputStream`) just to keep connection alive in the async context
///
/// **Implementation**
///
/// Implements low-level `read_all_from_socket_connection_async` function:
/// * recursively read all bytes from `InputStream` for `SocketConnection` according to `bytes_in_chunk` argument
/// * calculates total bytes length on every chunk iteration, validate sum with `bytes_total_limit` argument
/// * stop reading `InputStream` with `Result` on zero bytes in chunk received
/// * applies callback functions:
/// * `on_chunk` - return reference to [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) and `bytes_total` collected for every chunk in reading loop
/// * `on_complete` - return `MemoryInputStream` on success or `Error` on failure as `Result`
pub fn from_socket_connection_async(
socket_connection: SocketConnection,
cancelable: Option<Cancellable>, cancelable: Option<Cancellable>,
priority: Priority, priority: Priority,
bytes_in_chunk: usize, bytes_in_chunk: usize,
@ -37,9 +22,9 @@ pub fn from_socket_connection_async(
on_chunk: impl Fn((Bytes, usize)) + 'static, on_chunk: impl Fn((Bytes, usize)) + 'static,
on_complete: impl FnOnce(Result<MemoryInputStream, (Error, Option<&str>)>) + 'static, on_complete: impl FnOnce(Result<MemoryInputStream, (Error, Option<&str>)>) + 'static,
) { ) {
read_all_from_socket_connection_async( read_all_from_stream_async(
MemoryInputStream::new(), MemoryInputStream::new(),
socket_connection, base_io_stream,
cancelable, cancelable,
priority, priority,
bytes_in_chunk, bytes_in_chunk,
@ -50,32 +35,12 @@ pub fn from_socket_connection_async(
); );
} }
/// Low-level helper for `from_socket_connection_async` function, /// Asynchronously read entire [InputStream](https://docs.gtk.org/gio/class.InputStream.html)
/// also provides public API for external usage. /// from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// /// * require `IOStream` reference to keep `Connection` active in async thread
/// Asynchronously read [InputStream](https://docs.gtk.org/gio/class.InputStream.html) pub fn read_all_from_stream_async(
/// from [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// to given [MemoryInputStream](https://docs.gtk.org/gio/class.MemoryInputStream.html).
///
/// Useful to create dynamically allocated, memory-safe buffer
/// from remote connections, where final size of target data could not be known by Gemini protocol restrictions.
/// Also, could be useful for [Pixbuf](https://docs.gtk.org/gdk-pixbuf/class.Pixbuf.html) or
/// loading widgets like [Spinner](https://gnome.pages.gitlab.gnome.org/libadwaita/doc/main/class.Spinner.html)
/// to display bytes on async data loading.
///
/// * this function takes entire `SocketConnection` reference (not `MemoryInputStream`) just to keep connection alive in the async context
///
/// **Implementation**
///
/// * recursively read all bytes from `InputStream` for `SocketConnection` according to `bytes_in_chunk` argument
/// * calculates total bytes length on every chunk iteration, validate sum with `bytes_total_limit` argument
/// * stop reading `InputStream` with `Result` on zero bytes in chunk received, otherwise continue next chunk request in loop
/// * applies callback functions:
/// * `on_chunk` - return reference to [Bytes](https://docs.gtk.org/glib/struct.Bytes.html) and `bytes_total` collected for every chunk in reading loop
/// * `on_complete` - return `MemoryInputStream` on success or `Error` on failure as `Result`
pub fn read_all_from_socket_connection_async(
memory_input_stream: MemoryInputStream, memory_input_stream: MemoryInputStream,
socket_connection: SocketConnection, base_io_stream: impl IsA<IOStream>,
cancelable: Option<Cancellable>, cancelable: Option<Cancellable>,
priority: Priority, priority: Priority,
bytes_in_chunk: usize, bytes_in_chunk: usize,
@ -84,7 +49,7 @@ pub fn read_all_from_socket_connection_async(
on_chunk: impl Fn((Bytes, usize)) + 'static, on_chunk: impl Fn((Bytes, usize)) + 'static,
on_complete: impl FnOnce(Result<MemoryInputStream, (Error, Option<&str>)>) + 'static, on_complete: impl FnOnce(Result<MemoryInputStream, (Error, Option<&str>)>) + 'static,
) { ) {
socket_connection.input_stream().read_bytes_async( base_io_stream.input_stream().read_bytes_async(
bytes_in_chunk, bytes_in_chunk,
priority, priority,
cancelable.clone().as_ref(), cancelable.clone().as_ref(),
@ -110,9 +75,9 @@ pub fn read_all_from_socket_connection_async(
memory_input_stream.add_bytes(&bytes); memory_input_stream.add_bytes(&bytes);
// Continue // Continue
read_all_from_socket_connection_async( read_all_from_stream_async(
memory_input_stream, memory_input_stream,
socket_connection, base_io_stream,
cancelable, cancelable,
priority, priority,
bytes_in_chunk, bytes_in_chunk,