Skip to content

Commit

Permalink
feat(proto)!: grpc server and client support (#50)
Browse files Browse the repository at this point in the history
* feat(proto): grpc server and client support

* chore: make ServerRuntime public

* test(proto): test grpc server with tenderdash in docker

* chore(proto): fix imports

* chore: build issues

* chore: abci grpc-server feature

* chore: imports and features

* chore: grpc test

* chore: re-export tonic

* chore: fix missing serde serialization

* chore(proto): fix feature flags

* chore(proto): fix warn

* fix: wrong json serialization of ConsensusParams

* fix: wrong json serialization of ConsensusParams

* chore: fix build

* chore: simplify features

* chore: self review, simplify features

* chore: comments

* test(abci): grpc test uses latest tenderdash tag

* chore: bump version to 0.14.0-dev.7 for tenderdash v0.14.0-dev.3, abci 0.26.0

* chore: fix missing serde serialization

* fix: wrong json serialization of ConsensusParams

* build(deps): update chrono and replace deprecated fn calls

---------

Co-authored-by: Ivan Shumkov <[email protected]>
  • Loading branch information
lklimek and shumkov authored Mar 12, 2024
1 parent c5ece7b commit 672f8ba
Show file tree
Hide file tree
Showing 14 changed files with 285 additions and 32 deletions.
16 changes: 14 additions & 2 deletions abci/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
version = "0.14.0-dev.6"
version = "0.14.0-dev.7"
name = "tenderdash-abci"
edition = "2021"
license = "Apache-2.0"
Expand All @@ -11,7 +11,15 @@ description = """tenderdash-abci provides a simple framework with which to build
low-level applications on top of Tenderdash."""

[features]
default = ["server", "docker-tests", "crypto", "tcp", "unix", "tracing-span"]
default = [
"server",
"docker-tests",
"crypto",
"tcp",
"unix",
"grpc",
"tracing-span",
]
# docker-tests includes integration tests that require docker to be available
docker-tests = ["server"]
server = [
Expand All @@ -20,6 +28,8 @@ server = [
"dep:tokio-util",
"dep:futures",
]

grpc = ["tenderdash-proto/grpc"]
crypto = ["dep:lhash"]
tcp = ["server"]
unix = ["server"]
Expand Down Expand Up @@ -66,3 +76,5 @@ futures = { version = "0.3.26" }
tokio = { version = "1", features = ["macros", "signal", "time", "io-std"] }
hex = { version = "0.4" }
lazy_static = { version = "1.4" }
# For tests of gRPC server
tonic = { version = "0.11" }
2 changes: 1 addition & 1 deletion abci/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub use application::{check_version, Application, RequestDispatcher};
use prost::{DecodeError, EncodeError};
#[allow(deprecated)]
#[cfg(feature = "server")]
pub use server::{start_server, CancellationToken, Server, ServerBuilder};
pub use server::{start_server, CancellationToken, Server, ServerBuilder, ServerRuntime};
pub use tenderdash_proto as proto;

#[cfg(feature = "crypto")]
Expand Down
24 changes: 21 additions & 3 deletions abci/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ use std::{
str::FromStr,
};

use futures::Future;
#[cfg(feature = "tcp")]
use tokio::net::TcpListener;
#[cfg(feature = "unix")]
use tokio::net::UnixListener;
use tokio::runtime::{Handle, Runtime};
use tokio::{
runtime::{Handle, Runtime},
task::JoinHandle,
};
pub use tokio_util::sync::CancellationToken;

use self::generic::GenericServer;
Expand Down Expand Up @@ -208,10 +212,24 @@ impl<'a, App: RequestDispatcher + 'a> ServerBuilder<App> {
}

/// Server runtime that must be alive for the whole lifespan of the server
pub(crate) struct ServerRuntime {
pub struct ServerRuntime {
/// Runtime stored here to ensure it is never dropped
_runtime: Option<Runtime>,
handle: Handle,
pub handle: Handle,
}

impl ServerRuntime {
pub fn block_on<F: std::future::Future>(&self, future: F) -> F::Output {
self.handle.block_on(future)
}

pub fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
self.handle.spawn(future)
}
}

impl Default for ServerRuntime {
Expand Down
38 changes: 26 additions & 12 deletions abci/tests/common/docker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use bollard::{
Docker, API_DEFAULT_VERSION,
};
use futures::StreamExt;
use tokio::{io::AsyncWriteExt, runtime::Runtime, time::timeout};
use tenderdash_abci::ServerRuntime;
use tokio::{io::AsyncWriteExt, time::timeout};
use tracing::{debug, error, info};
use url::Url;

Expand All @@ -16,7 +17,7 @@ pub struct TenderdashDocker {
name: String,
docker: Docker,
image: String,
runtime: Runtime,
runtime: ServerRuntime,
}
impl TenderdashDocker {
/// new() creates and starts new Tenderdash docker container for provided
Expand All @@ -31,8 +32,8 @@ impl TenderdashDocker {
///
/// * `tag` - Docker tag to use; provide empty string to use default
/// * `app_address` - address of ABCI app server; for example,
/// `tcp://172.17.0.1:4567`, `tcp://[::ffff:ac11:1]:5678` or
/// `unix:///path/to/file`
/// `tcp://172.17.0.1:4567`, `tcp://[::ffff:ac11:1]:5678`,
/// `grpc://172.17.01:5678` or `unix:///path/to/file`
pub(crate) fn new(
container_name: &str,
tag: Option<&str>,
Expand All @@ -45,14 +46,14 @@ impl TenderdashDocker {
};

let app_address = url::Url::parse(app_address).expect("invalid app address");
if app_address.scheme() != "tcp" && app_address.scheme() != "unix" {
panic!("app_address must be either tcp:// or unix://");
if app_address.scheme() != "tcp"
&& app_address.scheme() != "unix"
&& app_address.scheme() != "grpc"
{
panic!("app_address must be either grpc://, tcp:// or unix://");
}

let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("cannot initialize tokio runtime");
let runtime = tenderdash_abci::ServerRuntime::default();

info!("Starting Tenderdash docker container");

Expand Down Expand Up @@ -152,12 +153,24 @@ impl TenderdashDocker {
None
};

let app_address = app_address.to_string().replace('/', "\\/");
let (abci, app_address) = match app_address.scheme() {
"grpc" => {
let address = app_address
.to_string()
.replace("grpc://", "")
.replace('/', "\\/");
("grpc", address)
},
_ => ("socket", app_address.to_string().replace('/', "\\/")),
};

debug!("Tenderdash will connect to ABCI address: {}", app_address);
let container_config = Config {
image: Some(self.image.clone()),
env: Some(vec![format!("PROXY_APP={}", app_address)]),
env: Some(vec![
format!("PROXY_APP={}", app_address),
format!("ABCI={}", abci),
]),
host_config: Some(HostConfig {
binds,
..Default::default()
Expand Down Expand Up @@ -263,6 +276,7 @@ impl Drop for TenderdashDocker {
}
}
}

/// Use custom panic handler to dump logs on panic
#[allow(dead_code)]
pub fn setup_td_logs_panic(td_docker: &Arc<TenderdashDocker>) {
Expand Down
148 changes: 148 additions & 0 deletions abci/tests/grpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//! Test gRPC server for ABCI protocol.
//!
//! This test verifies that the gRPC server generated with tonic as part of the
//! tenderdash-proto crate can successfully connect to Tenderdash instance.
//!
//! This test should be implemented in the tenderdash-proto crate; however, it
//! is implemented here to use already existing docker container testing
//! logic.
#![cfg(feature = "grpc")]

use std::sync::Arc;

use tenderdash_abci::{
proto::abci::{
abci_application_server::AbciApplication, RequestEcho, RequestInfo, ResponseInfo,
},
CancellationToken,
};
mod common;
use tenderdash_abci::proto;
use tonic::{async_trait, Response, Status};

#[cfg(feature = "docker-tests")]
#[tokio::test]
/// Test server listening on ipv4 address.
///
/// See [tcp_server_test()].
async fn test_ipv4_server() {
// we assume the host uses default Docker network configuration, with the host
// using 172.17.0.1
let bind_address = "172.17.0.1:1234".to_string();

grpc_server_test("v4", bind_address.as_str()).await;
}

#[cfg(feature = "docker-tests")]
#[tokio::test]
/// Test server listening on ipv6 address.
///
/// See [tcp_server_test()].
async fn test_ipv6_server() {
// we assume the host uses default Docker network configuration, with the host
// using 172.17.0.1. This is IPv6 notation of the IPv4 address.
let bind_address = "[::ffff:ac11:1]:5678".to_string();

grpc_server_test("v6", bind_address.as_str()).await;
}

#[cfg(feature = "docker-tests")]
/// Feature: ABCI App TCO server
///
/// * Given that we have Tenderdash instance using TCP connection to communicate
/// with ABCI APP
/// * When we estabilish connection with Tenderdash
/// * Then Tenderdash sends Info request
async fn grpc_server_test(test_name: &str, bind_address: &str) {
use core::panic;

use proto::abci::abci_application_server::AbciApplicationServer;
use tonic::transport::Server;

tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::new("debug"))
.with_ansi(true)
.try_init()
.ok();

let cancel = CancellationToken::new();
let app = TestApp {
cancel: cancel.clone(),
};

let addr = bind_address.parse().expect("address must be valid");
let server_cancel = cancel.clone();
let server_handle = tokio::spawn(async move {
tracing::debug!("starting gRPC server");
Server::builder()
.add_service(AbciApplicationServer::new(app))
.serve_with_shutdown(addr, server_cancel.cancelled())
.await
.expect("server failed");
tracing::debug!("gRPC server stopped");
});

let socket_uri = format!("grpc://{}", bind_address);
let container_name = format!("tenderdash_{}", test_name);

let td = tokio::task::spawn_blocking(move || {
tracing::debug!("starting Tenderdash in Docker container");
let td = Arc::new(common::docker::TenderdashDocker::new(
&container_name,
None,
&socket_uri,
));
common::docker::setup_td_logs_panic(&td);
tracing::debug!("started Tenderdash in Docker container");

td
})
.await
.expect("start tenderdash");

tokio::select! {
_ = tokio::time::sleep(tokio::time::Duration::from_secs(60)) => {
panic!("Test timed out");
}
_ = cancel.cancelled() => {
tracing::debug!("CancellationToken cancelled");
}
ret = server_handle => {
ret.expect("gRPC server failed");
}
}

tokio::task::spawn_blocking(move || drop(td))
.await
.expect("tenderdash cleanup");

tracing::info!("Test finished successfully");
}

pub struct TestApp {
// when test succeeds, we cancel this token to finish it
cancel: CancellationToken,
}
#[async_trait]
impl AbciApplication for TestApp {
async fn echo(
&self,
request: tonic::Request<RequestEcho>,
) -> Result<tonic::Response<proto::abci::ResponseEcho>, Status> {
tracing::info!(?request, "Echo called");
Ok(Response::new(proto::abci::ResponseEcho {
message: request.into_inner().message,
}))
}
async fn info(
&self,
_request: tonic::Request<RequestInfo>,
) -> std::result::Result<tonic::Response<ResponseInfo>, tonic::Status> {
tracing::info!("Info called, test successful");
let resp = ResponseInfo {
..Default::default()
};
self.cancel.cancel();
Ok(Response::new(resp))
}
}
14 changes: 13 additions & 1 deletion proto-compiler/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
version = "0.14.0-dev.6"
version = "0.14.0-dev.7"
name = "tenderdash-proto-compiler"
authors = ["Informal Systems <[email protected]>", "Dash Core Group"]
edition = "2021"
Expand All @@ -17,3 +17,15 @@ regex = { "version" = "1.7.1" }
ureq = { "version" = "2.6.2" }
zip = { version = "0.6.4", default-features = false, features = ["deflate"] }
fs_extra = { version = "1.3.0" }
tonic-build = { version = "0.11.0", optional = true }


[features]
default = []
# Enable gRPC support; needed by server and client features.
# Conflicts with no_std
grpc = ["dep:tonic-build"]
# Build the gRPC server. Requires tenderdash-proto/std feature.
server = ["grpc"]
# Build the gRPC client. Requires tenderdash-proto/std feature.
client = ["grpc"]
8 changes: 8 additions & 0 deletions proto-compiler/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ pub fn proto_compile() {
let tenderdash_ver = tenderdash_version(tenderdash_dir);

println!("[info] => Creating structs.");

#[cfg(feature = "grpc")]
tonic_build::configure()
.generate_default_stubs(true)
.compile_with_config(pb, &protos, &proto_includes_paths)
.unwrap();

#[cfg(not(feature = "grpc"))]
pb.compile_protos(&protos, &proto_includes_paths).unwrap();

println!("[info] => Removing old structs and copying new structs.");
Expand Down
Loading

0 comments on commit 672f8ba

Please sign in to comment.