remove aquatic_common_tcp crate, move contents into aquatic_http

It doesn't make a lot of sense to make a separate crate for
the few things here. I don't really want tight coupling between
the crates anyway, since it impedes making changes in them
and makes understanding them more difficult.
This commit is contained in:
Joakim Frostegård 2020-07-21 23:01:34 +02:00
parent 32402a4dca
commit f1f708465a
15 changed files with 117 additions and 162 deletions

View file

@ -17,7 +17,6 @@ path = "src/bin/main.rs"
anyhow = "1"
aquatic_cli_helpers = { path = "../aquatic_cli_helpers" }
aquatic_common = { path = "../aquatic_common" }
aquatic_common_tcp = { path = "../aquatic_common_tcp" }
aquatic_http_protocol = { path = "../aquatic_http_protocol" }
either = "1"
flume = "0.7"
@ -35,6 +34,7 @@ rand = { version = "0.7", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
simplelog = "0.8"
smartstring = "0.2"
socket2 = { version = "0.3", features = ["reuseport"] }
[dev-dependencies]
quickcheck = "0.9"

View file

@ -169,4 +169,4 @@ impl ResponseChannelSender {
pub type SocketWorkerStatus = Option<Result<(), String>>;
pub type SocketWorkerStatuses = Arc<Mutex<Vec<SocketWorkerStatus>>>;
pub type SocketWorkerStatuses = Arc<Mutex<Vec<SocketWorkerStatus>>>;

View file

@ -2,7 +2,24 @@ use std::net::SocketAddr;
use serde::{Serialize, Deserialize};
pub use aquatic_common_tcp::config::*;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Off,
Error,
Warn,
Info,
Debug,
Trace
}
impl Default for LogLevel {
fn default() -> Self {
Self::Error
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
@ -21,6 +38,15 @@ pub struct Config {
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct TlsConfig {
pub use_tls: bool,
pub tls_pkcs12_path: String,
pub tls_pkcs12_password: String,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct NetworkConfig {
@ -46,6 +72,41 @@ pub struct ProtocolConfig {
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct HandlerConfig {
/// Maximum number of requests to receive from channel before locking
/// mutex and starting work
pub max_requests_per_iter: usize,
pub channel_recv_timeout_microseconds: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct CleaningConfig {
/// Clean peers this often (seconds)
pub interval: u64,
/// Remove peers that haven't announced for this long (seconds)
pub max_peer_age: u64,
/// Remove connections that are older than this (seconds)
pub max_connection_age: u64,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(default)]
pub struct PrivilegeConfig {
/// Chroot and switch user after binding to sockets
pub drop_privileges: bool,
/// Chroot to this path
pub chroot_path: String,
/// User to switch to after chrooting
pub user: String,
}
impl Default for Config {
fn default() -> Self {
Self {
@ -82,4 +143,47 @@ impl Default for ProtocolConfig {
peer_announce_interval: 120,
}
}
}
}
impl Default for HandlerConfig {
fn default() -> Self {
Self {
max_requests_per_iter: 10000,
channel_recv_timeout_microseconds: 200,
}
}
}
impl Default for CleaningConfig {
fn default() -> Self {
Self {
interval: 30,
max_peer_age: 180,
max_connection_age: 180,
}
}
}
impl Default for PrivilegeConfig {
fn default() -> Self {
Self {
drop_privileges: false,
chroot_path: ".".to_string(),
user: "nobody".to_string(),
}
}
}
impl Default for TlsConfig {
fn default() -> Self {
Self {
use_tls: false,
tls_pkcs12_path: "".into(),
tls_pkcs12_password: "".into(),
}
}
}

View file

@ -6,8 +6,6 @@ use anyhow::Context;
use parking_lot::Mutex;
use privdrop::PrivDrop;
use aquatic_common_tcp::network::utils::create_tls_acceptor;
pub mod common;
pub mod config;
pub mod handler;
@ -16,6 +14,7 @@ pub mod tasks;
use common::*;
use config::Config;
use network::utils::create_tls_acceptor;
pub fn run(config: Config) -> anyhow::Result<()> {

View file

@ -8,11 +8,12 @@ use mio::Token;
use mio::net::TcpStream;
use native_tls::{TlsAcceptor, MidHandshakeTlsStream};
use aquatic_common_tcp::network::stream::Stream;
use aquatic_http_protocol::request::{Request, RequestParseError};
use crate::common::*;
use super::stream::Stream;
#[derive(Debug)]
pub enum RequestReadError {

View file

@ -1,5 +1,3 @@
pub mod connection;
use std::time::{Duration, Instant};
use std::io::ErrorKind;
use std::sync::Arc;
@ -11,13 +9,17 @@ use native_tls::TlsAcceptor;
use mio::{Events, Poll, Interest, Token};
use mio::net::TcpListener;
use aquatic_common_tcp::network::utils::create_listener;
use aquatic_http_protocol::response::*;
use crate::common::*;
use crate::config::Config;
pub mod connection;
pub mod stream;
pub mod utils;
use connection::*;
use utils::*;
pub fn run_socket_worker(

View file

@ -0,0 +1,76 @@
use std::net::{SocketAddr};
use std::io::{Read, Write};
use mio::net::TcpStream;
use native_tls::TlsStream;
pub enum Stream {
TcpStream(TcpStream),
TlsStream(TlsStream<TcpStream>),
}
impl Stream {
#[inline]
pub fn get_peer_addr(&self) -> SocketAddr {
match self {
Self::TcpStream(stream) => stream.peer_addr().unwrap(),
Self::TlsStream(stream) => stream.get_ref().peer_addr().unwrap(),
}
}
}
impl Read for Stream {
#[inline]
fn read(&mut self, buf: &mut [u8]) -> Result<usize, ::std::io::Error> {
match self {
Self::TcpStream(stream) => stream.read(buf),
Self::TlsStream(stream) => stream.read(buf),
}
}
/// Not used but provided for completeness
#[inline]
fn read_vectored(
&mut self,
bufs: &mut [::std::io::IoSliceMut<'_>]
) -> ::std::io::Result<usize> {
match self {
Self::TcpStream(stream) => stream.read_vectored(bufs),
Self::TlsStream(stream) => stream.read_vectored(bufs),
}
}
}
impl Write for Stream {
#[inline]
fn write(&mut self, buf: &[u8]) -> ::std::io::Result<usize> {
match self {
Self::TcpStream(stream) => stream.write(buf),
Self::TlsStream(stream) => stream.write(buf),
}
}
/// Not used but provided for completeness
#[inline]
fn write_vectored(
&mut self,
bufs: &[::std::io::IoSlice<'_>]
) -> ::std::io::Result<usize> {
match self {
Self::TcpStream(stream) => stream.write_vectored(bufs),
Self::TlsStream(stream) => stream.write_vectored(bufs),
}
}
#[inline]
fn flush(&mut self) -> ::std::io::Result<()> {
match self {
Self::TcpStream(stream) => stream.flush(),
Self::TlsStream(stream) => stream.flush(),
}
}
}

View file

@ -0,0 +1,64 @@
use std::fs::File;
use std::io::Read;
use std::net::SocketAddr;
use anyhow::Context;
use native_tls::{Identity, TlsAcceptor};
use socket2::{Socket, Domain, Type, Protocol};
use crate::config::TlsConfig;
pub fn create_tls_acceptor(
config: &TlsConfig,
) -> anyhow::Result<Option<TlsAcceptor>> {
if config.use_tls {
let mut identity_bytes = Vec::new();
let mut file = File::open(&config.tls_pkcs12_path)
.context("Couldn't open pkcs12 identity file")?;
file.read_to_end(&mut identity_bytes)
.context("Couldn't read pkcs12 identity file")?;
let identity = Identity::from_pkcs12(
&identity_bytes[..],
&config.tls_pkcs12_password
).context("Couldn't parse pkcs12 identity file")?;
let acceptor = TlsAcceptor::new(identity)
.context("Couldn't create TlsAcceptor from pkcs12 identity")?;
Ok(Some(acceptor))
} else {
Ok(None)
}
}
pub fn create_listener(
address: SocketAddr,
ipv6_only: bool
) -> ::anyhow::Result<::std::net::TcpListener> {
let builder = if address.is_ipv4(){
Socket::new(Domain::ipv4(), Type::stream(), Some(Protocol::tcp()))
} else {
Socket::new(Domain::ipv6(), Type::stream(), Some(Protocol::tcp()))
}.context("Couldn't create socket2::Socket")?;
if ipv6_only {
builder.set_only_v6(true)
.context("Couldn't put socket in ipv6 only mode")?
}
builder.set_nonblocking(true)
.context("Couldn't put socket in non-blocking mode")?;
builder.set_reuse_port(true)
.context("Couldn't put socket in reuse_port mode")?;
builder.bind(&address.into()).with_context(||
format!("Couldn't bind socket to address {}", address)
)?;
builder.listen(128)
.context("Couldn't listen for connections on socket")?;
Ok(builder.into_tcp_listener())
}