Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tidy hotfix for inconsistent head #1638

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 14 additions & 28 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use crate::observed_attestations::{Error as AttestationObservationError, Observe
use crate::observed_attesters::{ObservedAggregators, ObservedAttesters};
use crate::observed_block_producers::ObservedBlockProducers;
use crate::observed_operations::{ObservationOutcome, ObservedOperations};
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::persisted_beacon_chain::{PersistedBeaconChain, PersistedForkChoice};
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock;
Expand Down Expand Up @@ -70,7 +69,6 @@ pub const VALIDATOR_PUBKEY_CACHE_LOCK_TIMEOUT: Duration = Duration::from_secs(1)
pub const BEACON_CHAIN_DB_KEY: [u8; 32] = [0; 32];
pub const OP_POOL_DB_KEY: [u8; 32] = [0; 32];
pub const ETH1_CACHE_DB_KEY: [u8; 32] = [0; 32];
pub const FORK_CHOICE_DB_KEY: [u8; 32] = [0; 32];

/// The result of a chain segment processing.
pub enum ChainSegmentResult<T: EthSpec> {
Expand Down Expand Up @@ -242,39 +240,27 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// We want to ensure that the head never out dates the fork choice to avoid having references
/// to blocks that do not exist in fork choice.
pub fn persist_head_and_fork_choice(&self) -> Result<(), Error> {
let canonical_head_block_root = self
.canonical_head
.try_read_for(HEAD_LOCK_TIMEOUT)
.ok_or_else(|| Error::CanonicalHeadLockTimeout)?
.beacon_block_root;
let fork_choice = self.fork_choice.read();

let persisted_fork_choice = PersistedForkChoice {
fork_choice: fork_choice.to_persisted(),
fork_choice_store: fork_choice.fc_store().to_persisted(),
};

let persisted_head = PersistedBeaconChain {
canonical_head_block_root,
genesis_time: self.slot_clock.genesis_duration().as_secs(),
genesis_block_root: self.genesis_block_root,
ssz_head_tracker: self.head_tracker.to_ssz_container(),
persisted_fork_choice,
};

let fork_choice_timer = metrics::start_timer(&metrics::PERSIST_FORK_CHOICE);

let fork_choice = self.fork_choice.read();

self.store.put_item(
&Hash256::from_slice(&FORK_CHOICE_DB_KEY),
&PersistedForkChoice {
fork_choice: fork_choice.to_persisted(),
fork_choice_store: fork_choice.fc_store().to_persisted(),
},
)?;

drop(fork_choice);

metrics::stop_timer(fork_choice_timer);
let head_timer = metrics::start_timer(&metrics::PERSIST_HEAD);

self.store
.put_item(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?;

metrics::stop_timer(head_timer);
{
let _timer = metrics::start_timer(&metrics::PERSIST_HEAD);
self.store
.put_item(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY), &persisted_head)?;
}

Ok(())
}
Expand Down
23 changes: 7 additions & 16 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,23 +636,14 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
parent: BeaconSnapshot<T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<Self, BlockError<T::EthSpec>> {
// Reject any block if its parent is not known to fork choice.
//
// A block that is not in fork choice is either:
// Do not process a block that doesn't descend from the finalized root.
//
// - Not yet imported: we should reject this block because we should only import a child
// after its parent has been fully imported.
// - Pre-finalized: if the parent block is _prior_ to finalization, we should ignore it
// because it will revert finalization. Note that the finalized block is stored in fork
// choice, so we will not reject any child of the finalized block (this is relevant during
// genesis).
if !chain
.fork_choice
.read()
.contains_block(&block.parent_root())
{
return Err(BlockError::ParentUnknown(Box::new(block)));
}
// We check this *before* we load the parent so that we can return a more detailed error.
let block = check_block_is_finalized_descendant::<T, _>(
block,
&chain.fork_choice.read(),
&chain.store,
)?;

// Reject any block that exceeds our limit on skipped slots.
check_block_skip_slots(chain, &parent.beacon_block.message, &block.message)?;
Expand Down
194 changes: 119 additions & 75 deletions beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::beacon_chain::{
BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY,
};
use crate::beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, OP_POOL_DB_KEY};
use crate::eth1_chain::{CachingEth1Backend, SszEth1};
use crate::events::NullEventHandler;
use crate::head_tracker::HeadTracker;
use crate::migrate::Migrate;
use crate::persisted_beacon_chain::PersistedBeaconChain;
use crate::persisted_fork_choice::PersistedForkChoice;
use crate::shuffling_cache::ShufflingCache;
use crate::snapshot_cache::{SnapshotCache, DEFAULT_SNAPSHOT_CACHE_SIZE};
use crate::timeout_rw_lock::TimeoutRwLock;
Expand All @@ -26,6 +23,7 @@ use std::marker::PhantomData;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use store::{HotColdDB, ItemStore};
use types::{
BeaconBlock, BeaconState, ChainSpec, EthSpec, Graffiti, Hash256, Signature, SignedBeaconBlock,
Expand Down Expand Up @@ -97,11 +95,12 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
#[allow(clippy::type_complexity)]
store: Option<Arc<HotColdDB<T::EthSpec, T::HotStore, T::ColdStore>>>,
store_migrator: Option<T::StoreMigrator>,
canonical_head: Option<BeaconSnapshot<T::EthSpec>>,
/// The finalized checkpoint to anchor the chain. May be genesis or a higher
/// checkpoint.
pub finalized_snapshot: Option<BeaconSnapshot<T::EthSpec>>,
pub canonical_head: Option<BeaconSnapshot<T::EthSpec>>,
genesis_block_root: Option<Hash256>,
#[allow(clippy::type_complexity)]
fork_choice: Option<
ForkChoice<BeaconForkChoiceStore<T::EthSpec, T::HotStore, T::ColdStore>, T::EthSpec>,
>,
op_pool: Option<OperationPool<T::EthSpec>>,
eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
event_handler: Option<T::EventHandler>,
Expand Down Expand Up @@ -147,8 +146,8 @@ where
store: None,
store_migrator: None,
canonical_head: None,
finalized_snapshot: None,
genesis_block_root: None,
fork_choice: None,
op_pool: None,
eth1_chain: None,
event_handler: None,
Expand Down Expand Up @@ -270,6 +269,10 @@ where
.clone()
.ok_or_else(|| "resume_from_db requires a store.".to_string())?;

/*
* Persisted beacon chain.
*/

let chain = store
.get_item::<PersistedBeaconChain>(&Hash256::from_slice(&BEACON_CHAIN_DB_KEY))
.map_err(|e| format!("DB error when reading persisted beacon chain: {:?}", e))?
Expand All @@ -279,12 +282,43 @@ where
})?;

self.genesis_block_root = Some(chain.genesis_block_root);

/*
* Head tracker
*/

self.head_tracker = Some(
HeadTracker::from_ssz_container(&chain.ssz_head_tracker)
.map_err(|e| format!("Failed to decode head tracker for database: {:?}", e))?,
);

let head_block_root = chain.canonical_head_block_root;
/*
* Fork choice
*/

let fc_store = BeaconForkChoiceStore::from_persisted(
chain.persisted_fork_choice.fork_choice_store,
store.clone(),
)
.map_err(|e| format!("Unable to load ForkChoiceStore: {:?}", e))?;

let mut fork_choice =
ForkChoice::from_persisted(chain.persisted_fork_choice.fork_choice, fc_store)
.map_err(|e| format!("Unable to parse persisted fork choice from disk: {:?}", e))?;

let current_slot = Slot::new(chain.genesis_time * 1_000 / self.spec.milliseconds_per_slot)
+ self.spec.genesis_slot;

let head_block_root = fork_choice
.get_head(current_slot)
.map_err(|e| format!("Unable to find head block: {:?}", e))?;

self.fork_choice = Some(fork_choice);

/*
* Canonical head
*/

let head_block = store
.get_item::<SignedBeaconBlock<TEthSpec>>(&head_block_root)
.map_err(|e| format!("DB error when reading head block: {:?}", e))?
Expand All @@ -295,6 +329,17 @@ where
.map_err(|e| format!("DB error when reading head state: {:?}", e))?
.ok_or_else(|| "Head state not found in store".to_string())?;

self.canonical_head = Some(BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: head_block,
beacon_state_root: head_state_root,
beacon_state: head_state,
});

