diff --git a/src/client.rs b/src/client.rs index c34d664..3158ff2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,10 +1,41 @@ -pub mod connection; pub mod error; pub mod response; pub mod socket; pub use error::Error; + pub use response::Response; pub use socket::Socket; -// @TODO +use gio::Cancellable; +use glib::{Priority, Uri}; + +/// High-level API to make async request, get `Response` and close connection. +pub fn request_async( + uri: Uri, + cancelable: Option, + priority: Option, + callback: impl FnOnce(Result) + 'static, +) { + // Create new socket connection + Socket::new().connect_async( + uri.clone(), + cancelable.clone(), + move |connect| match connect { + Ok(connection) => { + connection.request_async(uri, cancelable, priority, None, |request| { + callback(match request { + Ok(buffer) => match Response::from_utf8(&buffer) { + Ok(response) => Ok(response), + Err(_) => Err(Error::Response), + }, + Err(_) => Err(Error::Request), + }); + + //connection.close_async(cancelable, priority, |_| {}); // @TODO + }) + } + Err(_) => callback(Err(Error::Connection)), + }, + ); +} diff --git a/src/client/error.rs b/src/client/error.rs index 4cf662c..d14b8f1 100644 --- a/src/client/error.rs +++ b/src/client/error.rs @@ -1,8 +1,5 @@ pub enum Error { - Close, - Connect, - Input, - Output, + Connection, Response, - BufferOverflow, + Request, } diff --git a/src/client/socket.rs b/src/client/socket.rs index 1f9bf8f..1146cf1 100644 --- a/src/client/socket.rs +++ b/src/client/socket.rs @@ -1,23 +1,62 @@ -use gio::{prelude::SocketClientExt, SocketClient, SocketProtocol, TlsCertificateFlags}; +pub mod connection; +pub mod error; + +pub use connection::Connection; +pub use error::Error; + +pub const DEFAULT_PORT: u16 = 1965; + +use gio::{ + prelude::SocketClientExt, Cancellable, SocketClient, SocketProtocol, TlsCertificateFlags, +}; +use glib::Uri; pub struct Socket { - gobject: SocketClient, + client: SocketClient, } impl Socket { + // Constructors + /// Create new `gio::SocketClient` preset for Gemini Protocol pub fn new() -> Self { - let gobject = SocketClient::new(); + let client = SocketClient::new(); - gobject.set_protocol(SocketProtocol::Tcp); - gobject.set_tls_validation_flags(TlsCertificateFlags::INSECURE); - gobject.set_tls(true); + client.set_protocol(SocketProtocol::Tcp); + client.set_tls_validation_flags(TlsCertificateFlags::INSECURE); + client.set_tls(true); - Self { gobject } + Self { client } } + // Actions + pub fn connect_async( + &self, + uri: Uri, + cancelable: Option, + callback: impl FnOnce(Result) + 'static, + ) { + self.client.connect_to_uri_async( + uri.to_str().as_str(), + DEFAULT_PORT, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + |result| { + callback(match result { + Ok(connection) => Ok(Connection::new_from(connection)), + Err(_) => Err(Error::Connection), + }) + }, + ); + } + + // Getters + /// Return ref to `gio::SocketClient` GObject - pub fn gobject(&self) -> &SocketClient { - self.gobject.as_ref() + pub fn client(&self) -> &SocketClient { + self.client.as_ref() } } diff --git a/src/client/socket/connection.rs b/src/client/socket/connection.rs new file mode 100644 index 0000000..b9b6e2f --- /dev/null +++ b/src/client/socket/connection.rs @@ -0,0 +1,115 @@ +pub mod error; +pub mod input; +pub mod output; + +pub use error::Error; +pub use input::Input; +pub use output::Output; + +use gio::{prelude::IOStreamExt, Cancellable, SocketConnection}; +use glib::{gformat, Bytes, Priority, Uri}; + +pub struct Connection { + connection: SocketConnection, +} + +impl Connection { + // Constructors + + pub fn new_from(connection: SocketConnection) -> Self { + Self { connection } + } + + // Actions + + /// MIddle-level API to make single async socket request for current connection: + /// + /// 1. send `Uri` request to the ouput stream; + /// 2. write entire input stream into the new `Vec` buffer on success; + /// 3. close current connection if callback function does not prevent that action by return. + pub fn request_async( + self, + uri: Uri, + cancelable: Option, + priority: Option, + chunk: Option, + callback: impl FnOnce(Result, Error>) + 'static, + ) { + // Send request + Output::new_from_stream(self.connection.output_stream()).write_async( + &Bytes::from(gformat!("{}\r\n", uri.to_str()).as_bytes()), + cancelable.clone(), + priority, + move |output| match output { + Ok(_) => { + // Read response + Input::new_from_stream(self.connection.input_stream()).read_all_async( + cancelable.clone(), + priority, + chunk, + move |input| { + // Apply callback function + callback(match input { + Ok(buffer) => Ok(buffer.to_utf8()), + Err(error) => Err(match error { + input::Error::BufferOverflow => Error::InputBufferOverflow, + input::Error::BufferWrite => Error::InputBufferWrite, + input::Error::StreamChunkRead => Error::InputStreamChunkRead, + }), + }); + + // Close connection if callback act does not prevent that + self.close_async(cancelable, priority, |_| {}); // @TODO + }, + ); + } + Err(error) => { + // Apply callback function + callback(Err(match error { + output::Error::StreamWrite => Error::OutputStreamWrite, + })); + + // Close connection if callback act does not prevent that + self.close_async(cancelable, priority, |_| {}); // @TODO + } + }, + ); + } + + /// Asynchronously close current connection + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) + /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) + /// * `callback` function to apply on complete + pub fn close_async( + self, + cancelable: Option, + priority: Option, + callback: impl FnOnce(Result<(), Error>) + 'static, + ) { + self.connection.close_async( + match priority { + Some(value) => value, + None => Priority::DEFAULT, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + |result| { + callback(match result { + Ok(_) => Ok(()), + Err(_) => Err(Error::Close), + }) + }, + ); + } + + // Getters + + pub fn connection(&self) -> &SocketConnection { + &self.connection + } +} diff --git a/src/client/socket/connection/error.rs b/src/client/socket/connection/error.rs new file mode 100644 index 0000000..7405d6a --- /dev/null +++ b/src/client/socket/connection/error.rs @@ -0,0 +1,7 @@ +pub enum Error { + Close, + InputBufferOverflow, + InputBufferWrite, + InputStreamChunkRead, + OutputStreamWrite, +} diff --git a/src/client/socket/connection/input.rs b/src/client/socket/connection/input.rs new file mode 100644 index 0000000..16ba332 --- /dev/null +++ b/src/client/socket/connection/input.rs @@ -0,0 +1,155 @@ +pub mod buffer; +pub mod error; + +pub use buffer::Buffer; +pub use error::Error; + +use gio::{prelude::InputStreamExt, Cancellable, InputStream}; +use glib::Priority; + +pub const DEFAULT_READ_CHUNK: usize = 0x100; + +pub struct Input { + buffer: Buffer, + stream: InputStream, +} + +impl Input { + // Constructors + + /// Create new `Input` from `gio::InputStream` + /// + /// https://docs.gtk.org/gio/class.InputStream.html + pub fn new_from_stream(stream: InputStream) -> Self { + Self { + buffer: Buffer::new(), + stream, + } + } + + // Actions + + /// Synchronously read all bytes from `gio::InputStream` to `input::Buffer` + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html + /// * `chunk` max bytes to read per chunk (256 by default) + pub fn read_all( + mut self, + cancelable: Option, + chunk: Option, + ) -> Result { + loop { + // Continue bytes reading + match self.stream.read_bytes( + match chunk { + Some(value) => value, + None => DEFAULT_READ_CHUNK, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + ) { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return Ok(self.buffer); + } + + // Save chunk to buffer + match self.buffer.push(bytes) { + Ok(_) => continue, + Err(buffer::Error::Overflow) => return Err(Error::BufferOverflow), + Err(_) => return Err(Error::BufferWrite), + }; + } + Err(_) => return Err(Error::StreamChunkRead), + }; + } + } + + /// Asynchronously read all bytes from `gio::InputStream` to `input::Buffer`, + /// + /// applies `callback` function on last byte reading complete. + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) + /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) + /// * `chunk` optional max bytes to read per chunk (`DEFAULT_READ_CHUNK` by default) + /// * `callback` user function to apply on async iteration complete or `None` to skip + pub fn read_all_async( + mut self, + cancelable: Option, + priority: Option, + chunk: Option, + callback: impl FnOnce(Result) + 'static, + ) { + // Continue bytes reading + self.stream.clone().read_bytes_async( + match chunk { + Some(value) => value, + None => DEFAULT_READ_CHUNK, + }, + match priority { + Some(value) => value, + None => Priority::DEFAULT, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + move |result| { + match result { + Ok(bytes) => { + // No bytes were read, end of stream + if bytes.len() == 0 { + return callback(Ok(self.buffer)); + } + + // Save chunk to buffer + match self.buffer.push(bytes) { + Err(buffer::Error::Overflow) => { + return callback(Err(Error::BufferOverflow)) + } + + // Other errors related to write issues @TODO test + Err(_) => return callback(Err(Error::BufferWrite)), + + // Async function, nothing to return yet + _ => (), + }; + + // Continue bytes reading... + self.read_all_async(cancelable, priority, chunk, callback); + } + Err(_) => callback(Err(Error::StreamChunkRead)), + } + }, + ); + } + + // Setters + + pub fn set_buffer(&mut self, buffer: Buffer) { + self.buffer = buffer; + } + + pub fn set_stream(&mut self, stream: InputStream) { + self.stream = stream; + } + + // Getters + + /// Get reference to `Buffer` + pub fn buffer(&self) -> &Buffer { + &self.buffer + } + + /// Get reference to `gio::InputStream` + pub fn stream(&self) -> &InputStream { + &self.stream + } +} diff --git a/src/client/socket/connection/input/buffer.rs b/src/client/socket/connection/input/buffer.rs new file mode 100644 index 0000000..b9264c7 --- /dev/null +++ b/src/client/socket/connection/input/buffer.rs @@ -0,0 +1,87 @@ +pub mod error; +pub use error::Error; + +use glib::Bytes; + +pub const DEFAULT_CAPACITY: usize = 0x400; +pub const DEFAULT_MAX_SIZE: usize = 0xfffff; + +pub struct Buffer { + bytes: Vec, + max_size: usize, +} + +impl Buffer { + // Constructors + + /// Create new dynamically allocated `Buffer` with default `capacity` and `max_size` limit + pub fn new() -> Self { + Self::new_with_options(Some(DEFAULT_CAPACITY), Some(DEFAULT_MAX_SIZE)) + } + + /// Create new dynamically allocated `Buffer` with options + /// + /// Options: + /// * `capacity` initial bytes request to reduce extra memory overwrites (1024 by default) + /// * `max_size` max bytes to prevent memory overflow (1M by default) + pub fn new_with_options(capacity: Option, max_size: Option) -> Self { + Self { + bytes: Vec::with_capacity(match capacity { + Some(value) => value, + None => DEFAULT_CAPACITY, + }), + max_size: match max_size { + Some(value) => value, + None => DEFAULT_MAX_SIZE, + }, + } + } + + // Setters + + /// Set new `Buffer.max_size` value to prevent memory overflow + /// + /// Use `DEFAULT_MAX_SIZE` if `None` given. + pub fn set_max_size(&mut self, value: Option) { + self.max_size = match value { + Some(size) => size, + None => DEFAULT_MAX_SIZE, + } + } + + // Actions + + /// Push `glib::Bytes` to `Buffer.bytes` + /// + /// Return `Error::Overflow` on `Buffer.max_size` reached. + pub fn push(&mut self, bytes: Bytes) -> Result { + // Calculate new size value + let total = self.bytes.len() + bytes.len(); + + // Validate overflow + if total > self.max_size { + return Err(Error::Overflow); + } + + // Success + self.bytes.push(bytes); + + Ok(total) + } + + // Getters + + /// Get reference to bytes collected + pub fn bytes(&self) -> &Vec { + &self.bytes + } + + /// Return copy of bytes as UTF-8 vector + pub fn to_utf8(&self) -> Vec { + self.bytes + .iter() + .flat_map(|byte| byte.iter()) + .cloned() + .collect() + } +} diff --git a/src/client/socket/connection/input/buffer/error.rs b/src/client/socket/connection/input/buffer/error.rs new file mode 100644 index 0000000..68dd893 --- /dev/null +++ b/src/client/socket/connection/input/buffer/error.rs @@ -0,0 +1,4 @@ +pub enum Error { + Overflow, + StreamChunkRead, +} diff --git a/src/client/socket/connection/input/error.rs b/src/client/socket/connection/input/error.rs new file mode 100644 index 0000000..1c383fb --- /dev/null +++ b/src/client/socket/connection/input/error.rs @@ -0,0 +1,5 @@ +pub enum Error { + BufferOverflow, + BufferWrite, + StreamChunkRead, +} diff --git a/src/client/socket/connection/output.rs b/src/client/socket/connection/output.rs new file mode 100644 index 0000000..d6f1821 --- /dev/null +++ b/src/client/socket/connection/output.rs @@ -0,0 +1,71 @@ +pub mod error; + +pub use error::Error; + +use gio::{prelude::OutputStreamExt, Cancellable, OutputStream}; +use glib::{Bytes, Priority}; + +pub struct Output { + stream: OutputStream, +} + +impl Output { + // Constructors + + /// Create new `Output` from `gio::OutputStream` + /// + /// https://docs.gtk.org/gio/class.OutputStream.html + pub fn new_from_stream(stream: OutputStream) -> Self { + Self { stream } + } + + // Actions + + /// Asynchronously write all bytes to `gio::OutputStream`, + /// + /// applies `callback` function on last byte sent. + /// + /// Options: + /// * `cancellable` https://docs.gtk.org/gio/class.Cancellable.html (`None::<&Cancellable>` by default) + /// * `priority` e.g. https://docs.gtk.org/glib/const.PRIORITY_DEFAULT.html (`Priority::DEFAULT` by default) + /// * `callback` user function to apply on complete + pub fn write_async( + &self, + bytes: &Bytes, + cancelable: Option, + priority: Option, + callback: impl FnOnce(Result) + 'static, + ) { + self.stream.write_bytes_async( + bytes, + match priority { + Some(value) => value, + None => Priority::DEFAULT, + }, + match cancelable.clone() { + Some(value) => Some(value), + None => None::, + } + .as_ref(), + move |result| { + callback(match result { + Ok(size) => Ok(size), + Err(_) => Err(Error::StreamWrite), + }) + }, + ); + } + + // Setters + + pub fn set_stream(&mut self, stream: OutputStream) { + self.stream = stream; + } + + // Getters + + /// Get reference to `gio::OutputStream` + pub fn stream(&self) -> &OutputStream { + &self.stream + } +} diff --git a/src/client/socket/connection/output/error.rs b/src/client/socket/connection/output/error.rs new file mode 100644 index 0000000..27547a1 --- /dev/null +++ b/src/client/socket/connection/output/error.rs @@ -0,0 +1,3 @@ +pub enum Error { + StreamWrite, +} diff --git a/src/client/socket/error.rs b/src/client/socket/error.rs new file mode 100644 index 0000000..1863964 --- /dev/null +++ b/src/client/socket/error.rs @@ -0,0 +1,3 @@ +pub enum Error { + Connection, +}