From 56b79848942b109cfbca6069f6333e3ec53f7522 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 26 Nov 2024 18:44:09 -0500 Subject: [PATCH 1/5] chore: remove ctrlc handler --- Cargo.lock | 26 +------------------------ crates/driver/src/context/standalone.rs | 1 + crates/driver/src/driver.rs | 23 ++++++++++++++++++++++ crates/node/Cargo.toml | 2 -- crates/node/src/node.rs | 23 +--------------------- 5 files changed, 26 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3764016..5ced666 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,16 +1719,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "ctrlc" -version = "3.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" -dependencies = [ - "nix 0.29.0", - "windows-sys 0.59.0", -] - [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -2866,7 +2856,6 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "alloy-transport", - "ctrlc", "hilo-driver", "hilo-engine", "op-alloy-genesis", @@ -2874,7 +2863,6 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.3", - "tokio", "tracing", "url", ] @@ -4568,18 +4556,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nix" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "node" version = "0.1.0" @@ -7284,7 +7260,7 @@ dependencies = [ "netlink-packet-utils", "netlink-proto", "netlink-sys", - "nix 0.26.4", + "nix", "thiserror 1.0.69", "tokio", ] diff --git a/crates/driver/src/context/standalone.rs b/crates/driver/src/context/standalone.rs index eb302de..a131c98 100644 --- a/crates/driver/src/context/standalone.rs +++ b/crates/driver/src/context/standalone.rs @@ -199,6 +199,7 @@ impl Context for StandaloneContext { let header = self.new_block_rx.recv().await?; let block_num = header.number; + info!("Received new block: {}", block_num); let entry = self.reorg_cache.entry(block_num).or_default(); entry.insert(header.hash, header.clone()); diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 3611002..265f12a 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -4,6 +4,7 @@ use alloy_transport::TransportResult; use kona_derive::{errors::PipelineErrorKind, traits::SignalReceiver, types::ResetSignal}; use kona_driver::{Driver, PipelineCursor, TipCursor}; use std::sync::Arc; +// use tokio::sync::watch::{channel, Receiver}; use hilo_engine::{EngineApi, HiloExecutorConstructor}; use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider}; @@ -29,6 +30,9 @@ pub enum DriverError { /// Kona's driver unexpectedly errored. #[error("kona driver error")] DriverErrored, + /// Shutdown signal received. + #[error("shutdown signal received")] + Shutdown, } /// HiloDriver is a wrapper around the `Driver` that @@ -41,6 +45,8 @@ pub struct HiloDriver { pub cfg: Config, /// A constructor for execution. pub exec: Option, + // Receiver to listen for SIGINT signals + // shutdown_recv: Receiver, } impl HiloDriver { @@ -57,6 +63,13 @@ where { /// Constructs a new [HiloDriver]. pub fn new(cfg: Config, ctx: C, exec: HiloExecutorConstructor) -> Self { + // TODO: Receive shutdown signal + // let (_shutdown_sender, shutdown_recv) = channel(false); + // ctrlc::set_handler(move || { + // tracing::info!("sending shut down signal"); + // shutdown_sender.send(true).expect("could not send shutdown signal"); + // }) + // .expect("could not register shutdown handler"); Self { cfg, ctx, exec: Some(exec) } } @@ -142,6 +155,16 @@ where } } + // Exits if a SIGINT signal is received + // fn check_shutdown(&self) -> Result<(), DriverError> { + // if *self.shutdown_recv.borrow() { + // tracing::warn!("shutting down"); + // std::process::exit(1); + // } + // + // Ok(()) + // } + /// Wait for the L2 genesis' corresponding L1 block to be available in the L1 chain. async fn wait_for_l2_genesis_l1_block(&mut self) { loop { diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index f7b5e31..5484fc6 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -26,9 +26,7 @@ alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } op-alloy-genesis = { workspace = true, features = ["serde"] } # Misc -tokio.workspace = true serde.workspace = true -ctrlc.workspace = true tracing.workspace = true thiserror.workspace = true url = { workspace = true, features = ["serde"] } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 947754e..bdd73f8 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -2,7 +2,6 @@ use crate::{Config, NodeError, SyncMode}; use hilo_driver::HiloDriver; -use tokio::sync::watch::{channel, Receiver}; /// The core node runner. #[derive(Debug)] @@ -13,20 +12,11 @@ pub struct Node { sync_mode: SyncMode, /// The L2 block hash to begin syncing from checkpoint_hash: Option, - /// Receiver to listen for SIGINT signals - shutdown_recv: Receiver, } impl From for Node { fn from(config: Config) -> Self { - let (shutdown_sender, shutdown_recv) = channel(false); - ctrlc::set_handler(move || { - tracing::info!("shutting down"); - shutdown_sender.send(true).expect("could not send shutdown signal"); - }) - .expect("could not register shutdown handler"); - - Self { config, sync_mode: SyncMode::Full, checkpoint_hash: None, shutdown_recv } + Self { config, sync_mode: SyncMode::Full, checkpoint_hash: None } } } @@ -94,15 +84,4 @@ impl Node { driver.start().await?; Ok(()) } - - /// Exits if a SIGINT signal is received - #[allow(unused)] - fn check_shutdown(&self) -> Result<(), NodeError> { - if *self.shutdown_recv.borrow() { - tracing::warn!("shutting down"); - std::process::exit(0); - } - - Ok(()) - } } From 306fc80c0958ddf770bea090f15464bedce72667 Mon Sep 17 00:00:00 2001 From: refcell Date: Tue, 26 Nov 2024 20:57:51 -0500 Subject: [PATCH 2/5] fix: driver --- crates/driver/src/config.rs | 7 ++++++- crates/providers-alloy/src/beacon_client.rs | 6 +++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/crates/driver/src/config.rs b/crates/driver/src/config.rs index 229afa4..73a24d8 100644 --- a/crates/driver/src/config.rs +++ b/crates/driver/src/config.rs @@ -122,6 +122,11 @@ impl Config { .block_info_by_number(l1_origin_number) .await .map_err(|e| ConfigError::ChainProvider(e.to_string()))?; - Ok(PipelineCursor::new(channel_timeout, l1_origin)) + let mut cursor = PipelineCursor::new(channel_timeout, l1_origin); + // TODO: construct a valid tip cursor + let tip = + kona_driver::TipCursor::new(safe_head_info, Default::default(), Default::default()); + cursor.advance(l1_origin, tip); + Ok(cursor) } } diff --git a/crates/providers-alloy/src/beacon_client.rs b/crates/providers-alloy/src/beacon_client.rs index 902e025..16b7030 100644 --- a/crates/providers-alloy/src/beacon_client.rs +++ b/crates/providers-alloy/src/beacon_client.rs @@ -99,7 +99,11 @@ pub struct OnlineBeaconClient { impl OnlineBeaconClient { /// Creates a new [OnlineBeaconClient] from the provided [reqwest::Url]. - pub fn new_http(base: String) -> Self { + pub fn new_http(mut base: String) -> Self { + // If base ends with a slash, remove it + if base.ends_with("/") { + base.remove(base.len() - 1); + } Self { base, inner: Client::new() } } } From 0aaa7c57264c5a49e327fac8d9b1e5e1e7848772 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 27 Nov 2024 10:50:47 -0500 Subject: [PATCH 3/5] stash: --- Cargo.lock | 2 + crates/driver/src/driver.rs | 11 ++- crates/engine/Cargo.toml | 2 + crates/engine/src/api.rs | 117 ------------------------------ crates/engine/src/client.rs | 121 +++++++++++++++++++++++++++++++ crates/engine/src/constructor.rs | 31 -------- crates/engine/src/controller.rs | 72 ++++++++++++++++++ crates/engine/src/errors.rs | 12 +++ crates/engine/src/lib.rs | 14 +++- crates/engine/src/traits.rs | 44 +++++++++++ crates/node/src/config.rs | 8 +- 11 files changed, 272 insertions(+), 162 deletions(-) delete mode 100644 crates/engine/src/api.rs create mode 100644 crates/engine/src/client.rs delete mode 100644 crates/engine/src/constructor.rs create mode 100644 crates/engine/src/controller.rs create mode 100644 crates/engine/src/errors.rs create mode 100644 crates/engine/src/traits.rs diff --git a/Cargo.lock b/Cargo.lock index 5ced666..0c40bb4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2815,8 +2815,10 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-transport-http", + "async-trait", "http-body-util", "kona-driver", + "op-alloy-protocol", "op-alloy-provider", "op-alloy-rpc-types-engine", "thiserror 2.0.3", diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 265f12a..7bef997 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -6,7 +6,7 @@ use kona_driver::{Driver, PipelineCursor, TipCursor}; use std::sync::Arc; // use tokio::sync::watch::{channel, Receiver}; -use hilo_engine::{EngineApi, HiloExecutorConstructor}; +use hilo_engine::{EngineClient, EngineController}; use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider}; use crate::{ @@ -15,8 +15,7 @@ use crate::{ }; /// A driver from [kona_driver] that uses hilo-types. -pub type KonaDriver = - Driver; +pub type KonaDriver = Driver; /// An error that can happen when running the driver. #[derive(Debug, thiserror::Error)] @@ -44,14 +43,14 @@ pub struct HiloDriver { /// The driver config. pub cfg: Config, /// A constructor for execution. - pub exec: Option, + pub exec: Option, // Receiver to listen for SIGINT signals // shutdown_recv: Receiver, } impl HiloDriver { /// Creates a new [HiloDriver] with a standalone context. - pub async fn standalone(cfg: Config, exec: HiloExecutorConstructor) -> TransportResult { + pub async fn standalone(cfg: Config, exec: EngineController) -> TransportResult { let ctx = StandaloneContext::new(cfg.l1_rpc_url.clone()).await?; Ok(Self::new(cfg, ctx, exec)) } @@ -62,7 +61,7 @@ where C: Context, { /// Constructs a new [HiloDriver]. - pub fn new(cfg: Config, ctx: C, exec: HiloExecutorConstructor) -> Self { + pub fn new(cfg: Config, ctx: C, exec: EngineController) -> Self { // TODO: Receive shutdown signal // let (_shutdown_sender, shutdown_recv) = channel(false); // ctrlc::set_handler(move || { diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index e453758..709dd41 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -29,9 +29,11 @@ alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } # Op Alloy op-alloy-provider.workspace = true +op-alloy-protocol.workspace = true op-alloy-rpc-types-engine.workspace = true # Misc +async-trait.workspace = true url.workspace = true tracing.workspace = true tower.workspace = true diff --git a/crates/engine/src/api.rs b/crates/engine/src/api.rs deleted file mode 100644 index 16bb524..0000000 --- a/crates/engine/src/api.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! Contains the engine api client. - -use http_body_util::Full; -use tower::ServiceBuilder; -use tracing::warn; -use url::Url; - -use alloy_consensus::Header; -use alloy_network::AnyNetwork; -use alloy_primitives::{Bytes, B256}; -use alloy_provider::RootProvider; -use alloy_rpc_client::RpcClient; -use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret}; -use alloy_transport_http::{ - hyper_util::{ - client::legacy::{connect::HttpConnector, Client}, - rt::TokioExecutor, - }, - AuthLayer, AuthService, Http, HyperClient, -}; -use kona_driver::Executor; -use op_alloy_provider::ext::engine::OpEngineApi; -use op_alloy_rpc_types_engine::{OpAttributesWithParent, OpPayloadAttributes}; - -/// A Hyper HTTP client with a JWT authentication layer. -type HyperAuthClient> = HyperClient>>; - -/// An external op-geth engine api client -#[derive(Debug, Clone)] -pub struct EngineApi { - /// The inner provider - provider: RootProvider, AnyNetwork>, -} -/// A validation error -#[derive(Debug, thiserror::Error)] -pub enum ValidationError { - /// An RPC error - #[error("RPC error")] - RpcError, -} - -/// An executor error. -#[derive(Debug, thiserror::Error)] -pub enum ExecutorError { - /// An error occurred while executing the payload. - #[error("An error occurred while executing the payload")] - PayloadError, - /// An error occurred while computing the output root. - #[error("An error occurred while computing the output root")] - OutputRootError, -} - -impl Executor for EngineApi { - type Error = ExecutorError; - - /// Execute the given payload attributes. - fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { - todo!() - } - - /// Computes the output root. - fn compute_output_root(&mut self) -> Result { - todo!() - } -} - -impl EngineApi { - /// Creates a new [`EngineApi`] from the provided [Url] and [JwtSecret]. - pub fn new_http(url: Url, jwt: JwtSecret) -> Self { - let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - - let auth_layer = AuthLayer::new(jwt); - let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client); - - let layer_transport = HyperClient::with_service(service); - let http_hyper = Http::with_client(layer_transport, url); - let rpc_client = RpcClient::new(http_hyper, true); - let provider = RootProvider::<_, AnyNetwork>::new(rpc_client); - - Self { provider } - } - - /// Validates the payload using the Fork Choice Update API. - pub async fn validate_payload_fcu( - &self, - attributes: &OpAttributesWithParent, - ) -> Result { - // TODO: use the correct values - let fork_choice_state = ForkchoiceState { - head_block_hash: attributes.parent.block_info.hash, - finalized_block_hash: attributes.parent.block_info.hash, - safe_block_hash: attributes.parent.block_info.hash, - }; - - let attributes = Some(attributes.attributes.clone()); - let fcu = self - .provider - .fork_choice_updated_v2(fork_choice_state, attributes) - .await - .map_err(|_| ValidationError::RpcError)?; - - if fcu.is_valid() { - Ok(true) - } else { - warn!(status = %fcu.payload_status, "Engine API returned invalid fork choice update"); - Ok(false) - } - } -} - -impl std::ops::Deref for EngineApi { - type Target = RootProvider, AnyNetwork>; - - fn deref(&self) -> &Self::Target { - &self.provider - } -} diff --git a/crates/engine/src/client.rs b/crates/engine/src/client.rs new file mode 100644 index 0000000..e5f5c76 --- /dev/null +++ b/crates/engine/src/client.rs @@ -0,0 +1,121 @@ +//! Contains the engine api client. + +use async_trait::async_trait; +use http_body_util::Full; +use tower::ServiceBuilder; +use url::Url; + +use kona_driver::Executor; + +use alloy_consensus::Header; +use alloy_eips::eip1898::BlockNumberOrTag; +use alloy_network::AnyNetwork; +use alloy_primitives::{Bytes, B256}; +use alloy_provider::RootProvider; +use alloy_rpc_client::RpcClient; +use alloy_rpc_types_engine::{ + ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus, +}; +use alloy_transport_http::{ + hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, + }, + AuthLayer, AuthService, Http, HyperClient, +}; + +use op_alloy_protocol::L2BlockInfo; +use op_alloy_provider::ext::engine::OpEngineApi; +use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; + +use crate::{Engine, EngineApiError}; + +/// A Hyper HTTP client with a JWT authentication layer. +type HyperAuthClient> = HyperClient>>; + +/// An external engine api client +#[derive(Debug, Clone)] +pub struct EngineClient { + /// The inner provider + provider: RootProvider, AnyNetwork>, +} + +impl EngineClient { + /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret]. + pub fn new_http(url: Url, jwt: JwtSecret) -> Self { + let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); + + let auth_layer = AuthLayer::new(jwt); + let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client); + + let layer_transport = HyperClient::with_service(service); + let http_hyper = Http::with_client(layer_transport, url); + let rpc_client = RpcClient::new(http_hyper, true); + let provider = RootProvider::<_, AnyNetwork>::new(rpc_client); + + Self { provider } + } +} + +impl Executor for EngineClient { + type Error = EngineApiError; + + /// Execute the given payload attributes. + fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { + todo!() + } + + /// Computes the output root. + fn compute_output_root(&mut self) -> Result { + todo!() + } +} + +#[async_trait] +impl Engine for EngineClient { + type Error = EngineApiError; + + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result { + self.provider.get_payload_v3(payload_id).await.map_err(|_| EngineApiError::PayloadError) + } + + async fn forkchoice_update( + &self, + state: ForkchoiceState, + attr: OpPayloadAttributes, + ) -> Result { + self.provider + .fork_choice_updated_v2(state, Some(attr)) + .await + .map_err(|_| EngineApiError::PayloadError) + } + + async fn new_payload( + &self, + payload: ExecutionPayloadV3, + parent_beacon_block_root: B256, + ) -> Result { + self.provider + .new_payload_v3(payload, parent_beacon_block_root) + .await + .map_err(|_| EngineApiError::PayloadError) + } + + async fn l2_block_ref_by_label(&self, _: BlockNumberOrTag) -> Result { + // Convert the payload into an L2 block info. + // go impl uses an L2 client and fetches block by number, converting block to payload and + // payload to L2 block info. + todo!("implement l2_block_ref_by_label for the engine client") + } +} + +impl std::ops::Deref for EngineClient { + type Target = RootProvider, AnyNetwork>; + + fn deref(&self) -> &Self::Target { + &self.provider + } +} diff --git a/crates/engine/src/constructor.rs b/crates/engine/src/constructor.rs deleted file mode 100644 index b8238a5..0000000 --- a/crates/engine/src/constructor.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! A constructor wrapping the engine api client. - -use crate::EngineApi; -use alloy_consensus::{Header, Sealed}; -use alloy_rpc_types_engine::JwtSecret; -use kona_driver::ExecutorConstructor; -use url::Url; - -/// An executor constructor. -#[derive(Clone, Debug)] -pub struct HiloExecutorConstructor { - /// The L2 engine API URL - pub l2_engine_url: Url, - /// Engine API JWT Secret. - /// This is used to authenticate with the engine API - pub jwt_secret: JwtSecret, -} - -impl HiloExecutorConstructor { - /// Creates a new executor constructor. - pub const fn new_http(engine: Url, jwt: JwtSecret) -> Self { - Self { l2_engine_url: engine, jwt_secret: jwt } - } -} - -impl ExecutorConstructor for HiloExecutorConstructor { - /// Constructs the executor. - fn new_executor(&self, _: Sealed
) -> EngineApi { - EngineApi::new_http(self.l2_engine_url.clone(), self.jwt_secret) - } -} diff --git a/crates/engine/src/controller.rs b/crates/engine/src/controller.rs new file mode 100644 index 0000000..64b7815 --- /dev/null +++ b/crates/engine/src/controller.rs @@ -0,0 +1,72 @@ +//! Contains the engine controller. +//! +//! See: + +use alloy_consensus::{Header, Sealed}; +use alloy_rpc_types_engine::JwtSecret; +use kona_driver::ExecutorConstructor; +use url::Url; + +use crate::EngineClient; + +/// The engine controller. +#[derive(Debug, Clone)] +pub struct EngineController { + /// The L2 engine API URL + pub l2_engine_url: Url, + /// Engine API JWT Secret. + /// This is used to authenticate with the engine API + pub jwt_secret: JwtSecret, + /// The inner engine client which implements [crate::Engine]. + #[allow(unused)] + client: EngineClient, +} + +impl EngineController { + /// Creates a new engine controller. + pub fn new(l2_engine_url: Url, jwt_secret: JwtSecret) -> Self { + let client = EngineClient::new_http(l2_engine_url.clone(), jwt_secret); + Self { l2_engine_url, jwt_secret, client } + } +} + +impl ExecutorConstructor for EngineController { + fn new_executor(&self, _: Sealed
) -> EngineClient { + EngineClient::new_http(self.l2_engine_url.clone(), self.jwt_secret) + } +} + +// /// A validation error +// #[derive(Debug, thiserror::Error)] +// pub enum ValidationError { +// /// An RPC error +// #[error("RPC error")] +// RpcError, +// } + +// Validates the payload using the Fork Choice Update API. +// pub async fn validate_payload_fcu( +// &self, +// attributes: &OpAttributesWithParent, +// ) -> Result { +// // TODO: use the correct values +// let fork_choice_state = ForkchoiceState { +// head_block_hash: attributes.parent.block_info.hash, +// finalized_block_hash: attributes.parent.block_info.hash, +// safe_block_hash: attributes.parent.block_info.hash, +// }; +// +// let attributes = Some(attributes.attributes.clone()); +// let fcu = self +// .provider +// .fork_choice_updated_v2(fork_choice_state, attributes) +// .await +// .map_err(|_| ValidationError::RpcError)?; +// +// if fcu.is_valid() { +// Ok(true) +// } else { +// warn!(status = %fcu.payload_status, "Engine API returned invalid fork choice update"); +// Ok(false) +// } +// } diff --git a/crates/engine/src/errors.rs b/crates/engine/src/errors.rs new file mode 100644 index 0000000..603e129 --- /dev/null +++ b/crates/engine/src/errors.rs @@ -0,0 +1,12 @@ +//! Error types + +/// An error that originated from the engine api. +#[derive(Debug, thiserror::Error)] +pub enum EngineApiError { + /// An error occurred while executing the payload. + #[error("An error occurred while executing the payload")] + PayloadError, + /// An error occurred while computing the output root. + #[error("An error occurred while computing the output root")] + OutputRootError, +} diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 7a1adce..f6e7231 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -6,11 +6,17 @@ mod validation; pub use validation::ValidationMode; -mod constructor; -pub use constructor::HiloExecutorConstructor; +mod traits; +pub use traits::Engine; -mod api; -pub use api::EngineApi; +mod errors; +pub use errors::EngineApiError; + +mod controller; +pub use controller::EngineController; + +mod client; +pub use client::EngineClient; mod validator; pub use validator::{TrustedPayloadValidator, TrustedValidationError}; diff --git a/crates/engine/src/traits.rs b/crates/engine/src/traits.rs new file mode 100644 index 0000000..ab35085 --- /dev/null +++ b/crates/engine/src/traits.rs @@ -0,0 +1,44 @@ +//! Contains the engine api trait. + +use alloy_eips::eip1898::BlockNumberOrTag; +use alloy_primitives::B256; +use alloy_rpc_types_engine::{ + ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, +}; +use async_trait::async_trait; +use op_alloy_protocol::L2BlockInfo; +use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; + +/// Engine trait specifies the interface between the hilo-engine and the engine-api. +/// +/// See: +#[async_trait] +pub trait Engine { + type Error: core::fmt::Debug; + + /// Gets a payload for the given payload id. + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result; + + /// Updates the forkchoice state with the given payload attributes. + async fn forkchoice_update( + &self, + state: ForkchoiceState, + attr: OpPayloadAttributes, + ) -> Result; + + /// Creates a new payload with the given payload and parent beacon block root. + async fn new_payload( + &self, + payload: ExecutionPayloadV3, + parent_beacon_block_root: B256, + ) -> Result; + + /// Returns the [L2BlockInfo] for the given label. + async fn l2_block_ref_by_label( + &self, + label: BlockNumberOrTag, + ) -> Result; +} diff --git a/crates/node/src/config.rs b/crates/node/src/config.rs index d61a832..61d4ab1 100644 --- a/crates/node/src/config.rs +++ b/crates/node/src/config.rs @@ -2,7 +2,7 @@ use crate::SyncMode; use alloy_rpc_types_engine::JwtSecret; -use hilo_engine::HiloExecutorConstructor; +use hilo_engine::EngineController; use op_alloy_genesis::RollupConfig; use serde::{Deserialize, Serialize}; use url::Url; @@ -56,9 +56,9 @@ pub struct Config { } impl Config { - /// Constructs a new [HiloExecutorConstructor] from the config. - pub fn executor(&self) -> HiloExecutorConstructor { - HiloExecutorConstructor::new_http(self.l2_engine_url.clone(), self.jwt_secret) + /// Constructs a new [EngineController] from the config. + pub fn executor(&self) -> EngineController { + EngineController::new(self.l2_engine_url.clone(), self.jwt_secret) } } From 6343cf04befc36c8e67871fdfe429d8c4561ef67 Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 27 Nov 2024 13:52:32 -0500 Subject: [PATCH 4/5] stash --- Cargo.lock | 25 ++--- Cargo.toml | 4 +- crates/driver/src/driver.rs | 8 ++ crates/engine/Cargo.toml | 2 + crates/engine/src/client.rs | 23 +---- crates/engine/src/controller.rs | 108 +++++++++++++++++--- crates/engine/src/traits.rs | 2 +- crates/providers-alloy/src/beacon_client.rs | 2 +- crates/providers-alloy/src/blob_provider.rs | 4 +- crates/providers-alloy/src/blobs.rs | 4 +- 10 files changed, 125 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c40bb4..e712def 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2818,6 +2818,7 @@ dependencies = [ "async-trait", "http-body-util", "kona-driver", + "op-alloy-genesis", "op-alloy-protocol", "op-alloy-provider", "op-alloy-rpc-types-engine", @@ -3651,7 +3652,7 @@ dependencies = [ [[package]] name = "kona-derive" version = "0.1.0" -source = "git+https://github.com/anton-rs/kona?branch=rf/chore/driver-advancing#9f2cc0ff5886802da9f0dae4ab601a36ef7b14bf" +source = "git+https://github.com/anton-rs/kona?branch=rf/engine-waiting#b81f76182824348fdde66a4e322af4b7a1511a22" dependencies = [ "alloy-consensus", "alloy-eips", @@ -3670,7 +3671,7 @@ dependencies = [ [[package]] name = "kona-driver" version = "0.1.0" -source = "git+https://github.com/anton-rs/kona?branch=rf/chore/driver-advancing#9f2cc0ff5886802da9f0dae4ab601a36ef7b14bf" +source = "git+https://github.com/anton-rs/kona?branch=rf/engine-waiting#b81f76182824348fdde66a4e322af4b7a1511a22" dependencies = [ "alloy-consensus", "alloy-primitives", @@ -3731,9 +3732,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "c2ccc108bbc0b1331bd061864e7cd823c0cab660bbe6970e66e2c0614decde36" [[package]] name = "libloading" @@ -7232,9 +7233,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" +checksum = "f81dc953b2244ddd5e7860cb0bb2a790494b898ef321d4aff8e260efab60cc88" dependencies = [ "bytemuck", "byteorder", @@ -7367,9 +7368,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.18" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "once_cell", "ring 0.17.8", @@ -8341,9 +8342,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -8365,9 +8366,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 38a4b72..5162b65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,8 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [patch.crates-io] -kona-derive = { git = "https://github.com/anton-rs/kona", branch = "rf/chore/driver-advancing" } -kona-driver = { git = "https://github.com/anton-rs/kona", branch = "rf/chore/driver-advancing" } +kona-derive = { git = "https://github.com/anton-rs/kona", branch = "rf/engine-waiting" } +kona-driver = { git = "https://github.com/anton-rs/kona", branch = "rf/engine-waiting" } [workspace.dependencies] # Workspace diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 7bef997..6cfadd7 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -154,6 +154,14 @@ where } } + /// Loops until the engine client is online and receives a response from the engine. + async fn await_engine_ready(&self) { + while !self.engine_driver.engine_ready().await { + self.check_shutdown().await; + sleep(Duration::from_secs(1)).await; + } + } + // Exits if a SIGINT signal is received // fn check_shutdown(&self) -> Result<(), DriverError> { // if *self.shutdown_recv.borrow() { diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index 709dd41..5f82ae3 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -28,6 +28,7 @@ alloy-transport-http = { workspace = true, features = ["jwt-auth"] } alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } # Op Alloy +op-alloy-genesis.workspace = true op-alloy-provider.workspace = true op-alloy-protocol.workspace = true op-alloy-rpc-types-engine.workspace = true @@ -35,6 +36,7 @@ op-alloy-rpc-types-engine.workspace = true # Misc async-trait.workspace = true url.workspace = true +tokio.workspace = true tracing.workspace = true tower.workspace = true http-body-util.workspace = true diff --git a/crates/engine/src/client.rs b/crates/engine/src/client.rs index e5f5c76..b2394af 100644 --- a/crates/engine/src/client.rs +++ b/crates/engine/src/client.rs @@ -4,10 +4,6 @@ use async_trait::async_trait; use http_body_util::Full; use tower::ServiceBuilder; use url::Url; - -use kona_driver::Executor; - -use alloy_consensus::Header; use alloy_eips::eip1898::BlockNumberOrTag; use alloy_network::AnyNetwork; use alloy_primitives::{Bytes, B256}; @@ -23,7 +19,6 @@ use alloy_transport_http::{ }, AuthLayer, AuthService, Http, HyperClient, }; - use op_alloy_protocol::L2BlockInfo; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; @@ -57,20 +52,6 @@ impl EngineClient { } } -impl Executor for EngineClient { - type Error = EngineApiError; - - /// Execute the given payload attributes. - fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { - todo!() - } - - /// Computes the output root. - fn compute_output_root(&mut self) -> Result { - todo!() - } -} - #[async_trait] impl Engine for EngineClient { type Error = EngineApiError; @@ -85,10 +66,10 @@ impl Engine for EngineClient { async fn forkchoice_update( &self, state: ForkchoiceState, - attr: OpPayloadAttributes, + attr: Option, ) -> Result { self.provider - .fork_choice_updated_v2(state, Some(attr)) + .fork_choice_updated_v2(state, attr) .await .map_err(|_| EngineApiError::PayloadError) } diff --git a/crates/engine/src/controller.rs b/crates/engine/src/controller.rs index 64b7815..2420857 100644 --- a/crates/engine/src/controller.rs +++ b/crates/engine/src/controller.rs @@ -2,37 +2,113 @@ //! //! See: -use alloy_consensus::{Header, Sealed}; -use alloy_rpc_types_engine::JwtSecret; -use kona_driver::ExecutorConstructor; use url::Url; +use std::time::Duration; +use tokio::time::sleep; +use kona_driver::Executor; +use alloy_primitives::B256; +use async_trait::async_trait; +use alloy_consensus::{Sealed, Header}; +use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret}; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::BlockInfo; +use op_alloy_rpc_types_engine::OpPayloadAttributes; -use crate::EngineClient; +use crate::{EngineApiError, Engine, EngineClient}; + +/// L1 epoch block +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub struct Epoch { + /// The block number + pub number: u64, + /// The block hash + pub hash: B256, + /// The block timestamp + pub timestamp: u64, +} /// The engine controller. #[derive(Debug, Clone)] pub struct EngineController { - /// The L2 engine API URL - pub l2_engine_url: Url, - /// Engine API JWT Secret. - /// This is used to authenticate with the engine API - pub jwt_secret: JwtSecret, /// The inner engine client which implements [crate::Engine]. - #[allow(unused)] - client: EngineClient, + pub client: EngineClient, + /// Blocktime of the L2 chain + pub blocktime: u64, + /// Most recent block found on the p2p network + pub unsafe_head: BlockInfo, + /// Most recent block that can be derived from L1 data + pub safe_head: BlockInfo, + /// Batch epoch of the safe head + pub safe_epoch: Epoch, + /// Most recent block that can be derived from finalized L1 data + pub finalized_head: BlockInfo, + /// Batch epoch of the finalized head + pub finalized_epoch: Epoch, } impl EngineController { /// Creates a new engine controller. - pub fn new(l2_engine_url: Url, jwt_secret: JwtSecret) -> Self { + pub fn new( + l2_engine_url: Url, + jwt_secret: JwtSecret, + finalized_head: BlockInfo, + finalized_epoch: Epoch, + config: &RollupConfig, + ) -> Self { let client = EngineClient::new_http(l2_engine_url.clone(), jwt_secret); - Self { l2_engine_url, jwt_secret, client } + Self { + blocktime: config.block_time, + unsafe_head: finalized_head, + safe_head: finalized_head, + safe_epoch: finalized_epoch, + finalized_head, + finalized_epoch, + client, + } + } + + /// Creates a [ForkchoiceState]: + /// - `head_block` = `unsafe_head` + /// - `safe_block` = `safe_head` + /// - `finalized_block` = `finalized_head` + pub fn create_forkchoice_state(&self) -> ForkchoiceState { + ForkchoiceState { + head_block_hash: self.unsafe_head.hash, + safe_block_hash: self.safe_head.hash, + finalized_block_hash: self.finalized_head.hash, + } } } -impl ExecutorConstructor for EngineController { - fn new_executor(&self, _: Sealed
) -> EngineClient { - EngineClient::new_http(self.l2_engine_url.clone(), self.jwt_secret) +#[async_trait] +impl Executor for EngineController { + type Error = EngineApiError; + + /// Waits for the engine to be ready. + async fn wait_until_ready(&mut self) { + let forkchoice = self.create_forkchoice_state(); + // Loop until the forkchoice is updated + while !self.client + .forkchoice_update(forkchoice, None) + .await + .is_ok() { + sleep(Duration::from_secs(1)).await; + } + } + + /// Updates the safe head. + fn update_safe_head(&mut self, _: Sealed
) { + todo!() + } + + /// Execute the given payload attributes. + fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { + todo!() + } + + /// Computes the output root. + fn compute_output_root(&mut self) -> Result { + todo!() } } diff --git a/crates/engine/src/traits.rs b/crates/engine/src/traits.rs index ab35085..61d7082 100644 --- a/crates/engine/src/traits.rs +++ b/crates/engine/src/traits.rs @@ -26,7 +26,7 @@ pub trait Engine { async fn forkchoice_update( &self, state: ForkchoiceState, - attr: OpPayloadAttributes, + attr: Option, ) -> Result; /// Creates a new payload with the given payload and parent beacon block root. diff --git a/crates/providers-alloy/src/beacon_client.rs b/crates/providers-alloy/src/beacon_client.rs index 16b7030..66c74b0 100644 --- a/crates/providers-alloy/src/beacon_client.rs +++ b/crates/providers-alloy/src/beacon_client.rs @@ -1,8 +1,8 @@ //! Contains an online implementation of the `BeaconClient` trait. +use alloy_eips::eip4844::IndexedBlobHash; use alloy_rpc_types_beacon::sidecar::{BeaconBlobBundle, BlobData}; use async_trait::async_trait; -use kona_derive::sources::IndexedBlobHash; use reqwest::Client; use std::{ boxed::Box, diff --git a/crates/providers-alloy/src/blob_provider.rs b/crates/providers-alloy/src/blob_provider.rs index e0515f2..3519d45 100644 --- a/crates/providers-alloy/src/blob_provider.rs +++ b/crates/providers-alloy/src/blob_provider.rs @@ -2,11 +2,11 @@ use std::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc, vec::Vec}; -use alloy_eips::eip4844::{Blob, BlobTransactionSidecar}; +use alloy_eips::eip4844::{Blob, BlobTransactionSidecar, IndexedBlobHash}; use alloy_primitives::{map::HashMap, B256}; use async_trait::async_trait; use eyre::{eyre, Result}; -use kona_derive::{errors::BlobProviderError, sources::IndexedBlobHash, traits::BlobProvider}; +use kona_derive::{errors::BlobProviderError, traits::BlobProvider}; use op_alloy_protocol::BlockInfo; use parking_lot::Mutex; use tracing::warn; diff --git a/crates/providers-alloy/src/blobs.rs b/crates/providers-alloy/src/blobs.rs index 26df39c..624497b 100644 --- a/crates/providers-alloy/src/blobs.rs +++ b/crates/providers-alloy/src/blobs.rs @@ -1,9 +1,9 @@ //! Contains an online implementation of the `BlobProvider` trait. -use alloy_eips::eip4844::{Blob, BlobTransactionSidecarItem}; +use alloy_eips::eip4844::{Blob, BlobTransactionSidecarItem, IndexedBlobHash}; use alloy_rpc_types_beacon::sidecar::BlobData; use async_trait::async_trait; -use kona_derive::{errors::BlobProviderError, sources::IndexedBlobHash, traits::BlobProvider}; +use kona_derive::{errors::BlobProviderError, traits::BlobProvider}; use op_alloy_protocol::BlockInfo; use std::{ boxed::Box, From 83ab26d27415466d38bb44961f409f469ea5e98e Mon Sep 17 00:00:00 2001 From: refcell Date: Wed, 27 Nov 2024 15:03:18 -0500 Subject: [PATCH 5/5] engine-controller --- Cargo.lock | 2 +- crates/driver/Cargo.toml | 1 + crates/driver/src/config.rs | 21 +++++++++++ crates/driver/src/driver.rs | 42 ++++++++------------- crates/engine/src/client.rs | 8 ++-- crates/engine/src/controller.rs | 25 ++++++------ crates/node/Cargo.toml | 1 - crates/node/src/config.rs | 9 +---- crates/node/src/node.rs | 3 +- crates/providers-alloy/src/beacon_client.rs | 2 +- crates/providers-alloy/src/blobs.rs | 18 ++++----- 11 files changed, 67 insertions(+), 65 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e712def..c2bd540 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2778,6 +2778,7 @@ dependencies = [ "alloy-primitives", "alloy-provider", "alloy-rpc-types", + "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-transport", "async-trait", @@ -2860,7 +2861,6 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-transport", "hilo-driver", - "hilo-engine", "op-alloy-genesis", "op-alloy-registry", "serde", diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 0e19258..6ade2ed 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -28,6 +28,7 @@ alloy-network.workspace = true alloy-transport.workspace = true alloy-consensus.workspace = true alloy-rpc-types-eth.workspace = true +alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } alloy-provider = { workspace = true, features = ["ipc", "ws", "reqwest"] } alloy-rpc-types = { workspace = true, features = ["ssz"] } alloy-primitives = { workspace = true, features = ["map"] } diff --git a/crates/driver/src/config.rs b/crates/driver/src/config.rs index 73a24d8..6982e36 100644 --- a/crates/driver/src/config.rs +++ b/crates/driver/src/config.rs @@ -1,5 +1,6 @@ //! Configuration for the Hilo Driver. +use alloy_rpc_types_engine::JwtSecret; use kona_derive::traits::ChainProvider; use kona_driver::PipelineCursor; use op_alloy_genesis::RollupConfig; @@ -46,10 +47,30 @@ pub struct Config { pub rollup_config: RollupConfig, /// The hilo-node RPC server pub rpc_url: Option, + /// Engine API JWT Secret. + /// This is used to authenticate with the engine API + #[serde(deserialize_with = "deserialize_jwt_secret", serialize_with = "as_hex")] + pub jwt_secret: JwtSecret, /// The cache size for in-memory providers. pub cache_size: usize, } +fn as_hex(v: &JwtSecret, serializer: S) -> Result +where + S: serde::ser::Serializer, +{ + let encoded = alloy_primitives::hex::encode(v.as_bytes()); + serializer.serialize_str(&encoded) +} + +fn deserialize_jwt_secret<'de, D>(deserializer: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let s: &str = serde::de::Deserialize::deserialize(deserializer)?; + JwtSecret::from_hex(s).map_err(serde::de::Error::custom) +} + impl Config { /// Construct an [OnlineBlobProviderWithFallback] from the [Config]. pub async fn blob_provider( diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 6cfadd7..3bca675 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -6,7 +6,7 @@ use kona_driver::{Driver, PipelineCursor, TipCursor}; use std::sync::Arc; // use tokio::sync::watch::{channel, Receiver}; -use hilo_engine::{EngineClient, EngineController}; +use hilo_engine::EngineController; use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider}; use crate::{ @@ -15,7 +15,7 @@ use crate::{ }; /// A driver from [kona_driver] that uses hilo-types. -pub type KonaDriver = Driver; +pub type KonaDriver = Driver; /// An error that can happen when running the driver. #[derive(Debug, thiserror::Error)] @@ -42,17 +42,13 @@ pub struct HiloDriver { pub ctx: C, /// The driver config. pub cfg: Config, - /// A constructor for execution. - pub exec: Option, - // Receiver to listen for SIGINT signals - // shutdown_recv: Receiver, } impl HiloDriver { /// Creates a new [HiloDriver] with a standalone context. - pub async fn standalone(cfg: Config, exec: EngineController) -> TransportResult { + pub async fn standalone(cfg: Config) -> TransportResult { let ctx = StandaloneContext::new(cfg.l1_rpc_url.clone()).await?; - Ok(Self::new(cfg, ctx, exec)) + Ok(Self::new(cfg, ctx)) } } @@ -61,15 +57,8 @@ where C: Context, { /// Constructs a new [HiloDriver]. - pub fn new(cfg: Config, ctx: C, exec: EngineController) -> Self { - // TODO: Receive shutdown signal - // let (_shutdown_sender, shutdown_recv) = channel(false); - // ctrlc::set_handler(move || { - // tracing::info!("sending shut down signal"); - // shutdown_sender.send(true).expect("could not send shutdown signal"); - // }) - // .expect("could not register shutdown handler"); - Self { cfg, ctx, exec: Some(exec) } + pub fn new(cfg: Config, ctx: C) -> Self { + Self { cfg, ctx } } /// Initializes the [HiloPipeline]. @@ -89,7 +78,13 @@ where pub async fn init_driver(&mut self) -> Result { let cursor = self.cfg.tip_cursor().await?; let pipeline = self.init_pipeline(cursor.clone()).await?; - let exec = self.exec.take().expect("Executor not set"); + let exec = EngineController::new( + self.cfg.l2_engine_url.clone(), + self.cfg.jwt_secret, + cursor.origin(), + cursor.l2_safe_head().block_info.into(), + &self.cfg.rollup_config, + ); Ok(Driver::new(cursor, exec, pipeline)) } @@ -134,6 +129,9 @@ where let mut driver = self.init_driver().await?; info!("Driver initialized"); + // Wait until the engine is ready + driver.wait_for_executor().await; + // Step 3: Start the processing loop loop { tokio::select! { @@ -154,14 +152,6 @@ where } } - /// Loops until the engine client is online and receives a response from the engine. - async fn await_engine_ready(&self) { - while !self.engine_driver.engine_ready().await { - self.check_shutdown().await; - sleep(Duration::from_secs(1)).await; - } - } - // Exits if a SIGINT signal is received // fn check_shutdown(&self) -> Result<(), DriverError> { // if *self.shutdown_recv.borrow() { diff --git a/crates/engine/src/client.rs b/crates/engine/src/client.rs index b2394af..3717e83 100644 --- a/crates/engine/src/client.rs +++ b/crates/engine/src/client.rs @@ -1,9 +1,5 @@ //! Contains the engine api client. -use async_trait::async_trait; -use http_body_util::Full; -use tower::ServiceBuilder; -use url::Url; use alloy_eips::eip1898::BlockNumberOrTag; use alloy_network::AnyNetwork; use alloy_primitives::{Bytes, B256}; @@ -19,9 +15,13 @@ use alloy_transport_http::{ }, AuthLayer, AuthService, Http, HyperClient, }; +use async_trait::async_trait; +use http_body_util::Full; use op_alloy_protocol::L2BlockInfo; use op_alloy_provider::ext::engine::OpEngineApi; use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; +use tower::ServiceBuilder; +use url::Url; use crate::{Engine, EngineApiError}; diff --git a/crates/engine/src/controller.rs b/crates/engine/src/controller.rs index 2420857..3778949 100644 --- a/crates/engine/src/controller.rs +++ b/crates/engine/src/controller.rs @@ -2,19 +2,19 @@ //! //! See: -use url::Url; -use std::time::Duration; -use tokio::time::sleep; -use kona_driver::Executor; +use alloy_consensus::{Header, Sealed}; use alloy_primitives::B256; -use async_trait::async_trait; -use alloy_consensus::{Sealed, Header}; use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret}; +use async_trait::async_trait; +use kona_driver::Executor; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::BlockInfo; use op_alloy_rpc_types_engine::OpPayloadAttributes; +use std::time::Duration; +use tokio::time::sleep; +use url::Url; -use crate::{EngineApiError, Engine, EngineClient}; +use crate::{Engine, EngineApiError, EngineClient}; /// L1 epoch block #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] @@ -27,6 +27,12 @@ pub struct Epoch { pub timestamp: u64, } +impl From for Epoch { + fn from(block: BlockInfo) -> Self { + Self { number: block.number, hash: block.hash, timestamp: block.timestamp } + } +} + /// The engine controller. #[derive(Debug, Clone)] pub struct EngineController { @@ -88,10 +94,7 @@ impl Executor for EngineController { async fn wait_until_ready(&mut self) { let forkchoice = self.create_forkchoice_state(); // Loop until the forkchoice is updated - while !self.client - .forkchoice_update(forkchoice, None) - .await - .is_ok() { + while !self.client.forkchoice_update(forkchoice, None).await.is_ok_and(|u| u.is_valid()) { sleep(Duration::from_secs(1)).await; } } diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 5484fc6..4833ca0 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -14,7 +14,6 @@ rust-version.workspace = true [dependencies] # Local -hilo-engine.workspace = true hilo-driver.workspace = true # Alloy diff --git a/crates/node/src/config.rs b/crates/node/src/config.rs index 61d4ab1..3a411e3 100644 --- a/crates/node/src/config.rs +++ b/crates/node/src/config.rs @@ -2,7 +2,6 @@ use crate::SyncMode; use alloy_rpc_types_engine::JwtSecret; -use hilo_engine::EngineController; use op_alloy_genesis::RollupConfig; use serde::{Deserialize, Serialize}; use url::Url; @@ -55,13 +54,6 @@ pub struct Config { pub cache_size: usize, } -impl Config { - /// Constructs a new [EngineController] from the config. - pub fn executor(&self) -> EngineController { - EngineController::new(self.l2_engine_url.clone(), self.jwt_secret) - } -} - impl From for hilo_driver::Config { fn from(config: Config) -> Self { hilo_driver::Config { @@ -74,6 +66,7 @@ impl From for hilo_driver::Config { rollup_config: config.rollup_config, rpc_url: config.rpc_url, cache_size: config.cache_size, + jwt_secret: config.jwt_secret, } } } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index bdd73f8..24ac22d 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -79,8 +79,7 @@ impl Node { /// Creates and starts the [HiloDriver] which handles the derivation sync process. async fn start_driver(&self) -> Result<(), NodeError> { let cfg = self.config.clone().into(); - let exec = self.config.executor(); - let mut driver = HiloDriver::standalone(cfg, exec).await?; + let mut driver = HiloDriver::standalone(cfg).await?; driver.start().await?; Ok(()) } diff --git a/crates/providers-alloy/src/beacon_client.rs b/crates/providers-alloy/src/beacon_client.rs index 66c74b0..719e47c 100644 --- a/crates/providers-alloy/src/beacon_client.rs +++ b/crates/providers-alloy/src/beacon_client.rs @@ -138,7 +138,7 @@ impl BeaconClient for OnlineBeaconClient { let mut sidecars = Vec::with_capacity(hashes.len()); hashes.iter().for_each(|hash| { if let Some(sidecar) = - raw_response.data.iter().find(|sidecar| sidecar.index == hash.index as u64) + raw_response.data.iter().find(|sidecar| sidecar.index == hash.index) { sidecars.push(sidecar.clone()); } diff --git a/crates/providers-alloy/src/blobs.rs b/crates/providers-alloy/src/blobs.rs index 624497b..67c6adc 100644 --- a/crates/providers-alloy/src/blobs.rs +++ b/crates/providers-alloy/src/blobs.rs @@ -109,10 +109,10 @@ impl OnlineBlobProvider { let sidecars = self.fetch_sidecars(slot, blob_hashes).await?; // Filter blob sidecars that match the indicies in the specified list. - let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); + let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); let filtered = sidecars .into_iter() - .filter(|s| blob_hash_indicies.contains(&(s.index as usize))) + .filter(|s| blob_hash_indicies.contains(&s.index)) .collect::>(); // Validate the correct number of blob sidecars were retrieved. @@ -165,10 +165,7 @@ where let hash = blob_hashes .get(i) .ok_or(BlobProviderError::Backend("Missing blob hash".to_string()))?; - match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash { - hash: hash.hash, - index: hash.index as u64, - }) { + match sidecar.verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index }) { Ok(_) => Ok(sidecar.blob), Err(e) => Err(BlobProviderError::Backend(e.to_string())), } @@ -262,7 +259,7 @@ impl OnlineBlobProviderWithFallback>(); let filtered = sidecars .into_iter() - .filter(|s| blob_hash_indicies.contains(&(s.index as usize))) + .filter(|s| blob_hash_indicies.contains(&s.index)) .collect::>(); // Validate the correct number of blob sidecars were retrieved. @@ -324,10 +321,9 @@ where let hash = blob_hashes.get(i).ok_or(BlobProviderError::Backend( "fallback: failed to get blob hash".to_string(), ))?; - match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash { - hash: hash.hash, - index: hash.index as u64, - }) { + match sidecar + .verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index }) + { Ok(_) => Ok(sidecar.blob), Err(e) => Err(BlobProviderError::Backend(e.to_string())), }