Skip to content

Commit

Permalink
Reorg events (#2090)
Browse files Browse the repository at this point in the history
## Issue Addressed

Resolves #2088

## Proposed Changes

Add the `chain_reorg` SSE event topic

## Additional Info


Co-authored-by: realbigsean <[email protected]>
Co-authored-by: Paul Hauner <[email protected]>
  • Loading branch information
3 people committed Jun 17, 2021
1 parent 3261eff commit b1657a6
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 9 deletions.
109 changes: 102 additions & 7 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::validator_pubkey_cache::ValidatorPubkeyCache;
use crate::BeaconForkChoiceStore;
use crate::BeaconSnapshot;
use crate::{metrics, BeaconChainError};
use eth2::types::{EventKind, SseBlock, SseFinalizedCheckpoint, SseHead};
use eth2::types::{EventKind, SseBlock, SseChainReorg, SseFinalizedCheckpoint, SseHead};
use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
Expand Down Expand Up @@ -455,6 +455,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map(|result| result.map_err(|e| e.into())))
}

/// Iterate through the current chain to find the slot intersecting with the given beacon state.
/// The maximum depth this will search is `SLOTS_PER_HISTORICAL_ROOT`, and if that depth is reached
/// and no intersection is found, the finalized slot will be returned.
pub fn find_reorg_slot(
&self,
new_state: &BeaconState<T::EthSpec>,
new_block_root: Hash256,
) -> Result<Slot, Error> {
self.with_head(|snapshot| {
let old_state = &snapshot.beacon_state;
let old_block_root = snapshot.beacon_block_root;

// The earliest slot for which the two chains may have a common history.
let lowest_slot = std::cmp::min(new_state.slot, old_state.slot);

// Create an iterator across `$state`, assuming that the block at `$state.slot` has the
// block root of `$block_root`.
//
// The iterator will be skipped until the next value returns `lowest_slot`.
//
// This is a macro instead of a function or closure due to the complex types invloved
// in all the iterator wrapping.
macro_rules! aligned_roots_iter {
($state: ident, $block_root: ident) => {
std::iter::once(Ok(($state.slot, $block_root)))
.chain($state.rev_iter_block_roots(&self.spec))
.skip_while(|result| {
result
.as_ref()
.map_or(false, |(slot, _)| *slot > lowest_slot)
})
};
}

// Create iterators across old/new roots where iterators both start at the same slot.
let mut new_roots = aligned_roots_iter!(new_state, new_block_root);
let mut old_roots = aligned_roots_iter!(old_state, old_block_root);

// Whilst *both* of the iterators are still returning values, try and find a common
// ancestor between them.
while let (Some(old), Some(new)) = (old_roots.next(), new_roots.next()) {
let (old_slot, old_root) = old?;
let (new_slot, new_root) = new?;

// Sanity check to detect programming errors.
if old_slot != new_slot {
return Err(Error::InvalidReorgSlotIter { new_slot, old_slot });
}

if old_root == new_root {
// A common ancestor has been found.
return Ok(old_slot);
}
}

// If no common ancestor is found, declare that the re-org happened at the previous
// finalized slot.
//
// Sometimes this will result in the return slot being *lower* than the actual reorg
// slot. However, assuming we don't re-org through a finalized slot, it will never be
// *higher*.
//
// We provide this potentially-inaccurate-but-safe information to avoid onerous
// database reads during times of deep reorgs.
Ok(old_state
.finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()))
})
}

/// Iterates across all `(state_root, slot)` pairs from the head of the chain (inclusive) to
/// the earliest reachable ancestor (may or may not be genesis).
///
Expand Down Expand Up @@ -2270,14 +2341,25 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Note: this will declare a re-org if we skip `SLOTS_PER_HISTORICAL_ROOT` blocks
// between calls to fork choice without swapping between chains. This seems like an
// extreme-enough scenario that a warning is fine.
let is_reorg = current_head.block_root
!= new_head
.beacon_state
.get_block_root(current_head.slot)
.map(|root| *root)
.unwrap_or_else(|_| Hash256::random());
let is_reorg = new_head
.beacon_state
.get_block_root(current_head.slot)
.map_or(true, |root| *root != current_head.block_root);

let mut reorg_distance = Slot::new(0);

