Skip to content
This repository has been archived by the owner on Oct 17, 2022. It is now read-only.

Commit

Permalink
Revert "[refactor] use block_waiter instead of batch_loader (#738)"
Browse files Browse the repository at this point in the history
This reverts commit babd3d8.
  • Loading branch information
huitseeker committed Sep 7, 2022
1 parent c7b1b63 commit e44a82f
Show file tree
Hide file tree
Showing 22 changed files with 518 additions and 562 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ tokio-util = { version = "0.7.3", features = ["codec"] }
tonic = "0.7.2"
tracing = "0.1.36"
prometheus = "0.13.1"
backoff = { version = "0.4.0", features = ["tokio"] }
storage = { path = "../storage" }
itertools = "0.10.3"

crypto = { path = "../crypto" }
types = { path = "../types" }
mysten-network = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "352091e92070c2ecfcccad444361a78249ecfe59" }

Expand All @@ -38,7 +38,6 @@ match_opt = "0.1.2"
indexmap = { version = "1.9.1", features = ["serde"] }
rand = "0.8.5"
tempfile = "3.3.0"
primary = { path = "../primary" }
test_utils = { path = "../test_utils" }
types = { path = "../types" }
telemetry-subscribers = { git = "https://github.com/mystenlabs/mysten-infra.git", rev = "352091e92070c2ecfcccad444361a78249ecfe59" }
204 changes: 204 additions & 0 deletions executor/src/batch_loader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use crate::errors::{SubscriberError, SubscriberResult};

use config::WorkerId;
use consensus::ConsensusOutput;
use futures::stream::StreamExt;
use multiaddr::Multiaddr;
use std::collections::{HashMap, HashSet};
use store::Store;
use tokio::{
sync::{
mpsc::{channel, Receiver, Sender},
watch,
},
task::JoinHandle,
};
use tracing::{error, warn};
use types::{
metered_channel, serialized_batch_digest, BatchDigest, BincodeEncodedPayload,
ClientBatchRequest, ReconfigureNotification, SerializedBatchMessage, WorkerToWorkerClient,
};

/// Download transactions data from the consensus workers and notifies the called when the job is done.
pub struct BatchLoader {
/// The temporary storage holding all transactions' data (that may be too big to hold in memory).
store: Store<BatchDigest, SerializedBatchMessage>,
/// Receive reconfiguration updates.
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
/// Receive consensus outputs for which to download the associated transaction data.
rx_input: metered_channel::Receiver<ConsensusOutput>,
/// The network addresses of the consensus workers.
addresses: HashMap<WorkerId, Multiaddr>,
/// A map of connections with the consensus workers.
connections: HashMap<WorkerId, Sender<Vec<BatchDigest>>>,
}

impl BatchLoader {
/// Spawn a new batch loaded in a dedicated tokio task.
#[must_use]
pub fn spawn(
store: Store<BatchDigest, SerializedBatchMessage>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_input: metered_channel::Receiver<ConsensusOutput>,
addresses: HashMap<WorkerId, Multiaddr>,
) -> JoinHandle<()> {
tokio::spawn(async move {
Self {
store,
rx_reconfigure,
rx_input,
addresses,
connections: HashMap::new(),
}
.run()
.await
.expect("Failed to run batch loader")
})
}

/// Receive consensus messages for which we need to download the associated transaction data.
async fn run(&mut self) -> SubscriberResult<()> {
loop {
tokio::select! {
// Receive sync requests.
Some(message) = self.rx_input.recv() => {
let certificate = &message.certificate;

// Send a request for every batch referenced by the certificate.
// TODO: Can we write it better without allocating a HashMap every time?
let mut map = HashMap::with_capacity(certificate.header.payload.len());
for (digest, worker_id) in certificate.header.payload.iter() {
map.entry(*worker_id).or_insert_with(Vec::new).push(*digest);
}
for (worker_id, digests) in map {
let address = self
.addresses
.get(&worker_id)
.ok_or(SubscriberError::UnexpectedWorkerId(worker_id))?;

let sender = self.connections.entry(worker_id).or_insert_with(|| {
let (sender, receiver) = channel(primary::CHANNEL_CAPACITY);
SyncConnection::spawn(
address.clone(),
self.store.clone(),
receiver,
);
sender
});

sender
.send(digests)
.await
.expect("Sync connections are kept alive and never die");
}
}

// Check whether the committee changed.
result = self.rx_reconfigure.changed() => {
result.expect("Committee channel dropped");
let message = self.rx_reconfigure.borrow().clone();
if let ReconfigureNotification::Shutdown = message {
return Ok(());
}
}
}
}
}
}

/// Connect (and maintain a connection) with a specific worker. Then download batches from that
/// specific worker.
struct SyncConnection {
/// The address of the worker to connect with.
address: Multiaddr,
/// The temporary storage holding all transactions' data (that may be too big to hold in memory).
store: Store<BatchDigest, SerializedBatchMessage>,
/// Receive the batches to download from the worker.
rx_request: Receiver<Vec<BatchDigest>>,
/// Keep a set of requests already made to avoid asking twice for the same batch.
to_request: HashSet<BatchDigest>,
}

