From e5c15070f5aa710adbfebed22217e35453a64810 Mon Sep 17 00:00:00 2001 From: Xiangyi Zheng Date: Tue, 29 Oct 2024 15:30:12 -0700 Subject: [PATCH] enable batching multiple sign calls in one batch transaction (#909) * enable batching multiple sign calls in one batch transaction * remove receipt_id from anywhere but indexer --- chain-signatures/Cargo.lock | 1 + chain-signatures/contract/src/errors/mod.rs | 2 +- chain-signatures/contract/src/lib.rs | 21 ++- chain-signatures/node/Cargo.toml | 1 + chain-signatures/node/src/indexer.rs | 2 +- chain-signatures/node/src/kdf.rs | 4 +- chain-signatures/node/src/protocol/message.rs | 30 ++-- .../node/src/protocol/signature.rs | 161 ++++++++++-------- chain-signatures/node/src/protocol/triple.rs | 2 +- integration-tests/chain-signatures/Cargo.lock | 1 + .../chain-signatures/tests/actions/mod.rs | 109 +++++++++++- .../tests/actions/wait_for.rs | 96 +++++++++++ .../chain-signatures/tests/cases/mod.rs | 26 +++ 13 files changed, 360 insertions(+), 96 deletions(-) diff --git a/chain-signatures/Cargo.lock b/chain-signatures/Cargo.lock index 7a47fc6bc..f0d376fa9 100644 --- a/chain-signatures/Cargo.lock +++ b/chain-signatures/Cargo.lock @@ -3244,6 +3244,7 @@ dependencies = [ "aws-types", "axum", "axum-extra", + "borsh", "cait-sith", "chrono", "clap", diff --git a/chain-signatures/contract/src/errors/mod.rs b/chain-signatures/contract/src/errors/mod.rs index 2c5cf8284..b3eb3388a 100644 --- a/chain-signatures/contract/src/errors/mod.rs +++ b/chain-signatures/contract/src/errors/mod.rs @@ -6,7 +6,7 @@ pub enum SignError { #[error("Signature request has timed out.")] Timeout, #[error("Signature request has already been submitted. Please try again later.")] - PayloadCollision, + RequestCollision, #[error( "This key version is not supported. Call latest_key_version() to get the latest supported version." )] diff --git a/chain-signatures/contract/src/lib.rs b/chain-signatures/contract/src/lib.rs index 0da7dfec3..39740ec73 100644 --- a/chain-signatures/contract/src/lib.rs +++ b/chain-signatures/contract/src/lib.rs @@ -64,17 +64,23 @@ impl Default for VersionedMpcContract { #[derive(BorshDeserialize, BorshSerialize, Debug)] pub struct MpcContract { protocol_state: ProtocolContractState, - pending_requests: LookupMap, + pending_requests: LookupMap>, request_counter: u32, proposed_updates: ProposedUpdates, config: Config, } impl MpcContract { + fn mark_request_received(&mut self, request: &SignatureRequest) { + if self.pending_requests.insert(request, &None).is_none() { + self.request_counter += 1; + } + } + fn add_request(&mut self, request: &SignatureRequest, data_id: CryptoHash) { if self .pending_requests - .insert(request, &YieldIndex { data_id }) + .insert(request, &Some(YieldIndex { data_id })) .is_none() { self.request_counter += 1; @@ -166,6 +172,7 @@ impl VersionedMpcContract { "sign: predecessor={predecessor}, payload={payload:?}, path={path:?}, key_version={key_version}", ); env::log_str(&serde_json::to_string(&near_sdk::env::random_seed_array()).unwrap()); + self.mark_request_received(&request); let contract_signature_request = ContractSignatureRequest { request, requester: predecessor, @@ -174,7 +181,7 @@ impl VersionedMpcContract { }; Ok(Self::ext(env::current_account_id()).sign_helper(contract_signature_request)) } else { - Err(SignError::PayloadCollision.into()) + Err(SignError::RequestCollision.into()) } } @@ -275,7 +282,7 @@ impl VersionedMpcContract { match self { Self::V0(mpc_contract) => { - if let Some(YieldIndex { data_id }) = + if let Some(Some(YieldIndex { data_id })) = mpc_contract.pending_requests.get(&request) { env::promise_yield_resume( @@ -803,6 +810,12 @@ impl VersionedMpcContract { } } + fn mark_request_received(&mut self, request: &SignatureRequest) { + match self { + Self::V0(ref mut mpc_contract) => mpc_contract.mark_request_received(request), + } + } + fn threshold(&self) -> Result { match self { Self::V0(contract) => match &contract.protocol_state { diff --git a/chain-signatures/node/Cargo.toml b/chain-signatures/node/Cargo.toml index 68b91637d..819038dae 100644 --- a/chain-signatures/node/Cargo.toml +++ b/chain-signatures/node/Cargo.toml @@ -15,6 +15,7 @@ aws-sdk-s3 = "1.29" aws-types = "1.2" axum = { version = "0.6.19" } axum-extra = "0.7" +borsh = "1.5.0" cait-sith = { git = "https://github.com/LIT-Protocol/cait-sith.git", features = [ "k256", ], rev = "8ad2316" } diff --git a/chain-signatures/node/src/indexer.rs b/chain-signatures/node/src/indexer.rs index 9335e4bfd..4239dd55f 100644 --- a/chain-signatures/node/src/indexer.rs +++ b/chain-signatures/node/src/indexer.rs @@ -239,7 +239,7 @@ async fn handle_block( key_version: arguments.request.key_version, }; pending_requests.push(SignRequest { - receipt_id, + request_id: receipt_id.0, request, epsilon, entropy, diff --git a/chain-signatures/node/src/kdf.rs b/chain-signatures/node/src/kdf.rs index d38218ae2..5cdaaa33a 100644 --- a/chain-signatures/node/src/kdf.rs +++ b/chain-signatures/node/src/kdf.rs @@ -9,12 +9,12 @@ use sha3::Sha3_256; // that we generate different random scalars as delta tweaks. // Receipt ID should be unique inside of a block, so it serves us as the request identifier. pub fn derive_delta( - receipt_id: CryptoHash, + request_id: [u8; 32], entropy: [u8; 32], presignature_big_r: AffinePoint, ) -> Scalar { let hk = Hkdf::::new(None, &entropy); - let info = format!("{DELTA_DERIVATION_PREFIX}:{}", receipt_id); + let info = format!("{DELTA_DERIVATION_PREFIX}:{}", CryptoHash(request_id)); let mut okm = [0u8; 32]; hk.expand(info.as_bytes(), &mut okm).unwrap(); hk.expand( diff --git a/chain-signatures/node/src/protocol/message.rs b/chain-signatures/node/src/protocol/message.rs index 1d91071c8..60fd9fc8c 100644 --- a/chain-signatures/node/src/protocol/message.rs +++ b/chain-signatures/node/src/protocol/message.rs @@ -1,5 +1,6 @@ use super::cryptography::CryptographicError; use super::presignature::{GenerationError, PresignatureId}; +use super::signature::SignRequestIdentifier; use super::state::{GeneratingState, NodeState, ResharingState, RunningState}; use super::triple::TripleId; use crate::gcp::error::SecretStorageError; @@ -13,7 +14,6 @@ use cait_sith::protocol::{InitializationError, MessageData, Participant, Protoco use k256::Scalar; use mpc_keys::hpke::{self, Ciphered}; use near_crypto::Signature; -use near_primitives::hash::CryptoHash; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, VecDeque}; use std::sync::Arc; @@ -63,7 +63,7 @@ pub struct PresignatureMessage { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub struct SignatureMessage { - pub receipt_id: CryptoHash, + pub request_id: [u8; 32], pub proposer: Participant, pub presignature_id: PresignatureId, pub request: ContractSignRequest, @@ -103,7 +103,7 @@ pub struct MpcMessageQueue { resharing_bins: HashMap>, triple_bins: HashMap>>, presignature_bins: HashMap>>, - signature_bins: HashMap>>, + signature_bins: HashMap>>, } impl MpcMessageQueue { @@ -133,7 +133,11 @@ impl MpcMessageQueue { .signature_bins .entry(message.epoch) .or_default() - .entry(message.receipt_id) + .entry(SignRequestIdentifier::new( + message.request_id, + message.epsilon, + message.request.payload, + )) .or_default() .push_back(message), } @@ -366,7 +370,7 @@ impl MessageHandler for RunningState { let mut signature_manager = self.signature_manager.write().await; let signature_messages = queue.signature_bins.entry(self.epoch).or_default(); - signature_messages.retain(|receipt_id, queue| { + signature_messages.retain(|sign_request_identifier, queue| { // Skip message if it already timed out if queue.is_empty() || queue.iter().any(|msg| { @@ -379,9 +383,9 @@ impl MessageHandler for RunningState { return false; } - !signature_manager.refresh_gc(receipt_id) + !signature_manager.refresh_gc(sign_request_identifier) }); - for (receipt_id, queue) in signature_messages { + for (sign_request_identifier, queue) in signature_messages { // SAFETY: this unwrap() is safe since we have already checked that the queue is not empty. let SignatureMessage { proposer, @@ -414,7 +418,7 @@ impl MessageHandler for RunningState { // TODO: Validate that the message matches our sign_queue let protocol = match signature_manager.get_or_generate( participants, - *receipt_id, + sign_request_identifier.request_id, *proposer, *presignature_id, request, @@ -437,7 +441,11 @@ impl MessageHandler for RunningState { // and have the other nodes timeout in the following cases: // - If a presignature is in GC, then it was used already or failed to be produced. // - If a presignature is missing, that means our system cannot process this signature. - tracing::warn!(%receipt_id, ?err, "signature cannot be generated"); + tracing::warn!( + ?sign_request_identifier, + ?err, + "signature cannot be generated" + ); queue.clear(); continue; } @@ -445,7 +453,7 @@ impl MessageHandler for RunningState { // ignore the whole of the messages since the generation had bad parameters. Also have the other node who // initiated the protocol resend the message or have it timeout on their side. tracing::warn!( - ?receipt_id, + ?sign_request_identifier, presignature_id, ?error, "unable to initialize incoming signature protocol" @@ -455,7 +463,7 @@ impl MessageHandler for RunningState { } Err(err) => { tracing::warn!( - ?receipt_id, + ?sign_request_identifier, ?err, "Unexpected error encounted while generating signature" ); diff --git a/chain-signatures/node/src/protocol/signature.rs b/chain-signatures/node/src/protocol/signature.rs index e148ca107..4fd20e534 100644 --- a/chain-signatures/node/src/protocol/signature.rs +++ b/chain-signatures/node/src/protocol/signature.rs @@ -28,7 +28,7 @@ use near_fetch::signer::SignerExt; pub type ReceiptId = near_primitives::hash::CryptoHash; pub struct SignRequest { - pub receipt_id: ReceiptId, + pub request_id: [u8; 32], pub request: ContractSignRequest, pub epsilon: Scalar, pub entropy: [u8; 32], @@ -38,18 +38,12 @@ pub struct SignRequest { /// Type that preserves the insertion order of requests. #[derive(Default)] pub struct ParticipantRequests { - requests: HashMap, - order: VecDeque, + requests: VecDeque, } impl ParticipantRequests { - fn insert(&mut self, receipt_id: ReceiptId, request: SignRequest) { - self.requests.insert(receipt_id, request); - self.order.push_back(receipt_id); - } - - fn contains_key(&self, receipt_id: &ReceiptId) -> bool { - self.requests.contains_key(receipt_id) + fn insert(&mut self, request: SignRequest) { + self.requests.push_back(request); } pub fn len(&self) -> usize { @@ -60,11 +54,8 @@ impl ParticipantRequests { self.len() == 0 } - pub fn pop_front(&mut self) -> Option<(ReceiptId, SignRequest)> { - let receipt_id = self.order.pop_front()?; - self.requests - .remove(&receipt_id) - .map(|req| (receipt_id, req)) + pub fn pop_front(&mut self) -> Option { + self.requests.pop_front() } } @@ -89,7 +80,7 @@ impl SignQueue { pub fn add(&mut self, request: SignRequest) { tracing::info!( - receipt_id = %request.receipt_id, + request_id = ?CryptoHash(request.request_id), payload = hex::encode(request.request.payload.to_bytes()), entropy = hex::encode(request.entropy), "new sign request" @@ -120,14 +111,14 @@ impl SignQueue { if subset.contains(&&me) { let is_mine = proposer == me; tracing::info!( - receipt_id = %request.receipt_id, + request_id = ?CryptoHash(request.request_id), ?is_mine, ?subset, ?proposer, "saving sign request: node is in the signer subset" ); let proposer_requests = self.requests.entry(proposer).or_default(); - proposer_requests.insert(request.receipt_id, request); + proposer_requests.insert(request); if is_mine { crate::metrics::NUM_SIGN_REQUESTS_MINE .with_label_values(&[my_account_id.as_str()]) @@ -135,7 +126,7 @@ impl SignQueue { } } else { tracing::info!( - receipt_id = %request.receipt_id, + rrequest_id = ?CryptoHash(request.request_id), ?me, ?subset, ?proposer, @@ -145,13 +136,6 @@ impl SignQueue { } } - pub fn contains(&self, participant: Participant, receipt_id: ReceiptId) -> bool { - let Some(participant_requests) = self.requests.get(&participant) else { - return false; - }; - participant_requests.contains_key(&receipt_id) - } - pub fn my_requests(&mut self, me: Participant) -> &mut ParticipantRequests { self.requests.entry(me).or_default() } @@ -165,7 +149,7 @@ pub struct SignatureGenerator { pub presignature_id: PresignatureId, pub request: ContractSignRequest, pub epsilon: Scalar, - pub receipt_id: CryptoHash, + pub request_id: [u8; 32], pub entropy: [u8; 32], pub sign_request_timestamp: Instant, pub generator_timestamp: Instant, @@ -182,7 +166,7 @@ impl SignatureGenerator { presignature_id: PresignatureId, request: ContractSignRequest, epsilon: Scalar, - receipt_id: CryptoHash, + request_id: [u8; 32], entropy: [u8; 32], sign_request_timestamp: Instant, cfg: &ProtocolConfig, @@ -194,7 +178,7 @@ impl SignatureGenerator { presignature_id, request, epsilon, - receipt_id, + request_id, entropy, sign_request_timestamp, generator_timestamp: Instant::now(), @@ -227,18 +211,35 @@ pub struct GenerationRequest { pub proposer: Participant, pub request: ContractSignRequest, pub epsilon: Scalar, - pub receipt_id: CryptoHash, + pub request_id: [u8; 32], pub entropy: [u8; 32], pub sign_request_timestamp: Instant, } +#[derive(Debug, Clone, Eq, Hash, PartialEq)] +pub struct SignRequestIdentifier { + pub request_id: [u8; 32], + pub epsilon: Vec, + pub payload: Vec, +} + +impl SignRequestIdentifier { + pub fn new(request_id: [u8; 32], epsilon: Scalar, payload: Scalar) -> Self { + Self { + request_id, + epsilon: borsh::to_vec(&SerializableScalar { scalar: epsilon }).unwrap(), + payload: borsh::to_vec(&SerializableScalar { scalar: payload }).unwrap(), + } + } +} + pub struct SignatureManager { /// Ongoing signature generation protocols. - generators: HashMap, + generators: HashMap, /// Failed signatures awaiting to be retried. - failed: VecDeque<(ReceiptId, GenerationRequest)>, + failed: VecDeque<(SignRequestIdentifier, GenerationRequest)>, /// Set of completed signatures - completed: HashMap, + completed: HashMap, /// Generated signatures assigned to the current node that are yet to be published. /// Vec<(receipt_id, msg_hash, timestamp, output)> signatures: Vec, @@ -250,7 +251,7 @@ pub struct SignatureManager { pub const MAX_RETRY: u8 = 10; pub struct ToPublish { - receipt_id: ReceiptId, + request_id: [u8; 32], request: SignatureRequest, time_added: Instant, signature: FullSignature, @@ -259,13 +260,13 @@ pub struct ToPublish { impl ToPublish { pub fn new( - receipt_id: ReceiptId, + request_id: [u8; 32], request: SignatureRequest, time_added: Instant, signature: FullSignature, ) -> ToPublish { ToPublish { - receipt_id, + request_id, request, time_added, signature, @@ -316,12 +317,12 @@ impl SignatureManager { proposer, request, epsilon, - receipt_id, + request_id, entropy, sign_request_timestamp, } = req; let PresignOutput { big_r, k, sigma } = presignature.output; - let delta = derive_delta(receipt_id, entropy, big_r); + let delta = derive_delta(request_id, entropy, big_r); // TODO: Check whether it is okay to use invert_vartime instead let output: PresignOutput = PresignOutput { big_r: (big_r * delta).to_affine(), @@ -346,7 +347,7 @@ impl SignatureManager { presignature_id, request, epsilon, - receipt_id, + request_id, entropy, sign_request_timestamp, cfg, @@ -356,13 +357,13 @@ impl SignatureManager { #[allow(clippy::result_large_err)] fn retry_failed_generation( &mut self, - receipt_id: ReceiptId, + sign_request_identifier: SignRequestIdentifier, req: GenerationRequest, presignature: Presignature, participants: &Participants, cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { - tracing::info!(receipt_id = %receipt_id, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); + tracing::info!(sign_request_identifier = ?sign_request_identifier, participants = ?participants.keys_vec(), "restarting failed protocol to generate signature"); let generator = Self::generate_internal( participants, self.me, @@ -374,7 +375,7 @@ impl SignatureManager { crate::metrics::NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); - self.generators.insert(receipt_id, generator); + self.generators.insert(sign_request_identifier, generator); Ok(()) } @@ -384,7 +385,7 @@ impl SignatureManager { pub fn generate( &mut self, participants: &Participants, - receipt_id: ReceiptId, + request_id: [u8; 32], presignature: Presignature, request: ContractSignRequest, epsilon: Scalar, @@ -392,8 +393,10 @@ impl SignatureManager { sign_request_timestamp: Instant, cfg: &ProtocolConfig, ) -> Result<(), (Presignature, InitializationError)> { + let sign_request_identifier = + SignRequestIdentifier::new(request_id, epsilon, request.payload); tracing::info!( - %receipt_id, + ?sign_request_identifier, me = ?self.me, presignature_id = presignature.id, participants = ?participants.keys_vec(), @@ -408,7 +411,7 @@ impl SignatureManager { proposer: self.me, request, epsilon, - receipt_id, + request_id, entropy, sign_request_timestamp, }, @@ -417,7 +420,7 @@ impl SignatureManager { crate::metrics::NUM_TOTAL_HISTORICAL_SIGNATURE_GENERATORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); - self.generators.insert(receipt_id, generator); + self.generators.insert(sign_request_identifier, generator); Ok(()) } @@ -431,7 +434,7 @@ impl SignatureManager { pub fn get_or_generate( &mut self, participants: &Participants, - receipt_id: ReceiptId, + request_id: [u8; 32], proposer: Participant, presignature_id: PresignatureId, request: &ContractSignRequest, @@ -440,13 +443,15 @@ impl SignatureManager { presignature_manager: &mut PresignatureManager, cfg: &ProtocolConfig, ) -> Result<&mut SignatureProtocol, GenerationError> { - if self.completed.contains_key(&receipt_id) { - tracing::warn!(%receipt_id, presignature_id, "presignature has already been used to generate a signature"); + let sign_request_identifier = + SignRequestIdentifier::new(request_id, epsilon, request.payload); + if self.completed.contains_key(&sign_request_identifier) { + tracing::warn!(sign_request_identifier = ?sign_request_identifier.clone(), presignature_id, "presignature has already been used to generate a signature"); return Err(GenerationError::AlreadyGenerated); } - match self.generators.entry(receipt_id) { + match self.generators.entry(sign_request_identifier.clone()) { Entry::Vacant(entry) => { - tracing::info!(%receipt_id, me = ?self.me, presignature_id, "joining protocol to generate a new signature"); + tracing::info!(sign_request_identifier = ?sign_request_identifier.clone(), me = ?self.me, presignature_id, "joining protocol to generate a new signature"); let presignature = match presignature_manager.take(presignature_id) { Ok(presignature) => presignature, Err(err @ GenerationError::PresignatureIsGenerating(_)) => { @@ -474,7 +479,7 @@ impl SignatureManager { request: request.clone(), epsilon, entropy, - receipt_id, + request_id, sign_request_timestamp: Instant::now(), }, cfg, @@ -482,7 +487,7 @@ impl SignatureManager { Ok(generator) => generator, Err((presignature, err @ InitializationError::BadParameters(_))) => { presignature_manager.insert_mine(presignature); - tracing::warn!(%receipt_id, presignature_id, ?err, "failed to start signature generation"); + tracing::warn!(sign_request = ?sign_request_identifier, presignature_id, ?err, "failed to start signature generation"); return Err(GenerationError::CaitSithInitializationError(err)); } }; @@ -502,7 +507,7 @@ impl SignatureManager { /// An empty vector means we cannot progress until we receive a new message. pub fn poke(&mut self) -> Vec<(Participant, SignatureMessage)> { let mut messages = Vec::new(); - self.generators.retain(|receipt_id, generator| { + self.generators.retain(|sign_request_identifier, generator| { loop { let action = match generator.poke() { Ok(action) => action, @@ -516,18 +521,18 @@ impl SignatureManager { // only retry the signature generation if it was initially proposed by us. We do not // want any nodes to be proposing the same signature multiple times. self.failed.push_back(( - *receipt_id, + sign_request_identifier.clone(), GenerationRequest { proposer: generator.proposer, request: generator.request.clone(), epsilon: generator.epsilon, - receipt_id: generator.receipt_id, + request_id: generator.request_id, entropy: generator.entropy, sign_request_timestamp: generator.sign_request_timestamp }, )); } else { - self.completed.insert(*receipt_id, Instant::now()); + self.completed.insert(sign_request_identifier.clone(), Instant::now()); crate::metrics::SIGNATURE_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); @@ -548,7 +553,7 @@ impl SignatureManager { messages.push(( *p, SignatureMessage { - receipt_id: *receipt_id, + request_id: sign_request_identifier.request_id, proposer: generator.proposer, presignature_id: generator.presignature_id, request: generator.request.clone(), @@ -565,7 +570,7 @@ impl SignatureManager { Action::SendPrivate(p, data) => messages.push(( p, SignatureMessage { - receipt_id: *receipt_id, + request_id: sign_request_identifier.request_id, proposer: generator.proposer, presignature_id: generator.presignature_id, request: generator.request.clone(), @@ -579,21 +584,21 @@ impl SignatureManager { )), Action::Return(output) => { tracing::info!( - ?receipt_id, + sign_request_identifier =?sign_request_identifier.clone(), me = ?self.me, presignature_id = generator.presignature_id, big_r = ?output.big_r.to_base58(), s = ?output.s, "completed signature generation" ); - self.completed.insert(*receipt_id, Instant::now()); + self.completed.insert(sign_request_identifier.clone(), Instant::now()); let request = SignatureRequest { epsilon: SerializableScalar {scalar: generator.epsilon}, payload_hash: generator.request.payload.into(), }; if generator.proposer == self.me { self.signatures - .push(ToPublish::new(*receipt_id, request, generator.sign_request_timestamp, output)); + .push(ToPublish::new(sign_request_identifier.request_id, request, generator.sign_request_timestamp, output)); } // Do not retain the protocol return false; @@ -644,17 +649,22 @@ impl SignatureManager { // TODO: we need to decide how to prioritize certain requests over others such as with gas or time of // when the request made it into the NEAR network. // issue: https://github.com/near/mpc-recovery/issues/596 - if let Some((receipt_id, failed_req)) = self.failed.pop_front() { + if let Some((sign_request_identifier, failed_req)) = self.failed.pop_front() { if let Err((presignature, InitializationError::BadParameters(err))) = self .retry_failed_generation( - receipt_id, + sign_request_identifier.clone(), failed_req, presignature, &sig_participants, cfg, ) { - tracing::warn!(%receipt_id, presig_id, ?err, "failed to retry signature generation: trashing presignature"); + tracing::warn!( + ?sign_request_identifier, + presig_id, + ?err, + "failed to retry signature generation: trashing presignature" + ); failed_presigs.push(presignature); continue; } @@ -666,13 +676,14 @@ impl SignatureManager { } } - let Some((receipt_id, my_request)) = my_requests.pop_front() else { + let Some(my_request) = my_requests.pop_front() else { failed_presigs.push(presignature); continue; }; + if let Err((presignature, InitializationError::BadParameters(err))) = self.generate( &sig_participants, - receipt_id, + my_request.request_id, presignature, my_request.request, my_request.epsilon, @@ -681,7 +692,7 @@ impl SignatureManager { cfg, ) { failed_presigs.push(presignature); - tracing::warn!(%receipt_id, presig_id, ?err, "failed to start signature generation: trashing presignature"); + tracing::warn!(request_id = ?CryptoHash(my_request.request_id), presig_id, ?err, "failed to start signature generation: trashing presignature"); continue; } } @@ -703,7 +714,7 @@ impl SignatureManager { for mut to_publish in self.signatures.drain(..) { let ToPublish { - receipt_id, + request_id, request, time_added, signature, @@ -717,7 +728,7 @@ impl SignatureManager { &signature.s, request.payload_hash.scalar, ) else { - tracing::error!(%receipt_id, "Failed to generate a recovery ID"); + tracing::error!(request_id = ?CryptoHash(*request_id), "Failed to generate a recovery ID"); continue; }; let response = match rpc_client @@ -733,7 +744,7 @@ impl SignatureManager { { Ok(response) => response, Err(err) => { - tracing::error!(%receipt_id, error = ?err, "Failed to publish the signature"); + tracing::error!(request_id = ?CryptoHash(*request_id), request = ?request, error = ?err, "Failed to publish the signature"); crate::metrics::SIGNATURE_PUBLISH_FAILURES .with_label_values(&[self.my_account_id.as_str()]) .inc(); @@ -748,10 +759,10 @@ impl SignatureManager { match response.json() { Ok(()) => { - tracing::info!(%receipt_id, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, "published signature sucessfully") + tracing::info!(request_id = ?CryptoHash(*request_id), request = ?request, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, "published signature sucessfully") } Err(err) => { - tracing::error!(%receipt_id, bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, error = ?err, "smart contract threw error"); + tracing::error!(request_id = ?CryptoHash(*request_id), bi_r = signature.big_r.affine_point.to_base58(), s = ?signature.s, error = ?err, "smart contract threw error"); crate::metrics::SIGNATURE_PUBLISH_RESPONSE_ERRORS .with_label_values(&[self.my_account_id.as_str()]) .inc(); @@ -790,10 +801,10 @@ impl SignatureManager { } } - pub fn refresh_gc(&mut self, id: &ReceiptId) -> bool { + pub fn refresh_gc(&mut self, id: &SignRequestIdentifier) -> bool { let entry = self .completed - .entry(*id) + .entry(id.clone()) .and_modify(|e| *e = Instant::now()); matches!(entry, Entry::Occupied(_)) } diff --git a/chain-signatures/node/src/protocol/triple.rs b/chain-signatures/node/src/protocol/triple.rs index 2bcca581b..c76ac16b6 100644 --- a/chain-signatures/node/src/protocol/triple.rs +++ b/chain-signatures/node/src/protocol/triple.rs @@ -226,7 +226,7 @@ impl TripleManager { ))); } - tracing::info!(id, "starting protocol to generate a new triple"); + tracing::debug!(id, "starting protocol to generate a new triple"); let participants: Vec<_> = participants.keys().cloned().collect(); let protocol: TripleProtocol = Box::new(cait_sith::triples::generate_triple::( &participants, diff --git a/integration-tests/chain-signatures/Cargo.lock b/integration-tests/chain-signatures/Cargo.lock index e76ab76df..3f5699c9a 100644 --- a/integration-tests/chain-signatures/Cargo.lock +++ b/integration-tests/chain-signatures/Cargo.lock @@ -3695,6 +3695,7 @@ dependencies = [ "aws-types", "axum", "axum-extra", + "borsh", "cait-sith", "chrono", "clap", diff --git a/integration-tests/chain-signatures/tests/actions/mod.rs b/integration-tests/chain-signatures/tests/actions/mod.rs index fa2236ba4..478259efc 100644 --- a/integration-tests/chain-signatures/tests/actions/mod.rs +++ b/integration-tests/chain-signatures/tests/actions/mod.rs @@ -14,18 +14,20 @@ use k256::elliptic_curve::sec1::FromEncodedPoint; use k256::elliptic_curve::ProjectivePoint; use k256::{AffinePoint, EncodedPoint, Scalar, Secp256k1}; use mpc_contract::errors; +use mpc_contract::errors::SignError; use mpc_contract::primitives::SignRequest; use mpc_contract::primitives::SignatureRequest; use mpc_contract::RunningContractState; use mpc_node::kdf::into_eth_sig; use near_crypto::InMemorySigner; use near_fetch::ops::AsyncTransactionStatus; +use near_fetch::ops::Function; use near_workspaces::types::Gas; use near_workspaces::types::NearToken; use near_workspaces::Account; use rand::Rng; use secp256k1::XOnlyPublicKey; -use wait_for::WaitForError; +use wait_for::{SignatureError, WaitForError}; use std::time::Duration; @@ -71,6 +73,77 @@ pub async fn request_sign( Ok((payload, payload_hashed, account, status)) } +pub async fn request_batch_random_sign( + ctx: &MultichainTestContext<'_>, +) -> anyhow::Result<(Vec<([u8; 32], [u8; 32])>, Account, AsyncTransactionStatus)> { + let worker = &ctx.nodes.ctx().worker; + let account = worker.dev_create_account().await?; + let signer = InMemorySigner { + account_id: account.id().clone(), + public_key: account.secret_key().public_key().to_string().parse()?, + secret_key: account.secret_key().to_string().parse()?, + }; + + let mut payloads: Vec<([u8; 32], [u8; 32])> = vec![]; + let mut tx = ctx.rpc_client.batch(&signer, ctx.contract().id()); + for _ in 0..3 { + let payload: [u8; 32] = rand::thread_rng().gen(); + let payload_hashed = web3::signing::keccak256(&payload); + payloads.push((payload, payload_hashed)); + let request = SignRequest { + payload: payload_hashed, + path: "test".to_string(), + key_version: 0, + }; + let function = Function::new("sign") + .args_json(serde_json::json!({ + "request": request, + })) + .gas(Gas::from_tgas(50)) + .deposit(NearToken::from_yoctonear(1)); + tx = tx.call(function); + } + + let status = tx.transact_async().await?; + tokio::time::sleep(Duration::from_secs(3)).await; + Ok((payloads, account, status)) +} + +pub async fn request_batch_duplicate_sign( + ctx: &MultichainTestContext<'_>, +) -> anyhow::Result<([u8; 32], u32, Account, AsyncTransactionStatus)> { + let worker = &ctx.nodes.ctx().worker; + let account = worker.dev_create_account().await?; + let signer = InMemorySigner { + account_id: account.id().clone(), + public_key: account.secret_key().public_key().to_string().parse()?, + secret_key: account.secret_key().to_string().parse()?, + }; + + let mut tx = ctx.rpc_client.batch(&signer, ctx.contract().id()); + let payload: [u8; 32] = rand::thread_rng().gen(); + let payload_hashed = web3::signing::keccak256(&payload); + let sign_call_cnt = 2; + for _ in 0..sign_call_cnt { + let request = SignRequest { + payload: payload_hashed, + path: "test".to_string(), + key_version: 0, + }; + let function = Function::new("sign") + .args_json(serde_json::json!({ + "request": request, + })) + .gas(Gas::from_tgas(50)) + .deposit(NearToken::from_yoctonear(1)); + tx = tx.call(function); + } + + let status = tx.transact_async().await?; + tokio::time::sleep(Duration::from_secs(3)).await; + Ok((payload_hashed, sign_call_cnt, account, status)) +} + pub async fn assert_signature( account_id: &near_workspaces::AccountId, mpc_pk_bytes: &[u8], @@ -303,6 +376,40 @@ pub async fn clear_toxics() -> anyhow::Result<()> { Ok(()) } +pub async fn batch_random_signature_production( + ctx: &MultichainTestContext<'_>, + state: &RunningContractState, +) -> anyhow::Result<()> { + let (payloads, account, status) = request_batch_random_sign(ctx).await?; + let signatures = wait_for::batch_signature_responded(status).await?; + + let mut mpc_pk_bytes = vec![0x04]; + mpc_pk_bytes.extend_from_slice(&state.public_key.as_bytes()[1..]); + assert_eq!(payloads.len(), signatures.len()); + for i in 0..payloads.len() { + let (_, payload_hash) = payloads.get(i).unwrap(); + let signature = signatures.get(i).unwrap(); + assert_signature(account.id(), &mpc_pk_bytes, *payload_hash, signature).await; + } + + Ok(()) +} + +pub async fn batch_duplicate_signature_production( + ctx: &MultichainTestContext<'_>, + _state: &RunningContractState, +) -> anyhow::Result<()> { + let (_, _, _, status) = request_batch_duplicate_sign(ctx).await?; + let result = wait_for::batch_signature_responded(status).await; + match result { + Err(WaitForError::Signature(SignatureError::Failed(err_msg))) => { + assert!(err_msg.contains(&SignError::RequestCollision.to_string())); + } + _ => panic!("Should have failed with PayloadCollision"), + } + Ok(()) +} + // This test hardcodes the output of the signing process and checks that everything verifies as expected // If you find yourself changing the constants in this test you are likely breaking backwards compatibility #[tokio::test] diff --git a/integration-tests/chain-signatures/tests/actions/wait_for.rs b/integration-tests/chain-signatures/tests/actions/wait_for.rs index ac2e00693..19bfabc2d 100644 --- a/integration-tests/chain-signatures/tests/actions/wait_for.rs +++ b/integration-tests/chain-signatures/tests/actions/wait_for.rs @@ -14,9 +14,13 @@ use mpc_contract::ProtocolContractState; use mpc_contract::RunningContractState; use mpc_node::web::StateView; use near_fetch::ops::AsyncTransactionStatus; +use near_lake_primitives::CryptoHash; use near_primitives::errors::ActionErrorKind; +use near_primitives::views::ExecutionOutcomeWithIdView; +use near_primitives::views::ExecutionStatusView; use near_primitives::views::FinalExecutionStatus; use near_workspaces::Account; +use std::collections::HashMap; use url::Url; pub async fn running_mpc<'a>( @@ -256,6 +260,7 @@ pub enum WaitForError { enum Outcome { Signature(FullSignature), Failed(String), + Signatures(Vec>), } pub async fn signature_responded( @@ -290,6 +295,9 @@ pub async fn signature_responded( match is_tx_ready.retry(&strategy).await? { Outcome::Signature(signature) => Ok(signature), Outcome::Failed(err) => Err(WaitForError::Signature(SignatureError::Failed(err))), + _ => Err(WaitForError::Signature(SignatureError::Failed( + "Should not return more than one signature".to_string(), + ))), } } @@ -366,3 +374,91 @@ pub async fn rogue_message_responded(status: AsyncTransactionStatus) -> anyhow:: Ok(signature.clone()) } + +pub async fn batch_signature_responded( + status: AsyncTransactionStatus, +) -> Result>, WaitForError> { + let is_tx_ready = || async { + let Poll::Ready(outcome) = status + .status() + .await + .map_err(|err| WaitForError::JsonRpc(format!("{err:?}")))? + else { + return Err(WaitForError::Signature(SignatureError::NotYetAvailable)); + }; + + if !outcome.is_success() { + return Err(WaitForError::Signature(SignatureError::Failed(format!( + "status: {:?}", + outcome.status() + )))); + } + + let receipt_outcomes = outcome.details.receipt_outcomes(); + let mut result_receipts: HashMap> = HashMap::new(); + for receipt_outcome in receipt_outcomes { + result_receipts + .entry(receipt_outcome.id) + .or_insert(receipt_outcome.outcome.receipt_ids.clone()); + } + let mut receipt_outcomes_keyed: HashMap = + HashMap::new(); + for receipt_outcome in receipt_outcomes { + receipt_outcomes_keyed + .entry(receipt_outcome.id) + .or_insert(receipt_outcome); + } + + let starting_receipts = &receipt_outcomes.first().unwrap().outcome.receipt_ids; + + let mut signatures: Vec> = vec![]; + for receipt_id in starting_receipts { + if !result_receipts.contains_key(receipt_id) { + break; + } + let sign_receipt_id = receipt_id; + for receipt_id in result_receipts.get(sign_receipt_id).unwrap() { + let receipt_outcome = receipt_outcomes_keyed + .get(receipt_id) + .unwrap() + .outcome + .clone(); + if receipt_outcome + .logs + .contains(&"Signature is ready.".to_string()) + { + match receipt_outcome.status { + ExecutionStatusView::SuccessValue(value) => { + let result: SignatureResponse = serde_json::from_slice(&value) + .map_err(|err| WaitForError::SerdeJson(format!("{err:?}")))?; + let signature = cait_sith::FullSignature:: { + big_r: result.big_r.affine_point, + s: result.s.scalar, + }; + signatures.push(signature); + } + _ => { + return Err(WaitForError::Signature(SignatureError::Failed( + "one signature not done.".to_string(), + ))) + } + } + } + } + } + + Ok(Outcome::Signatures(signatures)) + }; + + let strategy = ConstantBuilder::default() + .with_delay(Duration::from_secs(20)) + .with_max_times(5); + + match is_tx_ready.retry(&strategy).await? { + Outcome::Signature(_) => Err(WaitForError::Signature(SignatureError::Failed( + "Should not return just 1 signature".to_string(), + ))), + Outcome::Failed(err) => Err(WaitForError::Signature(SignatureError::Failed(err))), + Outcome::Signatures(signatures) => Ok(signatures), + } +} diff --git a/integration-tests/chain-signatures/tests/cases/mod.rs b/integration-tests/chain-signatures/tests/cases/mod.rs index a166d7d23..c7df40c95 100644 --- a/integration-tests/chain-signatures/tests/cases/mod.rs +++ b/integration-tests/chain-signatures/tests/cases/mod.rs @@ -392,3 +392,29 @@ async fn test_multichain_update_contract() -> anyhow::Result<()> { }) .await } + +#[test(tokio::test)] +async fn test_batch_random_signature() -> anyhow::Result<()> { + with_multichain_nodes(MultichainConfig::default(), |ctx| { + Box::pin(async move { + let state_0 = wait_for::running_mpc(&ctx, Some(0)).await?; + assert_eq!(state_0.participants.len(), 3); + actions::batch_random_signature_production(&ctx, &state_0).await?; + Ok(()) + }) + }) + .await +} + +#[test(tokio::test)] +async fn test_batch_duplicate_signature() -> anyhow::Result<()> { + with_multichain_nodes(MultichainConfig::default(), |ctx| { + Box::pin(async move { + let state_0 = wait_for::running_mpc(&ctx, Some(0)).await?; + assert_eq!(state_0.participants.len(), 3); + actions::batch_duplicate_signature_production(&ctx, &state_0).await?; + Ok(()) + }) + }) + .await +}