From 9d22c45725c304ba97298908d9446b7a3146aced Mon Sep 17 00:00:00 2001 From: Taiki Yamaguchi Date: Tue, 29 Mar 2022 01:01:08 +0800 Subject: [PATCH 1/3] Thread module to be used for server connection handler --- src/lib.rs | 1 + src/net/mod.rs | 5 +++ src/net/server.rs | 46 +++++++++++++++++++++++++ src/net/thread.rs | 85 +++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+) create mode 100644 src/net/mod.rs create mode 100644 src/net/server.rs create mode 100644 src/net/thread.rs diff --git a/src/lib.rs b/src/lib.rs index 29e9689fb..4eea8635b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ pub mod cli; pub mod error; pub mod helpers; +pub mod net; pub mod report; pub mod threshold; pub mod user; diff --git a/src/net/mod.rs b/src/net/mod.rs new file mode 100644 index 000000000..9fb820e9a --- /dev/null +++ b/src/net/mod.rs @@ -0,0 +1,5 @@ +pub use self::server::Server; +pub use self::thread::Thread; + +mod server; +mod thread; diff --git a/src/net/server.rs b/src/net/server.rs new file mode 100644 index 000000000..5f81f7c98 --- /dev/null +++ b/src/net/server.rs @@ -0,0 +1,46 @@ +use crate::net::Thread; + +pub struct Server { + connection_handler_thread: Thread, +} + +impl Server { + #[must_use] + pub fn new() -> Server { + Server { + connection_handler_thread: Thread::new(), + } + } + + pub fn start(&self) { + self.start_connection_handler(); + } + + fn start_connection_handler(&self) { + self.connection_handler_thread.execute(|| { + // listen + // read + // write + }); + } +} + +impl Default for Server { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::Server; + use std::thread; + use std::time; + + #[test] + fn test_no_panic() { + let server = Server::new(); + server.start(); + thread::sleep(time::Duration::from_millis(500)); + } +} diff --git a/src/net/thread.rs b/src/net/thread.rs new file mode 100644 index 000000000..994860c57 --- /dev/null +++ b/src/net/thread.rs @@ -0,0 +1,85 @@ +use std::sync::{mpsc, Arc, Mutex}; +use std::thread; + +pub struct Thread { + worker: Worker, + sender: mpsc::Sender, +} + +pub struct Worker { + id: usize, + thread: Option>, +} + +type Job = Box; + +pub enum Message { + NewJob(Job), + Terminate, +} + +impl Thread { + #[must_use] + pub fn new() -> Thread { + let (sender, receiver) = mpsc::channel(); + let receiver = Arc::new(Mutex::new(receiver)); + + Thread { + worker: Worker::new(0, receiver), + sender, + } + } + + /// Sends a function to the running thread for it to be executed. + /// # Panics + /// If the thread has been terminated. + pub fn execute(&self, f: F) + where + F: FnOnce() + Send + 'static, + { + let job = Box::new(f); + + self.sender.send(Message::NewJob(job)).unwrap(); + } +} + +impl Default for Thread { + fn default() -> Self { + Self::new() + } +} + +impl Drop for Thread { + fn drop(&mut self) { + println!("Terminating thread {}", self.worker.id); + self.sender.send(Message::Terminate).unwrap(); + + if let Some(thread) = self.worker.thread.take() { + thread.join().unwrap(); + } + } +} + +impl Worker { + fn new(id: usize, receiver: Arc>>) -> Worker { + let thread = thread::spawn(move || loop { + let message = receiver.lock().unwrap().recv().unwrap(); + + match message { + Message::NewJob(job) => { + println!("Worker {} received a job; executing.", id); + job(); + } + Message::Terminate => { + println!("Worker {} is shutting down.", id); + break; + } + } + }); + + Worker { + id, + thread: Some(thread), + } + } +} From dd845058352d26b2d662a4f97b3b8426575f7ce8 Mon Sep 17 00:00:00 2001 From: Taiki Yamaguchi Date: Wed, 30 Mar 2022 16:07:43 +0800 Subject: [PATCH 2/3] Handle errors gracefully --- src/error.rs | 3 +++ src/net/mod.rs | 2 +- src/net/server.rs | 14 ++++++++------ src/net/thread.rs | 48 ++++++++++++++++++++++++++++++++--------------- 4 files changed, 45 insertions(+), 22 deletions(-) diff --git a/src/error.rs b/src/error.rs index 119d58ca0..79bc8b664 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,7 @@ pub enum Error { NotEnoughHelpers, NotFound, TooManyHelpers, + DeadThread(std::sync::mpsc::SendError), #[cfg(feature = "cli")] Hex(hex::FromHexError), @@ -44,6 +45,8 @@ impl std::fmt::Display for Error { } forward_errors! { + std::sync::mpsc::SendError => DeadThread, + #[cfg(feature = "cli")] hex::FromHexError => Hex, std::io::Error => Io, diff --git a/src/net/mod.rs b/src/net/mod.rs index 9fb820e9a..2839b23e0 100644 --- a/src/net/mod.rs +++ b/src/net/mod.rs @@ -1,5 +1,5 @@ pub use self::server::Server; -pub use self::thread::Thread; +pub use self::thread::{Message, Thread}; mod server; mod thread; diff --git a/src/net/server.rs b/src/net/server.rs index 5f81f7c98..8e2bf74d5 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -16,12 +16,17 @@ impl Server { self.start_connection_handler(); } + /// Spawns a new thread to handle incoming connections. + /// # Panics + /// If the thread could not be spawned. fn start_connection_handler(&self) { - self.connection_handler_thread.execute(|| { + if let Err(e) = self.connection_handler_thread.execute(|| { // listen // read // write - }); + }) { + panic!("Could not start the connection handler: {}", e); + } } } @@ -34,13 +39,10 @@ impl Default for Server { #[cfg(test)] mod tests { use super::Server; - use std::thread; - use std::time; #[test] - fn test_no_panic() { + fn no_panic() { let server = Server::new(); server.start(); - thread::sleep(time::Duration::from_millis(500)); } } diff --git a/src/net/thread.rs b/src/net/thread.rs index 994860c57..b208f64a6 100644 --- a/src/net/thread.rs +++ b/src/net/thread.rs @@ -1,12 +1,15 @@ +use log::{error, info}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; +use crate::error::Res; + pub struct Thread { worker: Worker, sender: mpsc::Sender, } -pub struct Worker { +struct Worker { id: usize, thread: Option>, } @@ -25,21 +28,23 @@ impl Thread { let receiver = Arc::new(Mutex::new(receiver)); Thread { - worker: Worker::new(0, receiver), + worker: Worker::spawn(0, receiver), sender, } } /// Sends a function to the running thread for it to be executed. - /// # Panics - /// If the thread has been terminated. - pub fn execute(&self, f: F) + /// # Errors + /// If the thread has been terminated. + pub fn execute(&self, f: F) -> Res<()> where F: FnOnce() + Send + 'static, { let job = Box::new(f); - self.sender.send(Message::NewJob(job)).unwrap(); + self.sender.send(Message::NewJob(job))?; + + Ok(()) } } @@ -51,27 +56,40 @@ impl Default for Thread { impl Drop for Thread { fn drop(&mut self) { - println!("Terminating thread {}", self.worker.id); - self.sender.send(Message::Terminate).unwrap(); - - if let Some(thread) = self.worker.thread.take() { - thread.join().unwrap(); + info!("Terminating thread {}", self.worker.id); + if let Ok(()) = self.sender.send(Message::Terminate) { + if let Some(thread) = self.worker.thread.take() { + thread.join().unwrap(); + } } } } impl Worker { - fn new(id: usize, receiver: Arc>>) -> Worker { + fn spawn(id: usize, receiver: Arc>>) -> Worker { let thread = thread::spawn(move || loop { - let message = receiver.lock().unwrap().recv().unwrap(); + let message: Message; + + // Receive a message and release the lock before executing anything else. + if let Ok(receiver) = receiver.lock() { + if let Ok(msg) = receiver.recv() { + message = msg; + } else { + error!("Sender channel is closed."); + break; + } + } else { + error!("Failed to lock the mutex."); + break; + } match message { Message::NewJob(job) => { - println!("Worker {} received a job; executing.", id); + info!("Worker {} received a job; executing.", id); job(); } Message::Terminate => { - println!("Worker {} is shutting down.", id); + info!("Worker {} is shutting down.", id); break; } } From 58a649b4013b83c7b98da6670f597aa01328ffed Mon Sep 17 00:00:00 2001 From: Taiki Yamaguchi Date: Fri, 1 Apr 2022 14:27:32 +0800 Subject: [PATCH 3/3] Replace worker.id with ThreadId --- src/net/thread.rs | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/src/net/thread.rs b/src/net/thread.rs index b208f64a6..87640545b 100644 --- a/src/net/thread.rs +++ b/src/net/thread.rs @@ -1,4 +1,5 @@ use log::{error, info}; +use std::fmt::{Debug, Formatter}; use std::sync::{mpsc, Arc, Mutex}; use std::thread; @@ -10,7 +11,6 @@ pub struct Thread { } struct Worker { - id: usize, thread: Option>, } @@ -28,7 +28,7 @@ impl Thread { let receiver = Arc::new(Mutex::new(receiver)); Thread { - worker: Worker::spawn(0, receiver), + worker: Worker::spawn(receiver), sender, } } @@ -56,7 +56,7 @@ impl Default for Thread { impl Drop for Thread { fn drop(&mut self) { - info!("Terminating thread {}", self.worker.id); + info!("Terminating thread {:?}", self.worker); if let Ok(()) = self.sender.send(Message::Terminate) { if let Some(thread) = self.worker.thread.take() { thread.join().unwrap(); @@ -66,7 +66,7 @@ impl Drop for Thread { } impl Worker { - fn spawn(id: usize, receiver: Arc>>) -> Worker { + fn spawn(receiver: Arc>>) -> Worker { let thread = thread::spawn(move || loop { let message: Message; @@ -85,19 +85,33 @@ impl Worker { match message { Message::NewJob(job) => { - info!("Worker {} received a job; executing.", id); + info!( + "Worker {:?} received a job; executing.", + thread::current().id() + ); job(); } Message::Terminate => { - info!("Worker {} is shutting down.", id); + info!("Worker {:?} is shutting down.", thread::current().id()); break; } } }); + info!("Spawned worker {:?}.", thread.thread().id()); + Worker { - id, thread: Some(thread), } } } + +impl Debug for Worker { + fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result { + if let Some(thread) = &self.thread { + write!(f, "{:?}", thread.thread().id()) + } else { + write!(f, "(dead thread)") + } + } +}