Skip to content

Commit

Permalink
[feature]: wire in the block-remover component (#105)
Browse files Browse the repository at this point in the history
This commit wires in the block-remover component to the primary node setup.
  • Loading branch information
akichidis authored Mar 31, 2022
1 parent 4c35619 commit 3cfba25
Showing 1 changed file with 36 additions and 11 deletions.
47 changes: 36 additions & 11 deletions narwhal/primary/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::{
block_remover::DeleteBatchResult,
block_waiter::{BatchMessage, BatchMessageError, BatchResult, BlockWaiter},
certificate_waiter::CertificateWaiter,
core::Core,
Expand All @@ -13,7 +14,7 @@ use crate::{
payload_receiver::PayloadReceiver,
proposer::Proposer,
synchronizer::Synchronizer,
Batch,
Batch, BlockRemover, DeleteBatchMessage,
};
use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -23,7 +24,7 @@ use crypto::{
SignatureService,
};
use futures::sink::SinkExt as _;
use network::{MessageHandler, Receiver as NetworkReceiver, Writer};
use network::{MessageHandler, Receiver as NetworkReceiver, SimpleSender, Writer};
use serde::{Deserialize, Serialize};
use std::{
error::Error,
Expand Down Expand Up @@ -117,8 +118,14 @@ impl Primary {
let (tx_certificates_loopback, rx_certificates_loopback) = channel(CHANNEL_CAPACITY);
let (tx_primary_messages, rx_primary_messages) = channel(CHANNEL_CAPACITY);
let (tx_cert_requests, rx_cert_requests) = channel(CHANNEL_CAPACITY);
let (_tx_batch_commands, rx_batch_commands) = channel(CHANNEL_CAPACITY);
// _tx_get_block_commands should be used by the handler that will issue the requests
// to fetch the collections from Narwhal (e.x the get_collections endpoint).
let (_tx_get_block_commands, rx_get_block_commands) = channel(CHANNEL_CAPACITY);
let (tx_batches, rx_batches) = channel(CHANNEL_CAPACITY);
// _tx_block_removal_commands should be used by the handler that will issue the requests
// to remove collections from Narwhal (e.x the remove_collections endpoint).
let (_tx_block_removal_commands, rx_block_removal_commands) = channel(CHANNEL_CAPACITY);
let (tx_batch_removal, rx_batch_removal) = channel(CHANNEL_CAPACITY);

// Write the parameters to the logs.
parameters.tracing();
Expand Down Expand Up @@ -160,6 +167,7 @@ impl Primary {
tx_our_digests,
tx_others_digests,
tx_batches,
tx_batch_removal,
},
);
info!(
Expand All @@ -185,7 +193,7 @@ impl Primary {
Core::spawn(
name.clone(),
committee.clone(),
header_store,
header_store.clone(),
certificate_store.clone(),
synchronizer,
signature_service.clone(),
Expand Down Expand Up @@ -214,10 +222,22 @@ impl Primary {
name.clone(),
committee.clone(),
certificate_store.clone(),
rx_batch_commands,
rx_get_block_commands,
rx_batches,
);

// Orchestrates the removal of blocks across the primary and worker nodes.
BlockRemover::spawn(
name.clone(),
committee.clone(),
certificate_store.clone(),
header_store,
payload_store.clone(),
SimpleSender::new(),
rx_block_removal_commands,
rx_batch_removal,
);

// Whenever the `Synchronizer` does not manage to validate a header due to missing parent certificates of
// batch digests, it commands the `HeaderWaiter` to synchronize with other nodes, wait for their reply, and
// re-schedule execution of the header once we have all missing data.
Expand Down Expand Up @@ -309,6 +329,7 @@ struct WorkerReceiverHandler {
tx_our_digests: Sender<(BatchDigest, WorkerId)>,
tx_others_digests: Sender<(BatchDigest, WorkerId)>,
tx_batches: Sender<BatchResult>,
tx_batch_removal: Sender<DeleteBatchResult>,
}

#[async_trait]
Expand Down Expand Up @@ -338,18 +359,22 @@ impl MessageHandler for WorkerReceiverHandler {
}))
.await
.expect("Failed to send batch result"),
WorkerPrimaryMessage::DeletedBatches(_) => {
// TODO: send the deleted batches to the appropriate channel
}
WorkerPrimaryMessage::DeletedBatches(batch_ids) => self
.tx_batch_removal
.send(Ok(DeleteBatchMessage { ids: batch_ids }))
.await
.expect("Failed to send batch delete result"),
WorkerPrimaryMessage::Error(error) => match error.clone() {
WorkerPrimaryError::RequestedBatchNotFound(digest) => self
.tx_batches
.send(Err(BatchMessageError { id: digest }))
.await
.expect("Failed to send batch result"),
WorkerPrimaryError::ErrorWhileDeletingBatches(_) => {
// TODO: send the error to the appropriate channel
}
WorkerPrimaryError::ErrorWhileDeletingBatches(batch_ids) => self
.tx_batch_removal
.send(Err(DeleteBatchMessage { ids: batch_ids }))
.await
.expect("Failed to send error batch delete result"),
},
}
Ok(())
Expand Down

0 comments on commit 3cfba25

Please sign in to comment.