diff --git a/Cargo.lock b/Cargo.lock index 3764016..c2bd540 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1719,16 +1719,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "ctrlc" -version = "3.4.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90eeab0aa92f3f9b4e87f258c72b139c207d251f9cbc1080a0086b86a8870dd3" -dependencies = [ - "nix 0.29.0", - "windows-sys 0.59.0", -] - [[package]] name = "curve25519-dalek" version = "4.1.3" @@ -2788,6 +2778,7 @@ dependencies = [ "alloy-primitives", "alloy-provider", "alloy-rpc-types", + "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-transport", "async-trait", @@ -2825,8 +2816,11 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-rpc-types-eth", "alloy-transport-http", + "async-trait", "http-body-util", "kona-driver", + "op-alloy-genesis", + "op-alloy-protocol", "op-alloy-provider", "op-alloy-rpc-types-engine", "thiserror 2.0.3", @@ -2866,15 +2860,12 @@ dependencies = [ "alloy-primitives", "alloy-rpc-types-engine", "alloy-transport", - "ctrlc", "hilo-driver", - "hilo-engine", "op-alloy-genesis", "op-alloy-registry", "serde", "serde_json", "thiserror 2.0.3", - "tokio", "tracing", "url", ] @@ -3661,7 +3652,7 @@ dependencies = [ [[package]] name = "kona-derive" version = "0.1.0" -source = "git+https://github.com/anton-rs/kona?branch=rf/chore/driver-advancing#9f2cc0ff5886802da9f0dae4ab601a36ef7b14bf" +source = "git+https://github.com/anton-rs/kona?branch=rf/engine-waiting#b81f76182824348fdde66a4e322af4b7a1511a22" dependencies = [ "alloy-consensus", "alloy-eips", @@ -3680,7 +3671,7 @@ dependencies = [ [[package]] name = "kona-driver" version = "0.1.0" -source = "git+https://github.com/anton-rs/kona?branch=rf/chore/driver-advancing#9f2cc0ff5886802da9f0dae4ab601a36ef7b14bf" +source = "git+https://github.com/anton-rs/kona?branch=rf/engine-waiting#b81f76182824348fdde66a4e322af4b7a1511a22" dependencies = [ "alloy-consensus", "alloy-primitives", @@ -3741,9 +3732,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.164" +version = "0.2.166" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "433bfe06b8c75da9b2e3fbea6e5329ff87748f0b144ef75306e674c3f6f7c13f" +checksum = "c2ccc108bbc0b1331bd061864e7cd823c0cab660bbe6970e66e2c0614decde36" [[package]] name = "libloading" @@ -4568,18 +4559,6 @@ dependencies = [ "libc", ] -[[package]] -name = "nix" -version = "0.29.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" -dependencies = [ - "bitflags 2.6.0", - "cfg-if", - "cfg_aliases", - "libc", -] - [[package]] name = "node" version = "0.1.0" @@ -7254,9 +7233,9 @@ dependencies = [ [[package]] name = "roaring" -version = "0.10.6" +version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f4b84ba6e838ceb47b41de5194a60244fac43d9fe03b71dbe8c5a201081d6d1" +checksum = "f81dc953b2244ddd5e7860cb0bb2a790494b898ef321d4aff8e260efab60cc88" dependencies = [ "bytemuck", "byteorder", @@ -7284,7 +7263,7 @@ dependencies = [ "netlink-packet-utils", "netlink-proto", "netlink-sys", - "nix 0.26.4", + "nix", "thiserror 1.0.69", "tokio", ] @@ -7389,9 +7368,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.18" +version = "0.23.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c9cc1d47e243d655ace55ed38201c19ae02c148ae56412ab8750e8f0166ab7f" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" dependencies = [ "once_cell", "ring 0.17.8", @@ -8363,9 +8342,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -8387,9 +8366,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 38a4b72..5162b65 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,8 +37,8 @@ all-features = true rustdoc-args = ["--cfg", "docsrs"] [patch.crates-io] -kona-derive = { git = "https://github.com/anton-rs/kona", branch = "rf/chore/driver-advancing" } -kona-driver = { git = "https://github.com/anton-rs/kona", branch = "rf/chore/driver-advancing" } +kona-derive = { git = "https://github.com/anton-rs/kona", branch = "rf/engine-waiting" } +kona-driver = { git = "https://github.com/anton-rs/kona", branch = "rf/engine-waiting" } [workspace.dependencies] # Workspace diff --git a/crates/driver/Cargo.toml b/crates/driver/Cargo.toml index 0e19258..6ade2ed 100644 --- a/crates/driver/Cargo.toml +++ b/crates/driver/Cargo.toml @@ -28,6 +28,7 @@ alloy-network.workspace = true alloy-transport.workspace = true alloy-consensus.workspace = true alloy-rpc-types-eth.workspace = true +alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } alloy-provider = { workspace = true, features = ["ipc", "ws", "reqwest"] } alloy-rpc-types = { workspace = true, features = ["ssz"] } alloy-primitives = { workspace = true, features = ["map"] } diff --git a/crates/driver/src/config.rs b/crates/driver/src/config.rs index 229afa4..6982e36 100644 --- a/crates/driver/src/config.rs +++ b/crates/driver/src/config.rs @@ -1,5 +1,6 @@ //! Configuration for the Hilo Driver. +use alloy_rpc_types_engine::JwtSecret; use kona_derive::traits::ChainProvider; use kona_driver::PipelineCursor; use op_alloy_genesis::RollupConfig; @@ -46,10 +47,30 @@ pub struct Config { pub rollup_config: RollupConfig, /// The hilo-node RPC server pub rpc_url: Option, + /// Engine API JWT Secret. + /// This is used to authenticate with the engine API + #[serde(deserialize_with = "deserialize_jwt_secret", serialize_with = "as_hex")] + pub jwt_secret: JwtSecret, /// The cache size for in-memory providers. pub cache_size: usize, } +fn as_hex(v: &JwtSecret, serializer: S) -> Result +where + S: serde::ser::Serializer, +{ + let encoded = alloy_primitives::hex::encode(v.as_bytes()); + serializer.serialize_str(&encoded) +} + +fn deserialize_jwt_secret<'de, D>(deserializer: D) -> Result +where + D: serde::de::Deserializer<'de>, +{ + let s: &str = serde::de::Deserialize::deserialize(deserializer)?; + JwtSecret::from_hex(s).map_err(serde::de::Error::custom) +} + impl Config { /// Construct an [OnlineBlobProviderWithFallback] from the [Config]. pub async fn blob_provider( @@ -122,6 +143,11 @@ impl Config { .block_info_by_number(l1_origin_number) .await .map_err(|e| ConfigError::ChainProvider(e.to_string()))?; - Ok(PipelineCursor::new(channel_timeout, l1_origin)) + let mut cursor = PipelineCursor::new(channel_timeout, l1_origin); + // TODO: construct a valid tip cursor + let tip = + kona_driver::TipCursor::new(safe_head_info, Default::default(), Default::default()); + cursor.advance(l1_origin, tip); + Ok(cursor) } } diff --git a/crates/driver/src/context/standalone.rs b/crates/driver/src/context/standalone.rs index eb302de..a131c98 100644 --- a/crates/driver/src/context/standalone.rs +++ b/crates/driver/src/context/standalone.rs @@ -199,6 +199,7 @@ impl Context for StandaloneContext { let header = self.new_block_rx.recv().await?; let block_num = header.number; + info!("Received new block: {}", block_num); let entry = self.reorg_cache.entry(block_num).or_default(); entry.insert(header.hash, header.clone()); diff --git a/crates/driver/src/driver.rs b/crates/driver/src/driver.rs index 3611002..3bca675 100644 --- a/crates/driver/src/driver.rs +++ b/crates/driver/src/driver.rs @@ -4,8 +4,9 @@ use alloy_transport::TransportResult; use kona_derive::{errors::PipelineErrorKind, traits::SignalReceiver, types::ResetSignal}; use kona_driver::{Driver, PipelineCursor, TipCursor}; use std::sync::Arc; +// use tokio::sync::watch::{channel, Receiver}; -use hilo_engine::{EngineApi, HiloExecutorConstructor}; +use hilo_engine::EngineController; use hilo_providers_local::{InMemoryChainProvider, InMemoryL2ChainProvider}; use crate::{ @@ -14,8 +15,7 @@ use crate::{ }; /// A driver from [kona_driver] that uses hilo-types. -pub type KonaDriver = - Driver; +pub type KonaDriver = Driver; /// An error that can happen when running the driver. #[derive(Debug, thiserror::Error)] @@ -29,6 +29,9 @@ pub enum DriverError { /// Kona's driver unexpectedly errored. #[error("kona driver error")] DriverErrored, + /// Shutdown signal received. + #[error("shutdown signal received")] + Shutdown, } /// HiloDriver is a wrapper around the `Driver` that @@ -39,15 +42,13 @@ pub struct HiloDriver { pub ctx: C, /// The driver config. pub cfg: Config, - /// A constructor for execution. - pub exec: Option, } impl HiloDriver { /// Creates a new [HiloDriver] with a standalone context. - pub async fn standalone(cfg: Config, exec: HiloExecutorConstructor) -> TransportResult { + pub async fn standalone(cfg: Config) -> TransportResult { let ctx = StandaloneContext::new(cfg.l1_rpc_url.clone()).await?; - Ok(Self::new(cfg, ctx, exec)) + Ok(Self::new(cfg, ctx)) } } @@ -56,8 +57,8 @@ where C: Context, { /// Constructs a new [HiloDriver]. - pub fn new(cfg: Config, ctx: C, exec: HiloExecutorConstructor) -> Self { - Self { cfg, ctx, exec: Some(exec) } + pub fn new(cfg: Config, ctx: C) -> Self { + Self { cfg, ctx } } /// Initializes the [HiloPipeline]. @@ -77,7 +78,13 @@ where pub async fn init_driver(&mut self) -> Result { let cursor = self.cfg.tip_cursor().await?; let pipeline = self.init_pipeline(cursor.clone()).await?; - let exec = self.exec.take().expect("Executor not set"); + let exec = EngineController::new( + self.cfg.l2_engine_url.clone(), + self.cfg.jwt_secret, + cursor.origin(), + cursor.l2_safe_head().block_info.into(), + &self.cfg.rollup_config, + ); Ok(Driver::new(cursor, exec, pipeline)) } @@ -122,6 +129,9 @@ where let mut driver = self.init_driver().await?; info!("Driver initialized"); + // Wait until the engine is ready + driver.wait_for_executor().await; + // Step 3: Start the processing loop loop { tokio::select! { @@ -142,6 +152,16 @@ where } } + // Exits if a SIGINT signal is received + // fn check_shutdown(&self) -> Result<(), DriverError> { + // if *self.shutdown_recv.borrow() { + // tracing::warn!("shutting down"); + // std::process::exit(1); + // } + // + // Ok(()) + // } + /// Wait for the L2 genesis' corresponding L1 block to be available in the L1 chain. async fn wait_for_l2_genesis_l1_block(&mut self) { loop { diff --git a/crates/engine/Cargo.toml b/crates/engine/Cargo.toml index e453758..5f82ae3 100644 --- a/crates/engine/Cargo.toml +++ b/crates/engine/Cargo.toml @@ -28,11 +28,15 @@ alloy-transport-http = { workspace = true, features = ["jwt-auth"] } alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } # Op Alloy +op-alloy-genesis.workspace = true op-alloy-provider.workspace = true +op-alloy-protocol.workspace = true op-alloy-rpc-types-engine.workspace = true # Misc +async-trait.workspace = true url.workspace = true +tokio.workspace = true tracing.workspace = true tower.workspace = true http-body-util.workspace = true diff --git a/crates/engine/src/api.rs b/crates/engine/src/api.rs deleted file mode 100644 index 16bb524..0000000 --- a/crates/engine/src/api.rs +++ /dev/null @@ -1,117 +0,0 @@ -//! Contains the engine api client. - -use http_body_util::Full; -use tower::ServiceBuilder; -use tracing::warn; -use url::Url; - -use alloy_consensus::Header; -use alloy_network::AnyNetwork; -use alloy_primitives::{Bytes, B256}; -use alloy_provider::RootProvider; -use alloy_rpc_client::RpcClient; -use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret}; -use alloy_transport_http::{ - hyper_util::{ - client::legacy::{connect::HttpConnector, Client}, - rt::TokioExecutor, - }, - AuthLayer, AuthService, Http, HyperClient, -}; -use kona_driver::Executor; -use op_alloy_provider::ext::engine::OpEngineApi; -use op_alloy_rpc_types_engine::{OpAttributesWithParent, OpPayloadAttributes}; - -/// A Hyper HTTP client with a JWT authentication layer. -type HyperAuthClient> = HyperClient>>; - -/// An external op-geth engine api client -#[derive(Debug, Clone)] -pub struct EngineApi { - /// The inner provider - provider: RootProvider, AnyNetwork>, -} -/// A validation error -#[derive(Debug, thiserror::Error)] -pub enum ValidationError { - /// An RPC error - #[error("RPC error")] - RpcError, -} - -/// An executor error. -#[derive(Debug, thiserror::Error)] -pub enum ExecutorError { - /// An error occurred while executing the payload. - #[error("An error occurred while executing the payload")] - PayloadError, - /// An error occurred while computing the output root. - #[error("An error occurred while computing the output root")] - OutputRootError, -} - -impl Executor for EngineApi { - type Error = ExecutorError; - - /// Execute the given payload attributes. - fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { - todo!() - } - - /// Computes the output root. - fn compute_output_root(&mut self) -> Result { - todo!() - } -} - -impl EngineApi { - /// Creates a new [`EngineApi`] from the provided [Url] and [JwtSecret]. - pub fn new_http(url: Url, jwt: JwtSecret) -> Self { - let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); - - let auth_layer = AuthLayer::new(jwt); - let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client); - - let layer_transport = HyperClient::with_service(service); - let http_hyper = Http::with_client(layer_transport, url); - let rpc_client = RpcClient::new(http_hyper, true); - let provider = RootProvider::<_, AnyNetwork>::new(rpc_client); - - Self { provider } - } - - /// Validates the payload using the Fork Choice Update API. - pub async fn validate_payload_fcu( - &self, - attributes: &OpAttributesWithParent, - ) -> Result { - // TODO: use the correct values - let fork_choice_state = ForkchoiceState { - head_block_hash: attributes.parent.block_info.hash, - finalized_block_hash: attributes.parent.block_info.hash, - safe_block_hash: attributes.parent.block_info.hash, - }; - - let attributes = Some(attributes.attributes.clone()); - let fcu = self - .provider - .fork_choice_updated_v2(fork_choice_state, attributes) - .await - .map_err(|_| ValidationError::RpcError)?; - - if fcu.is_valid() { - Ok(true) - } else { - warn!(status = %fcu.payload_status, "Engine API returned invalid fork choice update"); - Ok(false) - } - } -} - -impl std::ops::Deref for EngineApi { - type Target = RootProvider, AnyNetwork>; - - fn deref(&self) -> &Self::Target { - &self.provider - } -} diff --git a/crates/engine/src/client.rs b/crates/engine/src/client.rs new file mode 100644 index 0000000..3717e83 --- /dev/null +++ b/crates/engine/src/client.rs @@ -0,0 +1,102 @@ +//! Contains the engine api client. + +use alloy_eips::eip1898::BlockNumberOrTag; +use alloy_network::AnyNetwork; +use alloy_primitives::{Bytes, B256}; +use alloy_provider::RootProvider; +use alloy_rpc_client::RpcClient; +use alloy_rpc_types_engine::{ + ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, JwtSecret, PayloadId, PayloadStatus, +}; +use alloy_transport_http::{ + hyper_util::{ + client::legacy::{connect::HttpConnector, Client}, + rt::TokioExecutor, + }, + AuthLayer, AuthService, Http, HyperClient, +}; +use async_trait::async_trait; +use http_body_util::Full; +use op_alloy_protocol::L2BlockInfo; +use op_alloy_provider::ext::engine::OpEngineApi; +use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; +use tower::ServiceBuilder; +use url::Url; + +use crate::{Engine, EngineApiError}; + +/// A Hyper HTTP client with a JWT authentication layer. +type HyperAuthClient> = HyperClient>>; + +/// An external engine api client +#[derive(Debug, Clone)] +pub struct EngineClient { + /// The inner provider + provider: RootProvider, AnyNetwork>, +} + +impl EngineClient { + /// Creates a new [`EngineClient`] from the provided [Url] and [JwtSecret]. + pub fn new_http(url: Url, jwt: JwtSecret) -> Self { + let hyper_client = Client::builder(TokioExecutor::new()).build_http::>(); + + let auth_layer = AuthLayer::new(jwt); + let service = ServiceBuilder::new().layer(auth_layer).service(hyper_client); + + let layer_transport = HyperClient::with_service(service); + let http_hyper = Http::with_client(layer_transport, url); + let rpc_client = RpcClient::new(http_hyper, true); + let provider = RootProvider::<_, AnyNetwork>::new(rpc_client); + + Self { provider } + } +} + +#[async_trait] +impl Engine for EngineClient { + type Error = EngineApiError; + + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result { + self.provider.get_payload_v3(payload_id).await.map_err(|_| EngineApiError::PayloadError) + } + + async fn forkchoice_update( + &self, + state: ForkchoiceState, + attr: Option, + ) -> Result { + self.provider + .fork_choice_updated_v2(state, attr) + .await + .map_err(|_| EngineApiError::PayloadError) + } + + async fn new_payload( + &self, + payload: ExecutionPayloadV3, + parent_beacon_block_root: B256, + ) -> Result { + self.provider + .new_payload_v3(payload, parent_beacon_block_root) + .await + .map_err(|_| EngineApiError::PayloadError) + } + + async fn l2_block_ref_by_label(&self, _: BlockNumberOrTag) -> Result { + // Convert the payload into an L2 block info. + // go impl uses an L2 client and fetches block by number, converting block to payload and + // payload to L2 block info. + todo!("implement l2_block_ref_by_label for the engine client") + } +} + +impl std::ops::Deref for EngineClient { + type Target = RootProvider, AnyNetwork>; + + fn deref(&self) -> &Self::Target { + &self.provider + } +} diff --git a/crates/engine/src/constructor.rs b/crates/engine/src/constructor.rs deleted file mode 100644 index b8238a5..0000000 --- a/crates/engine/src/constructor.rs +++ /dev/null @@ -1,31 +0,0 @@ -//! A constructor wrapping the engine api client. - -use crate::EngineApi; -use alloy_consensus::{Header, Sealed}; -use alloy_rpc_types_engine::JwtSecret; -use kona_driver::ExecutorConstructor; -use url::Url; - -/// An executor constructor. -#[derive(Clone, Debug)] -pub struct HiloExecutorConstructor { - /// The L2 engine API URL - pub l2_engine_url: Url, - /// Engine API JWT Secret. - /// This is used to authenticate with the engine API - pub jwt_secret: JwtSecret, -} - -impl HiloExecutorConstructor { - /// Creates a new executor constructor. - pub const fn new_http(engine: Url, jwt: JwtSecret) -> Self { - Self { l2_engine_url: engine, jwt_secret: jwt } - } -} - -impl ExecutorConstructor for HiloExecutorConstructor { - /// Constructs the executor. - fn new_executor(&self, _: Sealed
) -> EngineApi { - EngineApi::new_http(self.l2_engine_url.clone(), self.jwt_secret) - } -} diff --git a/crates/engine/src/controller.rs b/crates/engine/src/controller.rs new file mode 100644 index 0000000..3778949 --- /dev/null +++ b/crates/engine/src/controller.rs @@ -0,0 +1,151 @@ +//! Contains the engine controller. +//! +//! See: + +use alloy_consensus::{Header, Sealed}; +use alloy_primitives::B256; +use alloy_rpc_types_engine::{ForkchoiceState, JwtSecret}; +use async_trait::async_trait; +use kona_driver::Executor; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::BlockInfo; +use op_alloy_rpc_types_engine::OpPayloadAttributes; +use std::time::Duration; +use tokio::time::sleep; +use url::Url; + +use crate::{Engine, EngineApiError, EngineClient}; + +/// L1 epoch block +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] +pub struct Epoch { + /// The block number + pub number: u64, + /// The block hash + pub hash: B256, + /// The block timestamp + pub timestamp: u64, +} + +impl From for Epoch { + fn from(block: BlockInfo) -> Self { + Self { number: block.number, hash: block.hash, timestamp: block.timestamp } + } +} + +/// The engine controller. +#[derive(Debug, Clone)] +pub struct EngineController { + /// The inner engine client which implements [crate::Engine]. + pub client: EngineClient, + /// Blocktime of the L2 chain + pub blocktime: u64, + /// Most recent block found on the p2p network + pub unsafe_head: BlockInfo, + /// Most recent block that can be derived from L1 data + pub safe_head: BlockInfo, + /// Batch epoch of the safe head + pub safe_epoch: Epoch, + /// Most recent block that can be derived from finalized L1 data + pub finalized_head: BlockInfo, + /// Batch epoch of the finalized head + pub finalized_epoch: Epoch, +} + +impl EngineController { + /// Creates a new engine controller. + pub fn new( + l2_engine_url: Url, + jwt_secret: JwtSecret, + finalized_head: BlockInfo, + finalized_epoch: Epoch, + config: &RollupConfig, + ) -> Self { + let client = EngineClient::new_http(l2_engine_url.clone(), jwt_secret); + Self { + blocktime: config.block_time, + unsafe_head: finalized_head, + safe_head: finalized_head, + safe_epoch: finalized_epoch, + finalized_head, + finalized_epoch, + client, + } + } + + /// Creates a [ForkchoiceState]: + /// - `head_block` = `unsafe_head` + /// - `safe_block` = `safe_head` + /// - `finalized_block` = `finalized_head` + pub fn create_forkchoice_state(&self) -> ForkchoiceState { + ForkchoiceState { + head_block_hash: self.unsafe_head.hash, + safe_block_hash: self.safe_head.hash, + finalized_block_hash: self.finalized_head.hash, + } + } +} + +#[async_trait] +impl Executor for EngineController { + type Error = EngineApiError; + + /// Waits for the engine to be ready. + async fn wait_until_ready(&mut self) { + let forkchoice = self.create_forkchoice_state(); + // Loop until the forkchoice is updated + while !self.client.forkchoice_update(forkchoice, None).await.is_ok_and(|u| u.is_valid()) { + sleep(Duration::from_secs(1)).await; + } + } + + /// Updates the safe head. + fn update_safe_head(&mut self, _: Sealed
) { + todo!() + } + + /// Execute the given payload attributes. + fn execute_payload(&mut self, _: OpPayloadAttributes) -> Result<&Header, Self::Error> { + todo!() + } + + /// Computes the output root. + fn compute_output_root(&mut self) -> Result { + todo!() + } +} + +// /// A validation error +// #[derive(Debug, thiserror::Error)] +// pub enum ValidationError { +// /// An RPC error +// #[error("RPC error")] +// RpcError, +// } + +// Validates the payload using the Fork Choice Update API. +// pub async fn validate_payload_fcu( +// &self, +// attributes: &OpAttributesWithParent, +// ) -> Result { +// // TODO: use the correct values +// let fork_choice_state = ForkchoiceState { +// head_block_hash: attributes.parent.block_info.hash, +// finalized_block_hash: attributes.parent.block_info.hash, +// safe_block_hash: attributes.parent.block_info.hash, +// }; +// +// let attributes = Some(attributes.attributes.clone()); +// let fcu = self +// .provider +// .fork_choice_updated_v2(fork_choice_state, attributes) +// .await +// .map_err(|_| ValidationError::RpcError)?; +// +// if fcu.is_valid() { +// Ok(true) +// } else { +// warn!(status = %fcu.payload_status, "Engine API returned invalid fork choice update"); +// Ok(false) +// } +// } diff --git a/crates/engine/src/errors.rs b/crates/engine/src/errors.rs new file mode 100644 index 0000000..603e129 --- /dev/null +++ b/crates/engine/src/errors.rs @@ -0,0 +1,12 @@ +//! Error types + +/// An error that originated from the engine api. +#[derive(Debug, thiserror::Error)] +pub enum EngineApiError { + /// An error occurred while executing the payload. + #[error("An error occurred while executing the payload")] + PayloadError, + /// An error occurred while computing the output root. + #[error("An error occurred while computing the output root")] + OutputRootError, +} diff --git a/crates/engine/src/lib.rs b/crates/engine/src/lib.rs index 7a1adce..f6e7231 100644 --- a/crates/engine/src/lib.rs +++ b/crates/engine/src/lib.rs @@ -6,11 +6,17 @@ mod validation; pub use validation::ValidationMode; -mod constructor; -pub use constructor::HiloExecutorConstructor; +mod traits; +pub use traits::Engine; -mod api; -pub use api::EngineApi; +mod errors; +pub use errors::EngineApiError; + +mod controller; +pub use controller::EngineController; + +mod client; +pub use client::EngineClient; mod validator; pub use validator::{TrustedPayloadValidator, TrustedValidationError}; diff --git a/crates/engine/src/traits.rs b/crates/engine/src/traits.rs new file mode 100644 index 0000000..61d7082 --- /dev/null +++ b/crates/engine/src/traits.rs @@ -0,0 +1,44 @@ +//! Contains the engine api trait. + +use alloy_eips::eip1898::BlockNumberOrTag; +use alloy_primitives::B256; +use alloy_rpc_types_engine::{ + ExecutionPayloadV3, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, +}; +use async_trait::async_trait; +use op_alloy_protocol::L2BlockInfo; +use op_alloy_rpc_types_engine::{OpExecutionPayloadEnvelopeV3, OpPayloadAttributes}; + +/// Engine trait specifies the interface between the hilo-engine and the engine-api. +/// +/// See: +#[async_trait] +pub trait Engine { + type Error: core::fmt::Debug; + + /// Gets a payload for the given payload id. + async fn get_payload( + &self, + payload_id: PayloadId, + ) -> Result; + + /// Updates the forkchoice state with the given payload attributes. + async fn forkchoice_update( + &self, + state: ForkchoiceState, + attr: Option, + ) -> Result; + + /// Creates a new payload with the given payload and parent beacon block root. + async fn new_payload( + &self, + payload: ExecutionPayloadV3, + parent_beacon_block_root: B256, + ) -> Result; + + /// Returns the [L2BlockInfo] for the given label. + async fn l2_block_ref_by_label( + &self, + label: BlockNumberOrTag, + ) -> Result; +} diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index f7b5e31..4833ca0 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -14,7 +14,6 @@ rust-version.workspace = true [dependencies] # Local -hilo-engine.workspace = true hilo-driver.workspace = true # Alloy @@ -26,9 +25,7 @@ alloy-rpc-types-engine = { workspace = true, features = ["jwt", "serde"] } op-alloy-genesis = { workspace = true, features = ["serde"] } # Misc -tokio.workspace = true serde.workspace = true -ctrlc.workspace = true tracing.workspace = true thiserror.workspace = true url = { workspace = true, features = ["serde"] } diff --git a/crates/node/src/config.rs b/crates/node/src/config.rs index d61a832..3a411e3 100644 --- a/crates/node/src/config.rs +++ b/crates/node/src/config.rs @@ -2,7 +2,6 @@ use crate::SyncMode; use alloy_rpc_types_engine::JwtSecret; -use hilo_engine::HiloExecutorConstructor; use op_alloy_genesis::RollupConfig; use serde::{Deserialize, Serialize}; use url::Url; @@ -55,13 +54,6 @@ pub struct Config { pub cache_size: usize, } -impl Config { - /// Constructs a new [HiloExecutorConstructor] from the config. - pub fn executor(&self) -> HiloExecutorConstructor { - HiloExecutorConstructor::new_http(self.l2_engine_url.clone(), self.jwt_secret) - } -} - impl From for hilo_driver::Config { fn from(config: Config) -> Self { hilo_driver::Config { @@ -74,6 +66,7 @@ impl From for hilo_driver::Config { rollup_config: config.rollup_config, rpc_url: config.rpc_url, cache_size: config.cache_size, + jwt_secret: config.jwt_secret, } } } diff --git a/crates/node/src/node.rs b/crates/node/src/node.rs index 947754e..24ac22d 100644 --- a/crates/node/src/node.rs +++ b/crates/node/src/node.rs @@ -2,7 +2,6 @@ use crate::{Config, NodeError, SyncMode}; use hilo_driver::HiloDriver; -use tokio::sync::watch::{channel, Receiver}; /// The core node runner. #[derive(Debug)] @@ -13,20 +12,11 @@ pub struct Node { sync_mode: SyncMode, /// The L2 block hash to begin syncing from checkpoint_hash: Option, - /// Receiver to listen for SIGINT signals - shutdown_recv: Receiver, } impl From for Node { fn from(config: Config) -> Self { - let (shutdown_sender, shutdown_recv) = channel(false); - ctrlc::set_handler(move || { - tracing::info!("shutting down"); - shutdown_sender.send(true).expect("could not send shutdown signal"); - }) - .expect("could not register shutdown handler"); - - Self { config, sync_mode: SyncMode::Full, checkpoint_hash: None, shutdown_recv } + Self { config, sync_mode: SyncMode::Full, checkpoint_hash: None } } } @@ -89,20 +79,8 @@ impl Node { /// Creates and starts the [HiloDriver] which handles the derivation sync process. async fn start_driver(&self) -> Result<(), NodeError> { let cfg = self.config.clone().into(); - let exec = self.config.executor(); - let mut driver = HiloDriver::standalone(cfg, exec).await?; + let mut driver = HiloDriver::standalone(cfg).await?; driver.start().await?; Ok(()) } - - /// Exits if a SIGINT signal is received - #[allow(unused)] - fn check_shutdown(&self) -> Result<(), NodeError> { - if *self.shutdown_recv.borrow() { - tracing::warn!("shutting down"); - std::process::exit(0); - } - - Ok(()) - } } diff --git a/crates/providers-alloy/src/beacon_client.rs b/crates/providers-alloy/src/beacon_client.rs index 902e025..719e47c 100644 --- a/crates/providers-alloy/src/beacon_client.rs +++ b/crates/providers-alloy/src/beacon_client.rs @@ -1,8 +1,8 @@ //! Contains an online implementation of the `BeaconClient` trait. +use alloy_eips::eip4844::IndexedBlobHash; use alloy_rpc_types_beacon::sidecar::{BeaconBlobBundle, BlobData}; use async_trait::async_trait; -use kona_derive::sources::IndexedBlobHash; use reqwest::Client; use std::{ boxed::Box, @@ -99,7 +99,11 @@ pub struct OnlineBeaconClient { impl OnlineBeaconClient { /// Creates a new [OnlineBeaconClient] from the provided [reqwest::Url]. - pub fn new_http(base: String) -> Self { + pub fn new_http(mut base: String) -> Self { + // If base ends with a slash, remove it + if base.ends_with("/") { + base.remove(base.len() - 1); + } Self { base, inner: Client::new() } } } @@ -134,7 +138,7 @@ impl BeaconClient for OnlineBeaconClient { let mut sidecars = Vec::with_capacity(hashes.len()); hashes.iter().for_each(|hash| { if let Some(sidecar) = - raw_response.data.iter().find(|sidecar| sidecar.index == hash.index as u64) + raw_response.data.iter().find(|sidecar| sidecar.index == hash.index) { sidecars.push(sidecar.clone()); } diff --git a/crates/providers-alloy/src/blob_provider.rs b/crates/providers-alloy/src/blob_provider.rs index e0515f2..3519d45 100644 --- a/crates/providers-alloy/src/blob_provider.rs +++ b/crates/providers-alloy/src/blob_provider.rs @@ -2,11 +2,11 @@ use std::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc, vec::Vec}; -use alloy_eips::eip4844::{Blob, BlobTransactionSidecar}; +use alloy_eips::eip4844::{Blob, BlobTransactionSidecar, IndexedBlobHash}; use alloy_primitives::{map::HashMap, B256}; use async_trait::async_trait; use eyre::{eyre, Result}; -use kona_derive::{errors::BlobProviderError, sources::IndexedBlobHash, traits::BlobProvider}; +use kona_derive::{errors::BlobProviderError, traits::BlobProvider}; use op_alloy_protocol::BlockInfo; use parking_lot::Mutex; use tracing::warn; diff --git a/crates/providers-alloy/src/blobs.rs b/crates/providers-alloy/src/blobs.rs index 26df39c..67c6adc 100644 --- a/crates/providers-alloy/src/blobs.rs +++ b/crates/providers-alloy/src/blobs.rs @@ -1,9 +1,9 @@ //! Contains an online implementation of the `BlobProvider` trait. -use alloy_eips::eip4844::{Blob, BlobTransactionSidecarItem}; +use alloy_eips::eip4844::{Blob, BlobTransactionSidecarItem, IndexedBlobHash}; use alloy_rpc_types_beacon::sidecar::BlobData; use async_trait::async_trait; -use kona_derive::{errors::BlobProviderError, sources::IndexedBlobHash, traits::BlobProvider}; +use kona_derive::{errors::BlobProviderError, traits::BlobProvider}; use op_alloy_protocol::BlockInfo; use std::{ boxed::Box, @@ -109,10 +109,10 @@ impl OnlineBlobProvider { let sidecars = self.fetch_sidecars(slot, blob_hashes).await?; // Filter blob sidecars that match the indicies in the specified list. - let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); + let blob_hash_indicies = blob_hashes.iter().map(|b| b.index).collect::>(); let filtered = sidecars .into_iter() - .filter(|s| blob_hash_indicies.contains(&(s.index as usize))) + .filter(|s| blob_hash_indicies.contains(&s.index)) .collect::>(); // Validate the correct number of blob sidecars were retrieved. @@ -165,10 +165,7 @@ where let hash = blob_hashes .get(i) .ok_or(BlobProviderError::Backend("Missing blob hash".to_string()))?; - match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash { - hash: hash.hash, - index: hash.index as u64, - }) { + match sidecar.verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index }) { Ok(_) => Ok(sidecar.blob), Err(e) => Err(BlobProviderError::Backend(e.to_string())), } @@ -262,7 +259,7 @@ impl OnlineBlobProviderWithFallback>(); let filtered = sidecars .into_iter() - .filter(|s| blob_hash_indicies.contains(&(s.index as usize))) + .filter(|s| blob_hash_indicies.contains(&s.index)) .collect::>(); // Validate the correct number of blob sidecars were retrieved. @@ -324,10 +321,9 @@ where let hash = blob_hashes.get(i).ok_or(BlobProviderError::Backend( "fallback: failed to get blob hash".to_string(), ))?; - match sidecar.verify_blob(&alloy_eips::eip4844::IndexedBlobHash { - hash: hash.hash, - index: hash.index as u64, - }) { + match sidecar + .verify_blob(&IndexedBlobHash { hash: hash.hash, index: hash.index }) + { Ok(_) => Ok(sidecar.blob), Err(e) => Err(BlobProviderError::Backend(e.to_string())), }