
Commit

add integration test for the get_collections when missing certificates (#228)

feat: introduces an integration test for get_collections

Introduces an end-to-end test for the get_collections gRPC endpoint for the case where some certificates are missing.
akichidis authored May 13, 2022
1 parent e01f229 commit 064c55b
Showing 1 changed file with 224 additions and 9 deletions: primary/tests/integration_tests.rs
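
Before diving into the diff, here is roughly what a get_collections round-trip looks like from a client. This is a minimal sketch based on the types used in the diff below; the helper name fetch_collections, the endpoint address, and the tonic-generated ValidatorClient::connect constructor are assumptions for illustration, not code from this commit:

    async fn fetch_collections(
        collection_ids: Vec<types::CertificateDigest>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        use types::{GetCollectionsRequest, RetrievalResult, ValidatorClient};

        // Hypothetical address; the test below derives the real one from Parameters.
        let mut client = ValidatorClient::connect("http://127.0.0.1:50052").await?;

        let request = tonic::Request::new(GetCollectionsRequest {
            collection_ids: collection_ids.iter().map(|&id| id.into()).collect(),
        });
        let response = client.get_collections(request).await?;

        // Each collection resolves to either its batches or a retrieval error.
        for result in response.into_inner().result {
            match result.retrieval_result {
                Some(RetrievalResult::Batch(_)) => { /* batches available */ }
                Some(RetrievalResult::Error(_)) => { /* e.g. missing payload */ }
                None => {}
            }
        }
        Ok(())
    }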
@@ -1,25 +1,27 @@
 // Copyright (c) 2022, Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
-use config::Parameters;
+use config::{Parameters, WorkerId};
 use crypto::{
-    ed25519::Ed25519PublicKey,
+    ed25519::{Ed25519KeyPair, Ed25519PublicKey},
     traits::{KeyPair, Signer},
     Hash,
 };
 use node::NodeStorage;
-use primary::{Primary, CHANNEL_CAPACITY};
+use primary::{PayloadToken, Primary, CHANNEL_CAPACITY};
 use std::time::Duration;
+use store::Store;
 use test_utils::{
     certificate, committee, fixture_batch_with_transactions, fixture_header_builder, keys, temp_dir,
 };
 use tokio::sync::mpsc::channel;
 use tonic::transport::Channel;
 use types::{
-    CertificateDigest, CertificateDigestProto, CollectionRetrievalResult, Empty,
-    GetCollectionsRequest, RemoveCollectionsRequest, ValidatorClient,
+    BatchDigest, Certificate, CertificateDigest, CertificateDigestProto, CollectionErrorType,
+    CollectionRetrievalResult, Empty, GetCollectionsRequest, Header, HeaderDigest,
+    RemoveCollectionsRequest, RetrievalResult, ValidatorClient,
 };
-use worker::{Worker, WorkerMessage};
+use worker::{SerializedBatchMessage, Worker, WorkerMessage};
 
 #[tokio::test]
 async fn test_get_collections() {
@@ -116,7 +118,7 @@ async fn test_get_collections() {
     tokio::time::sleep(Duration::from_secs(1)).await;
 
     // Test gRPC server with client call
-    let mut client = connect_to_validator_client(parameters.clone()).await;
+    let mut client = connect_to_validator_client(parameters.clone());
 
     // Test get no collections
     let request = tonic::Request::new(GetCollectionsRequest {
@@ -265,7 +267,7 @@ async fn test_remove_collections() {
     tokio::time::sleep(Duration::from_secs(1)).await;
 
     // Test gRPC server with client call
-    let mut client = connect_to_validator_client(parameters.clone()).await;
+    let mut client = connect_to_validator_client(parameters.clone());
 
     // Test remove 1 collection without spawning worker. Should result in a timeout error
     // when trying to remove batches.
@@ -362,7 +364,220 @@ async fn test_remove_collections() {
     assert_eq!(Empty {}, actual_result);
 }
 
-async fn connect_to_validator_client(parameters: Parameters) -> ValidatorClient<Channel> {
+/// Here we test the ability of our code to synchronize missing certificates
+/// by requesting them from other peers. In this example we emulate 2 authorities
+/// (2 separate primary nodes) and store one certificate on each. We then ask
+/// primary 1, via the get_collections call, to fetch the collections for both
+/// certificates. Since primary 1 knows only about certificate 1, we expect it
+/// to sync with primary 2 to fetch the unknown certificate 2, which is then
+/// processed for causal completion & validation.
+///
+/// TODO: the code will not successfully fetch the batches for certificate 2,
+/// since we don't block on waiting to sync the batches. That will be included
+/// in a follow-up PR (see https://github.com/MystenLabs/narwhal/issues/223).
+/// This test will then be refactored to test only for successful responses.
+#[tokio::test]
+async fn test_get_collections_with_missing_certificates() {
+    // GIVEN keys for two primary nodes
+    let mut k = keys();
+
+    let keypair_1 = k.pop().unwrap();
+    let name_1 = keypair_1.public().clone();
+
+    let keypair_2 = k.pop().unwrap();
+    let name_2 = keypair_2.public().clone();
+
+    let committee = committee();
+    let parameters = Parameters {
+        batch_size: 200, // Two transactions.
+        ..Parameters::default()
+    };
+
+    // AND create separate data stores for the 2 primaries
+    let store_primary_1 = NodeStorage::reopen(temp_dir());
+    let store_primary_2 = NodeStorage::reopen(temp_dir());
+
+    let worker_id = 0;
+
+    // AND generate and store the certificates
+    let signer_1 = keys().remove(0);
+
+    // The certificate_1 will be stored in primary 1
+    let certificate_1 = fixture_certificate(
+        signer_1,
+        store_primary_1.header_store.clone(),
+        store_primary_1.certificate_store.clone(),
+        store_primary_1.payload_store.clone(),
+        store_primary_1.batch_store.clone(),
+    )
+    .await;
+
+    // Take the second key, skipping the first (already used for signer_1)
+    let signer_2 = keys().remove(1);
+
+    // The certificate_2 will be stored in primary 2
+    let certificate_2 = fixture_certificate(
+        signer_2,
+        store_primary_2.header_store.clone(),
+        store_primary_2.certificate_store.clone(),
+        store_primary_2.payload_store.clone(),
+        store_primary_2.batch_store.clone(),
+    )
+    .await;
+
+    let block_ids = vec![certificate_1.digest(), certificate_2.digest()];
+
+    // Spawn primary 1 (the one we'll interact with)
+    let (tx_new_certificates_1, _rx_new_certificates_1) = channel(CHANNEL_CAPACITY);
+    let (_tx_feedback_1, rx_feedback_1) = channel(CHANNEL_CAPACITY);
+
+    Primary::spawn(
+        name_1.clone(),
+        keypair_1,
+        committee.clone(),
+        parameters.clone(),
+        store_primary_1.header_store,
+        store_primary_1.certificate_store,
+        store_primary_1.payload_store,
+        /* tx_consensus */ tx_new_certificates_1,
+        /* rx_consensus */ rx_feedback_1,
+        /* external_consensus */ false,
+    );
+
+    // Spawn a `Worker` instance for primary 1.
+    Worker::spawn(
+        name_1,
+        worker_id,
+        committee.clone(),
+        parameters.clone(),
+        store_primary_1.batch_store,
+    );
+
+    // Spawn primary 2 - a peer to fetch missing certificates from
+    let (tx_new_certificates_2, _rx_new_certificates_2) = channel(CHANNEL_CAPACITY);
+    let (_tx_feedback_2, rx_feedback_2) = channel(CHANNEL_CAPACITY);
+
+    Primary::spawn(
+        name_2.clone(),
+        keypair_2,
+        committee.clone(),
+        parameters.clone(),
+        store_primary_2.header_store,
+        store_primary_2.certificate_store,
+        store_primary_2.payload_store,
+        /* tx_consensus */ tx_new_certificates_2,
+        /* rx_consensus */ rx_feedback_2,
+        /* external_consensus */ true,
+    );
+
+    // Spawn a `Worker` instance for primary 2.
+    Worker::spawn(
+        name_2,
+        worker_id,
+        committee.clone(),
+        parameters.clone(),
+        store_primary_2.batch_store,
+    );
+
+    // Wait for tasks to start
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    // Test gRPC server with client call
+    let mut client = connect_to_validator_client(parameters.clone());
+
+    let collection_ids = block_ids;
+
+    // Test get collections
+    let request = tonic::Request::new(GetCollectionsRequest {
+        collection_ids: collection_ids.iter().map(|&c_id| c_id.into()).collect(),
+    });
+    let response = client.get_collections(request).await.unwrap();
+    let actual_result = response.into_inner().result;
+
+    assert_eq!(2, actual_result.len());
+
+    // We expect to successfully get the batches for only one of the collections
+    assert_eq!(
+        1,
+        actual_result
+            .iter()
+            .filter(|&r| matches!(r.retrieval_result, Some(types::RetrievalResult::Batch(_))))
+            .count()
+    );
+
+    // The second certificate was missing from primary 1, so we expect it to have
+    // been fetched, but its payload to be unavailable. The expected error is
+    // therefore CollectionError.
+    // TODO: once we plug in the block_synchronizer to sync missing payload as well,
+    // this error shouldn't be thrown anymore
+    // (see https://github.com/MystenLabs/narwhal/issues/223)
+    let error: &CollectionRetrievalResult = actual_result
+        .iter()
+        .find(|&r| matches!(r.retrieval_result, Some(types::RetrievalResult::Error(_))))
+        .unwrap();
+
+    match error.retrieval_result.as_ref().unwrap() {
+        RetrievalResult::Error(err) => {
+            assert_eq!(err.error, CollectionErrorType::CollectionError as i32);
+            assert_eq!(
+                err.id.as_ref().unwrap(),
+                &CertificateDigestProto::from(certificate_2.digest())
+            );
+        }
+        _ => {
+            panic!("Error was expected");
+        }
+    }
+}
+
+async fn fixture_certificate(
+    key: Ed25519KeyPair,
+    header_store: Store<HeaderDigest, Header<Ed25519PublicKey>>,
+    certificate_store: Store<CertificateDigest, Certificate<Ed25519PublicKey>>,
+    payload_store: Store<(BatchDigest, WorkerId), PayloadToken>,
+    batch_store: Store<BatchDigest, SerializedBatchMessage>,
+) -> Certificate<Ed25519PublicKey> {
+    let batch = fixture_batch_with_transactions(10);
+    let worker_id = 0;
+
+    let builder = types::HeaderBuilder::<Ed25519PublicKey>::default();
+    let header = builder
+        .author(key.public().clone())
+        .round(1)
+        .parents(
+            Certificate::genesis(&committee())
+                .iter()
+                .map(|x| x.digest())
+                .collect(),
+        )
+        .with_payload_batch(batch.clone(), worker_id)
+        .build(|payload| key.sign(payload));
+
+    let certificate = certificate(&header);
+
+    // Write the certificate
+    certificate_store
+        .write(certificate.digest(), certificate.clone())
+        .await;
+
+    // Write the header
+    header_store.write(header.clone().id, header.clone()).await;
+
+    // Write the batches to payload store
+    payload_store
+        .write_all(vec![((batch.clone().digest(), worker_id), 0)])
+        .await
+        .expect("couldn't store batches");
+
+    // Add a batch to the workers store
+    let message = WorkerMessage::<Ed25519PublicKey>::Batch(batch.clone());
+    let serialized_batch = bincode::serialize(&message).unwrap();
+    batch_store.write(batch.digest(), serialized_batch).await;
+
+    certificate
+}
+
+fn connect_to_validator_client(parameters: Parameters) -> ValidatorClient<Channel> {
     let config = mysten_network::config::Config::new();
     let channel = config
         .connect_lazy(&parameters.consensus_api_grpc.socket_addr)
... (remainder of the diff truncated)
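
The helper above is cut off by the diff view. As a sketch only, the usual tonic pattern would wrap the lazily connected channel in the generated client; the trailing ValidatorClient::new call is an assumption, since the actual remainder of the function is not shown:

    fn connect_to_validator_client(parameters: Parameters) -> ValidatorClient<Channel> {
        let config = mysten_network::config::Config::new();
        let channel = config
            .connect_lazy(&parameters.consensus_api_grpc.socket_addr)
            .unwrap();
        // Assumed: the tonic-generated constructor over an established channel.
        ValidatorClient::new(channel)
    }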

1 comment on commit 064c55b

@github-actions

Bench results


SUMMARY:

  • CONFIG:
    Faults: 0 node(s)
    Committee size: 4 node(s)
    Worker(s) per node: 1 worker(s)
    Collocate primary and workers: True
    Input rate: 50,000 tx/s
    Transaction size: 512 B
    Execution time: 19 s

    Header size: 1,000 B
    Max header delay: 200 ms
    GC depth: 50 round(s)
    Sync retry delay: 10,000 ms
    Sync retry nodes: 3 node(s)
    Batch size: 500,000 B
    Max batch delay: 200 ms
    Max concurrent requests: 2

  • RESULTS:
    Consensus TPS: 22,495 tx/s
    Consensus BPS: 11,517,351 B/s
    Consensus latency: 996 ms

    End-to-end TPS: 22,337 tx/s
    End-to-end BPS: 11,436,339 B/s
    End-to-end latency: 1,471 ms
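
(Note: the reported BPS figures are consistent with TPS × transaction size, e.g. 22,495 tx/s × 512 B ≈ 11,517,000 B/s for consensus.)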

