Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Add stale branches heads to finality notifications #10639

Merged
merged 16 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,26 @@ pub struct ImportSummary<Block: BlockT> {
pub tree_route: Option<sp_blockchain::TreeRoute<Block>>,
}

/// Import operation wrapper
/// Finalization operation summary.
///
/// Contains information about the block that just got finalized,
/// including tree heads that became stale at the moment of finalization.
pub struct FinalizeSummary<Block: BlockT> {
/// Blocks that were finalized.
/// The last entry is the one that has been explicitly finalized.
pub finalized: Vec<Block::Hash>,
/// Heads that became stale during this finalization operation.
pub stale_heads: Vec<Block::Hash>,
}

/// Import operation wrapper.
pub struct ClientImportOperation<Block: BlockT, B: Backend<Block>> {
/// DB Operation.
pub op: B::BlockImportOperation,
/// Summary of imported block.
pub notify_imported: Option<ImportSummary<Block>>,
/// A list of hashes of blocks that got finalized.
pub notify_finalized: Vec<Block::Hash>,
/// Summary of finalized block.
pub notify_finalized: Option<FinalizeSummary<Block>>,
}

/// Helper function to apply auxiliary data insertion into an operation.
Expand Down
10 changes: 7 additions & 3 deletions client/api/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,14 @@ pub struct BlockImportNotification<Block: BlockT> {
/// Summary of a finalized block.
#[derive(Clone, Debug)]
pub struct FinalityNotification<Block: BlockT> {
/// Imported block header hash.
/// Finalized block header hash.
pub hash: Block::Hash,
/// Imported block header.
/// Finalized block header.
pub header: Block::Header,
/// Path from the old finalized to new finalized parent (implicitly finalized blocks).
pub tree_route: Arc<Vec<Block::Hash>>,
/// Stale branches heads.
pub stale_heads: Arc<Vec<Block::Hash>>,
bkchr marked this conversation as resolved.
Show resolved Hide resolved
}

impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {
Expand All @@ -293,6 +297,6 @@ impl<B: BlockT> TryFrom<BlockImportNotification<B>> for ChainEvent<B> {

impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
fn from(n: FinalityNotification<B>) -> Self {
Self::Finalized { hash: n.hash }
Self::Finalized { hash: n.hash, tree_route: n.tree_route }
}
}
4 changes: 1 addition & 3 deletions client/network/src/protocol/sync/extra_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,7 @@ impl<B: BlockT> ExtraRequests<B> {
}

if best_finalized_number > self.best_seen_finalized_number {
// normally we'll receive finality notifications for every block => finalize would be
// enough but if many blocks are finalized at once, some notifications may be omitted
// => let's use finalize_with_ancestors here
// we receive finality notification only for the finalized branch head.
match self.tree.finalize_with_ancestors(
best_finalized_hash,
best_finalized_number,
Expand Down
8 changes: 2 additions & 6 deletions client/network/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -976,14 +976,10 @@ where
peer.network.service().announce_block(notification.hash, None);
}

// We poll `finality_notification_stream`, but we only take the last event.
let mut last = None;
while let Poll::Ready(Some(item)) =
// We poll `finality_notification_stream`.
while let Poll::Ready(Some(notification)) =
peer.finality_notification_stream.as_mut().poll_next(cx)
{
last = Some(item);
}
if let Some(notification) = last {
peer.network.on_block_finalized(notification.hash, notification.header);
}
}
Expand Down
162 changes: 105 additions & 57 deletions client/service/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use rand::Rng;
use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider, RecordProof};
use sc_client_api::{
backend::{
self, apply_aux, BlockImportOperation, ClientImportOperation, Finalizer, ImportSummary,
LockImportRun, NewBlockState, StorageProvider,
self, apply_aux, BlockImportOperation, ClientImportOperation, FinalizeSummary, Finalizer,
ImportSummary, LockImportRun, NewBlockState, StorageProvider,
},
client::{
BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
Expand Down Expand Up @@ -274,7 +274,7 @@ where
let mut op = ClientImportOperation {
op: self.backend.begin_operation()?,
notify_imported: None,
notify_finalized: Vec::new(),
notify_finalized: None,
};

let r = f(&mut op)?;
Expand Down Expand Up @@ -622,25 +622,25 @@ where
None
},
};
// Ensure parent chain is finalized to maintain invariant that
// finality is called sequentially. This will also send finality
// notifications for top 250 newly finalized blocks.
if finalized && parent_exists {
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
)?;
}

operation.op.update_cache(new_cache);
storage_changes
},
None => None,
};

// Ensure parent chain is finalized to maintain invariant that finality is called
// sequentially.
if finalized && parent_exists {
davxy marked this conversation as resolved.
Show resolved Hide resolved
self.apply_finality_with_block_hash(
operation,
parent_hash,
None,
info.best_hash,
make_notifications,
davxy marked this conversation as resolved.
Show resolved Hide resolved
)?;
}

