Skip to content

Commit

Permalink
refactor: don't reference StaticFileProvider in `static_file::Segme…
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Nov 14, 2024
1 parent d8af28b commit d028c1c
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 29 deletions.
10 changes: 8 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ impl<Provider> StaticFileHook<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<
Provider: StageCheckpointReader + BlockReader + ChainStateBlockReader,
Provider: StaticFileProviderFactory
+ StageCheckpointReader
+ BlockReader
+ ChainStateBlockReader,
> + 'static,
{
/// Create a new instance
Expand Down Expand Up @@ -145,7 +148,10 @@ impl<Provider> EngineHook for StaticFileHook<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<
Provider: StageCheckpointReader + BlockReader + ChainStateBlockReader,
Provider: StaticFileProviderFactory
+ StageCheckpointReader
+ BlockReader
+ ChainStateBlockReader,
> + 'static,
{
fn name(&self) -> &'static str {
Expand Down
14 changes: 7 additions & 7 deletions crates/stages/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use reth_network_p2p::headers::{downloader::HeaderDownloader, error::HeadersDown
use reth_primitives::{SealedHeader, StaticFileSegment};
use reth_primitives_traits::serde_bincode_compat;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockHashReader, DBProvider, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
StaticFileProviderFactory,
providers::StaticFileWriter, BlockHashReader, DBProvider, HeaderProvider, HeaderSyncGap,
HeaderSyncGapProvider, StaticFileProviderFactory,
};
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
Expand Down Expand Up @@ -90,15 +89,16 @@ where
///
/// Writes to static files ( `Header | HeaderTD | HeaderHash` ) and [`tables::HeaderNumbers`]
/// database table.
fn write_headers(
fn write_headers<P: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory>(
&mut self,
provider: &impl DBProvider<Tx: DbTxMut>,
static_file_provider: StaticFileProvider,
provider: &P,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();

info!(target: "sync::stages::headers", total = total_headers, "Writing headers");

let static_file_provider = provider.static_file_provider();

// Consistency check of expected headers in static files vs DB is done on provider::sync_gap
// when poll_execute_ready is polled.
let mut last_header_number = static_file_provider
Expand Down Expand Up @@ -293,7 +293,7 @@ where

// Write the headers and related tables to DB from ETL space
let to_be_processed = self.hash_collector.len() as u64;
let last_header_number = self.write_headers(provider, provider.static_file_provider())?;
let last_header_number = self.write_headers(provider)?;

// Clear ETL collectors
self.hash_collector.clear();
Expand Down
9 changes: 3 additions & 6 deletions crates/static-file/static-file/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@ use crate::segments::Segment;
use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
DBProvider,
};
use reth_provider::{providers::StaticFileWriter, DBProvider, StaticFileProviderFactory};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;
Expand All @@ -14,17 +11,17 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Headers;

impl<Provider: DBProvider> Segment<Provider> for Headers {
impl<Provider: StaticFileProviderFactory + DBProvider> Segment<Provider> for Headers {
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Headers
}

fn copy_to_static_files(
&self,
provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let static_file_provider = provider.static_file_provider();
let mut static_file_writer =
static_file_provider.get_writer(*block_range.start(), StaticFileSegment::Headers)?;

Expand Down
8 changes: 4 additions & 4 deletions crates/static-file/static-file/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@ mod receipts;
pub use receipts::Receipts;

use alloy_primitives::BlockNumber;
use reth_provider::providers::StaticFileProvider;
use reth_provider::StaticFileProviderFactory;
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::ProviderResult;
use std::ops::RangeInclusive;

/// A segment represents moving some portion of the data to static files.
pub trait Segment<Provider>: Send + Sync {
pub trait Segment<Provider: StaticFileProviderFactory>: Send + Sync {
/// Returns the [`StaticFileSegment`].
fn segment(&self) -> StaticFileSegment;

/// Move data to static files for the provided block range. [`StaticFileProvider`] will handle
/// Move data to static files for the provided block range.
/// [`StaticFileProvider`](reth_provider::providers::StaticFileProvider) will handle
/// the management of and writing to files.
fn copy_to_static_files(
&self,
provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;
}
9 changes: 5 additions & 4 deletions crates/static-file/static-file/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DBProvider,
providers::StaticFileWriter, BlockReader, DBProvider, StaticFileProviderFactory,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
Expand All @@ -14,17 +13,19 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Receipts;

impl<Provider: DBProvider + BlockReader> Segment<Provider> for Receipts {
impl<Provider: StaticFileProviderFactory + DBProvider + BlockReader> Segment<Provider>
for Receipts
{
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Receipts
}

fn copy_to_static_files(
&self,
provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let static_file_provider = provider.static_file_provider();
let mut static_file_writer =
static_file_provider.get_writer(*block_range.start(), StaticFileSegment::Receipts)?;

Expand Down
9 changes: 5 additions & 4 deletions crates/static-file/static-file/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use alloy_primitives::BlockNumber;
use reth_db::tables;
use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DBProvider,
providers::StaticFileWriter, BlockReader, DBProvider, StaticFileProviderFactory,
};
use reth_static_file_types::StaticFileSegment;
use reth_storage_errors::provider::{ProviderError, ProviderResult};
Expand All @@ -14,7 +13,9 @@ use std::ops::RangeInclusive;
#[derive(Debug, Default)]
pub struct Transactions;

impl<Provider: DBProvider + BlockReader> Segment<Provider> for Transactions {
impl<Provider: StaticFileProviderFactory + DBProvider + BlockReader> Segment<Provider>
for Transactions
{
fn segment(&self) -> StaticFileSegment {
StaticFileSegment::Transactions
}
Expand All @@ -24,9 +25,9 @@ impl<Provider: DBProvider + BlockReader> Segment<Provider> for Transactions {
fn copy_to_static_files(
&self,
provider: Provider,
static_file_provider: StaticFileProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let static_file_provider = provider.static_file_provider();
let mut static_file_writer = static_file_provider
.get_writer(*block_range.start(), StaticFileSegment::Transactions)?;

Expand Down
6 changes: 4 additions & 2 deletions crates/static-file/static-file/src/static_file_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ where
impl<Provider> StaticFileProducerInner<Provider>
where
Provider: StaticFileProviderFactory
+ DatabaseProviderFactory<Provider: StageCheckpointReader + BlockReader>,
+ DatabaseProviderFactory<
Provider: StaticFileProviderFactory + StageCheckpointReader + BlockReader,
>,
{
/// Listen for events on the `static_file_producer`.
pub fn events(&self) -> EventStream<StaticFileProducerEvent> {
Expand Down Expand Up @@ -136,7 +138,7 @@ where
// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
let provider = self.provider.database_provider_ro()?.disable_long_read_transaction_safety();
segment.copy_to_static_files(provider, self.provider.static_file_provider(), block_range.clone())?;
segment.copy_to_static_files(provider, block_range.clone())?;

let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "static_file", segment = %segment.segment(), ?block_range, ?elapsed, "Finished StaticFileProducer segment");
Expand Down

0 comments on commit d028c1c

Please sign in to comment.