From 56cb7cc2d9be317abdc9d0fb8194eb4af769d766 Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Thu, 24 Feb 2022 10:19:17 +0000 Subject: [PATCH] [feature]: introduce component to retrieve a Block (#55) [feature]: introduce a new component, the BlockWaiter. BlockWaiter will basically act as an aggregator and fetch the batches that correspond to a header by providing a certificate id. --- narwhal/primary/src/block_waiter.rs | 472 ++++++++++++++++++ narwhal/primary/src/lib.rs | 2 + narwhal/primary/src/primary.rs | 27 + .../primary/src/tests/block_waiter_tests.rs | 419 ++++++++++++++++ narwhal/primary/src/tests/common.rs | 34 +- narwhal/primary/src/tests/core_tests.rs | 12 +- narwhal/worker/src/synchronizer.rs | 3 + 7 files changed, 960 insertions(+), 9 deletions(-) create mode 100644 narwhal/primary/src/block_waiter.rs create mode 100644 narwhal/primary/src/tests/block_waiter_tests.rs diff --git a/narwhal/primary/src/block_waiter.rs b/narwhal/primary/src/block_waiter.rs new file mode 100644 index 0000000000000..6de2c5ef5d898 --- /dev/null +++ b/narwhal/primary/src/block_waiter.rs @@ -0,0 +1,472 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +use crate::{messages::Header, Certificate, PrimaryWorkerMessage}; +use bytes::Bytes; +use config::Committee; +use crypto::{traits::VerifyingKey, Digest}; +use futures::{ + future::{try_join_all, BoxFuture}, + stream::{futures_unordered::FuturesUnordered, StreamExt as _}, + FutureExt, +}; +use network::SimpleSender; +use std::{ + collections::HashMap, + fmt, + fmt::Formatter, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use store::Store; +use tokio::{ + sync::{ + mpsc::{Receiver, Sender}, + oneshot, + }, + time::timeout, +}; +use tracing::{error, log::debug}; +use Result::*; + +const BATCH_RETRIEVE_TIMEOUT: Duration = Duration::from_secs(1); + +pub type Transaction = Vec; + +#[cfg(test)] +#[path = "tests/block_waiter_tests.rs"] +pub mod block_waiter_tests; + +pub enum BlockCommand { + /// GetBlock dictates retrieving the block data + /// (vector of transactions) by a given block digest. + /// Results are sent to the provided Sender. The id is + /// basically the Certificate digest id. + #[allow(dead_code)] + GetBlock { + id: Digest, + // The channel to send the results to. + sender: Sender>, + }, +} + +#[derive(Clone, Debug)] +pub struct GetBlockResponse { + id: Digest, + #[allow(dead_code)] + batches: Vec, +} + +#[derive(Clone, Default, Debug)] +pub struct BatchMessage { + pub id: Digest, + pub transactions: Vec, +} + +type BlockResult = Result; + +#[derive(Debug, Clone)] +pub struct BlockError { + id: Digest, + error: BlockErrorType, +} + +impl From for BlockResult { + fn from(error: BlockError) -> Self { + BlockResult::Err(error) + } +} + +impl fmt::Display for BlockError { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "block id: {}, error type: {}", self.id, self.error) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub enum BlockErrorType { + BlockNotFound, + BatchTimeout, + BatchError, +} + +impl fmt::Display for BlockErrorType { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "{:?}", self) + } +} + +/// BlockWaiter is responsible for fetching the block data from the +/// downstream worker nodes. A block is basically the aggregate +/// of batches of transactions for a given certificate. +/// +/// # Example +/// +/// Basic setup of the BlockWaiter +/// +/// This example shows the basic setup of the BlockWaiter module. It showcases +/// the necessary components that have to be used (e.x channels, datastore etc) +/// and how a request (command) should be issued to get a block and receive +/// the result of it. +/// +/// ```rust +/// # use store::{reopen, rocks, rocks::DBMap, Store}; +/// # use tokio::sync::mpsc::{channel}; +/// # use crypto::Hash; +/// # use std::env::temp_dir; +/// # use crypto::Digest; +/// # use crypto::ed25519::Ed25519PublicKey; +/// # use config::Committee; +/// # use std::collections::BTreeMap; +/// # use primary::Certificate; +/// # use primary::{BatchMessage, BlockWaiter, BlockCommand}; +/// +/// #[tokio::main(flavor = "current_thread")] +/// # async fn main() { +/// const CERTIFICATES_CF: &str = "certificates"; +/// +/// // Basic setup: datastore, channels & BlockWaiter +/// let rocksdb = rocks::open_cf(temp_dir(), None, &[CERTIFICATES_CF]) +/// .expect("Failed creating database"); +/// +/// let (certificate_map) = reopen!(&rocksdb, +/// CERTIFICATES_CF;>); +/// let certificate_store = Store::new(certificate_map); +/// +/// let (tx_commands, rx_commands) = channel(1); +/// let (tx_batches, rx_batches) = channel(1); +/// let (tx_get_block, mut rx_get_block) = channel(1); +/// +/// let name = Ed25519PublicKey::default(); +/// let committee = Committee{ authorities: BTreeMap::new() }; +/// +/// BlockWaiter::spawn( +/// name, +/// committee, +/// certificate_store.clone(), +/// rx_commands, +/// rx_batches, +/// ); +/// +/// // A dummy certificate +/// let certificate = Certificate::::default(); +/// +/// // Send a command to receive a block +/// tx_commands +/// .send(BlockCommand::GetBlock { +/// id: certificate.digest(), +/// sender: tx_get_block, +/// }) +/// .await; +/// +/// // Dummy - we expect to receive the requested batches via another component +/// // and get fed via the tx_batches channel. +/// tx_batches.send(BatchMessage{ id: Digest::default(), transactions: vec![] }).await; +/// +/// // Wait to receive the block output to the provided sender channel +/// match rx_get_block.recv().await { +/// Some(Ok(result)) => { +/// println!("Successfully received a block response"); +/// } +/// Some(Err(err)) => { +/// println!("Received an error {}", err); +/// } +/// _ => { +/// println!("Nothing received"); +/// } +/// } +/// # } +/// ``` +pub struct BlockWaiter { + /// The public key of this primary. + name: PublicKey, + + /// The committee information. + committee: Committee, + + /// Storage that keeps the Certificates by their digest id. + certificate_store: Store>, + + /// Receive all the requests to get a block + rx_commands: Receiver, + + /// Whenever we have a get_block request, we mark the + /// processing as pending by adding it on the hashmap. Once + /// we have a result back - or timeout - we expect to remove + /// the digest from the map. The key is the block id, and + /// the value is the corresponding certificate. + pending_get_block: HashMap>, + + /// Network driver allowing to send messages. + network: SimpleSender, + + /// The batch receive channel is listening for received + /// messages for batches that have been requested + rx_batch_receiver: Receiver, + + /// Maps batch ids to channels that "listen" for arrived batch messages. + /// On the key we hold the batch id (we assume it's globally unique). + /// On the value we hold a tuple of the channel to communicate the result + /// to and also a timestamp of when the request was sent. + tx_pending_batch: HashMap, u128)>, + + /// A map that holds the channels we should notify with the + /// GetBlock responses. + tx_get_block_map: HashMap>>>, +} + +impl BlockWaiter { + // Create a new waiter and start listening on incoming + // commands to fetch a block + pub fn spawn( + name: PublicKey, + committee: Committee, + certificate_store: Store>, + rx_commands: Receiver, + batch_receiver: Receiver, + ) { + tokio::spawn(async move { + Self { + name, + committee, + certificate_store, + rx_commands, + pending_get_block: HashMap::new(), + network: SimpleSender::new(), + rx_batch_receiver: batch_receiver, + tx_pending_batch: HashMap::new(), + tx_get_block_map: HashMap::new(), + } + .run() + .await; + }); + } + + async fn run(&mut self) { + let mut waiting = FuturesUnordered::new(); + + loop { + tokio::select! { + Some(command) = self.rx_commands.recv() => { + match self.handle_command(command).await { + Some(fut) => waiting.push(fut), + None => debug!("no processing for command, will not wait for any results") + } + }, + // When we receive a BatchMessage (from a worker), this is + // this is captured by the rx_batch_receiver channel and + // handled appropriately. + Some(batch_message) = self.rx_batch_receiver.recv() => { + self.handle_batch_message(batch_message).await; + }, + // When we send a request to fetch a block's batches + // we wait on the results to come back before we proceed. + // By iterating the waiting vector it allow us to proceed + // whenever waiting has been finished for a request. + Some(result) = waiting.next() => { + self.handle_batch_waiting_result(result).await; + }, + } + } + } + + // handles received commands and returns back a future if needs to + // wait for further results. Otherwise, an empty option is returned + // if no further waiting on processing is needed. + async fn handle_command<'a>( + &mut self, + command: BlockCommand, + ) -> Option>> { + match command { + BlockCommand::GetBlock { id, sender } => { + match self.certificate_store.read(id.clone()).await { + Ok(Some(certificate)) => { + // If similar request is already under processing, don't start a new one + if self.pending_get_block.contains_key(&id.clone()) { + self.tx_get_block_map + .entry(id.clone()) + .or_insert_with(Vec::new) + .push(sender); + + debug!("Block with id {} already has a pending request", id.clone()); + return None; + } + + debug!("No pending get block for {}", id.clone()); + + // Add on a vector the receivers + let batch_receivers = + self.send_batch_requests(certificate.header.clone()).await; + + let fut = Self::wait_for_all_batches(id.clone(), batch_receivers); + + // Ensure that we mark this block retrieval + // as pending so no other can initiate the process + self.pending_get_block + .insert(id.clone(), certificate.clone()); + + self.tx_get_block_map + .entry(id.clone()) + .or_insert_with(Vec::new) + .push(sender); + + return Some(fut.boxed()); + } + _ => { + sender + .send(Err(BlockError { + id: id.clone(), + error: BlockErrorType::BlockNotFound, + })) + .await + .expect("Couldn't send BlockNotFound error for a GetBlock request"); + } + } + } + } + + None + } + + async fn handle_batch_waiting_result(&mut self, result: BlockResult) { + let block_id = result.clone().map_or_else(|e| e.id, |r| r.id); + + match self.tx_get_block_map.remove(&block_id) { + Some(senders) => { + for sender in senders { + if sender.send(result.clone()).await.is_err() { + error!( + "Couldn't forward results for block {} to sender", + block_id.clone() + ) + } + } + } + None => { + error!( + "We should expect to find channels to respond for {}", + block_id.clone() + ); + } + } + + // unlock the pending request & batches. + match self.pending_get_block.remove(&block_id) { + Some(certificate) => { + for (digest, _) in certificate.header.payload { + // unlock the pending request - mostly about the + // timed out requests. + self.tx_pending_batch.remove(&digest); + } + } + None => { + // TODO: handle panic here + error!( + "Expected to find certificate with id {} for pending processing", + &block_id + ); + } + } + } + + // Sends requests to fetch the batches from the corresponding workers. + // It returns a vector of tuples of the batch digest and a Receiver + // channel of the fetched batch. + async fn send_batch_requests( + &mut self, + header: Header, + ) -> Vec<(Digest, oneshot::Receiver)> { + // Get the "now" time + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("Failed to measure time") + .as_millis(); + + // Add the receivers to a vector + let mut batch_receivers = Vec::new(); + + // otherwise we send requests to all workers to send us their batches + for (digest, worker_id) in header.payload { + debug!( + "Sending batch {} request to worker id {}", + digest.clone(), + worker_id + ); + + let worker_address = self + .committee + .worker(&self.name, &worker_id) + .expect("Worker id not found") + .primary_to_worker; + + let message = PrimaryWorkerMessage::::RequestBatch(digest.clone()); + let bytes = bincode::serialize(&message).expect("Failed to serialize batch request"); + + self.network.send(worker_address, Bytes::from(bytes)).await; + + // mark it as pending batch. Since we assume that batches are unique + // per block, a clean up on a block request will also clean + // up all the pending batch requests. + let (tx, rx) = oneshot::channel(); + self.tx_pending_batch.insert(digest.clone(), (tx, now)); + + // add the receiver to a vector to poll later + batch_receivers.push((digest.clone(), rx)); + } + + batch_receivers + } + + async fn handle_batch_message(&mut self, message: BatchMessage) { + match self.tx_pending_batch.remove(&message.id) { + Some((sender, _)) => { + debug!("Sending batch message with id {}", &message.id); + sender + .send(message.clone()) + .expect("Couldn't send BatchMessage for pending batch"); + } + None => { + debug!("Couldn't find pending batch with id {}", message.id); + } + } + } + + /// A helper method to "wait" for all the batch responses to be received. + /// It gets the fetched batches and creates a GetBlockResponse ready + /// to be sent back to the request. + async fn wait_for_all_batches( + block_id: Digest, + batches_receivers: Vec<(Digest, oneshot::Receiver)>, + ) -> BlockResult { + let waiting: Vec<_> = batches_receivers + .into_iter() + .map(|p| Self::wait_for_batch(block_id.clone(), p.1)) + .collect(); + + let result = try_join_all(waiting).await?; + Ok(GetBlockResponse { + id: block_id, + batches: result, + }) + } + + /// Waits for a batch to be received. If batch is not received in time, + /// then a timeout is yielded and an error is returned. + async fn wait_for_batch( + block_id: Digest, + batch_receiver: oneshot::Receiver, + ) -> BlockResult { + // ensure that we won't wait forever for a batch result to come + return match timeout(BATCH_RETRIEVE_TIMEOUT, batch_receiver).await { + Ok(Ok(result)) => BlockResult::Ok(result), + Ok(Err(_)) => BlockError { + id: block_id, + error: BlockErrorType::BatchError, + } + .into(), + Err(_) => BlockError { + id: block_id, + error: BlockErrorType::BatchTimeout, + } + .into(), + }; + } +} diff --git a/narwhal/primary/src/lib.rs b/narwhal/primary/src/lib.rs index 3e5b5acdcf2e4..131996f2ce8c2 100644 --- a/narwhal/primary/src/lib.rs +++ b/narwhal/primary/src/lib.rs @@ -11,6 +11,7 @@ #[macro_use] mod error; mod aggregators; +mod block_waiter; mod certificate_waiter; mod core; mod garbage_collector; @@ -27,6 +28,7 @@ mod synchronizer; mod common; pub use crate::{ + block_waiter::{BatchMessage, BlockCommand, BlockWaiter}, messages::{Certificate, Header}, primary::{PayloadToken, Primary, PrimaryWorkerMessage, Round, WorkerPrimaryMessage}, }; diff --git a/narwhal/primary/src/primary.rs b/narwhal/primary/src/primary.rs index 79b6d97a33618..fe8f8bfb8afab 100644 --- a/narwhal/primary/src/primary.rs +++ b/narwhal/primary/src/primary.rs @@ -2,6 +2,7 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use crate::{ + block_waiter::{BatchMessage, BlockWaiter, Transaction}, certificate_waiter::CertificateWaiter, core::Core, error::DagError, @@ -54,6 +55,8 @@ pub enum PrimaryWorkerMessage { Synchronize(Vec, /* target */ PublicKey), /// The primary indicates a round update. Cleanup(Round), + /// The primary requests a batch from the worker + RequestBatch(Digest), } /// The messages sent by the workers to their primary. @@ -63,6 +66,8 @@ pub enum WorkerPrimaryMessage { OurBatch(Digest, WorkerId), /// The worker indicates it received a batch's digest from another authority. OthersBatch(Digest, WorkerId), + /// The worker sends a requested batch + RequestedBatch(Digest, Vec), } // A type alias marking the "payload" tokens sent by workers to their primary as batch acknowledgements @@ -94,6 +99,8 @@ impl Primary { let (tx_certificates_loopback, rx_certificates_loopback) = channel(CHANNEL_CAPACITY); let (tx_primary_messages, rx_primary_messages) = channel(CHANNEL_CAPACITY); let (tx_cert_requests, rx_cert_requests) = channel(CHANNEL_CAPACITY); + let (_tx_batch_commands, rx_batch_commands) = channel(CHANNEL_CAPACITY); + let (tx_batches, rx_batches) = channel(CHANNEL_CAPACITY); // Write the parameters to the logs. parameters.tracing(); @@ -134,6 +141,7 @@ impl Primary { WorkerReceiverHandler { tx_our_digests, tx_others_digests, + tx_batches, }, ); info!( @@ -182,6 +190,16 @@ impl Primary { /* rx_workers */ rx_others_digests, ); + // Retrieves a block's data by contacting the worker nodes that contain the + // underlying batches and their transactions. + BlockWaiter::spawn( + name.clone(), + committee.clone(), + certificate_store.clone(), + rx_batch_commands, + rx_batches, + ); + // Whenever the `Synchronizer` does not manage to validate a header due to missing parent certificates of // batch digests, it commands the `HeaderWaiter` to synchronize with other nodes, wait for their reply, and // re-schedule execution of the header once we have all missing data. @@ -272,6 +290,7 @@ impl MessageHandler for PrimaryReceiverHandler, tx_others_digests: Sender<(Digest, WorkerId)>, + tx_batches: Sender, } #[async_trait] @@ -293,6 +312,14 @@ impl MessageHandler for WorkerReceiverHandler { .send((digest, worker_id)) .await .expect("Failed to send workers' digests"), + WorkerPrimaryMessage::RequestedBatch(digest, transactions) => self + .tx_batches + .send(BatchMessage { + id: digest, + transactions, + }) + .await + .expect("Failed to send batches"), } Ok(()) } diff --git a/narwhal/primary/src/tests/block_waiter_tests.rs b/narwhal/primary/src/tests/block_waiter_tests.rs new file mode 100644 index 0000000000000..87eb4ddec4e36 --- /dev/null +++ b/narwhal/primary/src/tests/block_waiter_tests.rs @@ -0,0 +1,419 @@ +// Copyright (c) 2022, Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 +use crate::{ + block_waiter::{ + BatchMessage, BlockCommand, BlockErrorType, BlockResult, BlockWaiter, GetBlockResponse, + }, + common, + common::{certificate, committee_with_base_port, create_db_stores, keys}, + Certificate, PrimaryWorkerMessage, +}; +use bincode::deserialize; +use config::Committee; +use crypto::{ + ed25519::Ed25519PublicKey, + traits::{KeyPair, VerifyingKey}, + Digest, Hash, +}; +use ed25519_dalek::{Digest as _, Sha512}; +use futures::StreamExt; +use network::SimpleSender; +use std::{collections::HashMap, net::SocketAddr}; +use tokio::{ + net::TcpListener, + sync::mpsc::{channel, Sender}, + task::JoinHandle, + time::{sleep, timeout, Duration}, +}; +use tokio_util::codec::{Framed, LengthDelimitedCodec}; + +#[tokio::test] +async fn test_successfully_retrieve_block() { + // GIVEN + let (_, certificate_store, _) = create_db_stores(); + + // AND the necessary keys + let (name, committee) = resolve_name_and_committee(); + + // AND store certificate + let header = common::fixture_header_with_payload(2); + let certificate = certificate(&header); + certificate_store + .write(certificate.digest(), certificate.clone()) + .await; + + let block_id = certificate.digest(); + + // AND spawn a new blocks waiter + let (tx_commands, rx_commands) = channel(1); + let (tx_get_block, mut rx_get_block) = channel(1); + let (tx_batch_messages, rx_batch_messages) = channel(10); + + BlockWaiter::spawn( + name.clone(), + committee.clone(), + certificate_store.clone(), + rx_commands, + rx_batch_messages, + ); + + // AND "mock" the batch responses + let mut expected_batch_messages = HashMap::new(); + for (batch_id, _) in header.payload { + expected_batch_messages.insert( + batch_id.clone(), + BatchMessage { + id: batch_id, + transactions: vec![vec![10u8, 5u8, 2u8], vec![8u8, 2u8, 3u8]], + }, + ); + } + + // AND spin up a worker node + let worker_id = 0; + let worker_address = committee + .worker(&name, &worker_id) + .unwrap() + .primary_to_worker; + + let handle = worker_listener::( + worker_address, + expected_batch_messages.clone(), + tx_batch_messages, + ); + + // WHEN we send a request to get a block + tx_commands + .send(BlockCommand::GetBlock { + id: block_id.clone(), + sender: tx_get_block, + }) + .await; + + // Wait for the worker server to complete before continue. + // Then we'll be confident that the expected batch responses + // have been sent (via the tx_batch_messages channel though) + match timeout(Duration::from_millis(4_000), handle).await { + Err(_) => panic!("worker hasn't received expected batch requests"), + _ => {} + } + + // THEN we should expect to get back the result + let timer = sleep(Duration::from_millis(5_000)); + tokio::pin!(timer); + + tokio::select! { + Some(result) = rx_get_block.recv() => { + assert!(result.is_ok(), "Expected to receive a successful result, instead got error: {}", result.err().unwrap()); + + let block = result.unwrap(); + + assert_eq!(block.batches.len(), expected_batch_messages.len()); + assert_eq!(block.id, block_id.clone()); + + for (_, batch) in expected_batch_messages { + assert_eq!(batch.transactions.len(), 2); + } + }, + () = &mut timer => { + panic!("Timeout, no result has been received in time") + } + } +} + +#[tokio::test] +async fn test_one_pending_request_for_block_at_time() { + // GIVEN + let (_, certificate_store, _) = create_db_stores(); + + // AND the necessary keys + let (name, committee) = resolve_name_and_committee(); + + // AND store certificate + let header = common::fixture_header_with_payload(2); + let certificate = certificate(&header); + certificate_store + .write(certificate.digest(), certificate.clone()) + .await; + + let block_id = certificate.digest(); + + // AND spawn a new blocks waiter + let (_, rx_commands) = channel(1); + let (_, rx_batch_messages) = channel(1); + + let mut waiter = BlockWaiter { + name: name.clone(), + committee: committee.clone(), + certificate_store: certificate_store.clone(), + rx_commands, + pending_get_block: HashMap::new(), + network: SimpleSender::new(), + rx_batch_receiver: rx_batch_messages, + tx_pending_batch: HashMap::new(), + tx_get_block_map: HashMap::new(), + }; + + let get_mock_sender = || { + let (tx, _) = channel(1); + return tx; + }; + + // WHEN we send GetBlock command + let result_some = waiter + .handle_command(BlockCommand::GetBlock { + id: block_id.clone(), + sender: get_mock_sender(), + }) + .await; + + // AND we send more GetBlock commands + let mut results_none = Vec::new(); + for _ in 0..3 { + results_none.push( + waiter + .handle_command(BlockCommand::GetBlock { + id: block_id.clone(), + sender: get_mock_sender(), + }) + .await, + ); + } + + // THEN + assert!( + result_some.is_some(), + "Expected to have a future to do some further work" + ); + + for result in results_none { + assert!( + result.is_none(), + "Expected to not get a future for further work" + ); + } +} + +#[tokio::test] +async fn test_unlocking_pending_get_block_request_after_response() { + // GIVEN + let (_, certificate_store, _) = create_db_stores(); + + // AND the necessary keys + let (name, committee) = resolve_name_and_committee(); + + // AND store certificate + let header = common::fixture_header_with_payload(2); + let certificate = certificate(&header); + certificate_store + .write(certificate.digest(), certificate.clone()) + .await; + + let block_id = certificate.digest(); + + // AND spawn a new blocks waiter + let (_, rx_commands) = channel(1); + let (_, rx_batch_messages) = channel(1); + + let mut waiter = BlockWaiter { + name: name.clone(), + committee: committee.clone(), + certificate_store: certificate_store.clone(), + rx_commands, + pending_get_block: HashMap::new(), + network: SimpleSender::new(), + rx_batch_receiver: rx_batch_messages, + tx_pending_batch: HashMap::new(), + tx_get_block_map: HashMap::new(), + }; + + let get_mock_sender = || { + let (tx, _) = channel(1); + return tx; + }; + + // AND we send GetBlock commands + for _ in 0..3 { + waiter + .handle_command(BlockCommand::GetBlock { + id: block_id.clone(), + sender: get_mock_sender(), + }) + .await; + } + + // WHEN + let result = BlockResult::Ok(GetBlockResponse { + id: block_id.clone(), + batches: vec![], + }); + + waiter.handle_batch_waiting_result(result).await; + + // THEN + assert_eq!(waiter.pending_get_block.contains_key(&block_id), false); + assert_eq!(waiter.tx_get_block_map.contains_key(&block_id), false); +} + +#[tokio::test] +async fn test_batch_timeout() { + // GIVEN + let (_, certificate_store, _) = create_db_stores(); + + // AND the necessary keys + let (name, committee) = resolve_name_and_committee(); + + // AND store certificate + let header = common::fixture_header_with_payload(2); + let certificate = certificate(&header); + certificate_store + .write(certificate.digest(), certificate.clone()) + .await; + + let block_id = certificate.digest(); + + // AND spawn a new blocks waiter + let (tx_commands, rx_commands) = channel(1); + let (tx_get_block, mut rx_get_block) = channel(1); + let (_, rx_batch_messages) = channel(10); + + BlockWaiter::spawn( + name.clone(), + committee.clone(), + certificate_store.clone(), + rx_commands, + rx_batch_messages, + ); + + // WHEN we send a request to get a block + tx_commands + .send(BlockCommand::GetBlock { + id: block_id.clone(), + sender: tx_get_block, + }) + .await; + + // THEN we should expect to get back the result + let timer = sleep(Duration::from_millis(5_000)); + tokio::pin!(timer); + + tokio::select! { + Some(result) = rx_get_block.recv() => { + assert!(result.is_err(), "Expected to receive an error result"); + + let block_error = result.err().unwrap(); + + assert_eq!(block_error.id, block_id.clone()); + assert_eq!(block_error.error, BlockErrorType::BatchTimeout); + }, + () = &mut timer => { + panic!("Timeout, no result has been received in time") + } + } +} + +#[tokio::test] +async fn test_return_error_when_certificate_is_missing() { + // GIVEN + let (_, certificate_store, _) = create_db_stores(); + let (name, committee) = resolve_name_and_committee(); + + // AND create a certificate but don't store it + let certificate = Certificate::::default(); + let block_id = certificate.digest(); + + // AND spawn a new blocks waiter + let (tx_commands, rx_commands) = channel(1); + let (tx_get_block, mut rx_get_block) = channel(1); + let (_, rx_batch_messages) = channel(10); + + BlockWaiter::spawn( + name.clone(), + committee.clone(), + certificate_store.clone(), + rx_commands, + rx_batch_messages, + ); + + // WHEN we send a request to get a block + tx_commands + .send(BlockCommand::GetBlock { + id: block_id.clone(), + sender: tx_get_block, + }) + .await; + + // THEN we should expect to get back the error + let timer = sleep(Duration::from_millis(5_000)); + tokio::pin!(timer); + + tokio::select! { + Some(result) = rx_get_block.recv() => { + assert!(result.is_err(), "Expected to receive an error result"); + + let block_error = result.err().unwrap(); + + assert_eq!(block_error.id, block_id.clone()); + assert_eq!(block_error.error, BlockErrorType::BlockNotFound); + }, + () = &mut timer => { + panic!("Timeout, no result has been received in time") + } + } +} + +// helper method to get a name and a committee +fn resolve_name_and_committee() -> (Ed25519PublicKey, Committee) { + let mut keys = keys(); + let _ = keys.pop().unwrap(); // Skip the header' author. + let kp = keys.pop().unwrap(); + let name = kp.public().clone(); + let committee = committee_with_base_port(13_000); + + (name, committee) +} + +// worker_listener listens to TCP requests. The worker responds to the +// RequestBatch requests for the provided expected_batches. +pub fn worker_listener( + address: SocketAddr, + expected_batches: HashMap, + tx_batch_messages: Sender, +) -> JoinHandle<()> { + tokio::spawn(async move { + let listener = TcpListener::bind(&address).await.unwrap(); + let (socket, _) = listener.accept().await.unwrap(); + let transport = Framed::new(socket, LengthDelimitedCodec::new()); + + println!("Start listening server"); + + let (_, mut reader) = transport.split(); + let mut counter = 0; + loop { + match reader.next().await { + Some(Ok(received)) => { + let message = received.freeze(); + match deserialize(&message) { + Ok(PrimaryWorkerMessage::::RequestBatch(id)) => { + if expected_batches.contains_key(&id) { + tx_batch_messages + .send(expected_batches.get(&id).cloned().unwrap()) + .await; + + counter += 1; + + // Once all the expected requests have been received, break the loop + // of the server. + if counter == expected_batches.len() { + break; + } + } + } + _ => panic!("Unexpected request received"), + }; + } + _ => panic!("Failed to receive network message"), + } + } + }) +} diff --git a/narwhal/primary/src/tests/common.rs b/narwhal/primary/src/tests/common.rs index 0e1357a8c00df..77d62fc18e12d 100644 --- a/narwhal/primary/src/tests/common.rs +++ b/narwhal/primary/src/tests/common.rs @@ -12,9 +12,10 @@ use crypto::{ traits::{KeyPair, Signer, VerifyingKey}, Digest, Hash as _, }; +use ed25519_dalek::{Digest as _, Sha512}; use futures::{sink::SinkExt as _, stream::StreamExt as _}; use rand::{rngs::StdRng, SeedableRng as _}; -use std::net::SocketAddr; +use std::{collections::BTreeMap, net::SocketAddr}; use store::{reopen, rocks, rocks::DBMap, Store}; use tokio::{net::TcpListener, task::JoinHandle}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; @@ -152,6 +153,37 @@ pub fn headers() -> Vec> { .collect() } +pub fn fixture_header_with_payload(number_of_batches: u8) -> Header { + let kp = keys().pop().unwrap(); + let mut payload: BTreeMap = BTreeMap::new(); + + for i in 0..number_of_batches { + let batch_digest = Digest::new( + Sha512::digest(vec![10u8, 5u8, 8u8, 20u8, i].as_slice()).as_slice()[..32] + .try_into() + .unwrap(), + ); + payload.insert(batch_digest, 0); + } + + let header = Header { + author: kp.public().clone(), + round: 1, + parents: Certificate::genesis(&committee()) + .iter() + .map(|x| x.digest()) + .collect(), + payload, + ..Header::default() + }; + + Header { + id: header.digest(), + signature: kp.sign(header.digest().as_ref()), + ..header + } +} + // Fixture pub fn votes(header: &Header) -> Vec> { keys() diff --git a/narwhal/primary/src/tests/core_tests.rs b/narwhal/primary/src/tests/core_tests.rs index b010c6524cd6f..97da706aa1c88 100644 --- a/narwhal/primary/src/tests/core_tests.rs +++ b/narwhal/primary/src/tests/core_tests.rs @@ -2,15 +2,11 @@ // Copyright (c) 2022, Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 use super::*; -use crate::{ - common::{ - certificate, committee, committee_with_base_port, create_db_stores, header, headers, keys, - listener, temp_dir, votes, - }, - primary::PayloadToken, +use crate::common::{ + certificate, committee, committee_with_base_port, create_db_stores, header, headers, keys, + listener, votes, }; -use config::WorkerId; -use crypto::{ed25519::Ed25519PublicKey, traits::KeyPair}; +use crypto::traits::KeyPair; use futures::future::try_join_all; use tokio::sync::mpsc::channel; diff --git a/narwhal/worker/src/synchronizer.rs b/narwhal/worker/src/synchronizer.rs index cd977458eab10..11ae8443b3152 100644 --- a/narwhal/worker/src/synchronizer.rs +++ b/narwhal/worker/src/synchronizer.rs @@ -180,6 +180,9 @@ impl Synchronizer { } } self.pending.retain(|_, (r, _, _)| r > &mut gc_round); + }, + PrimaryWorkerMessage::RequestBatch(_) => { + } },