if is_reorg {
match self.find_reorg_slot(&new_head.beacon_state, new_head.beacon_block_root) {
Ok(slot) => reorg_distance = current_head.slot.saturating_sub(slot),
Err(e) => {
warn!(
self.log,
"Could not find re-org depth";
"error" => format!("{:?}", e),
);
}
}

metrics::inc_counter(&metrics::FORK_CHOICE_REORG_COUNT);
warn!(
self.log,
Expand All @@ -2287,6 +2369,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
"new_head_parent" => %new_head.beacon_block.parent_root(),
"new_head" => %beacon_block_root,
"new_slot" => new_head.beacon_block.slot(),
"reorg_distance" => reorg_distance,
);
} else {
debug!(
Expand Down Expand Up @@ -2452,6 +2535,18 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
}

if is_reorg && event_handler.has_reorg_subscribers() {
event_handler.register(EventKind::ChainReorg(SseChainReorg {
slot: head_slot,
depth: reorg_distance.as_u64(),
old_head_block: current_head.block_root,
old_head_state: current_head.state_root,
new_head_block: beacon_block_root,
new_head_state: state_root,
epoch: head_slot.epoch(T::EthSpec::slots_per_epoch()),
}));
}
}

Ok(())
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ pub enum BeaconChainError {
request_slot: Slot,
slot: Slot,
},
InvalidReorgSlotIter {
old_slot: Slot,
new_slot: Slot,
},
}

easy_from_to!(SlotProcessingError, BeaconChainError);
Expand Down
15 changes: 15 additions & 0 deletions beacon_node/beacon_chain/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub struct ServerSentEventHandler<T: EthSpec> {
finalized_tx: Sender<EventKind<T>>,
head_tx: Sender<EventKind<T>>,
exit_tx: Sender<EventKind<T>>,
chain_reorg: Sender<EventKind<T>>,
log: Logger,
}

Expand All @@ -22,13 +23,15 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (head_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (exit_tx, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);
let (chain_reorg, _) = broadcast::channel(DEFAULT_CHANNEL_CAPACITY);

Self {
attestation_tx,
block_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
Expand All @@ -39,13 +42,15 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
let (finalized_tx, _) = broadcast::channel(capacity);
let (head_tx, _) = broadcast::channel(capacity);
let (exit_tx, _) = broadcast::channel(capacity);
let (chain_reorg, _) = broadcast::channel(capacity);

Self {
attestation_tx,
block_tx,
finalized_tx,
head_tx,
exit_tx,
chain_reorg,
log,
}
}
Expand All @@ -65,6 +70,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
.map(|count| trace!(self.log, "Registering server-sent head event"; "receiver_count" => count)),
EventKind::VoluntaryExit(exit) => self.exit_tx.send(EventKind::VoluntaryExit(exit))
.map(|count| trace!(self.log, "Registering server-sent voluntary exit event"; "receiver_count" => count)),
EventKind::ChainReorg(reorg) => self.chain_reorg.send(EventKind::ChainReorg(reorg))
.map(|count| trace!(self.log, "Registering server-sent chain reorg event"; "receiver_count" => count)),
};
if let Err(SendError(event)) = result {
trace!(self.log, "No receivers registered to listen for event"; "event" => ?event);
Expand All @@ -91,6 +98,10 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
self.exit_tx.subscribe()
}

pub fn subscribe_reorgs(&self) -> Receiver<EventKind<T>> {
self.chain_reorg.subscribe()
}

pub fn has_attestation_subscribers(&self) -> bool {
self.attestation_tx.receiver_count() > 0
}
Expand All @@ -110,4 +121,8 @@ impl<T: EthSpec> ServerSentEventHandler<T> {
pub fn has_exit_subscribers(&self) -> bool {
self.exit_tx.receiver_count() > 0
}

pub fn has_reorg_subscribers(&self) -> bool {
self.chain_reorg.receiver_count() > 0
}
}
67 changes: 66 additions & 1 deletion beacon_node/beacon_chain/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use beacon_chain::{
AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType,
OP_POOL_DB_KEY,
},
WhenSlotSkipped,
StateSkipConfig, WhenSlotSkipped,
};
use operation_pool::PersistedOperationPool;
use state_processing::{
Expand Down Expand Up @@ -139,6 +139,71 @@ fn iterators() {
);
}

