Skip to content

Commit

Permalink
protocol: batch operation announcement per interval
Browse files Browse the repository at this point in the history
  • Loading branch information
gterzian committed Sep 28, 2022
1 parent 5a6ac4c commit 353dc93
Showing 1 changed file with 28 additions and 6 deletions.
34 changes: 28 additions & 6 deletions massa-protocol-worker/src/protocol_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use massa_protocol_exports::{
use massa_storage::Storage;
use massa_time::{MassaTime, TimeError};
use std::collections::{HashMap, HashSet};
use std::mem;
use tokio::{
sync::mpsc,
sync::mpsc::error::SendTimeoutError,
Expand Down Expand Up @@ -158,6 +159,8 @@ pub struct ProtocolWorker {
pub(crate) op_batch_buffer: OperationBatchBuffer,
/// Shared storage.
pub(crate) storage: Storage,
/// Operations to announce at the next interval.
operations_to_announce: Vec<OperationId>,
}

/// channels used by the protocol worker
Expand Down Expand Up @@ -213,6 +216,7 @@ impl ProtocolWorker {
config.operation_batch_buffer_capacity,
),
storage,
operations_to_announce: Default::default(),
}
}

Expand Down Expand Up @@ -292,9 +296,14 @@ impl ProtocolWorker {
self.update_ask_block(&mut block_ask_timer).await?;
}

// operation ask timer
// operation ask, and announce, timer
_ = &mut operation_batch_proc_period_timer => {
massa_trace!("protocol.protocol_worker.run_loop.operation_ask_timer", { });
massa_trace!("protocol.protocol_worker.run_loop.operation_ask_and_announce_timer", { });

// Announce operations.
self.announce_ops().await;

// Update operations to ask.
self.update_ask_operation(&mut operation_batch_proc_period_timer).await?;
}
// operation prune timer
Expand All @@ -311,9 +320,10 @@ impl ProtocolWorker {

/// Announce a set of operations to active nodes who do not know about it yet.
/// Side effect: notes nodes as knowing about those operations from now on.
async fn announce_ops(&mut self, operations: &[OperationId]) {
async fn announce_ops(&mut self) {
let operations = mem::take(&mut self.operations_to_announce);
massa_trace!("protocol.protocol_worker.propagate_operations.begin", {
"operation": operations
"operations": operations
});
for (node, node_info) in self.active_nodes.iter_mut() {
let new_ops: Vec<OperationId> = operations
Expand All @@ -334,6 +344,18 @@ impl ProtocolWorker {
}
}

/// Add an list of operations to a queue pending for announcement.
async fn note_operations_to_announce(&mut self, operations: &[OperationId]) {
// If we have too many operations to announce,
// announce them immediately, clearing the data at the same time.
if self.operations_to_announce.len() > self.config.max_known_ops_size {
self.announce_ops().await;
}

// Add the operations to a list for announcement at the next interval.
self.operations_to_announce.extend_from_slice(operations);
}

async fn propagate_endorsements(&mut self, storage: &Storage) {
massa_trace!(
"protocol.protocol_worker.process_command.propagate_endorsements.begin",
Expand Down Expand Up @@ -482,7 +504,7 @@ impl ProtocolWorker {

// Announce operations to active nodes not knowing about it.
let to_announce: Vec<OperationId> = operation_ids.iter().copied().collect();
self.announce_ops(&to_announce).await;
self.note_operations_to_announce(&to_announce).await;
}
ProtocolCommand::PropagateEndorsements(endorsements) => {
self.propagate_endorsements(&endorsements).await;
Expand Down Expand Up @@ -987,7 +1009,7 @@ impl ProtocolWorker {
ops_to_propagate.drop_operation_refs(&operations_to_not_propagate);
let to_announce: Vec<OperationId> =
ops_to_propagate.get_op_refs().iter().copied().collect();
self.announce_ops(&to_announce).await;
self.note_operations_to_announce(&to_announce).await;

// Add to pool
self.pool_controller.add_operations(ops);
Expand Down

0 comments on commit 353dc93

Please sign in to comment.