
Commit

delete pod to stop a node instead of scaling sts
aluon committed Nov 6, 2024
1 parent 13a9587 commit cef1b6e
Showing 5 changed files with 71 additions and 17 deletions.
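For orientation, a minimal sketch of the mechanism this commit switches to: rather than scaling the node's StatefulSet down and back up, the pod behind the StatefulSet is deleted directly and the StatefulSet controller recreates it on its own. This mirrors the new K8sNode::stop_for_duration below; the namespace, pod name, and helper name here are placeholders, not part of the diff.

use k8s_openapi::api::core::v1::Pod;
use kube::{api::DeleteParams, Api};

async fn delete_node_pod(client: kube::Client) -> anyhow::Result<()> {
    // StatefulSet pods are named "<statefulset-name>-<ordinal>"; a single-replica
    // node is always pod 0, which is why the real code formats "{sts_name}-0".
    let pods: Api<Pod> = Api::namespaced(client, "example-namespace");
    pods.delete("example-node-0", &DeleteParams::default()).await?;
    Ok(())
}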
45 changes: 43 additions & 2 deletions testsuite/forge/src/backend/k8s/node.rs
@@ -3,8 +3,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
backend::k8s::stateful_set, get_free_port, scale_stateful_set_replicas, FullNode,
HealthCheckError, Node, NodeExt, Result, Validator, Version, KUBECTL_BIN, LOCALHOST,
backend::k8s::stateful_set, create_k8s_client, get_free_port, scale_stateful_set_replicas,
FullNode, HealthCheckError, Node, NodeExt, Result, Validator, Version, KUBECTL_BIN, LOCALHOST,
NODE_METRIC_PORT, REST_API_HAPROXY_SERVICE_PORT, REST_API_SERVICE_PORT,
};
use anyhow::{anyhow, format_err};
@@ -14,6 +14,8 @@ use aptos_logger::info;
use aptos_rest_client::Client as RestClient;
use aptos_sdk::types::PeerId;
use aptos_state_sync_driver::metadata_storage::STATE_SYNC_DB_NAME;
use k8s_openapi::api::core::v1::Pod;
use kube::{api::DeleteParams, Api};
use reqwest::Url;
use serde_json::Value;
use std::{
@@ -110,6 +112,41 @@ impl K8sNode {
}
}

pub async fn stop_for_duration(&self, duration: Duration) -> Result<()> {
info!(
"Stopping node {} for {} seconds",
self.stateful_set_name(),
duration.as_secs()
);

let kube_client = create_k8s_client().await?;
let pod_api: Api<Pod> = Api::namespaced(kube_client.clone(), self.namespace());
let pod_name = format!("{}-0", self.stateful_set_name());

let deadline = Instant::now() + duration;

// Initial pod delete
pod_api.delete(&pod_name, &DeleteParams::default()).await?;

// Keep deleting the pod if it recovers before the deadline
while Instant::now() < deadline {
match self.wait_until_healthy(deadline).await {
Ok(_) => {
info!("Pod {} recovered, deleting again", pod_name);
pod_api.delete(&pod_name, &DeleteParams::default()).await?;
tokio::time::sleep(Duration::from_secs(1)).await;
},
Err(e) => {
info!("Pod {} still stopped: {:?}", pod_name, e);
break;
},
}
}

// Wait for the pod to recover
self.wait_until_healthy(deadline).await
}

pub fn port_forward_rest_api(&self) -> Result<()> {
let remote_rest_api_port = if self.haproxy_enabled {
REST_API_HAPROXY_SERVICE_PORT
@@ -273,6 +310,10 @@ impl Node for K8sNode {
fn service_name(&self) -> Option<String> {
Some(self.service_name.clone())
}

async fn stop_for_duration(&self, duration: Duration) -> Result<()> {
self.stop_for_duration(duration).await
}
}

impl Validator for K8sNode {}
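Illustrative usage only (not part of the diff): a test that already holds a K8sNode handle can now take the node down for a bounded window with a single call. The helper name and the 30-second figure are made up for the example.

use std::time::Duration;

async fn take_node_down_briefly(node: &K8sNode) -> anyhow::Result<()> {
    // Deletes the pod, keeps re-deleting it if the StatefulSet controller brings it
    // back early, then waits for the node to report healthy again before returning.
    node.stop_for_duration(Duration::from_secs(30)).await?;
    Ok(())
}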
5 changes: 5 additions & 0 deletions testsuite/forge/src/backend/local/node.rs
@@ -24,6 +24,7 @@ use std::{
path::PathBuf,
process::{Child, Command},
str::FromStr,
time::Duration,
};
use url::Url;

@@ -379,6 +380,10 @@ impl Node for LocalNode {
fn service_name(&self) -> Option<String> {
None
}

async fn stop_for_duration(&self, _duration: Duration) -> Result<()> {
todo!()
}
}

impl Validator for LocalNode {}
5 changes: 5 additions & 0 deletions testsuite/forge/src/interface/node.rs
@@ -74,6 +74,11 @@ pub trait Node: Send + Sync {
fn expose_metric(&self) -> Result<u64>;

fn service_name(&self) -> Option<String>;

/// Stops the node by deleting its pod and ensures it stays down for the specified duration.
/// If the pod recovers early, it will be deleted again. After the duration completes,
/// waits for the pod to become healthy before returning.
async fn stop_for_duration(&self, duration: Duration) -> Result<()>;
}

/// Trait used to represent a running Validator
5 changes: 3 additions & 2 deletions testsuite/testcases/src/fullnode_reboot_stress_test.rs
@@ -69,8 +69,9 @@ impl NetworkLoadTest for FullNodeRebootStressTest {
.full_node(*all_fullnodes.choose(&mut rng).unwrap())
.unwrap()
};
fullnode_to_reboot.stop().await?;
fullnode_to_reboot.start().await?;
fullnode_to_reboot
.stop_for_duration(Duration::from_secs(0))
.await?;
}
tokio::time::sleep(Duration::from_secs(WAIT_TIME_BETWEEN_REBOOTS_SECS)).await;
}
28 changes: 15 additions & 13 deletions testsuite/testcases/src/validator_reboot_stress_test.rs
@@ -47,20 +47,22 @@ impl NetworkLoadTest for ValidatorRebootStressTest {
.cloned()
.collect()
};
for adr in &addresses {
let swarm = swarm.read().await;
let validator_to_reboot = swarm.validator(*adr).unwrap();
validator_to_reboot.stop().await?;
}
if self.down_time_secs > 0.0 {
tokio::time::sleep(Duration::from_secs_f32(self.down_time_secs)).await;
}

for adr in &addresses {
let swarm = swarm.read().await;
let validator_to_reboot = swarm.validator(*adr).unwrap();
validator_to_reboot.start().await?;
}
let stop_pod_futures: Vec<_> = addresses
.iter()
.map(|adr| {
let swarm = Arc::clone(&swarm);
let down_time = self.down_time_secs;
async move {
let swarm = swarm.read().await;
let validator_to_reboot = swarm.validator(*adr).unwrap();
validator_to_reboot
.stop_for_duration(Duration::from_secs_f32(down_time))
.await
}
})
.collect();
futures::future::try_join_all(stop_pod_futures).await?;

if self.pause_secs > 0.0 {
tokio::time::sleep(Duration::from_secs_f32(self.pause_secs)).await;
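The rewritten loop above replaces the sequential stop-all / sleep / start-all sequence with one stop_for_duration future per validator, driven concurrently so the down windows overlap. A stripped-down sketch of that pattern, assuming K8sNode handles are already in scope (the function name is illustrative):

use futures::future::try_join_all;
use std::time::Duration;

async fn stop_validators_together(nodes: &[&K8sNode], down_time_secs: f32) -> anyhow::Result<()> {
    // One future per node; try_join_all polls them together and fails fast on the
    // first error, so every selected validator is down during the same window.
    let stops: Vec<_> = nodes
        .iter()
        .map(|node| node.stop_for_duration(Duration::from_secs_f32(down_time_secs)))
        .collect();
    try_join_all(stops).await?;
    Ok(())
}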
