From 4becc362cb8905e91d904a28f6b2f86b2b65ea6a Mon Sep 17 00:00:00 2001 From: sean Date: Wed, 22 Jun 2022 22:42:13 +0000 Subject: [PATCH 1/6] apply diva's patch --- Cargo.lock | 37 +- Cargo.toml | 1 + beacon_node/Cargo.toml | 2 +- beacon_node/beacon_chain/src/beacon_chain.rs | 12 +- beacon_node/beacon_chain/src/builder.rs | 4 +- .../beacon_chain/src/execution_payload.rs | 11 +- beacon_node/beacon_chain/src/test_utils.rs | 3 +- beacon_node/client/src/builder.rs | 2 +- beacon_node/execution_layer/Cargo.toml | 1 + beacon_node/execution_layer/src/engine_api.rs | 32 +- .../execution_layer/src/engine_api/http.rs | 64 +--- beacon_node/execution_layer/src/engines.rs | 105 +----- beacon_node/execution_layer/src/lib.rs | 347 ++++++++++-------- beacon_node/execution_layer/src/metrics.rs | 1 + .../src/test_utils/mock_execution_layer.rs | 11 +- beacon_node/src/cli.rs | 4 +- beacon_node/src/config.rs | 14 +- common/eth2/src/lib.rs | 2 +- consensus/types/Cargo.toml | 1 + consensus/types/src/lib.rs | 3 + .../src/test_rig.rs | 20 +- 21 files changed, 299 insertions(+), 378 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de385f22cf9..6c8fc4a9683 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -328,7 +328,7 @@ dependencies = [ [[package]] name = "beacon_node" -version = "2.3.1" +version = "2.3.0" dependencies = [ "beacon_chain", "clap", @@ -537,6 +537,17 @@ dependencies = [ "safemem", ] +[[package]] +name = "builder_client" +version = "0.1.0" +dependencies = [ + "eth2", + "reqwest", + "sensitive_url", + "serde", + "serde_json", +] + [[package]] name = "bumpalo" version = "3.10.0" @@ -1875,6 +1886,7 @@ name = "execution_layer" version = "0.1.0" dependencies = [ "async-trait", + "builder_client", "bytes", "environment", "eth1", @@ -5497,6 +5509,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_yaml" version = "0.8.24" @@ -6646,6 +6680,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_with", "serde_yaml", "slog", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index c79859d0a78..819f92d99ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,7 @@ members = [ "beacon_node", "beacon_node/beacon_chain", + "beacon_node/builder_client", "beacon_node/client", "beacon_node/eth1", "beacon_node/lighthouse_network", diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index 081e91aba8a..bc61d1756f4 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "2.3.1" +version = "2.3.0" authors = ["Paul Hauner ", "Age Manning { /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Interfaces with the execution client. - pub execution_layer: Option, + pub execution_layer: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. pub(crate) canonical_head: TimeoutRwLock>, /// The root of the genesis block. @@ -3218,6 +3218,14 @@ impl BeaconChain { let slot = state.slot(); let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? as u64; + let pubkey_opt = match self.validator_pubkey_bytes(proposer_index as usize) { + Ok(p) => p, + Err(e) => { + warn!(self.log, "Can't access proposer's pubkey, cannot use external builder"; "error" => ?e); + None + } + }; + // Closure to fetch a sync aggregate in cases where it is required. let get_sync_aggregate = || -> Result, BlockProductionError> { Ok(self @@ -3276,7 +3284,7 @@ impl BeaconChain { BeaconState::Merge(_) => { let sync_aggregate = get_sync_aggregate()?; let execution_payload = - get_execution_payload::(self, &state, proposer_index)?; + get_execution_payload::(self, &state, proposer_index, pubkey_opt)?; BeaconBlock::Merge(BeaconBlockMerge { slot, proposer_index, diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index 361246b4d38..87f94161585 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -77,7 +77,7 @@ pub struct BeaconChainBuilder { >, op_pool: Option>, eth1_chain: Option>, - execution_layer: Option, + execution_layer: Option>, event_handler: Option>, slot_clock: Option, shutdown_sender: Option>, @@ -481,7 +481,7 @@ where } /// Sets the `BeaconChain` execution layer. - pub fn execution_layer(mut self, execution_layer: Option) -> Self { + pub fn execution_layer(mut self, execution_layer: Option>) -> Self { self.execution_layer = execution_layer; self } diff --git a/beacon_node/beacon_chain/src/execution_payload.rs b/beacon_node/beacon_chain/src/execution_payload.rs index 08e4cd41efd..7085fc6500f 100644 --- a/beacon_node/beacon_chain/src/execution_payload.rs +++ b/beacon_node/beacon_chain/src/execution_payload.rs @@ -247,9 +247,10 @@ pub fn get_execution_payload, state: &BeaconState, proposer_index: u64, + pubkey: Option, ) -> Result { Ok( - prepare_execution_payload_blocking::(chain, state, proposer_index)? + prepare_execution_payload_blocking::(chain, state, proposer_index, pubkey)? .unwrap_or_default(), ) } @@ -259,6 +260,7 @@ pub fn prepare_execution_payload_blocking, state: &BeaconState, proposer_index: u64, + pubkey: Option, ) -> Result, BlockProductionError> { let execution_layer = chain .execution_layer @@ -267,7 +269,7 @@ pub fn prepare_execution_payload_blocking(chain, state, proposer_index).await + prepare_execution_payload::(chain, state, proposer_index, pubkey).await }) .map_err(BlockProductionError::BlockingFailed)? } @@ -290,6 +292,7 @@ pub async fn prepare_execution_payload, state: &BeaconState, proposer_index: u64, + pubkey: Option, ) -> Result, BlockProductionError> { let spec = &chain.spec; let execution_layer = chain @@ -345,12 +348,14 @@ pub async fn prepare_execution_payload( + .get_payload::( parent_hash, timestamp, random, finalized_block_hash.unwrap_or_else(ExecutionBlockHash::zero), proposer_index, + pubkey, + state.slot(), ) .await .map_err(BlockProductionError::GetPayloadFailed)?; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 69ed413fd40..980de25cf3c 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -147,7 +147,7 @@ pub struct Builder { store: Option>>, initial_mutator: Option>, store_mutator: Option>, - execution_layer: Option, + execution_layer: Option>, mock_execution_layer: Option>, runtime: TestRuntime, log: Logger, @@ -361,6 +361,7 @@ where DEFAULT_TERMINAL_BLOCK, spec.terminal_block_hash, spec.terminal_block_hash_activation_epoch, + None, ); self.execution_layer = Some(mock.el.clone()); self.mock_execution_layer = Some(mock); diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs index 1f02ec7b3c3..cb3fe7e0050 100644 --- a/beacon_node/client/src/builder.rs +++ b/beacon_node/client/src/builder.rs @@ -700,7 +700,7 @@ where execution_layer.spawn_watchdog_routine(beacon_chain.slot_clock.clone()); // Spawn a routine that removes expired proposer preparations. - execution_layer.spawn_clean_proposer_caches_routine::( + execution_layer.spawn_clean_proposer_caches_routine::( beacon_chain.slot_clock.clone(), ); diff --git a/beacon_node/execution_layer/Cargo.toml b/beacon_node/execution_layer/Cargo.toml index 0351b5e433d..f078d56053e 100644 --- a/beacon_node/execution_layer/Cargo.toml +++ b/beacon_node/execution_layer/Cargo.toml @@ -36,3 +36,4 @@ zeroize = { version = "1.4.2", features = ["zeroize_derive"] } lighthouse_metrics = { path = "../../common/lighthouse_metrics" } lazy_static = "1.4.0" ethers-core = { git = "https://github.com/gakonst/ethers-rs", rev = "02ad93a1cfb7b62eb051c77c61dc4c0218428e4a" } +builder_client = { path = "../builder_client" } diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index 9eb98cecb97..38148f95414 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,11 +1,9 @@ use crate::engines::ForkChoiceState; -use async_trait::async_trait; use eth1::http::RpcError; pub use ethers_core::types::Transaction; pub use json_structures::TransitionConfigurationV1; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; -use slog::Logger; pub use types::{ Address, EthSpec, ExecutionBlockHash, ExecutionPayload, ExecutionPayloadHeader, FixedVector, Hash256, Uint256, VariableList, @@ -28,10 +26,7 @@ pub enum Error { InvalidExecutePayloadResponse(&'static str), JsonRpc(RpcError), Json(serde_json::Error), - ServerMessage { - code: i64, - message: String, - }, + ServerMessage { code: i64, message: String }, Eip155Failure, IsSyncing, ExecutionBlockNotFound(ExecutionBlockHash), @@ -40,15 +35,9 @@ pub enum Error { PayloadIdUnavailable, TransitionConfigurationMismatch, PayloadConversionLogicFlaw, - InvalidBuilderQuery, - MissingPayloadId { - parent_hash: ExecutionBlockHash, - timestamp: u64, - prev_randao: Hash256, - suggested_fee_recipient: Address, - }, DeserializeTransaction(ssz_types::Error), DeserializeTransactions(ssz_types::Error), + BuilderApi(builder_client::Error), } impl From for Error { @@ -76,19 +65,14 @@ impl From for Error { } } -pub struct EngineApi; -pub struct BuilderApi; - -#[async_trait] -pub trait Builder { - async fn notify_forkchoice_updated( - &self, - forkchoice_state: ForkChoiceState, - payload_attributes: Option, - log: &Logger, - ) -> Result; +impl From for Error { + fn from(e: builder_client::Error) -> Self { + Error::BuilderApi(e) + } } +pub struct EngineApi; + #[derive(Clone, Copy, Debug, PartialEq)] pub enum PayloadStatusV1Status { Valid, diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 179045ccf86..f67e5d9539d 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -10,7 +10,7 @@ use serde::de::DeserializeOwned; use serde_json::json; use std::marker::PhantomData; use std::time::Duration; -use types::{BlindedPayload, EthSpec, ExecutionPayloadHeader, SignedBeaconBlock}; +use types::EthSpec; pub use reqwest::Client; @@ -42,12 +42,6 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_millis(500); -pub const BUILDER_GET_PAYLOAD_HEADER_V1: &str = "builder_getPayloadHeaderV1"; -pub const BUILDER_GET_PAYLOAD_HEADER_TIMEOUT: Duration = Duration::from_secs(2); - -pub const BUILDER_PROPOSE_BLINDED_BLOCK_V1: &str = "builder_proposeBlindedBlockV1"; -pub const BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT: Duration = Duration::from_secs(2); - pub struct HttpJsonRpc { pub client: Client, pub url: SensitiveUrl, @@ -233,62 +227,6 @@ impl HttpJsonRpc { } } -impl HttpJsonRpc { - pub async fn get_payload_header_v1( - &self, - payload_id: PayloadId, - ) -> Result, Error> { - let params = json!([JsonPayloadIdRequest::from(payload_id)]); - - let response: JsonExecutionPayloadHeaderV1 = self - .rpc_request( - BUILDER_GET_PAYLOAD_HEADER_V1, - params, - BUILDER_GET_PAYLOAD_HEADER_TIMEOUT, - ) - .await?; - - Ok(response.into()) - } - - pub async fn forkchoice_updated_v1( - &self, - forkchoice_state: ForkChoiceState, - payload_attributes: Option, - ) -> Result { - let params = json!([ - JsonForkChoiceStateV1::from(forkchoice_state), - payload_attributes.map(JsonPayloadAttributesV1::from) - ]); - - let response: JsonForkchoiceUpdatedV1Response = self - .rpc_request( - ENGINE_FORKCHOICE_UPDATED_V1, - params, - ENGINE_FORKCHOICE_UPDATED_TIMEOUT, - ) - .await?; - - Ok(response.into()) - } - - pub async fn propose_blinded_block_v1( - &self, - block: SignedBeaconBlock>, - ) -> Result, Error> { - let params = json!([block]); - - let response: JsonExecutionPayloadV1 = self - .rpc_request( - BUILDER_PROPOSE_BLINDED_BLOCK_V1, - params, - BUILDER_PROPOSE_BLINDED_BLOCK_TIMEOUT, - ) - .await?; - - Ok(response.into()) - } -} #[cfg(test)] mod test { use super::auth::JwtKey; diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index d3c4d0e421a..94c40671fa8 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -1,11 +1,9 @@ //! Provides generic behaviour for multiple execution engines, specifically fallback behaviour. use crate::engine_api::{ - Builder, EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, - PayloadId, + EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, }; -use crate::{BuilderApi, HttpJsonRpc}; -use async_trait::async_trait; +use crate::HttpJsonRpc; use futures::future::join_all; use lru::LruCache; use slog::{crit, debug, info, warn, Logger}; @@ -97,9 +95,8 @@ impl Engine { } } -#[async_trait] -impl Builder for Engine { - async fn notify_forkchoice_updated( +impl Engine { + pub async fn notify_forkchoice_updated( &self, forkchoice_state: ForkChoiceState, payload_attributes: Option, @@ -128,34 +125,6 @@ impl Builder for Engine { } } -#[async_trait] -impl Builder for Engine { - async fn notify_forkchoice_updated( - &self, - forkchoice_state: ForkChoiceState, - pa: Option, - log: &Logger, - ) -> Result { - let payload_attributes = pa.ok_or(EngineApiError::InvalidBuilderQuery)?; - let response = self - .api - .forkchoice_updated_v1(forkchoice_state, Some(payload_attributes)) - .await?; - - if let Some(payload_id) = response.payload_id { - let key = PayloadIdCacheKey::new(&forkchoice_state, &payload_attributes); - self.payload_id_cache.lock().await.put(key, payload_id); - } else { - warn!( - log, - "Builder should have returned a payload_id for attributes {:?}", payload_attributes - ); - } - - Ok(response) - } -} - // This structure used to hold multiple execution engines managed in a fallback manner. This // functionality has been removed following https://github.com/sigp/lighthouse/issues/3118 and this // struct will likely be removed in the future. @@ -165,15 +134,11 @@ pub struct Engines { pub log: Logger, } -pub struct Builders { - pub builders: Vec>, - pub log: Logger, -} - #[derive(Debug)] pub enum EngineError { Offline { id: String }, Api { id: String, error: EngineApiError }, + BuilderApi { error: EngineApiError }, Auth { id: String }, } @@ -422,66 +387,6 @@ impl Engines { } } -impl Builders { - pub async fn first_success_without_retry<'a, F, G, H>( - &'a self, - func: F, - ) -> Result> - where - F: Fn(&'a Engine) -> G, - G: Future>, - { - let mut errors = vec![]; - - for builder in &self.builders { - match func(builder).await { - Ok(result) => return Ok(result), - Err(error) => { - debug!( - self.log, - "Builder call failed"; - "error" => ?error, - "id" => &builder.id - ); - errors.push(EngineError::Api { - id: builder.id.clone(), - error, - }) - } - } - } - - Err(errors) - } - - pub async fn broadcast_without_retry<'a, F, G, H>( - &'a self, - func: F, - ) -> Vec> - where - F: Fn(&'a Engine) -> G, - G: Future>, - { - let func = &func; - let futures = self.builders.iter().map(|engine| async move { - func(engine).await.map_err(|error| { - debug!( - self.log, - "Builder call failed"; - "error" => ?error, - "id" => &engine.id - ); - EngineError::Api { - id: engine.id.clone(), - error, - } - }) - }); - - join_all(futures).await - } -} - impl PayloadIdCacheKey { fn new(state: &ForkChoiceState, attributes: &PayloadAttributes) -> Self { Self { diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index cff21902722..a3e5f2955d5 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,9 +4,8 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. -use crate::engine_api::Builder; -use crate::engines::Builders; use auth::{Auth, JwtKey}; +use builder_client::BuilderHttpClient; use engine_api::Error as ApiError; pub use engine_api::*; pub use engine_api::{http, http::HttpJsonRpc}; @@ -20,7 +19,6 @@ use serde::{Deserialize, Serialize}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use std::collections::HashMap; -use std::convert::TryInto; use std::future::Future; use std::io::Write; use std::path::PathBuf; @@ -33,7 +31,7 @@ use tokio::{ }; use types::{ BlindedPayload, BlockType, ChainSpec, Epoch, ExecPayload, ExecutionBlockHash, - ProposerPreparationData, SignedBeaconBlock, Slot, + ProposerPreparationData, PublicKeyBytes, SignedBeaconBlock, Slot, }; mod engine_api; @@ -66,6 +64,7 @@ pub enum Error { NoEngines, NoPayloadBuilder, ApiError(ApiError), + Builder(builder_client::Error), EngineErrors(Vec), NotSynced, ShuttingDown, @@ -99,15 +98,16 @@ pub struct Proposer { payload_attributes: PayloadAttributes, } -struct Inner { +struct Inner { engines: Engines, - builders: Builders, + builder: Option, execution_engine_forkchoice_lock: Mutex<()>, suggested_fee_recipient: Option
, proposer_preparation_data: Mutex>, execution_blocks: Mutex>, proposers: RwLock>, executor: TaskExecutor, + phantom: std::marker::PhantomData, log: Logger, } @@ -116,7 +116,7 @@ pub struct Config { /// Endpoint urls for EL nodes that are running the engine api. pub execution_endpoints: Vec, /// Endpoint urls for services providing the builder api. - pub builder_endpoints: Vec, + pub builder_url: Option, /// JWT secrets for the above endpoints running the engine api. pub secret_files: Vec, /// The default fee recipient to use on the beacon node if none if provided from @@ -148,16 +148,16 @@ fn strip_prefix(s: &str) -> &str { /// /// The fallback nodes have an ordering. The first supplied will be the first contacted, and so on. #[derive(Clone)] -pub struct ExecutionLayer { - inner: Arc, +pub struct ExecutionLayer { + inner: Arc>, } -impl ExecutionLayer { +impl ExecutionLayer { /// Instantiate `Self` with Execution engines specified using `Config`, all using the JSON-RPC via HTTP. pub fn from_config(config: Config, executor: TaskExecutor, log: Logger) -> Result { let Config { execution_endpoints: urls, - builder_endpoints: builder_urls, + builder_url, secret_files, suggested_fee_recipient, jwt_id, @@ -213,14 +213,9 @@ impl ExecutionLayer { Engine::::new(id, api) }; - let builders: Vec> = builder_urls - .into_iter() - .map(|url| { - let id = url.to_string(); - let api = HttpJsonRpc::::new(url)?; - Ok(Engine::::new(id, api)) - }) - .collect::>()?; + let builder = builder_url + .map(|url| BuilderHttpClient::new(url).map_err(Error::Builder)) + .transpose()?; let inner = Inner { engines: Engines { @@ -228,16 +223,14 @@ impl ExecutionLayer { latest_forkchoice_state: <_>::default(), log: log.clone(), }, - builders: Builders { - builders, - log: log.clone(), - }, + builder, execution_engine_forkchoice_lock: <_>::default(), suggested_fee_recipient, proposer_preparation_data: Mutex::new(HashMap::new()), proposers: RwLock::new(HashMap::new()), execution_blocks: Mutex::new(LruCache::new(EXECUTION_BLOCKS_LRU_CACHE_SIZE)), executor, + phantom: std::marker::PhantomData, log, }; @@ -247,13 +240,13 @@ impl ExecutionLayer { } } -impl ExecutionLayer { +impl ExecutionLayer { fn engines(&self) -> &Engines { &self.inner.engines } - fn builders(&self) -> &Builders { - &self.inner.builders + pub fn builder(&self) -> &Option { + &self.inner.builder } pub fn executor(&self) -> &TaskExecutor { @@ -287,9 +280,9 @@ impl ExecutionLayer { } /// Convenience function to allow calling async functions in a non-async context. - pub fn block_on<'a, T, U, V>(&'a self, generate_future: T) -> Result + pub fn block_on<'a, F, U, V>(&'a self, generate_future: F) -> Result where - T: Fn(&'a Self) -> U, + F: Fn(&'a Self) -> U, U: Future>, { let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; @@ -301,9 +294,9 @@ impl ExecutionLayer { /// /// The function is "generic" since it does not enforce a particular return type on /// `generate_future`. - pub fn block_on_generic<'a, T, U, V>(&'a self, generate_future: T) -> Result + pub fn block_on_generic<'a, F, U, V>(&'a self, generate_future: F) -> Result where - T: Fn(&'a Self) -> U, + F: Fn(&'a Self) -> U, U: Future, { let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?; @@ -312,9 +305,9 @@ impl ExecutionLayer { } /// Convenience function to allow spawning a task without waiting for the result. - pub fn spawn(&self, generate_future: T, name: &'static str) + pub fn spawn(&self, generate_future: F, name: &'static str) where - T: FnOnce(Self) -> U, + F: FnOnce(Self) -> U, U: Future + Send + 'static, { self.executor().spawn(generate_future(self.clone()), name); @@ -322,12 +315,12 @@ impl ExecutionLayer { /// Spawns a routine which attempts to keep the execution engines online. pub fn spawn_watchdog_routine(&self, slot_clock: S) { - let watchdog = |el: ExecutionLayer| async move { + let watchdog = |el: ExecutionLayer| async move { // Run one task immediately. el.watchdog_task().await; let recurring_task = - |el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move { + |el: ExecutionLayer, now: Instant, duration_to_next_slot: Duration| async move { // We run the task three times per slot. // // The interval between each task is 1/3rd of the slot duration. This matches nicely @@ -382,11 +375,8 @@ impl ExecutionLayer { } /// Spawns a routine which cleans the cached proposer data periodically. - pub fn spawn_clean_proposer_caches_routine( - &self, - slot_clock: S, - ) { - let preparation_cleaner = |el: ExecutionLayer| async move { + pub fn spawn_clean_proposer_caches_routine(&self, slot_clock: S) { + let preparation_cleaner = |el: ExecutionLayer| async move { // Start the loop to periodically clean proposer preparation cache. loop { if let Some(duration_to_next_epoch) = @@ -400,7 +390,7 @@ impl ExecutionLayer { .map(|slot| slot.epoch(T::slots_per_epoch())) { Some(current_epoch) => el - .clean_proposer_caches::(current_epoch) + .clean_proposer_caches(current_epoch) .await .map_err(|e| { error!( @@ -425,7 +415,7 @@ impl ExecutionLayer { /// Spawns a routine that polls the `exchange_transition_configuration` endpoint. pub fn spawn_transition_configuration_poll(&self, spec: ChainSpec) { - let routine = |el: ExecutionLayer| async move { + let routine = |el: ExecutionLayer| async move { loop { if let Err(e) = el.exchange_transition_configuration(&spec).await { error!( @@ -459,7 +449,7 @@ impl ExecutionLayer { } /// Updates the proposer preparation data provided by validators - async fn update_proposer_preparation( + pub async fn update_proposer_preparation( &self, update_epoch: Epoch, preparation_data: &[ProposerPreparationData], @@ -481,7 +471,7 @@ impl ExecutionLayer { } /// Removes expired entries from proposer_preparation_data and proposers caches - async fn clean_proposer_caches(&self, current_epoch: Epoch) -> Result<(), Error> { + async fn clean_proposer_caches(&self, current_epoch: Epoch) -> Result<(), Error> { let mut proposer_preparation_data = self.proposer_preparation_data().await; // Keep all entries that have been updated in the last 2 epochs @@ -566,104 +556,137 @@ impl ExecutionLayer { /// /// The result will be returned from the first node that returns successfully. No more nodes /// will be contacted. - pub async fn get_payload>( + #[allow(clippy::too_many_arguments)] + pub async fn get_payload>( &self, parent_hash: ExecutionBlockHash, timestamp: u64, prev_randao: Hash256, finalized_block_hash: ExecutionBlockHash, proposer_index: u64, + pubkey: Option, + slot: Slot, ) -> Result { - let _timer = metrics::start_timer_vec( - &metrics::EXECUTION_LAYER_REQUEST_TIMES, - &[metrics::GET_PAYLOAD], - ); - let suggested_fee_recipient = self.get_suggested_fee_recipient(proposer_index).await; match Payload::block_type() { BlockType::Blinded => { - debug!( - self.log(), - "Issuing builder_getPayloadHeader"; - "suggested_fee_recipient" => ?suggested_fee_recipient, - "prev_randao" => ?prev_randao, - "timestamp" => timestamp, - "parent_hash" => ?parent_hash, + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::GET_BLINDED_PAYLOAD], ); - self.builders() - .first_success_without_retry(|engine| async move { - let payload_id = engine - .get_payload_id( - parent_hash, - timestamp, - prev_randao, - suggested_fee_recipient, - ) - .await - .ok_or(ApiError::MissingPayloadId { - parent_hash, - timestamp, - prev_randao, - suggested_fee_recipient, - })?; - engine - .api - .get_payload_header_v1::(payload_id) - .await? - .try_into() - .map_err(|_| ApiError::PayloadConversionLogicFlaw) - }) - .await - .map_err(Error::EngineErrors) + self.get_blinded_payload( + parent_hash, + timestamp, + prev_randao, + finalized_block_hash, + suggested_fee_recipient, + pubkey, + slot, + ) + .await } BlockType::Full => { - debug!( - self.log(), - "Issuing engine_getPayload"; - "suggested_fee_recipient" => ?suggested_fee_recipient, - "prev_randao" => ?prev_randao, - "timestamp" => timestamp, - "parent_hash" => ?parent_hash, + let _timer = metrics::start_timer_vec( + &metrics::EXECUTION_LAYER_REQUEST_TIMES, + &[metrics::GET_PAYLOAD], ); - self.engines() - .first_success(|engine| async move { - let payload_id = if let Some(id) = engine - .get_payload_id( - parent_hash, - timestamp, - prev_randao, - suggested_fee_recipient, - ) - .await - { - // The payload id has been cached for this engine. - metrics::inc_counter_vec( - &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, - &[metrics::HIT], - ); - id - } else { - // The payload id has *not* been cached for this engine. Trigger an artificial - // fork choice update to retrieve a payload ID. - // - // TODO(merge): a better algorithm might try to favour a node that already had a - // cached payload id, since a payload that has had more time to produce is - // likely to be more profitable. - metrics::inc_counter_vec( - &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, - &[metrics::MISS], - ); - let fork_choice_state = ForkChoiceState { - head_block_hash: parent_hash, - safe_block_hash: parent_hash, - finalized_block_hash, - }; - let payload_attributes = PayloadAttributes { - timestamp, - prev_randao, - suggested_fee_recipient, - }; + self.get_full_payload( + parent_hash, + timestamp, + prev_randao, + finalized_block_hash, + suggested_fee_recipient, + ) + .await + } + } + } + + #[allow(clippy::too_many_arguments)] + async fn get_blinded_payload>( + &self, + parent_hash: ExecutionBlockHash, + timestamp: u64, + prev_randao: Hash256, + finalized_block_hash: ExecutionBlockHash, + suggested_fee_recipient: Address, + pubkey_opt: Option, + slot: Slot, + ) -> Result { + todo!("sean") + } + + /// Get a full payload without caching its result in the execution layer's payload cache. + async fn get_full_payload>( + &self, + parent_hash: ExecutionBlockHash, + timestamp: u64, + prev_randao: Hash256, + finalized_block_hash: ExecutionBlockHash, + suggested_fee_recipient: Address, + ) -> Result { + self.get_full_payload_with( + parent_hash, + timestamp, + prev_randao, + finalized_block_hash, + suggested_fee_recipient, + noop, + ) + .await + } + + async fn get_full_payload_with>( + &self, + parent_hash: ExecutionBlockHash, + timestamp: u64, + prev_randao: Hash256, + finalized_block_hash: ExecutionBlockHash, + suggested_fee_recipient: Address, + f: fn(&ExecutionLayer, &ExecutionPayload) -> Option>, + ) -> Result { + debug!( + self.log(), + "Issuing engine_getPayload"; + "suggested_fee_recipient" => ?suggested_fee_recipient, + "prev_randao" => ?prev_randao, + "timestamp" => timestamp, + "parent_hash" => ?parent_hash, + ); + self.engines() + .first_success(|engine| async move { + let payload_id = if let Some(id) = engine + .get_payload_id(parent_hash, timestamp, prev_randao, suggested_fee_recipient) + .await + { + // The payload id has been cached for this engine. + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, + &[metrics::HIT], + ); + id + } else { + // The payload id has *not* been cached for this engine. Trigger an artificial + // fork choice update to retrieve a payload ID. + // + // TODO(merge): a better algorithm might try to favour a node that already had a + // cached payload id, since a payload that has had more time to produce is + // likely to be more profitable. + metrics::inc_counter_vec( + &metrics::EXECUTION_LAYER_PRE_PREPARED_PAYLOAD_ID, + &[metrics::MISS], + ); + let fork_choice_state = ForkChoiceState { + head_block_hash: parent_hash, + safe_block_hash: parent_hash, + finalized_block_hash, + }; + let payload_attributes = PayloadAttributes { + timestamp, + prev_randao, + suggested_fee_recipient, + }; let response = engine .notify_forkchoice_updated( @@ -689,16 +712,19 @@ impl ExecutionLayer { } }; - engine - .api - .get_payload_v1::(payload_id) - .await - .map(Into::into) - }) + engine + .api + .get_payload_v1::(payload_id) .await - .map_err(Error::EngineErrors) - } - } + .map(|full_payload| { + if f(self, &full_payload).is_some() { + warn!(self.log(), "Duplicate payload cached, this might indicate redundant proposal attempts."); + } + full_payload.into() + }) + }) + .await + .map_err(Error::EngineErrors) } /// Maps to the `engine_newPayload` JSON-RPC call. @@ -714,7 +740,7 @@ impl ExecutionLayer { /// - Invalid, if any nodes return invalid. /// - Syncing, if any nodes return syncing. /// - An error, if all nodes return an error. - pub async fn notify_new_payload( + pub async fn notify_new_payload( &self, execution_payload: &ExecutionPayload, ) -> Result { @@ -877,23 +903,10 @@ impl ExecutionLayer { }) .await; - // Only query builders with payload attributes populated. - let builder_broadcast_results = if payload_attributes.is_some() { - self.builders() - .broadcast_without_retry(|engine| async move { - engine - .notify_forkchoice_updated(forkchoice_state, payload_attributes, self.log()) - .await - }) - .await - } else { - vec![] - }; process_multiple_payload_statuses( head_block_hash, Some(broadcast_results) .into_iter() - .chain(builder_broadcast_results.into_iter()) .map(|result| result.map(|response| response.payload_status)), self.log(), ) @@ -1152,7 +1165,7 @@ impl ExecutionLayer { } } - pub async fn get_payload_by_block_hash( + pub async fn get_payload_by_block_hash( &self, hash: ExecutionBlockHash, ) -> Result>, Error> { @@ -1165,7 +1178,7 @@ impl ExecutionLayer { .map_err(Error::EngineErrors) } - async fn get_payload_by_block_hash_from_engine( + async fn get_payload_by_block_hash_from_engine( &self, engine: &Engine, hash: ExecutionBlockHash, @@ -1210,21 +1223,24 @@ impl ExecutionLayer { })) } - pub async fn propose_blinded_beacon_block( + pub async fn propose_blinded_beacon_block( &self, block: &SignedBeaconBlock>, ) -> Result, Error> { debug!( self.log(), - "Issuing builder_proposeBlindedBlock"; + "Sending block to builder"; "root" => ?block.canonical_root(), ); - self.builders() - .first_success_without_retry(|engine| async move { - engine.api.propose_blinded_block_v1(block.clone()).await - }) - .await - .map_err(Error::EngineErrors) + if let Some(builder) = self.builder() { + builder + .post_builder_blinded_blocks(block) + .await + .map_err(Error::Builder) + .map(|d| d.data) + } else { + Err(Error::NoPayloadBuilder) + } } } @@ -1324,4 +1340,17 @@ mod test { }) .await; } + + // test fallback + + // test normal flow used when + // - merge hasn't finalized + // - bad chain health (finalization not advancing?) + // - gas_limit not what you sent builder + // - fee recipient not what you sent builder + // - timeout? +} + +fn noop(_: &ExecutionLayer, _: &ExecutionPayload) -> Option> { + None } diff --git a/beacon_node/execution_layer/src/metrics.rs b/beacon_node/execution_layer/src/metrics.rs index 356c5a46dd9..e28a81fd878 100644 --- a/beacon_node/execution_layer/src/metrics.rs +++ b/beacon_node/execution_layer/src/metrics.rs @@ -3,6 +3,7 @@ pub use lighthouse_metrics::*; pub const HIT: &str = "hit"; pub const MISS: &str = "miss"; pub const GET_PAYLOAD: &str = "get_payload"; +pub const GET_BLINDED_PAYLOAD: &str = "get_blinded_payload"; pub const NEW_PAYLOAD: &str = "new_payload"; pub const FORKCHOICE_UPDATED: &str = "forkchoice_updated"; pub const GET_TERMINAL_POW_BLOCK_HASH: &str = "get_terminal_pow_block_hash"; diff --git a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs index 5770a8a3821..707a7c0c3e7 100644 --- a/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs +++ b/beacon_node/execution_layer/src/test_utils/mock_execution_layer.rs @@ -9,7 +9,7 @@ use types::{Address, ChainSpec, Epoch, EthSpec, FullPayload, Hash256, Uint256}; pub struct MockExecutionLayer { pub server: MockServer, - pub el: ExecutionLayer, + pub el: ExecutionLayer, pub executor: TaskExecutor, pub spec: ChainSpec, } @@ -22,6 +22,7 @@ impl MockExecutionLayer { DEFAULT_TERMINAL_BLOCK, ExecutionBlockHash::zero(), Epoch::new(0), + None, ) } @@ -31,6 +32,7 @@ impl MockExecutionLayer { terminal_block: u64, terminal_block_hash: ExecutionBlockHash, terminal_block_hash_activation_epoch: Epoch, + builder_url: Option, ) -> Self { let handle = executor.handle().unwrap(); @@ -54,6 +56,7 @@ impl MockExecutionLayer { let config = Config { execution_endpoints: vec![url], + builder_url, secret_files: vec![path], suggested_fee_recipient: Some(Address::repeat_byte(42)), ..Default::default() @@ -111,12 +114,14 @@ impl MockExecutionLayer { let validator_index = 0; let payload = self .el - .get_payload::>( + .get_payload::>( parent_hash, timestamp, prev_randao, finalized_block_hash, validator_index, + None, + slot, ) .await .unwrap() @@ -173,7 +178,7 @@ impl MockExecutionLayer { pub async fn with_terminal_block<'a, U, V>(self, func: U) -> Self where - U: Fn(ChainSpec, ExecutionLayer, Option) -> V, + U: Fn(ChainSpec, ExecutionLayer, Option) -> V, V: Future, { let terminal_block_number = self diff --git a/beacon_node/src/cli.rs b/beacon_node/src/cli.rs index 3102018e3e2..01bc9d7a112 100644 --- a/beacon_node/src/cli.rs +++ b/beacon_node/src/cli.rs @@ -465,8 +465,8 @@ pub fn cli_app<'a, 'b>() -> App<'a, 'b> { .takes_value(true) ) .arg( - Arg::with_name("payload-builders") - .long("payload-builders") + Arg::with_name("builder") + .long("builder") .help("The URL of a service compatible with the MEV-boost API.") .requires("merge") .takes_value(true) diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index db765100c3a..defbfc07bef 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -256,13 +256,13 @@ pub fn get_config( el_config.execution_endpoints = client_config.eth1.endpoints.clone(); } - if let Some(endpoints) = cli_args.value_of("payload-builders") { - el_config.builder_endpoints = endpoints - .split(',') - .map(SensitiveUrl::parse) - .collect::>() - .map_err(|e| format!("payload-builders contains an invalid URL {:?}", e))?; - } + el_config.builder_url = cli_args + .value_of("builder") + .map(|url| { + SensitiveUrl::parse(url) + .map_err(|e| format!("builder contains an invalid URL {:?}", e)) + }) + .transpose()?; if let Some(secrets) = cli_args.value_of("jwt-secrets") { let secret_files: Vec<_> = secrets.split(',').map(PathBuf::from).collect(); diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 3e965a2bf86..fd23cab42d1 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -1491,7 +1491,7 @@ impl BeaconNodeHttpClient { /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an /// appropriate error message. -async fn ok_or_error(response: Response) -> Result { +pub async fn ok_or_error(response: Response) -> Result { let status = response.status(); if status == StatusCode::OK { diff --git a/consensus/types/Cargo.toml b/consensus/types/Cargo.toml index 881d17a3309..96018230f0d 100644 --- a/consensus/types/Cargo.toml +++ b/consensus/types/Cargo.toml @@ -46,6 +46,7 @@ itertools = "0.10.0" superstruct = "0.5.0" serde_json = "1.0.74" smallvec = "1.8.0" +serde_with = "1.13.0" [dev-dependencies] criterion = "0.3.3" diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index 22e429a58c2..885efac2dad 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -27,6 +27,7 @@ pub mod beacon_block_body; pub mod beacon_block_header; pub mod beacon_committee; pub mod beacon_state; +pub mod builder_bid; pub mod chain_spec; pub mod checkpoint; pub mod consts; @@ -82,6 +83,7 @@ pub mod sync_committee_message; pub mod sync_selection_proof; pub mod sync_subnet_id; mod tree_hash_impls; +pub mod validator_registration_data; pub mod slot_data; #[cfg(feature = "sqlite")] @@ -157,6 +159,7 @@ pub use crate::sync_duty::SyncDuty; pub use crate::sync_selection_proof::SyncSelectionProof; pub use crate::sync_subnet_id::SyncSubnetId; pub use crate::validator::Validator; +pub use crate::validator_registration_data::*; pub use crate::validator_subscription::ValidatorSubscription; pub use crate::voluntary_exit::VoluntaryExit; diff --git a/testing/execution_engine_integration/src/test_rig.rs b/testing/execution_engine_integration/src/test_rig.rs index 21162fea56d..a5bab4ed781 100644 --- a/testing/execution_engine_integration/src/test_rig.rs +++ b/testing/execution_engine_integration/src/test_rig.rs @@ -11,9 +11,9 @@ use types::{ const EXECUTION_ENGINE_START_TIMEOUT: Duration = Duration::from_secs(20); -struct ExecutionPair { +struct ExecutionPair { /// The Lighthouse `ExecutionLayer` struct, connected to the `execution_engine` via HTTP. - execution_layer: ExecutionLayer, + execution_layer: ExecutionLayer, /// A handle to external EE process, once this is dropped the process will be killed. #[allow(dead_code)] execution_engine: ExecutionEngine, @@ -23,11 +23,11 @@ struct ExecutionPair { /// /// There are two EEs held here so that we can test out-of-order application of payloads, and other /// edge-cases. -pub struct TestRig { +pub struct TestRig { #[allow(dead_code)] runtime: Arc, - ee_a: ExecutionPair, - ee_b: ExecutionPair, + ee_a: ExecutionPair, + ee_b: ExecutionPair, spec: ChainSpec, _runtime_shutdown: exit_future::Signal, } @@ -172,12 +172,14 @@ impl TestRig { let valid_payload = self .ee_a .execution_layer - .get_payload::>( + .get_payload::>( parent_hash, timestamp, prev_randao, finalized_block_hash, proposer_index, + None, + Slot::new(0), ) .await .unwrap() @@ -265,12 +267,14 @@ impl TestRig { let second_payload = self .ee_a .execution_layer - .get_payload::>( + .get_payload::>( parent_hash, timestamp, prev_randao, finalized_block_hash, proposer_index, + None, + Slot::new(0), ) .await .unwrap() @@ -400,7 +404,7 @@ impl TestRig { /// /// Panic if payload reconstruction fails. async fn check_payload_reconstruction( - ee: &ExecutionPair, + ee: &ExecutionPair, payload: &ExecutionPayload, ) { let reconstructed = ee From 558adf549ebfc0fb0959e0fdcad611425e134d5c Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 29 Jun 2022 13:25:58 -0400 Subject: [PATCH 2/6] undo version decrement --- beacon_node/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/Cargo.toml b/beacon_node/Cargo.toml index bc61d1756f4..081e91aba8a 100644 --- a/beacon_node/Cargo.toml +++ b/beacon_node/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "beacon_node" -version = "2.3.0" +version = "2.3.1" authors = ["Paul Hauner ", "Age Manning Date: Wed, 29 Jun 2022 14:05:55 -0400 Subject: [PATCH 3/6] merge unstable, add builder client, add types required by builder client, fix compile errors --- Cargo.lock | 35 ++++ beacon_node/builder_client/Cargo.toml | 12 ++ beacon_node/builder_client/src/lib.rs | 192 ++++++++++++++++++ beacon_node/execution_layer/src/engine_api.rs | 2 - .../execution_layer/src/engine_api/http.rs | 1 - beacon_node/execution_layer/src/engines.rs | 1 - beacon_node/execution_layer/src/lib.rs | 40 ++-- beacon_node/src/config.rs | 4 +- consensus/types/src/builder_bid.rs | 52 +++++ .../types/src/validator_registration_data.rs | 23 +++ 10 files changed, 344 insertions(+), 18 deletions(-) create mode 100644 beacon_node/builder_client/Cargo.toml create mode 100644 beacon_node/builder_client/src/lib.rs create mode 100644 consensus/types/src/builder_bid.rs create mode 100644 consensus/types/src/validator_registration_data.rs diff --git a/Cargo.lock b/Cargo.lock index 3dbe005658f..07112417f4e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,6 +537,17 @@ dependencies = [ "safemem", ] +[[package]] +name = "builder_client" +version = "0.1.0" +dependencies = [ + "eth2", + "reqwest", + "sensitive_url", + "serde", + "serde_json", +] + [[package]] name = "bumpalo" version = "3.10.0" @@ -1876,6 +1887,7 @@ name = "execution_layer" version = "0.1.0" dependencies = [ "async-trait", + "builder_client", "bytes", "environment", "eth2", @@ -5504,6 +5516,28 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "1.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "678b5a069e50bf00ecd22d0cd8ddf7c236f68581b03db652061ed5eb13a312ff" +dependencies = [ + "serde", + "serde_with_macros", +] + +[[package]] +name = "serde_with_macros" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e182d6ec6f05393cc0e5ed1bf81ad6db3a8feedf8ee515ecdd369809bcce8082" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "serde_yaml" version = "0.8.24" @@ -6654,6 +6688,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "serde_with", "serde_yaml", "slog", "smallvec", diff --git a/beacon_node/builder_client/Cargo.toml b/beacon_node/builder_client/Cargo.toml new file mode 100644 index 00000000000..c4d21c59ab8 --- /dev/null +++ b/beacon_node/builder_client/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "builder_client" +version = "0.1.0" +edition = "2021" +authors = ["Sean Anderson "] + +[dependencies] +reqwest = { version = "0.11.0", features = ["json","stream"] } +sensitive_url = { path = "../../common/sensitive_url" } +eth2 = { path = "../../common/eth2" } +serde = { version = "1.0.116", features = ["derive"] } +serde_json = "1.0.58" \ No newline at end of file diff --git a/beacon_node/builder_client/src/lib.rs b/beacon_node/builder_client/src/lib.rs new file mode 100644 index 00000000000..500f5aa9ffe --- /dev/null +++ b/beacon_node/builder_client/src/lib.rs @@ -0,0 +1,192 @@ +use eth2::ok_or_error; +use eth2::types::builder_bid::SignedBuilderBid; +use eth2::types::{ + BlindedPayload, EthSpec, ExecPayload, ExecutionBlockHash, ExecutionPayload, + ForkVersionedResponse, PublicKeyBytes, SignedBeaconBlock, SignedValidatorRegistrationData, + Slot, +}; +pub use eth2::Error; +use reqwest::{IntoUrl, Response}; +use sensitive_url::SensitiveUrl; +use serde::de::DeserializeOwned; +use serde::Serialize; +use std::time::Duration; + +pub const DEFAULT_GET_HEADER_TIMEOUT_MILLIS: u64 = 500; + +#[derive(Clone)] +pub struct Timeouts { + get_header: Duration, +} + +impl Default for Timeouts { + fn default() -> Self { + Self { + get_header: Duration::from_millis(DEFAULT_GET_HEADER_TIMEOUT_MILLIS), + } + } +} + +#[derive(Clone)] +pub struct BuilderHttpClient { + client: reqwest::Client, + server: SensitiveUrl, + timeouts: Timeouts, +} + +impl BuilderHttpClient { + pub fn new(server: SensitiveUrl) -> Result { + Ok(Self { + client: reqwest::Client::new(), + server, + timeouts: Timeouts::default(), + }) + } + + pub fn new_with_timeouts(server: SensitiveUrl, timeouts: Timeouts) -> Result { + Ok(Self { + client: reqwest::Client::new(), + server, + timeouts, + }) + } + + async fn get(&self, url: U) -> Result { + self.get_response_with_timeout(url, None) + .await? + .json() + .await + .map_err(Error::Reqwest) + } + + async fn get_with_timeout( + &self, + url: U, + timeout: Duration, + ) -> Result { + self.get_response_with_timeout(url, Some(timeout)) + .await? + .json() + .await + .map_err(Error::Reqwest) + } + + /// Perform a HTTP GET request, returning the `Response` for further processing. + async fn get_response_with_timeout( + &self, + url: U, + timeout: Option, + ) -> Result { + let mut builder = self.client.get(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let response = builder.send().await.map_err(Error::Reqwest)?; + ok_or_error(response).await + } + + /// Generic POST function supporting arbitrary responses and timeouts. + async fn post_generic( + &self, + url: U, + body: &T, + timeout: Option, + ) -> Result { + let mut builder = self.client.post(url); + if let Some(timeout) = timeout { + builder = builder.timeout(timeout); + } + let response = builder.json(body).send().await?; + ok_or_error(response).await + } + + async fn post_with_raw_response( + &self, + url: U, + body: &T, + ) -> Result { + let response = self + .client + .post(url) + .json(body) + .send() + .await + .map_err(Error::Reqwest)?; + ok_or_error(response).await + } + + /// `POST /eth/v1/builder/validators` + pub async fn post_builder_validators( + &self, + validator: &[SignedValidatorRegistrationData], + ) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("eth") + .push("v1") + .push("builder") + .push("validators"); + + self.post_generic(path, &validator, None).await?; + Ok(()) + } + + /// `POST /eth/v1/builder/blinded_blocks` + pub async fn post_builder_blinded_blocks( + &self, + blinded_block: &SignedBeaconBlock>, + ) -> Result>, Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("eth") + .push("v1") + .push("builder") + .push("blinded_blocks"); + + Ok(self + .post_with_raw_response(path, &blinded_block) + .await? + .json() + .await?) + } + + /// `GET /eth/v1/builder/header` + pub async fn get_builder_header>( + &self, + slot: Slot, + parent_hash: ExecutionBlockHash, + pubkey: &PublicKeyBytes, + ) -> Result>, Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("eth") + .push("v1") + .push("builder") + .push("header") + .push(slot.to_string().as_str()) + .push(format!("{parent_hash:?}").as_str()) + .push(pubkey.as_hex_string().as_str()); + + self.get_with_timeout(path, self.timeouts.get_header).await + } + + /// `GET /eth/v1/builder/status` + pub async fn get_builder_status(&self) -> Result<(), Error> { + let mut path = self.server.full.clone(); + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("eth") + .push("v1") + .push("builder") + .push("status"); + + self.get(path).await + } +} diff --git a/beacon_node/execution_layer/src/engine_api.rs b/beacon_node/execution_layer/src/engine_api.rs index db9e6965eba..a1e769e3e35 100644 --- a/beacon_node/execution_layer/src/engine_api.rs +++ b/beacon_node/execution_layer/src/engine_api.rs @@ -1,6 +1,4 @@ use crate::engines::ForkChoiceState; -use async_trait::async_trait; -use eth1::http::RpcError; pub use ethers_core::types::Transaction; use http::deposit_methods::RpcError; pub use json_structures::TransitionConfigurationV1; diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index 21a43808bac..832771460e5 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -43,7 +43,6 @@ pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1: &str = pub const ENGINE_EXCHANGE_TRANSITION_CONFIGURATION_V1_TIMEOUT: Duration = Duration::from_millis(500); - /// This error is returned during a `chainId` call by Geth. pub const EIP155_ERROR_STR: &str = "chain not synced beyond EIP-155 replay-protection fork block"; diff --git a/beacon_node/execution_layer/src/engines.rs b/beacon_node/execution_layer/src/engines.rs index 94c40671fa8..88c94162f82 100644 --- a/beacon_node/execution_layer/src/engines.rs +++ b/beacon_node/execution_layer/src/engines.rs @@ -4,7 +4,6 @@ use crate::engine_api::{ EngineApi, Error as EngineApiError, ForkchoiceUpdatedResponse, PayloadAttributes, PayloadId, }; use crate::HttpJsonRpc; -use futures::future::join_all; use lru::LruCache; use slog::{crit, debug, info, warn, Logger}; use std::future::Future; diff --git a/beacon_node/execution_layer/src/lib.rs b/beacon_node/execution_layer/src/lib.rs index b0073eeedc2..156382c4812 100644 --- a/beacon_node/execution_layer/src/lib.rs +++ b/beacon_node/execution_layer/src/lib.rs @@ -4,8 +4,6 @@ //! This crate only provides useful functionality for "The Merge", it does not provide any of the //! deposit-contract functionality that the `beacon_node/eth1` crate already provides. -use crate::engine_api::Builder; -use crate::engines::Builders; use auth::{strip_prefix, Auth, JwtKey}; use builder_client::BuilderHttpClient; use engine_api::Error as ApiError; @@ -611,7 +609,34 @@ impl ExecutionLayer { pubkey_opt: Option, slot: Slot, ) -> Result { - todo!("sean") + //FIXME(sean) fallback logic included in PR #3134 + + // Don't attempt to outsource payload construction until after the merge transition has been + // finalized. We want to be conservative with payload construction until then. + if let (Some(builder), Some(pubkey)) = (self.builder(), pubkey_opt) { + if finalized_block_hash != ExecutionBlockHash::zero() { + info!( + self.log(), + "Requesting blinded header from connected builder"; + "slot" => ?slot, + "pubkey" => ?pubkey, + "parent_hash" => ?parent_hash, + ); + return builder + .get_builder_header::(slot, parent_hash, &pubkey) + .await + .map(|d| d.data.message.header) + .map_err(Error::Builder); + } + } + self.get_full_payload::( + parent_hash, + timestamp, + prev_randao, + finalized_block_hash, + suggested_fee_recipient, + ) + .await } /// Get a full payload without caching its result in the execution layer's payload cache. @@ -1337,15 +1362,6 @@ mod test { }) .await; } - - // test fallback - - // test normal flow used when - // - merge hasn't finalized - // - bad chain health (finalization not advancing?) - // - gas_limit not what you sent builder - // - fee recipient not what you sent builder - // - timeout? } fn noop(_: &ExecutionLayer, _: &ExecutionPayload) -> Option> { diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs index d1730855091..13947f8e5e8 100644 --- a/beacon_node/src/config.rs +++ b/beacon_node/src/config.rs @@ -289,8 +289,8 @@ pub fn get_config( // Parse and set the payload builder, if any. if let Some(endpoint) = cli_args.value_of("builder") { let payload_builder = - parse_only_one_value(endpoints, SensitiveUrl::parse, "--builder", log)?; - el_config.builder_url = payload_builder; + parse_only_one_value(endpoint, SensitiveUrl::parse, "--builder", log)?; + el_config.builder_url = Some(payload_builder); } // Set config values from parse values. diff --git a/consensus/types/src/builder_bid.rs b/consensus/types/src/builder_bid.rs new file mode 100644 index 00000000000..1726f2ad077 --- /dev/null +++ b/consensus/types/src/builder_bid.rs @@ -0,0 +1,52 @@ +use crate::{EthSpec, ExecPayload, ExecutionPayloadHeader, Uint256}; +use bls::blst_implementations::PublicKeyBytes; +use bls::Signature; +use serde::{Deserialize as De, Deserializer, Serialize as Ser, Serializer}; +use serde_derive::{Deserialize, Serialize}; +use serde_with::{serde_as, DeserializeAs, SerializeAs}; +use std::marker::PhantomData; + +#[serde_as] +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +#[serde(bound = "E: EthSpec, Payload: ExecPayload")] +pub struct BuilderBid> { + #[serde_as(as = "BlindedPayloadAsHeader")] + pub header: Payload, + #[serde(with = "eth2_serde_utils::quoted_u256")] + pub value: Uint256, + pub pubkey: PublicKeyBytes, + #[serde(skip)] + _phantom_data: PhantomData, +} + +/// Validator registration, for use in interacting with servers implementing the builder API. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +#[serde(bound = "E: EthSpec, Payload: ExecPayload")] +pub struct SignedBuilderBid> { + pub message: BuilderBid, + pub signature: Signature, +} + +struct BlindedPayloadAsHeader(PhantomData); + +impl> SerializeAs for BlindedPayloadAsHeader { + fn serialize_as(source: &Payload, serializer: S) -> Result + where + S: Serializer, + { + source.to_execution_payload_header().serialize(serializer) + } +} + +impl<'de, E: EthSpec, Payload: ExecPayload> DeserializeAs<'de, Payload> + for BlindedPayloadAsHeader +{ + fn deserialize_as(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let payload_header = ExecutionPayloadHeader::deserialize(deserializer)?; + Payload::try_from(payload_header) + .map_err(|_| serde::de::Error::custom("unable to convert payload header to payload")) + } +} diff --git a/consensus/types/src/validator_registration_data.rs b/consensus/types/src/validator_registration_data.rs new file mode 100644 index 00000000000..5a3450df081 --- /dev/null +++ b/consensus/types/src/validator_registration_data.rs @@ -0,0 +1,23 @@ +use crate::*; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use tree_hash_derive::TreeHash; + +/// Validator registration, for use in interacting with servers implementing the builder API. +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct SignedValidatorRegistrationData { + pub message: ValidatorRegistrationData, + pub signature: Signature, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone, Encode, Decode, TreeHash)] +pub struct ValidatorRegistrationData { + pub fee_recipient: Address, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub gas_limit: u64, + #[serde(with = "eth2_serde_utils::quoted_u64")] + pub timestamp: u64, + pub pubkey: PublicKeyBytes, +} + +impl SignedRoot for ValidatorRegistrationData {} From 2854beab7c559bcd9e20067832cf5e0ad9b1d5ff Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 29 Jun 2022 14:22:19 -0400 Subject: [PATCH 4/6] fix cli tests --- lighthouse/tests/beacon_node.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs index 443c442027e..a9f8900d0cf 100644 --- a/lighthouse/tests/beacon_node.rs +++ b/lighthouse/tests/beacon_node.rs @@ -411,12 +411,13 @@ fn run_payload_builder_flag_test(flag: &str, builders: &str) { let config = config.execution_layer.as_ref().unwrap(); // Only first provided endpoint is parsed as we don't support // redundancy. - assert_eq!(&config.builder_endpoints, &all_builders[..1]); + assert_eq!(config.builder_url, all_builders.get(0).cloned()); }); } #[test] fn payload_builder_flags() { + run_payload_builder_flag_test("builder", "http://meow.cats"); run_payload_builder_flag_test("payload-builder", "http://meow.cats"); run_payload_builder_flag_test("payload-builders", "http://meow.cats,http://woof.dogs"); run_payload_builder_flag_test("payload-builders", "http://meow.cats,http://woof.dogs"); From a94c0b91cfcfee6e63e8efefb3988ff311ae1486 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 29 Jun 2022 15:10:53 -0400 Subject: [PATCH 5/6] add generic to el in invalid payload rig --- beacon_node/beacon_chain/tests/payload_invalidation.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 1aa9844a351..2a48a4b6911 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -64,7 +64,7 @@ impl InvalidPayloadRig { self } - fn execution_layer(&self) -> ExecutionLayer { + fn execution_layer(&self) -> ExecutionLayer { self.harness.chain.execution_layer.clone().unwrap() } From ac15207bf3e4a41239be1fc562297fb0c9916a68 Mon Sep 17 00:00:00 2001 From: realbigsean Date: Wed, 29 Jun 2022 20:57:17 -0400 Subject: [PATCH 6/6] pr feedback --- beacon_node/beacon_chain/src/beacon_chain.rs | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 9afa4befbca..5351b785505 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3218,13 +3218,10 @@ impl BeaconChain { let slot = state.slot(); let proposer_index = state.get_beacon_proposer_index(state.slot(), &self.spec)? as u64; - let pubkey_opt = match self.validator_pubkey_bytes(proposer_index as usize) { - Ok(p) => p, - Err(e) => { - warn!(self.log, "Can't access proposer's pubkey, cannot use external builder"; "error" => ?e); - None - } - }; + let pubkey_opt = state + .validators() + .get(proposer_index as usize) + .map(|v| v.pubkey); // Closure to fetch a sync aggregate in cases where it is required. let get_sync_aggregate = || -> Result, BlockProductionError> {