Skip to content

Commit

Permalink
fix: (controller) restart after failing
Browse files Browse the repository at this point in the history
Added cooldown restart when failing on some tasks.
Added time to erase nodes.

Signed-off-by: Alexandre Gomez <[email protected]>
  • Loading branch information
Roxxas96 committed Oct 8, 2022
1 parent 9195937 commit b6b2484
Show file tree
Hide file tree
Showing 9 changed files with 134 additions and 52 deletions.
76 changes: 48 additions & 28 deletions controller/lib/src/external_api/instance/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,18 @@ impl InstanceController {
body: web::Json<InstanceDTO>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let mut workload_service = match WorkloadService::new(&data.etcd_address).await {
Ok(service) => service,
Expand Down Expand Up @@ -142,13 +147,18 @@ impl InstanceController {
params: web::Path<(String, String)>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let (namespace, name) = params.into_inner();
match instance_service
Expand Down Expand Up @@ -183,13 +193,18 @@ impl InstanceController {
params: web::Path<(String, String)>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let (namespace, name) = params.into_inner();
match instance_service
Expand All @@ -216,13 +231,18 @@ impl InstanceController {
pagination: Option<web::Query<Pagination>>,
data: web::Data<ActixAppState>,
) -> impl Responder {
let mut instance_service =
match InstanceService::new(&data.grpc_address, &data.etcd_address).await {
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};
let mut instance_service = match InstanceService::new(
&data.grpc_address,
&data.etcd_address,
data.grpc_client_connection_max_retries,
)
.await
{
Ok(service) => service,
Err(err) => {
return InstanceControllerError::InstanceServiceError(err).into();
}
};

let instances = match pagination {
Some(pagination) => {
Expand Down
20 changes: 14 additions & 6 deletions controller/lib/src/external_api/instance/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;

use super::model::Instance;
use crate::etcd::{EtcdClient, EtcdClientError};
Expand All @@ -10,7 +11,7 @@ use log::{debug, trace};
use proto::controller::InstanceState;
use serde_json;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::{sync::Mutex, time};
use tonic::{Request, Status};

#[derive(Debug, Error)]
Expand Down Expand Up @@ -46,11 +47,15 @@ impl InstanceService {
pub async fn new(
grpc_address: &str,
etcd_address: &SocketAddr,
grpc_client_connection_max_retries: u32,
) -> Result<Self, InstanceServiceError> {
Ok(InstanceService {
grpc_service: SchedulerClientInterface::new(grpc_address.to_string())
.await
.map_err(InstanceServiceError::SchedulerClientInterfaceError)?,
grpc_service: SchedulerClientInterface::new(
grpc_address.to_string(),
grpc_client_connection_max_retries,
)
.await
.map_err(InstanceServiceError::SchedulerClientInterfaceError)?,
etcd_service: EtcdClient::new(etcd_address.to_string())
.await
.map_err(InstanceServiceError::EtcdError)?,
Expand Down Expand Up @@ -149,6 +154,8 @@ impl InstanceService {
pub fn schedule_instance(this: Arc<Mutex<Self>>, mut instance: Instance) {
//Spawn a thread to start the instance
tokio::spawn(async move {
let mut cooldown = 1;

loop {
let mut stream = this
.clone()
Expand Down Expand Up @@ -196,8 +203,9 @@ impl InstanceService {
}

instance.num_restarts += 1;

debug!("Restarting instance {}", instance.id);
debug!("Restarting instance {} in {}s", instance.id, cooldown);
time::sleep(Duration::from_secs(cooldown)).await;
cooldown *= 2;

this.clone()
.lock()
Expand Down
3 changes: 3 additions & 0 deletions controller/lib/src/external_api/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct ExternalAPIInterface {}
pub struct ActixAppState {
pub etcd_address: SocketAddr,
pub grpc_address: String,
pub grpc_client_connection_max_retries: u32,
}

impl ExternalAPIInterface {
Expand All @@ -32,6 +33,7 @@ impl ExternalAPIInterface {
num_workers: usize,
etcd_address: SocketAddr,
grpc_address: String,
grpc_client_connection_max_retries: u32,
) -> Result<Self, ExternalAPIInterfaceError> {
let mut etcd_client = EtcdClient::new(etcd_address.to_string())
.await
Expand Down Expand Up @@ -65,6 +67,7 @@ impl ExternalAPIInterface {
.app_data(web::Data::new(ActixAppState {
etcd_address,
grpc_address: grpc_address.clone(),
grpc_client_connection_max_retries,
}))
.route("/health", web::get().to(HttpResponse::Ok))
.service(workload::controller::WorkloadController {}.services())
Expand Down
32 changes: 27 additions & 5 deletions controller/lib/src/grpc_client/interface.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use log::{debug, trace};
use std::time::Duration;

use log::{debug, error, trace};
use proto::scheduler::instance_service_client::InstanceServiceClient;
use proto::scheduler::{Instance, InstanceIdentifier, InstanceStatus};
use thiserror::Error;
use tokio::time;
use tonic::transport::{Channel, Error};
use tonic::{Request, Response, Status, Streaming};

Expand All @@ -20,17 +23,36 @@ pub struct SchedulerClientInterface {
impl SchedulerClientInterface {
pub async fn new(
instance_client_address: String,
max_retries: u32,
) -> Result<Self, SchedulerClientInterfaceError> {
debug!(
"Starting gRPC client for scheduler Instance Service on {}",
instance_client_address,
);

let instance_client = InstanceServiceClient::connect(instance_client_address)
.await
.map_err(SchedulerClientInterfaceError::ConnectionError)?;
let mut retries: u32 = 0;
let mut cooldown = Duration::from_secs(1);

Ok(Self { instance_client })
loop {
match InstanceServiceClient::connect(instance_client_address.clone()).await {
Ok(instance_client) => {
debug!("Connected to scheduler Instance Service");
return Ok(SchedulerClientInterface { instance_client });
}
Err(e) => {
if retries >= max_retries {
return Err(SchedulerClientInterfaceError::ConnectionError(e));
}
retries += 1;
error!(
"Failed to connect to scheduler Instance Service, retrying in {} seconds",
cooldown.as_secs()
);
time::sleep(cooldown).await;
cooldown *= 2;
}
}
}
}

pub async fn create_instance(
Expand Down
15 changes: 11 additions & 4 deletions controller/lib/src/internal_api/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,23 @@ impl InternalAPIInterface {
address: SocketAddr,
etcd_address: SocketAddr,
grpc_address: String,
grpc_client_connection_max_retries: u32,
time_after_node_erased: u64,
) -> Result<Self, InternalAPIInterfaceError> {
info!("Starting gRPC server listening on {}", address);

tokio::spawn(async move {
Server::builder()
.add_service(NodeServiceServer::new(
NodeController::new(&etcd_address, &grpc_address)
.await
.map_err(InternalAPIInterfaceError::NodeControllerError)
.unwrap(),
NodeController::new(
&etcd_address,
&grpc_address,
grpc_client_connection_max_retries,
time_after_node_erased,
)
.await
.map_err(InternalAPIInterfaceError::NodeControllerError)
.unwrap(),
))
.serve(address)
.await
Expand Down
16 changes: 12 additions & 4 deletions controller/lib/src/internal_api/node/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,27 @@ pub enum NodeControllerError {
/// * `node_service`: An instance of the NodeService that will implement the logic.
pub struct NodeController {
node_service: Arc<Mutex<NodeService>>,
time_after_node_erased: u64,
}

impl NodeController {
pub async fn new(
etcd_address: &SocketAddr,
grpc_address: &str,
grpc_client_connection_max_retries: u32,
time_after_node_erased: u64,
) -> Result<Self, NodeControllerError> {
Ok(NodeController {
node_service: Arc::new(Mutex::new(
NodeService::new(etcd_address, grpc_address)
.await
.map_err(NodeControllerError::NodeServiceError)?,
NodeService::new(
etcd_address,
grpc_address,
grpc_client_connection_max_retries,
)
.await
.map_err(NodeControllerError::NodeServiceError)?,
)),
time_after_node_erased,
})
}
}
Expand Down Expand Up @@ -61,7 +69,7 @@ impl proto::controller::node_service_server::NodeService for NodeController {
.clone()
.lock()
.await
.update_node_status(stream)
.update_node_status(stream, self.time_after_node_erased)
.await
.map_err(|err| {
error!("Error updating node status: {}", err);
Expand Down
15 changes: 10 additions & 5 deletions controller/lib/src/internal_api/node/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,20 @@ impl NodeService {
pub async fn new(
etcd_address: &SocketAddr,
grpc_address: &str,
grpc_client_connection_max_retries: u32,
) -> Result<Self, NodeServiceError> {
Ok(NodeService {
etcd_interface: EtcdClient::new(etcd_address.to_string())
.await
.map_err(NodeServiceError::EtcdError)?,
instance_service: Arc::new(Mutex::new(
InstanceService::new(grpc_address, etcd_address)
.await
.map_err(NodeServiceError::InstanceServiceError)?,
InstanceService::new(
grpc_address,
etcd_address,
grpc_client_connection_max_retries,
)
.await
.map_err(NodeServiceError::InstanceServiceError)?,
)),
})
}
Expand All @@ -63,6 +68,7 @@ impl NodeService {
pub async fn update_node_status(
&mut self,
mut stream: Streaming<proto::controller::NodeStatus>,
time_after_node_erased: u64,
) -> Result<String, NodeServiceError> {
let mut last_instances: Vec<Instance> = vec![];
let mut node_id = String::new();
Expand Down Expand Up @@ -107,8 +113,7 @@ impl NodeService {
InstanceService::schedule_instance(self.instance_service.clone(), instance)
}

//Delete Node after 5 min
time::sleep(Duration::from_secs(300)).await;
time::sleep(Duration::from_secs(time_after_node_erased)).await;
self.etcd_interface.delete(&node_id).await;

Ok(node_id)
Expand Down
6 changes: 6 additions & 0 deletions controller/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@ pub struct KudoControllerConfig {
pub struct InternalAPIConfig {
pub grpc_server_addr: SocketAddr,
pub grpc_client_addr: String,
pub grpc_client_connection_max_retries: u32,
pub time_after_node_erased: u64,
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExternalAPIConfig {
pub http_server_addr: SocketAddr,
pub http_server_num_workers: usize,
pub etcd_address: SocketAddr,
pub grpc_client_connection_max_retries: u32,
}

impl Default for KudoControllerConfig {
Expand All @@ -29,6 +32,8 @@ impl Default for KudoControllerConfig {
50051,
),
grpc_client_addr: "http://127.0.0.1:50052".to_string(),
grpc_client_connection_max_retries: 32,
time_after_node_erased: 350,
},
external_api: ExternalAPIConfig {
http_server_addr: SocketAddr::new(
Expand All @@ -40,6 +45,7 @@ impl Default for KudoControllerConfig {
std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
2379,
),
grpc_client_connection_max_retries: 32,
},
}
}
Expand Down
3 changes: 3 additions & 0 deletions controller/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
config.internal_api.grpc_server_addr,
config.external_api.etcd_address,
config.internal_api.grpc_client_addr.clone(),
config.internal_api.grpc_client_connection_max_retries,
config.internal_api.time_after_node_erased,
);

info!("Starting External API");
Expand All @@ -32,6 +34,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
config.external_api.http_server_num_workers,
config.external_api.etcd_address,
config.internal_api.grpc_client_addr,
config.internal_api.grpc_client_connection_max_retries,
);

let join = tokio::join!(internal_api, external_api);
Expand Down

0 comments on commit b6b2484

Please sign in to comment.