add new constructors, add read_input_stream_async method, change read_input_stream api

This commit is contained in:
yggverse 2024-10-24 15:36:47 +03:00
parent e8381892ef
commit 62c0f5ea83
2 changed files with 92 additions and 19 deletions

View file

@ -3,7 +3,7 @@ pub mod error;
pub use error::Error; pub use error::Error;
use gio::{prelude::InputStreamExt, Cancellable, InputStream}; use gio::{prelude::InputStreamExt, Cancellable, InputStream};
use glib::{object::IsA, Bytes}; use glib::{object::IsA, Bytes, Priority};
pub const DEFAULT_CAPACITY: usize = 0x400; pub const DEFAULT_CAPACITY: usize = 0x400;
pub const DEFAULT_CHUNK_SIZE: usize = 0x100; pub const DEFAULT_CHUNK_SIZE: usize = 0x100;
@ -14,25 +14,42 @@ pub struct ByteBuffer {
} }
impl ByteBuffer { impl ByteBuffer {
/// Create dynamically allocated bytes buffer from `gio::InputStream` // Constructors
/// Create new dynamically allocated bytes buffer with default capacity
pub fn new() -> Self {
Self::with_capacity(Some(DEFAULT_CAPACITY))
}
/// Create new dynamically allocated bytes buffer with initial capacity
/// ///
/// Options: /// Options:
/// * `capacity` bytes request to reduce extra memory overwrites (1024 by default) /// * initial bytes request to reduce extra memory overwrites (1024 by default)
pub fn with_capacity(value: Option<usize>) -> Self {
Self {
bytes: Vec::with_capacity(match value {
Some(capacity) => capacity,
None => DEFAULT_CAPACITY,
}),
}
}
// Readers
/// Populate bytes buffer synchronously from `gio::InputStream`
///
/// Options:
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
/// * `chunk_size` bytes limit to read per iter (256 by default) /// * `chunk_size` bytes limit to read per iter (256 by default)
/// * `max_size` bytes limit to prevent memory overflow (1M by default) /// * `max_size` bytes limit to prevent memory overflow (1M by default)
pub fn from_input_stream( pub fn read_input_stream(
input_stream: &InputStream, // @TODO mut self,
input_stream: InputStream,
cancellable: Option<&impl IsA<Cancellable>>, cancellable: Option<&impl IsA<Cancellable>>,
capacity: Option<usize>,
chunk_size: Option<usize>, chunk_size: Option<usize>,
max_size: Option<usize>, max_size: Option<usize>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
// Create buffer with initial capacity
let mut buffer: Vec<Bytes> = Vec::with_capacity(match capacity {
Some(value) => value,
None => DEFAULT_CAPACITY,
});
// Disallow unlimited buffer, use defaults on None // Disallow unlimited buffer, use defaults on None
let limit = match max_size { let limit = match max_size {
Some(value) => value, Some(value) => value,
@ -41,7 +58,7 @@ impl ByteBuffer {
loop { loop {
// Check buffer size to prevent memory overflow // Check buffer size to prevent memory overflow
if buffer.len() > limit { if self.bytes.len() > limit {
return Err(Error::Overflow); return Err(Error::Overflow);
} }
@ -56,18 +73,73 @@ impl ByteBuffer {
Ok(bytes) => { Ok(bytes) => {
// No bytes were read, end of stream // No bytes were read, end of stream
if bytes.len() == 0 { if bytes.len() == 0 {
break; return Ok(self);
} }
// Save chunk to buffer // Save chunk to buffer
buffer.push(bytes); self.bytes.push(bytes);
} }
Err(_) => return Err(Error::Stream), Err(_) => return Err(Error::StreamChunkRead),
}; };
} }
}
// Done /// Populate bytes buffer asynchronously from `gio::InputStream`,
Ok(Self { bytes: buffer }) /// apply callback function to `Ok(Self)` on success
///
/// Options:
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
/// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html
/// * `chunk_size` optional bytes limit to read per iter (256 by default)
/// * `max_size` optional bytes limit to prevent memory overflow (1M by default)
/// * `callback` user function to apply on complete
pub fn read_input_stream_async(
mut self,
input_stream: InputStream,
cancellable: Cancellable,
priority: Priority,
chunk_size: Option<usize>,
max_size: Option<usize>,
callback: impl FnOnce(Result<Self, Error>) + 'static,
) {
// Clone reference counted chunk dependencies
let _input_stream = input_stream.clone();
let _cancellable = cancellable.clone();
// Continue bytes reading
input_stream.read_bytes_async(
match max_size {
Some(value) => value,
None => DEFAULT_MAX_SIZE,
},
priority,
Some(&cancellable),
move |result| -> () {
match result {
Ok(bytes) => {
// No bytes were read, end of stream
if bytes.len() == 0 {
return callback(Ok(self));
}
// Save chunk to buffer
self.bytes.push(bytes);
// Continue bytes reading...
self.read_input_stream_async(
_input_stream,
_cancellable,
priority,
chunk_size,
max_size,
callback,
);
}
Err(_) => callback(Err(Error::StreamChunkReadAsync)),
}
},
);
} }
/// Get link to bytes collected /// Get link to bytes collected

View file

@ -1,4 +1,5 @@
pub enum Error { pub enum Error {
Overflow, Overflow,
Stream, StreamChunkRead,
StreamChunkReadAsync,
} }