Skip to content

Commit

Permalink
protocol: batch operation announcement per interval
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Sep 28, 2022
1 parent 5a6ac4c commit 3787f0a
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 6 deletions.
35 changes: 29 additions & 6 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use massa_protocol_exports::{
use massa_storage::Storage;
use massa_time::{MassaTime, TimeError};
use std::collections::{HashMap, HashSet};
use std::mem;
use tokio::{
sync::mpsc,
sync::mpsc::error::SendTimeoutError,
Expand Down Expand Up @@ -158,6 +159,8 @@ pub struct ProtocolWorker {
pub(crate) op_batch_buffer: OperationBatchBuffer,
/// Shared storage.
pub(crate) storage: Storage,
/// Operations to announce at the next interval.
operations_to_announce: Vec<OperationId>,
}

/// channels used by the protocol worker
Expand Down Expand Up @@ -213,6 +216,7 @@ impl ProtocolWorker {
config.operation_batch_buffer_capacity,
),
storage,
operations_to_announce: Default::default(),
}
}

Expand Down Expand Up @@ -292,9 +296,14 @@ impl ProtocolWorker {
self.update_ask_block(&mut block_ask_timer).await?;
}

// operation ask timer
// operation ask, and announce, timer
_ = &mut operation_batch_proc_period_timer => {
massa_trace!("protocol.protocol_worker.run_loop.operation_ask_timer", { });
massa_trace!("protocol.protocol_worker.run_loop.operation_ask_and_announce_timer", { });

// Announce operations.
self.announce_ops().await;

// Update operations to ask.
self.update_ask_operation(&mut operation_batch_proc_period_timer).await?;
}
// operation prune timer
Expand All @@ -311,9 +320,10 @@ impl ProtocolWorker {

/// Announce a set of operations to active nodes who do not know about it yet.
/// Side effect: notes nodes as knowing about those operations from now on.
async fn announce_ops(&mut self, operations: &[OperationId]) {
async fn announce_ops(&mut self) {
let operations = mem::take(&mut self.operations_to_announce);
massa_trace!("protocol.protocol_worker.propagate_operations.begin", {
"operation": operations
"operations": operations
});
for (node, node_info) in self.active_nodes.iter_mut() {
let new_ops: Vec<OperationId> = operations
Expand All @@ -334,6 +344,19 @@ impl ProtocolWorker {
}
}

/// Add an list of operations to a queue pending for announcement.
async fn note_operations_to_announce(&mut self, operations: &[OperationId]) {
// If we have too many operations to announce,
// announce them immediately,
// clearing the data at the same time.
if self.operations_to_announce.len() > self.config.max_known_ops_size {
self.announce_ops().await;
}

// Add the operations to a list for announcement at the next interval.
self.operations_to_announce.extend_from_slice(operations);
}

async fn propagate_endorsements(&mut self, storage: &Storage) {
massa_trace!(
"protocol.protocol_worker.process_command.propagate_endorsements.begin",
Expand Down Expand Up @@ -482,7 +505,7 @@ impl ProtocolWorker {

// Announce operations to active nodes not knowing about it.
let to_announce: Vec<OperationId> = operation_ids.iter().copied().collect();
self.announce_ops(&to_announce).await;
self.note_operations_to_announce(&to_announce).await;
}
ProtocolCommand::PropagateEndorsements(endorsements) => {
self.propagate_endorsements(&endorsements).await;
Expand Down Expand Up @@ -987,7 +1010,7 @@ impl ProtocolWorker {
ops_to_propagate.drop_operation_refs(&operations_to_not_propagate);
let to_announce: Vec<OperationId> =
ops_to_propagate.get_op_refs().iter().copied().collect();
self.announce_ops(&to_announce).await;
self.note_operations_to_announce(&to_announce).await;

// Add to pool
self.pool_controller.add_operations(ops);
Expand Down
78 changes: 78 additions & 0 deletions massa-protocol-worker/src/tests/operations_scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,84 @@ async fn test_protocol_propagates_operations_received_over_the_network_only_to_n
.await;
}

#[tokio::test]
#[serial]
async fn test_protocol_batches_propagation_of_operations_received_over_the_network_and_from_the_api(
) {
let protocol_config = &tools::PROTOCOL_CONFIG;
protocol_test_with_storage(
protocol_config,
async move |mut network_controller,
protocol_event_receiver,
mut protocol_command_sender,
protocol_manager,
mut pool_event_receiver,
mut storage| {
// Create 2 nodes.
let nodes = tools::create_and_connect_nodes(2, &mut network_controller).await;

// 1. Create an operation
let operation = tools::create_operation_with_expire_period(&nodes[0].keypair, 1);

// Send operation and wait for the protocol pool event.
network_controller
.send_operations(nodes[0].id, vec![operation.clone()])
.await;
pool_event_receiver.wait_command(1000.into(), |evt| match evt {
MockPoolControllerMessage::AddOperations { .. } => {
panic!("Unexpected or no protocol event.")
}
_ => Some(MockPoolControllerMessage::Any),
});

let expected_operation_id_1 = operation.id;

// Create another operation
let operation = tools::create_operation_with_expire_period(&nodes[0].keypair, 1);

// Send it via the API.
storage.store_operations(vec![operation.clone()]);
protocol_command_sender
.propagate_operations(storage)
.await
.unwrap();

let expected_operation_id_2 = operation.id;

// Assert both operation are propagated in one batch to the node that doesn't know about them.
loop {
match network_controller
.wait_command(1000.into(), |cmd| match cmd {
cmd @ NetworkCommand::SendOperationAnnouncements { .. } => Some(cmd),
_ => None,
})
.await
{
Some(NetworkCommand::SendOperationAnnouncements { to_node, batch }) => {
if nodes[1].id == to_node {
assert_eq!(batch.len(), 2);
assert!(batch.contains(&expected_operation_id_1.prefix()));
assert!(batch.contains(&expected_operation_id_2.prefix()));
break;
} else {
assert_eq!(nodes[0].id, to_node);
}
}
_ => panic!("Unexpected or no network command."),
};
}
(
network_controller,
protocol_event_receiver,
protocol_command_sender,
protocol_manager,
pool_event_receiver,
)
},
)
.await;
}

#[tokio::test]
#[serial]
async fn test_protocol_propagates_operations_only_to_nodes_that_dont_know_about_it_indirect_knowledge_via_header(
Expand Down

0 comments on commit 3787f0a

Please sign in to comment.