Skip to content

Commit

Permalink
feat(scheduler): add NodeRegistered structure
Browse files Browse the repository at this point in the history
Signed-off-by: iverly <[email protected]>
  • Loading branch information
iverly committed Aug 29, 2022
1 parent 63afc6e commit 00bb280
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 23 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions scheduler/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ serde_derive = "1.0.142"
confy = "0.4.0"
anyhow = "1.0.62"
thiserror = "1.0.32"
async-stream = "0.3.3"
14 changes: 13 additions & 1 deletion scheduler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::net::IpAddr;

use proto::scheduler::{
Instance, InstanceStatus, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
NodeUnregisterRequest, NodeUnregisterResponse,
Expand All @@ -9,7 +11,8 @@ use tonic::Response;
pub mod config;
pub mod instance;
pub mod manager;
pub mod node_listener;
pub mod node;
pub mod parser;
pub mod storage;

#[derive(Error, Debug)]
Expand All @@ -28,6 +31,14 @@ pub enum SchedulerError {

#[derive(Error, Debug)]
pub enum ProxyError {
#[error("an transport error occurred from tonic: {0}")]
TonicTransportError(#[from] tonic::transport::Error),
#[error("an status error occurred from tonic: {0}")]
TonicStatusError(#[from] tonic::Status),
#[error("the gRPC client was not found")]
GrpcClientNotFound,
#[error("the gRPC stream was not found")]
GrpcStreamNotFound,
#[error("an error occurred while sending a message to the channel")]
ChannelSenderError,
}
Expand Down Expand Up @@ -64,6 +75,7 @@ pub enum Event {
// Node events
NodeRegister(
NodeRegisterRequest,
IpAddr,
oneshot::Sender<Result<Response<NodeRegisterResponse>, tonic::Status>>,
),
NodeUnregister(
Expand Down
5 changes: 3 additions & 2 deletions scheduler/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use tokio::{sync::oneshot, task::JoinHandle};
use tonic::{transport::Server, Response};

use crate::instance::listener::InstanceListener;
use crate::node::listener::NodeListener;
use crate::SchedulerError;
use crate::{config::Config, node_listener::NodeListener, storage::Storage, Event, Node};
use crate::{config::Config, storage::Storage, Event, Node};

#[derive(Debug)]
pub struct Manager {
Expand Down Expand Up @@ -139,7 +140,7 @@ impl Manager {
info!("received instance destroy event : {:?}", id);
tx.send(Ok(Response::new(()))).unwrap();
}
Event::NodeRegister(request, tx) => {
Event::NodeRegister(request, _, tx) => {
info!("received node register event : {:?}", request);
tx.send(Ok(Response::new(NodeRegisterResponse::default())))
.unwrap();
Expand Down
43 changes: 23 additions & 20 deletions scheduler/src/node_listener.rs → scheduler/src/node/listener.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use log::debug;
use proto::scheduler::{
node_service_server::NodeService, NodeRegisterRequest, NodeRegisterResponse, NodeStatus,
NodeUnregisterRequest, NodeUnregisterResponse,
};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status, Streaming};
use tonic::{Request, Response, Streaming};

use crate::{manager::Manager, Event};

#[derive(Debug)]
#[allow(dead_code)]
pub struct NodeListener {
sender: mpsc::Sender<Event>,
}
Expand All @@ -25,33 +23,34 @@ impl NodeService for NodeListener {
async fn status(
&self,
request: Request<Streaming<NodeStatus>>,
) -> Result<Response<()>, Status> {
) -> Result<Response<()>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let mut stream = request.into_inner();
let (tx, mut rx) = Manager::create_mpsc_channel();

// send each status to the manager
loop {
let (tx, mut rx) = Manager::create_mpsc_channel();
let message = stream.message().await?;

match message {
Some(node_status) => {
debug!("Node status: {:?}", node_status);
self.sender
.send(Event::NodeStatus(node_status, tx.clone()))
.await
.unwrap();

// wait for the manager to respond
if let Some(res) = rx.recv().await {
match res {
Ok(()) => {
debug!("Node status updated successfully");
}
Err(err) => {
debug!("Error updating node status: {:?}", err);
return Err(err);
}
Ok(_) => {}
Err(err) => return Err(err),
}
}
}
None => {
log::error!("Node status stream closed");
// todo: emit node crash event (get the node id from the first status)
return Ok(Response::new(()));
}
}
Expand All @@ -61,29 +60,33 @@ impl NodeService for NodeListener {
async fn register(
&self,
request: Request<NodeRegisterRequest>,
) -> Result<Response<NodeRegisterResponse>, Status> {
debug!("{:?}", request);
) -> Result<Response<NodeRegisterResponse>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let (tx, rx) = Manager::create_oneshot_channel();
let remote_addr = request.remote_addr().unwrap().ip();
log::debug!("Registering a new node from: {:?}", remote_addr);

match self
.sender
.send(Event::NodeRegister(request.into_inner(), tx))
.send(Event::NodeRegister(request.into_inner(), remote_addr, tx))
.await
{
Ok(_) => {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}

async fn unregister(
&self,
request: Request<NodeUnregisterRequest>,
) -> Result<Response<NodeUnregisterResponse>, Status> {
debug!("{:?}", request);
) -> Result<Response<NodeUnregisterResponse>, tonic::Status> {
log::debug!("received gRPC request: {:?}", request);

let (tx, rx) = Manager::create_oneshot_channel();

match self
Expand All @@ -95,7 +98,7 @@ impl NodeService for NodeListener {
return rx.await.unwrap();
}
Err(_) => {
return Err(Status::internal("could not send event to manager"));
return Err(tonic::Status::internal("could not send event to manager"));
}
}
}
Expand Down
11 changes: 11 additions & 0 deletions scheduler/src/node/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use proto::scheduler::{Resource, Status};

pub mod listener;
pub mod registered;

#[derive(Debug, Clone)]
pub struct Node {
pub id: String,
pub status: Status,
pub resource: Option<Resource>,
}
Loading

0 comments on commit 00bb280

Please sign in to comment.