/*
* Operations pool
*/

self.op_pool = Some(
store
.get_item::<PersistedOperationPool<TEthSpec>>(&Hash256::from_slice(&OP_POOL_DB_KEY))
Expand All @@ -303,30 +348,9 @@ where
.unwrap_or_else(OperationPool::new),
);

let finalized_block_root = head_state.finalized_checkpoint.root;
let finalized_block = store
.get_item::<SignedBeaconBlock<TEthSpec>>(&finalized_block_root)
.map_err(|e| format!("DB error when reading finalized block: {:?}", e))?
.ok_or_else(|| "Finalized block not found in store".to_string())?;
let finalized_state_root = finalized_block.state_root();
let finalized_state = store
.get_state(&finalized_state_root, Some(finalized_block.slot()))
.map_err(|e| format!("DB error when reading finalized state: {:?}", e))?
.ok_or_else(|| "Finalized state not found in store".to_string())?;

self.finalized_snapshot = Some(BeaconSnapshot {
beacon_block_root: finalized_block_root,
beacon_block: finalized_block,
beacon_state_root: finalized_state_root,
beacon_state: finalized_state,
});

self.canonical_head = Some(BeaconSnapshot {
beacon_block_root: head_block_root,
beacon_block: head_block,
beacon_state_root: head_state_root,
beacon_state: head_state,
});
/*
* Validator public key cache
*/

