From d3032ef2c869a046b65e94ba07699f478b5362c6 Mon Sep 17 00:00:00 2001 From: Haiyi Zhong Date: Tue, 29 Aug 2023 00:05:28 -0400 Subject: [PATCH 1/5] feat(ampd): multisig contract signing started handler --- ampd/src/config.rs | 3 + ampd/src/handlers/errors.rs | 2 + ampd/src/handlers/multisig.rs | 209 ++++++++++++++++++++++++++++++++-- ampd/src/lib.rs | 64 ++++++++--- 4 files changed, 250 insertions(+), 28 deletions(-) diff --git a/ampd/src/config.rs b/ampd/src/config.rs index 2199746d8..2b04a44b5 100644 --- a/ampd/src/config.rs +++ b/ampd/src/config.rs @@ -3,6 +3,7 @@ use serde::Deserialize; use crate::broadcaster; use crate::evm::{deserialize_evm_chain_configs, EvmChainConfig}; use crate::tofnd::Config as TofndConfig; +use crate::types::TMAddress; use crate::url::Url; use crate::ECDSASigningKey; @@ -18,6 +19,7 @@ pub struct Config { #[serde(with = "hex")] pub private_key: ECDSASigningKey, pub event_buffer_cap: usize, + pub multisig_contract: Option, } impl Default for Config { @@ -30,6 +32,7 @@ impl Default for Config { tofnd_config: TofndConfig::default(), private_key: ECDSASigningKey::random(), event_buffer_cap: 100000, + multisig_contract: None, } } } diff --git a/ampd/src/handlers/errors.rs b/ampd/src/handlers/errors.rs index ff8eaf488..d5ecb0d02 100644 --- a/ampd/src/handlers/errors.rs +++ b/ampd/src/handlers/errors.rs @@ -8,4 +8,6 @@ pub enum Error { Finalizer, #[error("failed to deserialize the event")] DeserializeEvent(#[from] serde_json::Error), + #[error("failed to get signature from tofnd")] + Sign, } diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index 8eca2c251..a18fd5939 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -1,15 +1,23 @@ use std::collections::HashMap; -use std::convert::TryFrom; +use std::convert::{TryFrom, TryInto}; +use async_trait::async_trait; +use cosmrs::cosmwasm::MsgExecuteContract; +use cosmwasm_std::{HexBinary, Uint64}; use ecdsa::VerifyingKey; +use error_stack::{IntoReport, ResultExt}; use hex::FromHex; -use multisig::types::KeyID; use serde::de::value::MapDeserializer; use serde::de::Error as DeserializeError; use serde::{Deserialize, Deserializer}; +use multisig::msg::ExecuteMsg; + +use crate::event_processor::EventHandler; use crate::event_sub; use crate::handlers::errors::Error; +use crate::queue::queued_broadcaster::BroadcasterClient; +use crate::tofnd::grpc::SharableEcdsaClient; use crate::tofnd::MessageDigest; use crate::types::PublicKey; use crate::types::TMAddress; @@ -22,7 +30,6 @@ struct SigningStartedEvent { #[serde(rename = "_contract_address")] contract_address: TMAddress, session_id: u64, - key_id: KeyID, #[serde(deserialize_with = "deserialize_public_keys")] pub_keys: HashMap, #[serde(with = "hex")] @@ -72,21 +79,126 @@ impl TryFrom<&event_sub::Event> for Option { } } +pub struct Handler +where + B: BroadcasterClient, +{ + worker: TMAddress, + multisig: TMAddress, + broadcaster: B, + signer: SharableEcdsaClient, +} + +impl Handler +where + B: BroadcasterClient, +{ + pub fn new( + worker: TMAddress, + multisig: TMAddress, + broadcaster: B, + signer: SharableEcdsaClient, + ) -> Self { + Self { + worker, + multisig, + broadcaster, + signer, + } + } + + async fn broadcast_signature( + &self, + session_id: Uint64, + signature: HexBinary, + ) -> error_stack::Result<(), Error> { + let msg = serde_json::to_vec(&ExecuteMsg::SubmitSignature { + session_id, + signature, + }) + .expect("submit signature msg should serialize"); + + let tx = MsgExecuteContract { + sender: self.worker.as_ref().clone(), + contract: self.multisig.as_ref().clone(), + msg, + funds: vec![], + }; + + self.broadcaster + .broadcast(tx) + .await + .change_context(Error::Broadcaster) + } +} + +#[async_trait] +impl EventHandler for Handler +where + B: BroadcasterClient + Send + Sync, +{ + type Err = Error; + + async fn handle(&self, event: &event_sub::Event) -> error_stack::Result<(), Error> { + let SigningStartedEvent { + contract_address, + session_id, + pub_keys, + msg, + } = match event.try_into().into_report()? { + Some(event) => event, + None => return Ok(()), + }; + + if self.multisig != contract_address { + return Ok(()); + } + + if !pub_keys.contains_key(&self.worker) { + return Ok(()); + } + + let signature = self + .signer + .sign( + self.multisig.to_string().as_str(), + msg, + &pub_keys[&self.worker], + ) + .await + .change_context(Error::Sign)?; + + self.broadcast_signature(session_id.into(), signature.into()) + .await?; + + return Ok(()); + } +} + #[cfg(test)] mod test { - use base64::engine::general_purpose::STANDARD; - use base64::Engine; use std::collections::HashMap; use std::convert::TryInto; + use std::time::Duration; + use base64::engine::general_purpose::STANDARD; + use base64::Engine; + use cosmos_sdk_proto::cosmos::base::abci::v1beta1::TxResponse; + use cosmrs::{AccountId, Gas}; use cosmwasm_std::{Addr, HexBinary, Uint64}; - use multisig::events::Event::SigningStarted; - use multisig::types::{MsgToSign, PublicKey}; + use error_stack::Report; use tendermint::abci; - use crate::broadcaster::key::ECDSASigningKey; + use multisig::events::Event::SigningStarted; + use multisig::types::{KeyID, MsgToSign, PublicKey}; use super::*; + use crate::broadcaster::{key::ECDSASigningKey, MockBroadcaster}; + use crate::queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterClient}; + use crate::tofnd; + use crate::tofnd::grpc::{MockEcdsaClient, SharableEcdsaClient}; + + const MULTISIG_ADDRESS: &str = "axelarvaloper1zh9wrak6ke4n6fclj5e8yk397czv430ygs5jz7"; fn rand_account() -> String { ECDSASigningKey::random().address().to_string() @@ -120,10 +232,7 @@ mod test { let mut event: cosmwasm_std::Event = poll_started.into(); event.ty = format!("wasm-{}", event.ty); - event = event.add_attribute( - "_contract_address", - "axelarvaloper1zh9wrak6ke4n6fclj5e8yk397czv430ygs5jz7", - ); + event = event.add_attribute("_contract_address", MULTISIG_ADDRESS); abci::Event::new( event.ty, @@ -138,6 +247,22 @@ mod test { .unwrap() } + fn get_handler( + worker: TMAddress, + multisig: TMAddress, + signer: SharableEcdsaClient, + ) -> Handler { + let mut broadcaster = MockBroadcaster::new(); + broadcaster + .expect_broadcast() + .returning(|_| Ok(TxResponse::default())); + + let (broadcaster, _) = + QueuedBroadcaster::new(broadcaster, Gas::default(), 100, Duration::from_secs(5)); + + Handler::new(worker, multisig, broadcaster.client(), signer) + } + #[test] fn should_not_deserialize_incorrect_event_type() { // incorrect event type @@ -185,4 +310,64 @@ mod test { assert!(event.is_some()); } + + #[tokio::test] + async fn should_not_handle_event_if_multisig_address_does_not_match() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let handler = get_handler( + ECDSASigningKey::random().address(), + ECDSASigningKey::random().address(), + SharableEcdsaClient::new(client), + ); + + assert!(handler.handle(&signing_started_event()).await.is_ok()); + } + + #[tokio::test] + async fn should_not_handle_event_if_worker_is_not_a_participant() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let handler = get_handler( + ECDSASigningKey::random().address(), + TMAddress::from(MULTISIG_ADDRESS.parse::().unwrap()), + SharableEcdsaClient::new(client), + ); + + assert!(handler.handle(&signing_started_event()).await.is_ok()); + } + + #[tokio::test] + async fn should_not_handle_event_if_sign_failed() { + let mut client = MockEcdsaClient::new(); + client + .expect_sign() + .returning(move |_, _, _| Err(Report::from(tofnd::error::Error::SignFailed))); + + let event = signing_started_event(); + let signing_started: Option = (&event).try_into().unwrap(); + let worker = signing_started + .unwrap() + .pub_keys + .keys() + .next() + .unwrap() + .clone(); + let handler = get_handler( + worker, + TMAddress::from(MULTISIG_ADDRESS.parse::().unwrap()), + SharableEcdsaClient::new(client), + ); + + assert!(matches!( + *handler.handle(&event).await.unwrap_err().current_context(), + Error::Sign + )); + } } diff --git a/ampd/src/lib.rs b/ampd/src/lib.rs index a511c8ee5..fdcacf734 100644 --- a/ampd/src/lib.rs +++ b/ampd/src/lib.rs @@ -11,9 +11,8 @@ use tokio_stream::Stream; use tokio_util::sync::CancellationToken; use tracing::info; -use crate::config::Config; use broadcaster::{accounts::account, key::ECDSASigningKey, Broadcaster}; -use event_processor::EventProcessor; +use event_processor::{EventHandler, EventProcessor}; use event_sub::Event; use evm::EvmChainConfig; use queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterDriver}; @@ -22,6 +21,8 @@ use state::StateUpdater; use tofnd::grpc::{MultisigClient, SharableEcdsaClient}; use types::TMAddress; +use crate::config::Config; + mod broadcaster; pub mod config; mod event_processor; @@ -47,6 +48,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { tofnd_config, private_key, event_buffer_cap, + multisig_contract, } = cfg; let tm_client = @@ -99,7 +101,9 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { broadcast, event_buffer_cap, ) - .configure_evm_chains(worker, evm_chains) + .configure_evm_chains(&worker, evm_chains) + .await? + .configure_multisig(&worker, multisig_contract) .await? .run() .await @@ -161,7 +165,7 @@ where async fn configure_evm_chains( mut self, - worker: TMAddress, + worker: &TMAddress, evm_chains: Vec, ) -> Result, Error> { for config in evm_chains { @@ -174,23 +178,51 @@ where self.broadcaster.client(), ); - let (handler, rx) = handlers::end_block::with_block_height_notifier(handler); - self.state_updater.register_event(&label, rx); - - let sub: HandlerStream<_> = - match self.state_updater.state().handler_block_height(&label) { - None => Box::pin(self.event_sub.sub()), - Some(&completed_height) => Box::pin(event_sub::skip_to_block( - self.event_sub.sub(), - completed_height.increment(), - )), - }; - self.event_processor.add_handler(handler, sub); + self.register_handler(label.as_ref(), handler).await; } Ok(self) } + async fn configure_multisig( + mut self, + worker: &TMAddress, + multisig: Option, + ) -> Result, Error> { + if let Some(address) = multisig { + self.register_handler( + "multisig-handler", + handlers::multisig::Handler::new( + worker.clone(), + address, + self.broadcaster.client(), + self.ecdsa_client.clone(), + ), + ) + .await; + } + + Ok(self) + } + + async fn register_handler( + &mut self, + label: &str, + handler: impl EventHandler + Send + Sync + 'static, + ) { + let (handler, rx) = handlers::end_block::with_block_height_notifier(handler); + self.state_updater.register_event(label, rx); + + let sub: HandlerStream<_> = match self.state_updater.state().handler_block_height(label) { + None => Box::pin(self.event_sub.sub()), + Some(&completed_height) => Box::pin(event_sub::skip_to_block( + self.event_sub.sub(), + completed_height.increment(), + )), + }; + self.event_processor.add_handler(handler, sub); + } + async fn run(self) -> Result<(), Error> { let Self { event_sub, From 60cd341ecf02920a60e8936ba828a328d6ce27e2 Mon Sep 17 00:00:00 2001 From: Haiyi Zhong Date: Wed, 30 Aug 2023 10:30:53 -0400 Subject: [PATCH 2/5] address comment --- ampd/src/handlers/multisig.rs | 33 +++++++++++++++------------------ ampd/src/lib.rs | 15 +++++++-------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index a18fd5939..b20002c27 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -154,24 +154,21 @@ where return Ok(()); } - if !pub_keys.contains_key(&self.worker) { - return Ok(()); - } - - let signature = self - .signer - .sign( - self.multisig.to_string().as_str(), - msg, - &pub_keys[&self.worker], - ) - .await - .change_context(Error::Sign)?; - - self.broadcast_signature(session_id.into(), signature.into()) - .await?; - - return Ok(()); + return match pub_keys.get(&self.worker) { + Some(pub_key) => { + let signature = self + .signer + .sign(self.multisig.to_string().as_str(), msg, pub_key) + .await + .change_context(Error::Sign)?; + + self.broadcast_signature(session_id.into(), signature.into()) + .await?; + + Ok(()) + } + None => Ok(()), + }; } } diff --git a/ampd/src/lib.rs b/ampd/src/lib.rs index fdcacf734..ccfa150bc 100644 --- a/ampd/src/lib.rs +++ b/ampd/src/lib.rs @@ -104,7 +104,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { .configure_evm_chains(&worker, evm_chains) .await? .configure_multisig(&worker, multisig_contract) - .await? + .await .run() .await } @@ -188,7 +188,7 @@ where mut self, worker: &TMAddress, multisig: Option, - ) -> Result, Error> { + ) -> App { if let Some(address) = multisig { self.register_handler( "multisig-handler", @@ -202,14 +202,13 @@ where .await; } - Ok(self) + self } - async fn register_handler( - &mut self, - label: &str, - handler: impl EventHandler + Send + Sync + 'static, - ) { + async fn register_handler(&mut self, label: &str, handler: H) + where + H: EventHandler + Send + Sync + 'static, + { let (handler, rx) = handlers::end_block::with_block_height_notifier(handler); self.state_updater.register_event(label, rx); From f873d9a195d9515d234482113cba6249f77f7939 Mon Sep 17 00:00:00 2001 From: Haiyi Zhong Date: Wed, 30 Aug 2023 12:41:50 -0400 Subject: [PATCH 3/5] address comment --- ampd/src/handlers/multisig.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index 1328781fa..b9d8dc1ca 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -90,12 +90,12 @@ where async fn broadcast_signature( &self, - session_id: Uint64, - signature: HexBinary, + session_id: impl Into, + signature: impl Into, ) -> error_stack::Result<(), Error> { let msg = serde_json::to_vec(&ExecuteMsg::SubmitSignature { - session_id, - signature, + session_id: session_id.into(), + signature: signature.into(), }) .expect("submit signature msg should serialize"); @@ -145,8 +145,7 @@ where .await .change_context(Error::Sign)?; - self.broadcast_signature(session_id.into(), signature.into()) - .await?; + self.broadcast_signature(session_id, signature).await?; Ok(()) } From 70f48df92301e4da101ee9ef01eebb2ecdeb5be2 Mon Sep 17 00:00:00 2001 From: Haiyi Zhong Date: Wed, 30 Aug 2023 14:53:28 -0400 Subject: [PATCH 4/5] address comment --- ampd/src/config.rs | 6 +++--- ampd/src/handlers/multisig.rs | 21 +++++++++++++++++++-- ampd/src/lib.rs | 11 ++++++----- ampd/src/tofnd/mod.rs | 6 ++++++ 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/ampd/src/config.rs b/ampd/src/config.rs index 2b04a44b5..5eb70b2c3 100644 --- a/ampd/src/config.rs +++ b/ampd/src/config.rs @@ -2,8 +2,8 @@ use serde::Deserialize; use crate::broadcaster; use crate::evm::{deserialize_evm_chain_configs, EvmChainConfig}; +use crate::handlers::multisig::MultisigConfig; use crate::tofnd::Config as TofndConfig; -use crate::types::TMAddress; use crate::url::Url; use crate::ECDSASigningKey; @@ -19,7 +19,7 @@ pub struct Config { #[serde(with = "hex")] pub private_key: ECDSASigningKey, pub event_buffer_cap: usize, - pub multisig_contract: Option, + pub multisig: Option, } impl Default for Config { @@ -32,7 +32,7 @@ impl Default for Config { tofnd_config: TofndConfig::default(), private_key: ECDSASigningKey::random(), event_buffer_cap: 100000, - multisig_contract: None, + multisig: None, } } } diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index b9d8dc1ca..3b3cb5891 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -6,9 +6,10 @@ use cosmrs::cosmwasm::MsgExecuteContract; use cosmwasm_std::{HexBinary, Uint64}; use ecdsa::VerifyingKey; use error_stack::{IntoReport, ResultExt}; -use hex::FromHex; +use hex::{encode, FromHex}; use serde::de::Error as DeserializeError; use serde::{Deserialize, Deserializer}; +use tracing::info; use events::Error::EventTypeMismatch; use events_derive; @@ -23,6 +24,11 @@ use crate::tofnd::MessageDigest; use crate::types::PublicKey; use crate::types::TMAddress; +#[derive(Debug, Deserialize)] +pub struct MultisigConfig { + pub address: TMAddress, +} + #[derive(Debug, Deserialize)] #[try_from("wasm-signing_started")] struct SigningStartedEvent { @@ -137,6 +143,12 @@ where return Ok(()); } + info!( + session_id = session_id, + msg = encode(&msg), + "get signing request", + ); + return match pub_keys.get(&self.worker) { Some(pub_key) => { let signature = self @@ -145,11 +157,16 @@ where .await .change_context(Error::Sign)?; + info!(signature = encode(&signature), "ready to submit signature"); + self.broadcast_signature(session_id, signature).await?; Ok(()) } - None => Ok(()), + None => { + info!("worker is not a participant"); + Ok(()) + } }; } } diff --git a/ampd/src/lib.rs b/ampd/src/lib.rs index 642458865..6aee8612e 100644 --- a/ampd/src/lib.rs +++ b/ampd/src/lib.rs @@ -16,6 +16,7 @@ use broadcaster::{accounts::account, key::ECDSASigningKey, Broadcaster}; use event_processor::{EventHandler, EventProcessor}; use events::Event; use evm::EvmChainConfig; +use handlers::multisig::MultisigConfig; use queue::queued_broadcaster::{QueuedBroadcaster, QueuedBroadcasterDriver}; use report::Error; use state::StateUpdater; @@ -47,7 +48,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { tofnd_config, private_key, event_buffer_cap, - multisig_contract, + multisig, } = cfg; let tm_client = @@ -102,7 +103,7 @@ pub async fn run(cfg: Config, state_path: PathBuf) -> Result<(), Error> { ) .configure_evm_chains(&worker, evm_chains) .await? - .configure_multisig(&worker, multisig_contract) + .configure_multisig(&worker, multisig) .await .run() .await @@ -186,14 +187,14 @@ where async fn configure_multisig( mut self, worker: &TMAddress, - multisig: Option, + multisig: Option, ) -> App { - if let Some(address) = multisig { + if let Some(config) = multisig { self.register_handler( "multisig-handler", handlers::multisig::Handler::new( worker.clone(), - address, + config.address, self.broadcaster.client(), self.ecdsa_client.clone(), ), diff --git a/ampd/src/tofnd/mod.rs b/ampd/src/tofnd/mod.rs index 95b359899..bd67f9268 100644 --- a/ampd/src/tofnd/mod.rs +++ b/ampd/src/tofnd/mod.rs @@ -53,3 +53,9 @@ impl From<[u8; 32]> for MessageDigest { MessageDigest(digest) } } + +impl AsRef<[u8]> for MessageDigest { + fn as_ref(&self) -> &[u8] { + &self.0 + } +} From bc1df29f3f810ae016c48cbaa09032d485f3df2b Mon Sep 17 00:00:00 2001 From: Haiyi Zhong Date: Wed, 30 Aug 2023 15:32:29 -0400 Subject: [PATCH 5/5] address comment --- ampd/src/handlers/multisig.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ampd/src/handlers/multisig.rs b/ampd/src/handlers/multisig.rs index 3b3cb5891..7c81a0d71 100644 --- a/ampd/src/handlers/multisig.rs +++ b/ampd/src/handlers/multisig.rs @@ -149,7 +149,7 @@ where "get signing request", ); - return match pub_keys.get(&self.worker) { + match pub_keys.get(&self.worker) { Some(pub_key) => { let signature = self .signer @@ -167,7 +167,7 @@ where info!("worker is not a participant"); Ok(()) } - }; + } } }