diff --git a/src/error.rs b/src/error.rs index 119d58ca0..79bc8b664 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,7 @@ pub enum Error { NotEnoughHelpers, NotFound, TooManyHelpers, + DeadThread(std::sync::mpsc::SendError), #[cfg(feature = "cli")] Hex(hex::FromHexError), @@ -44,6 +45,8 @@ impl std::fmt::Display for Error { } forward_errors! { + std::sync::mpsc::SendError => DeadThread, + #[cfg(feature = "cli")] hex::FromHexError => Hex, std::io::Error => Io, diff --git a/src/lib.rs b/src/lib.rs index 29e9689fb..4eea8635b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod cli; pub mod error; pub mod helpers; +pub mod net; pub mod report; pub mod threshold; pub mod user; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 000000000..2839b23e0 --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,5 @@ +pub use self::server::Server; +pub use self::thread::{Message, Thread}; + +mod server; +mod thread; diff --git a/src/net/server.rs b/src/net/server.rs new file mode 100644 index 000000000..8e2bf74d5 --- /dev/null +++ b/src/net/server.rs @@ -0,0 +1,48 @@ +use crate::net::Thread; + +pub struct Server { + connection_handler_thread: Thread, +} + +impl Server { + #[must_use] + pub fn new() -> Server { + Server { + connection_handler_thread: Thread::new(), + } + } + + pub fn start(&self) { + self.start_connection_handler(); + } + + /// Spawns a new thread to handle incoming connections. + /// # Panics + /// If the thread could not be spawned. + fn start_connection_handler(&self) { + if let Err(e) = self.connection_handler_thread.execute(|| { + // listen + // read + // write + }) { + panic!("Could not start the connection handler: {}", e); + } + } +} + +impl Default for Server { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::Server; + + #[test] + fn no_panic() { + let server = Server::new(); + server.start(); + } +} diff --git a/src/net/thread.rs b/src/net/thread.rs new file mode 100644 index 000000000..87640545b --- /dev/null +++ b/src/net/thread.rs @@ -0,0 +1,117 @@ +use log::{error, info}; +use std::fmt::{Debug, Formatter}; +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; + +use crate::error::Res; + +pub struct Thread { + worker: Worker, + sender: mpsc::Sender, +} + +struct Worker { + thread: Option>, +} + +type Job = Box; + +pub enum Message { + NewJob(Job), + Terminate, +} + +impl Thread { + #[must_use] + pub fn new() -> Thread { + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + + Thread { + worker: Worker::spawn(receiver), + sender, + } + } + + /// Sends a function to the running thread for it to be executed. + /// # Errors + /// If the thread has been terminated. + pub fn execute(&self, f: F) -> Res<()> + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.send(Message::NewJob(job))?; + + Ok(()) + } +} + +impl Default for Thread { + fn default() -> Self { + Self::new() + } +} + +impl Drop for Thread { + fn drop(&mut self) { + info!("Terminating thread {:?}", self.worker); + if let Ok(()) = self.sender.send(Message::Terminate) { + if let Some(thread) = self.worker.thread.take() { + thread.join().unwrap(); + } + } + } +} + +impl Worker { + fn spawn(receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let message: Message; + + // Receive a message and release the lock before executing anything else. + if let Ok(receiver) = receiver.lock() { + if let Ok(msg) = receiver.recv() { + message = msg; + } else { + error!("Sender channel is closed."); + break; + } + } else { + error!("Failed to lock the mutex."); + break; + } + + match message { + Message::NewJob(job) => { + info!( + "Worker {:?} received a job; executing.", + thread::current().id() + ); + job(); + } + Message::Terminate => { + info!("Worker {:?} is shutting down.", thread::current().id()); + break; + } + } + }); + + info!("Spawned worker {:?}.", thread.thread().id()); + + Worker { + thread: Some(thread), + } + } +} + +impl Debug for Worker { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + if let Some(thread) = &self.thread { + write!(f, "{:?}", thread.thread().id()) + } else { + write!(f, "(dead thread)") + } + } +}