Skip to content

Commit

Permalink
Set state and set usage methods
Browse files Browse the repository at this point in the history
  • Loading branch information
pwalski committed Apr 1, 2024
1 parent 2131fe2 commit f119a8f
Showing 1 changed file with 58 additions and 36 deletions.
94 changes: 58 additions & 36 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use ya_counters::service::{CountersService, CountersServiceBuilder};
use ya_counters::TimeCounter;
use ya_gsb_http_proxy::gsb_to_http::GsbToHttpProxy;
use ya_gsb_http_proxy::message::GsbHttpCallMessage;
use ya_service_bus::typed as gsb;
use ya_service_bus::typed::{self as gsb, Endpoint};
use ya_transfer::transfer::{DeployImage, Shutdown, TransferService, TransferServiceContext};

use crate::agreement::AgreementDesc;
Expand Down Expand Up @@ -66,56 +66,34 @@ async fn activity_loop<T: process::Runtime + Clone + Unpin + 'static>(
let report_service = gsb::service(report_url);

while let Some(()) = process.report() {
// make it a function
match counters.send(GetCounters).await {
Ok(resp) => match resp {
Ok(current_usage) => {
let timestamp = Utc::now().timestamp();
match report_service
.call(activity::local::SetUsage {
activity_id: activity_id.to_string(),
usage: ActivityUsage {
current_usage: Some(current_usage),
timestamp,
},
timeout: None,
})
.await
{
Ok(Ok(())) => log::debug!("Successfully sent activity usage message"),
Ok(Err(rpc_message_error)) => {
log::error!("rpcMessageError : {:?}", rpc_message_error)
}
Err(err) => log::error!("other error : {:?}", err),
}
set_usage_msg(&report_service, activity_id, current_usage).await
}
Err(err) => match err {
CounterError::UsageLimitExceeded(info) => {
log::warn!("Usage limit exceeded: {}", info);
// TODO State::Terminated
set_terminate_state_msg(
&report_service,
activity_id,
Some(format!("Usage limit exceeded: {}", info)),
None,
)
.await
}
error => {
log::error!("Unable to retrieve counters: {:?}", error);
anyhow::bail!("Runtime exited because of retrieving counters failure");
}
error => log::warn!("Unable to retrieve counters: {:?}", error),
},
},
Err(e) => log::warn!("Unable to report activity usage: {:?}", e),
}
//

select! {
_ = tokio::time::sleep(Duration::from_secs(1)) => {},
status = process.clone() => {
if let Err(err) = report_service.call(activity::local::SetState {
activity_id: activity_id.to_string(),
state: ActivityState {
state: StatePair::from(State::Terminated),
reason: Some("process exit".to_string()),
error_message: Some(format!("status: {:?}", status)),
},
timeout: None,
credentials: None,
}).await {
log::error!("Failed to send state. Err {err}");
}
set_terminate_state_msg(&report_service, activity_id, Some("process exit".to_string()), Some(format!("status: {:?}", status))).await;
log::error!("process exit: {:?}", status);
anyhow::bail!("Runtime exited");
}
Expand All @@ -125,6 +103,50 @@ async fn activity_loop<T: process::Runtime + Clone + Unpin + 'static>(
Ok(())
}

async fn set_usage_msg(report_service: &Endpoint, activity_id: &str, current_usage: Vec<f64>) {
let timestamp = Utc::now().timestamp();
match report_service
.call(activity::local::SetUsage {
activity_id: activity_id.into(),
usage: ActivityUsage {
current_usage: Some(current_usage),
timestamp,
},
timeout: None,
})
.await
{
Ok(Ok(())) => log::debug!("Successfully sent activity usage message"),
Ok(Err(rpc_message_error)) => {
log::error!("rpcMessageError : {:?}", rpc_message_error)
}
Err(err) => log::error!("other error : {:?}", err),
}
}

async fn set_terminate_state_msg(
report_service: &Endpoint,
activity_id: &str,
reason: Option<String>,
error_message: Option<String>,
) {
if let Err(err) = report_service
.call(activity::local::SetState {
activity_id: activity_id.into(),
state: ActivityState {
state: StatePair::from(State::Terminated),
reason,
error_message,
},
timeout: None,
credentials: None,
})
.await
{
log::error!("Failed to send state. Err {err}");
}
}

#[actix_rt::main]
async fn main() -> anyhow::Result<()> {
let panic_hook = std::panic::take_hook();
Expand Down

0 comments on commit f119a8f

Please sign in to comment.