Skip to content

Commit

Permalink
[Consensus Observer] Reset pending data on subscription changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Jul 16, 2024
1 parent 85ad048 commit 8f07165
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 13 deletions.
2 changes: 1 addition & 1 deletion config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct ConsensusObserverConfig {

/// Interval (in milliseconds) to garbage collect peer state
pub garbage_collection_interval_ms: u64,
/// Maximum number of pending blocks to keep in memory
/// Maximum number of pending blocks to keep in memory (including payloads, ordered blocks, etc.)
pub max_num_pending_blocks: u64,
/// Maximum timeout (in milliseconds) for active subscriptions
pub max_subscription_timeout_ms: u64,
Expand Down
43 changes: 42 additions & 1 deletion consensus/src/consensus_observer/missing_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl MissingBlockStore {
}
}

/// Clears all missing blocks from the store
pub fn clear_missing_blocks(&self) {
self.blocks_missing_payloads.lock().clear();
}

/// Inserts a block (with missing payloads) into the store
pub fn insert_missing_block(&self, ordered_block: OrderedBlock) {
// Get the epoch and round of the first block
Expand Down Expand Up @@ -168,6 +173,42 @@ mod test {
};
use rand::Rng;

#[test]
fn test_clear_missing_blocks() {
// Create a new missing block store
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
max_num_pending_blocks: max_num_pending_blocks as u64,
..ConsensusObserverConfig::default()
};
let missing_block_store = MissingBlockStore::new(consensus_observer_config);

// Insert the maximum number of blocks into the store
let current_epoch = 0;
let starting_round = 0;
let missing_blocks = create_and_add_missing_blocks(
&missing_block_store,
max_num_pending_blocks,
current_epoch,
starting_round,
5,
);

// Verify that the store is not empty
verify_missing_blocks(
&missing_block_store,
max_num_pending_blocks,
&missing_blocks,
);

// Clear the missing blocks from the store
missing_block_store.clear_missing_blocks();

// Verify that the store is now empty
let blocks_missing_payloads = missing_block_store.blocks_missing_payloads.lock();
assert!(blocks_missing_payloads.is_empty());
}

