Skip to content

Commit

Permalink
fix: correctly read and persist state to disk (#101)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgorenflo authored Sep 16, 2023
1 parent 402ee54 commit 00377b8
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 111 deletions.
61 changes: 49 additions & 12 deletions ampd/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::path::PathBuf;
use std::pin::Pin;

use cosmos_sdk_proto::cosmos::{
auth::v1beta1::query_client::QueryClient, tx::v1beta1::service_client::ServiceClient,
};
use error_stack::{FutureExt, Result, ResultExt};
use thiserror::Error;
use tokio::signal::unix::{signal, SignalKind};
use tokio::sync::oneshot;
use tokio::task::JoinSet;
use tokio_stream::Stream;
use tokio_util::sync::CancellationToken;
Expand All @@ -20,7 +21,7 @@ use tofnd::grpc::{MultisigClient, SharableEcdsaClient};
use types::TMAddress;

use crate::config::Config;
use crate::error::Error;
use crate::state::State;

mod broadcaster;
pub mod config;
Expand All @@ -40,7 +41,16 @@ const PREFIX: &str = "axelar";

type HandlerStream<E> = Pin<Box<dyn Stream<Item = Result<Event, E>> + Send>>;

pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> {
pub async fn run(cfg: Config, state: State) -> (State, Result<(), Error>) {
let app = prepare_app(cfg, state.clone()).await;

match app {
Ok(app) => app.run().await,
Err(err) => (state, Err(err)),
}
}

async fn prepare_app(cfg: Config, state: State) -> Result<App<impl Broadcaster>, Error> {
let Config {
tm_jsonrpc,
tm_grpc,
Expand All @@ -63,7 +73,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> {
.change_context(Error::Connection)?;
let ecdsa_client = SharableEcdsaClient::new(multisig_client);

let mut state_updater = StateUpdater::new(state_path).change_context(Error::StateUpdater)?;
let mut state_updater = StateUpdater::new(state);
let pub_key = match state_updater.state().pub_key {
Some(pub_key) => pub_key,
None => {
Expand Down Expand Up @@ -103,9 +113,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> {
broadcast,
event_buffer_cap,
)
.configure_handlers(worker, handlers)?
.run()
.await
.configure_handlers(worker, handlers)
}

struct App<T>
Expand Down Expand Up @@ -234,7 +242,7 @@ where
self.event_processor.add_handler(handler, sub);
}

async fn run(self) -> Result<(), Error> {
async fn run(self) -> (State, Result<(), Error>) {
let Self {
event_sub,
event_processor,
Expand All @@ -259,23 +267,52 @@ where
exit_token.cancel();
});

let (state_tx, mut state_rx) = oneshot::channel::<State>();
let mut set = JoinSet::new();
set.spawn(event_sub.run().change_context(Error::EventSub));
set.spawn(event_processor.run().change_context(Error::EventProcessor));
set.spawn(broadcaster.run().change_context(Error::Broadcaster));
set.spawn(state_updater.run().change_context(Error::StateUpdater));
set.spawn(async move {
// assert: the app must wait for this task to exit before trying to receive the state
state_tx
.send(state_updater.run().await)
.expect("the state receiver should still be alive");
Ok(())
});

let res = match (set.join_next().await, token.is_cancelled()) {
let execution_result = match (set.join_next().await, token.is_cancelled()) {
(Some(result), false) => {
token.cancel();
result.change_context(Error::Task)?
result.unwrap_or_else(|err| Err(err).change_context(Error::Task))
}
(Some(_), true) => Ok(()),
(None, _) => panic!("all tasks exited unexpectedly"),
};

while (set.join_next().await).is_some() {}
// assert: all tasks have exited, it is safe to receive the state
let state = state_rx
.try_recv()
.expect("the state sender should have been able to send the state");

res
(state, execution_result)
}
}

#[derive(Error, Debug)]
pub enum Error {
#[error("event sub failed")]
EventSub,
#[error("event processor failed")]
EventProcessor,
#[error("broadcaster failed")]
Broadcaster,
#[error("tofnd failed")]
Tofnd,
#[error("connection failed")]
Connection,
#[error("task execution failed")]
Task,
#[error("failed to return updated state")]
ReturnState,
}
60 changes: 39 additions & 21 deletions ampd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ use ::config::{Config as cfg, Environment, File, FileFormat, FileSourceFile};
use clap::{Parser, ValueEnum};
use config::ConfigError;
use error_stack::{Report, ResultExt};
use thiserror::Error;
use tracing::{error, info};
use valuable::Valuable;

use ampd::config::Config;
use ampd::error::Error;
use ampd::run;
use ampd::{run, state};
use axelar_wasm_std::utils::InspectorResult;
use axelar_wasm_std::FnExt;
use report::LoggableError;

#[derive(Debug, Parser, Valuable)]
Expand Down Expand Up @@ -47,7 +46,7 @@ async fn main() -> ExitCode {
info!(args = args.as_value(), "starting daemon");
let result = run_daemon(&args)
.await
.tap_err(|report| error!(err = LoggableError::from(report).as_value(), "{report}"));
.tap_err(|report| error!(err = LoggableError::from(report).as_value(), "{report:#}"));
info!("shutting down");

match result {
Expand All @@ -63,6 +62,30 @@ async fn main() -> ExitCode {
}
}

async fn run_daemon(args: &Args) -> Result<(), Report<Error>> {
let cfg = init_config(&args.config);
let state_path = expand_home_dir(&args.state);

let state = state::load(&state_path).change_context(Error::Fatal)?;
let (state, execution_result) = run(cfg, state).await;
let state_flush_result = state::flush(&state, state_path).change_context(Error::Fatal);

let execution_result = execution_result.change_context(Error::Fatal);
match (execution_result, state_flush_result) {
// both execution and persisting state failed: return the merged error
(Err(mut report), Err(state_err)) => {
report.extend_one(state_err);
Err(report)
}

// any single path failed: report the error
(Err(report), Ok(())) | (Ok(()), Err(report)) => Err(report),

// no errors in either execution or persisting state
(Ok(()), Ok(())) => Ok(()),
}
}

fn set_up_logger(output: &Output) {
match output {
Output::Json => {
Expand All @@ -74,13 +97,6 @@ fn set_up_logger(output: &Output) {
};
}

async fn run_daemon(args: &Args) -> Result<(), Report<Error>> {
let cfg = init_config(&args.config);
let state_path = check_state_path(args.state.as_path())?;

run(cfg, state_path).await
}

fn init_config(config_paths: &[PathBuf]) -> Config {
let files = find_config_files(config_paths);

Expand All @@ -93,7 +109,6 @@ fn init_config(config_paths: &[PathBuf]) -> Config {
fn find_config_files(config: &[PathBuf]) -> Vec<File<FileSourceFile, FileFormat>> {
let files = config
.iter()
.map(PathBuf::as_path)
.map(expand_home_dir)
.map(canonicalize)
.filter_map(Result::ok)
Expand All @@ -119,16 +134,19 @@ fn parse_config(
.map_err(Report::from)
}

fn check_state_path(path: &Path) -> error_stack::Result<PathBuf, Error> {
expand_home_dir(path)
.then(canonicalize)
.change_context(Error::StateLocation(path.to_string_lossy().into_owned()))
}

fn expand_home_dir(path: &Path) -> PathBuf {
let Ok(home_subfolder) = path.strip_prefix("~") else {
return path.to_path_buf();
fn expand_home_dir(path: impl AsRef<Path>) -> PathBuf {
let path = path.as_ref();
let Ok(home_subfolder) = path.strip_prefix("~") else{
return path.to_path_buf()
};

dirs::home_dir().map_or(path.to_path_buf(), |home| home.join(home_subfolder))
}

#[derive(Error, Debug)]
enum Error {
#[error("failed to load config")]
LoadConfig,
#[error("fatal failure")]
Fatal,
}
Loading

0 comments on commit 00377b8

Please sign in to comment.