Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
rebased to main. Refactored integration test to check the sync payloa…
Browse files Browse the repository at this point in the history
…d and fetch them
  • Loading branch information
akichidis committed May 13, 2022
1 parent 3b8c938 commit 0ad2d6b
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 48 deletions.
2 changes: 2 additions & 0 deletions primary/src/block_synchronizer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ impl<PublicKey: VerifyingKey> Handler<PublicKey> for BlockSynchronizerHandler<Pu
break;
}
Some(result) => {
println!("Received result {:?}", result.clone());

let r = result
.map(|h| h.certificate)
.map_err(|e| Error::PayloadSyncError {
Expand Down
2 changes: 1 addition & 1 deletion primary/src/block_synchronizer/tests/handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ async fn test_synchronize_block_payload() {
certificate: cert_stored.clone(),
fetched_from_storage: true,
}),
Err(SyncError::NotFound {
Err(SyncError::NoResponse {
block_id: cert_missing.digest(),
}),
];
Expand Down
11 changes: 8 additions & 3 deletions primary/src/block_waiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ type RequestKey = Vec<u8>;
/// vec![]
/// }
///
/// async fn synchronize_block_payloads(&self, certificates: Vec<Certificate<PublicKey>>) -> Vec<Result<Certificate<PublicKey>, Error>> {
/// vec![]
/// }
///
/// }
///
/// #[tokio::main(flavor = "current_thread")]
Expand Down Expand Up @@ -432,7 +436,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
let mut ids = Vec::new();

// ensure payloads are synchronized for the found certificates
let found_certificates = certificates
let found_certificates: Vec<Certificate<PublicKey>> = certificates
.clone()
.into_iter()
.filter(|(_, c)| c.is_some())
Expand All @@ -444,6 +448,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
.synchronize_block_payloads(found_certificates)
.await;
let successful_payload_sync_set = sync_result
.clone()
.into_iter()
.flatten()
.map(|c| c.digest())
Expand All @@ -468,7 +473,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
get_block_sender
.send(Err(BlockError {
id,
error: BlockErrorType::BatchError,
error: BlockErrorKind::BatchError,
}))
.expect("Couldn't send BatchError error for a GetBlocks request");
}
Expand Down Expand Up @@ -513,7 +518,7 @@ impl<PublicKey: VerifyingKey, SynchronizerHandler: Handler<PublicKey> + Send + S
sender
.send(Err(BlockError {
id,
error: BlockErrorType::BatchError,
error: BlockErrorKind::BatchError,
}))
.expect("Couldn't send message back to sender");

Expand Down
91 changes: 52 additions & 39 deletions primary/tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use blake2::digest::Update;
use config::{Parameters, WorkerId};
use crypto::{
ed25519::{Ed25519KeyPair, Ed25519PublicKey},
Expand All @@ -9,15 +10,18 @@ use crypto::{
};
use node::NodeStorage;
use primary::{PayloadToken, Primary, CHANNEL_CAPACITY};
use std::time::Duration;
use std::{
collections::{BTreeMap, HashMap},
time::Duration,
};
use store::Store;
use test_utils::{
certificate, committee, fixture_batch_with_transactions, fixture_header_builder, keys, temp_dir,
};
use tokio::sync::mpsc::channel;
use tonic::transport::Channel;
use types::{
BatchDigest, Certificate, CertificateDigest, CertificateDigestProto, CollectionErrorType,
Batch, BatchDigest, Certificate, CertificateDigest, CertificateDigestProto,
CollectionRetrievalResult, Empty, GetCollectionsRequest, Header, HeaderDigest,
RemoveCollectionsRequest, RetrievalResult, ValidatorClient,
};
Expand Down Expand Up @@ -371,11 +375,10 @@ async fn test_remove_collections() {
/// collections for both certificates. Since primary 1 knows only about the
/// certificate 1 we expect to sync with primary 2 to fetch the unknown
/// certificate 2 after it has been processed for causal completion & validation.
///
/// TODO: the code will not successfully fetch the batches for certificate 2
/// since we don't block on waiting to sync the batches. That will be included
/// in a follow up PR (see https://github.com/MystenLabs/narwhal/issues/223).
/// Then this test will be refactored to test only for successful responses.
/// We also expect to synchronize the missing batches of the missing certificate
/// from primary 2. All in all the end goal is to:
/// * Primary 1 be able to retrieve both certificates 1 & 2 successfully
/// * Primary 1 be able to fetch the payload for certificates 1 & 2
#[tokio::test]
async fn test_get_collections_with_missing_certificates() {
// GIVEN keys for two primary nodes
Expand Down Expand Up @@ -403,7 +406,7 @@ async fn test_get_collections_with_missing_certificates() {
let signer_1 = keys().remove(0);

// The certificate_1 will be stored in primary 1
let certificate_1 = fixture_certificate(
let (certificate_1, batch_digest_1, batch_1) = fixture_certificate(
signer_1,
store_primary_1.header_store.clone(),
store_primary_1.certificate_store.clone(),
Expand All @@ -416,7 +419,7 @@ async fn test_get_collections_with_missing_certificates() {
let signer_2 = keys().remove(1);

// The certificate_2 will be stored in primary 2
let certificate_2 = fixture_certificate(
let (certificate_2, batch_digest_2, batch_2) = fixture_certificate(
signer_2,
store_primary_2.header_store.clone(),
store_primary_2.certificate_store.clone(),
Expand All @@ -425,6 +428,11 @@ async fn test_get_collections_with_missing_certificates() {
)
.await;

// AND keep a map of batches and payload
let mut batches_map = HashMap::new();
batches_map.insert(batch_digest_1, batch_1);
batches_map.insert(batch_digest_2, batch_2);

let block_ids = vec![certificate_1.digest(), certificate_2.digest()];

// Spawn the primary 1 (which will be the one that we'll interact with)
Expand Down Expand Up @@ -498,34 +506,28 @@ async fn test_get_collections_with_missing_certificates() {

// We expect to get successfully the batches only for the one collection
assert_eq!(
1,
2,
actual_result
.iter()
.filter(|&r| matches!(r.retrieval_result, Some(types::RetrievalResult::Batch(_))))
.count()
);

// The second certificate, which was missing from the primary 1, we expect
// to have fetched it but not be able to fetch its payload. So the expected
// error would be CollectionError.
// TODO: once we plugin the block_synchronizer to sync missing payload as well
// then this error shouldn't be thrown anymore
// (see https://github.com/MystenLabs/narwhal/issues/223)
let error: &CollectionRetrievalResult = actual_result
.iter()
.find(|&r| matches!(r.retrieval_result, Some(types::RetrievalResult::Error(_))))
.unwrap();

match error.retrieval_result.as_ref().unwrap() {
RetrievalResult::Error(err) => {
assert_eq!(err.error, CollectionErrorType::CollectionError as i32);
assert_eq!(
err.id.as_ref().unwrap(),
&CertificateDigestProto::from(certificate_2.digest())
);
}
_ => {
panic!("Error was expected");
for result in actual_result {
match result.retrieval_result.unwrap() {
RetrievalResult::Batch(batch) => {
let id: BatchDigest = batch.id.unwrap().into();
let result_batch: Batch = batch.transactions.unwrap().into();

if let Some(expected_batch) = batches_map.get(&id) {
assert_eq!(result_batch, *expected_batch, "Batch payload doesn't match");
} else {
panic!("Unexpected batch!");
}
}
_ => {
panic!("Expected to have received a batch response");
}
}
}
}
Expand All @@ -536,10 +538,23 @@ async fn fixture_certificate(
certificate_store: Store<CertificateDigest, Certificate<Ed25519PublicKey>>,
payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
batch_store: Store<BatchDigest, SerializedBatchMessage>,
) -> Certificate<Ed25519PublicKey> {
) -> (Certificate<Ed25519PublicKey>, BatchDigest, Batch) {
let batch = fixture_batch_with_transactions(10);
let worker_id = 0;

// We need to make sure that we calculate the batch digest based on the
// serialised message rather than the batch it self.
// See more info https://github.com/MystenLabs/narwhal/issues/188
// TODO: refactor this when the above is changed/fixed.
let message = WorkerMessage::<Ed25519PublicKey>::Batch(batch.clone());
let serialized_batch = bincode::serialize(&message).unwrap();
let batch_digest = BatchDigest::new(crypto::blake2b_256(|hasher| {
hasher.update(&serialized_batch)
}));

let mut payload = BTreeMap::new();
payload.insert(batch_digest, worker_id);

let builder = types::HeaderBuilder::<Ed25519PublicKey>::default();
let header = builder
.author(key.public().clone())
Expand All @@ -550,8 +565,8 @@ async fn fixture_certificate(
.map(|x| x.digest())
.collect(),
)
.with_payload_batch(batch.clone(), worker_id)
.build(|payload| key.sign(payload));
.payload(payload)
.build(|p| key.sign(p));

let certificate = certificate(&header);

Expand All @@ -565,16 +580,14 @@ async fn fixture_certificate(

// Write the batches to payload store
payload_store
.write_all(vec![((batch.clone().digest(), worker_id), 0)])
.write_all(vec![((batch_digest, worker_id), 0)])
.await
.expect("couldn't store batches");

// Add a batch to the workers store
let message = WorkerMessage::<Ed25519PublicKey>::Batch(batch.clone());
let serialized_batch = bincode::serialize(&message).unwrap();
batch_store.write(batch.digest(), serialized_batch).await;
batch_store.write(batch_digest, serialized_batch).await;

certificate
(certificate, batch_digest, batch)
}

fn connect_to_validator_client(parameters: Parameters) -> ValidatorClient<Channel> {
Expand Down
37 changes: 32 additions & 5 deletions types/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ mod narwhal;

use std::{array::TryFromSliceError, ops::Deref};

use crate::{Batch, BatchMessage, BlockError, BlockErrorKind, CertificateDigest};
use bytes::Bytes;
use crate::{
Batch, BatchDigest, BatchMessage, BlockError, BlockErrorKind, CertificateDigest, Transaction,
};
use bytes::{Buf, Bytes};

pub use narwhal::{
collection_retrieval_result::RetrievalResult,
Expand Down Expand Up @@ -45,14 +47,39 @@ impl From<Batch> for BatchProto {
transaction: batch
.0
.into_iter()
.map(|transaction| TransactionProto {
transaction: Bytes::from(transaction),
})
.map(TransactionProto::from)
.collect::<Vec<TransactionProto>>(),
}
}
}

impl From<Transaction> for TransactionProto {
fn from(transaction: Transaction) -> Self {
TransactionProto {
transaction: Bytes::from(transaction),
}
}
}

impl From<BatchProto> for Batch {
fn from(batch: BatchProto) -> Self {
let transactions: Vec<Vec<u8>> = batch
.transaction
.into_iter()
.map(|t| t.transaction.to_vec())
.collect();
Batch(transactions)
}
}

impl From<BatchDigestProto> for BatchDigest {
fn from(batch_digest: BatchDigestProto) -> Self {
let mut result: [u8; crypto::DIGEST_LEN] = [0; crypto::DIGEST_LEN];
batch_digest.digest.as_ref().copy_to_slice(&mut result);
BatchDigest::new(result)
}
}

impl From<BlockError> for CollectionError {
fn from(error: BlockError) -> Self {
CollectionError {
Expand Down

0 comments on commit 0ad2d6b

Please sign in to comment.