diff --git a/Cargo.lock b/Cargo.lock index 867a8cdc6afb..ceb8c0c45217 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8248,6 +8248,7 @@ dependencies = [ "reth-tasks", "reth-testing-utils", "reth-tokio-util", + "reth-transaction-pool", "serde", "thiserror", "tokio", @@ -8358,6 +8359,7 @@ dependencies = [ name = "reth-rpc-types" version = "1.0.1" dependencies = [ + "alloy-eips", "alloy-primitives", "alloy-rpc-types", "alloy-rpc-types-admin", @@ -8619,6 +8621,7 @@ dependencies = [ "reth-metrics", "reth-primitives", "reth-provider", + "reth-rpc-types", "reth-tasks", "reth-tracing", "revm", diff --git a/crates/node/builder/src/launch/mod.rs b/crates/node/builder/src/launch/mod.rs index 9a9447aa7ba9..0dbb47af75e3 100644 --- a/crates/node/builder/src/launch/mod.rs +++ b/crates/node/builder/src/launch/mod.rs @@ -295,6 +295,7 @@ where ctx.chain_spec(), beacon_engine_handle, ctx.components().payload_builder().clone().into(), + ctx.components().pool().clone(), Box::new(ctx.task_executor().clone()), client, EngineCapabilities::default(), diff --git a/crates/rpc/rpc-api/src/engine.rs b/crates/rpc/rpc-api/src/engine.rs index 986dd76b14ab..5ab6f8d1fef4 100644 --- a/crates/rpc/rpc-api/src/engine.rs +++ b/crates/rpc/rpc-api/src/engine.rs @@ -13,7 +13,7 @@ use reth_rpc_types::{ ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, TransitionConfiguration, }, state::StateOverride, - BlockOverrides, Filter, Log, RichBlock, SyncStatus, TransactionRequest, + BlobAndProofV1, BlockOverrides, Filter, Log, RichBlock, SyncStatus, TransactionRequest, }; // NOTE: We can't use associated types in the `EngineApi` trait because of jsonrpsee, so we use a // generic here. It would be nice if the rpc macro would understand which types need to have serde. @@ -211,6 +211,13 @@ pub trait EngineApi { /// See also #[method(name = "exchangeCapabilities")] async fn exchange_capabilities(&self, capabilities: Vec) -> RpcResult>; + + /// Fetch blobs for the consensus layer from the in-memory blob cache. + #[method(name = "getBlobsV1")] + async fn get_blobs_v1( + &self, + transaction_ids: Vec, + ) -> RpcResult>>; } /// A subset of the ETH rpc interface: diff --git a/crates/rpc/rpc-engine-api/Cargo.toml b/crates/rpc/rpc-engine-api/Cargo.toml index d067515f6c2a..8415c594b724 100644 --- a/crates/rpc/rpc-engine-api/Cargo.toml +++ b/crates/rpc/rpc-engine-api/Cargo.toml @@ -25,6 +25,7 @@ reth-tasks.workspace = true reth-rpc-types-compat.workspace = true reth-engine-primitives.workspace = true reth-evm.workspace = true +reth-transaction-pool.workspace = true # async tokio = { workspace = true, features = ["sync"] } diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 4ec832053d20..dbdf91bc9ac6 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -16,17 +16,21 @@ use reth_primitives::{ Block, BlockHash, BlockHashOrNumber, BlockNumber, EthereumHardfork, B256, U64, }; use reth_rpc_api::EngineApiServer; -use reth_rpc_types::engine::{ - CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1, - ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3, - ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, - TransitionConfiguration, +use reth_rpc_types::{ + engine::{ + CancunPayloadFields, ClientVersionV1, ExecutionPayload, ExecutionPayloadBodiesV1, + ExecutionPayloadBodiesV2, ExecutionPayloadInputV2, ExecutionPayloadV1, ExecutionPayloadV3, + ExecutionPayloadV4, ForkchoiceState, ForkchoiceUpdated, PayloadId, PayloadStatus, + TransitionConfiguration, + }, + BlobAndProofV1, }; use reth_rpc_types_compat::engine::payload::{ convert_payload_input_v2_to_payload, convert_to_payload_body_v1, convert_to_payload_body_v2, }; use reth_storage_api::{BlockReader, HeaderProvider, StateProviderFactory}; use reth_tasks::TaskSpawner; +use reth_transaction_pool::TransactionPool; use std::{sync::Arc, time::Instant}; use tokio::sync::oneshot; use tracing::{trace, warn}; @@ -39,11 +43,11 @@ const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; /// The Engine API implementation that grants the Consensus layer access to data and /// functions in the Execution layer that are crucial for the consensus process. -pub struct EngineApi { - inner: Arc>, +pub struct EngineApi { + inner: Arc>, } -struct EngineApiInner { +struct EngineApiInner { /// The provider to interact with the chain. provider: Provider, /// Consensus configuration @@ -60,12 +64,15 @@ struct EngineApiInner { client: ClientVersionV1, /// The list of all supported Engine capabilities available over the engine endpoint. capabilities: EngineCapabilities, + /// Transaction pool. + tx_pool: Pool, } -impl EngineApi +impl EngineApi where Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static, EngineT: EngineTypes + 'static, + Pool: TransactionPool + 'static, { /// Create new instance of [`EngineApi`]. pub fn new( @@ -73,6 +80,7 @@ where chain_spec: Arc, beacon_consensus: BeaconConsensusEngineHandle, payload_store: PayloadStore, + tx_pool: Pool, task_spawner: Box, client: ClientVersionV1, capabilities: EngineCapabilities, @@ -86,6 +94,7 @@ where metrics: EngineApiMetrics::default(), client, capabilities, + tx_pool, }); Self { inner } } @@ -609,10 +618,11 @@ where } #[async_trait] -impl EngineApiServer for EngineApi +impl EngineApiServer for EngineApi where Provider: HeaderProvider + BlockReader + StateProviderFactory + EvmEnvProvider + 'static, EngineT: EngineTypes + 'static, + Pool: TransactionPool + 'static, { /// Handler for `engine_newPayloadV1` /// See also @@ -904,9 +914,21 @@ where async fn exchange_capabilities(&self, _capabilities: Vec) -> RpcResult> { Ok(self.inner.capabilities.list()) } + + async fn get_blobs_v1( + &self, + versioned_hashes: Vec, + ) -> RpcResult>> { + // FIXME(sproul): work out error wrapping or make infallible + Ok(self + .inner + .tx_pool + .get_blobs_for_versioned_hashes(&versioned_hashes) + .expect("get_blobs_for_versioned_hashes is infallible")) + } } -impl std::fmt::Debug for EngineApi +impl std::fmt::Debug for EngineApi where EngineT: EngineTypes, { diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index fa5c8b79c9ae..eb7dc36c27a9 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] # ethereum +alloy-eips = { workspace = true } alloy-primitives = { workspace = true, features = ["rand", "rlp", "serde"] } alloy-rpc-types = { workspace = true, features = ["jsonrpsee-types"] } alloy-rpc-types-admin.workspace = true @@ -42,4 +43,4 @@ serde_json.workspace = true [features] default = ["jsonrpsee-types"] -arbitrary = ["alloy-primitives/arbitrary", "alloy-rpc-types/arbitrary"] \ No newline at end of file +arbitrary = ["alloy-primitives/arbitrary", "alloy-rpc-types/arbitrary"] diff --git a/crates/rpc/rpc-types/src/lib.rs b/crates/rpc/rpc-types/src/lib.rs index 7f578ab29400..a4c9d68bf74d 100644 --- a/crates/rpc/rpc-types/src/lib.rs +++ b/crates/rpc/rpc-types/src/lib.rs @@ -55,3 +55,17 @@ pub use eth::{ pub use peer::*; pub use rpc::*; + +use alloy_eips::eip4844::BYTES_PER_BLOB; +use alloy_primitives::FixedBytes; +use serde::{Deserialize, Serialize}; + +/// Blob type returned in responses to `engine_getBlobsV1`. +// FIXME(sproul): move to alloy? +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct BlobAndProofV1 { + /// The blob data. + pub blob: FixedBytes, + /// The KZG proof for the blob. + pub proof: FixedBytes<48>, +} diff --git a/crates/transaction-pool/Cargo.toml b/crates/transaction-pool/Cargo.toml index 77edd6f3e541..e758c0b2ac0b 100644 --- a/crates/transaction-pool/Cargo.toml +++ b/crates/transaction-pool/Cargo.toml @@ -17,6 +17,7 @@ reth-chainspec.workspace = true reth-eth-wire-types.workspace = true reth-primitives.workspace = true reth-execution-types.workspace = true +reth-rpc-types.workspace = true reth-fs-util.workspace = true reth-provider.workspace = true reth-tasks.workspace = true diff --git a/crates/transaction-pool/src/blobstore/disk.rs b/crates/transaction-pool/src/blobstore/disk.rs index 5799297cc53a..3b5060f73a0a 100644 --- a/crates/transaction-pool/src/blobstore/disk.rs +++ b/crates/transaction-pool/src/blobstore/disk.rs @@ -4,6 +4,7 @@ use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobStor use alloy_rlp::{Decodable, Encodable}; use parking_lot::{Mutex, RwLock}; use reth_primitives::{BlobTransactionSidecar, TxHash, B256}; +use reth_rpc_types::BlobAndProofV1; use schnellru::{ByLength, LruMap}; use std::{collections::HashSet, fmt, fs, io, path::PathBuf, sync::Arc}; use tracing::{debug, trace}; @@ -124,6 +125,32 @@ impl BlobStore for DiskFileBlobStore { self.inner.get_exact(txs) } + fn get_by_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + let mut result = vec![None; versioned_hashes.len()]; + for (_tx_hash, blob_sidecar) in self.inner.blob_cache.lock().iter() { + // FIXME(sproul): these versioned hashes could be cached + for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() { + for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() { + if blob_versioned_hash == *target_versioned_hash { + result[j].get_or_insert_with(|| BlobAndProofV1 { + blob: blob_sidecar.blobs[i].clone(), + proof: blob_sidecar.proofs[i].clone(), + }); + } + } + } + + // Return early if all blobs are found. + if result.iter().all(|blob| blob.is_some()) { + break; + } + } + Ok(result) + } + fn data_size_hint(&self) -> Option { Some(self.inner.size_tracker.data_size()) } diff --git a/crates/transaction-pool/src/blobstore/mem.rs b/crates/transaction-pool/src/blobstore/mem.rs index 326cd987a9d3..7951285d5e59 100644 --- a/crates/transaction-pool/src/blobstore/mem.rs +++ b/crates/transaction-pool/src/blobstore/mem.rs @@ -3,6 +3,7 @@ use crate::blobstore::{ }; use parking_lot::RwLock; use reth_primitives::B256; +use reth_rpc_types::BlobAndProofV1; use std::{collections::HashMap, sync::Arc}; /// An in-memory blob store. @@ -113,6 +114,32 @@ impl BlobStore for InMemoryBlobStore { Ok(items) } + fn get_by_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + let mut result = vec![None; versioned_hashes.len()]; + for (_tx_hash, blob_sidecar) in self.inner.store.read().iter() { + // FIXME(sproul): these versioned hashes could be cached + for (i, blob_versioned_hash) in blob_sidecar.versioned_hashes().enumerate() { + for (j, target_versioned_hash) in versioned_hashes.iter().enumerate() { + if blob_versioned_hash == *target_versioned_hash { + result[j].get_or_insert_with(|| BlobAndProofV1 { + blob: blob_sidecar.blobs[i].clone(), + proof: blob_sidecar.proofs[i].clone(), + }); + } + } + } + + // Return early if all blobs are found. + if result.iter().all(|blob| blob.is_some()) { + break; + } + } + Ok(result) + } + fn data_size_hint(&self) -> Option { Some(self.inner.size_tracker.data_size()) } diff --git a/crates/transaction-pool/src/blobstore/mod.rs b/crates/transaction-pool/src/blobstore/mod.rs index bba4b85336d6..f8ec77f5dd23 100644 --- a/crates/transaction-pool/src/blobstore/mod.rs +++ b/crates/transaction-pool/src/blobstore/mod.rs @@ -4,6 +4,7 @@ pub use disk::{DiskFileBlobStore, DiskFileBlobStoreConfig, OpenDiskFileBlobStore pub use mem::InMemoryBlobStore; pub use noop::NoopBlobStore; use reth_primitives::{BlobTransactionSidecar, B256}; +use reth_rpc_types::BlobAndProofV1; use std::{ fmt, sync::atomic::{AtomicUsize, Ordering}, @@ -64,6 +65,12 @@ pub trait BlobStore: fmt::Debug + Send + Sync + 'static { /// Returns an error if any of the blobs are not found in the blob store. fn get_exact(&self, txs: Vec) -> Result, BlobStoreError>; + /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes. + fn get_by_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError>; + /// Data size of all transactions in the blob store. fn data_size_hint(&self) -> Option; diff --git a/crates/transaction-pool/src/blobstore/noop.rs b/crates/transaction-pool/src/blobstore/noop.rs index ef9773dabafc..8961999687ba 100644 --- a/crates/transaction-pool/src/blobstore/noop.rs +++ b/crates/transaction-pool/src/blobstore/noop.rs @@ -1,5 +1,6 @@ use crate::blobstore::{BlobStore, BlobStoreCleanupStat, BlobStoreError, BlobTransactionSidecar}; use reth_primitives::B256; +use reth_rpc_types::BlobAndProofV1; /// A blobstore implementation that does nothing #[derive(Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Default)] @@ -49,6 +50,13 @@ impl BlobStore for NoopBlobStore { Err(BlobStoreError::MissingSidecar(txs[0])) } + fn get_by_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + Ok(vec![None; versioned_hashes.len()]) + } + fn data_size_hint(&self) -> Option { Some(0) } diff --git a/crates/transaction-pool/src/lib.rs b/crates/transaction-pool/src/lib.rs index 2484f6784f4c..ab5beb12c73e 100644 --- a/crates/transaction-pool/src/lib.rs +++ b/crates/transaction-pool/src/lib.rs @@ -152,8 +152,11 @@ use crate::{identifier::TransactionId, pool::PoolInner}; use aquamarine as _; use reth_eth_wire_types::HandleMempoolData; -use reth_primitives::{Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, U256}; +use reth_primitives::{ + Address, BlobTransactionSidecar, PooledTransactionsElement, TxHash, B256, U256, +}; use reth_provider::StateProviderFactory; +use reth_rpc_types::BlobAndProofV1; use std::{collections::HashSet, sync::Arc}; use tokio::sync::mpsc::Receiver; use tracing::{instrument, trace}; @@ -521,6 +524,13 @@ where ) -> Result, BlobStoreError> { self.pool.blob_store().get_exact(tx_hashes) } + + fn get_blobs_for_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + self.pool.blob_store().get_by_versioned_hashes(versioned_hashes) + } } impl TransactionPoolExt for Pool diff --git a/crates/transaction-pool/src/noop.rs b/crates/transaction-pool/src/noop.rs index 8c3a52f4c348..6ea8c85dd87c 100644 --- a/crates/transaction-pool/src/noop.rs +++ b/crates/transaction-pool/src/noop.rs @@ -17,7 +17,8 @@ use crate::{ TransactionPool, TransactionValidationOutcome, TransactionValidator, ValidPoolTransaction, }; use reth_eth_wire_types::HandleMempoolData; -use reth_primitives::{Address, BlobTransactionSidecar, TxHash, U256}; +use reth_primitives::{Address, BlobTransactionSidecar, TxHash, B256, U256}; +use reth_rpc_types::BlobAndProofV1; use std::{collections::HashSet, marker::PhantomData, sync::Arc}; use tokio::sync::{mpsc, mpsc::Receiver}; @@ -242,6 +243,13 @@ impl TransactionPool for NoopTransactionPool { } Err(BlobStoreError::MissingSidecar(tx_hashes[0])) } + + fn get_blobs_for_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError> { + Ok(vec![None; versioned_hashes.len()]) + } } /// A [`TransactionValidator`] that does nothing. diff --git a/crates/transaction-pool/src/traits.rs b/crates/transaction-pool/src/traits.rs index b53c31358b19..0e392d12bee5 100644 --- a/crates/transaction-pool/src/traits.rs +++ b/crates/transaction-pool/src/traits.rs @@ -16,6 +16,7 @@ use reth_primitives::{ SealedBlock, Transaction, TransactionSignedEcRecovered, TryFromRecoveredTransaction, TxHash, TxKind, B256, EIP1559_TX_TYPE_ID, EIP4844_TX_TYPE_ID, EIP7702_TX_TYPE_ID, U256, }; +use reth_rpc_types::BlobAndProofV1; #[cfg(feature = "serde")] use serde::{Deserialize, Serialize}; use std::{ @@ -385,6 +386,12 @@ pub trait TransactionPool: Send + Sync + Clone { &self, tx_hashes: Vec, ) -> Result, BlobStoreError>; + + /// Return the [`BlobTransactionSidecar`]s for a list of blob versioned hashes. + fn get_blobs_for_versioned_hashes( + &self, + versioned_hashes: &[B256], + ) -> Result>, BlobStoreError>; } /// Extension for [TransactionPool] trait that allows to set the current block info.