let is_new_best = !gap_block &&
(finalized ||
match fork_choice {
Expand Down Expand Up @@ -683,11 +683,36 @@ where

operation.op.insert_aux(aux)?;

// we only notify when we are already synced to the tip of the chain
// We only notify when we are already synced to the tip of the chain
// or if this import triggers a re-org
if make_notifications || tree_route.is_some() {
if finalized {
operation.notify_finalized.push(hash);
let mut summary = match operation.notify_finalized.take() {
Some(summary) => summary,
None => FinalizeSummary { finalized: Vec::new(), stale_heads: Vec::new() },
};
summary.finalized.push(hash);
bkchr marked this conversation as resolved.
Show resolved Hide resolved
if parent_exists {
// Add to the stale list all heads that are branching from parent besides our
// current `head`.
for head in self
.backend
.blockchain()
.leaves()?
.into_iter()
.filter(|h| *h != parent_hash)
{
let route_from_parent = sp_blockchain::tree_route(
self.backend.blockchain(),
parent_hash,
head,
)?;
if route_from_parent.retracted().is_empty() {
summary.stale_heads.push(head);
}
}
}
operation.notify_finalized = Some(summary);
}

operation.notify_imported = Some(ImportSummary {
Expand Down Expand Up @@ -831,58 +856,82 @@ where
operation.op.mark_finalized(BlockId::Hash(block), justification)?;

if notify {
// sometimes when syncing, tons of blocks can be finalized at once.
// we'll send notifications spuriously in that case.
const MAX_TO_NOTIFY: usize = 256;
let enacted = route_from_finalized.enacted();
let start = enacted.len() - std::cmp::min(enacted.len(), MAX_TO_NOTIFY);
for finalized in &enacted[start..] {
operation.notify_finalized.push(finalized.hash);
let finalized =
route_from_finalized.enacted().iter().map(|elem| elem.hash).collect::<Vec<_>>();

let last_finalized_number = self
.backend
.blockchain()
.number(last_finalized)?
.expect("Finalized block expected to be onchain; qed");
let mut stale_heads = Vec::new();
for head in self.backend.blockchain().leaves()? {
let route_from_finalized =
sp_blockchain::tree_route(self.backend.blockchain(), block, head)?;
let retracted = route_from_finalized.retracted();
let pivot = route_from_finalized.common_block();
// It is not guaranteed that `backend.blockchain().leaves()` doesn't return
// heads that were in a stale state before this finalization and thus already
// included in previous notifications. We want to skip such heads.
// Given the "route" from the currently finalized block to the head under
// analysis, the condition for it to be added to the new stale heads list is:
// `!retracted.is_empty() && last_finalized_number <= pivot.number`
// 1. "route" has some "retractions".
// 2. previously finalized block number is not greater than the "route" pivot:
// - if `last_finalized_number <= pivot.number` then this is a new stale head;
// - else the stale head was already included by some previous finalization.
if !retracted.is_empty() && last_finalized_number <= pivot.number {
stale_heads.push(head);
}
}
operation.notify_finalized = Some(FinalizeSummary { finalized, stale_heads });
}

Ok(())
}

fn notify_finalized(&self, notify_finalized: Vec<Block::Hash>) -> sp_blockchain::Result<()> {
fn notify_finalized(
&self,
notify_finalized: Option<FinalizeSummary<Block>>,
) -> sp_blockchain::Result<()> {
let mut sinks = self.finality_notification_sinks.lock();

if notify_finalized.is_empty() {
// cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());

return Ok(())
}
let mut notify_finalized = match notify_finalized {
Some(notify_finalized) => notify_finalized,
None => {
// Cleanup any closed finality notification sinks
// since we won't be running the loop below which
// would also remove any closed sinks.
sinks.retain(|sink| !sink.is_closed());
return Ok(())
},
};

// We assume the list is sorted and only want to inform the
// telemetry once about the finalized block.
if let Some(last) = notify_finalized.last() {
let header = self.header(&BlockId::Hash(*last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
let last = notify_finalized.finalized.pop().expect(
"At least one finalized block shall exist within a valid finalization summary; qed",
);

telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);
}
let header = self.header(&BlockId::Hash(last))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);

for finalized_hash in notify_finalized {
let header = self.header(&BlockId::Hash(finalized_hash))?.expect(
"Header already known to exist in DB because it is indicated in the tree route; \
qed",
);
telemetry!(
self.telemetry;
SUBSTRATE_INFO;
"notify.finalized";
"height" => format!("{}", header.number()),
"best" => ?last,
);

let notification = FinalityNotification { header, hash: finalized_hash };
let notification = FinalityNotification {
hash: last,
header,
tree_route: Arc::new(notify_finalized.finalized),
stale_heads: Arc::new(notify_finalized.stale_heads),
};

sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());
}
sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok());

Ok(())
}
Expand All @@ -901,7 +950,6 @@ where
// temporary leak of closed/discarded notification sinks (e.g.
// from consensus code).
self.import_notification_sinks.lock().retain(|sink| !sink.is_closed());

return Ok(())
},
};
Expand Down
25 changes: 3 additions & 22 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ mod client;
mod metrics;
mod task_manager;

use std::{collections::HashMap, io, net::SocketAddr, pin::Pin, task::Poll};
use std::{collections::HashMap, io, net::SocketAddr, pin::Pin};

use codec::{Decode, Encode};
use futures::{stream, Future, FutureExt, Stream, StreamExt};
use futures::{Future, FutureExt, StreamExt};
use log::{debug, error, warn};
use sc_network::PeerId;
use sc_utils::mpsc::TracingUnboundedReceiver;
Expand Down Expand Up @@ -152,26 +152,7 @@ async fn build_network_future<
let starting_block = client.info().best_number;

// Stream of finalized blocks reported by the client.
let mut finality_notification_stream = {
let mut finality_notification_stream = client.finality_notification_stream().fuse();

// We tweak the `Stream` in order to merge together multiple items if they happen to be
// ready. This way, we only get the latest finalized block.
stream::poll_fn(move |cx| {
let mut last = None;
while let Poll::Ready(Some(item)) =
Pin::new(&mut finality_notification_stream).poll_next(cx)
{
last = Some(item);
}
if let Some(last) = last {
Poll::Ready(Some(last))
} else {
Poll::Pending
}
})
.fuse()
};
let mut finality_notification_stream = client.finality_notification_stream().fuse();
davxy marked this conversation as resolved.
Show resolved Hide resolved

loop {
futures::select! {
Expand Down
Loading