Skip to content

Commit

Permalink
stagedsync: do not share global node_settings with stages
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Mar 7, 2024
1 parent d22bb57 commit 73487b1
Show file tree
Hide file tree
Showing 28 changed files with 247 additions and 137 deletions.
47 changes: 27 additions & 20 deletions silkworm/node/stagedsync/execution_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ class ExecutionPipeline::LogTimer : public Timer {
ExecutionPipeline* pipeline_;

public:
LogTimer(ExecutionPipeline* pipeline)
LogTimer(const boost::asio::any_io_executor& executor, ExecutionPipeline* pipeline, uint32_t interval_seconds)
: Timer{
pipeline->node_settings_->asio_context.get_executor(),
pipeline->node_settings_->sync_loop_log_interval_seconds * 1'000,
executor,
interval_seconds * 1'000,
[this] { return expired(); },
/*.auto_start=*/true},
pipeline_{pipeline} {}
Expand All @@ -64,6 +64,13 @@ class ExecutionPipeline::LogTimer : public Timer {
}
};

std::unique_ptr<ExecutionPipeline::LogTimer> ExecutionPipeline::make_log_timer() {
return std::make_unique<LogTimer>(
this->node_settings_->asio_context.get_executor(),
this,
this->node_settings_->sync_loop_log_interval_seconds);
}

