diff --git a/scheduler/src/instance_listener.rs b/scheduler/src/instance/listener.rs similarity index 54% rename from scheduler/src/instance_listener.rs rename to scheduler/src/instance/listener.rs index 9db6fac3..2d4f7f39 100644 --- a/scheduler/src/instance_listener.rs +++ b/scheduler/src/instance/listener.rs @@ -1,11 +1,9 @@ -use log::debug; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; - use proto::scheduler::{ instance_service_server::InstanceService, Instance, InstanceIdentifier, InstanceStatus, }; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response}; use crate::{manager::Manager, Event}; @@ -25,8 +23,8 @@ impl InstanceService for InstanceListener { async fn create( &self, request: Request, - ) -> Result, Status> { - debug!("received request: {:?}", request); + ) -> Result, tonic::Status> { + log::debug!("received gRPC request: {:?}", request); let (tx, rx) = Manager::create_mpsc_channel(); match self @@ -38,33 +36,22 @@ impl InstanceService for InstanceListener { return Ok(Response::new(ReceiverStream::new(rx))); } Err(_) => { - return Err(Status::internal("could not send event to manager")); + return Err(tonic::Status::internal("could not send event to manager")); } } } - type CreateStream = ReceiverStream>; + type CreateStream = ReceiverStream>; - async fn start(&self, request: Request) -> Result, Status> { - debug!("received request: {:?}", request); - let (tx, rx) = Manager::create_oneshot_channel(); - - match self - .sender - .send(Event::InstanceStart(request.into_inner().id, tx)) - .await - { - Ok(_) => { - return rx.await.unwrap(); - } - Err(_) => { - return Err(Status::internal("could not send event to manager")); - } - } + async fn start(&self, _: Request) -> Result, tonic::Status> { + Err(tonic::Status::unimplemented("not implemented")) } - async fn stop(&self, request: Request) -> Result, Status> { - debug!("received request: {:?}", request); + async fn stop( + &self, + request: Request, + ) -> Result, tonic::Status> { + log::debug!("received gRPC request: {:?}", request); let (tx, rx) = Manager::create_oneshot_channel(); match self @@ -76,13 +63,16 @@ impl InstanceService for InstanceListener { return rx.await.unwrap(); } Err(_) => { - return Err(Status::internal("could not send event to manager")); + return Err(tonic::Status::internal("could not send event to manager")); } } } - async fn destroy(&self, request: Request) -> Result, Status> { - debug!("received request: {:?}", request); + async fn destroy( + &self, + request: Request, + ) -> Result, tonic::Status> { + log::debug!("received gRPC request: {:?}", request); let (tx, rx) = Manager::create_oneshot_channel(); match self @@ -94,7 +84,7 @@ impl InstanceService for InstanceListener { return rx.await.unwrap(); } Err(_) => { - return Err(Status::internal("could not send event to manager")); + return Err(tonic::Status::internal("could not send event to manager")); } } } diff --git a/scheduler/src/instance/mod.rs b/scheduler/src/instance/mod.rs new file mode 100644 index 00000000..5301ebcb --- /dev/null +++ b/scheduler/src/instance/mod.rs @@ -0,0 +1,2 @@ +pub mod listener; +pub mod scheduled; diff --git a/scheduler/src/instance/scheduled.rs b/scheduler/src/instance/scheduled.rs new file mode 100644 index 00000000..10f8ef51 --- /dev/null +++ b/scheduler/src/instance/scheduled.rs @@ -0,0 +1,82 @@ +use proto::scheduler::{Instance, InstanceStatus, Status}; +use tokio::sync::mpsc; + +use crate::{NodeIdentifier, ProxyError}; + +/// InstanceScheduled represents an instance that is scheduled to a node. It is used to send +/// messages to the node and contains the node identifier where it's scheduled. +/// +/// Properties: +/// +/// * `id`: The id of the instance. +/// * `instance`: The instance that is being registered. +/// * `node_id`: The node identifier of the node that the instance is running on. +/// * `tx`: This is the channel that the instance will use to send status updates to the controller. +#[derive(Debug, Clone)] +pub struct InstanceScheduled { + pub id: String, + pub instance: Instance, + pub node_id: Option, + pub tx: mpsc::Sender>, +} + +impl InstanceScheduled { + /// `new` creates a new `InstanceStatus` struct + /// + /// Arguments: + /// + /// * `id`: The id of the instance. + /// * `instance`: The instance that we want to run. + /// * `node_id`: The node identifier of the node that the instance is running on. + /// * `tx`: This is the channel that the instance will use to send status updates to the controller. + /// + /// Returns: + /// + /// A new instance of the struct `InstanceStatus` + pub fn new( + id: String, + instance: Instance, + node_id: Option, + tx: mpsc::Sender>, + ) -> Self { + Self { + id, + instance, + node_id, + tx, + } + } + + /// This function updates the status of the instance and sends the updated status to the controller + /// + /// Arguments: + /// + /// * `status`: The status of the node. + /// * `description`: A string that describes the status of the node. + /// + /// Returns: + /// + /// A Result<(), ProxyError> + pub async fn change_status( + &mut self, + status: Status, + description: Option, + ) -> Result<(), ProxyError> { + self.instance.status = status.into(); + + self.tx + .send(Ok(InstanceStatus { + id: self.id.clone(), + status: status.into(), + status_description: description.unwrap_or("".to_string()), + resource: match self.instance.status() { + Status::Running => self.instance.resource.clone(), + _ => None, + }, + })) + .await + .map_err(|_| ProxyError::ChannelSenderError)?; + + Ok(()) + } +} diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs index a6a4c225..524943a4 100644 --- a/scheduler/src/lib.rs +++ b/scheduler/src/lib.rs @@ -7,7 +7,7 @@ use tokio::sync::{mpsc, oneshot}; use tonic::Response; pub mod config; -pub mod instance_listener; +pub mod instance; pub mod manager; pub mod node_listener; pub mod storage; @@ -26,6 +26,12 @@ pub enum SchedulerError { Unknown, } +#[derive(Error, Debug)] +pub enum ProxyError { + #[error("an error occurred while sending a message to the channel")] + ChannelSenderError, +} + #[derive(Debug)] #[allow(dead_code)] pub struct Node { @@ -33,6 +39,7 @@ pub struct Node { } pub type NodeIdentifier = String; +pub type InstanceIdentifier = String; #[derive(Debug)] pub enum Event { diff --git a/scheduler/src/manager.rs b/scheduler/src/manager.rs index f58bd960..88cafcc9 100644 --- a/scheduler/src/manager.rs +++ b/scheduler/src/manager.rs @@ -10,11 +10,9 @@ use tokio::sync::mpsc; use tokio::{sync::oneshot, task::JoinHandle}; use tonic::{transport::Server, Response}; +use crate::instance::listener::InstanceListener; use crate::SchedulerError; -use crate::{ - config::Config, instance_listener::InstanceListener, node_listener::NodeListener, - storage::Storage, Event, Node, -}; +use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node}; #[derive(Debug)] pub struct Manager {