#[test]
fn test_insert_missing_block() {
// Create a new missing block store
Expand Down Expand Up @@ -379,7 +420,7 @@ mod test {
#[test]
fn test_remove_ready_block_multiple_blocks_missing() {
// Create a new missing block store
let max_num_pending_blocks = 4;
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
max_num_pending_blocks: max_num_pending_blocks as u64,
..ConsensusObserverConfig::default()
Expand Down
53 changes: 48 additions & 5 deletions consensus/src/consensus_observer/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl ConsensusObserver {
debug!(LogSchema::new(LogEntry::ConsensusObserver)
.message("Checking consensus observer progress!"));

// If we're in state sync mode, we should wait for state sync to complete
if self.in_state_sync_mode() {
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Waiting for state sync to reach target: {:?}!",
self.root.lock().commit_info()
))
);
return;
}

// Get the peer ID of the currently active subscription (if any)
let active_subscription_peer = self
.active_observer_subscription
Expand Down Expand Up @@ -192,8 +203,12 @@ impl ConsensusObserver {
self.create_new_observer_subscription(active_subscription_peer)
.await;

// If we successfully created a new subscription, update the subscription creation metrics
// If we successfully created a new subscription, clear the state and update the metrics
if let Some(active_subscription) = &self.active_observer_subscription {
// Clear the block state
self.clear_pending_block_state().await;

// Update the subscription creation metrics
self.update_subscription_creation_metrics(
active_subscription.get_peer_network_id(),
);
Expand Down Expand Up @@ -223,8 +238,7 @@ impl ConsensusObserver {
// Verify the subscription has not timed out
active_subscription.check_subscription_timeout()?;

// Verify that the DB is continuing to sync and commit new data.
// Note: we should only do this if we're not waiting for state sync.
// Verify that the DB is continuing to sync and commit new data
active_subscription.check_syncing_progress()?;

// Verify that the subscription peer is optimal
Expand All @@ -239,6 +253,30 @@ impl ConsensusObserver {
Ok(())
}

/// Clears the pending block state (this is useful for changing
/// subscriptions, where we want to wipe all state and restart).
async fn clear_pending_block_state(&self) {
// Clear the missing blocks
self.missing_block_store.clear_missing_blocks();

// Clear the payload store
self.block_payload_store.clear_all_payloads();

// Clear the pending blocks
self.pending_ordered_blocks.clear_all_pending_blocks();

// Reset the execution pipeline for the root
let root = self.root.lock().clone();
if let Err(error) = self.execution_client.reset(&root).await {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to reset the execution pipeline for the root! Error: {:?}",
error
))
);
}
}

/// Creates and returns a commit callback (to be called after the execution pipeline)
fn create_commit_callback(&self) -> StateComputerCommitCallBackType {
// Clone the root, pending blocks and payload store
Expand Down Expand Up @@ -457,6 +495,11 @@ impl ConsensusObserver {
}
}

/// Returns true iff we are waiting for state sync to complete
fn in_state_sync_mode(&self) -> bool {
self.sync_handle.is_some()
}

/// Processes the block payload message
async fn process_block_payload_message(&mut self, block_payload: BlockPayload) {
// Get the block round and epoch
Expand Down Expand Up @@ -580,7 +623,7 @@ impl ConsensusObserver {
.update_commit_decision(commit_decision);

// If we are not in sync mode, forward the commit decision to the execution pipeline
if self.sync_handle.is_none() {
if !self.in_state_sync_mode() {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding commit decision to the execution pipeline: {}",
Expand Down Expand Up @@ -734,7 +777,7 @@ impl ConsensusObserver {
.insert_ordered_block(ordered_block.clone(), verified_ordered_proof);

// If we verified the proof, and we're not in sync mode, finalize the ordered blocks
if verified_ordered_proof && self.sync_handle.is_none() {
if verified_ordered_proof && !self.in_state_sync_mode() {
debug!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Forwarding blocks to the execution pipeline: {}",
Expand Down
5 changes: 5 additions & 0 deletions consensus/src/consensus_observer/payload_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ impl BlockPayloadStore {
})
}

/// Clears all the payloads from the block payload store
pub fn clear_all_payloads(&self) {
self.block_transaction_payloads.lock().clear();
}

/// Returns a reference to the block transaction payloads
pub fn get_block_payloads(&self) -> Arc<Mutex<HashMap<HashValue, BlockPayloadStatus>>> {
self.block_transaction_payloads.clone()
Expand Down
50 changes: 44 additions & 6 deletions consensus/src/consensus_observer/pending_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ impl PendingOrderedBlocks {
}
}

/// Clears all pending blocks
pub fn clear_all_pending_blocks(&self) {
self.pending_blocks.lock().clear();
}

/// Returns a copy of the verified pending blocks
pub fn get_all_verified_pending_blocks(
&self,
Expand Down Expand Up @@ -83,7 +88,7 @@ impl PendingOrderedBlocks {
if self.pending_blocks.lock().len() >= max_num_pending_blocks {
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Exceeded the maximum number of pending blocks: {:?}. Block verification: {:?}, block: {:?}.",
"Exceeded the maximum number of pending blocks: {:?}. Block verification: {:?}. Dropping block: {:?}!",
max_num_pending_blocks,
verified_ordered_proof,
ordered_block.proof_block_info()
Expand Down Expand Up @@ -246,7 +251,40 @@ mod test {
};

#[test]
pub fn test_get_last_pending_block() {
fn test_clear_all_pending_blocks() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

// Insert several verified blocks for the current epoch
let current_epoch = 0;
let num_verified_blocks = 10;
create_and_add_pending_blocks(
&pending_ordered_blocks,
num_verified_blocks,
current_epoch,
true,
);

// Insert several unverified blocks for the next epoch
let next_epoch = current_epoch + 1;
let num_unverified_blocks = 20;
create_and_add_pending_blocks(
&pending_ordered_blocks,
num_unverified_blocks,
next_epoch,
false,
);

// Clear all pending blocks
pending_ordered_blocks.clear_all_pending_blocks();

// Check all the pending blocks were removed
let num_pending_blocks = pending_ordered_blocks.pending_blocks.lock().len();
assert_eq!(num_pending_blocks, 0);
}

#[test]
fn test_get_last_pending_block() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -303,7 +341,7 @@ mod test {
}

#[test]
pub fn test_get_verified_pending_block() {
fn test_get_verified_pending_block() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -359,7 +397,7 @@ mod test {
}

#[test]
pub fn test_insert_ordered_block_limit() {
fn test_insert_ordered_block_limit() {
// Create a consensus observer config with a maximum of 10 pending blocks
let max_num_pending_blocks = 10;
let consensus_observer_config = ConsensusObserverConfig {
Expand Down Expand Up @@ -408,7 +446,7 @@ mod test {
}

#[test]
pub fn test_remove_blocks_for_commit() {
fn test_remove_blocks_for_commit() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down Expand Up @@ -507,7 +545,7 @@ mod test {
}

#[test]
pub fn test_update_commit_decision() {
fn test_update_commit_decision() {
// Create new pending ordered blocks
let pending_ordered_blocks = PendingOrderedBlocks::new(ConsensusObserverConfig::default());

Expand Down

0 comments on commit 8f07165

Please sign in to comment.