#[test]
fn find_reorgs() {
let num_blocks_produced = MinimalEthSpec::slots_per_historical_root() + 1;

let harness = get_harness(VALIDATOR_COUNT);

harness.extend_chain(
num_blocks_produced as usize,
BlockStrategy::OnCanonicalHead,
// No need to produce attestations for this test.
AttestationStrategy::SomeValidators(vec![]),
);

let head_state = harness.chain.head_beacon_state().unwrap();
let head_slot = head_state.slot;
let genesis_state = harness
.chain
.state_at_slot(Slot::new(0), StateSkipConfig::WithStateRoots)
.unwrap();

// because genesis is more than `SLOTS_PER_HISTORICAL_ROOT` away, this should return with the
// finalized slot.
assert_eq!(
harness
.chain
.find_reorg_slot(&genesis_state, harness.chain.genesis_block_root)
.unwrap(),
head_state
.finalized_checkpoint
.epoch
.start_slot(MinimalEthSpec::slots_per_epoch())
);

// test head
assert_eq!(
harness
.chain
.find_reorg_slot(
&head_state,
harness.chain.head_beacon_block().unwrap().canonical_root()
)
.unwrap(),
head_slot
);

// Re-org back to the slot prior to the head.
let prev_slot = head_slot - Slot::new(1);
let prev_state = harness
.chain
.state_at_slot(prev_slot, StateSkipConfig::WithStateRoots)
.unwrap();
let prev_block_root = harness
.chain
.block_root_at_slot(prev_slot, WhenSlotSkipped::None)
.unwrap()
.unwrap();
assert_eq!(
harness
.chain
.find_reorg_slot(&prev_state, prev_block_root)
.unwrap(),
prev_slot
);
}

#[test]
fn chooses_fork() {
let harness = get_harness(VALIDATOR_COUNT);
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/http_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2153,6 +2153,9 @@ pub fn serve<T: BeaconChainTypes>(
api_types::EventTopic::FinalizedCheckpoint => {
event_handler.subscribe_finalized()
}
api_types::EventTopic::ChainReorg => {
event_handler.subscribe_reorgs()
}
};

receivers.push(BroadcastStream::new(receiver).map(|msg| {
Expand Down
41 changes: 41 additions & 0 deletions beacon_node/http_api/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ struct ApiTester {
chain: Arc<BeaconChain<EphemeralHarnessType<E>>>,
client: BeaconNodeHttpClient,
next_block: SignedBeaconBlock<E>,
reorg_block: SignedBeaconBlock<E>,
attestations: Vec<Attestation<E>>,
attester_slashing: AttesterSlashing<E>,
proposer_slashing: ProposerSlashing,
Expand Down Expand Up @@ -105,6 +106,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
Expand Down Expand Up @@ -213,6 +218,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
Expand All @@ -238,6 +244,10 @@ impl ApiTester {
let (next_block, _next_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

// `make_block` adds random graffiti, so this will produce an alternate block
let (reorg_block, _reorg_state) =
harness.make_block(head.beacon_state.clone(), harness.chain.slot().unwrap());

let head_state_root = head.beacon_state_root();
let attestations = harness
.get_unaggregated_attestations(
Expand Down Expand Up @@ -320,6 +330,7 @@ impl ApiTester {
chain,
client,
next_block,
reorg_block,
attestations,
attester_slashing,
proposer_slashing,
Expand Down Expand Up @@ -2233,6 +2244,36 @@ impl ApiTester {
&[expected_block, expected_finalized, expected_head]
);

// Test a reorg event
let mut chain_reorg_event_future = self
.client
.get_events::<E>(&[EventTopic::ChainReorg])
.await
.unwrap();

let expected_reorg = EventKind::ChainReorg(SseChainReorg {
slot: self.next_block.slot(),
depth: 1,
old_head_block: self.next_block.canonical_root(),
old_head_state: self.next_block.state_root(),
new_head_block: self.reorg_block.canonical_root(),
new_head_state: self.reorg_block.state_root(),
epoch: self.next_block.slot().epoch(E::slots_per_epoch()),
});

self.client
.post_beacon_blocks(&self.reorg_block)
.await
.unwrap();

let reorg_event = poll_events(
&mut chain_reorg_event_future,
1,
Duration::from_millis(10000),
)
.await;
assert_eq!(reorg_event.as_slice(), &[expected_reorg]);

self
}

Expand Down
Loading

0 comments on commit b1657a6

Please sign in to comment.