Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
Adding method to synchronizer to sync block payload. Added tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed May 16, 2022
1 parent fdd4f0b commit 0dd2788
Show file tree
Hide file tree
Showing 7 changed files with 422 additions and 61 deletions.
72 changes: 67 additions & 5 deletions primary/src/block_synchronizer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_synchronizer::{
handler::Error::{BlockDeliveryTimeout, BlockNotFound, Internal},
BlockSynchronizeResult, Command,
handler::Error::{BlockDeliveryTimeout, BlockNotFound, Internal, PayloadSyncError},
BlockSynchronizeResult, Command, SyncError,
},
BlockHeader,
};
Expand All @@ -19,7 +19,7 @@ use tokio::{
sync::mpsc::{channel, Sender},
time::timeout,
};
use tracing::{debug, error, instrument};
use tracing::{debug, error, instrument, trace};
use types::{Certificate, CertificateDigest, PrimaryMessage};

#[cfg(test)]
Expand All @@ -38,14 +38,21 @@ pub enum Error {

#[error("Timed out while waiting for {block_id} to become available after submitting for processing")]
BlockDeliveryTimeout { block_id: CertificateDigest },

#[error("Payload for block with {block_id} couldn't be synchronized: {error}")]
PayloadSyncError {
block_id: CertificateDigest,
error: SyncError,
},
}

impl Error {
pub fn block_id(&self) -> CertificateDigest {
match *self {
BlockNotFound { block_id }
| Internal { block_id }
| BlockDeliveryTimeout { block_id } => block_id,
| BlockDeliveryTimeout { block_id }
| PayloadSyncError { block_id, .. } => block_id,
}
}
}
Expand Down Expand Up @@ -78,6 +85,13 @@ pub trait Handler<PublicKey: VerifyingKey> {
&self,
block_ids: Vec<CertificateDigest>,
) -> Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>;

/// Synchronizes the block payload for the provided certificates via the
/// block synchronizer and returns the result back.
async fn synchronize_block_payloads(
&self,
certificates: Vec<Certificate<PublicKey>>,
) -> Vec<Result<Certificate<PublicKey>, Error>>;
}

/// A helper struct to allow us access the block_synchronizer in a synchronous
Expand Down Expand Up @@ -238,7 +252,7 @@ impl<PublicKey: VerifyingKey> Handler<PublicKey> for BlockSynchronizerHandler<Pu
loop {
match rx.recv().await {
None => {
debug!("Channel closed when getting certificates, no more messages to get");
trace!("Channel closed when getting certificates, no more messages to get");
break;
}
Some(result) => results.push(result),
Expand All @@ -247,4 +261,52 @@ impl<PublicKey: VerifyingKey> Handler<PublicKey> for BlockSynchronizerHandler<Pu

results
}

#[instrument(level = "debug", skip_all)]
async fn synchronize_block_payloads(
&self,
certificates: Vec<Certificate<PublicKey>>,
) -> Vec<Result<Certificate<PublicKey>, Error>> {
let (tx, mut rx) = channel(certificates.len());

self.tx_block_synchronizer
.send(Command::SynchronizeBlockPayload {
certificates,
respond_to: tx,
})
.await
.expect("Couldn't send message to block synchronizer");

// now wait to retrieve all the results
let mut results = Vec::new();

// We want to block and wait until we get all the results back.
loop {
match rx.recv().await {
None => {
trace!("Channel closed when getting results, no more messages to get");
break;
}
Some(result) => {
let r = result
.map(|h| h.certificate)
.map_err(|e| Error::PayloadSyncError {
block_id: e.block_id(),
error: e,
});

if let Err(err) = r {
error!(
"Error for payload synchronization with block id {}, error: {err}",
err.block_id()
);
}

results.push(r)
}
}
}

results
}
}
84 changes: 82 additions & 2 deletions primary/src/block_synchronizer/mock.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::block_synchronizer::{BlockHeader, BlockSynchronizeResult, Command};
use crypto::traits::VerifyingKey;
use crypto::{traits::VerifyingKey, Hash};
use std::collections::HashMap;
use tokio::sync::{
mpsc::{channel, Receiver, Sender},
Expand All @@ -17,15 +17,33 @@ enum Core<PublicKey: VerifyingKey> {
result: Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>,
ready: oneshot::Sender<()>,
},
SynchronizeBlockPayload {
block_ids: Vec<CertificateDigest>,
times: u32,
result: Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>,
ready: oneshot::Sender<()>,
},
AssertExpectations {
ready: oneshot::Sender<()>,
},
}

