From 9045c2503c25d6bd405f88f285dd61ae1e2720c3 Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Wed, 15 Mar 2023 16:44:22 -0700 Subject: [PATCH 1/7] fix batch request counter, add retry limit to config --- config/src/config/quorum_store_config.rs | 4 +++- consensus/src/quorum_store/batch_requester.rs | 15 +++++++++++---- .../src/quorum_store/quorum_store_builder.rs | 1 + .../src/quorum_store/tests/batch_store_test.rs | 1 + 4 files changed, 16 insertions(+), 5 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index c38fc331e351b..9fb945ad4e0e9 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -39,6 +39,7 @@ pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, pub batch_request_num_peers: usize, + pub batch_request_num_retry: usize, pub batch_generation_poll_interval_ms: usize, pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, @@ -68,7 +69,8 @@ impl Default for QuorumStoreConfig { QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, - batch_request_num_peers: 2, + batch_request_num_peers: 5, + batch_request_num_retry: 5, batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 100, batch_generation_max_interval_ms: 250, diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 5fe0df35a8173..8f9174095a43c 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -25,17 +25,23 @@ impl BatchRequesterState { fn new( signers: Vec, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, + max_num_retry: usize, ) -> Self { Self { signers, next_index: 0, ret_tx, num_retries: 0, - max_num_retry: 5, // TODO: get it from config. 
+ max_num_retry, } } fn next_request_peers(&mut self, num_peers: usize) -> Option> { + if self.num_retries == 0 { + counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64); + } else { + counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64); + } if self.num_retries < self.max_num_retry { self.num_retries += 1; let ret = self @@ -84,6 +90,7 @@ pub(crate) struct BatchRequester { epoch: u64, my_peer_id: PeerId, request_num_peers: usize, + max_num_retry: usize, request_timeout_ms: usize, network_sender: T, } @@ -94,6 +101,7 @@ impl BatchRequester { my_peer_id: PeerId, request_num_peers: usize, request_timeout_ms: usize, + max_num_retry: usize, network_sender: T, ) -> Self { Self { @@ -101,6 +109,7 @@ impl BatchRequester { my_peer_id, request_num_peers, request_timeout_ms, + max_num_retry, network_sender, } } @@ -111,7 +120,7 @@ impl BatchRequester { signers: Vec, ret_tx: oneshot::Sender, Error>>, ) { - let mut request_state = BatchRequesterState::new(signers, ret_tx); + let mut request_state = BatchRequesterState::new(signers, ret_tx, self.max_num_retry); let network_sender = self.network_sender.clone(); let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; @@ -124,7 +133,6 @@ impl BatchRequester { trace!("QS: requesting from {:?}", request_peers); let request = BatchRequest::new(my_peer_id, epoch, digest); for peer in request_peers { - counters::SENT_BATCH_REQUEST_COUNT.inc(); futures.push(network_sender.request_batch(request.clone(), peer, timeout)); } while let Some(response) = futures.next().await { @@ -138,7 +146,6 @@ impl BatchRequester { } } } - counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc(); } request_state.serve_request(digest, None); }); diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index a8d2b59a3722e..bb93afa86745d 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -244,6 
+244,7 @@ impl InnerBuilder { self.epoch, self.author, self.config.batch_request_num_peers, + self.config.batch_request_num_retry, self.config.batch_request_timeout_ms, self.network_sender.clone(), ); diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index eea9a80881c28..e8b7df5aac6a4 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -29,6 +29,7 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc Date: Thu, 16 Mar 2023 00:20:48 -0700 Subject: [PATCH 2/7] new batch requester --- config/src/config/quorum_store_config.rs | 14 ++-- consensus/src/quorum_store/batch_requester.rs | 75 ++++++++++++------- .../src/quorum_store/quorum_store_builder.rs | 5 +- .../quorum_store/tests/batch_store_test.rs | 1 + 4 files changed, 58 insertions(+), 37 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 9fb945ad4e0e9..bab511dd4728d 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -38,14 +38,15 @@ impl Default for QuorumStoreBackPressureConfig { pub struct QuorumStoreConfig { pub channel_size: usize, pub proof_timeout_ms: usize, - pub batch_request_num_peers: usize, - pub batch_request_num_retry: usize, pub batch_generation_poll_interval_ms: usize, pub batch_generation_min_non_empty_interval_ms: usize, pub batch_generation_max_interval_ms: usize, pub end_batch_ms: u64, pub max_batch_bytes: usize, - pub batch_request_timeout_ms: usize, + pub batch_request_num_peers: usize, + pub batch_request_retry_limit: usize, + pub batch_request_retry_timeout_ms: usize, + pub batch_request_rpc_timeout_ms: usize, /// Used when setting up the expiration time for the batch initation. 
pub batch_expiry_round_gap_when_init: Round, /// Batches may have expiry set for batch_expiry_rounds_gap rounds after the @@ -69,15 +70,16 @@ impl Default for QuorumStoreConfig { QuorumStoreConfig { channel_size: 1000, proof_timeout_ms: 10000, - batch_request_num_peers: 5, - batch_request_num_retry: 5, batch_generation_poll_interval_ms: 25, batch_generation_min_non_empty_interval_ms: 100, batch_generation_max_interval_ms: 250, // TODO: This essentially turns fragments off, because there was performance degradation. Needs more investigation. end_batch_ms: 10, max_batch_bytes: 4 * 1024 * 1024, - batch_request_timeout_ms: 10000, + batch_request_num_peers: 3, + batch_request_retry_limit: 10, + batch_request_retry_timeout_ms: 1000, + batch_request_rpc_timeout_ms: 5000, batch_expiry_round_gap_when_init: 100, batch_expiry_round_gap_behind_latest_certified: 500, batch_expiry_round_gap_beyond_latest_certified: 500, diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 8f9174095a43c..f80b35eeb0560 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ + monitor, network::QuorumStoreSender, quorum_store::{counters, types::BatchRequest}, }; @@ -11,28 +12,28 @@ use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use futures::{stream::FuturesUnordered, StreamExt}; use std::time::Duration; -use tokio::sync::oneshot; +use tokio::{sync::oneshot, time}; struct BatchRequesterState { signers: Vec, next_index: usize, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, num_retries: usize, - max_num_retry: usize, + retry_limit: usize, } impl BatchRequesterState { fn new( signers: Vec, ret_tx: oneshot::Sender, aptos_executor_types::Error>>, - max_num_retry: usize, + retry_limit: usize, ) -> Self { Self { signers, next_index: 0, ret_tx, num_retries: 0, - max_num_retry, 
+ retry_limit, } } @@ -42,7 +43,7 @@ impl BatchRequesterState { } else { counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64); } - if self.num_retries < self.max_num_retry { + if self.num_retries < self.retry_limit { self.num_retries += 1; let ret = self .signers @@ -90,26 +91,29 @@ pub(crate) struct BatchRequester { epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - max_num_retry: usize, - request_timeout_ms: usize, + retry_limit: usize, + retry_timeout_ms: usize, + rpc_timeout_ms: usize, network_sender: T, } -impl BatchRequester { +impl BatchRequester { pub(crate) fn new( epoch: u64, my_peer_id: PeerId, request_num_peers: usize, - request_timeout_ms: usize, - max_num_retry: usize, + retry_limit: usize, + retry_timeout_ms: usize, + rpc_timeout_ms: usize, network_sender: T, ) -> Self { Self { epoch, my_peer_id, request_num_peers, - request_timeout_ms, - max_num_retry, + retry_limit, + retry_timeout_ms, + rpc_timeout_ms, network_sender, } } @@ -120,34 +124,47 @@ impl BatchRequester { signers: Vec, ret_tx: oneshot::Sender, Error>>, ) { - let mut request_state = BatchRequesterState::new(signers, ret_tx, self.max_num_retry); + let mut request_state = BatchRequesterState::new(signers, ret_tx, self.retry_limit); let network_sender = self.network_sender.clone(); let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; let epoch = self.epoch; - let timeout = Duration::from_millis(self.request_timeout_ms as u64); + let retry_timeout = Duration::from_millis(self.retry_timeout_ms as u64); + let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64); tokio::spawn(async move { - while let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + monitor!("batch_request", { + let mut interval = time::interval(retry_timeout); let mut futures = FuturesUnordered::new(); - trace!("QS: requesting from {:?}", request_peers); let request = BatchRequest::new(my_peer_id, epoch, digest); - for peer in 
request_peers { - futures.push(network_sender.request_batch(request.clone(), peer, timeout)); - } - while let Some(response) = futures.next().await { - if let Ok(batch) = response { - counters::RECEIVED_BATCH_COUNT.inc(); - if batch.verify().is_ok() { - let digest = batch.digest(); - let payload = batch.into_payload(); - request_state.serve_request(digest, Some(payload)); - return; + loop { + tokio::select! { + _ = interval.tick() => { + // send batch request to a set of peers of size request_num_peers + if let Some(request_peers) = request_state.next_request_peers(request_num_peers) { + for peer in request_peers { + futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout)); + } + } else { + // retry_limit exceeds, fail to get batch from peers + break; + } } + Some(response) = futures.next() => { + if let Ok(batch) = response { + counters::RECEIVED_BATCH_COUNT.inc(); + if batch.verify().is_ok() { + let digest = batch.digest(); + let payload = batch.into_payload(); + request_state.serve_request(digest, Some(payload)); + return; + } + } + }, } } - } - request_state.serve_request(digest, None); + request_state.serve_request(digest, None); + }) }); } } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index bb93afa86745d..2ba52fa0a3e5e 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -244,8 +244,9 @@ impl InnerBuilder { self.epoch, self.author, self.config.batch_request_num_peers, - self.config.batch_request_num_retry, - self.config.batch_request_timeout_ms, + self.config.batch_request_retry_limit, + self.config.batch_request_retry_timeout_ms, + self.config.batch_request_rpc_timeout_ms, self.network_sender.clone(), ); let batch_store = Arc::new(BatchStore::new( diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 
e8b7df5aac6a4..9b15c8b31043f 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -30,6 +30,7 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc Date: Thu, 16 Mar 2023 11:02:47 -0700 Subject: [PATCH 3/7] resolve comment --- consensus/src/quorum_store/batch_requester.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index f80b35eeb0560..592ac448a4265 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -152,10 +152,10 @@ impl BatchRequester { } Some(response) = futures.next() => { if let Ok(batch) = response { - counters::RECEIVED_BATCH_COUNT.inc(); + counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); if batch.verify().is_ok() { - let digest = batch.digest(); - let payload = batch.into_payload(); + let digest = *batch.digest(); + let payload = batch.into_transactions(); request_state.serve_request(digest, Some(payload)); return; } From 0b5c6a8dd84e09415e57f6c539bc658ff988165f Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 16 Mar 2023 11:09:49 -0700 Subject: [PATCH 4/7] resolve comments --- consensus/src/quorum_store/batch_requester.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 592ac448a4265..b4e83e73a6fb2 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -145,9 +145,6 @@ impl BatchRequester { for peer in request_peers { futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout)); } - } else { - // retry_limit exceeds, fail to get batch from peers - break; } } Some(response) = futures.next() => { @@ -161,6 +158,10 @@ impl BatchRequester { } } }, + else => { + // end the batch requester when 
the futures are drained + break; + } } } request_state.serve_request(digest, None); From 0558a379c01ffb3140dcbe83cc040e88f69517ce Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 16 Mar 2023 11:31:19 -0700 Subject: [PATCH 5/7] request from random initial set --- consensus/src/quorum_store/batch_requester.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index b4e83e73a6fb2..06488d25d7ddb 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -11,6 +11,7 @@ use aptos_executor_types::*; use aptos_logger::prelude::*; use aptos_types::{transaction::SignedTransaction, PeerId}; use futures::{stream::FuturesUnordered, StreamExt}; +use rand::Rng; use std::time::Duration; use tokio::{sync::oneshot, time}; @@ -39,6 +40,9 @@ impl BatchRequesterState { fn next_request_peers(&mut self, num_peers: usize) -> Option> { if self.num_retries == 0 { + let mut rng = rand::thread_rng(); + // make sure nodes request from a different set of nodes + self.next_index = rng.gen::() % self.signers.len(); counters::SENT_BATCH_REQUEST_COUNT.inc_by(num_peers as u64); } else { counters::SENT_BATCH_REQUEST_RETRY_COUNT.inc_by(num_peers as u64); } From 8b761dbb0b29857b9f28f87ee4d52f915dd4a640 Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 16 Mar 2023 12:58:41 -0700 Subject: [PATCH 6/7] fix --- consensus/src/quorum_store/batch_requester.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index 06488d25d7ddb..e1a29bdcd39dd 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -135,6 +135,7 @@ impl BatchRequester { let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; let epoch = self.epoch; let retry_timeout = Duration::from_millis(self.retry_timeout_ms as u64); let rpc_timeout =
Duration::from_millis(self.rpc_timeout_ms as u64); + let mut num_futures = request_num_peers * self.retry_limit; tokio::spawn(async move { monitor!("batch_request", { @@ -152,6 +153,7 @@ impl BatchRequester { } } Some(response) = futures.next() => { + num_futures -= 1; if let Ok(batch) = response { counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); if batch.verify().is_ok() { @@ -161,11 +163,11 @@ impl BatchRequester { return; } } + if num_futures == 0 { + // end the batch requester when the futures are drained + break; + } }, - else => { - // end the batch requester when the futures are drained - break; - } } } request_state.serve_request(digest, None); From e1c986936952215d7db031bec8d50db77aba5357 Mon Sep 17 00:00:00 2001 From: danielxiangzl Date: Thu, 16 Mar 2023 16:34:23 -0700 Subject: [PATCH 7/7] resolve comments --- config/src/config/quorum_store_config.rs | 4 ++-- consensus/src/quorum_store/batch_requester.rs | 19 ++++++++----------- .../src/quorum_store/quorum_store_builder.rs | 2 +- 3 files changed, 11 insertions(+), 14 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index 0820efd88efc7..d2f00c74b3977 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -44,7 +44,7 @@ pub struct QuorumStoreConfig { pub max_batch_bytes: usize, pub batch_request_num_peers: usize, pub batch_request_retry_limit: usize, - pub batch_request_retry_timeout_ms: usize, + pub batch_request_retry_interval_ms: usize, pub batch_request_rpc_timeout_ms: usize, /// Used when setting up the expiration time for the batch initation. 
pub batch_expiry_round_gap_when_init: Round, @@ -75,7 +75,7 @@ impl Default for QuorumStoreConfig { max_batch_bytes: 4 * 1024 * 1024, batch_request_num_peers: 3, batch_request_retry_limit: 10, - batch_request_retry_timeout_ms: 1000, + batch_request_retry_interval_ms: 1000, batch_request_rpc_timeout_ms: 5000, batch_expiry_round_gap_when_init: 100, batch_expiry_round_gap_behind_latest_certified: 500, diff --git a/consensus/src/quorum_store/batch_requester.rs b/consensus/src/quorum_store/batch_requester.rs index e1a29bdcd39dd..41cbd37d0ca90 100644 --- a/consensus/src/quorum_store/batch_requester.rs +++ b/consensus/src/quorum_store/batch_requester.rs @@ -96,7 +96,7 @@ pub(crate) struct BatchRequester { my_peer_id: PeerId, request_num_peers: usize, retry_limit: usize, - retry_timeout_ms: usize, + retry_interval_ms: usize, rpc_timeout_ms: usize, network_sender: T, } @@ -107,7 +107,7 @@ impl BatchRequester { my_peer_id: PeerId, request_num_peers: usize, retry_limit: usize, - retry_timeout_ms: usize, + retry_interval_ms: usize, rpc_timeout_ms: usize, network_sender: T, ) -> Self { @@ -116,7 +116,7 @@ impl BatchRequester { my_peer_id, request_num_peers, retry_limit, - retry_timeout_ms, + retry_interval_ms, rpc_timeout_ms, network_sender, } @@ -133,13 +133,12 @@ impl BatchRequester { let request_num_peers = self.request_num_peers; let my_peer_id = self.my_peer_id; let epoch = self.epoch; - let retry_timeout = Duration::from_millis(self.retry_timeout_ms as u64); + let retry_interval = Duration::from_millis(self.retry_interval_ms as u64); let rpc_timeout = Duration::from_millis(self.rpc_timeout_ms as u64); - let mut num_futures = request_num_peers * self.retry_limit; tokio::spawn(async move { monitor!("batch_request", { - let mut interval = time::interval(retry_timeout); + let mut interval = time::interval(retry_interval); let mut futures = FuturesUnordered::new(); let request = BatchRequest::new(my_peer_id, epoch, digest); loop { @@ -150,10 +149,12 @@ impl BatchRequester { 
for peer in request_peers { futures.push(network_sender.request_batch(request.clone(), peer, rpc_timeout)); } + } else if futures.is_empty() { + // end the loop when the futures are drained + break; } } Some(response) = futures.next() => { - num_futures -= 1; if let Ok(batch) = response { counters::RECEIVED_BATCH_RESPONSE_COUNT.inc(); if batch.verify().is_ok() { @@ -163,10 +164,6 @@ impl BatchRequester { return; } } - if num_futures == 0 { - // end the batch requester when the futures are drained - break; - } }, } } diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index 5fd44fa17c96a..a3c4a16fbf7df 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -244,7 +244,7 @@ impl InnerBuilder { self.author, self.config.batch_request_num_peers, self.config.batch_request_retry_limit, - self.config.batch_request_retry_timeout_ms, + self.config.batch_request_retry_interval_ms, self.config.batch_request_rpc_timeout_ms, self.network_sender.clone(), );