Skip to content

Commit

Permalink
feat(jstzd): serve config in jstzd server
Browse files Browse the repository at this point in the history
  • Loading branch information
huancheng-trili committed Nov 19, 2024
1 parent f15e8c6 commit 3eaef9d
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 31 deletions.
58 changes: 52 additions & 6 deletions crates/jstzd/src/task/jstzd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use anyhow::Result;
use async_dropper_simple::AsyncDrop;
use async_trait::async_trait;
use axum::{
extract::State,
extract::{Path, State},
response::IntoResponse,
routing::{get, put},
Router,
};
Expand All @@ -13,20 +14,24 @@ use octez::r#async::{
node_config::OctezNodeConfig,
protocol::ProtocolParameter,
};
use serde::Serialize;
use std::sync::Arc;
use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};

#[derive(Clone)]
struct Jstzd {
octez_node: Arc<RwLock<OctezNode>>,
baker: Arc<RwLock<OctezBaker>>,
}

#[derive(Clone)]
#[derive(Clone, Serialize)]
pub struct JstzdConfig {
#[serde(rename(serialize = "octez-node"))]
octez_node_config: OctezNodeConfig,
#[serde(rename(serialize = "octez-baker"))]
baker_config: OctezBakerConfig,
#[serde(rename(serialize = "octez-client"))]
octez_client_config: OctezClientConfig,
#[serde(skip_serializing)]
protocol_params: ProtocolParameter,
}

Expand All @@ -44,6 +49,18 @@ impl JstzdConfig {
protocol_params,
}
}

pub fn octez_node_config(&self) -> &OctezNodeConfig {
&self.octez_node_config
}

pub fn octez_client_config(&self) -> &OctezClientConfig {
&self.octez_client_config
}

pub fn baker_config(&self) -> &OctezBakerConfig {
&self.baker_config
}
}

#[async_trait]
Expand Down Expand Up @@ -150,13 +167,15 @@ impl Jstzd {
}
}

#[derive(Clone)]
pub struct JstzdServer {
jstzd_config: JstzdConfig,
jstzd_server_port: u16,
state: Arc<RwLock<ServerState>>,
}