impl SyncConnection {
/// Spawn a new worker connection in a dedicated tokio task.
pub fn spawn(
address: Multiaddr,
store: Store<BatchDigest, SerializedBatchMessage>,
rx_request: Receiver<Vec<BatchDigest>>,
) {
tokio::spawn(async move {
Self {
address,
store,
rx_request,
to_request: HashSet::new(),
}
.run()
.await;
});
}

/// Main loop keeping the connection with a worker alive and receive batches to download.
async fn run(&mut self) {
let config = mysten_network::config::Config::new();
//TODO don't panic on bad address
let channel = config.connect_lazy(&self.address).unwrap();
let mut client = WorkerToWorkerClient::new(channel);

while let Some(digests) = self.rx_request.recv().await {
// Filter digests that we already requested.
for digest in digests {
self.to_request.insert(digest);
}

let missing = self.to_request.iter().copied().collect();
// Request the batch from the worker.
let message = ClientBatchRequest(missing);
//TODO wrap this call in the retry
let mut stream = match client
.client_batch_request(BincodeEncodedPayload::try_from(&message).unwrap())
.await
{
Ok(stream) => stream.into_inner(),
Err(e) => {
warn!(
"Failed to send sync request to worker {}: {e}",
self.address
);
continue;
}
};

// Receive the batch data from the worker.
while let Some(result) = stream.next().await {
match result {
Ok(batch) => {
let batch = batch.payload;
// Store the batch in the temporary store.
// TODO: We can probably avoid re-computing the hash of the bach since we trust the worker.
let res_digest = serialized_batch_digest(&batch);
match res_digest {
Ok(digest) => {
self.store.write(digest, batch.to_vec()).await;

// Cleanup internal state.
self.to_request.remove(&digest);
}
Err(error) => {
error!("Worker sent invalid serialized batch data: {error}");
}
}
}
Err(e) => {
warn!(
"Failed to receive batch reply from worker {}: {e}",
self.address
);
}
}
}
}
}
}
19 changes: 14 additions & 5 deletions executor/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ use tokio::{
task::JoinHandle,
};
use tracing::debug;
use types::{metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber};
use types::{
metered_channel, Batch, BatchDigest, ReconfigureNotification, SequenceNumber,
SerializedBatchMessage, WorkerMessage,
};

#[cfg(test)]
#[path = "tests/executor_tests.rs"]
Expand All @@ -26,7 +29,7 @@ pub mod executor_tests;
/// not processes twice the same transaction (despite crash-recovery).
pub struct Core<State: ExecutionState> {
/// The temporary storage holding all transactions' data (that may be too big to hold in memory).
store: Store<BatchDigest, Batch>,
store: Store<BatchDigest, SerializedBatchMessage>,
/// The (global) state to perform execution.
execution_state: Arc<State>,
/// Receive reconfiguration updates.
Expand Down Expand Up @@ -54,7 +57,7 @@ where
/// Spawn a new executor in a dedicated tokio task.
#[must_use]
pub fn spawn(
store: Store<BatchDigest, Batch>,
store: Store<BatchDigest, SerializedBatchMessage>,
execution_state: Arc<State>,
rx_reconfigure: watch::Receiver<ReconfigureNotification>,
rx_subscriber: metered_channel::Receiver<ConsensusOutput>,
Expand Down Expand Up @@ -134,8 +137,8 @@ where
total_batches: usize,
) -> SubscriberResult<()> {
// The store should now hold all transaction data referenced by the input certificate.
let transactions = match self.store.read(batch_digest).await? {
Some(x) => x.0,
let batch = match self.store.read(batch_digest).await? {
Some(x) => x,
None => {
// If two certificates contain the exact same batch (eg. by the actions of a Byzantine
// consensus node), some correct client may already have deleted the batch from their
Expand All @@ -148,6 +151,12 @@ where
}
};

// Deserialize the consensus workers' batch message to retrieve a list of transactions.
let transactions = match bincode::deserialize(&batch)? {
WorkerMessage::Batch(Batch(x)) => x,
_ => bail!(SubscriberError::UnexpectedProtocolMessage),
};

// Execute every transaction in the batch.
let total_transactions = transactions.len();
for (index, transaction) in transactions.into_iter().enumerate() {
Expand Down
4 changes: 0 additions & 4 deletions executor/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use config::WorkerId;
use std::fmt::Debug;
use store::StoreError;
use thiserror::Error;
use types::CertificateDigest;

#[macro_export]
macro_rules! bail {
Expand Down Expand Up @@ -50,9 +49,6 @@ pub enum SubscriberError {
#[error("Storage failure: {0}")]
StoreError(#[from] StoreError),

#[error("Error occurred while retrieving certificate {0} payload: {1}")]
PayloadRetrieveError(CertificateDigest, String),

#[error("Consensus referenced unexpected worker id {0}")]
UnexpectedWorkerId(WorkerId),

Expand Down
Loading

0 comments on commit e44a82f

Please sign in to comment.