Skip to content

Commit

Permalink
[quorum-store] subscribe to DirectSend batch in batch requester (#14035)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun authored Jul 18, 2024
1 parent aa5234f commit 9a58d3a
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 3 deletions.
16 changes: 15 additions & 1 deletion consensus/src/quorum_store/batch_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
network::QuorumStoreSender,
quorum_store::{
counters,
types::{BatchRequest, BatchResponse},
types::{BatchRequest, BatchResponse, PersistedValue},
},
};
use aptos_consensus_types::proof_of_store::{BatchInfo, ProofOfStore};
Expand Down Expand Up @@ -132,6 +132,7 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
&self,
proof: ProofOfStore,
ret_tx: oneshot::Sender<ExecutorResult<Vec<SignedTransaction>>>,
mut subscriber_rx: oneshot::Receiver<PersistedValue>,
) -> Option<(BatchInfo, Vec<SignedTransaction>)> {
let digest = *proof.digest();
let expiration = proof.expiration();
Expand Down Expand Up @@ -191,6 +192,19 @@ impl<T: QuorumStoreSender + Sync + 'static> BatchRequester<T> {
}
}
},
result = &mut subscriber_rx => {
match result {
Ok(persisted_value) => {
counters::RECEIVED_BATCH_FROM_SUBSCRIPTION_COUNT.inc();
let (info, maybe_payload) = persisted_value.unpack();
request_state.serve_request(*info.digest(), maybe_payload);
return None;
}
Err(err) => {
debug!("channel closed: {}", err);
}
};
},
}
}
counters::RECEIVED_BATCH_REQUEST_TIMEOUT_COUNT.inc();
Expand Down
36 changes: 34 additions & 2 deletions consensus/src/quorum_store/batch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct BatchStore {
db_quota: usize,
batch_quota: usize,
validator_signer: ValidatorSigner,
persist_subscribers: DashMap<HashValue, Vec<oneshot::Sender<PersistedValue>>>,
}

impl BatchStore {
Expand All @@ -140,6 +141,7 @@ impl BatchStore {
db_quota,
batch_quota,
validator_signer,
persist_subscribers: DashMap::new(),
};
let db_content = db_clone
.get_all_batches()
Expand Down Expand Up @@ -284,6 +286,7 @@ impl BatchStore {
// digest with a higher expiration would update the persisted value and
// effectively extend the expiration.
if entry.get().expiration() <= certified_time {
self.persist_subscribers.remove(entry.get().digest());
Some(entry.remove())
} else {
None
Expand Down Expand Up @@ -371,13 +374,39 @@ impl BatchStore {
Err(ExecutorError::CouldNotGetData)
}
}

/// This calls lets the caller subscribe to a batch being added to the batch store.
/// This can be useful in cases where there are multiple flows to add a batch (like
/// direct from author batch / batch requester fetch) to the batch store and either
/// flow needs to subscribe to the other.
fn subscribe(&self, digest: HashValue) -> oneshot::Receiver<PersistedValue> {
let (tx, rx) = oneshot::channel();
self.persist_subscribers.entry(digest).or_default().push(tx);

// This is to account for the race where this subscribe call happens after the
// persist call.
if let Ok(value) = self.get_batch_from_local(&digest) {
self.notify_subscribers(value)
}

rx
}

fn notify_subscribers(&self, value: PersistedValue) {
if let Some((_, subscribers)) = self.persist_subscribers.remove(value.digest()) {
for subscriber in subscribers {
subscriber.send(value.clone()).ok();
}
}
}
}

impl BatchWriter for BatchStore {
fn persist(&self, persist_requests: Vec<PersistedValue>) -> Vec<SignedBatchInfo> {
let mut signed_infos = vec![];
for persist_request in persist_requests.into_iter() {
if let Some(signed_info) = self.persist_inner(persist_request) {
if let Some(signed_info) = self.persist_inner(persist_request.clone()) {
self.notify_subscribers(persist_request);
signed_infos.push(signed_info);
}
}
Expand Down Expand Up @@ -440,7 +469,10 @@ impl<T: QuorumStoreSender + Clone + Send + Sync + 'static> BatchReader for Batch
} else {
// Quorum store metrics
counters::MISSED_BATCHES_COUNT.inc();
if let Some((batch_info, payload)) = batch_requester.request_batch(proof, tx).await
let subscriber_rx = batch_store.subscribe(*proof.digest());
if let Some((batch_info, payload)) = batch_requester
.request_batch(proof, tx, subscriber_rx)
.await
{
batch_store.persist(vec![PersistedValue::new(batch_info, Some(payload))]);
}
Expand Down
8 changes: 8 additions & 0 deletions consensus/src/quorum_store/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,14 @@ pub static RECEIVED_BATCH_RESPONSE_ERROR_COUNT: Lazy<IntCounter> = Lazy::new(||
.unwrap()
});

pub static RECEIVED_BATCH_FROM_SUBSCRIPTION_COUNT: Lazy<IntCounter> = Lazy::new(|| {
register_int_counter!(
"quorum_store_batch_from_subscription_count",
"Count of the number of batches received via batch store subscription."
)
.unwrap()
});

pub static QS_BACKPRESSURE_TXN_COUNT: Lazy<Histogram> = Lazy::new(|| {
register_avg_counter(
"quorum_store_backpressure_txn_count",
Expand Down
7 changes: 7 additions & 0 deletions consensus/src/quorum_store/tests/batch_requester_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aptos_types::{
};
use move_core_types::account_address::AccountAddress;
use std::time::{Duration, Instant};
use tokio::sync::oneshot;

#[derive(Clone)]
struct MockBatchRequester {
Expand Down Expand Up @@ -99,13 +100,15 @@ async fn test_batch_request_exists() {
ValidatorVerifier::new_single(validator_signer.author(), validator_signer.public_key()),
);

let (_, subscriber_rx) = oneshot::channel();
let result = batch_requester
.request_batch(
ProofOfStore::new(
batch.batch_info().clone(),
AggregateSignature::new(vec![u8::MAX].into(), None),
),
tx,
subscriber_rx,
)
.await;
assert!(result.is_some());
Expand Down Expand Up @@ -194,13 +197,15 @@ async fn test_batch_request_not_exists_not_expired() {
);

let request_start = Instant::now();
let (_, subscriber_rx) = oneshot::channel();
let result = batch_requester
.request_batch(
ProofOfStore::new(
batch.batch_info().clone(),
AggregateSignature::new(vec![u8::MAX].into(), None),
),
tx,
subscriber_rx,
)
.await;
let request_duration = request_start.elapsed();
Expand Down Expand Up @@ -241,13 +246,15 @@ async fn test_batch_request_not_exists_expired() {
);

let request_start = Instant::now();
let (_, subscriber_rx) = oneshot::channel();
let result = batch_requester
.request_batch(
ProofOfStore::new(
batch.batch_info().clone(),
AggregateSignature::new(vec![u8::MAX].into(), None),
),
tx,
subscriber_rx,
)
.await;
let request_duration = request_start.elapsed();
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/quorum_store/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ impl PersistedValue {
}
vec![]
}

pub fn unpack(self) -> (BatchInfo, Option<Vec<SignedTransaction>>) {
(self.info, self.maybe_payload)
}
}

impl Deref for PersistedValue {
Expand Down

0 comments on commit 9a58d3a

Please sign in to comment.