Skip to content

Commit

Permalink
[fix] block_waiter serve blocks with batches of similar digest ids (M…
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis authored and huitseeker committed Aug 16, 2022
1 parent fc0e1f4 commit 3a2da22
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 54 deletions.
8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions primary/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ tokio-util = { version = "0.7.3", features = ["codec"] }
tonic = "0.7.2"
tower = "0.4.13"
tracing = "0.1.36"
tap = "1.0.1"

consensus = { path = "../consensus" }
crypto = { path = "../crypto" }
Expand Down
113 changes: 69 additions & 44 deletions primary/src/block_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ use std::{
fmt,
fmt::Formatter,
sync::Arc,
time::{Duration, SystemTime, UNIX_EPOCH},
time::Duration,
};
use tap::TapFallible;
use tokio::{
sync::{oneshot, watch},
task::JoinHandle,
time::timeout,
};
use tracing::{debug, error, info, instrument, warn};
use tracing::{debug, error, info, instrument, trace, warn};
use types::{
metered_channel::Receiver, BatchDigest, BatchMessage, BlockError, BlockErrorKind, BlockResult,
Certificate, CertificateDigest, Header, PrimaryWorkerMessage, ReconfigureNotification,
Expand Down Expand Up @@ -227,10 +228,13 @@ pub struct BlockWaiter<SynchronizerHandler: Handler + Send + Sync + 'static> {
rx_batch_receiver: Receiver<BatchResult>,

/// Maps batch ids to channels that "listen" for arrived batch messages.
/// On the key we hold the batch id (we assume it's globally unique).
/// On the value we hold a tuple of the channel to communicate the result
/// to and also a timestamp of when the request was sent.
tx_pending_batch: HashMap<BatchDigest, (oneshot::Sender<BatchResult>, u128)>,
/// On the key we hold the batch id.
/// On the value we hold a map of CertificateDigest --> oneshot::Sender
/// as we might need to deliver the batch result for requests from multiple
/// certificates (although not really probable its still possible for batches of
/// same id to be included in multiple headers).
tx_pending_batch:
HashMap<BatchDigest, HashMap<CertificateDigest, oneshot::Sender<BatchResult>>>,

/// A map that holds the channels we should notify with the
/// GetBlock responses.
Expand Down Expand Up @@ -592,10 +596,12 @@ impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<Synchroni
return None;
}

debug!("No pending get block for {}", id);
trace!("No pending get block for {}", id);

// Add on a vector the receivers
let batch_receivers = self.send_batch_requests(certificate.header.clone()).await;
let batch_receivers = self
.send_batch_requests(id, certificate.header.clone())
.await;

let fut = Self::wait_for_all_batches(id, batch_receivers);

Expand Down Expand Up @@ -675,9 +681,18 @@ impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<Synchroni
match self.pending_get_block.remove(&block_id) {
Some(certificate) => {
for (digest, _) in certificate.header.payload {
// unlock the pending request - mostly about the
// timed out requests.
self.tx_pending_batch.remove(&digest);
// Although we expect the entries to have been cleaned up by the moment
// they have been delivered (or error) still adding this here to ensure
// we don't miss any edge case and introduce memory leaks.
if let Some(senders) = self.tx_pending_batch.get_mut(&digest) {
senders.remove(&block_id);

// if no more senders in the map then remove entirely
// the map for the digest
if senders.is_empty() {
self.tx_pending_batch.remove(&digest);
}
}
}
}
None => {
Expand All @@ -695,42 +710,52 @@ impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<Synchroni
// channel of the fetched batch.
async fn send_batch_requests(
&mut self,
block_id: CertificateDigest,
header: Header,
) -> Vec<(BatchDigest, oneshot::Receiver<BatchResult>)> {
// Get the "now" time
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Failed to measure time")
.as_millis();

// Add the receivers to a vector
let mut batch_receivers = Vec::new();

// otherwise we send requests to all workers to send us their batches
for (digest, worker_id) in header.payload {
debug!(
"Sending batch {} request to worker id {}",
digest.clone(),
worker_id
);

let worker_address = self
.committee
.worker(&self.name, &worker_id)
.expect("Worker id not found")
.primary_to_worker;

let message = PrimaryWorkerMessage::RequestBatch(digest);

self.worker_network
.unreliable_send(worker_address, &message)
.await;

// mark it as pending batch. Since we assume that batches are unique
// per block, a clean up on a block request will also clean
// up all the pending batch requests.
// Although we expect our headers to reference to unique batch ids it is
// possible for a batch with the same id to be produced if the exact same
// transactions are posted and included to a batch. Although unlikely it's
// still a possibility and this component should be prepared for it.
let (tx, rx) = oneshot::channel();
self.tx_pending_batch.insert(digest, (tx, now));

if let Some(map) = self.tx_pending_batch.get_mut(&digest) {
debug!(
"Skip sending request for batch {} to worker id {}, already pending",
digest.clone(),
worker_id
);

map.insert(block_id, tx);
} else {
self.tx_pending_batch
.entry(digest)
.or_default()
.insert(block_id, tx);

debug!(
"Sending batch {} request to worker id {}",
digest.clone(),
worker_id
);

let worker_address = self
.committee
.worker(&self.name, &worker_id)
.expect("Worker id not found")
.primary_to_worker;

let message = PrimaryWorkerMessage::RequestBatch(digest);

self.worker_network
.unreliable_send(worker_address, &message)
.await;
}

// add the receiver to a vector to poll later
batch_receivers.push((digest, rx));
Expand All @@ -743,11 +768,11 @@ impl<SynchronizerHandler: Handler + Send + Sync + 'static> BlockWaiter<Synchroni
let batch_id: BatchDigest = result.clone().map_or_else(|e| e.id, |r| r.id);

match self.tx_pending_batch.remove(&batch_id) {
Some((sender, _)) => {
debug!("Sending BatchResult with id {}", &batch_id);
sender
.send(result)
.expect("Couldn't send BatchResult for pending batch");
Some(respond_to) => {
for (id, s) in respond_to {
let _ = s.send(result.clone())
.tap_err(|err| error!("Couldn't send batch result {batch_id} message to channel [{err:?}] for block_id {id}"));
}
}
None => {
warn!("Couldn't find pending batch with id {}", &batch_id);
Expand Down
62 changes: 52 additions & 10 deletions primary/src/tests/block_waiter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,20 +143,21 @@ async fn test_successfully_retrieve_multiple_blocks() {
let mut expected_get_block_responses = Vec::new();
let mut certificates = Vec::new();

for _ in 0..2 {
// Batches to be used as "commons" between headers
// Practically we want to test the case where different headers happen
// to refer to batches with same id.
let common_batch_1 = fixture_batch_with_transactions(10);
let common_batch_2 = fixture_batch_with_transactions(10);

for i in 0..10 {
let mut builder = fixture_header_builder();

let batch_1 = fixture_batch_with_transactions(10);
let batch_2 = fixture_batch_with_transactions(10);

let header = fixture_header_builder()
builder = builder
.with_payload_batch(batch_1.clone(), worker_id)
.with_payload_batch(batch_2.clone(), worker_id)
.build(&key)
.unwrap();

let certificate = certificate(&header);
certificates.push(certificate.clone());

block_ids.push(certificate.digest());
.with_payload_batch(batch_2.clone(), worker_id);

expected_batch_messages.insert(
batch_1.digest(),
Expand Down Expand Up @@ -185,9 +186,50 @@ async fn test_successfully_retrieve_multiple_blocks() {
},
];

// The first 5 headers will have unique payload.
// The next 5 will be created with common payload (some similar
// batches will be used)
if i > 5 {
builder = builder
.with_payload_batch(common_batch_1.clone(), worker_id)
.with_payload_batch(common_batch_2.clone(), worker_id);

expected_batch_messages.insert(
common_batch_1.digest(),
BatchMessage {
id: common_batch_1.digest(),
transactions: common_batch_1.clone(),
},
);

expected_batch_messages.insert(
common_batch_2.digest(),
BatchMessage {
id: common_batch_2.digest(),
transactions: common_batch_2.clone(),
},
);

batches.push(BatchMessage {
id: common_batch_1.digest(),
transactions: common_batch_1.clone(),
});
batches.push(BatchMessage {
id: common_batch_2.digest(),
transactions: common_batch_2.clone(),
});
}

// sort the batches to make sure that the response is the expected one.
batches.sort_by(|a, b| a.id.cmp(&b.id));

let header = builder.build(&key).unwrap();

let certificate = certificate(&header);
certificates.push(certificate.clone());

block_ids.push(certificate.digest());

expected_get_block_responses.push(Ok(GetBlockResponse {
id: certificate.digest(),
batches,
Expand Down
1 change: 1 addition & 0 deletions workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ strsim-c38e5c1d305a1b54 = { package = "strsim", version = "0.8", default-feature
structopt = { version = "0.3" }
subtle = { version = "2", default-features = false, features = ["i128", "std"] }
sync_wrapper = { version = "0.1", default-features = false }
tap = { version = "1", default-features = false }
task-group = { version = "0.2", default-features = false }
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "f4aa523d3029bd6a23bead5f04c182f2cfa04c5e", features = ["chrome", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-chrome", "tracing-opentelemetry"] }
tempfile = { version = "3", default-features = false }
Expand Down

0 comments on commit 3a2da22

Please sign in to comment.