struct ServerState {
jstzd_config: JstzdConfig,
jstzd_config_json: serde_json::Map<String, serde_json::Value>,
jstzd: Option<Jstzd>,
server_handle: Option<JoinHandle<()>>,
}
Expand All @@ -172,9 +191,14 @@ impl AsyncDrop for JstzdServer {
impl JstzdServer {
pub fn new(config: JstzdConfig, port: u16) -> Self {
Self {
jstzd_config: config,
jstzd_server_port: port,
state: Arc::new(RwLock::new(ServerState {
jstzd_config_json: serde_json::to_value(&config)
.unwrap()
.as_object()
.unwrap()
.to_owned(),
jstzd_config: config,
jstzd: None,
server_handle: None,
})),
Expand All @@ -192,12 +216,14 @@ impl JstzdServer {
}

pub async fn run(&mut self) -> Result<()> {
let jstzd = Jstzd::spawn(self.jstzd_config.clone()).await?;
let jstzd = Jstzd::spawn(self.state.read().await.jstzd_config.clone()).await?;
self.state.write().await.jstzd.replace(jstzd);

let router = Router::new()
.route("/health", get(health_check_handler))
.route("/shutdown", put(shutdown_handler))
.route("/config/:config_type", get(config_handler))
.route("/config/", get(all_config_handler))
.with_state(self.state.clone());
let listener = TcpListener::bind(("0.0.0.0", self.jstzd_server_port)).await?;

Expand Down Expand Up @@ -253,3 +279,23 @@ async fn shutdown_handler(state: State<Arc<RwLock<ServerState>>>) -> http::Statu
};
http::StatusCode::NO_CONTENT
}

async fn all_config_handler(state: State<Arc<RwLock<ServerState>>>) -> impl IntoResponse {
let config = &state.read().await.jstzd_config_json;
serde_json::to_string(config).unwrap().into_response()
}

async fn config_handler(
state: State<Arc<RwLock<ServerState>>>,
Path(config_type): Path<String>,
) -> impl IntoResponse {
let config = &state.read().await.jstzd_config_json;
match config.get(&config_type) {
Some(v) => match serde_json::to_string(v) {
Ok(s) => s.into_response(),
// TODO: log this error
Err(_) => http::StatusCode::INTERNAL_SERVER_ERROR.into_response(),
},
None => http::StatusCode::NOT_FOUND.into_response(),
}
}
128 changes: 103 additions & 25 deletions crates/jstzd/tests/jstzd_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
mod utils;
use std::path::PathBuf;

use jstzd::task::jstzd::{JstzdConfig, JstzdServer};
use jstzd::task::utils::retry;
use jstzd::{EXCHANGER_ADDRESS, JSTZ_NATIVE_BRIDGE_ADDRESS};
use octez::r#async::baker::{BakerBinaryPath, OctezBakerConfigBuilder};
Expand All @@ -22,13 +23,38 @@ const CONTRACT_NAMES: [(&str, &str); 2] = [
#[tokio::test(flavor = "multi_thread")]
async fn jstzd_test() {
let rpc_endpoint = Endpoint::localhost(unused_port());
let jstzd_port = unused_port();
let (mut jstzd, config) = create_jstzd_server(&rpc_endpoint, jstzd_port).await;

jstzd.run().await.unwrap();
ensure_jstzd_components_are_up(&rpc_endpoint, jstzd_port).await;
let octez_client = OctezClient::new(config.octez_client_config().clone());
check_bootstrap_contracts(&octez_client).await;

assert!(jstzd.health_check().await);

fetch_config_test(config, jstzd_port).await;

reqwest::Client::new()
.put(&format!("http://localhost:{}/shutdown", jstzd_port))
.send()
.await
.unwrap();

ensure_jstzd_components_are_down(&rpc_endpoint, jstzd_port).await;
}

async fn create_jstzd_server(
octez_node_rpc_endpoint: &Endpoint,
jstzd_port: u16,
) -> (JstzdServer, JstzdConfig) {
let run_options = OctezNodeRunOptionsBuilder::new()
.set_synchronisation_threshold(0)
.set_network("sandbox")
.build();
let octez_node_config = OctezNodeConfigBuilder::new()
.set_network("sandbox")
.set_rpc_endpoint(&rpc_endpoint)
.set_rpc_endpoint(octez_node_rpc_endpoint)
.set_run_options(&run_options)
.build()
.unwrap();
Expand Down Expand Up @@ -60,51 +86,53 @@ async fn jstzd_test() {
.build()
.expect("Failed to build baker config");

let config = jstzd::task::jstzd::JstzdConfig::new(
let config = JstzdConfig::new(
octez_node_config,
baker_config,
octez_client_config.clone(),
protocol_params,
);
let jstzd_port = unused_port();
let mut jstzd = jstzd::task::jstzd::JstzdServer::new(config, jstzd_port);
jstzd.run().await.unwrap();
(JstzdServer::new(config.clone(), jstzd_port), config)
}

async fn ensure_jstzd_components_are_up(
octez_node_rpc_endpoint: &Endpoint,
jstzd_port: u16,
) {
let jstz_health_check_endpoint = format!("http://localhost:{}/health", jstzd_port);
let octez_node_health_check_endpoint = format!("{}/health/ready", rpc_endpoint);
let octez_node_health_check_endpoint =
format!("{}/health/ready", octez_node_rpc_endpoint);

let jstzd_running = retry(30, 1000, || async {
let res = reqwest::get(&jstz_health_check_endpoint).await;
Ok(res.is_ok())
})
.await;
assert!(jstzd_running);

let node_running = retry(30, 1000, || async {
let res = reqwest::get(&octez_node_health_check_endpoint).await;
Ok(res.is_ok())
})
.await;
assert!(node_running);
// check if individual components are up / jstzd health check indeed covers all components
assert!(reqwest::get(&octez_node_health_check_endpoint)
.await
.is_ok());

let baker_running = retry(30, 1000, || async {
if run_ps().await.contains("octez-baker") {
let last_level = get_block_level(&rpc_endpoint.to_string()).await;
let last_level = get_block_level(&octez_node_rpc_endpoint.to_string()).await;
return Ok(last_level > 2);
}
Ok(false)
})
.await;
assert!(baker_running);
assert!(jstzd.health_check().await);

let octez_client = OctezClient::new(octez_client_config);
check_bootstrap_contracts(&octez_client).await;
}

reqwest::Client::new()
.put(&format!("http://localhost:{}/shutdown", jstzd_port))
.send()
.await
.unwrap();
async fn ensure_jstzd_components_are_down(
octez_node_rpc_endpoint: &Endpoint,
jstzd_port: u16,
) {
let jstz_health_check_endpoint = format!("http://localhost:{}/health", jstzd_port);
let octez_node_health_check_endpoint =
format!("{}/health/ready", octez_node_rpc_endpoint);

let jstzd_stopped = retry(30, 1000, || async {
let res = reqwest::get(&jstz_health_check_endpoint).await;
Expand All @@ -116,6 +144,8 @@ async fn jstzd_test() {
.await;
assert!(jstzd_stopped);

// check if individual components are terminated
// and jstzd indeed tears down all components before it shuts down
let node_destroyed = retry(30, 1000, || async {
let res = reqwest::get(&octez_node_health_check_endpoint).await;
// Should get an error since the node should have been terminated
Expand All @@ -132,11 +162,59 @@ async fn jstzd_test() {
})
.await;
assert!(baker_destroyed);
}

assert!(!jstzd.health_check().await);
async fn fetch_config_test(jstzd_config: JstzdConfig, jstzd_port: u16) {
let mut full_config = serde_json::json!({});
for (key, expected_json) in [
(
"octez-node",
serde_json::to_value(jstzd_config.octez_node_config()).unwrap(),
),
(
"octez-client",
serde_json::to_value(jstzd_config.octez_client_config()).unwrap(),
),
(
"octez-baker",
serde_json::to_value(jstzd_config.baker_config()).unwrap(),
),
] {
let res =
reqwest::get(&format!("http://localhost:{}/config/{}", jstzd_port, key))
.await
.unwrap();
assert_eq!(
expected_json,
serde_json::from_str::<serde_json::Value>(&res.text().await.unwrap())
.unwrap(),
"config mismatch at /config/{}",
key
);
full_config
.as_object_mut()
.unwrap()
.insert(key.to_owned(), expected_json);
}

// stop should be idempotent and thus should be okay after jstzd is already stopped
jstzd.stop().await.unwrap();
// invalid config type
assert_eq!(
reqwest::get(&format!("http://localhost:{}/config/foobar", jstzd_port))
.await
.unwrap()
.status(),
reqwest::StatusCode::NOT_FOUND
);

// all configs
let res = reqwest::get(&format!("http://localhost:{}/config/", jstzd_port))
.await
.unwrap();
assert_eq!(
full_config,
serde_json::from_str::<serde_json::Value>(&res.text().await.unwrap()).unwrap(),
"config mismatch at /config/",
);
}

async fn run_ps() -> String {
Expand Down

0 comments on commit 3eaef9d

Please sign in to comment.