Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(abci): fix tokio spawning in grpc test #81

Merged
merged 2 commits into from
Jul 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ hex = { version = "0.4.3" }
lazy_static = { version = "1.4.0" }
# For tests of gRPC server
tonic = { version = "0.11.0" }
pollster = { version = "0.3.0" }
32 changes: 21 additions & 11 deletions abci/tests/common/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl TenderdashDocker {
tokio::signal::ctrl_c().await.unwrap();
error!("Received ctrl+c, removing Tenderdash container");

let stopped = timeout(Duration::from_secs(15), Self::stop(id, &docker))
let stopped = timeout(Duration::from_secs(15), Self::stop(id, docker))
.await
.expect("timeout removing tenderdash container");
if stopped.is_err() {
Expand Down Expand Up @@ -210,24 +210,28 @@ impl TenderdashDocker {
/// Print 200 most recent logs from Tenderdash on standard error.
#[allow(dead_code)]
pub fn print_logs(&self) {
let id = &self.id;
let id = self.id.clone();

if !id.is_empty() {
debug!("Printing Tenderdash logs");
let rt = &self.runtime;
let docker = &self.docker;
let docker = self.docker.clone();

rt.block_on(Self::emit_logs(id, docker))
// We use `pollster` to block on the future, as tokio might not block_on()
// when called inside a running runtime.
let fut = rt.spawn(Self::emit_logs(id, docker));
pollster::block_on(fut)
.expect("cannot spawn log emitting fn")
.expect("cannot emit logs");
}
}

async fn emit_logs(id: &str, docker: &Docker) -> Result<(), anyhow::Error> {
async fn emit_logs(id: String, docker: Docker) -> Result<(), anyhow::Error> {
let stderror = tokio::io::stderr();
let mut dest = tokio::io::BufWriter::new(stderror);

let mut logs = docker.logs(
id,
&id,
Some(bollard::container::LogsOptions {
follow: false,
stdout: true,
Expand All @@ -251,7 +255,7 @@ impl TenderdashDocker {
Ok(())
}

async fn stop(id: String, docker: &Docker) -> Result<(), anyhow::Error> {
async fn stop(id: String, docker: Docker) -> Result<(), anyhow::Error> {
debug!("Stopping Tenderdash container");
docker
.remove_container(
Expand All @@ -270,9 +274,13 @@ impl TenderdashDocker {
impl Drop for TenderdashDocker {
fn drop(&mut self) {
if !self.id.is_empty() {
let _ = self
.runtime
.block_on(Self::stop(self.id.clone(), &self.docker));
let id = self.id.clone();
let docker = self.docker.clone();

let fut = self.runtime.spawn(Self::stop(id, docker));
pollster::block_on(fut)
.expect("cannot stop container")
.expect("cannot stop container");
}
}
}
Expand All @@ -281,9 +289,11 @@ impl Drop for TenderdashDocker {
#[allow(dead_code)]
pub fn setup_td_logs_panic(td_docker: &Arc<TenderdashDocker>) {
let weak_ref = Arc::downgrade(td_docker);
std::panic::set_hook(Box::new(move |_| {
let original = std::panic::take_hook();
std::panic::set_hook(Box::new(move |info| {
if let Some(td) = weak_ref.upgrade() {
td.print_logs()
}
original(info);
}));
}
13 changes: 5 additions & 8 deletions abci/tests/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tenderdash_abci::proto;
use tonic::{async_trait, Response, Status};

#[cfg(feature = "docker-tests")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
/// Test server listening on ipv4 address.
///
/// See [tcp_server_test()].
Expand All @@ -34,7 +34,7 @@ async fn test_ipv4_server() {
}

#[cfg(feature = "docker-tests")]
#[tokio::test]
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
#[ignore = "IPv6 does not work for gRPC, most likely bug on Tenderdash side"]
/// Test server listening on ipv6 address.
///
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn grpc_server_test(test_name: &str, bind_address: &str) {
let addr = bind_address.parse().expect("address must be valid");
let server_cancel = cancel.clone();
let server_handle = tokio::spawn(async move {
tracing::debug!("starting gRPC server");
tracing::debug!(?addr, "starting gRPC server");
Server::builder()
.add_service(AbciApplicationServer::new(app))
.serve_with_shutdown(addr, server_cancel.cancelled())
Expand All @@ -87,7 +87,7 @@ async fn grpc_server_test(test_name: &str, bind_address: &str) {
let container_name = format!("tenderdash_{}", test_name);

let td = tokio::task::spawn_blocking(move || {
tracing::debug!("starting Tenderdash in Docker container");
tracing::debug!(addr=?socket_uri, "starting Tenderdash in Docker container");
let td = Arc::new(common::docker::TenderdashDocker::new(
&container_name,
None,
Expand All @@ -113,10 +113,7 @@ async fn grpc_server_test(test_name: &str, bind_address: &str) {
}
}

tokio::task::spawn_blocking(move || drop(td))
.await
.expect("tenderdash cleanup");

drop(td);
tracing::info!("Test finished successfully");
}

Expand Down
Loading