ExecutionPipeline::ExecutionPipeline(silkworm::NodeSettings* node_settings)
: node_settings_{node_settings},
sync_context_{std::make_unique<SyncContext>()} {
Expand Down Expand Up @@ -107,27 +114,27 @@ std::optional<Hash> ExecutionPipeline::bad_block() {

void ExecutionPipeline::load_stages() {
stages_.emplace(db::stages::kHeadersKey,
std::make_unique<stagedsync::HeadersStage>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::HeadersStage>(sync_context_.get()));
stages_.emplace(db::stages::kBlockBodiesKey,
std::make_unique<stagedsync::BodiesStage>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::BodiesStage>(sync_context_.get(), *node_settings_->chain_config));
stages_.emplace(db::stages::kBlockHashesKey,
std::make_unique<stagedsync::BlockHashes>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::BlockHashes>(sync_context_.get(), node_settings_->etl()));
stages_.emplace(db::stages::kSendersKey,
std::make_unique<stagedsync::Senders>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::Senders>(sync_context_.get(), *node_settings_->chain_config, node_settings_->batch_size, node_settings_->etl(), node_settings_->prune_mode.senders()));
stages_.emplace(db::stages::kExecutionKey,
std::make_unique<stagedsync::Execution>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::Execution>(sync_context_.get(), *node_settings_->chain_config, node_settings_->batch_size, node_settings_->prune_mode));
stages_.emplace(db::stages::kHashStateKey,
std::make_unique<stagedsync::HashState>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::HashState>(sync_context_.get(), node_settings_->etl()));
stages_.emplace(db::stages::kIntermediateHashesKey,
std::make_unique<stagedsync::InterHashes>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::InterHashes>(sync_context_.get(), node_settings_->etl()));
stages_.emplace(db::stages::kHistoryIndexKey,
std::make_unique<stagedsync::HistoryIndex>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::HistoryIndex>(sync_context_.get(), node_settings_->batch_size, node_settings_->etl(), node_settings_->prune_mode.history()));
stages_.emplace(db::stages::kLogIndexKey,
std::make_unique<stagedsync::LogIndex>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::LogIndex>(sync_context_.get(), node_settings_->batch_size, node_settings_->etl(), node_settings_->prune_mode.history()));
stages_.emplace(db::stages::kTxLookupKey,
std::make_unique<stagedsync::TxLookup>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::TxLookup>(sync_context_.get(), node_settings_->etl(), node_settings_->prune_mode.tx_index()));
stages_.emplace(db::stages::kFinishKey,
std::make_unique<stagedsync::Finish>(node_settings_, sync_context_.get()));
std::make_unique<stagedsync::Finish>(sync_context_.get(), node_settings_->build_info));
current_stage_ = stages_.begin();

stages_forward_order_.insert(stages_forward_order_.begin(),
Expand Down Expand Up @@ -174,7 +181,7 @@ bool ExecutionPipeline::stop() {
Stage::Result ExecutionPipeline::forward(db::RWTxn& cycle_txn, BlockNum target_height) {
using std::to_string;
StopWatch stages_stop_watch(true);
LogTimer log_timer{this};
auto log_timer = make_log_timer();

sync_context_->target_height = target_height;
log::Info("ExecPipeline") << "Forward start --------------------------";
Expand Down Expand Up @@ -217,7 +224,7 @@ Stage::Result ExecutionPipeline::forward(db::RWTxn& cycle_txn, BlockNum target_h
break;
}

log_timer.reset(); // Resets the interval for next log line from now
log_timer->reset(); // Resets the interval for next log line from now

// forward
const auto stage_result = current_stage_->second->forward(cycle_txn);
Expand Down Expand Up @@ -266,7 +273,7 @@ Stage::Result ExecutionPipeline::forward(db::RWTxn& cycle_txn, BlockNum target_h
Stage::Result ExecutionPipeline::unwind(db::RWTxn& cycle_txn, BlockNum unwind_point) {
using std::to_string;
StopWatch stages_stop_watch(true);
LogTimer log_timer{this};
auto log_timer = make_log_timer();
log::Info("ExecPipeline") << "Unwind start ---------------------------";

try {
Expand All @@ -282,7 +289,7 @@ Stage::Result ExecutionPipeline::unwind(db::RWTxn& cycle_txn, BlockNum unwind_po
}
++current_stage_number_;
current_stage_->second->set_log_prefix(get_log_prefix());
log_timer.reset(); // Resets the interval for next log line from now
log_timer->reset(); // Resets the interval for next log line from now

// Do unwind on current stage
const auto stage_result = current_stage_->second->unwind(cycle_txn);
Expand Down Expand Up @@ -326,7 +333,7 @@ Stage::Result ExecutionPipeline::unwind(db::RWTxn& cycle_txn, BlockNum unwind_po

Stage::Result ExecutionPipeline::prune(db::RWTxn& cycle_txn) {
StopWatch stages_stop_watch(true);
LogTimer log_timer{this};
auto log_timer = make_log_timer();

try {
current_stages_count_ = stages_forward_order_.size();
Expand All @@ -340,7 +347,7 @@ Stage::Result ExecutionPipeline::prune(db::RWTxn& cycle_txn) {
++current_stage_number_;
current_stage_->second->set_log_prefix(get_log_prefix());

log_timer.reset(); // Resets the interval for next log line from now
log_timer->reset(); // Resets the interval for next log line from now
const auto stage_result{current_stage_->second->prune(cycle_txn)};
if (stage_result != Stage::Result::kSuccess) {
log::Error(get_log_prefix(), {"op", "Prune", "returned",
Expand Down
2 changes: 2 additions & 0 deletions silkworm/node/stagedsync/execution_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <vector>

#include <silkworm/core/types/hash.hpp>
#include <silkworm/node/common/settings.hpp>
#include <silkworm/node/stagedsync/stages/stage.hpp>

namespace silkworm::stagedsync {
Expand Down Expand Up @@ -62,6 +63,7 @@ class ExecutionPipeline : public Stoppable {

std::string get_log_prefix() const; // Returns the current log lines prefix on behalf of current stage
class LogTimer; // Timer for async log scheduling
std::unique_ptr<LogTimer> make_log_timer();
};

} // namespace silkworm::stagedsync
42 changes: 26 additions & 16 deletions silkworm/node/stagedsync/stages/_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <silkworm/core/types/address.hpp>
#include <silkworm/core/types/evmc_bytes32.hpp>
#include <silkworm/infra/test_util/log.hpp>
#include <silkworm/node/common/settings.hpp>
#include <silkworm/node/db/access_layer.hpp>
#include <silkworm/node/db/buffer.hpp>
#include <silkworm/node/db/genesis.hpp>
Expand All @@ -40,6 +41,17 @@ static ethash::hash256 keccak256(const evmc::address& address) {
return silkworm::keccak256(address.bytes);
}

static stagedsync::Execution make_execution_stage(
stagedsync::SyncContext* sync_context,
const NodeSettings& node_settings) {
return stagedsync::Execution{
sync_context,
*node_settings.chain_config,
node_settings.batch_size,
node_settings.prune_mode,
};
}

TEST_CASE("Sync Stages") {
TemporaryDirectory temp_dir{};
NodeSettings node_settings{};
Expand Down Expand Up @@ -76,7 +88,7 @@ TEST_CASE("Sync Stages") {
SECTION("BlockHashes") {
SECTION("Forward/Unwind/Prune args validation") {
stagedsync::SyncContext sync_context{};
stagedsync::BlockHashes stage(&node_settings, &sync_context);
stagedsync::BlockHashes stage(&sync_context, node_settings.etl());

// (previous_progress == headers_progress == 0)
REQUIRE(stage.forward(txn) == stagedsync::Stage::Result::kSuccess);
Expand All @@ -103,7 +115,7 @@ TEST_CASE("Sync Stages") {
REQUIRE_NOTHROW(txn.commit_and_renew());

stagedsync::SyncContext sync_context{};
stagedsync::BlockHashes stage(&node_settings, &sync_context);
stagedsync::BlockHashes stage(&sync_context, node_settings.etl());

// Forward
auto stage_result{stage.forward(txn)};
Expand Down Expand Up @@ -190,7 +202,13 @@ TEST_CASE("Sync Stages") {

// Prepare stage
stagedsync::SyncContext sync_context{};
stagedsync::Senders stage(&node_settings, &sync_context);
stagedsync::Senders stage{
&sync_context,
*node_settings.chain_config,
node_settings.batch_size,
node_settings.etl(),
node_settings.prune_mode.senders(),
};

// Insert a martian stage progress
stage.update_progress(txn, 5);
Expand Down Expand Up @@ -250,15 +268,7 @@ TEST_CASE("Sync Stages") {

// Check prune works
// Override prune mode and issue pruning
node_settings.prune_mode =
db::parse_prune_mode(/*mode=*/"s",
/*olderHistory=*/std::nullopt, /*olderReceipts=*/std::nullopt,
/*olderSenders=*/std::nullopt, /*olderTxIndex=*/std::nullopt,
/*olderCallTraces=*/std::nullopt,
/*beforeHistory=*/std::nullopt, /*beforeReceipts=*/std::nullopt,
/*beforeSenders=*/2, /*beforeTxIndex=*/std::nullopt,
/*beforeCallTraces=*/std::nullopt);

stage.set_prune_mode_senders(db::BlockAmount(db::BlockAmount::Type::kBefore, 2));
stage_result = stage.prune(txn);
REQUIRE(stage_result == stagedsync::Stage::Result::kSuccess);
auto written_senders{db::read_senders(txn, 1, block_hashes[0].bytes)};
Expand Down Expand Up @@ -361,7 +371,7 @@ TEST_CASE("Sync Stages") {
// ---------------------------------------
stagedsync::SyncContext sync_context{};
sync_context.unwind_point.emplace(2);
stagedsync::Execution stage(&node_settings, &sync_context);
stagedsync::Execution stage = make_execution_stage(&sync_context, node_settings);
REQUIRE(stage.unwind(txn) == stagedsync::Stage::Result::kSuccess);

db::Buffer buffer2{txn, 0};
Expand All @@ -386,7 +396,7 @@ TEST_CASE("Sync Stages") {
SECTION("Execution Prune Default") {
log::Info() << "Pruning with " << node_settings.prune_mode.to_string();
stagedsync::SyncContext sync_context{};
stagedsync::Execution stage(&node_settings, &sync_context);
stagedsync::Execution stage = make_execution_stage(&sync_context, node_settings);
REQUIRE(stage.prune(txn) == stagedsync::Stage::Result::kSuccess);

// With default settings nothing should be pruned
Expand Down Expand Up @@ -417,7 +427,7 @@ TEST_CASE("Sync Stages") {
log::Info() << "Pruning with " << node_settings.prune_mode.to_string();
REQUIRE(node_settings.prune_mode.history().enabled());
stagedsync::SyncContext sync_context{};
stagedsync::Execution stage(&node_settings, &sync_context);
stagedsync::Execution stage = make_execution_stage(&sync_context, node_settings);
REQUIRE(stage.prune(txn) == stagedsync::Stage::Result::kSuccess);

db::PooledCursor account_changeset_table(txn, db::table::kAccountChangeSet);
Expand All @@ -431,7 +441,7 @@ TEST_CASE("Sync Stages") {

SECTION("HashState") {
stagedsync::SyncContext sync_context{};
stagedsync::HashState stage(&node_settings, &sync_context);
stagedsync::HashState stage{&sync_context, node_settings.etl()};
auto expected_stage_result{
magic_enum::enum_name<stagedsync::Stage::Result>(stagedsync::Stage::Result::kSuccess)};
auto actual_stage_result = magic_enum::enum_name<stagedsync::Stage::Result>(stage.forward(txn));
Expand Down
4 changes: 2 additions & 2 deletions silkworm/node/stagedsync/stages/stage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@

namespace silkworm::stagedsync {

Stage::Stage(SyncContext* sync_context, const char* stage_name, NodeSettings* node_settings)
: sync_context_{sync_context}, stage_name_{stage_name}, node_settings_{node_settings} {}
Stage::Stage(SyncContext* sync_context, const char* stage_name)
: sync_context_{sync_context}, stage_name_{stage_name} {}

BlockNum Stage::get_progress(db::ROTxn& txn) {
return db::stages::read_stage_progress(txn, stage_name_);
Expand Down
6 changes: 2 additions & 4 deletions silkworm/node/stagedsync/stages/stage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <silkworm/infra/common/ensure.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/concurrency/stoppable.hpp>
#include <silkworm/node/common/settings.hpp>
#include <silkworm/node/db/etl_mdbx_collector.hpp>
#include <silkworm/node/db/stages.hpp>
#include <silkworm/node/db/tables.hpp>
Expand Down Expand Up @@ -56,7 +55,6 @@ class Stage : public Stoppable {
public:
enum class [[nodiscard]] Result {
kSuccess, // valid chain
kUnknownChainId, //
kUnknownProtocolRuleSet, //
kBadChainSequence, //
kInvalidProgress, //
Expand All @@ -78,7 +76,8 @@ class Stage : public Stoppable {
Unwind, // Executing Unwind
Prune, // Executing Prune
};
explicit Stage(SyncContext* sync_context, const char* stage_name, NodeSettings* node_settings);

Stage(SyncContext* sync_context, const char* stage_name);

//! \brief Forward is called when the stage is executed. The main logic of the stage must be here.
//! \param [in] txn : A db transaction holder
Expand Down Expand Up @@ -128,7 +127,6 @@ class Stage : public Stoppable {
protected:
SyncContext* sync_context_; // Shared context across stages
const char* stage_name_; // Human friendly identifier of the stage
NodeSettings* node_settings_; // Pointer to shared node configuration settings
std::atomic<OperationType> operation_{OperationType::None}; // Actual operation being carried out
std::mutex sl_mutex_; // To synchronize access by outer sync loop
std::string log_prefix_; // Log lines prefix holding the progress among stages
Expand Down
4 changes: 2 additions & 2 deletions silkworm/node/stagedsync/stages/stage_blockhashes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Stage::Result BlockHashes::forward(db::RWTxn& txn) {
"span", std::to_string(segment_width)});
}

collector_ = std::make_unique<Collector>(node_settings_->etl());
collector_ = std::make_unique<Collector>(etl_settings_);
collect_and_load(txn, previous_progress, headers_stage_progress);
update_progress(txn, reached_block_num_);
txn.commit_and_renew();
Expand Down Expand Up @@ -124,7 +124,7 @@ Stage::Result BlockHashes::unwind(db::RWTxn& txn) {
"span", std::to_string(segment_width)});
}

collector_ = std::make_unique<Collector>(node_settings_->etl());
collector_ = std::make_unique<Collector>(etl_settings_);
collect_and_load(txn, to, previous_progress);
update_progress(txn, to);
txn.commit_and_renew();
Expand Down
7 changes: 5 additions & 2 deletions silkworm/node/stagedsync/stages/stage_blockhashes.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@

#pragma once

#include <silkworm/node/db/etl/collector_settings.hpp>
#include <silkworm/node/stagedsync/stages/stage.hpp>

namespace silkworm::stagedsync {

class BlockHashes final : public Stage {
public:
explicit BlockHashes(NodeSettings* node_settings, SyncContext* sync_context)
: Stage(sync_context, db::stages::kBlockHashesKey, node_settings){};
explicit BlockHashes(SyncContext* sync_context, db::etl::CollectorSettings etl_settings)
: Stage(sync_context, db::stages::kBlockHashesKey),
etl_settings_(std::move(etl_settings)) {}
~BlockHashes() override = default;

Stage::Result forward(db::RWTxn& txn) final;
Expand All @@ -32,6 +34,7 @@ class BlockHashes final : public Stage {
std::vector<std::string> get_log_progress() final;

private:
db::etl::CollectorSettings etl_settings_;
std::unique_ptr<db::etl_mdbx::Collector> collector_{nullptr};

/* Stats */
Expand Down
8 changes: 4 additions & 4 deletions silkworm/node/stagedsync/stages/stage_bodies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ bool BodiesStage::BodyDataModel::get_canonical_block(BlockNum height, Block& blo
return data_model_.read_canonical_block(height, block);
}

BodiesStage::BodiesStage(NodeSettings* ns, SyncContext* sc)
: Stage(sc, db::stages::kBlockBodiesKey, ns) {
}
BodiesStage::BodiesStage(SyncContext* sync_context, const ChainConfig& chain_config)
: Stage(sync_context, db::stages::kBlockBodiesKey),
chain_config_(chain_config) {}

Stage::Result BodiesStage::forward(db::RWTxn& tx) {
using std::shared_ptr;
Expand Down Expand Up @@ -129,7 +129,7 @@ Stage::Result BodiesStage::forward(db::RWTxn& tx) {
"span", std::to_string(target_height - current_height_)});
}

BodyDataModel body_persistence(tx, current_height_, node_settings_->chain_config.value());
BodyDataModel body_persistence(tx, current_height_, chain_config_);
body_persistence.set_preverified_height(PreverifiedHashes::current.height);

get_log_progress(); // this is a trick to set log progress initial value, please improve
Expand Down
4 changes: 3 additions & 1 deletion silkworm/node/stagedsync/stages/stage_bodies.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <silkworm/core/chain/config.hpp>
#include <silkworm/core/protocol/rule_set.hpp>
#include <silkworm/core/types/block.hpp>
#include <silkworm/infra/concurrency/containers.hpp>
Expand All @@ -27,7 +28,7 @@ namespace silkworm::stagedsync {

class BodiesStage : public Stage {
public:
BodiesStage(NodeSettings*, SyncContext*);
BodiesStage(SyncContext*, const ChainConfig&);
BodiesStage(const BodiesStage&) = delete; // not copyable
BodiesStage(BodiesStage&&) = delete; // nor movable
~BodiesStage() override = default;
Expand All @@ -38,6 +39,7 @@ class BodiesStage : public Stage {

private:
std::vector<std::string> get_log_progress() override; // thread safe
const ChainConfig& chain_config_;
std::atomic<BlockNum> current_height_{0};

protected:
Expand Down
Loading

0 comments on commit 73487b1

Please sign in to comment.