[QuorumStore] Fix batch requester (#7190)
* fix batch request counter, add retry limit to config

* new batch requester
danielxiangzl authored Mar 17, 2023
1 parent bebaeec commit faa75d2
Showing 4 changed files with 68 additions and 35 deletions.
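
At its core, the new batch requester stops sending one round of requests and waiting for every response before retrying. Instead it drives retries off a tokio interval and races the in-flight RPCs inside a select!, returning as soon as one verified batch arrives (see the batch_requester.rs diff below). A minimal, self-contained sketch of that pattern follows — the peer list, the fake request_batch_from RPC, and the payload type are stand-ins for illustration, not the Aptos API:

```rust
use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time;

// Stand-in for the real RPC; peers and payload types are made up for this sketch.
async fn request_batch_from(peer: usize) -> Result<Vec<u8>, &'static str> {
    time::sleep(Duration::from_millis(50 * peer as u64)).await;
    if peer % 2 == 0 { Ok(vec![peer as u8]) } else { Err("unavailable") }
}

async fn fetch_with_retries(
    peers: Vec<usize>,
    retry_limit: usize,
    retry_interval: Duration,
    rpc_timeout: Duration,
) -> Option<Vec<u8>> {
    let mut interval = time::interval(retry_interval);
    let mut in_flight = FuturesUnordered::new();
    let mut rounds = 0;
    loop {
        tokio::select! {
            // The first tick fires immediately, so the first round of requests
            // goes out right away; later ticks are the retry schedule.
            _ = interval.tick() => {
                if rounds < retry_limit {
                    rounds += 1;
                    for &peer in &peers {
                        in_flight.push(time::timeout(rpc_timeout, request_batch_from(peer)));
                    }
                } else if in_flight.is_empty() {
                    break; // retry budget spent and nothing left in flight
                }
            }
            Some(response) = in_flight.next() => {
                // First successful (and, in the real code, verified) reply wins.
                if let Ok(Ok(payload)) = response {
                    return Some(payload);
                }
            }
        }
    }
    None
}

#[tokio::main]
async fn main() {
    let batch =
        fetch_with_retries(vec![1, 2, 3], 10, Duration::from_secs(1), Duration::from_secs(5)).await;
    println!("fetched: {:?}", batch);
}
```
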
12 changes: 8 additions & 4 deletions config/src/config/quorum_store_config.rs
@@ -38,12 +38,14 @@ impl Default for QuorumStoreBackPressureConfig {
pub struct QuorumStoreConfig {
pub channel_size: usize,
pub proof_timeout_ms: usize,
pub batch_request_num_peers: usize,
pub batch_generation_poll_interval_ms: usize,
pub batch_generation_min_non_empty_interval_ms: usize,
pub batch_generation_max_interval_ms: usize,
pub max_batch_bytes: usize,
pub batch_request_timeout_ms: usize,
pub batch_request_num_peers: usize,
pub batch_request_retry_limit: usize,
pub batch_request_retry_interval_ms: usize,
pub batch_request_rpc_timeout_ms: usize,
/// Used when setting up the expiration time for the batch initation.
pub batch_expiry_gap_when_init_usecs: u64,
pub memory_quota: usize,
@@ -59,12 +61,14 @@ impl Default for QuorumStoreConfig {
QuorumStoreConfig {
channel_size: 1000,
proof_timeout_ms: 10000,
batch_request_num_peers: 2,
batch_generation_poll_interval_ms: 25,
batch_generation_min_non_empty_interval_ms: 100,
batch_generation_max_interval_ms: 250,
max_batch_bytes: 4 * 1024 * 1024,
batch_request_timeout_ms: 10000,
batch_request_num_peers: 3,
batch_request_retry_limit: 10,
batch_request_retry_interval_ms: 1000,
batch_request_rpc_timeout_ms: 5000,
batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64,
memory_quota: 120_000_000,
db_quota: 300_000_000,
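
For reference: the single batch_request_timeout_ms (10 s per RPC, with the retry count previously hard-coded to 5 in batch_requester.rs) is split into three knobs — up to batch_request_retry_limit rounds spaced batch_request_retry_interval_ms apart, each RPC capped at batch_request_rpc_timeout_ms. A back-of-the-envelope bound implied by the new defaults (my estimate from the request loop shown below, not a documented guarantee):

```rust
fn main() {
    // New defaults from the diff above.
    let retry_limit: u64 = 10;
    let retry_interval_ms: u64 = 1_000;
    let rpc_timeout_ms: u64 = 5_000;

    // Assumption: rounds go out at 0 s, 1 s, ..., (retry_limit - 1) s, and the
    // last round's RPCs may still run up to rpc_timeout_ms before the requester
    // gives up on the batch.
    let worst_case_ms = (retry_limit - 1) * retry_interval_ms + rpc_timeout_ms;
    println!("worst-case request window ≈ {worst_case_ms} ms"); // ≈ 14_000 ms
}
```
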
85 changes: 55 additions & 30 deletions consensus/src/quorum_store/batch_requester.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
monitor,
network::QuorumStoreSender,
quorum_store::{counters, types::BatchRequest},
};
@@ -10,33 +11,43 @@ use aptos_executor_types::*;
use aptos_logger::prelude::*;
use aptos_types::{transaction::SignedTransaction, PeerId};
use futures::{stream::FuturesUnordered, StreamExt};
use rand::Rng;
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::{sync::oneshot, time};

struct BatchRequesterState {
signers: Vec<PeerId>,
next_index: usize,
ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, aptos_executor_types::Error>>,
num_retries: usize,
max_num_retry: usize,
retry_limit: usize,
}

impl BatchRequesterState {
fn new(
signers: Vec<PeerId>,
ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, aptos_executor_types::Error>>,
retry_limit: usize,
) -> Self {
Self {
signers,
next_index: 0,
ret_tx,
num_retries: 0,
max_num_retry: 5, // TODO: get it from config.
retry_limit,
}
}

fn next_request_peers(&mut self, num_peers: usize) -> Option<Vec<PeerId>> {
if self.num_retries < self.max_num_retry {
if self.num_retries == 0 {
let mut rng = rand::thread_rng();
// make sure nodes request from the different set of nodes
self.next_index = rng.gen::<usize>() % self.signers.len();
counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64);
} else {
counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64);
}
if self.num_retries < self.retry_limit {
self.num_retries += 1;
let ret = self
.signers
@@ -84,23 +95,29 @@ pub(crate) struct BatchRequester<T> {
epoch: u64,
my_peer_id: PeerId,
request_num_peers: usize,
request_timeout_ms: usize,
retry_limit: usize,
retry_interval_ms: usize,
rpc_timeout_ms: usize,
network_sender: T,
}

impl<T: QuorumStoreSender + 'static> BatchRequester<T> {
impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
pub(crate) fn new(
epoch: u64,
my_peer_id: PeerId,
request_num_peers: usize,
request_timeout_ms: usize,
retry_limit: usize,
retry_interval_ms: usize,
rpc_timeout_ms: usize,
network_sender: T,
) -> Self {
Self {
epoch,
my_peer_id,
request_num_peers,
request_timeout_ms,
retry_limit,
retry_interval_ms,
rpc_timeout_ms,
network_sender,
}
}
@@ -111,39 +128,47 @@ impl<T: QuorumStoreSender + 'static> BatchRequester<T> {
signers: Vec<PeerId>,
ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, Error>>,
) {
let mut request_state = BatchRequesterState::new(signers, ret_tx);
let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit);
let network_sender = self.network_sender.clone();
let request_num_peers = self.request_num_peers;
let my_peer_id = self.my_peer_id;
let epoch = self.epoch;
let timeout = Duration::from_millis(self.request_timeout_ms as u64);
let retry_interval = Duration::from_millis(self.retry_interval_ms as u64);
let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64);

tokio::spawn(async move {
while let Some(request_peers) = request_state.next_request_peers(request_num_peers) {
monitor!("batch_request", {
let mut interval = time::interval(retry_interval);
let mut futures = FuturesUnordered::new();
trace!("QS: requesting from {:?}", request_peers);
let request = BatchRequest::new(my_peer_id, epoch, digest);
for peer in request_peers {
counters::SENT_BATCH_REQUEST_COUNT.inc();
futures.push(network_sender.request_batch(request.clone(), peer, timeout));
}
while let Some(response) = futures.next().await {
match response {
Ok(batch) => {
counters::RECEIVED_BATCH_RESPONSE_COUNT.inc();
let digest = *batch.digest();
let payload = batch.into_transactions();
request_state.serve_request(digest, Some(payload));
return;
},
Err(e) => {
error!("Batch request failed: {}", e);
loop {
tokio::select! {
_ = interval.tick() => {
// send batch request to a set of peers of size request_num_peers
if let Some(request_peers) = request_state.next_request_peers(request_num_peers) {
for peer in request_peers {
futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout));
}
} else if futures.is_empty() {
// end the loop when the futures are drained
break;
}
}
Some(response) = futures.next() => {
if let Ok(batch) = response {
counters::RECEIVED_BATCH_RESPONSE_COUNT.inc();
if batch.verify().is_ok() {
let digest = *batch.digest();
let payload = batch.into_transactions();
request_state.serve_request(digest, Some(payload));
return;
}
}
},
}
}
counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc();
}
request_state.serve_request(digest, None);
request_state.serve_request(digest, None);
})
});
}
}
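
The tail of next_request_peers is cut off in the hunk above. Judging from the visible fragment (the retry counters, the random next_index, and the slice taken from self.signers), each round picks the next num_peers signers starting from a random offset and wraps around the list, so different validators spread their requests for the same batch across different signers. A hypothetical stand-alone illustration of that selection logic — simplified types, counters omitted, not the actual Aptos implementation:

```rust
use rand::Rng;

// Hypothetical illustration of the peer-selection idea; PeerId is replaced by usize.
struct Selector {
    signers: Vec<usize>,
    next_index: usize,
    num_retries: usize,
    retry_limit: usize,
}

impl Selector {
    fn next_request_peers(&mut self, num_peers: usize) -> Option<Vec<usize>> {
        if self.num_retries == 0 {
            // Random starting offset so validators spread their requests
            // across different signers for the same batch.
            self.next_index = rand::thread_rng().gen::<usize>() % self.signers.len();
        }
        if self.num_retries < self.retry_limit {
            self.num_retries += 1;
            let picked = self
                .signers
                .iter()
                .cycle() // wrap around the end of the signer list
                .skip(self.next_index)
                .take(num_peers)
                .copied()
                .collect();
            self.next_index = (self.next_index + num_peers) % self.signers.len();
            Some(picked)
        } else {
            None // retry budget exhausted; the caller gives up on the batch
        }
    }
}

fn main() {
    let mut s = Selector { signers: vec![10, 11, 12, 13], next_index: 0, num_retries: 0, retry_limit: 3 };
    while let Some(peers) = s.next_request_peers(3) {
        println!("requesting from {:?}", peers);
    }
}
```
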
4 changes: 3 additions & 1 deletion consensus/src/quorum_store/quorum_store_builder.rs
@@ -231,7 +231,9 @@ impl InnerBuilder {
self.epoch,
self.author,
self.config.batch_request_num_peers,
self.config.batch_request_timeout_ms,
self.config.batch_request_retry_limit,
self.config.batch_request_retry_interval_ms,
self.config.batch_request_rpc_timeout_ms,
self.network_sender.clone(),
);
let batch_store = Arc::new(BatchStore::new(
2 changes: 2 additions & 0 deletions consensus/src/quorum_store/tests/batch_store_test.rs
@@ -31,6 +31,8 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc<BatchStore<MockQuorumS
AccountAddress::random(),
1,
1,
1,
1,
MockQuorumStoreSender::new(tx),
);
let (signers, validator_verifier) = random_validator_verifier(4, None, false);