struct MockBlockSynchronizerCore<PublicKey: VerifyingKey> {
/// A map that holds the expected requests for sync block headers and their
/// stubbed response.
block_headers_expected_requests:
HashMap<Vec<CertificateDigest>, (u32, Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>)>,

/// A map that holds the expected requests for sync block payload and their
/// stubbed response.
block_payload_expected_requests:
HashMap<Vec<CertificateDigest>, (u32, Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>)>,

/// Channel to receive the messages that are supposed to be sent to the
/// block synchronizer.
rx_commands: Receiver<Command<PublicKey>>,

/// Channel to receive the commands to mock the requests.
rx_core: Receiver<Core<PublicKey>>,
}

Expand All @@ -50,7 +68,22 @@ impl<PublicKey: VerifyingKey> MockBlockSynchronizerCore<PublicKey> {
respond_to.send(result).await.expect("Couldn't send message");
}
}
Command::SynchronizeBlockPayload { .. } => {}
Command::SynchronizeBlockPayload { certificates, respond_to } => {
let block_ids = certificates.into_iter().map(|c|c.digest()).collect();
let (times, results) = self
.block_payload_expected_requests
.remove(&block_ids)
.unwrap_or_else(||panic!("{}", format!("Unexpected call received for SynchronizeBlockPayload, {:?}", block_ids).as_str()))
.to_owned();

if times > 1 {
self.block_payload_expected_requests.insert(block_ids, (times - 1, results.clone()));
}

for result in results {
respond_to.send(result).await.expect("Couldn't send message");
}
}
}
}
Some(command) = self.rx_core.recv() => {
Expand All @@ -64,6 +97,15 @@ impl<PublicKey: VerifyingKey> MockBlockSynchronizerCore<PublicKey> {
self.block_headers_expected_requests.insert(block_ids, (times, result));
ready.send(()).expect("Failed to send ready message");
},
Core::SynchronizeBlockPayload {
block_ids,
times,
result,
ready,
} => {
self.block_payload_expected_requests.insert(block_ids, (times, result));
ready.send(()).expect("Failed to send ready message");
},
Core::AssertExpectations {ready} => {
self.assert_expectations();
ready.send(()).expect("Failed to send ready message");
Expand All @@ -87,6 +129,16 @@ impl<PublicKey: VerifyingKey> MockBlockSynchronizerCore<PublicKey> {
);
}

for (ids, results) in &self.block_payload_expected_requests {
result.push_str(
format!(
"SynchronizeBlockPayload, ids={:?}, results={:?}",
ids, results
)
.as_str(),
);
}

if !result.is_empty() {
panic!(
"There are expected calls that haven't been fulfilled \n\n {}",
Expand All @@ -109,6 +161,7 @@ impl<PublicKey: VerifyingKey> MockBlockSynchronizer<PublicKey> {

let mut core = MockBlockSynchronizerCore {
block_headers_expected_requests: HashMap::new(),
block_payload_expected_requests: HashMap::new(),
rx_commands,
rx_core,
};
Expand Down Expand Up @@ -145,6 +198,33 @@ impl<PublicKey: VerifyingKey> MockBlockSynchronizer<PublicKey> {
Self::await_channel(rx).await;
}

/// A method that allow us to mock responses for the
/// SynchronizeBlockPayload requests. It has to be noted that we use
/// the block_ids as a way to identify the expected certificates for
/// the request since that on its own suffice to identify them.
/// `block_ids`: The block_ids we expect
/// `results`: The results we would like to respond with
/// `times`: How many times we should expect to be called.
pub async fn expect_synchronize_block_payload(
&self,
block_ids: Vec<CertificateDigest>,
result: Vec<BlockSynchronizeResult<BlockHeader<PublicKey>>>,
times: u32,
) {
let (tx, rx) = oneshot::channel();
self.tx_core
.send(Core::SynchronizeBlockPayload {
block_ids,
times,
result,
ready: tx,
})
.await
.expect("Failed to send mock expectation");

Self::await_channel(rx).await;
}

/// Asserts that all the expectations have been fulfilled and no
/// expectation has been left without having been called.
pub async fn assert_expectations(&self) {
Expand Down
77 changes: 74 additions & 3 deletions primary/src/block_synchronizer/tests/handler_tests.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_synchronizer::handler::{BlockSynchronizerHandler, Error, Handler},
block_synchronizer::{
handler::{BlockSynchronizerHandler, Error, Handler},
SyncError,
},
common::create_db_stores,
BlockHeader, MockBlockSynchronizer,
};
use crypto::Hash;
use crypto::{ed25519::Ed25519PublicKey, Hash};
use std::{collections::HashSet, time::Duration};
use test_utils::{certificate, fixture_header_with_payload};
use tokio::sync::mpsc::channel;
use types::{CertificateDigest, PrimaryMessage};
use types::{Certificate, CertificateDigest, PrimaryMessage};

#[tokio::test]
async fn test_get_and_synchronize_block_headers_when_fetched_from_storage() {
Expand Down Expand Up @@ -215,3 +218,71 @@ async fn test_get_and_synchronize_block_headers_timeout_on_causal_completion() {
// AND
mock_synchronizer.assert_expectations().await;
}

#[tokio::test]
async fn test_synchronize_block_payload() {
// GIVEN
let (_, certificate_store, payload_store) = create_db_stores();
let (tx_block_synchronizer, rx_block_synchronizer) = channel(1);
let (tx_core, _rx_core) = channel(1);

let synchronizer = BlockSynchronizerHandler {
tx_block_synchronizer,
tx_core,
certificate_store: certificate_store.clone(),
certificate_deliver_timeout: Duration::from_millis(2_000),
};

// AND a certificate with payload already available
let cert_stored: Certificate<Ed25519PublicKey> = certificate(&fixture_header_with_payload(1));
for e in cert_stored.clone().header.payload {
payload_store.write(e, 1).await;
}

// AND a certificate with payload NOT available
let cert_missing = certificate(&fixture_header_with_payload(2));

// AND
let block_ids = vec![cert_stored.digest(), cert_missing.digest()];

// AND mock the block_synchronizer where the certificate is fetched
// from peers (fetched_from_storage = false)
let mock_synchronizer = MockBlockSynchronizer::new(rx_block_synchronizer);
let expected_result = vec![
Ok(BlockHeader {
certificate: cert_stored.clone(),
fetched_from_storage: true,
}),
Err(SyncError::NoResponse {
block_id: cert_missing.digest(),
}),
];
mock_synchronizer
.expect_synchronize_block_payload(block_ids.clone(), expected_result, 1)
.await;

// WHEN
let result = synchronizer
.synchronize_block_payloads(vec![cert_stored.clone(), cert_missing.clone()])
.await;

// THEN
assert_eq!(result.len(), 2);

// AND
for r in result {
if let Ok(cert) = r {
assert_eq!(cert_stored.digest(), cert.digest());
} else {
match r.err().unwrap() {
Error::PayloadSyncError { block_id, .. } => {
assert_eq!(cert_missing.digest(), block_id)
}
_ => panic!("Unexpected error returned"),
}
}
}

// AND
mock_synchronizer.assert_expectations().await;
}
Loading

0 comments on commit 0dd2788

Please sign in to comment.