diff --git a/Cargo.lock b/Cargo.lock
index 0efa2b1a..4d7a36d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1881,6 +1881,7 @@ name = "scheduler"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "async-stream",
  "confy",
  "env_logger 0.8.4",
  "log",
diff --git a/scheduler/Cargo.toml b/scheduler/Cargo.toml
index d00ca89e..13e1459f 100644
--- a/scheduler/Cargo.toml
+++ b/scheduler/Cargo.toml
@@ -17,3 +17,4 @@ serde_derive = "1.0.142"
 confy = "0.4.0"
 anyhow = "1.0.62"
 thiserror = "1.0.32"
+async-stream = "0.3.3"
diff --git a/scheduler/src/lib.rs b/scheduler/src/lib.rs
index 524943a4..45a45f8d 100644
--- a/scheduler/src/lib.rs
+++ b/scheduler/src/lib.rs
@@ -1,3 +1,5 @@
+use std::net::IpAddr;
+
 use proto::scheduler::{
     Instance, InstanceStatus, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
     NodeUnregisterRequest, NodeUnregisterResponse,
@@ -9,7 +11,8 @@ use tonic::Response;
 pub mod config;
 pub mod instance;
 pub mod manager;
-pub mod node_listener;
+pub mod node;
+pub mod parser;
 pub mod storage;
 
 #[derive(Error, Debug)]
@@ -28,6 +31,14 @@ pub enum SchedulerError {
 
 #[derive(Error, Debug)]
 pub enum ProxyError {
+    #[error("a transport error occurred from tonic: {0}")]
+    TonicTransportError(#[from] tonic::transport::Error),
+    #[error("a status error occurred from tonic: {0}")]
+    TonicStatusError(#[from] tonic::Status),
+    #[error("the gRPC client was not found")]
+    GrpcClientNotFound,
+    #[error("the gRPC stream was not found")]
+    GrpcStreamNotFound,
     #[error("an error occurred while sending a message to the channel")]
     ChannelSenderError,
 }
@@ -64,6 +75,7 @@ pub enum Event {
     // Node events
     NodeRegister(
         NodeRegisterRequest,
+        IpAddr,
         oneshot::Sender<Result<Response<NodeRegisterResponse>, tonic::Status>>,
     ),
     NodeUnregister(
diff --git a/scheduler/src/manager.rs b/scheduler/src/manager.rs
index 88cafcc9..166d5d97 100644
--- a/scheduler/src/manager.rs
+++ b/scheduler/src/manager.rs
@@ -11,8 +11,9 @@ use tokio::{sync::oneshot, task::JoinHandle};
 use tonic::{transport::Server, Response};
 
 use crate::instance::listener::InstanceListener;
+use crate::node::listener::NodeListener;
 use crate::SchedulerError;
-use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node};
+use crate::{config::Config, storage::Storage, Event, Node};
 
 #[derive(Debug)]
 pub struct Manager {
@@ -139,7 +140,7 @@ impl Manager {
                     info!("received instance destroy event : {:?}", id);
                     tx.send(Ok(Response::new(()))).unwrap();
                 }
-                Event::NodeRegister(request, tx) => {
+                Event::NodeRegister(request, _, tx) => {
                     info!("received node register event : {:?}", request);
                     tx.send(Ok(Response::new(NodeRegisterResponse::default())))
                         .unwrap();
diff --git a/scheduler/src/node_listener.rs b/scheduler/src/node/listener.rs
similarity index 62%
rename from scheduler/src/node_listener.rs
rename to scheduler/src/node/listener.rs
index b5b505cd..68c2341d 100644
--- a/scheduler/src/node_listener.rs
+++ b/scheduler/src/node/listener.rs
@@ -1,15 +1,13 @@
-use log::debug;
 use proto::scheduler::{
     node_service_server::NodeService, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
     NodeUnregisterRequest, NodeUnregisterResponse,
 };
 use tokio::sync::mpsc;
-use tonic::{Request, Response, Status, Streaming};
+use tonic::{Request, Response, Streaming};
 
 use crate::{manager::Manager, Event};
 
 #[derive(Debug)]
-#[allow(dead_code)]
 pub struct NodeListener {
     sender: mpsc::Sender<Event>,
 }
@@ -25,33 +23,34 @@ impl NodeService for NodeListener {
     async fn status(
         &self,
         request: Request<Streaming<NodeStatus>>,
-    ) -> Result<Response<()>, Status> {
+    ) -> Result<Response<()>, tonic::Status> {
+        log::debug!("received gRPC request: {:?}", request);
+
         let mut stream = request.into_inner();
 
-        let (tx, mut rx) = Manager::create_mpsc_channel();
+        // send each status to the manager
         loop {
+            let (tx, mut rx) = Manager::create_mpsc_channel();
             let message = stream.message().await?;
+
             match message {
                 Some(node_status) => {
-                    debug!("Node status: {:?}", node_status);
                     self.sender
                         .send(Event::NodeStatus(node_status, tx.clone()))
                         .await
                         .unwrap();
 
+                    // wait for the manager to respond
                     if let Some(res) = rx.recv().await {
                         match res {
-                            Ok(()) => {
-                                debug!("Node status updated successfully");
-                            }
-                            Err(err) => {
-                                debug!("Error updating node status: {:?}", err);
-                                return Err(err);
-                            }
+                            Ok(_) => {}
+                            Err(err) => return Err(err),
                         }
                     }
                 }
                 None => {
+                    log::error!("Node status stream closed");
+                    // todo: emit node crash event (get the node id from the first status)
                     return Ok(Response::new(()));
                 }
             }
@@ -61,20 +60,23 @@
     async fn register(
         &self,
         request: Request<NodeRegisterRequest>,
-    ) -> Result<Response<NodeRegisterResponse>, Status> {
-        debug!("{:?}", request);
+    ) -> Result<Response<NodeRegisterResponse>, tonic::Status> {
+        log::debug!("received gRPC request: {:?}", request);
+
         let (tx, rx) = Manager::create_oneshot_channel();
+        let remote_addr = request.remote_addr().unwrap().ip();
+        log::debug!("Registering a new node from: {:?}", remote_addr);
 
         match self
             .sender
-            .send(Event::NodeRegister(request.into_inner(), tx))
+            .send(Event::NodeRegister(request.into_inner(), remote_addr, tx))
             .await
         {
             Ok(_) => {
                 return rx.await.unwrap();
             }
             Err(_) => {
-                return Err(Status::internal("could not send event to manager"));
+                return Err(tonic::Status::internal("could not send event to manager"));
             }
         }
     }
@@ -82,8 +84,9 @@
     async fn unregister(
         &self,
         request: Request<NodeUnregisterRequest>,
-    ) -> Result<Response<NodeUnregisterResponse>, Status> {
-        debug!("{:?}", request);
+    ) -> Result<Response<NodeUnregisterResponse>, tonic::Status> {
+        log::debug!("received gRPC request: {:?}", request);
+
         let (tx, rx) = Manager::create_oneshot_channel();
 
         match self
@@ -95,7 +98,7 @@
                 return rx.await.unwrap();
             }
             Err(_) => {
-                return Err(Status::internal("could not send event to manager"));
+                return Err(tonic::Status::internal("could not send event to manager"));
             }
         }
     }
diff --git a/scheduler/src/node/mod.rs b/scheduler/src/node/mod.rs
new file mode 100644
index 00000000..952b18e3
--- /dev/null
+++ b/scheduler/src/node/mod.rs
@@ -0,0 +1,11 @@
+use proto::scheduler::{Resource, Status};
+
+pub mod listener;
+pub mod registered;
+
+#[derive(Debug, Clone)]
+pub struct Node {
+    pub id: String,
+    pub status: Status,
+    pub resource: Option<Resource>,
+}
diff --git a/scheduler/src/node/registered.rs b/scheduler/src/node/registered.rs
new file mode 100644
index 00000000..557ba2f5
--- /dev/null
+++ b/scheduler/src/node/registered.rs
@@ -0,0 +1,279 @@
+use std::{net::IpAddr, sync::Arc};
+
+use proto::{
+    agent::{instance_service_client::InstanceServiceClient, Signal, SignalInstruction},
+    controller::node_service_client::NodeServiceClient,
+    scheduler::{Instance, Resource, Status},
+};
+use tokio::sync::{mpsc, Mutex};
+use tonic::{Request, Streaming};
+
+use crate::{
+    manager::Manager,
+    parser::{instance::InstanceParser, resource::ResourceParser},
+    storage::{IStorage, Storage},
+    InstanceIdentifier, ProxyError,
+};
+
+use super::Node;
+
+/// NodeRegistered represents a node that is registered to the cluster. It is responsible for
+/// reporting the node's status to the controller and for creating, stopping and destroying instances on the node.
+///
+/// Properties:
+///
+/// * `id`: The id of the node.
+/// * `node`: The node that is being registered.
+/// * `address`: The address of the node.
+/// * `tx`: The channel that the node uses to send status updates to the controller.
+/// * `grpc_client`: The gRPC client for the node.
+/// * `instances`: The instances that are running on the node.
+#[derive(Debug)]
+pub struct NodeRegistered {
+    pub id: String,
+    pub node: Node,
+    pub address: IpAddr,
+    pub tx: Option<mpsc::Sender<Result<proto::controller::NodeStatus, tonic::Status>>>,
+    pub grpc_client: Option<InstanceServiceClient<tonic::transport::Channel>>,
+    pub instances: Storage,
+}
+
+impl NodeRegistered {
+    /// `new` creates a new `NodeRegistered` struct
+    ///
+    /// Arguments:
+    ///
+    /// * `id`: The id of the node.
+    /// * `node`: The node that this registered node wraps.
+    /// * `address`: The IP address of the node.
+    ///
+    /// Returns:
+    ///
+    /// A new instance of the NodeRegistered struct.
+    pub fn new(id: String, node: Node, address: IpAddr) -> Self {
+        NodeRegistered {
+            id,
+            node,
+            address,
+            tx: None,
+            grpc_client: None,
+            instances: Storage::new(),
+        }
+    }
+
+    /// This function connects to the node's gRPC server and stores the client in the
+    /// `grpc_client` field of the `NodeRegistered` struct.
+    ///
+    /// Returns:
+    ///
+    /// A Result<(), ProxyError>
+    pub async fn connect(&mut self) -> Result<(), ProxyError> {
+        let addr = format!("http://{}:{}", self.address, "50053");
+
+        let client = InstanceServiceClient::connect(addr)
+            .await
+            .map_err(ProxyError::TonicTransportError)?;
+
+        self.grpc_client = Some(client);
+        Ok(())
+    }
+
+    /// It creates the node status stream between the scheduler and the controller.
+    /// It forwards the node status to the controller.
+    ///
+    /// Arguments:
+    ///
+    /// * `client`: The controller client used to send the node status stream.
+    ///
+    /// Returns:
+    ///
+    /// A Result<(), ProxyError>
+    pub async fn open_node_status_stream(
+        &mut self,
+        client: Arc<Mutex<Option<NodeServiceClient<tonic::transport::Channel>>>>,
+    ) -> Result<(), ProxyError> {
+        if self.tx.is_some() {
+            return Ok(());
+        }
+
+        let (tx, mut rx) = Manager::create_mpsc_channel();
+        self.tx = Some(tx);
+
+        let node_status_stream = async_stream::stream! {
+            loop {
+                let event = rx.recv().await;
+                match event {
+                    Some(Ok(node_status)) => {
+                        yield node_status;
+                    }
+                    Some(Err(_)) => {
+                        break;
+                    }
+                    None => {
+                        break;
+                    }
+                }
+            }
+
+            log::debug!("Node status stream closed");
+            // todo: emit node crash event (get the node id from the first status)
+        };
+
+        let request = Self::wrap_request(node_status_stream);
+
+        tokio::spawn(async move {
+            client
+                .lock()
+                .await
+                .as_mut()
+                .unwrap()
+                .update_node_status(request)
+                .await
+        });
+
+        Ok(())
+    }
+
+    /// This function updates the status of the node and sends the updated status to the controller.
+    ///
+    /// Arguments:
+    ///
+    /// * `status`: The status of the node.
+    /// * `description`: A string that describes the status of the node.
+    /// * `resource`: The node's current resource usage.
+    ///
+    /// Returns:
+    ///
+    /// A Result<(), ProxyError>
+    pub async fn update_status(
+        &mut self,
+        status: Status,
+        description: Option<String>,
+        resource: Option<Resource>,
+    ) -> Result<(), ProxyError> {
+        self.node.status = status;
+        self.node.resource = resource;
+
+        self.tx
+            .as_mut()
+            .ok_or(ProxyError::GrpcStreamNotFound)?
+            .send(Ok(proto::controller::NodeStatus {
+                id: self.id.clone(),
+                state: self.node.status.into(),
+                status_description: description.unwrap_or_else(|| "".to_string()),
+                resource: match self.node.status {
+                    Status::Running => Some(ResourceParser::to_controller_resource(
+                        self.node.resource.clone().unwrap(),
+                    )),
+                    _ => None,
+                },
+                instances: self
+                    .instances
+                    .get_all()
+                    .values()
+                    .map(|instance| InstanceParser::fake_controller_instance(instance.id.clone()))
+                    .collect(),
+            }))
+            .await
+            .map_err(|_| ProxyError::ChannelSenderError)?;
+
+        Ok(())
+    }
+
+    /// Create a new instance on the node and return the InstanceStatus streaming.
+    ///
+    /// Arguments:
+    ///
+    /// * `instance`: Instance - The instance to create.
+    ///
+    /// Returns:
+    ///
+    /// Streaming of InstanceStatus - The streaming of the instance status.
+    pub async fn create_instance(
+        &mut self,
+        instance: Instance,
+    ) -> Result<Streaming<proto::agent::InstanceStatus>, ProxyError> {
+        let client = self
+            .grpc_client
+            .as_mut()
+            .ok_or(ProxyError::GrpcClientNotFound)?;
+
+        let request = Self::wrap_request(InstanceParser::to_agent_instance(instance.clone()));
+
+        let response = client
+            .create(request)
+            .await
+            .map_err(ProxyError::TonicStatusError)?;
+
+        Ok(response.into_inner())
+    }
+
+    /// Send a stop signal to the node for the given instance.
+    ///
+    /// Arguments:
+    ///
+    /// * `id`: InstanceIdentifier - The instance identifier of the instance to stop.
+    ///
+    /// Returns:
+    ///
+    /// A future that resolves to a result of either a unit or a ProxyError.
+    pub async fn stop_instance(&mut self, id: InstanceIdentifier) -> Result<(), ProxyError> {
+        let client = self
+            .grpc_client
+            .as_mut()
+            .ok_or(ProxyError::GrpcClientNotFound)?;
+
+        let request = Self::wrap_request(SignalInstruction {
+            signal: Signal::Stop.into(),
+            instance: Some(InstanceParser::fake_agent_instance(id)),
+        });
+
+        client
+            .signal(request)
+            .await
+            .map_err(ProxyError::TonicStatusError)?;
+
+        Ok(())
+    }
+
+    /// Send a kill signal to the node for the given instance.
+    ///
+    /// Arguments:
+    ///
+    /// * `id`: InstanceIdentifier - The instance identifier of the instance to be killed.
+    ///
+    /// Returns:
+    ///
+    /// A future that resolves to a result of either a unit or a ProxyError.
+    pub async fn kill_instance(&mut self, id: InstanceIdentifier) -> Result<(), ProxyError> {
+        let client = self
+            .grpc_client
+            .as_mut()
+            .ok_or(ProxyError::GrpcClientNotFound)?;
+
+        let request = Self::wrap_request(SignalInstruction {
+            signal: Signal::Kill.into(),
+            instance: Some(InstanceParser::fake_agent_instance(id)),
+        });
+
+        client
+            .signal(request)
+            .await
+            .map_err(ProxyError::TonicStatusError)?;
+
+        Ok(())
+    }
+
+    /// This function takes a request and returns a request wrapped with tonic.
+    ///
+    /// Arguments:
+    ///
+    /// * `request`: The request object that you want to wrap.
+    ///
+    /// Returns:
+    ///
+    /// A Request object with the request as the inner value.
+    pub fn wrap_request<T>(request: T) -> Request<T> {
+        Request::new(request)
+    }
+}
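
Note: below is a minimal usage sketch of the new NodeRegistered type, showing how a manager might drive it once a NodeRegister event arrives. The function name drive_registered_node, the shared controller client handle, and the assumption that a Resource value is already available are illustrative only and are not part of this diff.

// Hypothetical driver (not part of this diff): wire up a freshly registered node,
// open its status stream, and start one instance on it.
use std::{net::IpAddr, sync::Arc};

use proto::controller::node_service_client::NodeServiceClient;
use proto::scheduler::{Instance, Resource, Status};
use tokio::sync::Mutex;
use tonic::transport::Channel;

use scheduler::node::{registered::NodeRegistered, Node};
use scheduler::ProxyError;

async fn drive_registered_node(
    id: String,
    node: Node,
    address: IpAddr,
    controller: Arc<Mutex<Option<NodeServiceClient<Channel>>>>,
    instance: Instance,
    resource: Resource,
) -> Result<(), ProxyError> {
    let mut registered = NodeRegistered::new(id, node, address);

    // Dial the agent's InstanceService on the node (port 50053 is hardcoded in `connect`).
    registered.connect().await?;

    // Forward node status updates to the controller from now on.
    registered.open_node_status_stream(controller).await?;
    registered
        .update_status(Status::Running, Some("node registered".to_string()), Some(resource))
        .await?;

    // Create an instance on the node and consume its status stream.
    let mut statuses = registered.create_instance(instance).await?;
    while let Some(status) = statuses.message().await.map_err(ProxyError::TonicStatusError)? {
        log::debug!("instance status: {:?}", status);
    }

    Ok(())
}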