Skip to content

Commit

Permalink
feat(scheduler): add InstanceScheduled structure
Browse files Browse the repository at this point in the history
Signed-off-by: iverly <[email protected]>
  • Loading branch information
iverly committed Aug 29, 2022
1 parent 3480c1a commit 63afc6e
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use log::debug;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

use proto::scheduler::{
instance_service_server::InstanceService, Instance, InstanceIdentifier, InstanceStatus,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

use crate::{manager::Manager, Event};

Expand All @@ -25,8 +23,8 @@ impl InstanceService for InstanceListener {
async fn create(
&self,
request: Request<Instance>,
) -> Result<Response<Self::CreateStream>, Status> {
debug!("received request: {:?}", request);
) -> Result<Response<Self::CreateStream>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_mpsc_channel();

match self
Expand All @@ -38,33 +36,22 @@ impl InstanceService for InstanceListener {
return Ok(Response::new(ReceiverStream::new(rx)));
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

type CreateStream = ReceiverStream<Result<InstanceStatus, Status>>;
type CreateStream = ReceiverStream<Result<InstanceStatus, tonic::Status>>;

async fn start(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
.sender
.send(Event::InstanceStart(request.into_inner().id, tx))
.await
{
Ok(_) => {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
}
}
async fn start(&self, _: Request<InstanceIdentifier>) -> Result<Response<()>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}

async fn stop(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
async fn stop(
&self,
request: Request<InstanceIdentifier>,
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -76,13 +63,16 @@ impl InstanceService for InstanceListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

async fn destroy(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
async fn destroy(
&self,
request: Request<InstanceIdentifier>,
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -94,7 +84,7 @@ impl InstanceService for InstanceListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions scheduler/src/instance/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod listener;
pub mod scheduled;
82 changes: 82 additions & 0 deletions scheduler/src/instance/scheduled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use proto::scheduler::{Instance, InstanceStatus, Status};
use tokio::sync::mpsc;

use crate::{NodeIdentifier, ProxyError};

/// InstanceScheduled represents an instance that is scheduled to a node. It is used to send
/// messages to the node and contains the node identifier where it's scheduled.
///
/// Properties:
///
/// * `id`: The id of the instance.
/// * `instance`: The instance that is being registered.
/// * `node_id`: The node identifier of the node that the instance is running on.
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
#[derive(Debug, Clone)]
pub struct InstanceScheduled {
pub id: String,
pub instance: Instance,
pub node_id: Option<NodeIdentifier>,
pub tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
}

impl InstanceScheduled {
/// `new` creates a new `InstanceStatus` struct
///
/// Arguments:
///
/// * `id`: The id of the instance.
/// * `instance`: The instance that we want to run.
/// * `node_id`: The node identifier of the node that the instance is running on.
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
///
/// Returns:
///
/// A new instance of the struct `InstanceStatus`
pub fn new(
id: String,
instance: Instance,
node_id: Option<NodeIdentifier>,
tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
) -> Self {
Self {
id,
instance,
node_id,
tx,
}
}

/// This function updates the status of the instance and sends the updated status to the controller
///
/// Arguments:
///
/// * `status`: The status of the node.
/// * `description`: A string that describes the status of the node.
///
/// Returns:
///
/// A Result<(), ProxyError>
pub async fn change_status(
&mut self,
status: Status,
description: Option<String>,
) -> Result<(), ProxyError> {
self.instance.status = status.into();

self.tx
.send(Ok(InstanceStatus {
id: self.id.clone(),
status: status.into(),
status_description: description.unwrap_or("".to_string()),
resource: match self.instance.status() {
Status::Running => self.instance.resource.clone(),
_ => None,
},
}))
.await
.map_err(|_| ProxyError::ChannelSenderError)?;

Ok(())
}
}
9 changes: 8 additions & 1 deletion scheduler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tokio::sync::{mpsc, oneshot};
use tonic::Response;

pub mod config;
pub mod instance_listener;
pub mod instance;
pub mod manager;
pub mod node_listener;
pub mod storage;
Expand All @@ -26,13 +26,20 @@ pub enum SchedulerError {
Unknown,
}

#[derive(Error, Debug)]
pub enum ProxyError {
#[error("an error occurred while sending a message to the channel")]
ChannelSenderError,
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct Node {
id: String,
}

pub type NodeIdentifier = String;
pub type InstanceIdentifier = String;

#[derive(Debug)]
pub enum Event {
Expand Down
6 changes: 2 additions & 4 deletions scheduler/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ use tokio::sync::mpsc;
use tokio::{sync::oneshot, task::JoinHandle};
use tonic::{transport::Server, Response};

use crate::instance::listener::InstanceListener;
use crate::SchedulerError;
use crate::{
config::Config, instance_listener::InstanceListener, node_listener::NodeListener,
storage::Storage, Event, Node,
};
use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node};

#[derive(Debug)]
pub struct Manager {
Expand Down

0 comments on commit 63afc6e

Please sign in to comment.