Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cancel/kill all invocations to a service or service instance #1529

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,40 @@ impl Display for JournalEntryType {
}
}

#[derive(Debug, Clone)]
pub struct SimpleInvocation {
pub id: String,
pub target: String,
pub status: InvocationState,
}

#[derive(Debug, Clone, PartialEq, ArrowField, ArrowDeserialize)]
struct SimpleInvocationRowResult {
id: Option<String>,
target: Option<String>,
status: String,
}

pub async fn find_active_invocations_simple(
client: &DataFusionHttpClient,
filter: &str,
) -> Result<Vec<SimpleInvocation>> {
let query = format!(
"SELECT id, target, status FROM sys_invocation WHERE {}",
filter
);
let rows = client
.run_query_and_map_results::<SimpleInvocationRowResult>(query)
.await?
.map(|row| SimpleInvocation {
id: row.id.expect("id"),
target: row.target.expect("target"),
status: row.status.parse().expect("Unexpected status"),
})
.collect();
Ok(rows)
}

#[derive(Debug, Clone)]
pub struct InvocationDetailed {
pub invocation: Invocation,
Expand Down
82 changes: 82 additions & 0 deletions cli/src/commands/services/cancel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use crate::cli_env::CliEnv;
use crate::clients::datafusion_helpers::find_active_invocations_simple;
use crate::clients::{self, MetaClientInterface};
use crate::ui::console::{confirm_or_exit, Styled};
use crate::ui::invocations::render_simple_invocation_list;
use crate::ui::stylesheet::Style;
use crate::{c_println, c_success};

use anyhow::{bail, Result};
use cling::prelude::*;

#[derive(Run, Parser, Collect, Clone)]
#[cling(run = "run_cancel")]
#[clap(visible_alias = "rm")]
pub struct Cancel {
/// A target string exact match or prefix, e.g.:
/// * `serviceName`
/// * `serviceName/handler`
/// * `virtualObjectName`
/// * `virtualObjectName/key`
/// * `virtualObjectName/key/handler`
/// * `workflowName`
/// * `workflowName/key`
/// * `workflowName/key/handler`
query: String,
/// Ungracefully kill the invocation and its children
#[clap(long)]
kill: bool,
}

pub async fn run_cancel(State(env): State<CliEnv>, opts: &Cancel) -> Result<()> {
let client = crate::clients::MetasClient::new(&env)?;
let sql_client = clients::DataFusionHttpClient::new(&env)?;

let q = opts.query.trim();
let filter =match q.find('/').unwrap_or_default() {
0 => format!("target LIKE '{}/%'", q),
// If there's one slash, let's add the wildcard depending on the service type,
// so we discriminate correctly with serviceName/handlerName with workflowName/workflowKey
1 => format!("(target = '{}' AND target_service_ty = 'service') OR (target LIKE '{}/%' AND target_service_ty != 'service'))", q, q),
// Can only be exact match here
_ => format!("target LIKE '{}'", q),
};

let invocations = find_active_invocations_simple(&sql_client, &filter).await?;
if invocations.is_empty() {
bail!("No invocations found for query {}!", opts.query);
};

render_simple_invocation_list(&env, &invocations);

// Get the invocation and confirm
let prompt = format!(
"Are you sure you want to {} these invocations?",
if opts.kill {
Styled(Style::Danger, "kill")
} else {
Styled(Style::Warn, "cancel")
},
);
confirm_or_exit(&env, &prompt)?;

for inv in invocations {
let result = client.cancel_invocation(&inv.id, opts.kill).await?;
let _ = result.success_or_error()?;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A) Should we put an upper bound on how many we display on the screen? (maybe a few examples and a message that says how many more?)
B) Do you think it'd cause problems to have this be an unbounded loop of cancellations (in situations with potentially large number of invocations)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put an upper bound on how many we display on the screen?

We could introduce an upper bound, but then this means either that we don't show all the invocations we kill, or we need to shave off some invocations, meaning that during dev the user might have to run many times the kill command to kill all the invocations...

Do you think it'd cause problems to have this be an unbounded loop of cancellations (in situations with potentially large number of invocations)?

Only way to find out is testing it 😄 In principle here we just append commands to the log, so many commands probably will slow down the system a bit while all of those are processed.


c_println!();
c_success!("Request was sent successfully");

Ok(())
}
3 changes: 3 additions & 0 deletions cli/src/commands/services/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

mod cancel;
mod describe;
mod list;
mod status;
Expand All @@ -23,4 +24,6 @@ pub enum Services {
Describe(describe::Describe),
/// Prints activity information about a given service (and method)
Status(status::Status),
/// Cancel all the invocations to the given service/service instance
Cancel(cancel::Cancel),
}
19 changes: 17 additions & 2 deletions cli/src/ui/invocations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@
// by the Apache License, Version 2.0.

use chrono_humanize::Tense;
use comfy_table::Table;
use comfy_table::{Attribute, Cell, Table};
use dialoguer::console::style;
use dialoguer::console::Style as DStyle;
use dialoguer::console::StyledObject;

use crate::cli_env::CliEnv;
use crate::clients::datafusion_helpers::JournalEntry;
use crate::clients::datafusion_helpers::JournalEntryType;
use crate::clients::datafusion_helpers::{Invocation, InvocationState};
use crate::clients::datafusion_helpers::{JournalEntry, SimpleInvocation};
use crate::ui::console::Icon;
use crate::ui::console::StyledTable;
use crate::{c_indent_table, c_indentln, c_println};
Expand Down Expand Up @@ -211,6 +211,21 @@ pub fn add_invocation_to_kv_table(table: &mut Table, invocation: &Invocation) {
}
}

pub fn render_simple_invocation_list(env: &CliEnv, invocations: &[SimpleInvocation]) {
let mut invocations_table = Table::new_styled(&env.ui_config);
invocations_table.set_styled_header(vec!["ID", "TARGET", "STATUS"]);

for inv in invocations {
invocations_table.add_row(vec![
Cell::new(&inv.id).add_attribute(Attribute::Bold),
Cell::new(&inv.target),
Cell::new(invocation_status(inv.status)),
]);
}
c_indent_table!(0, invocations_table);
c_println!();
}

// [2023-12-14 15:38:52.500 +00:00] rIEqK14GCdkAYxo-wzTfrK2e6tJssIrtQ CheckoutProcess::checkout
// Status: backing-off (Retried 67 time(s). Next retry in in 9 seconds and 616 ms))
// Deployment: bG9jYWxob3N0OjkwODEv
Expand Down
Loading