From 587dbc6260a3a0dd222cefb06afee829ab8e096a Mon Sep 17 00:00:00 2001 From: iverly Date: Mon, 8 Jan 2024 13:07:52 +0100 Subject: [PATCH] feat(proxy): use event handlers for handle_listener_events Signed-off-by: iverly --- event/Cargo.toml | 1 + event/src/handlers/delete_backend.rs | 7 ++-- event/src/handlers/list_backend.rs | 17 ++------- event/src/handlers/put_backend.rs | 11 +++--- proxy/Cargo.toml | 1 + proxy/src/lib.rs | 55 +++++++++++----------------- 6 files changed, 37 insertions(+), 55 deletions(-) diff --git a/event/Cargo.toml b/event/Cargo.toml index 14dd799..f473d14 100644 --- a/event/Cargo.toml +++ b/event/Cargo.toml @@ -11,3 +11,4 @@ storage = { path = "../storage" } shared = { path = "../shared" } tokio = { version = "1.26.0", features = [ "sync" ] } tonic = "0.7.2" +anyhow = "1.0.63" diff --git a/event/src/handlers/delete_backend.rs b/event/src/handlers/delete_backend.rs index d6b3f23..3323b0f 100644 --- a/event/src/handlers/delete_backend.rs +++ b/event/src/handlers/delete_backend.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use proto::proxy::Backend; +use anyhow::{anyhow, Result}; +use shared::models::backend::Backend; use storage::Storage; use tokio::sync::{oneshot, Mutex}; @@ -17,13 +18,13 @@ impl DeleteBackendHandler { pub async fn handle( storage: Arc>, backend: Backend, - tx: oneshot::Sender>, + tx: oneshot::Sender>, ) { let mut storage = storage.lock().await; let result = storage .remove_backend(&backend.hostname) - .map_err(|e| tonic::Status::internal(format!("Failed to delete backend: {}", e))); + .map_err(|e| anyhow!("Failed to delete backend: {}", e)); let _ = tx.send(result); } diff --git a/event/src/handlers/list_backend.rs b/event/src/handlers/list_backend.rs index 7aee64e..3c4b3a6 100644 --- a/event/src/handlers/list_backend.rs +++ b/event/src/handlers/list_backend.rs @@ -1,11 +1,10 @@ use std::sync::Arc; -use proto::proxy::Backend; +use anyhow::Result; +use shared::models::backend::Backend; use storage::Storage; use tokio::sync::{oneshot, Mutex}; -use crate::tonic_backend_from_proxy; - pub struct ListBackendHandler {} impl ListBackendHandler { @@ -16,18 +15,10 @@ impl ListBackendHandler { /// * `storage`: Arc> - the storage object that holds all the backends /// * `backend`: The backend to add to the storage. /// * `tx`: This is the channel that the client is listening on. - pub async fn handle( - storage: Arc>, - tx: oneshot::Sender, tonic::Status>>, - ) { + pub async fn handle(storage: Arc>, tx: oneshot::Sender>>) { let storage = storage.lock().await; - let backends = storage - .get_backends() - .clone() - .into_values() - .map(tonic_backend_from_proxy) - .collect(); + let backends = storage.get_backends().clone().into_values().collect(); let _ = tx.send(Ok(backends)); } diff --git a/event/src/handlers/put_backend.rs b/event/src/handlers/put_backend.rs index eff7d82..af4fc13 100644 --- a/event/src/handlers/put_backend.rs +++ b/event/src/handlers/put_backend.rs @@ -1,11 +1,10 @@ use std::sync::Arc; -use proto::proxy::Backend; +use anyhow::{anyhow, Result}; +use shared::models::backend::Backend; use storage::Storage; use tokio::sync::{oneshot, Mutex}; -use crate::proxy_backend_from_tonic; - pub struct PutBackendHandler {} impl PutBackendHandler { @@ -19,13 +18,13 @@ impl PutBackendHandler { pub async fn handle( storage: Arc>, backend: Backend, - tx: oneshot::Sender>, + tx: oneshot::Sender>, ) { let mut storage = storage.lock().await; let result = storage - .add_backend(proxy_backend_from_tonic(backend)) - .map_err(|e| tonic::Status::internal(format!("Failed to add backend: {}", e))); + .add_backend(backend) + .map_err(|e| anyhow!(format!("Failed to add backend: {}", e))); let _ = tx.send(result); } diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index 52f7262..c395db8 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -10,6 +10,7 @@ protocol = { path = "../protocol" } shared = { path = "../shared" } listener = { path = "../listener" } storage = { path = "../storage" } +event = { path = "../event" } log = "0.4.17" tokio = { version = "1.21.0", features = ["rt", "net", "io-util", "sync"] } anyhow = "1.0.63" diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index eb80462..d0a5d14 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -1,6 +1,10 @@ use std::{env, sync::Arc}; use anyhow::{anyhow, Ok, Result}; +use event::handlers::{ + delete_backend::DeleteBackendHandler, list_backend::ListBackendHandler, + put_backend::PutBackendHandler, +}; use listener::{event::Event, Listener}; use log::debug; use storage::Storage; @@ -216,6 +220,19 @@ impl Proxy { Ok(()) } + /// The function `handle_listener_events` handles events received from a channel by spawning async + /// tasks to handle different types of events. + /// + /// Arguments: + /// + /// * `rx`: A `Receiver` which is used to receive events from some event source. + /// * `storage`: `storage` is an `Arc>` which is a shared mutable state that is + /// protected by a mutex. It allows multiple threads to access and modify the `Storage` struct + /// concurrently. + /// + /// Returns: + /// + /// a `Result<()>`. async fn handle_listener_events( mut rx: Receiver, storage: Arc>, @@ -228,42 +245,14 @@ impl Proxy { tokio::spawn(async move { match event { - Event::DeleteBackend(backend, tx) => { - tx.send(storage.lock().await.remove_backend(&backend.hostname)) - .map_err(|_| { - log::error!("failed to send delete backend response"); - anyhow!("failed to send delete backend response") - })?; + Event::ListBackends(tx) => { + ListBackendHandler::handle(storage, tx).await; } Event::PutBackend(backend, tx) => { - tx.send(storage.lock().await.add_backend( - shared::models::backend::Backend::new( - backend.hostname, - backend.redirect_ip, - backend.redirect_port, - ), - )) - .map_err(|_| { - log::error!("failed to send put backend response"); - anyhow!("failed to send put backend response") - })?; + PutBackendHandler::handle(storage, backend, tx).await; } - Event::ListBackends(tx) => { - tx.send(Ok(storage - .lock() - .await - .get_backends() - .iter() - .map(|backend| shared::models::backend::Backend { - hostname: backend.1.hostname().to_string(), - redirect_ip: backend.1.redirect_ip().to_string(), - redirect_port: backend.1.redirect_port(), - }) - .collect::>())) - .map_err(|_| { - log::error!("failed to send list backends response"); - anyhow!("failed to send list backends response") - })?; + Event::DeleteBackend(backend, tx) => { + DeleteBackendHandler::handle(storage, backend, tx).await; } } Ok(())