This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

[refactor / possible bug] worker synchronizer to reply back for available batches #837

Merged
merged 2 commits on Aug 24, 2022
Changes from all commits
4 changes: 4 additions & 0 deletions primary/src/payload_receiver.rs
@@ -8,6 +8,10 @@ use tokio::task::JoinHandle;
use tracing::info;
use types::{metered_channel::Receiver, BatchDigest};

#[cfg(test)]
#[path = "tests/payload_receiver_tests.rs"]
mod payload_receiver_tests;

/// Receives batches' digests of other authorities. These are only needed to verify incoming
/// headers (i.e., make sure we have their payload).
pub struct PayloadReceiver {
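(Note: the `#[cfg(test)]` + `#[path = "tests/payload_receiver_tests.rs"]` pair added above compiles the file under `tests/` as an inline unit-test module of the `primary` crate, which is what lets the new test below use crate-internal items directly.)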
30 changes: 30 additions & 0 deletions primary/src/tests/payload_receiver_tests.rs
@@ -0,0 +1,30 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::common::create_db_stores;
use crate::payload_receiver::PayloadReceiver;
use types::BatchDigest;

#[tokio::test]
async fn receive_batch() {
// GIVEN
let worker_id = 5;
let digest = BatchDigest::new([5u8; 32]);
let (tx_workers, rx_workers) = test_utils::test_channel!(1);
let (_, _, payload_store) = create_db_stores();

let _handle = PayloadReceiver::spawn(payload_store.clone(), rx_workers);

for _ in 0..4 {
// WHEN - irrespective of how many times we send the same (digest, worker_id)
tx_workers.send((digest, worker_id)).await.unwrap();

// THEN we expect it to be stored successfully (no errors thrown) and the
// behaviour to be idempotent.
let result = payload_store
.notify_read((digest, worker_id))
.await
.unwrap();

assert_eq!(result.unwrap(), 0u8);
}
}
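The assertion above leans on the store's notify-read contract as the test exercises it: the awaited read resolves as soon as a value exists under the key, regardless of whether the write happened before or after the call. A minimal sketch of that assumed contract, reusing the test's own store and key (inferred from the test, not from the store's documented API):

// Assumed contract, illustrated: a write under a key satisfies any
// notify_read on that same key, in either order.
payload_store.write((digest, worker_id), 0u8).await;
let value = payload_store.notify_read((digest, worker_id)).await.unwrap();
assert_eq!(value, Some(0u8));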
84 changes: 56 additions & 28 deletions worker/src/synchronizer.rs
@@ -7,19 +7,22 @@ use crypto::PublicKey;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt as _};
use network::{LuckyNetwork, UnreliableNetwork, WorkerNetwork};
use primary::PrimaryWorkerMessage;
use std::collections::HashSet;
use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use store::{Store, StoreError};
use tap::TapFallible;
use tap::TapOptional;
use tokio::{
sync::{mpsc, watch},
task::JoinHandle,
time::{sleep, Duration, Instant},
};
use tracing::{debug, error, warn};
use tracing::{debug, error, trace, warn};
use types::error::DagError;
use types::{
metered_channel::{Receiver, Sender},
BatchDigest, ReconfigureNotification, Round, SerializedBatchMessage, WorkerMessage,
@@ -138,55 +141,80 @@ impl Synchronizer {
// Handle primary's messages.
Some(message) = self.rx_message.recv() => match message {
PrimaryWorkerMessage::Synchronize(digests, target) => {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Failed to measure time")
.as_millis();
let mut missing = HashSet::new();
let mut available = HashSet::new();

let mut missing = Vec::new();
for digest in digests {
for digest in digests.iter() {
// Ensure we do not send the same sync request twice.
if self.pending.contains_key(&digest) {
if self.pending.contains_key(digest) {
continue;
}

// Check if we received the batch in the meantime.
match self.store.read(digest).await {
match self.store.read(*digest).await {
Ok(None) => {
missing.push(digest);
missing.insert(*digest);
debug!("Requesting sync for batch {digest}");
},
Ok(Some(_)) => {
// The batch arrived in the meantime: no need to request it.
available.insert(*digest);
trace!("Digest {digest} already in store, nothing to sync");
continue;
},
Err(e) => {
error!("{e}");
continue;
}
}
};
}

// Add the digest to the waiter.
let deliver = digest;
let (tx_cancel, rx_cancel) = mpsc::channel(1);
let fut = Self::waiter(digest, self.store.clone(), deliver, rx_cancel);
waiting.push(fut);
self.pending.insert(digest, (self.round, tx_cancel, now));
// Reply back immediately for the available ones.
if !available.is_empty() {
// Doing this ensures the batch id is propagated to the primary even
// when other processes fail to do so (e.g. we received a batch from a peer
// worker and the message was missed by the primary).
for digest in available {
let message = WorkerPrimaryMessage::OthersBatch(digest, self.id);
let _ = self.tx_primary.send(message).await.tap_err(|err|{
debug!("{err:?} {}", DagError::ShuttingDown);
});
}
}

// Send sync request to a single node. If this fails, we will send it
// to other nodes when a timer times out.
let address = match self.worker_cache.load().worker(&target, &self.id) {
Ok(address) => address.worker_to_worker,
Err(e) => {
error!("The primary asked us to sync with an unknown node: {e}");
continue;
if !missing.is_empty() {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Failed to measure time")
.as_millis();

// Now add all requests as pending.
for digest in missing.iter() {
// Add the digest to the waiter.
let deliver = *digest;
let (tx_cancel, rx_cancel) = mpsc::channel(1);
let fut = Self::waiter(*digest, self.store.clone(), deliver, rx_cancel);
waiting.push(fut);
self.pending.insert(*digest, (self.round, tx_cancel, now));
}
};

debug!("Sending BatchRequest message to {} for missing batches {:?}", address, missing.clone());
// Send sync request to a single node. If this fails, we will send it
// to other nodes when a timer times out.
let address = match self.worker_cache.load().worker(&target, &self.id) {
Ok(address) => address.worker_to_worker,
Err(e) => {
error!("The primary asked us to sync with an unknown node: {e}");
continue;
}
};

let message = WorkerMessage::BatchRequest(missing, self.name.clone());
self.network.unreliable_send(address, &message).await;
debug!("Sending BatchRequest message to {} for missing batches {:?}", address, missing.clone());

let message = WorkerMessage::BatchRequest(missing.into_iter().collect::<Vec<_>>(), self.name.clone());
self.network.unreliable_send(address, &message).await;
} else {
debug!("All batches are already available {:?} nothing to request from peers", digests);
}
},
PrimaryWorkerMessage::Cleanup(round) => {
// Keep track of the primary's round number.
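In outline, the reworked Synchronize arm above first partitions the requested digests by a store lookup, acknowledges the already-available ones to the primary immediately, and only then falls through to the network request for the rest. A standalone sketch of that partition step, using plain std types as stand-ins for the project's `Store` and `BatchDigest` (a hypothetical helper, simplified from the diff):

use std::collections::{HashMap, HashSet};

type Digest = [u8; 32]; // stand-in for the project's BatchDigest

// Digests with an in-flight sync request are skipped; digests already in
// the store become `available` (reported to the primary right away); the
// rest become `missing` (bundled into a single BatchRequest).
fn partition(
    digests: &[Digest],
    store: &HashMap<Digest, Vec<u8>>,
    pending: &HashSet<Digest>,
) -> (HashSet<Digest>, HashSet<Digest>) {
    let mut available = HashSet::new();
    let mut missing = HashSet::new();
    for digest in digests {
        if pending.contains(digest) {
            continue; // a request for this digest is already pending
        }
        if store.contains_key(digest) {
            available.insert(*digest);
        } else {
            missing.insert(*digest);
        }
    }
    (available, missing)
}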
105 changes: 84 additions & 21 deletions worker/src/tests/processor_tests.rs
@@ -8,7 +8,8 @@ use store::rocks;
use test_utils::{batch, committee, temp_dir};

#[tokio::test]
async fn hash_and_store() {
async fn hash_and_store_our_batch() {
// GIVEN
let (tx_batch, rx_batch) = test_utils::test_channel!(1);
let (tx_digest, mut rx_digest) = test_utils::test_channel!(1);

@@ -17,13 +18,7 @@ async fn hash_and_store() {
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));

// Create a new test store.
let db = rocks::DBMap::<BatchDigest, SerializedBatchMessage>::open(
temp_dir(),
None,
Some("batches"),
)
.unwrap();
let store = Store::new(db);
let store = create_batches_store();

// Spawn a new `Processor` instance.
let id = 0;
@@ -40,20 +35,88 @@ async fn hash_and_store() {
let batch = batch();
let message = WorkerMessage::Batch(batch.clone());
let serialized = bincode::serialize(&message).unwrap();
tx_batch.send(serialized.clone()).await.unwrap();

// Ensure the `Processor` outputs the batch's digest.
let digest = batch.digest();
match rx_digest.recv().await.unwrap() {
WorkerPrimaryMessage::OurBatch(x, y) => {
assert_eq!(x, digest);
assert_eq!(y, id);

// The process should be idempotent - no matter how many times we write
// the same batch, it should be stored and the message output to the tx_digest channel
for _ in 0..3 {
// WHEN
tx_batch.send(serialized.clone()).await.unwrap();

// THEN
// Ensure the `Processor` outputs the batch's digest.
let digest = batch.digest();
match rx_digest.recv().await.unwrap() {
WorkerPrimaryMessage::OurBatch(x, y) => {
assert_eq!(x, digest);
assert_eq!(y, id);
}
_ => panic!("Unexpected protocol message"),
}
_ => panic!("Unexpected protocol message"),

// Ensure the `Processor` correctly stored the batch.
let stored_batch = store.read(digest).await.unwrap();
assert!(stored_batch.is_some(), "The batch is not in the store");
assert_eq!(stored_batch.unwrap(), serialized);
}
}

#[tokio::test]
async fn hash_and_store_others_batch() {
// GIVEN
let (tx_batch, rx_batch) = test_utils::test_channel!(1);
let (tx_digest, mut rx_digest) = test_utils::test_channel!(1);

let committee = committee(None).clone();
let (_tx_reconfiguration, rx_reconfiguration) =
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));

// Ensure the `Processor` correctly stored the batch.
let stored_batch = store.read(digest).await.unwrap();
assert!(stored_batch.is_some(), "The batch is not in the store");
assert_eq!(stored_batch.unwrap(), serialized);
// Create a new test store.
let store = create_batches_store();

// Spawn a new `Processor` instance.
let id = 0;
let _processor_handler = Processor::spawn(
id,
store.clone(),
rx_reconfiguration,
rx_batch,
tx_digest,
/* own_batch */ false,
);

// Send a batch to the `Processor`.
let batch = batch();
let message = WorkerMessage::Batch(batch.clone());
let serialized = bincode::serialize(&message).unwrap();

for _ in 0..3 {
// WHEN
tx_batch.send(serialized.clone()).await.unwrap();

// THEN
// Ensure the `Processor` outputs the batch's digest.
let digest = batch.digest();
match rx_digest.recv().await.unwrap() {
WorkerPrimaryMessage::OthersBatch(x, y) => {
assert_eq!(x, digest);
assert_eq!(y, id);
}
_ => panic!("Unexpected protocol message"),
}

// Ensure the `Processor` correctly stored the batch.
let stored_batch = store.read(digest).await.unwrap();
assert!(stored_batch.is_some(), "The batch is not in the store");
assert_eq!(stored_batch.unwrap(), serialized);
}
}

fn create_batches_store() -> Store<BatchDigest, SerializedBatchMessage> {
let db = rocks::DBMap::<BatchDigest, SerializedBatchMessage>::open(
temp_dir(),
None,
Some("batches"),
)
.unwrap();
Store::new(db)
}
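The two tests above differ only in the own_batch flag handed to Processor::spawn; judging by the messages each test expects, the processor presumably selects the variant along these lines (a sketch of the implied behaviour using types from the `types` crate, not the actual implementation):

// Implied variant selection: `own_batch` is fixed at spawn time and decides
// which message the primary receives for every stored batch.
fn to_primary_message(own_batch: bool, digest: BatchDigest, id: WorkerId) -> WorkerPrimaryMessage {
    if own_batch {
        WorkerPrimaryMessage::OurBatch(digest, id)
    } else {
        WorkerPrimaryMessage::OthersBatch(digest, id)
    }
}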
74 changes: 74 additions & 0 deletions worker/src/tests/synchronizer_tests.rs
@@ -5,6 +5,7 @@ use super::*;
use arc_swap::ArcSwap;
use fastcrypto::traits::KeyPair;
use prometheus::Registry;
use test_utils::committee;
use test_utils::{
batch, batch_digest, batches, keys, open_batch_store, pure_committee_from_keys,
resolve_name_committee_and_worker_cache, serialize_batch_message,
@@ -70,6 +71,79 @@ async fn synchronize() {
assert_eq!(handle.recv().await.unwrap().payload, serialized);
}

#[tokio::test]
async fn synchronize_when_batch_exists() {
let (tx_message, rx_message) = test_utils::test_channel!(1);
let (tx_primary, mut rx_primary) = test_utils::test_channel!(1);

let mut keys = keys(None);
let worker_cache = shared_worker_cache_from_keys(&keys);
let name = keys.pop().unwrap().public().clone();
let id = 0;

let committee = committee(None);
let (tx_reconfiguration, _rx_reconfiguration) =
watch::channel(ReconfigureNotification::NewEpoch(committee.clone()));

// Create a new test store.
let store = open_batch_store();

let metrics = Arc::new(WorkerMetrics::new(&Registry::new()));

// Spawn a `Synchronizer` instance.
let _synchronizer_handle = Synchronizer::spawn(
name.clone(),
id,
Arc::new(ArcSwap::from_pointee(committee.clone())),
worker_cache.clone(),
store.clone(),
/* gc_depth */ 50, // Not used in this test.
/* sync_retry_delay */
Duration::from_millis(1_000_000), // Ensure it is not triggered.
/* sync_retry_nodes */ 3, // Not used in this test.
rx_message,
tx_reconfiguration,
tx_primary,
metrics,
WorkerNetwork::default(),
);

// Spawn a listener to receive our batch requests.
let target = keys.pop().unwrap().public().clone();
let address = worker_cache
.load()
.worker(&target, &id)
.unwrap()
.worker_to_worker;
let batch_id = batch_digest();
let missing = vec![batch_id];

// Now store the batch.
let payload = vec![10u8];
store.write(batch_id, payload).await;

let mut handle = WorkerToWorkerMockServer::spawn(address);

// Send a sync request.
let message = PrimaryWorkerMessage::Synchronize(missing, target);
tx_message.send(message).await.unwrap();

// Ensure the target does NOT receive the sync request - we should time out waiting.
let result = timeout(Duration::from_secs(1), handle.recv()).await;
assert!(result.is_err());

// Now ensure that the batch is forwarded directly to the primary.
let result_batch_message: WorkerPrimaryMessage = rx_primary.recv().await.unwrap();

match result_batch_message {
WorkerPrimaryMessage::OthersBatch(result_digest, worker_id) => {
assert_eq!(result_digest, batch_id, "Batch id mismatch");
assert_eq!(worker_id, id, "Worker id mismatch");
}
_ => panic!("Unexpected message received!"),
}
}

#[tokio::test]
async fn test_successful_request_batch() {
let (tx_message, rx_message) = test_utils::test_channel!(1);