Skip to content

Commit

Permalink
Client batch sync (#122)
Browse files Browse the repository at this point in the history
Worker batch update
  • Loading branch information
asonnino authored Mar 30, 2022
1 parent 0eaffc3 commit 4c35619
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 44 deletions.
65 changes: 44 additions & 21 deletions narwhal/worker/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crypto::traits::VerifyingKey;
use network::SimpleSender;
use primary::BatchDigest;
use store::Store;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{error, warn};

use crate::processor::SerializedBatchMessage;
Expand All @@ -24,8 +24,10 @@ pub struct Helper<PublicKey: VerifyingKey> {
committee: Committee<PublicKey>,
/// The persistent storage.
store: Store<BatchDigest, SerializedBatchMessage>,
/// Input channel to receive batch requests.
rx_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
/// Input channel to receive batch requests from workers.
rx_worker_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
/// Input channel to receive batch requests from workers.
rx_client_request: Receiver<(Vec<BatchDigest>, Sender<SerializedBatchMessage>)>,
/// A network sender to send the batches to the other workers.
network: SimpleSender,
}
Expand All @@ -35,14 +37,16 @@ impl<PublicKey: VerifyingKey> Helper<PublicKey> {
id: WorkerId,
committee: Committee<PublicKey>,
store: Store<BatchDigest, SerializedBatchMessage>,
rx_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
rx_worker_request: Receiver<(Vec<BatchDigest>, PublicKey)>,
rx_client_request: Receiver<(Vec<BatchDigest>, Sender<SerializedBatchMessage>)>,
) {
tokio::spawn(async move {
Self {
id,
committee,
store,
rx_request,
rx_worker_request,
rx_client_request,
network: SimpleSender::new(),
}
.run()
Expand All @@ -51,24 +55,43 @@ impl<PublicKey: VerifyingKey> Helper<PublicKey> {
}

async fn run(&mut self) {
while let Some((digests, origin)) = self.rx_request.recv().await {
// TODO [issue #7]: Do some accounting to prevent bad nodes from monopolizing our resources.
// TODO [issue #7]: Do some accounting to prevent bad actors from monopolizing our resources.
loop {
tokio::select! {
// Handle requests from other workers.
Some((digests, origin)) = self.rx_worker_request.recv() => {
// get the requestors address.
let address = match self.committee.worker(&origin, &self.id) {
Ok(x) => x.worker_to_worker,
Err(e) => {
warn!("Unexpected batch request: {e}");
continue;
}
};

// get the requestors address.
let address = match self.committee.worker(&origin, &self.id) {
Ok(x) => x.worker_to_worker,
Err(e) => {
warn!("Unexpected batch request: {e}");
continue;
}
};
// Reply to the request (the best we can).
for digest in digests {
match self.store.read(digest).await {
Ok(Some(data)) => self.network.send(address, Bytes::from(data)).await,
Ok(None) => (),
Err(e) => error!("{e}"),
}
}
},

// Reply to the request (the best we can).
for digest in digests {
match self.store.read(digest).await {
Ok(Some(data)) => self.network.send(address, Bytes::from(data)).await,
Ok(None) => (),
Err(e) => error!("{e}"),
// Handle requests from clients.
Some((digests, replier)) = self.rx_client_request.recv() => {
// Reply to the request (the best we can).
for digest in digests {
match self.store.read(digest).await {
Ok(Some(data)) => replier
.send(data)
.await
.expect("Failed to reply to network"),
Ok(None) => (),
Err(e) => error!("{e}"),
}
}
}
}
}
Expand Down
54 changes: 50 additions & 4 deletions narwhal/worker/src/tests/helper_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use store::rocks;
use tokio::sync::mpsc::channel;

#[tokio::test]
async fn batch_reply() {
let (tx_request, rx_request) = channel(1);
async fn worker_batch_reply() {
let (tx_worker_request, rx_worker_request) = channel(1);
let (_tx_client_request, rx_client_request) = channel(1);
let (requestor, _) = keys().pop().unwrap();
let id = 0;
let committee = committee_with_base_port(8_000);
Expand All @@ -29,7 +30,13 @@ async fn batch_reply() {
store.write(batch_digest(), serialized_batch()).await;

// Spawn an `Helper` instance.
Helper::spawn(id, committee.clone(), store, rx_request);
Helper::spawn(
id,
committee.clone(),
store,
rx_worker_request,
rx_client_request,
);

// Spawn a listener to receive the batch reply.
let address = committee.worker(&requestor, &id).unwrap().worker_to_worker;
Expand All @@ -38,8 +45,47 @@ async fn batch_reply() {

// Send a batch request.
let digests = vec![batch_digest()];
tx_request.send((digests, requestor)).await.unwrap();
tx_worker_request.send((digests, requestor)).await.unwrap();

// Ensure the requestor received the batch (ie. it did not panic).
assert!(handle.await.is_ok());
}

#[tokio::test]
async fn client_batch_reply() {
let (_tx_worker_request, rx_worker_request) = channel(1);
let (tx_client_request, rx_client_request) = channel(1);
let id = 0;
let committee = committee_with_base_port(8_001);

// Create a new test store.
let db = rocks::DBMap::<BatchDigest, SerializedBatchMessage>::open(
temp_dir(),
None,
Some("batches"),
)
.unwrap();
let store = Store::new(db);

// Add a batch to the store.
store.write(batch_digest(), serialized_batch()).await;

// Spawn an `Helper` instance.
Helper::spawn(
id,
committee.clone(),
store,
rx_worker_request,
rx_client_request,
);

// Send batch request.
let digests = vec![batch_digest()];
let (sender, mut receiver) = channel(digests.len());
tx_client_request.send((digests, sender)).await.unwrap();

// Wait for the reply and ensure it is as expected.
while let Some(bytes) = receiver.recv().await {
assert_eq!(bytes, serialized_batch());
}
}
55 changes: 53 additions & 2 deletions narwhal/worker/src/tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
use super::*;
use crate::common::{
batch_digest, committee_with_base_port, keys, listener, temp_dir, transaction,
use crate::{
common::{
batch_digest, committee_with_base_port, keys, listener, serialized_batch, temp_dir,
transaction,
},
worker::WorkerMessage,
};
use crypto::ed25519::Ed25519PublicKey;
use futures::{SinkExt, StreamExt};
use network::SimpleSender;
use primary::WorkerPrimaryMessage;
use store::rocks;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};

#[tokio::test]
async fn handle_clients_transactions() {
Expand Down Expand Up @@ -51,3 +59,46 @@ async fn handle_clients_transactions() {
// Ensure the primary received the batch's digest (ie. it did not panic).
assert!(handle.await.is_ok());
}

#[tokio::test]
async fn handle_client_batch_request() {
let (name, _) = keys().pop().unwrap();
let id = 0;
let committee = committee_with_base_port(11_001);
let parameters = Parameters {
max_header_delay: 100_000, // Ensure no batches are created.
..Parameters::default()
};

// Create a new test store.
let db = rocks::DBMap::<BatchDigest, SerializedBatchMessage>::open(
temp_dir(),
None,
Some("batches"),
)
.unwrap();
let store = Store::new(db);

// Add a batch to the store.
store.write(batch_digest(), serialized_batch()).await;

// Spawn a `Worker` instance.
Worker::spawn(name.clone(), id, committee.clone(), parameters, store);

// Spawn a client to ask for batches and receive the reply.
tokio::task::yield_now().await;
let address = committee.worker(&name, &id).unwrap().worker_to_worker;
let socket = TcpStream::connect(address).await.unwrap();
let (mut writer, mut reader) = Framed::new(socket, LengthDelimitedCodec::new()).split();

// Send batch request.
let digests = vec![batch_digest()];
let message = WorkerMessage::<Ed25519PublicKey>::ClientBatchRequest(digests);
let serialized = bincode::serialize(&message).unwrap();
writer.send(Bytes::from(serialized)).await.unwrap();

// Wait for the reply and ensure it is as expected.
let bytes = reader.next().await.unwrap().unwrap();
let expected = Bytes::from(serialized_batch());
assert_eq!(bytes, expected);
}
59 changes: 42 additions & 17 deletions narwhal/worker/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ pub type SerializedWorkerPrimaryMessage = Vec<u8>;
#[derive(Debug, Serialize, Deserialize)]
#[serde(bound(deserialize = "PublicKey: VerifyingKey"))]
pub enum WorkerMessage<PublicKey: VerifyingKey> {
/// Used by workers to send a new batch or to reply to a batch request.
Batch(Batch),
/// Used by workers to request batches.
BatchRequest(Vec<BatchDigest>, /* origin */ PublicKey),
/// Used by clients to request batches.
ClientBatchRequest(Vec<BatchDigest>),
}

pub struct Worker<PublicKey: VerifyingKey> {
Expand Down Expand Up @@ -206,7 +210,8 @@ impl<PublicKey: VerifyingKey> Worker<PublicKey> {

/// Spawn all tasks responsible to handle messages from other workers.
fn handle_workers_messages(&self, tx_primary: Sender<SerializedWorkerPrimaryMessage>) {
let (tx_helper, rx_helper) = channel(CHANNEL_CAPACITY);
let (tx_worker_helper, rx_worker_helper) = channel(CHANNEL_CAPACITY);
let (tx_client_helper, rx_client_helper) = channel(CHANNEL_CAPACITY);
let (tx_processor, rx_processor) = channel(CHANNEL_CAPACITY);

// Receive incoming messages from other workers.
Expand All @@ -220,7 +225,8 @@ impl<PublicKey: VerifyingKey> Worker<PublicKey> {
address,
/* handler */
WorkerReceiverHandler {
tx_helper,
tx_worker_helper,
tx_client_helper,
tx_processor,
},
);
Expand All @@ -230,7 +236,8 @@ impl<PublicKey: VerifyingKey> Worker<PublicKey> {
self.id,
self.committee.clone(),
self.store.clone(),
/* rx_request */ rx_helper,
/* rx_worker_request */ rx_worker_helper,
/* rx_client_request */ rx_client_helper,
);

// This `Processor` hashes and stores the batches we receive from the other workers. It then forwards the
Expand Down Expand Up @@ -274,28 +281,46 @@ impl MessageHandler for TxReceiverHandler {
/// Defines how the network receiver handles incoming workers messages.
#[derive(Clone)]
struct WorkerReceiverHandler<PublicKey: VerifyingKey> {
tx_helper: Sender<(Vec<BatchDigest>, PublicKey)>,
tx_worker_helper: Sender<(Vec<BatchDigest>, PublicKey)>,
tx_client_helper: Sender<(Vec<BatchDigest>, Sender<SerializedBatchMessage>)>,
tx_processor: Sender<SerializedBatchMessage>,
}

#[async_trait]
impl<PublicKey: VerifyingKey> MessageHandler for WorkerReceiverHandler<PublicKey> {
async fn dispatch(&self, writer: &mut Writer, serialized: Bytes) -> Result<(), Box<dyn Error>> {
// Reply with an ACK.
let _ = writer.send(Bytes::from("Ack")).await;

// Deserialize and parse the message.
match bincode::deserialize(&serialized) {
Ok(WorkerMessage::Batch(..)) => self
.tx_processor
.send(serialized.to_vec())
.await
.expect("Failed to send batch"),
Ok(WorkerMessage::BatchRequest(missing, requestor)) => self
.tx_helper
.send((missing, requestor))
.await
.expect("Failed to send batch request"),
Ok(WorkerMessage::Batch(..)) => {
self.tx_processor
.send(serialized.to_vec())
.await
.expect("Failed to send batch");

let _ = writer.send(Bytes::from("Ack")).await;
}
Ok(WorkerMessage::BatchRequest(missing, requestor)) => {
self.tx_worker_helper
.send((missing, requestor))
.await
.expect("Failed to send batch request");

let _ = writer.send(Bytes::from("Ack")).await;
}
Ok(WorkerMessage::ClientBatchRequest(missing)) => {
// TODO [issue #7]: Do some accounting to prevent bad actors from use all our
// resources (in this case allocate a gigantic channel).
let (sender, mut receiver) = channel(missing.len());

self.tx_client_helper
.send((missing, sender))
.await
.expect("Failed to send batch request");

while let Some(batch) = receiver.recv().await {
writer.send(Bytes::from(batch)).await?;
}
}
Err(e) => warn!("Serialization error: {e}"),
}
Ok(())
Expand Down

0 comments on commit 4c35619

Please sign in to comment.