Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Initial Commit of Retrospective OTB Verification #3372

Closed
Closed
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head
/// state if it isn't required for the requested range of blocks.
/// The range [start_slot, end_slot] is inclusive (ie `start_slot <= end_slot`)
pub fn forwards_iter_block_roots_until(
&self,
start_slot: Slot,
Expand Down
8 changes: 6 additions & 2 deletions beacon_node/beacon_chain/src/execution_payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//! So, this module contains functions that one might expect to find in other crates, but they live
//! here for good reason.

use crate::otb_verification_service::OptimisticTransitionBlock;
use crate::{
BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, BlockProductionError,
ExecutionPayloadError,
Expand Down Expand Up @@ -191,10 +192,13 @@ pub async fn validate_merge_block<'a, T: BeaconChainTypes>(
if is_optimistic_candidate_block(chain, block.slot(), block.parent_root()).await? {
debug!(
chain.log,
"Optimistically accepting terminal block";
"Optimistically Importing Merge Transition Block";
ethDreamer marked this conversation as resolved.
Show resolved Hide resolved
"block_hash" => ?execution_payload.parent_hash(),
"msg" => "the terminal block/parent was unavailable"
"msg" => "The terminal block/parent was unavailable"
);
// Store Optimistic Transition Block in Database for later Verification
OptimisticTransitionBlock::from_block::<T>(block)
.persist_in_store::<T, _>(&chain.store)?;
Ok(())
} else {
Err(ExecutionPayloadError::UnverifiedNonOptimisticCandidate.into())
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ mod observed_aggregates;
mod observed_attesters;
mod observed_block_producers;
pub mod observed_operations;
pub mod otb_verification_service;
mod persisted_beacon_chain;
mod persisted_fork_choice;
mod pre_finalization_cache;
Expand Down
317 changes: 317 additions & 0 deletions beacon_node/beacon_chain/src/otb_verification_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,317 @@
use crate::execution_payload::validate_merge_block;
use crate::{BeaconChain, BeaconChainError, BeaconChainTypes, BlockError, ExecutionPayloadError};
use proto_array::InvalidationOperation;
use slog::{crit, debug, error, info, warn};
use slot_clock::SlotClock;
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::per_block_processing::is_merge_transition_complete;
use std::sync::Arc;
use store::{DBColumn, Error as StoreError, HotColdDB, KeyValueStore, StoreItem};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio::time::sleep;
use tree_hash::TreeHash;
use types::{BeaconBlockRef, EthSpec, Hash256, Slot};
use DBColumn::OptimisticTransitionBlock as OTBColumn;

#[derive(Clone, Debug, Decode, Encode)]
pub struct OptimisticTransitionBlock {
root: Hash256,
slot: Slot,
}

impl OptimisticTransitionBlock {
// types::BeaconBlockRef<'_, <T as BeaconChainTypes>::EthSpec>
pub fn from_block<T: BeaconChainTypes>(block: BeaconBlockRef<T::EthSpec>) -> Self {
Self {
root: block.tree_hash_root(),
slot: block.slot(),
}
}

pub fn root(&self) -> &Hash256 {
&self.root
}

pub fn slot(&self) -> &Slot {
&self.slot
}

pub fn persist_in_store<T, A>(&self, store: A) -> Result<(), StoreError>
where
T: BeaconChainTypes,
A: AsRef<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
{
if store
.as_ref()
.item_exists::<OptimisticTransitionBlock>(&self.root)?
{
return Ok(());
} else {
store.as_ref().put_item(&self.root, self)
}
}

pub fn remove_from_store<T, A>(&self, store: A) -> Result<(), StoreError>
where
T: BeaconChainTypes,
A: AsRef<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>,
{
store
.as_ref()
.hot_db
.key_delete(OTBColumn.into(), self.root.as_bytes())
}

fn is_canonical<T: BeaconChainTypes>(
&self,
chain: &BeaconChain<T>,
) -> Result<bool, BeaconChainError> {
Ok(chain
.forwards_iter_block_roots_until(self.slot, self.slot)?
.next()
.transpose()?
.map(|(root, _)| root)
== Some(self.root))
}
}

impl StoreItem for OptimisticTransitionBlock {
fn db_column() -> DBColumn {
OTBColumn
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, StoreError> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

/// At 12s slot times, the means that the payload preparation routine will run 4s before the start
ethDreamer marked this conversation as resolved.
Show resolved Hide resolved
/// of each slot (`12 / 3 = 4`).
pub const EPOCH_DELAY_FACTOR: u32 = 4;

/// Spawns a routine which checks the validity of any optimistically imported transition blocks
///
/// This routine will run once per epoch, at `epoch_duration / EPOCH_DELAY_FACTOR` after
/// the start of each epoch.
///
/// The service will not be started if there is no `execution_layer` on the `chain`.
pub fn start_otb_verification_service<T: BeaconChainTypes>(
executor: TaskExecutor,
chain: Arc<BeaconChain<T>>,
) {
// Avoid spawning the service if there's no EL, it'll just error anyway.
if chain.execution_layer.is_some() {
executor.clone().spawn(
async move { otb_verification_service(chain).await },
"otb_verification_service",
);
}
}

fn load_optimistic_transition_blocks<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
) -> Result<Vec<OptimisticTransitionBlock>, StoreError> {
chain
.store
.hot_db
.iter_column(OTBColumn)
.collect::<Result<Vec<_>, _>>()?
ethDreamer marked this conversation as resolved.
Show resolved Hide resolved
.into_iter()
.map(|(_, bytes)| OptimisticTransitionBlock::from_store_bytes(&bytes))
.collect()
}

/// Loop indefinitely, calling `BeaconChain::prepare_beacon_proposer_async` at an interval.
async fn otb_verification_service<T: BeaconChainTypes>(chain: Arc<BeaconChain<T>>) {
let epoch_duration = chain.slot_clock.slot_duration() * T::EthSpec::slots_per_epoch() as u32;

loop {
match chain
.slot_clock
.duration_to_next_epoch(T::EthSpec::slots_per_epoch())
{
Some(duration) => {
let additional_delay = epoch_duration / EPOCH_DELAY_FACTOR;
sleep(duration + additional_delay).await;

debug!(
chain.log,
"OTB Verification Service Firing";
);

if !is_merge_transition_complete(
&chain.canonical_head.cached_head().snapshot.beacon_state,
) {
// We are pre-merge. Nothing to do yet.
continue;
}

let finalized_slot = match chain
.canonical_head
.fork_choice_read_lock()
.get_finalized_block()
{
Ok(block) => block.slot,
Err(e) => {
warn!(chain.log, "Error Retrieving Finalized Slot: {:?}", e);
continue;
}
};

// load all optimistically imported transition blocks from the database
// and separate them into non-canonical, finalized canonical, and
// unfinalized canonical
let mut non_canonical_otbs = vec![];
let (finalized_canonical_otbs, unfinalized_canonical_otbs) =
match load_optimistic_transition_blocks(chain.as_ref()) {
Ok(blocks) => {
if blocks.is_empty() {
// there are no optimistic blocks in the database, we can exit
// the service since the merge transition is completed
break;
}

blocks
.into_iter()
.filter_map(|otb| match otb.is_canonical(chain.as_ref()) {
Ok(true) => Some(otb),
Ok(false) => {
non_canonical_otbs.push(otb);
None
}
Err(e) => {
warn!(
chain.log,
"Error Iterating Over Canonical Blocks: {:?}", e
);
None
}
})
.partition::<Vec<_>, _>(|otb| *otb.slot() <= finalized_slot)
}
Err(e) => {
warn!(
chain.log,
"Error Loading Optimistic Transition Blocks: {:?}", e
);
continue;
}
};

// remove non-canonical blocks that conflict with finalized checkpoint from the database
for otb in non_canonical_otbs {
if *otb.slot() <= finalized_slot {
if let Err(e) = otb.remove_from_store::<T, _>(&chain.store) {
warn!(
chain.log,
"Error Removing Optimistic Transition Block from Database: {:?}", e
);
}
}
}

// ensure finalized canonical otb are valid, otherwise kill client
for otb in finalized_canonical_otbs {
match chain.store.get_full_block(otb.root()) {
Ok(Some(block)) => {
match validate_merge_block(&chain, block.message()).await {
Ok(()) => {
// merge transition block is valid, remove it from OTB
if let Err(e) = otb.remove_from_store::<T, _>(&chain.store) {
warn!(chain.log, "Error Removing Optimistic Transition Block from Database: {:?}", e);
} else {
info!(chain.log, "Validated Merge Transition Block");
}
}
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::InvalidTerminalPoWBlock { .. },
)) => {
// Finalized Merge Transition Block is Invalid! Kill the Client!
crit!(
chain.log,
"Finalized Merge Transition Block is Invalid!";
"msg" => "You must use the `--purge-db` flag to clear the database and restart sync. \
You may be on a hostile network.",
"block_hash" => ?block.canonical_root()
);
let mut shutdown_sender = chain.shutdown_sender();
if let Err(e) =
shutdown_sender.try_send(ShutdownReason::Failure(
"Finalized Merge Transition Block is Invalid",
))
{
crit!(chain.log, "Failed to shut down client: {:?}", e);
}
}
Err(_) => {}
}
}
Ok(None) => warn!(
chain.log,
"No Block Found for Finalized Optimistic Transition Block: {:?}", otb
),
Err(e) => {
warn!(chain.log, "Error Loading Full Block from Database: {:?}", e)
}
}
}

// attempt to validate any non-finalized canonical otb blocks
for otb in unfinalized_canonical_otbs {
match chain.store.get_full_block(otb.root()) {
Ok(Some(block)) => {
match validate_merge_block(&chain, block.message()).await {
Ok(()) => {
// merge transition block is valid, remove it from OTB
if let Err(e) = otb.remove_from_store::<T, _>(&chain.store) {
warn!(chain.log, "Error Removing Optimistic Transition Block from Database: {:?}", e);
} else {
info!(chain.log, "Validated Merge Transition Block");
}
}
Err(BlockError::ExecutionPayloadError(
ExecutionPayloadError::InvalidTerminalPoWBlock { .. },
)) => {
// Unfinalized Merge Transition Block is Invalid -> Run process_invalid_execution_payload
warn!(chain.log, "Merge Transition Block Invalid: {:?}", otb);
if let Err(e) = chain
.process_invalid_execution_payload(
&InvalidationOperation::InvalidateOne {
block_root: *otb.root(),
},
)
.await
{
warn!(chain.log, "Error During process_invalid_execution_payload for Invalid Merge Transition Block: {:?}", e);
}
}
Err(_) => {}
}
}
Ok(None) => warn!(
chain.log,
"No Block Found for UnFinalized Optimistic Transition Block: {:?}", otb
),
Err(e) => {
warn!(chain.log, "Error Loading Full Block from Database: {:?}", e)
}
}
}

continue;
}
None => {
error!(chain.log, "Failed to read slot clock");
// If we can't read the slot clock, just wait another epoch.
sleep(epoch_duration).await;
continue;
}
};
}
debug!(chain.log, "No Optimistic Transition Blocks in Database"; "msg" => "Shutting down OTB Verification Service");
}
2 changes: 2 additions & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::config::{ClientGenesis, Config as ClientConfig};
use crate::notifier::spawn_notifier;
use crate::Client;
use beacon_chain::otb_verification_service::start_otb_verification_service;
use beacon_chain::proposer_prep_service::start_proposer_prep_service;
use beacon_chain::schema_change::migrate_schema;
use beacon_chain::{
Expand Down Expand Up @@ -728,6 +729,7 @@ where
}

start_proposer_prep_service(runtime_context.executor.clone(), beacon_chain.clone());
start_otb_verification_service(runtime_context.executor.clone(), beacon_chain.clone());
}

Ok(Client {
Expand Down
3 changes: 3 additions & 0 deletions beacon_node/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ pub enum DBColumn {
BeaconRandaoMixes,
#[strum(serialize = "dht")]
DhtEnrs,
/// For Optimistically Imported Merge Transition Blocks
#[strum(serialize = "otb")]
OptimisticTransitionBlock,
}

/// A block from the database, which might have an execution payload or not.
Expand Down