[QuorumStore] Fix batch requester #7190
Changes from 14 commits
@@ -2,6 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0

 use crate::{
+    monitor,
     network::QuorumStoreSender,
     quorum_store::{counters, types::BatchRequest},
 };
@@ -10,33 +11,43 @@ use aptos_executor_types::*;
 use aptos_logger::prelude::*;
 use aptos_types::{transaction::SignedTransaction, PeerId};
 use futures::{stream::FuturesUnordered, StreamExt};
+use rand::Rng;
 use std::time::Duration;
-use tokio::sync::oneshot;
+use tokio::{sync::oneshot, time};

 struct BatchRequesterState {
     signers: Vec<PeerId>,
     next_index: usize,
     ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, aptos_executor_types::Error>>,
     num_retries: usize,
-    max_num_retry: usize,
+    retry_limit: usize,
 }

 impl BatchRequesterState {
     fn new(
         signers: Vec<PeerId>,
         ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, aptos_executor_types::Error>>,
+        retry_limit: usize,
     ) -> Self {
         Self {
             signers,
             next_index: 0,
             ret_tx,
             num_retries: 0,
-            max_num_retry: 5, // TODO: get it from config.
+            retry_limit,
         }
     }

     fn next_request_peers(&mut self, num_peers: usize) -> Option<Vec<PeerId>> {
-        if self.num_retries < self.max_num_retry {
+        if self.num_retries == 0 {
+            let mut rng = rand::thread_rng();
+            // make sure nodes request from the different set of nodes
+            self.next_index = rng.gen::<usize>() % self.signers.len();
+            counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64);
+        } else {
+            counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64);
+        }
+        if self.num_retries < self.retry_limit {
             self.num_retries += 1;
             let ret = self
                 .signers
@@ -84,23 +95,29 @@ pub(crate) struct BatchRequester<T> {
     epoch: u64,
     my_peer_id: PeerId,
     request_num_peers: usize,
-    request_timeout_ms: usize,
+    retry_limit: usize,
+    retry_interval_ms: usize,
+    rpc_timeout_ms: usize,
     network_sender: T,
 }

-impl<T: QuorumStoreSender + 'static> BatchRequester<T> {
+impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
     pub(crate) fn new(
         epoch: u64,
         my_peer_id: PeerId,
         request_num_peers: usize,
-        request_timeout_ms: usize,
+        retry_limit: usize,
+        retry_interval_ms: usize,
+        rpc_timeout_ms: usize,
         network_sender: T,
     ) -> Self {
         Self {
             epoch,
             my_peer_id,
             request_num_peers,
-            request_timeout_ms,
+            retry_limit,
+            retry_interval_ms,
+            rpc_timeout_ms,
             network_sender,
         }
     }
@@ -111,39 +128,47 @@ impl<T: QuorumStoreSender + 'static> BatchRequester<T> {
         signers: Vec<PeerId>,
         ret_tx: oneshot::Sender<Result<Vec<SignedTransaction>, Error>>,
     ) {
-        let mut request_state = BatchRequesterState::new(signers, ret_tx);
+        let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit);
         let network_sender = self.network_sender.clone();
         let request_num_peers = self.request_num_peers;
         let my_peer_id = self.my_peer_id;
         let epoch = self.epoch;
-        let timeout = Duration::from_millis(self.request_timeout_ms as u64);
+        let retry_interval = Duration::from_millis(self.retry_interval_ms as u64);
+        let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64);

         tokio::spawn(async move {
-            while let Some(request_peers) = request_state.next_request_peers(request_num_peers) {
+            monitor!("batch_request", {
+                let mut interval = time::interval(retry_interval);
                 let mut futures = FuturesUnordered::new();
-                trace!("QS: requesting from {:?}", request_peers);
                 let request = BatchRequest::new(my_peer_id, epoch, digest);
-                for peer in request_peers {
-                    counters::SENT_BATCH_REQUEST_COUNT.inc();
-                    futures.push(network_sender.request_batch(request.clone(), peer, timeout));
-                }
-                while let Some(response) = futures.next().await {
-                    match response {
-                        Ok(batch) => {
-                            counters::RECEIVED_BATCH_RESPONSE_COUNT.inc();
-                            let digest = *batch.digest();
-                            let payload = batch.into_transactions();
-                            request_state.serve_request(digest, Some(payload));
-                            return;
-                        },
-                        Err(e) => {
-                            error!("Batch request failed: {}", e);
+                loop {
+                    tokio::select! {
+                        _ = interval.tick() => {
+                            // send batch request to a set of peers of size request_num_peers
+                            if let Some(request_peers) = request_state.next_request_peers(request_num_peers) {
+                                for peer in request_peers {
+                                    futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout));
+                                }
+                            } else if futures.is_empty() {
+                                // end the loop when the futures are drained
+                                break;

[Review comment] Hm, so if we break here, the last set of peers gets only 1 second to receive RPC responses? Instead of a break immediately, is the behavior we want to wait until the futures are drained?
[Reply] Good point! Resolved.

+                            }
+                        }

[Review comment] Would this work? Having another …
[Reply] Cool! Thanks

+                        Some(response) = futures.next() => {
+                            if let Ok(batch) = response {
+                                counters::RECEIVED_BATCH_RESPONSE_COUNT.inc();
+                                if batch.verify().is_ok() {

[Review comment] I think you need a rebase

+                                    let digest = *batch.digest();
+                                    let payload = batch.into_transactions();
+                                    request_state.serve_request(digest, Some(payload));
+                                    return;
+                                }
+                            }
                         },
                     }
                 }
-                counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc();
-            }
-            request_state.serve_request(digest, None);
+                request_state.serve_request(digest, None);
+            })
         });
     }
 }
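The first two review threads above are about when this retry loop should give up: breaking as soon as the peer list is exhausted would cut off the last round of in-flight RPCs, so the resolved version only breaks once next_request_peers returns None and the FuturesUnordered set has drained. Below is a minimal, self-contained sketch of that pattern, not the aptos-core code: fake_rpc, the hard-coded peer sets, and the timings are invented for illustration, and it needs only tokio (full features) and futures as dependencies.

use futures::{stream::FuturesUnordered, StreamExt};
use std::time::Duration;
use tokio::time;

// Hypothetical stand-in for the real network layer: each "request" sleeps,
// then succeeds only for one specific peer, so the control flow is visible.
async fn fake_rpc(peer: usize) -> Result<String, &'static str> {
    time::sleep(Duration::from_millis(300)).await;
    if peer == 7 {
        Ok(format!("batch from peer {peer}"))
    } else {
        Err("timeout")
    }
}

#[tokio::main]
async fn main() {
    let retry_interval = Duration::from_millis(100);
    // One fixed peer set per retry round, standing in for next_request_peers.
    let mut rounds = (0..3usize).map(|r| vec![r * 3, r * 3 + 1, r * 3 + 2]);
    let mut interval = time::interval(retry_interval);
    let mut futures = FuturesUnordered::new();

    loop {
        tokio::select! {
            _ = interval.tick() => {
                if let Some(peers) = rounds.next() {
                    // fan out another round of requests
                    for peer in peers {
                        futures.push(fake_rpc(peer));
                    }
                } else if futures.is_empty() {
                    // no more peers to try AND all in-flight requests finished:
                    // only now give up, so the last round still gets a chance
                    // to respond instead of being cut off immediately
                    break;
                }
            }
            Some(response) = futures.next() => {
                if let Ok(batch) = response {
                    println!("got {batch}");
                    return;
                }
            }
        }
    }
    println!("all peers exhausted, giving up");
}

Because the futures.next() branch of the select! is disabled while the set is empty, the loop simply waits for the next tick instead of busy-looping, and once the rounds are exhausted the remaining ticks only check whether the in-flight requests have drained.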
[Review comment] Are peers shuffled or sorted? If sorted, can it be a problem that we are using a consecutive range?
[Reply] I think it's shuffled; see aptos-core/consensus/src/quorum_store/batch_store.rs, line 469 at 510ffa0.
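On that shuffled-vs-sorted question: the diff above truncates the body of next_request_peers right after .signers, so the exact windowing is not shown here. A plausible reading, sketched below with invented types and under the assumption (per the reply) that the signer list is already shuffled upstream, is that the requester picks one random starting offset and then hands out consecutive, wrapping windows of num_peers signers on each retry; over a shuffled list such a window behaves like a random sample.

use rand::Rng;

// Hypothetical sketch only: the real windowing details of next_request_peers
// are not visible in the diff above, so this is an assumed reconstruction.
struct RequesterState {
    signers: Vec<u64>, // stand-in for Vec<PeerId>
    next_index: usize,
    num_retries: usize,
    retry_limit: usize,
}

impl RequesterState {
    fn next_request_peers(&mut self, num_peers: usize) -> Option<Vec<u64>> {
        if self.num_retries == 0 {
            // start at a random offset so different nodes hit different peers first
            self.next_index = rand::thread_rng().gen::<usize>() % self.signers.len();
        }
        if self.num_retries < self.retry_limit {
            self.num_retries += 1;
            let window: Vec<u64> = self
                .signers
                .iter()
                .cycle() // wrap around the end of the list
                .skip(self.next_index)
                .take(num_peers)
                .copied()
                .collect();
            self.next_index = (self.next_index + num_peers) % self.signers.len();
            Some(window)
        } else {
            None
        }
    }
}

fn main() {
    let mut state = RequesterState {
        signers: (0..10).collect(),
        next_index: 0,
        num_retries: 0,
        retry_limit: 3,
    };
    while let Some(peers) = state.next_request_peers(4) {
        println!("requesting from {peers:?}");
    }
}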