diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 28868a72aaf2e..acc01be51dcf7 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -38,12 +38,14 @@ impl Default for QuorumStoreBackPressureConfig { pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, - pub batch_request_num_peers: usize, pub batch_generation_poll_interval_ms: usize, pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, pub max_batch_bytes: usize, - pub batch_request_timeout_ms: usize, + pub batch_request_num_peers: usize, + pub batch_request_retry_limit: usize, + pub batch_request_retry_interval_ms: usize, + pub batch_request_rpc_timeout_ms: usize, /// Used when setting up the expiration time for the batch initation. pub batch_expiry_gap_when_init_usecs: u64, pub memory_quota: usize, @@ -59,12 +61,14 @@ impl Default for QuorumStoreConfig { QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, - batch_request_num_peers: 2, batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 100, batch_generation_max_interval_ms: 250, max_batch_bytes: 4 * 1024 * 1024, - batch_request_timeout_ms: 10000, + batch_request_num_peers: 3, + batch_request_retry_limit: 10, + batch_request_retry_interval_ms: 1000, + batch_request_rpc_timeout_ms: 5000, batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64, memory_quota: 120_000_000, db_quota: 300_000_000, diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 5ac5f93a12892..41cbd37d0ca90 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + monitor, network::QuorumStoreSender, quorum_store::{counters, types::BatchRequest}, }; @@ -10,33 +11,43 @@ use aptos_executor_types::*; use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use futures::{stream::FuturesUnordered, StreamExt}; +use rand::Rng; use std::time::Duration; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, time}; struct BatchRequesterState { signers: Vec, next_index: usize, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, num_retries: usize, - max_num_retry: usize, + retry_limit: usize, } impl BatchRequesterState { fn new( signers: Vec, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, + retry_limit: usize, ) -> Self { Self { signers, next_index: 0, ret_tx, num_retries: 0, - max_num_retry: 5, // TODO: get it from config. + retry_limit, } } fn next_request_peers(&mut self, num_peers: usize) -> Option> { - if self.num_retries < self.max_num_retry { + if self.num_retries == 0 { + let mut rng = rand::thread_rng(); + // make sure nodes request from the different set of nodes + self.next_index = rng.gen::() % self.signers.len(); + counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64); + } else { + counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64); + } + if self.num_retries < self.retry_limit { self.num_retries += 1; let ret = self .signers @@ -84,23 +95,29 @@ pub(crate) struct BatchRequester { epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - request_timeout_ms: usize, + retry_limit: usize, + retry_interval_ms: usize, + rpc_timeout_ms: usize, network_sender: T, } -impl BatchRequester { +impl BatchRequester { pub(crate) fn new( epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - request_timeout_ms: usize, + retry_limit: usize, + retry_interval_ms: usize, + rpc_timeout_ms: usize, network_sender: T, ) -> Self { Self { epoch, my_peer_id, request_num_peers, - request_timeout_ms, + retry_limit, + retry_interval_ms, + rpc_timeout_ms, network_sender, } } @@ -111,39 +128,47 @@ impl BatchRequester { signers: Vec, ret_tx: oneshot::Sender, Error>>, ) { - let mut request_state = BatchRequesterState::new(signers, ret_tx); + let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit); let network_sender = self.network_sender.clone(); let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; let epoch = self.epoch; - let timeout = Duration::from_millis(self.request_timeout_ms as u64); + let retry_interval = Duration::from_millis(self.retry_interval_ms as u64); + let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64); tokio::spawn(async move { - while let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + monitor!("batch_request", { + let mut interval = time::interval(retry_interval); let mut futures = FuturesUnordered::new(); - trace!("QS: requesting from {:?}", request_peers); let request = BatchRequest::new(my_peer_id, epoch, digest); - for peer in request_peers { - counters::SENT_BATCH_REQUEST_COUNT.inc(); - futures.push(network_sender.request_batch(request.clone(), peer, timeout)); - } - while let Some(response) = futures.next().await { - match response { - Ok(batch) => { - counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); - let digest = *batch.digest(); - let payload = batch.into_transactions(); - request_state.serve_request(digest, Some(payload)); - return; - }, - Err(e) => { - error!("Batch request failed: {}", e); + loop { + tokio::select! { + _ = interval.tick() => { + // send batch request to a set of peers of size request_num_peers + if let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + for peer in request_peers { + futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout)); + } + } else if futures.is_empty() { + // end the loop when the futures are drained + break; + } + } + Some(response) = futures.next() => { + if let Ok(batch) = response { + counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); + if batch.verify().is_ok() { + let digest = *batch.digest(); + let payload = batch.into_transactions(); + request_state.serve_request(digest, Some(payload)); + return; + } + } }, } } - counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc(); - } - request_state.serve_request(digest, None); + request_state.serve_request(digest, None); + }) }); } } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 21a7e6da3efa6..5ed4d5c74b2f6 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -231,7 +231,9 @@ impl InnerBuilder { self.epoch, self.author, self.config.batch_request_num_peers, - self.config.batch_request_timeout_ms, + self.config.batch_request_retry_limit, + self.config.batch_request_retry_interval_ms, + self.config.batch_request_rpc_timeout_ms, self.network_sender.clone(), ); let batch_store = Arc::new(BatchStore::new( diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 2d8fb1b2f24df..20d79a406cc64 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -31,6 +31,8 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc