Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add parser module & proxy structures for scheduler #63

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ serde_derive = "1.0.142"
confy = "0.4.0"
anyhow = "1.0.62"
thiserror = "1.0.32"
async-stream = "0.3.3"
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
use log::debug;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};

use proto::scheduler::{
instance_service_server::InstanceService, Instance, InstanceIdentifier, InstanceStatus,
};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response};

use crate::{manager::Manager, Event};

Expand All @@ -25,8 +23,8 @@ impl InstanceService for InstanceListener {
async fn create(
&self,
request: Request<Instance>,
) -> Result<Response<Self::CreateStream>, Status> {
debug!("received request: {:?}", request);
) -> Result<Response<Self::CreateStream>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_mpsc_channel();

match self
Expand All @@ -38,33 +36,22 @@ impl InstanceService for InstanceListener {
return Ok(Response::new(ReceiverStream::new(rx)));
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

type CreateStream = ReceiverStream<Result<InstanceStatus, Status>>;
type CreateStream = ReceiverStream<Result<InstanceStatus, tonic::Status>>;

async fn start(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
.sender
.send(Event::InstanceStart(request.into_inner().id, tx))
.await
{
Ok(_) => {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
}
}
async fn start(&self, _: Request<InstanceIdentifier>) -> Result<Response<()>, tonic::Status> {
Err(tonic::Status::unimplemented("not implemented"))
}

async fn stop(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
async fn stop(
&self,
request: Request<InstanceIdentifier>,
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -76,13 +63,16 @@ impl InstanceService for InstanceListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

async fn destroy(&self, request: Request<InstanceIdentifier>) -> Result<Response<()>, Status> {
debug!("received request: {:?}", request);
async fn destroy(
&self,
request: Request<InstanceIdentifier>,
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);
let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -94,7 +84,7 @@ impl InstanceService for InstanceListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions scheduler/src/instance/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod listener;
pub mod scheduled;
82 changes: 82 additions & 0 deletions scheduler/src/instance/scheduled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use proto::scheduler::{Instance, InstanceStatus, Status};
use tokio::sync::mpsc;

use crate::{NodeIdentifier, ProxyError};

/// InstanceScheduled represents an instance that is scheduled to a node. It is used to send
/// messages to the node and contains the node identifier where it's scheduled.
///
/// Properties:
///
/// * `id`: The id of the instance.
/// * `instance`: The instance that is being registered.
/// * `node_id`: The node identifier of the node that the instance is running on.
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
#[derive(Debug, Clone)]
pub struct InstanceScheduled {
pub id: String,
pub instance: Instance,
pub node_id: Option<NodeIdentifier>,
pub tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
}

impl InstanceScheduled {
/// `new` creates a new `InstanceStatus` struct
///
/// Arguments:
///
/// * `id`: The id of the instance.
/// * `instance`: The instance that we want to run.
/// * `node_id`: The node identifier of the node that the instance is running on.
/// * `tx`: This is the channel that the instance will use to send status updates to the controller.
///
/// Returns:
///
/// A new instance of the struct `InstanceStatus`
pub fn new(
id: String,
instance: Instance,
node_id: Option<NodeIdentifier>,
tx: mpsc::Sender<Result<InstanceStatus, tonic::Status>>,
) -> Self {
Self {
id,
instance,
node_id,
tx,
}
}

/// This function updates the status of the instance and sends the updated status to the controller
///
/// Arguments:
///
/// * `status`: The status of the node.
/// * `description`: A string that describes the status of the node.
///
/// Returns:
///
/// A Result<(), ProxyError>
pub async fn change_status(
&mut self,
status: Status,
description: Option<String>,
) -> Result<(), ProxyError> {
self.instance.status = status.into();

self.tx
.send(Ok(InstanceStatus {
id: self.id.clone(),
status: status.into(),
status_description: description.unwrap_or_else(|| "".to_string()),
resource: match self.instance.status() {
Status::Running => self.instance.resource.clone(),
_ => None,
},
}))
.await
.map_err(|_| ProxyError::ChannelSenderError)?;

Ok(())
}
}
23 changes: 21 additions & 2 deletions scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::IpAddr;

use proto::scheduler::{
Instance, InstanceStatus, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
NodeUnregisterRequest, NodeUnregisterResponse,
Expand All @@ -7,9 +9,10 @@ use tokio::sync::{mpsc, oneshot};
use tonic::Response;

pub mod config;
pub mod instance_listener;
pub mod instance;
pub mod manager;
pub mod node_listener;
pub mod node;
pub mod parser;
pub mod storage;

#[derive(Error, Debug)]
Expand All @@ -26,13 +29,28 @@ pub enum SchedulerError {
Unknown,
}

#[derive(Error, Debug)]
pub enum ProxyError {
#[error("an transport error occurred from tonic: {0}")]
TonicTransportError(#[from] tonic::transport::Error),
#[error("an status error occurred from tonic: {0}")]
TonicStatusError(#[from] tonic::Status),
#[error("the gRPC client was not found")]
GrpcClientNotFound,
#[error("the gRPC stream was not found")]
GrpcStreamNotFound,
#[error("an error occurred while sending a message to the channel")]
ChannelSenderError,
}

#[derive(Debug)]
#[allow(dead_code)]
pub struct Node {
id: String,
}

pub type NodeIdentifier = String;
pub type InstanceIdentifier = String;

#[derive(Debug)]
pub enum Event {
Expand All @@ -57,6 +75,7 @@ pub enum Event {
// Node events
NodeRegister(
NodeRegisterRequest,
IpAddr,
oneshot::Sender<Result<Response<NodeRegisterResponse>, tonic::Status>>,
),
NodeUnregister(
Expand Down
9 changes: 4 additions & 5 deletions scheduler/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,10 @@ use tokio::sync::mpsc;
use tokio::{sync::oneshot, task::JoinHandle};
use tonic::{transport::Server, Response};

use crate::instance::listener::InstanceListener;
use crate::node::listener::NodeListener;
use crate::SchedulerError;
use crate::{
config::Config, instance_listener::InstanceListener, node_listener::NodeListener,
storage::Storage, Event, Node,
};
use crate::{config::Config, storage::Storage, Event, Node};

#[derive(Debug)]
pub struct Manager {
Expand Down Expand Up @@ -141,7 +140,7 @@ impl Manager {
info!("received instance destroy event : {:?}", id);
tx.send(Ok(Response::new(()))).unwrap();
}
Event::NodeRegister(request, tx) => {
Event::NodeRegister(request, _, tx) => {
info!("received node register event : {:?}", request);
tx.send(Ok(Response::new(NodeRegisterResponse::default())))
.unwrap();
Expand Down
43 changes: 23 additions & 20 deletions scheduler/src/node_listener.rs → scheduler/src/node/listener.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use log::debug;
use proto::scheduler::{
node_service_server::NodeService, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
NodeUnregisterRequest, NodeUnregisterResponse,
};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status, Streaming};
use tonic::{Request, Response, Streaming};

use crate::{manager::Manager, Event};

#[derive(Debug)]
#[allow(dead_code)]
pub struct NodeListener {
sender: mpsc::Sender<Event>,
}
Expand All @@ -25,33 +23,34 @@ impl NodeService for NodeListener {
async fn status(
&self,
request: Request<Streaming<NodeStatus>>,
) -> Result<Response<()>, Status> {
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let mut stream = request.into_inner();
let (tx, mut rx) = Manager::create_mpsc_channel();

// send each status to the manager
loop {
let (tx, mut rx) = Manager::create_mpsc_channel();
let message = stream.message().await?;

match message {
Some(node_status) => {
debug!("Node status: {:?}", node_status);
self.sender
.send(Event::NodeStatus(node_status, tx.clone()))
.await
.unwrap();

// wait for the manager to respond
if let Some(res) = rx.recv().await {
match res {
Ok(()) => {
debug!("Node status updated successfully");
}
Err(err) => {
debug!("Error updating node status: {:?}", err);
return Err(err);
}
Ok(_) => {}
Err(err) => return Err(err),
}
}
}
None => {
log::error!("Node status stream closed");
// todo: emit node crash event (get the node id from the first status)
return Ok(Response::new(()));
}
}
Expand All @@ -61,29 +60,33 @@ impl NodeService for NodeListener {
async fn register(
&self,
request: Request<NodeRegisterRequest>,
) -> Result<Response<NodeRegisterResponse>, Status> {
debug!("{:?}", request);
) -> Result<Response<NodeRegisterResponse>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let (tx, rx) = Manager::create_oneshot_channel();
let remote_addr = request.remote_addr().unwrap().ip();
log::debug!("Registering a new node from: {:?}", remote_addr);

match self
.sender
.send(Event::NodeRegister(request.into_inner(), tx))
.send(Event::NodeRegister(request.into_inner(), remote_addr, tx))
.await
{
Ok(_) => {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

async fn unregister(
&self,
request: Request<NodeUnregisterRequest>,
) -> Result<Response<NodeUnregisterResponse>, Status> {
debug!("{:?}", request);
) -> Result<Response<NodeUnregisterResponse>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -95,7 +98,7 @@ impl NodeService for NodeListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions scheduler/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use proto::scheduler::{Resource, Status};

pub mod listener;
pub mod registered;

#[derive(Debug, Clone)]
pub struct Node {
pub id: String,
pub status: Status,
pub resource: Option<Resource>,
}
Loading