From b6b2484bb2384782e0ea830eb30a55c3bd29cfa2 Mon Sep 17 00:00:00 2001 From: Alexandre Gomez Date: Fri, 7 Oct 2022 16:51:17 +0200 Subject: [PATCH] fix: (controller) restart after failing Added an exponentially growing cooldown before restarting a failed instance and before retrying a failed gRPC connection to the scheduler (up to a configurable maximum number of retries). Made the delay before a node is erased configurable. Signed-off-by: Alexandre Gomez --- .../src/external_api/instance/controller.rs | 76 ++++++++++++------- .../lib/src/external_api/instance/service.rs | 20 +++-- controller/lib/src/external_api/interface.rs | 3 + controller/lib/src/grpc_client/interface.rs | 32 ++++++-- controller/lib/src/internal_api/interface.rs | 15 +++- .../lib/src/internal_api/node/controller.rs | 16 +++- .../lib/src/internal_api/node/service.rs | 15 ++-- controller/src/config.rs | 6 ++ controller/src/main.rs | 3 + 9 files changed, 134 insertions(+), 52 deletions(-) diff --git a/controller/lib/src/external_api/instance/controller.rs b/controller/lib/src/external_api/instance/controller.rs index b4c12305..8ebc9ec9 100644 --- a/controller/lib/src/external_api/instance/controller.rs +++ b/controller/lib/src/external_api/instance/controller.rs @@ -89,13 +89,18 @@ impl InstanceController { body: web::Json, data: web::Data, ) -> impl Responder { - let instance_service = - match InstanceService::new(&data.grpc_address, &data.etcd_address).await { - Ok(service) => service, - Err(err) => { - return InstanceControllerError::InstanceServiceError(err).into(); - } - }; + let instance_service = match InstanceService::new( + &data.grpc_address, + &data.etcd_address, + data.grpc_client_connection_max_retries, + ) + .await + { + Ok(service) => service, + Err(err) => { + return InstanceControllerError::InstanceServiceError(err).into(); + } + }; let mut workload_service = match WorkloadService::new(&data.etcd_address).await { Ok(service) => service, @@ -142,13 +147,18 @@ impl InstanceController { params: web::Path<(String, String)>, data: web::Data, ) -> impl Responder { - let mut instance_service = - match InstanceService::new(&data.grpc_address, 
&data.etcd_address).await { - Ok(service) => service, - Err(err) => { - return InstanceControllerError::InstanceServiceError(err).into(); - } - }; + let mut instance_service = match InstanceService::new( + &data.grpc_address, + &data.etcd_address, + data.grpc_client_connection_max_retries, + ) + .await + { + Ok(service) => service, + Err(err) => { + return InstanceControllerError::InstanceServiceError(err).into(); + } + }; let (namespace, name) = params.into_inner(); match instance_service @@ -183,13 +193,18 @@ impl InstanceController { params: web::Path<(String, String)>, data: web::Data, ) -> impl Responder { - let mut instance_service = - match InstanceService::new(&data.grpc_address, &data.etcd_address).await { - Ok(service) => service, - Err(err) => { - return InstanceControllerError::InstanceServiceError(err).into(); - } - }; + let mut instance_service = match InstanceService::new( + &data.grpc_address, + &data.etcd_address, + data.grpc_client_connection_max_retries, + ) + .await + { + Ok(service) => service, + Err(err) => { + return InstanceControllerError::InstanceServiceError(err).into(); + } + }; let (namespace, name) = params.into_inner(); match instance_service @@ -216,13 +231,18 @@ impl InstanceController { pagination: Option>, data: web::Data, ) -> impl Responder { - let mut instance_service = - match InstanceService::new(&data.grpc_address, &data.etcd_address).await { - Ok(service) => service, - Err(err) => { - return InstanceControllerError::InstanceServiceError(err).into(); - } - }; + let mut instance_service = match InstanceService::new( + &data.grpc_address, + &data.etcd_address, + data.grpc_client_connection_max_retries, + ) + .await + { + Ok(service) => service, + Err(err) => { + return InstanceControllerError::InstanceServiceError(err).into(); + } + }; let instances = match pagination { Some(pagination) => { diff --git a/controller/lib/src/external_api/instance/service.rs b/controller/lib/src/external_api/instance/service.rs index 
cf746bcb..cf905568 100644 --- a/controller/lib/src/external_api/instance/service.rs +++ b/controller/lib/src/external_api/instance/service.rs @@ -1,5 +1,6 @@ use std::net::{Ipv4Addr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; use super::model::Instance; use crate::etcd::{EtcdClient, EtcdClientError}; @@ -10,7 +11,7 @@ use log::{debug, trace}; use proto::controller::InstanceState; use serde_json; use thiserror::Error; -use tokio::sync::Mutex; +use tokio::{sync::Mutex, time}; use tonic::{Request, Status}; #[derive(Debug, Error)] @@ -46,11 +47,15 @@ impl InstanceService { pub async fn new( grpc_address: &str, etcd_address: &SocketAddr, + grpc_client_connection_max_retries: u32, ) -> Result { Ok(InstanceService { - grpc_service: SchedulerClientInterface::new(grpc_address.to_string()) - .await - .map_err(InstanceServiceError::SchedulerClientInterfaceError)?, + grpc_service: SchedulerClientInterface::new( + grpc_address.to_string(), + grpc_client_connection_max_retries, + ) + .await + .map_err(InstanceServiceError::SchedulerClientInterfaceError)?, etcd_service: EtcdClient::new(etcd_address.to_string()) .await .map_err(InstanceServiceError::EtcdError)?, @@ -149,6 +154,8 @@ impl InstanceService { pub fn schedule_instance(this: Arc>, mut instance: Instance) { //Spawn a thread to start the instance tokio::spawn(async move { + let mut cooldown = 1; + loop { let mut stream = this .clone() @@ -196,8 +203,9 @@ impl InstanceService { } instance.num_restarts += 1; - - debug!("Restarting instance {}", instance.id); + debug!("Restarting instance {} in {}s", instance.id, cooldown); + time::sleep(Duration::from_secs(cooldown)).await; + cooldown *= 2; this.clone() .lock() diff --git a/controller/lib/src/external_api/interface.rs b/controller/lib/src/external_api/interface.rs index 40e06ece..3c16e287 100644 --- a/controller/lib/src/external_api/interface.rs +++ b/controller/lib/src/external_api/interface.rs @@ -24,6 +24,7 @@ pub struct ExternalAPIInterface {} pub struct 
ActixAppState { pub etcd_address: SocketAddr, pub grpc_address: String, + pub grpc_client_connection_max_retries: u32, } impl ExternalAPIInterface { @@ -32,6 +33,7 @@ impl ExternalAPIInterface { num_workers: usize, etcd_address: SocketAddr, grpc_address: String, + grpc_client_connection_max_retries: u32, ) -> Result { let mut etcd_client = EtcdClient::new(etcd_address.to_string()) .await @@ -65,6 +67,7 @@ impl ExternalAPIInterface { .app_data(web::Data::new(ActixAppState { etcd_address, grpc_address: grpc_address.clone(), + grpc_client_connection_max_retries, })) .route("/health", web::get().to(HttpResponse::Ok)) .service(workload::controller::WorkloadController {}.services()) diff --git a/controller/lib/src/grpc_client/interface.rs b/controller/lib/src/grpc_client/interface.rs index c253a0a0..ef79d750 100644 --- a/controller/lib/src/grpc_client/interface.rs +++ b/controller/lib/src/grpc_client/interface.rs @@ -1,7 +1,10 @@ -use log::{debug, trace}; +use std::time::Duration; + +use log::{debug, error, trace}; use proto::scheduler::instance_service_client::InstanceServiceClient; use proto::scheduler::{Instance, InstanceIdentifier, InstanceStatus}; use thiserror::Error; +use tokio::time; use tonic::transport::{Channel, Error}; use tonic::{Request, Response, Status, Streaming}; @@ -20,17 +23,36 @@ pub struct SchedulerClientInterface { impl SchedulerClientInterface { pub async fn new( instance_client_address: String, + max_retries: u32, ) -> Result { debug!( "Starting gRPC client for scheduler Instance Service on {}", instance_client_address, ); - let instance_client = InstanceServiceClient::connect(instance_client_address) - .await - .map_err(SchedulerClientInterfaceError::ConnectionError)?; + let mut retries: u32 = 0; + let mut cooldown = Duration::from_secs(1); - Ok(Self { instance_client }) + loop { + match InstanceServiceClient::connect(instance_client_address.clone()).await { + Ok(instance_client) => { + debug!("Connected to scheduler Instance Service"); + return 
Ok(SchedulerClientInterface { instance_client }); + } + Err(e) => { + if retries >= max_retries { + return Err(SchedulerClientInterfaceError::ConnectionError(e)); + } + retries += 1; + error!( + "Failed to connect to scheduler Instance Service, retrying in {} seconds", + cooldown.as_secs() + ); + time::sleep(cooldown).await; + cooldown *= 2; + } + } + } } pub async fn create_instance( diff --git a/controller/lib/src/internal_api/interface.rs b/controller/lib/src/internal_api/interface.rs index 85740128..565c523e 100644 --- a/controller/lib/src/internal_api/interface.rs +++ b/controller/lib/src/internal_api/interface.rs @@ -25,16 +25,23 @@ impl InternalAPIInterface { address: SocketAddr, etcd_address: SocketAddr, grpc_address: String, + grpc_client_connection_max_retries: u32, + time_after_node_erased: u64, ) -> Result { info!("Starting gRPC server listening on {}", address); tokio::spawn(async move { Server::builder() .add_service(NodeServiceServer::new( - NodeController::new(&etcd_address, &grpc_address) - .await - .map_err(InternalAPIInterfaceError::NodeControllerError) - .unwrap(), + NodeController::new( + &etcd_address, + &grpc_address, + grpc_client_connection_max_retries, + time_after_node_erased, + ) + .await + .map_err(InternalAPIInterfaceError::NodeControllerError) + .unwrap(), )) .serve(address) .await diff --git a/controller/lib/src/internal_api/node/controller.rs b/controller/lib/src/internal_api/node/controller.rs index dabd6455..3a84d560 100644 --- a/controller/lib/src/internal_api/node/controller.rs +++ b/controller/lib/src/internal_api/node/controller.rs @@ -20,19 +20,27 @@ pub enum NodeControllerError { /// * `node_service`: An instance of the NodeService that will implement the logic. 
pub struct NodeController { node_service: Arc>, + time_after_node_erased: u64, } impl NodeController { pub async fn new( etcd_address: &SocketAddr, grpc_address: &str, + grpc_client_connection_max_retries: u32, + time_after_node_erased: u64, ) -> Result { Ok(NodeController { node_service: Arc::new(Mutex::new( - NodeService::new(etcd_address, grpc_address) - .await - .map_err(NodeControllerError::NodeServiceError)?, + NodeService::new( + etcd_address, + grpc_address, + grpc_client_connection_max_retries, + ) + .await + .map_err(NodeControllerError::NodeServiceError)?, )), + time_after_node_erased, }) } } @@ -61,7 +69,7 @@ impl proto::controller::node_service_server::NodeService for NodeController { .clone() .lock() .await - .update_node_status(stream) + .update_node_status(stream, self.time_after_node_erased) .await .map_err(|err| { error!("Error updating node status: {}", err); diff --git a/controller/lib/src/internal_api/node/service.rs b/controller/lib/src/internal_api/node/service.rs index 5a12edfc..ccdae444 100644 --- a/controller/lib/src/internal_api/node/service.rs +++ b/controller/lib/src/internal_api/node/service.rs @@ -36,15 +36,20 @@ impl NodeService { pub async fn new( etcd_address: &SocketAddr, grpc_address: &str, + grpc_client_connection_max_retries: u32, ) -> Result { Ok(NodeService { etcd_interface: EtcdClient::new(etcd_address.to_string()) .await .map_err(NodeServiceError::EtcdError)?, instance_service: Arc::new(Mutex::new( - InstanceService::new(grpc_address, etcd_address) - .await - .map_err(NodeServiceError::InstanceServiceError)?, + InstanceService::new( + grpc_address, + etcd_address, + grpc_client_connection_max_retries, + ) + .await + .map_err(NodeServiceError::InstanceServiceError)?, )), }) } @@ -63,6 +68,7 @@ impl NodeService { pub async fn update_node_status( &mut self, mut stream: Streaming, + time_after_node_erased: u64, ) -> Result { let mut last_instances: Vec = vec![]; let mut node_id = String::new(); @@ -107,8 +113,7 @@ impl 
NodeService { InstanceService::schedule_instance(self.instance_service.clone(), instance) } - //Delete Node after 5 min - time::sleep(Duration::from_secs(300)).await; + time::sleep(Duration::from_secs(time_after_node_erased)).await; self.etcd_interface.delete(&node_id).await; Ok(node_id) diff --git a/controller/src/config.rs b/controller/src/config.rs index 84549460..fcefc296 100644 --- a/controller/src/config.rs +++ b/controller/src/config.rs @@ -11,6 +11,8 @@ pub struct KudoControllerConfig { pub struct InternalAPIConfig { pub grpc_server_addr: SocketAddr, pub grpc_client_addr: String, + pub grpc_client_connection_max_retries: u32, + pub time_after_node_erased: u64, } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -18,6 +20,7 @@ pub struct ExternalAPIConfig { pub http_server_addr: SocketAddr, pub http_server_num_workers: usize, pub etcd_address: SocketAddr, + pub grpc_client_connection_max_retries: u32, } impl Default for KudoControllerConfig { @@ -29,6 +32,8 @@ impl Default for KudoControllerConfig { 50051, ), grpc_client_addr: "http://127.0.0.1:50052".to_string(), + grpc_client_connection_max_retries: 32, + time_after_node_erased: 350, }, external_api: ExternalAPIConfig { http_server_addr: SocketAddr::new( @@ -40,6 +45,7 @@ impl Default for KudoControllerConfig { std::net::IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 2379, ), + grpc_client_connection_max_retries: 32, }, } } diff --git a/controller/src/main.rs b/controller/src/main.rs index b784db9a..83703829 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -22,6 +22,8 @@ async fn main() -> Result<(), Box> { config.internal_api.grpc_server_addr, config.external_api.etcd_address, config.internal_api.grpc_client_addr.clone(), + config.internal_api.grpc_client_connection_max_retries, + config.internal_api.time_after_node_erased, ); info!("Starting External API"); @@ -32,6 +34,7 @@ async fn main() -> Result<(), Box> { config.external_api.http_server_num_workers, config.external_api.etcd_address, 
config.internal_api.grpc_client_addr, + config.internal_api.grpc_client_connection_max_retries, ); let join = tokio::join!(internal_api, external_api);