diff --git a/ampd/src/config.rs b/ampd/src/config.rs index c3080841d..bf7814680 100644 --- a/ampd/src/config.rs +++ b/ampd/src/config.rs @@ -2,6 +2,7 @@ use serde::Deserialize; use crate::broadcaster; use crate::evm::{deserialize_evm_chain_configs, EvmChainConfig}; +use crate::handlers::multisig::MultisigConfig; use crate::tofnd::Config as TofndConfig; use crate::url::Url; @@ -15,6 +16,7 @@ pub struct Config { pub evm_chains: Vec, pub tofnd_config: TofndConfig, pub event_buffer_cap: usize, + pub multisig: Option, } impl Default for Config { @@ -26,6 +28,7 @@ impl Default for Config { evm_chains: vec![], tofnd_config: TofndConfig::default(), event_buffer_cap: 100000, + multisig: None, } } } diff --git a/ampd/src/handlers/errors.rs b/ampd/src/handlers/errors.rs index ed4c5502b..37e0f2b11 100644 --- a/ampd/src/handlers/errors.rs +++ b/ampd/src/handlers/errors.rs @@ -8,4 +8,6 @@ pub enum Error { Finalizer, #[error("failed to deserialize the event")] DeserializeEvent, + #[error("failed to get signature from tofnd")] + Sign, } diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index 65e61c839..c1c52a1ec 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -1,26 +1,40 @@ use std::collections::HashMap; +use std::convert::TryInto; +use async_trait::async_trait; +use cosmrs::cosmwasm::MsgExecuteContract; +use cosmwasm_std::{HexBinary, Uint64}; use ecdsa::VerifyingKey; -use hex::FromHex; +use error_stack::{IntoReport, ResultExt}; +use hex::{encode, FromHex}; use serde::de::Error as DeserializeError; use serde::{Deserialize, Deserializer}; +use tracing::info; +use events::Error::EventTypeMismatch; use events_derive; -use multisig::types::KeyID; +use events_derive::try_from; +use multisig::msg::ExecuteMsg; +use crate::event_processor::EventHandler; +use crate::handlers::errors::Error::{self, DeserializeEvent}; +use crate::queue::queued_broadcaster::BroadcasterClient; +use crate::tofnd::grpc::SharableEcdsaClient; use crate::tofnd::MessageDigest; use crate::types::PublicKey; use crate::types::TMAddress; -use events_derive::try_from; -#[allow(dead_code)] +#[derive(Debug, Deserialize)] +pub struct MultisigConfig { + pub address: TMAddress, +} + #[derive(Debug, Deserialize)] #[try_from("wasm-signing_started")] struct SigningStartedEvent { #[serde(rename = "_contract_address")] contract_address: TMAddress, session_id: u64, - key_id: KeyID, #[serde(deserialize_with = "deserialize_public_keys")] pub_keys: HashMap, #[serde(with = "hex")] @@ -52,30 +66,144 @@ where .collect() } +pub struct Handler +where + B: BroadcasterClient, +{ + worker: TMAddress, + multisig: TMAddress, + broadcaster: B, + signer: SharableEcdsaClient, +} + +impl Handler +where + B: BroadcasterClient, +{ + pub fn new( + worker: TMAddress, + multisig: TMAddress, + broadcaster: B, + signer: SharableEcdsaClient, + ) -> Self { + Self { + worker, + multisig, + broadcaster, + signer, + } + } + + async fn broadcast_signature( + &self, + session_id: impl Into, + signature: impl Into, + ) -> error_stack::Result<(), Error> { + let msg = serde_json::to_vec(&ExecuteMsg::SubmitSignature { + session_id: session_id.into(), + signature: signature.into(), + }) + .expect("submit signature msg should serialize"); + + let tx = MsgExecuteContract { + sender: self.worker.as_ref().clone(), + contract: self.multisig.as_ref().clone(), + msg, + funds: vec![], + }; + + self.broadcaster + .broadcast(tx) + .await + .change_context(Error::Broadcaster) + } +} + +#[async_trait] +impl EventHandler for Handler +where + B: BroadcasterClient + Send + Sync, +{ + type Err = Error; + + async fn handle(&self, event: &events::Event) -> error_stack::Result<(), Error> { + let SigningStartedEvent { + contract_address, + session_id, + pub_keys, + msg, + } = match event.try_into() as error_stack::Result<_, _> { + Err(report) if matches!(report.current_context(), EventTypeMismatch(_)) => { + return Ok(()); + } + result => result.change_context(DeserializeEvent)?, + }; + + if self.multisig != contract_address { + return Ok(()); + } + + info!( + session_id = session_id, + msg = encode(&msg), + "get signing request", + ); + + match pub_keys.get(&self.worker) { + Some(pub_key) => { + let signature = self + .signer + .sign(self.multisig.to_string().as_str(), msg, pub_key) + .await + .change_context(Error::Sign)?; + + info!(signature = encode(&signature), "ready to submit signature"); + + self.broadcast_signature(session_id, signature).await?; + + Ok(()) + } + None => { + info!("worker is not a participant"); + Ok(()) + } + } + } +} + #[cfg(test)] mod test { - use error_stack::Result; - use std::collections::HashMap; - use std::convert::{TryFrom, TryInto}; use base64::engine::general_purpose::STANDARD; use base64::Engine; + use std::collections::HashMap; + use std::convert::{TryFrom, TryInto}; + use std::time::Duration; + + use cosmos_sdk_proto::cosmos::base::abci::v1beta1::TxResponse; + use cosmrs::{AccountId, Gas}; use cosmwasm_std::{Addr, HexBinary, Uint64}; use ecdsa::SigningKey; + use error_stack::{Report, Result}; use rand::rngs::OsRng; use tendermint::abci; - use multisig::events::Event::SigningStarted; - use multisig::types::{MsgToSign, PublicKey}; - use super::*; + use crate::broadcaster::MockBroadcaster; + use crate::queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterClient}; + use crate::tofnd; + use crate::tofnd::grpc::{MockEcdsaClient, SharableEcdsaClient}; use crate::types; + use multisig::events::Event::SigningStarted; + use multisig::types::{KeyID, MsgToSign, PublicKey}; + + const MULTISIG_ADDRESS: &str = "axelarvaloper1zh9wrak6ke4n6fclj5e8yk397czv430ygs5jz7"; - fn rand_account() -> String { + fn rand_account() -> TMAddress { types::PublicKey::from(SigningKey::random(&mut OsRng).verifying_key()) .account_id("axelar") .unwrap() - .to_string() + .into() } fn rand_public_key() -> PublicKey { @@ -91,7 +219,7 @@ mod test { fn signing_started_event() -> events::Event { let pub_keys = (0..10) - .map(|_| (rand_account(), rand_public_key())) + .map(|_| (rand_account().to_string(), rand_public_key())) .collect::>(); let poll_started = SigningStarted { @@ -106,10 +234,7 @@ mod test { let mut event: cosmwasm_std::Event = poll_started.into(); event.ty = format!("wasm-{}", event.ty); - event = event.add_attribute( - "_contract_address", - "axelarvaloper1zh9wrak6ke4n6fclj5e8yk397czv430ygs5jz7", - ); + event = event.add_attribute("_contract_address", MULTISIG_ADDRESS); events::Event::try_from(abci::Event::new( event.ty, @@ -123,6 +248,22 @@ mod test { .unwrap() } + fn get_handler( + worker: TMAddress, + multisig: TMAddress, + signer: SharableEcdsaClient, + ) -> Handler { + let mut broadcaster = MockBroadcaster::new(); + broadcaster + .expect_broadcast() + .returning(|_| Ok(TxResponse::default())); + + let (broadcaster, _) = + QueuedBroadcaster::new(broadcaster, Gas::default(), 100, Duration::from_secs(5)); + + Handler::new(worker, multisig, broadcaster.client(), signer) + } + #[test] fn should_not_deserialize_incorrect_event_type() { // incorrect event type @@ -150,7 +291,7 @@ mod test { let invalid_pub_key: [u8; 32] = rand::random(); let mut map: HashMap = HashMap::new(); map.insert( - rand_account(), + rand_account().to_string(), PublicKey::unchecked(HexBinary::from(invalid_pub_key.as_slice())), ); match event { @@ -177,4 +318,58 @@ mod test { assert!(event.is_ok()); } + + #[tokio::test] + async fn should_not_handle_event_if_multisig_address_does_not_match() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let handler = get_handler( + rand_account(), + rand_account(), + SharableEcdsaClient::new(client), + ); + + assert!(handler.handle(&signing_started_event()).await.is_ok()); + } + + #[tokio::test] + async fn should_not_handle_event_if_worker_is_not_a_participant() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let handler = get_handler( + rand_account(), + TMAddress::from(MULTISIG_ADDRESS.parse::().unwrap()), + SharableEcdsaClient::new(client), + ); + + assert!(handler.handle(&signing_started_event()).await.is_ok()); + } + + #[tokio::test] + async fn should_not_handle_event_if_sign_failed() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let event = signing_started_event(); + let signing_started: SigningStartedEvent = ((&event).try_into() as Result<_, _>).unwrap(); + let worker = signing_started.pub_keys.keys().next().unwrap().clone(); + let handler = get_handler( + worker, + TMAddress::from(MULTISIG_ADDRESS.parse::().unwrap()), + SharableEcdsaClient::new(client), + ); + + assert!(matches!( + *handler.handle(&event).await.unwrap_err().current_context(), + Error::Sign + )); + } } diff --git a/ampd/src/lib.rs b/ampd/src/lib.rs index 4e0dff366..5dece4859 100644 --- a/ampd/src/lib.rs +++ b/ampd/src/lib.rs @@ -13,9 +13,10 @@ use tracing::info; use crate::config::Config; use broadcaster::{accounts::account, Broadcaster}; -use event_processor::EventProcessor; +use event_processor::{EventHandler, EventProcessor}; use events::Event; use evm::EvmChainConfig; +use handlers::multisig::MultisigConfig; use queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterDriver}; use report::Error; use state::StateUpdater; @@ -48,6 +49,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { evm_chains, tofnd_config, event_buffer_cap, + multisig, } = cfg; let tm_client = @@ -102,8 +104,10 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { broadcast, event_buffer_cap, ) - .configure_evm_chains(worker, evm_chains) + .configure_evm_chains(&worker, evm_chains) .await? + .configure_multisig(&worker, multisig) + .await .run() .await } @@ -164,7 +168,7 @@ where async fn configure_evm_chains( mut self, - worker: TMAddress, + worker: &TMAddress, evm_chains: Vec, ) -> Result, Error> { for config in evm_chains { @@ -177,23 +181,50 @@ where self.broadcaster.client(), ); - let (handler, rx) = handlers::end_block::with_block_height_notifier(handler); - self.state_updater.register_event(&label, rx); - - let sub: HandlerStream<_> = - match self.state_updater.state().handler_block_height(&label) { - None => Box::pin(self.event_sub.sub()), - Some(&completed_height) => Box::pin(event_sub::skip_to_block( - self.event_sub.sub(), - completed_height.increment(), - )), - }; - self.event_processor.add_handler(handler, sub); + self.register_handler(label.as_ref(), handler).await; } Ok(self) } + async fn configure_multisig( + mut self, + worker: &TMAddress, + multisig: Option, + ) -> App { + if let Some(config) = multisig { + self.register_handler( + "multisig-handler", + handlers::multisig::Handler::new( + worker.clone(), + config.address, + self.broadcaster.client(), + self.ecdsa_client.clone(), + ), + ) + .await; + } + + self + } + + async fn register_handler(&mut self, label: &str, handler: H) + where + H: EventHandler + Send + Sync + 'static, + { + let (handler, rx) = handlers::end_block::with_block_height_notifier(handler); + self.state_updater.register_event(label, rx); + + let sub: HandlerStream<_> = match self.state_updater.state().handler_block_height(label) { + None => Box::pin(self.event_sub.sub()), + Some(&completed_height) => Box::pin(event_sub::skip_to_block( + self.event_sub.sub(), + completed_height.increment(), + )), + }; + self.event_processor.add_handler(handler, sub); + } + async fn run(self) -> Result<(), Error> { let Self { event_sub, diff --git a/ampd/src/tofnd/mod.rs b/ampd/src/tofnd/mod.rs index 95b359899..bd67f9268 100644 --- a/ampd/src/tofnd/mod.rs +++ b/ampd/src/tofnd/mod.rs @@ -53,3 +53,9 @@ impl From<[u8; 32]> for MessageDigest { MessageDigest(digest) } } + +impl AsRef<[u8]> for MessageDigest { + fn as_ref(&self) -> &[u8] { + &self.0 + } +}