diff --git a/src/main.rs b/src/main.rs index 7daea92..5c88e9a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -26,7 +26,7 @@ use ya_counters::service::{CountersService, CountersServiceBuilder}; use ya_counters::TimeCounter; use ya_gsb_http_proxy::gsb_to_http::GsbToHttpProxy; use ya_gsb_http_proxy::message::GsbHttpCallMessage; -use ya_service_bus::typed as gsb; +use ya_service_bus::typed::{self as gsb, Endpoint}; use ya_transfer::transfer::{DeployImage, Shutdown, TransferService, TransferServiceContext}; use crate::agreement::AgreementDesc; @@ -66,56 +66,34 @@ async fn activity_loop( let report_service = gsb::service(report_url); while let Some(()) = process.report() { - // make it a function match counters.send(GetCounters).await { Ok(resp) => match resp { Ok(current_usage) => { - let timestamp = Utc::now().timestamp(); - match report_service - .call(activity::local::SetUsage { - activity_id: activity_id.to_string(), - usage: ActivityUsage { - current_usage: Some(current_usage), - timestamp, - }, - timeout: None, - }) - .await - { - Ok(Ok(())) => log::debug!("Successfully sent activity usage message"), - Ok(Err(rpc_message_error)) => { - log::error!("rpcMessageError : {:?}", rpc_message_error) - } - Err(err) => log::error!("other error : {:?}", err), - } + set_usage_msg(&report_service, activity_id, current_usage).await } Err(err) => match err { CounterError::UsageLimitExceeded(info) => { - log::warn!("Usage limit exceeded: {}", info); - // TODO State::Terminated + set_terminate_state_msg( + &report_service, + activity_id, + Some(format!("Usage limit exceeded: {}", info)), + None, + ) + .await + } + error => { + log::error!("Unable to retrieve counters: {:?}", error); + anyhow::bail!("Runtime exited because of retrieving counters failure"); } - error => log::warn!("Unable to retrieve counters: {:?}", error), }, }, Err(e) => log::warn!("Unable to report activity usage: {:?}", e), } - // select! { _ = tokio::time::sleep(Duration::from_secs(1)) => {}, status = process.clone() => { - if let Err(err) = report_service.call(activity::local::SetState { - activity_id: activity_id.to_string(), - state: ActivityState { - state: StatePair::from(State::Terminated), - reason: Some("process exit".to_string()), - error_message: Some(format!("status: {:?}", status)), - }, - timeout: None, - credentials: None, - }).await { - log::error!("Failed to send state. Err {err}"); - } + set_terminate_state_msg(&report_service, activity_id, Some("process exit".to_string()), Some(format!("status: {:?}", status))).await; log::error!("process exit: {:?}", status); anyhow::bail!("Runtime exited"); } @@ -125,6 +103,50 @@ async fn activity_loop( Ok(()) } +async fn set_usage_msg(report_service: &Endpoint, activity_id: &str, current_usage: Vec) { + let timestamp = Utc::now().timestamp(); + match report_service + .call(activity::local::SetUsage { + activity_id: activity_id.into(), + usage: ActivityUsage { + current_usage: Some(current_usage), + timestamp, + }, + timeout: None, + }) + .await + { + Ok(Ok(())) => log::debug!("Successfully sent activity usage message"), + Ok(Err(rpc_message_error)) => { + log::error!("rpcMessageError : {:?}", rpc_message_error) + } + Err(err) => log::error!("other error : {:?}", err), + } +} + +async fn set_terminate_state_msg( + report_service: &Endpoint, + activity_id: &str, + reason: Option, + error_message: Option, +) { + if let Err(err) = report_service + .call(activity::local::SetState { + activity_id: activity_id.into(), + state: ActivityState { + state: StatePair::from(State::Terminated), + reason, + error_message, + }, + timeout: None, + credentials: None, + }) + .await + { + log::error!("Failed to send state. Err {err}"); + } +} + #[actix_rt::main] async fn main() -> anyhow::Result<()> { let panic_hook = std::panic::take_hook();