
Commit f851f4b

Add forkchoice lock

paulhauner committed Feb 28, 2022
1 parent ce2417e commit f851f4b
Showing 3 changed files with 95 additions and 105 deletions.
164 changes: 85 additions & 79 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3464,13 +3464,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .attester_shuffling_decision_root(self.genesis_block_root, RelativeEpoch::Current);
 
         // Used later for the execution engine.
-        let new_head_execution_block_hash_opt = new_head
-            .beacon_block
-            .message()
-            .body()
-            .execution_payload()
-            .ok()
-            .map(|ep| ep.block_hash);
         let is_merge_transition_complete = is_merge_transition_complete(&new_head.beacon_state);
 
         drop(lag_timer);
@@ -3680,41 +3673,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         }
 
         // If this is a post-merge block, update the execution layer.
-        if let Some(new_head_execution_block_hash) = new_head_execution_block_hash_opt {
-            if is_merge_transition_complete {
-                let finalized_execution_block_hash = finalized_block
-                    .execution_status
-                    .block_hash()
-                    .unwrap_or_else(ExecutionBlockHash::zero);
-                let current_slot = self.slot()?;
-
-                if let Err(e) = self.update_execution_engine_forkchoice_blocking(
-                    finalized_execution_block_hash,
-                    beacon_block_root,
-                    new_head_execution_block_hash,
-                    current_slot,
-                ) {
-                    crit!(
-                        self.log,
-                        "Failed to update execution head";
-                        "error" => ?e
-                    );
-                }
+        if is_merge_transition_complete {
+            let current_slot = self.slot()?;
 
-                // Performing this call immediately after
-                // `update_execution_engine_forkchoice_blocking` might result in two calls to fork
-                // choice updated, one *without* payload attributes and then a second *with*
-                // payload attributes.
-                //
-                // This seems OK. It's not a significant waste of EL<>CL bandwidth or resources, as
-                // far as I know.
-                if let Err(e) = self.prepare_beacon_proposer_blocking() {
-                    crit!(
-                        self.log,
-                        "Failed to prepare proposers after fork choice";
-                        "error" => ?e
-                    );
-                }
-            }
+            if let Err(e) = self.update_execution_engine_forkchoice_blocking(current_slot) {
+                crit!(
+                    self.log,
+                    "Failed to update execution head";
+                    "error" => ?e
+                );
+            }
+
+            // Performing this call immediately after
+            // `update_execution_engine_forkchoice_blocking` might result in two calls to fork
+            // choice updated, one *without* payload attributes and then a second *with*
+            // payload attributes.
+            //
+            // This seems OK. It's not a significant waste of EL<>CL bandwidth or resources, as
+            // far as I know.
+            if let Err(e) = self.prepare_beacon_proposer_blocking() {
+                crit!(
+                    self.log,
+                    "Failed to prepare proposers after fork choice";
+                    "error" => ?e
+                );
+            }
         }
 
@@ -3758,14 +3740,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
 
         let head = self.head_info()?;
 
-        let head_block_hash = if let Some(block_hash) = head.execution_payload_block_hash {
-            block_hash
-        } else {
-            // We only start to push preparation data for some chain *after* the transition block
-            // has been imported.
-            //
-            // There is no payload preparation for the transition block (i.e., the first block with
-            // execution enabled in some chain).
+        // We only start to push preparation data for some chain *after* the transition block
+        // has been imported.
+        //
+        // There is no payload preparation for the transition block (i.e., the first block with
+        // execution enabled in some chain).
+        if head.execution_payload_block_hash.is_none() {
             return Ok(());
         };
 
@@ -3886,32 +3866,16 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 "prepare_slot" => prepare_slot
             );
 
-            let finalized_root = head.finalized_checkpoint.root;
-            let finalized_hash = self
-                .fork_choice
-                .read()
-                .get_block(&finalized_root)
-                .ok_or(Error::FinalizedBlockMissingFromForkChoice(finalized_root))?
-                .execution_status
-                .block_hash()
-                .unwrap_or_else(ExecutionBlockHash::zero);
-
-            self.update_execution_engine_forkchoice_blocking(
-                finalized_hash,
-                head.block_root,
-                head_block_hash,
-                current_slot,
-            )?;
+            // Use the blocking method here so that we don't form a queue of these functions when
+            // routinely calling them.
+            self.update_execution_engine_forkchoice_blocking(current_slot)?;
         }
 
         Ok(())
     }
 
     pub fn update_execution_engine_forkchoice_blocking(
         &self,
-        finalized_execution_block_hash: ExecutionBlockHash,
-        head_block_root: Hash256,
-        head_execution_block_hash: ExecutionBlockHash,
         current_slot: Slot,
     ) -> Result<(), Error> {
         let execution_layer = self
@@ -3920,24 +3884,62 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .ok_or(Error::ExecutionLayerMissing)?;
 
         execution_layer
-            .block_on_generic(|_| {
-                self.update_execution_engine_forkchoice_async(
-                    finalized_execution_block_hash,
-                    head_block_root,
-                    head_execution_block_hash,
-                    current_slot,
-                )
-            })
+            .block_on_generic(|_| self.update_execution_engine_forkchoice_async(current_slot))
             .map_err(Error::ForkchoiceUpdate)?
     }
 
     pub async fn update_execution_engine_forkchoice_async(
         &self,
-        finalized_execution_block_hash: ExecutionBlockHash,
-        head_block_root: Hash256,
-        head_execution_block_hash: ExecutionBlockHash,
         current_slot: Slot,
     ) -> Result<(), Error> {
+        let execution_layer = self
+            .execution_layer
+            .as_ref()
+            .ok_or(Error::ExecutionLayerMissing)?;
+
+        // Take the global lock for updating the execution engine fork choice.
+        //
+        // Whilst holding this lock we must:
+        //
+        // 1. Read the canonical head.
+        // 2. Issue a forkchoiceUpdated call to the execution engine.
+        //
+        // This will allow us to ensure that we provide the execution layer with an *ordered* view
+        // of the head. I.e., we will never communicate a past head after communicating a later
+        // one.
+        //
+        // There are two "deadlock warnings" in this function. The downside of this nice ordering
+        // is the potential for deadlock. I would advise against any other use of
+        // `execution_engine_forkchoice_lock` apart from the one here.
+        let forkchoice_lock = execution_layer.execution_engine_forkchoice_lock().await;
+
+        // Deadlock warning:
+        //
+        // We are taking the `self.canonical_head` lock whilst holding the `forkchoice_lock`. This
+        // is intentional, since it allows us to ensure a consistent ordering of messages to the
+        // execution layer.
+        let head = self.head_info()?;
+        let head_block_root = head.block_root;
+        let head_execution_block_hash = if let Some(hash) = head.execution_payload_block_hash {
+            hash
+        } else {
+            // Don't send fork choice updates to the execution layer before the transition block.
+            return Ok(());
+        };
+
+        // Deadlock warning:
+        //
+        // The same as above, but the lock on `self.fork_choice`.
+        let finalized_root = head.finalized_checkpoint.root;
+        let finalized_execution_block_hash = self
+            .fork_choice
+            .read()
+            .get_block(&finalized_root)
+            .ok_or(Error::FinalizedBlockMissingFromForkChoice(finalized_root))?
+            .execution_status
+            .block_hash()
+            .unwrap_or_else(ExecutionBlockHash::zero);
+
         let forkchoice_updated_response = self
             .execution_layer
             .as_ref()
@@ -3951,6 +3953,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             .await
             .map_err(Error::ExecutionForkChoiceUpdateFailed);
 
+        // The head has been read and the execution layer has been updated. It is now valid to send
+        // another fork choice update.
+        drop(forkchoice_lock);
+
         match forkchoice_updated_response {
             Ok(status) => match &status {
                 PayloadStatus::Valid | PayloadStatus::Syncing => Ok(()),
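The comments introduced in `update_execution_engine_forkchoice_async` describe the core idea of this commit: hold one async mutex across the head read and the engine call, so the execution layer always sees heads in the order they were read. Below is a minimal, self-contained sketch of that locking pattern. `Chain`, `HeadInfo`, and the `println!` are hypothetical stand-ins for the real beacon chain types and the real forkchoiceUpdated call; only the lock-read-call-drop sequence mirrors the commit. It requires the `tokio` crate (features: `macros`, `rt-multi-thread`, `sync`).

```rust
use std::sync::{Arc, RwLock};
use tokio::sync::Mutex;

// Hypothetical stand-in for the canonical head; not a Lighthouse type.
#[derive(Clone, Copy, Debug)]
struct HeadInfo {
    slot: u64,
}

struct Chain {
    head: RwLock<HeadInfo>,
    // Guards no data; exists purely to serialise forkchoiceUpdated calls.
    forkchoice_lock: Mutex<()>,
}

impl Chain {
    async fn update_forkchoice(&self) {
        // 1. Take the global forkchoice lock first.
        let guard = self.forkchoice_lock.lock().await;

        // 2. Read the canonical head *whilst* holding the lock, so no other
        //    task can read a newer head and report it to the engine before us.
        let head = *self.head.read().unwrap();

        // 3. Issue the (stubbed) forkchoiceUpdated call.
        println!("forkchoiceUpdated: head at slot {}", head.slot);

        // 4. Only now is it valid for another task to send an update.
        drop(guard);
    }
}

#[tokio::main]
async fn main() {
    let chain = Arc::new(Chain {
        head: RwLock::new(HeadInfo { slot: 1 }),
        forkchoice_lock: Mutex::new(()),
    });

    // However these two futures interleave, the engine sees heads in the
    // order they were read from `chain.head`.
    tokio::join!(chain.update_forkchoice(), chain.update_forkchoice());
}
```

Without the lock, two concurrent tasks could read heads in one order and reach the engine in the other, leaving it on a stale head; that is the race this commit closes.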
30 changes: 4 additions & 26 deletions beacon_node/client/src/builder.rs
@@ -32,8 +32,8 @@ use std::time::Duration;
 use timer::spawn_timer;
 use tokio::sync::{mpsc::UnboundedSender, oneshot};
 use types::{
-    test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec,
-    ExecutionBlockHash, Hash256, SignedBeaconBlock,
+    test_utils::generate_deterministic_keypairs, BeaconState, ChainSpec, EthSpec, Hash256,
+    SignedBeaconBlock,
 };
 
 /// Interval between polling the eth1 node for genesis information.
@@ -673,24 +673,7 @@ where
 
                 // Issue the head to the execution engine on startup. This ensures it can start
                 // syncing.
-                if let Some(block_hash) = head.execution_payload_block_hash {
-                    let finalized_root = head.finalized_checkpoint.root;
-                    let finalized_block = beacon_chain
-                        .store
-                        .get_block(&finalized_root)
-                        .map_err(|e| format!("Failed to read finalized block from DB: {:?}", e))?
-                        .ok_or(format!(
-                            "Finalized block missing from store: {:?}",
-                            finalized_root
-                        ))?;
-                    let finalized_execution_block_hash = finalized_block
-                        .message()
-                        .body()
-                        .execution_payload()
-                        .ok()
-                        .map(|ep| ep.block_hash)
-                        .unwrap_or_else(ExecutionBlockHash::zero);
-
+                if head.execution_payload_block_hash.is_some() {
                     // Spawn a new task using the "async" fork choice update method, rather than
                     // using the "blocking" method.
                     //
@@ -700,12 +683,7 @@ where
                     runtime_context.executor.spawn(
                         async move {
                             let result = inner_chain
-                                .update_execution_engine_forkchoice_async(
-                                    finalized_execution_block_hash,
-                                    head.block_root,
-                                    block_hash,
-                                    current_slot,
-                                )
+                                .update_execution_engine_forkchoice_async(current_slot)
                                 .await;
 
                             // No need to exit early if setting the head fails. It will be set again if/when the
6 changes: 6 additions & 0 deletions beacon_node/execution_layer/src/lib.rs
@@ -82,6 +82,7 @@ pub struct Proposer {
 
 struct Inner {
     engines: Engines<HttpJsonRpc>,
+    execution_engine_forkchoice_lock: Mutex<()>,
     suggested_fee_recipient: Option<Address>,
     proposer_preparation_data: Mutex<HashMap<u64, ProposerPreparationDataEntry>>,
     execution_blocks: Mutex<LruCache<ExecutionBlockHash, ExecutionBlock>>,
@@ -131,6 +132,7 @@ impl ExecutionLayer {
                 latest_forkchoice_state: <_>::default(),
                 log: log.clone(),
             },
+            execution_engine_forkchoice_lock: <_>::default(),
             suggested_fee_recipient,
             proposer_preparation_data: Mutex::new(HashMap::new()),
             proposers: RwLock::new(HashMap::new()),
@@ -176,6 +178,10 @@ impl ExecutionLayer {
         &self.inner.log
     }
 
+    pub async fn execution_engine_forkchoice_lock(&self) -> MutexGuard<'_, ()> {
+        self.inner.execution_engine_forkchoice_lock.lock().await
+    }
+
     /// Convenience function to allow calling async functions in a non-async context.
     pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result<V, Error>
     where
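As a usage note, the accessor added above returns a `MutexGuard` that the caller is expected to hold across the head read and the engine call, as `update_execution_engine_forkchoice_async` does in the first file of this diff. A minimal sketch of that calling pattern follows; the flattened `ExecutionLayer` struct and the `main` driver are stand-ins, and only the `Mutex<()>`/`MutexGuard<'_, ()>` accessor shape comes from the commit.

```rust
use tokio::sync::{Mutex, MutexGuard};

// Stand-in: the real struct wraps this field inside an `Inner` behind an `Arc`.
struct ExecutionLayer {
    // `Mutex<()>` guards no data; it exists purely to serialise callers.
    execution_engine_forkchoice_lock: Mutex<()>,
}

impl ExecutionLayer {
    async fn execution_engine_forkchoice_lock(&self) -> MutexGuard<'_, ()> {
        self.execution_engine_forkchoice_lock.lock().await
    }
}

#[tokio::main]
async fn main() {
    let el = ExecutionLayer {
        execution_engine_forkchoice_lock: Mutex::new(()),
    };

    // Hold the guard across the head read and the engine call, then release.
    let guard = el.execution_engine_forkchoice_lock().await;
    // ... read the canonical head, issue forkchoiceUpdated ...
    drop(guard);
}
```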
