diff --git a/Cargo.lock b/Cargo.lock index b03f30df8..529cac9e8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2765,6 +2765,7 @@ dependencies = [ "prost", "rand 0.7.3", "serde", + "tap", "telemetry-subscribers", "tempfile", "test_utils", @@ -3750,6 +3751,12 @@ dependencies = [ "unicode-xid 0.2.3", ] +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + [[package]] name = "task-group" version = "0.2.2" @@ -5046,6 +5053,7 @@ dependencies = [ "syn 1.0.98", "sync_wrapper", "synstructure", + "tap", "task-group", "telemetry-subscribers", "tempfile", diff --git a/primary/Cargo.toml b/primary/Cargo.toml index c39e1e021..bec435a51 100644 --- a/primary/Cargo.toml +++ b/primary/Cargo.toml @@ -31,6 +31,7 @@ tokio-util = { version = "0.7.3", features = ["codec"] } tonic = "0.7.2" tower = "0.4.13" tracing = "0.1.36" +tap = "1.0.1" consensus = { path = "../consensus" } crypto = { path = "../crypto" } diff --git a/primary/src/block_waiter.rs b/primary/src/block_waiter.rs index 84869c808..5857b935f 100644 --- a/primary/src/block_waiter.rs +++ b/primary/src/block_waiter.rs @@ -14,14 +14,15 @@ use std::{ fmt, fmt::Formatter, sync::Arc, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; +use tap::TapFallible; use tokio::{ sync::{oneshot, watch}, task::JoinHandle, time::timeout, }; -use tracing::{debug, error, info, instrument, warn}; +use tracing::{debug, error, info, instrument, trace, warn}; use types::{ metered_channel::Receiver, BatchDigest, BatchMessage, BlockError, BlockErrorKind, BlockResult, Certificate, CertificateDigest, Header, PrimaryWorkerMessage, ReconfigureNotification, @@ -227,10 +228,13 @@ pub struct BlockWaiter { rx_batch_receiver: Receiver, /// Maps batch ids to channels that "listen" for arrived batch messages. - /// On the key we hold the batch id (we assume it's globally unique). - /// On the value we hold a tuple of the channel to communicate the result - /// to and also a timestamp of when the request was sent. - tx_pending_batch: HashMap, u128)>, + /// On the key we hold the batch id. + /// On the value we hold a map of CertificateDigest --> oneshot::Sender + /// as we might need to deliver the batch result for requests from multiple + /// certificates (although not really probable its still possible for batches of + /// same id to be included in multiple headers). + tx_pending_batch: + HashMap>>, /// A map that holds the channels we should notify with the /// GetBlock responses. @@ -592,10 +596,12 @@ impl BlockWaiter BlockWaiter { for (digest, _) in certificate.header.payload { - // unlock the pending request - mostly about the - // timed out requests. - self.tx_pending_batch.remove(&digest); + // Although we expect the entries to have been cleaned up by the moment + // they have been delivered (or error) still adding this here to ensure + // we don't miss any edge case and introduce memory leaks. + if let Some(senders) = self.tx_pending_batch.get_mut(&digest) { + senders.remove(&block_id); + + // if no more senders in the map then remove entirely + // the map for the digest + if senders.is_empty() { + self.tx_pending_batch.remove(&digest); + } + } } } None => { @@ -695,42 +710,52 @@ impl BlockWaiter Vec<(BatchDigest, oneshot::Receiver)> { - // Get the "now" time - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .expect("Failed to measure time") - .as_millis(); - // Add the receivers to a vector let mut batch_receivers = Vec::new(); // otherwise we send requests to all workers to send us their batches for (digest, worker_id) in header.payload { - debug!( - "Sending batch {} request to worker id {}", - digest.clone(), - worker_id - ); - - let worker_address = self - .committee - .worker(&self.name, &worker_id) - .expect("Worker id not found") - .primary_to_worker; - - let message = PrimaryWorkerMessage::RequestBatch(digest); - - self.worker_network - .unreliable_send(worker_address, &message) - .await; - - // mark it as pending batch. Since we assume that batches are unique - // per block, a clean up on a block request will also clean - // up all the pending batch requests. + // Although we expect our headers to reference to unique batch ids it is + // possible for a batch with the same id to be produced if the exact same + // transactions are posted and included to a batch. Although unlikely it's + // still a possibility and this component should be prepared for it. let (tx, rx) = oneshot::channel(); - self.tx_pending_batch.insert(digest, (tx, now)); + + if let Some(map) = self.tx_pending_batch.get_mut(&digest) { + debug!( + "Skip sending request for batch {} to worker id {}, already pending", + digest.clone(), + worker_id + ); + + map.insert(block_id, tx); + } else { + self.tx_pending_batch + .entry(digest) + .or_default() + .insert(block_id, tx); + + debug!( + "Sending batch {} request to worker id {}", + digest.clone(), + worker_id + ); + + let worker_address = self + .committee + .worker(&self.name, &worker_id) + .expect("Worker id not found") + .primary_to_worker; + + let message = PrimaryWorkerMessage::RequestBatch(digest); + + self.worker_network + .unreliable_send(worker_address, &message) + .await; + } // add the receiver to a vector to poll later batch_receivers.push((digest, rx)); @@ -743,11 +768,11 @@ impl BlockWaiter { - debug!("Sending BatchResult with id {}", &batch_id); - sender - .send(result) - .expect("Couldn't send BatchResult for pending batch"); + Some(respond_to) => { + for (id, s) in respond_to { + let _ = s.send(result.clone()) + .tap_err(|err| error!("Couldn't send batch result {batch_id} message to channel [{err:?}] for block_id {id}")); + } } None => { warn!("Couldn't find pending batch with id {}", &batch_id); diff --git a/primary/src/tests/block_waiter_tests.rs b/primary/src/tests/block_waiter_tests.rs index b562f3a90..e18db4ef4 100644 --- a/primary/src/tests/block_waiter_tests.rs +++ b/primary/src/tests/block_waiter_tests.rs @@ -143,20 +143,21 @@ async fn test_successfully_retrieve_multiple_blocks() { let mut expected_get_block_responses = Vec::new(); let mut certificates = Vec::new(); - for _ in 0..2 { + // Batches to be used as "commons" between headers + // Practically we want to test the case where different headers happen + // to refer to batches with same id. + let common_batch_1 = fixture_batch_with_transactions(10); + let common_batch_2 = fixture_batch_with_transactions(10); + + for i in 0..10 { + let mut builder = fixture_header_builder(); + let batch_1 = fixture_batch_with_transactions(10); let batch_2 = fixture_batch_with_transactions(10); - let header = fixture_header_builder() + builder = builder .with_payload_batch(batch_1.clone(), worker_id) - .with_payload_batch(batch_2.clone(), worker_id) - .build(&key) - .unwrap(); - - let certificate = certificate(&header); - certificates.push(certificate.clone()); - - block_ids.push(certificate.digest()); + .with_payload_batch(batch_2.clone(), worker_id); expected_batch_messages.insert( batch_1.digest(), @@ -185,9 +186,50 @@ async fn test_successfully_retrieve_multiple_blocks() { }, ]; + // The first 5 headers will have unique payload. + // The next 5 will be created with common payload (some similar + // batches will be used) + if i > 5 { + builder = builder + .with_payload_batch(common_batch_1.clone(), worker_id) + .with_payload_batch(common_batch_2.clone(), worker_id); + + expected_batch_messages.insert( + common_batch_1.digest(), + BatchMessage { + id: common_batch_1.digest(), + transactions: common_batch_1.clone(), + }, + ); + + expected_batch_messages.insert( + common_batch_2.digest(), + BatchMessage { + id: common_batch_2.digest(), + transactions: common_batch_2.clone(), + }, + ); + + batches.push(BatchMessage { + id: common_batch_1.digest(), + transactions: common_batch_1.clone(), + }); + batches.push(BatchMessage { + id: common_batch_2.digest(), + transactions: common_batch_2.clone(), + }); + } + // sort the batches to make sure that the response is the expected one. batches.sort_by(|a, b| a.id.cmp(&b.id)); + let header = builder.build(&key).unwrap(); + + let certificate = certificate(&header); + certificates.push(certificate.clone()); + + block_ids.push(certificate.digest()); + expected_get_block_responses.push(Ok(GetBlockResponse { id: certificate.digest(), batches, diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 227097c71..0615b18ad 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -262,6 +262,7 @@ strsim-c38e5c1d305a1b54 = { package = "strsim", version = "0.8", default-feature structopt = { version = "0.3" } subtle = { version = "2", default-features = false, features = ["i128", "std"] } sync_wrapper = { version = "0.1", default-features = false } +tap = { version = "1", default-features = false } task-group = { version = "0.2", default-features = false } telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "f4aa523d3029bd6a23bead5f04c182f2cfa04c5e", features = ["chrome", "jaeger", "opentelemetry", "opentelemetry-jaeger", "tracing-chrome", "tracing-opentelemetry"] } tempfile = { version = "3", default-features = false }