diff --git a/src/client.rs b/src/client.rs index 3158ff2..2690c91 100644 --- a/src/client.rs +++ b/src/client.rs @@ -7,35 +7,40 @@ pub use error::Error; pub use response::Response; pub use socket::Socket; -use gio::Cancellable; -use glib::{Priority, Uri}; +use glib::Uri; -/// High-level API to make async request, get `Response` and close connection. -pub fn request_async( +/// High-level API to make single async request +/// +/// 1. open new socket connection for [Uri](https://docs.gtk.org/glib/struct.Uri.html) +/// 2. send request +/// 3. read response +/// 4. close connection +/// 5. return `Result` to `callback` function +pub fn simple_socket_request_async( uri: Uri, - cancelable: Option, - priority: Option, callback: impl FnOnce(Result) + 'static, ) { - // Create new socket connection - Socket::new().connect_async( - uri.clone(), - cancelable.clone(), - move |connect| match connect { - Ok(connection) => { - connection.request_async(uri, cancelable, priority, None, |request| { - callback(match request { - Ok(buffer) => match Response::from_utf8(&buffer) { - Ok(response) => Ok(response), - Err(_) => Err(Error::Response), - }, - Err(_) => Err(Error::Request), - }); - - //connection.close_async(cancelable, priority, |_| {}); // @TODO - }) - } - Err(_) => callback(Err(Error::Connection)), - }, - ); + Socket::new().connect_async(uri.clone(), None, move |connect| match connect { + Ok(connection) => { + connection.request_async(uri, None, None, None, move |connection, response| { + connection.close_async( + None, + None, + Some(|close| { + callback(match close { + Ok(_) => match response { + Ok(buffer) => match Response::from_utf8(&buffer) { + Ok(response) => Ok(response), + Err(_) => Err(Error::Response), + }, + Err(_) => Err(Error::Request), + }, + Err(_) => Err(Error::Close), + }) + }), + ); + }) + } + Err(_) => callback(Err(Error::Connection)), + }); } diff --git a/src/client/connection.rs b/src/client/connection.rs deleted file mode 100644 index 7c2e44f..0000000 --- a/src/client/connection.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod input; - -pub use input::Input; - -// @TODO diff --git a/src/client/connection/input.rs b/src/client/connection/input.rs deleted file mode 100644 index 288d4e1..0000000 --- a/src/client/connection/input.rs +++ /dev/null @@ -1,156 +0,0 @@ -pub mod buffer; -pub mod error; - -pub use buffer::Buffer; -pub use error::Error; - -use gio::{prelude::InputStreamExt, Cancellable, InputStream}; -use glib::Priority; - -// Defaults -pub const DEFAULT_READ_CHUNK: usize = 0x100; - -pub struct Input { - buffer: Buffer, - stream: InputStream, -} - -impl Input { - // Constructors - - /// Create new `Input` from `gio::InputStream` - /// - /// https://docs.gtk.org/gio/class.InputStream.html - pub fn new_from_stream(stream: InputStream) -> Self { - Self { - buffer: Buffer::new(), - stream, - } - } - - // Actions - - /// Synchronously read all bytes from `gio::InputStream` to `input::Buffer` - /// - /// Options: - /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html - /// * `chunk` max bytes to read per chunk (256 by default) - pub fn read_all( - mut self, - cancelable: Option, - chunk: Option, - ) -> Result { - loop { - // Continue bytes reading - match self.stream.read_bytes( - match chunk { - Some(value) => value, - None => DEFAULT_READ_CHUNK, - }, - match cancelable.clone() { - Some(value) => Some(value), - None => None::, - } - .as_ref(), - ) { - Ok(bytes) => { - // No bytes were read, end of stream - if bytes.len() == 0 { - return Ok(self.buffer); - } - - // Save chunk to buffer - match self.buffer.push(bytes) { - Ok(_) => continue, - Err(buffer::Error::Overflow) => return Err(Error::BufferOverflow), - Err(_) => return Err(Error::BufferWrite), - }; - } - Err(_) => return Err(Error::StreamChunkRead), - }; - } - } - - /// Asynchronously read all bytes from `gio::InputStream` to `input::Buffer`, - /// - /// applies `callback` function on last byte reading complete. - /// - /// Options: - /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) - /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) - /// * `chunk` optional max bytes to read per chunk (`DEFAULT_READ_CHUNK` by default) - /// * `callback` user function to apply on async iteration complete - pub fn read_all_async( - mut self, - cancelable: Option, - priority: Option, - chunk: Option, - callback: impl FnOnce(Result) + 'static, - ) { - // Continue bytes reading - self.stream.clone().read_bytes_async( - match chunk { - Some(value) => value, - None => DEFAULT_READ_CHUNK, - }, - match priority { - Some(value) => value, - None => Priority::DEFAULT, - }, - match cancelable.clone() { - Some(value) => Some(value), - None => None::, - } - .as_ref(), - move |result| { - match result { - Ok(bytes) => { - // No bytes were read, end of stream - if bytes.len() == 0 { - return callback(Ok(self.buffer)); - } - - // Save chunk to buffer - match self.buffer.push(bytes) { - Err(buffer::Error::Overflow) => { - return callback(Err(Error::BufferOverflow)) - } - - // Other errors related to write issues @TODO test - Err(_) => return callback(Err(Error::BufferWrite)), - - // Async function, nothing to return yet - _ => (), - }; - - // Continue bytes reading... - self.read_all_async(cancelable, priority, chunk, callback); - } - Err(_) => callback(Err(Error::StreamChunkReadAsync)), - } - }, - ); - } - - // Setters - - pub fn set_buffer(&mut self, buffer: Buffer) { - self.buffer = buffer; - } - - pub fn set_stream(&mut self, stream: InputStream) { - self.stream = stream; - } - - // Getters - - /// Get reference to `Buffer` - pub fn buffer(&self) -> &Buffer { - &self.buffer - } - - /// Get reference to `gio::InputStream` - pub fn stream(&self) -> &InputStream { - &self.stream - } -} diff --git a/src/client/connection/input/buffer.rs b/src/client/connection/input/buffer.rs deleted file mode 100644 index b9264c7..0000000 --- a/src/client/connection/input/buffer.rs +++ /dev/null @@ -1,87 +0,0 @@ -pub mod error; -pub use error::Error; - -use glib::Bytes; - -pub const DEFAULT_CAPACITY: usize = 0x400; -pub const DEFAULT_MAX_SIZE: usize = 0xfffff; - -pub struct Buffer { - bytes: Vec, - max_size: usize, -} - -impl Buffer { - // Constructors - - /// Create new dynamically allocated `Buffer` with default `capacity` and `max_size` limit - pub fn new() -> Self { - Self::new_with_options(Some(DEFAULT_CAPACITY), Some(DEFAULT_MAX_SIZE)) - } - - /// Create new dynamically allocated `Buffer` with options - /// - /// Options: - /// * `capacity` initial bytes request to reduce extra memory overwrites (1024 by default) - /// * `max_size` max bytes to prevent memory overflow (1M by default) - pub fn new_with_options(capacity: Option, max_size: Option) -> Self { - Self { - bytes: Vec::with_capacity(match capacity { - Some(value) => value, - None => DEFAULT_CAPACITY, - }), - max_size: match max_size { - Some(value) => value, - None => DEFAULT_MAX_SIZE, - }, - } - } - - // Setters - - /// Set new `Buffer.max_size` value to prevent memory overflow - /// - /// Use `DEFAULT_MAX_SIZE` if `None` given. - pub fn set_max_size(&mut self, value: Option) { - self.max_size = match value { - Some(size) => size, - None => DEFAULT_MAX_SIZE, - } - } - - // Actions - - /// Push `glib::Bytes` to `Buffer.bytes` - /// - /// Return `Error::Overflow` on `Buffer.max_size` reached. - pub fn push(&mut self, bytes: Bytes) -> Result { - // Calculate new size value - let total = self.bytes.len() + bytes.len(); - - // Validate overflow - if total > self.max_size { - return Err(Error::Overflow); - } - - // Success - self.bytes.push(bytes); - - Ok(total) - } - - // Getters - - /// Get reference to bytes collected - pub fn bytes(&self) -> &Vec { - &self.bytes - } - - /// Return copy of bytes as UTF-8 vector - pub fn to_utf8(&self) -> Vec { - self.bytes - .iter() - .flat_map(|byte| byte.iter()) - .cloned() - .collect() - } -} diff --git a/src/client/connection/input/buffer/error.rs b/src/client/connection/input/buffer/error.rs deleted file mode 100644 index bab66ff..0000000 --- a/src/client/connection/input/buffer/error.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub enum Error { - Overflow, - StreamChunkRead, - StreamChunkReadAsync, -} diff --git a/src/client/connection/input/error.rs b/src/client/connection/input/error.rs deleted file mode 100644 index 4bcff86..0000000 --- a/src/client/connection/input/error.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub enum Error { - BufferOverflow, - BufferWrite, - StreamChunkRead, - StreamChunkReadAsync, -} diff --git a/src/client/error.rs b/src/client/error.rs index d14b8f1..10cda29 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -1,5 +1,6 @@ pub enum Error { + Close, Connection, - Response, Request, + Response, } diff --git a/src/client/socket.rs b/src/client/socket.rs index 1146cf1..6729e18 100644 --- a/src/client/socket.rs +++ b/src/client/socket.rs @@ -13,6 +13,7 @@ use glib::Uri; pub struct Socket { client: SocketClient, + default_port: u16, } impl Socket { @@ -26,7 +27,10 @@ impl Socket { client.set_tls_validation_flags(TlsCertificateFlags::INSECURE); client.set_tls(true); - Self { client } + Self { + client, + default_port: DEFAULT_PORT, + } } // Actions @@ -38,7 +42,7 @@ impl Socket { ) { self.client.connect_to_uri_async( uri.to_str().as_str(), - DEFAULT_PORT, + self.default_port, match cancelable.clone() { Some(value) => Some(value), None => None::, @@ -53,9 +57,18 @@ impl Socket { ); } + // Setters + + /// Set default port for socket connections (1965 by default) + pub fn set_default_port(&mut self, default_port: u16) { + self.default_port = default_port; + } + // Getters - /// Return ref to `gio::SocketClient` GObject + /// Get reference to `gio::SocketClient` + /// + /// https://docs.gtk.org/gio/class.SocketClient.html pub fn client(&self) -> &SocketClient { self.client.as_ref() } diff --git a/src/client/socket/connection.rs b/src/client/socket/connection.rs index b9b6e2f..bf187ab 100644 --- a/src/client/socket/connection.rs +++ b/src/client/socket/connection.rs @@ -16,61 +16,62 @@ pub struct Connection { impl Connection { // Constructors + /// Create new `Self` from [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) pub fn new_from(connection: SocketConnection) -> Self { Self { connection } } // Actions - /// MIddle-level API to make single async socket request for current connection: + /// Middle-level API to make async socket request for current connection: /// - /// 1. send `Uri` request to the ouput stream; - /// 2. write entire input stream into the new `Vec` buffer on success; - /// 3. close current connection if callback function does not prevent that action by return. + /// 1. send request for [Uri](https://docs.gtk.org/glib/struct.Uri.html) + /// to the ouput [OutputStream](https://docs.gtk.org/gio/class.OutputStream.html); + /// 2. write entire [InputStream](https://docs.gtk.org/gio/class.InputStream.html) + /// into `Vec` buffer on success; + /// 3. return taken `Self` with `Result(Vec, Error)` on complete. pub fn request_async( self, uri: Uri, cancelable: Option, priority: Option, chunk: Option, - callback: impl FnOnce(Result, Error>) + 'static, + callback: impl FnOnce(Self, Result, Error>) + 'static, ) { - // Send request Output::new_from_stream(self.connection.output_stream()).write_async( &Bytes::from(gformat!("{}\r\n", uri.to_str()).as_bytes()), cancelable.clone(), priority, move |output| match output { Ok(_) => { - // Read response Input::new_from_stream(self.connection.input_stream()).read_all_async( cancelable.clone(), priority, chunk, move |input| { - // Apply callback function - callback(match input { - Ok(buffer) => Ok(buffer.to_utf8()), - Err(error) => Err(match error { - input::Error::BufferOverflow => Error::InputBufferOverflow, - input::Error::BufferWrite => Error::InputBufferWrite, - input::Error::StreamChunkRead => Error::InputStreamChunkRead, - }), - }); - - // Close connection if callback act does not prevent that - self.close_async(cancelable, priority, |_| {}); // @TODO + callback( + self, + match input { + Ok(buffer) => Ok(buffer.to_utf8()), + Err(error) => Err(match error { + input::Error::BufferOverflow => Error::InputBufferOverflow, + input::Error::BufferWrite => Error::InputBufferWrite, + input::Error::StreamChunkRead => { + Error::InputStreamChunkRead + } + }), + }, + ); }, ); } Err(error) => { - // Apply callback function - callback(Err(match error { - output::Error::StreamWrite => Error::OutputStreamWrite, - })); - - // Close connection if callback act does not prevent that - self.close_async(cancelable, priority, |_| {}); // @TODO + callback( + self, + Err(match error { + output::Error::StreamWrite => Error::OutputStreamWrite, + }), + ); } }, ); @@ -79,14 +80,14 @@ impl Connection { /// Asynchronously close current connection /// /// Options: - /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) - /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) - /// * `callback` function to apply on complete + /// * `cancellable` see [Cancellable](https://docs.gtk.org/gio/class.Cancellable.html) (`None::<&Cancellable>` by default) + /// * `priority` [Priority::DEFAULT](https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html) by default + /// * `callback` optional function to apply on complete or `None` to skip pub fn close_async( - self, + &self, cancelable: Option, priority: Option, - callback: impl FnOnce(Result<(), Error>) + 'static, + callback: Option) + 'static>, ) { self.connection.close_async( match priority { @@ -99,16 +100,21 @@ impl Connection { } .as_ref(), |result| { - callback(match result { - Ok(_) => Ok(()), - Err(_) => Err(Error::Close), - }) + if let Some(call) = callback { + call(match result { + Ok(_) => Ok(()), + Err(_) => Err(Error::Close), + }) + } }, ); } // Getters + /// Get reference to `gio::SocketConnection` + /// + /// https://docs.gtk.org/gio/class.SocketConnection.html pub fn connection(&self) -> &SocketConnection { &self.connection }