require Priority, Cancellable arguments, remove extra members

This commit is contained in:
yggverse 2024-12-01 04:35:19 +02:00
parent 2df9f36599
commit 8f910672e2
7 changed files with 30 additions and 102 deletions

View file

@ -71,8 +71,8 @@ impl Client {
pub fn request_async( pub fn request_async(
&self, &self,
uri: Uri, uri: Uri,
priority: Option<Priority>, priority: Priority,
cancellable: Option<Cancellable>, cancellable: Cancellable,
certificate: Option<TlsCertificate>, certificate: Option<TlsCertificate>,
callback: impl Fn(Result<Response, Error>) + 'static, callback: impl Fn(Result<Response, Error>) + 'static,
) { ) {
@ -86,20 +86,12 @@ impl Client {
match crate::gio::network_address::from_uri(&uri, crate::DEFAULT_PORT) { match crate::gio::network_address::from_uri(&uri, crate::DEFAULT_PORT) {
Ok(network_address) => self.socket.connect_async( Ok(network_address) => self.socket.connect_async(
&network_address.clone(), &network_address.clone(),
match cancellable { Some(&cancellable.clone()),
Some(ref cancellable) => Some(cancellable.clone()),
None => None::<Cancellable>,
}
.as_ref(),
move |result| match result { move |result| match result {
Ok(socket_connection) => { Ok(socket_connection) => {
// Wrap required connection dependencies into the struct holder // Wrap required connection dependencies into the struct holder
match Connection::new( match Connection::new(socket_connection, certificate, Some(network_address))
socket_connection, {
certificate,
Some(network_address),
cancellable.clone(),
) {
Ok(connection) => { Ok(connection) => {
// Begin new request // Begin new request
request_async( request_async(
@ -126,21 +118,14 @@ impl Client {
pub fn request_async( pub fn request_async(
connection: Connection, connection: Connection,
query: String, query: String,
priority: Option<Priority>, priority: Priority,
cancellable: Option<Cancellable>, cancellable: Cancellable,
callback: impl Fn(Result<Response, Error>) + 'static, callback: impl Fn(Result<Response, Error>) + 'static,
) { ) {
connection.stream().output_stream().write_bytes_async( connection.stream().output_stream().write_bytes_async(
&Bytes::from(format!("{query}\r\n").as_bytes()), &Bytes::from(format!("{query}\r\n").as_bytes()),
match priority { priority,
Some(priority) => priority, Some(&cancellable.clone()),
None => Priority::DEFAULT,
},
match cancellable {
Some(ref cancellable) => Some(cancellable.clone()),
None => None::<Cancellable>,
}
.as_ref(),
move |result| match result { move |result| match result {
Ok(_) => { Ok(_) => {
Response::from_request_async(connection, priority, cancellable, move |result| { Response::from_request_async(connection, priority, cancellable, move |result| {

View file

@ -2,13 +2,12 @@ pub mod error;
pub use error::Error; pub use error::Error;
use gio::{ use gio::{
prelude::{CancellableExt, IOStreamExt, TlsConnectionExt}, prelude::TlsConnectionExt, IOStream, NetworkAddress, SocketConnection, TlsCertificate,
Cancellable, IOStream, NetworkAddress, SocketConnection, TlsCertificate, TlsClientConnection, TlsClientConnection,
}; };
use glib::object::{Cast, IsA, ObjectExt}; use glib::object::{Cast, IsA, ObjectExt};
pub struct Connection { pub struct Connection {
pub cancellable: Option<Cancellable>,
pub socket_connection: SocketConnection, pub socket_connection: SocketConnection,
pub tls_client_connection: TlsClientConnection, pub tls_client_connection: TlsClientConnection,
} }
@ -21,17 +20,11 @@ impl Connection {
socket_connection: SocketConnection, socket_connection: SocketConnection,
certificate: Option<TlsCertificate>, certificate: Option<TlsCertificate>,
server_identity: Option<NetworkAddress>, server_identity: Option<NetworkAddress>,
cancellable: Option<Cancellable>,
) -> Result<Self, Error> { ) -> Result<Self, Error> {
if socket_connection.is_closed() {
return Err(Error::Closed);
}
Ok(Self { Ok(Self {
cancellable,
socket_connection: socket_connection.clone(), socket_connection: socket_connection.clone(),
tls_client_connection: match TlsClientConnection::new( tls_client_connection: match TlsClientConnection::new(
&socket_connection.clone(), &socket_connection,
server_identity.as_ref(), server_identity.as_ref(),
) { ) {
Ok(tls_client_connection) => { Ok(tls_client_connection) => {
@ -58,38 +51,12 @@ impl Connection {
}) })
} }
// Actions
/// Apply `cancel` action to `Self` [Cancellable](https://docs.gtk.org/gio/method.Cancellable.cancel.html)
/// * return `Error` on `Cancellable` not found
pub fn cancel(&self) -> Result<(), Error> {
match self.cancellable {
Some(ref cancellable) => {
cancellable.cancel();
Ok(())
}
None => Err(Error::Cancel),
}
}
/// Close owned [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// * return `Ok(false)` if `Cancellable` not defined
pub fn close(&self) -> Result<bool, Error> {
match self.cancellable {
Some(ref cancellable) => match self.socket_connection.close(Some(cancellable)) {
Ok(()) => Ok(true),
Err(e) => Err(Error::SocketConnection(e)),
},
None => Ok(false),
}
}
// Getters // Getters
/// Get [IOStream](https://docs.gtk.org/gio/class.IOStream.html) /// Get [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
/// for [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html) /// for [SocketConnection](https://docs.gtk.org/gio/class.SocketConnection.html)
/// or [TlsClientConnection](https://docs.gtk.org/gio/iface.TlsClientConnection.html) (if available) /// or [TlsClientConnection](https://docs.gtk.org/gio/iface.TlsClientConnection.html) (if available)
/// * compatible with user (certificate) and guest (certificate-less) connection types /// * compatible with user (certificate) and guest (certificate-less) connection type
/// * useful also to keep `Connection` active in async I/O context /// * useful also to keep `Connection` active in async I/O context
pub fn stream(&self) -> impl IsA<IOStream> { pub fn stream(&self) -> impl IsA<IOStream> {
// * do not replace with `tls_client_connection.base_io_stream()` // * do not replace with `tls_client_connection.base_io_stream()`

View file

@ -2,24 +2,12 @@ use std::fmt::{Display, Formatter, Result};
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
Cancel,
Closed,
Rehandshake(glib::Error),
SocketConnection(glib::Error),
TlsClientConnection(glib::Error), TlsClientConnection(glib::Error),
} }
impl Display for Error { impl Display for Error {
fn fmt(&self, f: &mut Formatter) -> Result { fn fmt(&self, f: &mut Formatter) -> Result {
match self { match self {
Self::Cancel => write!(f, "Cancellable not found"),
Self::Closed => write!(f, "Connection closed"),
Self::Rehandshake(e) => {
write!(f, "Rehandshake error: {e}")
}
Self::SocketConnection(e) => {
write!(f, "Socket connection error: {e}")
}
Self::TlsClientConnection(e) => { Self::TlsClientConnection(e) => {
write!(f, "TLS client connection error: {e}") write!(f, "TLS client connection error: {e}")
} }

View file

@ -21,8 +21,8 @@ impl Response {
pub fn from_request_async( pub fn from_request_async(
connection: Connection, connection: Connection,
priority: Option<Priority>, priority: Priority,
cancellable: Option<Cancellable>, cancellable: Cancellable,
callback: impl FnOnce(Result<Self, Error>) + 'static, callback: impl FnOnce(Result<Self, Error>) + 'static,
) { ) {
Meta::from_stream_async(connection.stream(), priority, cancellable, |result| { Meta::from_stream_async(connection.stream(), priority, cancellable, |result| {

View file

@ -51,21 +51,15 @@ impl Text {
/// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html) /// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
pub fn from_stream_async( pub fn from_stream_async(
stream: impl IsA<IOStream>, stream: impl IsA<IOStream>,
priority: Option<Priority>, priority: Priority,
cancellable: Option<Cancellable>, cancellable: Cancellable,
on_complete: impl FnOnce(Result<Self, Error>) + 'static, on_complete: impl FnOnce(Result<Self, Error>) + 'static,
) { ) {
read_all_from_stream_async( read_all_from_stream_async(
Vec::with_capacity(BUFFER_CAPACITY), Vec::with_capacity(BUFFER_CAPACITY),
stream, stream,
match cancellable { cancellable,
Some(value) => Some(value), priority,
None => None::<Cancellable>,
},
match priority {
Some(value) => value,
None => Priority::DEFAULT,
},
|result| match result { |result| match result {
Ok(buffer) => on_complete(Self::from_utf8(&buffer)), Ok(buffer) => on_complete(Self::from_utf8(&buffer)),
Err(e) => on_complete(Err(e)), Err(e) => on_complete(Err(e)),
@ -83,14 +77,14 @@ impl Text {
pub fn read_all_from_stream_async( pub fn read_all_from_stream_async(
mut buffer: Vec<u8>, mut buffer: Vec<u8>,
stream: impl IsA<IOStream>, stream: impl IsA<IOStream>,
cancelable: Option<Cancellable>, cancelable: Cancellable,
priority: Priority, priority: Priority,
callback: impl FnOnce(Result<Vec<u8>, Error>) + 'static, callback: impl FnOnce(Result<Vec<u8>, Error>) + 'static,
) { ) {
stream.input_stream().read_bytes_async( stream.input_stream().read_bytes_async(
BUFFER_CAPACITY, BUFFER_CAPACITY,
priority, priority,
cancelable.clone().as_ref(), Some(&cancelable.clone()),
move |result| match result { move |result| match result {
Ok(bytes) => { Ok(bytes) => {
// No bytes were read, end of stream // No bytes were read, end of stream

View file

@ -78,21 +78,15 @@ impl Meta {
/// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html) /// Asynchronously create new `Self` from [IOStream](https://docs.gtk.org/gio/class.IOStream.html)
pub fn from_stream_async( pub fn from_stream_async(
stream: impl IsA<IOStream>, stream: impl IsA<IOStream>,
priority: Option<Priority>, priority: Priority,
cancellable: Option<Cancellable>, cancellable: Cancellable,
on_complete: impl FnOnce(Result<Self, Error>) + 'static, on_complete: impl FnOnce(Result<Self, Error>) + 'static,
) { ) {
read_from_stream_async( read_from_stream_async(
Vec::with_capacity(MAX_LEN), Vec::with_capacity(MAX_LEN),
stream, stream,
match cancellable { cancellable,
Some(value) => Some(value), priority,
None => None::<Cancellable>,
},
match priority {
Some(value) => value,
None => Priority::DEFAULT,
},
|result| match result { |result| match result {
Ok(buffer) => on_complete(Self::from_utf8(&buffer)), Ok(buffer) => on_complete(Self::from_utf8(&buffer)),
Err(e) => on_complete(Err(e)), Err(e) => on_complete(Err(e)),
@ -110,14 +104,14 @@ impl Meta {
pub fn read_from_stream_async( pub fn read_from_stream_async(
mut buffer: Vec<u8>, mut buffer: Vec<u8>,
stream: impl IsA<IOStream>, stream: impl IsA<IOStream>,
cancellable: Option<Cancellable>, cancellable: Cancellable,
priority: Priority, priority: Priority,
on_complete: impl FnOnce(Result<Vec<u8>, Error>) + 'static, on_complete: impl FnOnce(Result<Vec<u8>, Error>) + 'static,
) { ) {
stream.input_stream().read_async( stream.input_stream().read_async(
vec![0], vec![0],
priority, priority,
cancellable.clone().as_ref(), Some(&cancellable.clone()),
move |result| match result { move |result| match result {
Ok((mut bytes, size)) => { Ok((mut bytes, size)) => {
// Expect valid header length // Expect valid header length

View file

@ -15,7 +15,7 @@ use glib::{object::IsA, Bytes, Priority};
/// * calculate bytes processed on chunk load /// * calculate bytes processed on chunk load
pub fn from_stream_async( pub fn from_stream_async(
base_io_stream: impl IsA<IOStream>, base_io_stream: impl IsA<IOStream>,
cancelable: Option<Cancellable>, cancelable: Cancellable,
priority: Priority, priority: Priority,
bytes_in_chunk: usize, bytes_in_chunk: usize,
bytes_total_limit: usize, bytes_total_limit: usize,
@ -38,7 +38,7 @@ pub fn from_stream_async(
pub fn read_all_from_stream_async( pub fn read_all_from_stream_async(
memory_input_stream: MemoryInputStream, memory_input_stream: MemoryInputStream,
base_io_stream: impl IsA<IOStream>, base_io_stream: impl IsA<IOStream>,
cancelable: Option<Cancellable>, cancelable: Cancellable,
priority: Priority, priority: Priority,
bytes: (usize, usize, usize), bytes: (usize, usize, usize),
callback: ( callback: (
@ -52,7 +52,7 @@ pub fn read_all_from_stream_async(
base_io_stream.input_stream().read_bytes_async( base_io_stream.input_stream().read_bytes_async(
bytes_in_chunk, bytes_in_chunk,
priority, priority,
cancelable.clone().as_ref(), Some(&cancelable.clone()),
move |result| match result { move |result| match result {
Ok(bytes) => { Ok(bytes) => {
// Update bytes total // Update bytes total