let pubkey_cache = ValidatorPubkeyCache::load_from_file(pubkey_cache_path)
.map_err(|e| format!("Unable to open persisted pubkey cache: {:?}", e))?;
Expand Down Expand Up @@ -374,12 +398,31 @@ where
)
})?;

self.finalized_snapshot = Some(BeaconSnapshot {
let head = BeaconSnapshot {
beacon_block_root,
beacon_block,
beacon_state_root,
beacon_state,
});
};

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &head);

self.fork_choice = Some(
ForkChoice::from_genesis(fc_store, &head.beacon_block.message)
.map_err(|e| format!("Unable to build initialize ForkChoice: {:?}", e))?,
);

let pubkey_cache_path = self
.pubkey_cache_path
.as_ref()
.ok_or_else(|| "Cannot build without a pubkey cache path".to_string())?;

self.validator_pubkey_cache = Some(
ValidatorPubkeyCache::new(&head.beacon_state, pubkey_cache_path)
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))?,
);

self.canonical_head = Some(head);

Ok(self.empty_op_pool())
}
Expand Down Expand Up @@ -457,53 +500,54 @@ where
.store
.clone()
.ok_or_else(|| "Cannot build without a store.".to_string())?;

// If this beacon chain is being loaded from disk, use the stored head. Otherwise, just use
// the finalized checkpoint (which is probably genesis).
let mut canonical_head = if let Some(head) = self.canonical_head {
head
} else {
self.finalized_snapshot
.ok_or_else(|| "Cannot build without a state".to_string())?
};
let validator_pubkey_cache = self
.validator_pubkey_cache
.ok_or_else(|| "Cannot build without a validator pubkey cache.".to_string())?;
let fork_choice = self
.fork_choice
.ok_or_else(|| "Cannot build without a fork choice.".to_string())?;
let mut canonical_head = self
.canonical_head
.ok_or_else(|| "Cannot build without a canonical head.".to_string())?;

info!(log, "Building head state caches");
let start = Instant::now();

canonical_head
.beacon_state
.build_all_caches(&self.spec)
.map_err(|e| format!("Failed to build state caches: {:?}", e))?;

info!(
log,
"Built head state caches";
"elapsed" => format!("{:?}", Instant::now().duration_since(start))
);

if canonical_head.beacon_block.state_root() != canonical_head.beacon_state_root {
return Err("beacon_block.state_root != beacon_state".to_string());
}

let pubkey_cache_path = self
.pubkey_cache_path
.ok_or_else(|| "Cannot build without a pubkey cache path".to_string())?;

let validator_pubkey_cache = self.validator_pubkey_cache.map(Ok).unwrap_or_else(|| {
ValidatorPubkeyCache::new(&canonical_head.beacon_state, pubkey_cache_path)
.map_err(|e| format!("Unable to init validator pubkey cache: {:?}", e))
})?;

let persisted_fork_choice = store
.get_item::<PersistedForkChoice>(&Hash256::from_slice(&FORK_CHOICE_DB_KEY))
.map_err(|e| format!("DB error when reading persisted fork choice: {:?}", e))?;

let fork_choice = if let Some(persisted) = persisted_fork_choice {
let fc_store =
BeaconForkChoiceStore::from_persisted(persisted.fork_choice_store, store.clone())
.map_err(|e| format!("Unable to load ForkChoiceStore: {:?}", e))?;

ForkChoice::from_persisted(persisted.fork_choice, fc_store)
.map_err(|e| format!("Unable to parse persisted fork choice from disk: {:?}", e))?
} else {
let genesis = &canonical_head;

let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store.clone(), genesis);

ForkChoice::from_genesis(fc_store, &genesis.beacon_block.message)
.map_err(|e| format!("Unable to build initialize ForkChoice: {:?}", e))?
};
// Perform a check to ensure that the finalization points of the head and fork choice are
// consistent.
//
// This is a sanity check to detect database corruption.
let fc_finalized = fork_choice.finalized_checkpoint();
let head_finalized = canonical_head.beacon_state.finalized_checkpoint;
if fc_finalized != head_finalized {
if head_finalized.root == Hash256::zero()
&& head_finalized.epoch == fc_finalized.epoch
&& fc_finalized.root == canonical_head.beacon_block_root
{
// This is a legal edge-case encountered during genesis.
} else {
return Err(format!(
"Database corrupt: fork choice is finalized at {:?} whilst head is finalized at \
{:?}",
fc_finalized, head_finalized
));
}
}

let beacon_chain = BeaconChain {
spec: self.spec,
Expand Down Expand Up @@ -634,9 +678,9 @@ where
/// Requires the state to be initialized.
pub fn testing_slot_clock(self, slot_duration: Duration) -> Result<Self, String> {
let genesis_time = self
.finalized_snapshot
.canonical_head
.as_ref()
.ok_or_else(|| "testing_slot_clock requires an initialized state")?
.ok_or_else(|| "testing_slot_clock requires a canonical head")?
.beacon_state
.genesis_time;

Expand Down
Loading