From 34685c97b0c1989e8169f8bf3fd33a55e4e9b265 Mon Sep 17 00:00:00 2001 From: danielx <66756900+danielxiangzl@users.noreply.github.com> Date: Thu, 16 Mar 2023 23:53:34 -0700 Subject: [PATCH] [QuorumStore] Fix batch requester (#7190) * fix batch request counter, add retry limit to config * new batch requester --- config/src/config/quorum_store_config.rs | 12 ++- consensus/src/quorum_store/batch_requester.rs | 79 +++++++++++++------ .../src/quorum_store/quorum_store_builder.rs | 4 +- .../quorum_store/tests/batch_store_test.rs | 2 + 4 files changed, 70 insertions(+), 27 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 1c9f2c070153c..b166bf34b183f 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -38,12 +38,14 @@ impl Default for QuorumStoreBackPressureConfig { pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, - pub batch_request_num_peers: usize, pub batch_generation_poll_interval_ms: usize, pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, pub max_batch_bytes: usize, - pub batch_request_timeout_ms: usize, + pub batch_request_num_peers: usize, + pub batch_request_retry_limit: usize, + pub batch_request_retry_interval_ms: usize, + pub batch_request_rpc_timeout_ms: usize, /// Used when setting up the expiration time for the batch initation. pub batch_expiry_gap_when_init_usecs: u64, pub memory_quota: usize, @@ -59,12 +61,14 @@ impl Default for QuorumStoreConfig { QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, - batch_request_num_peers: 5, batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 100, batch_generation_max_interval_ms: 250, max_batch_bytes: 4 * 1024 * 1024, - batch_request_timeout_ms: 10000, + batch_request_num_peers: 5, + batch_request_retry_limit: 10, + batch_request_retry_interval_ms: 1000, + batch_request_rpc_timeout_ms: 5000, batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64, memory_quota: 120_000_000, db_quota: 300_000_000, diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 6b88deeae9cb3..482eaccbb65d6 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -11,33 +11,43 @@ use aptos_executor_types::*; use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use futures::{stream::FuturesUnordered, StreamExt}; +use rand::Rng; use std::time::Duration; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, time}; struct BatchRequesterState { signers: Vec, next_index: usize, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, num_retries: usize, - max_num_retry: usize, + retry_limit: usize, } impl BatchRequesterState { fn new( signers: Vec, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, + retry_limit: usize, ) -> Self { Self { signers, next_index: 0, ret_tx, num_retries: 0, - max_num_retry: 5, // TODO: get it from config. + retry_limit, } } fn next_request_peers(&mut self, num_peers: usize) -> Option> { - if self.num_retries < self.max_num_retry { + if self.num_retries == 0 { + let mut rng = rand::thread_rng(); + // make sure nodes request from the different set of nodes + self.next_index = rng.gen::() % self.signers.len(); + counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64); + } else { + counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64); + } + if self.num_retries < self.retry_limit { self.num_retries += 1; let ret = self .signers @@ -85,23 +95,29 @@ pub(crate) struct BatchRequester { epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - request_timeout_ms: usize, + retry_limit: usize, + retry_interval_ms: usize, + rpc_timeout_ms: usize, network_sender: T, } -impl BatchRequester { +impl BatchRequester { pub(crate) fn new( epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - request_timeout_ms: usize, + retry_limit: usize, + retry_interval_ms: usize, + rpc_timeout_ms: usize, network_sender: T, ) -> Self { Self { epoch, my_peer_id, request_num_peers, - request_timeout_ms, + retry_limit, + retry_interval_ms, + rpc_timeout_ms, network_sender, } } @@ -112,24 +128,43 @@ impl BatchRequester { signers: Vec, ret_tx: oneshot::Sender, Error>>, ) { - let mut request_state = BatchRequesterState::new(signers, ret_tx); + let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit); let network_sender = self.network_sender.clone(); let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; let epoch = self.epoch; - let timeout = Duration::from_millis(self.request_timeout_ms as u64); + let retry_interval = Duration::from_millis(self.retry_interval_ms as u64); + let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64); tokio::spawn(async move { - monitor!( - "batch_request", - while let Some(request_peers) = request_state.next_request_peers(request_num_peers) - { - let mut futures = FuturesUnordered::new(); - trace!("QS: requesting from {:?}", request_peers); - let request = BatchRequest::new(my_peer_id, epoch, digest); - for peer in request_peers { - counters::SENT_BATCH_REQUEST_COUNT.inc(); - futures.push(network_sender.request_batch(request.clone(), peer, timeout)); + monitor!("batch_request", { + let mut interval = time::interval(retry_interval); + let mut futures = FuturesUnordered::new(); + let request = BatchRequest::new(my_peer_id, epoch, digest); + loop { + tokio::select! { + _ = interval.tick() => { + // send batch request to a set of peers of size request_num_peers + if let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + for peer in request_peers { + futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout)); + } + } else if futures.is_empty() { + // end the loop when the futures are drained + break; + } + } + Some(response) = futures.next() => { + if let Ok(batch) = response { + counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); + if batch.verify().is_ok() { + let digest = *batch.digest(); + let payload = batch.into_transactions(); + request_state.serve_request(digest, Some(payload)); + return; + } + } + }, } while let Some(response) = futures.next().await { match response { @@ -147,8 +182,8 @@ impl BatchRequester { } counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc(); } - ); - request_state.serve_request(digest, None); + request_state.serve_request(digest, None); + }) }); } } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 21a7e6da3efa6..5ed4d5c74b2f6 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -231,7 +231,9 @@ impl InnerBuilder { self.epoch, self.author, self.config.batch_request_num_peers, - self.config.batch_request_timeout_ms, + self.config.batch_request_retry_limit, + self.config.batch_request_retry_interval_ms, + self.config.batch_request_rpc_timeout_ms, self.network_sender.clone(), ); let batch_store = Arc::new(BatchStore::new( diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 2d8fb1b2f24df..20d79a406cc64 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -31,6 +31,8 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc