mirror of
https://github.com/YGGverse/ggemini.git
synced 2026-03-31 17:15:31 +00:00
delegate buffer features to input level, delete byte_buffer, input_stream mods as deprecated
This commit is contained in:
parent
ee476e56d2
commit
496e981138
7 changed files with 252 additions and 163 deletions
|
|
@ -1,3 +1,5 @@
|
|||
pub mod input_stream;
|
||||
pub mod input;
|
||||
|
||||
pub use input::Input;
|
||||
|
||||
// @TODO
|
||||
|
|
|
|||
156
src/client/connection/input.rs
Normal file
156
src/client/connection/input.rs
Normal file
|
|
@ -0,0 +1,156 @@
|
|||
pub mod buffer;
|
||||
pub mod error;
|
||||
|
||||
pub use buffer::Buffer;
|
||||
pub use error::Error;
|
||||
|
||||
use gio::{prelude::InputStreamExt, Cancellable, InputStream};
|
||||
use glib::Priority;
|
||||
|
||||
// Defaults
|
||||
pub const DEFAULT_READ_CHUNK: usize = 0x100;
|
||||
|
||||
pub struct Input {
|
||||
buffer: Buffer,
|
||||
stream: InputStream,
|
||||
}
|
||||
|
||||
impl Input {
|
||||
// Constructors
|
||||
|
||||
/// Create new `Input` from `gio::InputStream`
|
||||
///
|
||||
/// https://docs.gtk.org/gio/class.InputStream.html
|
||||
pub fn new_from_stream(stream: InputStream) -> Self {
|
||||
Self {
|
||||
buffer: Buffer::new(),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
|
||||
// Actions
|
||||
|
||||
/// Synchronously read all bytes from `gio::InputStream` to `input::Buffer`
|
||||
///
|
||||
/// Options:
|
||||
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
|
||||
/// * `chunk` max bytes to read per chunk (256 by default)
|
||||
pub fn read_all(
|
||||
mut self,
|
||||
cancelable: Option<Cancellable>,
|
||||
chunk: Option<usize>,
|
||||
) -> Result<Buffer, Error> {
|
||||
loop {
|
||||
// Continue bytes reading
|
||||
match self.stream.read_bytes(
|
||||
match chunk {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_READ_CHUNK,
|
||||
},
|
||||
match cancelable.clone() {
|
||||
Some(value) => Some(value),
|
||||
None => None::<Cancellable>,
|
||||
}
|
||||
.as_ref(),
|
||||
) {
|
||||
Ok(bytes) => {
|
||||
// No bytes were read, end of stream
|
||||
if bytes.len() == 0 {
|
||||
return Ok(self.buffer);
|
||||
}
|
||||
|
||||
// Save chunk to buffer
|
||||
match self.buffer.push(bytes) {
|
||||
Ok(_) => continue,
|
||||
Err(buffer::Error::Overflow) => return Err(Error::BufferOverflow),
|
||||
Err(_) => return Err(Error::BufferWrite),
|
||||
};
|
||||
}
|
||||
Err(_) => return Err(Error::StreamChunkRead),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// Asynchronously read all bytes from `gio::InputStream` to `input::Buffer`,
|
||||
///
|
||||
/// applies `callback` function on last byte reading complete.
|
||||
///
|
||||
/// Options:
|
||||
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default)
|
||||
/// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default)
|
||||
/// * `chunk` optional max bytes to read per chunk (`DEFAULT_READ_CHUNK` by default)
|
||||
/// * `callback` user function to apply on async iteration complete
|
||||
pub fn read_all_async(
|
||||
mut self,
|
||||
cancelable: Option<Cancellable>,
|
||||
priority: Option<Priority>,
|
||||
chunk: Option<usize>,
|
||||
callback: impl FnOnce(Result<Buffer, Error>) + 'static,
|
||||
) {
|
||||
// Continue bytes reading
|
||||
self.stream.clone().read_bytes_async(
|
||||
match chunk {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_READ_CHUNK,
|
||||
},
|
||||
match priority {
|
||||
Some(value) => value,
|
||||
None => Priority::DEFAULT,
|
||||
},
|
||||
match cancelable.clone() {
|
||||
Some(value) => Some(value),
|
||||
None => None::<Cancellable>,
|
||||
}
|
||||
.as_ref(),
|
||||
move |result| {
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
// No bytes were read, end of stream
|
||||
if bytes.len() == 0 {
|
||||
return callback(Ok(self.buffer));
|
||||
}
|
||||
|
||||
// Save chunk to buffer
|
||||
match self.buffer.push(bytes) {
|
||||
Err(buffer::Error::Overflow) => {
|
||||
return callback(Err(Error::BufferOverflow))
|
||||
}
|
||||
|
||||
// Other errors related to write issues @TODO test
|
||||
Err(_) => return callback(Err(Error::BufferWrite)),
|
||||
|
||||
// Async function, nothing to return yet
|
||||
_ => (),
|
||||
};
|
||||
|
||||
// Continue bytes reading...
|
||||
self.read_all_async(cancelable, priority, chunk, callback);
|
||||
}
|
||||
Err(_) => callback(Err(Error::StreamChunkReadAsync)),
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Setters
|
||||
|
||||
pub fn set_buffer(&mut self, buffer: Buffer) {
|
||||
self.buffer = buffer;
|
||||
}
|
||||
|
||||
pub fn set_stream(&mut self, stream: InputStream) {
|
||||
self.stream = stream;
|
||||
}
|
||||
|
||||
// Getters
|
||||
|
||||
/// Get reference to `Buffer`
|
||||
pub fn buffer(&self) -> &Buffer {
|
||||
&self.buffer
|
||||
}
|
||||
|
||||
/// Get reference to `gio::InputStream`
|
||||
pub fn stream(&self) -> &InputStream {
|
||||
&self.stream
|
||||
}
|
||||
}
|
||||
87
src/client/connection/input/buffer.rs
Normal file
87
src/client/connection/input/buffer.rs
Normal file
|
|
@ -0,0 +1,87 @@
|
|||
pub mod error;
|
||||
pub use error::Error;
|
||||
|
||||
use glib::Bytes;
|
||||
|
||||
pub const DEFAULT_CAPACITY: usize = 0x400;
|
||||
pub const DEFAULT_MAX_SIZE: usize = 0xfffff;
|
||||
|
||||
pub struct Buffer {
|
||||
bytes: Vec<Bytes>,
|
||||
max_size: usize,
|
||||
}
|
||||
|
||||
impl Buffer {
|
||||
// Constructors
|
||||
|
||||
/// Create new dynamically allocated `Buffer` with default `capacity` and `max_size` limit
|
||||
pub fn new() -> Self {
|
||||
Self::new_with_options(Some(DEFAULT_CAPACITY), Some(DEFAULT_MAX_SIZE))
|
||||
}
|
||||
|
||||
/// Create new dynamically allocated `Buffer` with options
|
||||
///
|
||||
/// Options:
|
||||
/// * `capacity` initial bytes request to reduce extra memory overwrites (1024 by default)
|
||||
/// * `max_size` max bytes to prevent memory overflow (1M by default)
|
||||
pub fn new_with_options(capacity: Option<usize>, max_size: Option<usize>) -> Self {
|
||||
Self {
|
||||
bytes: Vec::with_capacity(match capacity {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_CAPACITY,
|
||||
}),
|
||||
max_size: match max_size {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_MAX_SIZE,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Setters
|
||||
|
||||
/// Set new `Buffer.max_size` value to prevent memory overflow
|
||||
///
|
||||
/// Use `DEFAULT_MAX_SIZE` if `None` given.
|
||||
pub fn set_max_size(&mut self, value: Option<usize>) {
|
||||
self.max_size = match value {
|
||||
Some(size) => size,
|
||||
None => DEFAULT_MAX_SIZE,
|
||||
}
|
||||
}
|
||||
|
||||
// Actions
|
||||
|
||||
/// Push `glib::Bytes` to `Buffer.bytes`
|
||||
///
|
||||
/// Return `Error::Overflow` on `Buffer.max_size` reached.
|
||||
pub fn push(&mut self, bytes: Bytes) -> Result<usize, Error> {
|
||||
// Calculate new size value
|
||||
let total = self.bytes.len() + bytes.len();
|
||||
|
||||
// Validate overflow
|
||||
if total > self.max_size {
|
||||
return Err(Error::Overflow);
|
||||
}
|
||||
|
||||
// Success
|
||||
self.bytes.push(bytes);
|
||||
|
||||
Ok(total)
|
||||
}
|
||||
|
||||
// Getters
|
||||
|
||||
/// Get reference to bytes collected
|
||||
pub fn bytes(&self) -> &Vec<Bytes> {
|
||||
&self.bytes
|
||||
}
|
||||
|
||||
/// Return copy of bytes as UTF-8 vector
|
||||
pub fn to_utf8(&self) -> Vec<u8> {
|
||||
self.bytes
|
||||
.iter()
|
||||
.flat_map(|byte| byte.iter())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
6
src/client/connection/input/error.rs
Normal file
6
src/client/connection/input/error.rs
Normal file
|
|
@ -0,0 +1,6 @@
|
|||
pub enum Error {
|
||||
BufferOverflow,
|
||||
BufferWrite,
|
||||
StreamChunkRead,
|
||||
StreamChunkReadAsync,
|
||||
}
|
||||
|
|
@ -1,4 +0,0 @@
|
|||
pub mod byte_buffer;
|
||||
pub use byte_buffer::ByteBuffer;
|
||||
|
||||
// @TODO
|
||||
|
|
@ -1,158 +0,0 @@
|
|||
pub mod error;
|
||||
|
||||
pub use error::Error;
|
||||
|
||||
use gio::{prelude::InputStreamExt, Cancellable, InputStream};
|
||||
use glib::{object::IsA, Bytes, Priority};
|
||||
|
||||
pub const DEFAULT_CAPACITY: usize = 0x400;
|
||||
pub const DEFAULT_CHUNK_SIZE: usize = 0x100;
|
||||
pub const DEFAULT_MAX_SIZE: usize = 0xfffff;
|
||||
|
||||
pub struct ByteBuffer {
|
||||
bytes: Vec<Bytes>,
|
||||
}
|
||||
|
||||
impl ByteBuffer {
|
||||
// Constructors
|
||||
|
||||
/// Create new dynamically allocated bytes buffer with default capacity
|
||||
pub fn new() -> Self {
|
||||
Self::with_capacity(Some(DEFAULT_CAPACITY))
|
||||
}
|
||||
|
||||
/// Create new dynamically allocated bytes buffer with initial capacity
|
||||
///
|
||||
/// Options:
|
||||
/// * initial bytes request to reduce extra memory overwrites (1024 by default)
|
||||
pub fn with_capacity(value: Option<usize>) -> Self {
|
||||
Self {
|
||||
bytes: Vec::with_capacity(match value {
|
||||
Some(capacity) => capacity,
|
||||
None => DEFAULT_CAPACITY,
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
// Readers
|
||||
|
||||
/// Populate bytes buffer synchronously from `gio::InputStream`
|
||||
///
|
||||
/// Options:
|
||||
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
|
||||
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
|
||||
/// * `chunk_size` bytes limit to read per iter (256 by default)
|
||||
/// * `max_size` bytes limit to prevent memory overflow (1M by default)
|
||||
pub fn read_input_stream(
|
||||
mut self,
|
||||
input_stream: InputStream,
|
||||
cancellable: Option<&impl IsA<Cancellable>>,
|
||||
chunk_size: Option<usize>,
|
||||
max_size: Option<usize>,
|
||||
) -> Result<Self, Error> {
|
||||
// Disallow unlimited buffer, use defaults on None
|
||||
let limit = match max_size {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_MAX_SIZE,
|
||||
};
|
||||
|
||||
loop {
|
||||
// Check buffer size to prevent memory overflow
|
||||
if self.bytes.len() > limit {
|
||||
return Err(Error::Overflow);
|
||||
}
|
||||
|
||||
// Continue bytes reading
|
||||
match input_stream.read_bytes(
|
||||
match chunk_size {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_CHUNK_SIZE,
|
||||
},
|
||||
cancellable,
|
||||
) {
|
||||
Ok(bytes) => {
|
||||
// No bytes were read, end of stream
|
||||
if bytes.len() == 0 {
|
||||
return Ok(self);
|
||||
}
|
||||
|
||||
// Save chunk to buffer
|
||||
self.bytes.push(bytes);
|
||||
}
|
||||
Err(_) => return Err(Error::StreamChunkRead),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/// Populate bytes buffer asynchronously from `gio::InputStream`,
|
||||
/// apply callback function to `Ok(Self)` on success
|
||||
///
|
||||
/// Options:
|
||||
/// * `input_stream` https://docs.gtk.org/gio/class.InputStream.html
|
||||
/// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html
|
||||
/// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html
|
||||
/// * `chunk_size` optional bytes limit to read per iter (256 by default)
|
||||
/// * `max_size` optional bytes limit to prevent memory overflow (1M by default)
|
||||
/// * `callback` user function to apply on complete
|
||||
pub fn read_input_stream_async(
|
||||
mut self,
|
||||
input_stream: InputStream,
|
||||
cancellable: Cancellable,
|
||||
priority: Priority,
|
||||
chunk_size: Option<usize>,
|
||||
max_size: Option<usize>,
|
||||
callback: impl FnOnce(Result<Self, Error>) + 'static,
|
||||
) {
|
||||
// Clone reference counted chunk dependencies
|
||||
let _input_stream = input_stream.clone();
|
||||
let _cancellable = cancellable.clone();
|
||||
|
||||
// Continue bytes reading
|
||||
input_stream.read_bytes_async(
|
||||
match max_size {
|
||||
Some(value) => value,
|
||||
None => DEFAULT_MAX_SIZE,
|
||||
},
|
||||
priority,
|
||||
Some(&cancellable),
|
||||
move |result| -> () {
|
||||
match result {
|
||||
Ok(bytes) => {
|
||||
// No bytes were read, end of stream
|
||||
if bytes.len() == 0 {
|
||||
return callback(Ok(self));
|
||||
}
|
||||
|
||||
// Save chunk to buffer
|
||||
self.bytes.push(bytes);
|
||||
|
||||
// Continue bytes reading...
|
||||
self.read_input_stream_async(
|
||||
_input_stream,
|
||||
_cancellable,
|
||||
priority,
|
||||
chunk_size,
|
||||
max_size,
|
||||
callback,
|
||||
);
|
||||
}
|
||||
Err(_) => callback(Err(Error::StreamChunkReadAsync)),
|
||||
}
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
/// Get link to bytes collected
|
||||
pub fn bytes(&self) -> &Vec<Bytes> {
|
||||
&self.bytes
|
||||
}
|
||||
|
||||
/// Return a copy of the bytes in UTF-8
|
||||
pub fn to_utf8(&self) -> Vec<u8> {
|
||||
self.bytes
|
||||
.iter()
|
||||
.flat_map(|byte| byte.iter())
|
||||
.cloned()
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue