Skip to content

Commit

Permalink
execution: fix OOM crash due to db::Buffer::accounts realloc (#2081)
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr authored Jun 14, 2024
1 parent 9787672 commit ba106d1
Show file tree
Hide file tree
Showing 24 changed files with 206 additions and 134 deletions.
7 changes: 5 additions & 2 deletions cmd/dev/check_changes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,17 +128,20 @@ int main(int argc, char* argv[]) {
break;
}

db::Buffer buffer{txn, /*prune_history_threshold=*/0, /*historical_block=*/block_num};
db::Buffer buffer{txn};
buffer.set_historical_block(block_num);

ExecutionProcessor processor{block, *rule_set, buffer, *chain_config};
processor.evm().analysis_cache = &analysis_cache;
processor.evm().state_pool = &state_pool;

if (const auto res{processor.execute_and_write_block(receipts)}; res != ValidationResult::kOk) {
if (const ValidationResult res = processor.execute_block(receipts); res != ValidationResult::kOk) {
log::Error() << "Failed execution for block " << block_num << " result " << magic_enum::enum_name<>(res);
continue;
}

processor.flush_state();

db::AccountChanges db_account_changes{db::read_account_changes(txn, block_num)};

const auto& block_account_changes{buffer.account_changes()};
Expand Down
7 changes: 5 additions & 2 deletions cmd/dev/scan_txs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,17 +89,20 @@ int main(int argc, char* argv[]) {
break;
}

db::Buffer buffer{txn, /*prune_history_threshold=*/0, /*historical_block=*/block_num};
db::Buffer buffer{txn};
buffer.set_historical_block(block_num);

ExecutionProcessor processor{block, *rule_set, buffer, *chain_config};
processor.evm().analysis_cache = &analysis_cache;
processor.evm().state_pool = &state_pool;

// Execute the block and retrieve the receipts
if (const auto res{processor.execute_and_write_block(receipts)}; res != ValidationResult::kOk) {
if (const ValidationResult res = processor.execute_block(receipts); res != ValidationResult::kOk) {
std::cerr << "Validation error " << static_cast<int>(res) << " at block " << block_num << "\n";
}

processor.flush_state();

// There is one receipt per transaction
assert(block.transactions.size() == receipts.size());

Expand Down
80 changes: 53 additions & 27 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

#include <absl/strings/str_split.h>
#include <boost/thread/scoped_thread.hpp>
#include <gsl/util>

#include <silkworm/buildinfo.h>
#include <silkworm/core/chain/config.hpp>
Expand Down Expand Up @@ -456,12 +457,12 @@ class BlockExecutor {
}

std::vector<Receipt> receipts;
const auto result{processor.execute_and_write_block(receipts)};

if (result != ValidationResult::kOk) {
return result;
if (const ValidationResult res = processor.execute_block(receipts); res != ValidationResult::kOk) {
return res;
}

processor.flush_state();

if (write_receipts_) {
state_buffer.insert_receipts(block.header.number, receipts);
}
Expand All @@ -484,7 +485,7 @@ class BlockExecutor {
log_time_ = now + 20s;
}

return result;
return ValidationResult::kOk;
}

private:
Expand Down Expand Up @@ -534,20 +535,22 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,

try {
auto txn = db::RWTxnUnmanaged{mdbx_txn};
const auto db_path{txn.db().get_path()};

db::Buffer state_buffer{txn, /*prune_history_threshold=*/0};
db::Buffer state_buffer{txn};
state_buffer.set_memory_limit(batch_size);

const size_t max_batch_size{batch_size};
auto signal_check_time{std::chrono::steady_clock::now()};

BlockNum block_number{start_block};
BlockNum last_block_number = 0;
db::DataModel da_layer{txn};
BlockExecutor block_executor{*chain_info, write_receipts, write_call_traces, write_change_sets, max_batch_size};
ValidationResult last_exec_result = ValidationResult::kOk;
boost::circular_buffer<Block> prefetched_blocks{/*buffer_capacity=*/kMaxPrefetchedBlocks};

while (block_number <= max_block) {
while (state_buffer.current_batch_state_size() < max_batch_size && block_number <= max_block) {
while (block_number <= max_block) {
if (prefetched_blocks.empty()) {
const auto num_blocks{std::min(size_t(max_block - block_number + 1), kMaxPrefetchedBlocks)};
SILK_TRACE << "Prefetching " << num_blocks << " blocks start";
Expand All @@ -562,20 +565,26 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
}
const Block& block{prefetched_blocks.front()};

const auto result{block_executor.execute_single(block, state_buffer)};

if (result != ValidationResult::kOk) {
return SILKWORM_INVALID_BLOCK;
try {
last_exec_result = block_executor.execute_single(block, state_buffer);
} catch (const db::Buffer::MemoryLimitError&) {
// memory limit hit: end the current batch here and flush it below
break;
}
if (last_exec_result != ValidationResult::kOk) {
// first persist the work done so far, then return SILKWORM_INVALID_BLOCK
break;
}

if (signal_check(signal_check_time)) {
return SILKWORM_TERMINATION_SIGNAL;
}

last_block_number = block_number;
++block_number;
prefetched_blocks.pop_front();
}

auto last_block_number = block_number - 1;
log::Info{"[4/12 Execution] Flushing state", // NOLINT(*-unused-raii)
log_args_for_exec_flush(state_buffer, max_batch_size, last_block_number)};
state_buffer.write_state_to_db();
Expand All @@ -585,6 +594,10 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
if (last_executed_block) {
*last_executed_block = last_block_number;
}

if (last_exec_result != ValidationResult::kOk) {
return SILKWORM_INVALID_BLOCK;
}
}
return SILKWORM_OK;
} catch (const mdbx::exception& e) {
Expand Down Expand Up @@ -629,8 +642,11 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
auto txn = db::RWTxnManaged{unmanaged_env};
const auto db_path{unmanaged_env.get_path()};

db::Buffer state_buffer{txn, /*prune_history_threshold=*/0};
db::Buffer state_buffer{txn};
state_buffer.set_memory_limit(batch_size);

BoundedBuffer<std::optional<Block>> block_buffer{kMaxBlockBufferSize};
[[maybe_unused]] auto _ = gsl::finally([&block_buffer] { block_buffer.terminate_and_release_all(); });
BlockProvider block_provider{&block_buffer, unmanaged_env, start_block, max_block};
boost::strict_scoped_thread<boost::interrupt_and_join_if_joinable> block_provider_thread(block_provider);

Expand All @@ -639,45 +655,55 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,

std::optional<Block> block;
BlockNum block_number{start_block};
BlockNum last_block_number = 0;
BlockExecutor block_executor{*chain_info, write_receipts, write_call_traces, write_change_sets, max_batch_size};
ValidationResult last_exec_result = ValidationResult::kOk;

while (block_number <= max_block) {
while (state_buffer.current_batch_state_size() < max_batch_size && block_number <= max_block) {
while (block_number <= max_block) {
block_buffer.pop_back(&block);
if (!block) {
block_buffer.terminate_and_release_all();
return SILKWORM_BLOCK_NOT_FOUND;
}
SILKWORM_ASSERT(block->header.number == block_number);

const auto result{block_executor.execute_single(*block, state_buffer)};

if (result != ValidationResult::kOk) {
block_buffer.terminate_and_release_all();
return SILKWORM_INVALID_BLOCK;
try {
last_exec_result = block_executor.execute_single(*block, state_buffer);
} catch (const db::Buffer::MemoryLimitError&) {
// memory limit hit: end the current batch here and flush it below
break;
}
if (last_exec_result != ValidationResult::kOk) {
// first persist the work done so far, then return SILKWORM_INVALID_BLOCK
break;
}

if (signal_check(signal_check_time)) {
block_buffer.terminate_and_release_all();
return SILKWORM_TERMINATION_SIGNAL;
}

last_block_number = block_number;
++block_number;
}

StopWatch sw{/*auto_start=*/true};
log::Info{"[4/12 Execution] Flushing state", // NOLINT(*-unused-raii)
log_args_for_exec_flush(state_buffer, max_batch_size, block->header.number)};
log_args_for_exec_flush(state_buffer, max_batch_size, last_block_number)};
state_buffer.write_state_to_db();
// Always save the Execution stage progress when state batch is flushed
db::stages::write_stage_progress(txn, db::stages::kExecutionKey, block->header.number);
db::stages::write_stage_progress(txn, db::stages::kExecutionKey, last_block_number);
// Commit and renew only in case of internally managed transaction
txn.commit_and_renew();
const auto [elapsed, _]{sw.stop()};
const auto elapsed_time_and_duration = sw.stop();
log::Info("[4/12 Execution] Commit state+history", // NOLINT(*-unused-raii)
log_args_for_exec_commit(sw.since_start(elapsed), db_path));
log_args_for_exec_commit(elapsed_time_and_duration.second, db_path));

if (last_executed_block) {
*last_executed_block = block->header.number;
*last_executed_block = last_block_number;
}

if (last_exec_result != ValidationResult::kOk) {
return SILKWORM_INVALID_BLOCK;
}
}
return SILKWORM_OK;
Expand Down
9 changes: 8 additions & 1 deletion silkworm/core/execution/execution.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,14 @@ namespace silkworm {
return ValidationResult::kUnknownProtocolRuleSet;
}
ExecutionProcessor processor{block, *rule_set, state, chain_config};
return processor.execute_and_write_block(receipts);

if (const ValidationResult res = processor.execute_block(receipts); res != ValidationResult::kOk) {
return res;
}

processor.flush_state();

return ValidationResult::kOk;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/execution/execution_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ TEST_CASE("Execute block with tracing") {
CallTracer call_tracer{call_traces};
processor.evm().add_tracer(call_tracer);

REQUIRE(processor.execute_and_write_block(receipts) == ValidationResult::kOk);
REQUIRE(processor.execute_block(receipts) == ValidationResult::kOk);

CHECK((block_tracer.block_start_called() && block_tracer.block_end_called()));
CHECK(call_traces.senders.empty());
Expand Down
8 changes: 5 additions & 3 deletions silkworm/core/execution/processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ ValidationResult ExecutionProcessor::execute_block_no_post_validation(std::vecto
return ValidationResult::kOk;
}

ValidationResult ExecutionProcessor::execute_and_write_block(std::vector<Receipt>& receipts) noexcept {
ValidationResult ExecutionProcessor::execute_block(std::vector<Receipt>& receipts) noexcept {
if (const ValidationResult res{execute_block_no_post_validation(receipts)}; res != ValidationResult::kOk) {
return res;
}
Expand Down Expand Up @@ -188,11 +188,13 @@ ValidationResult ExecutionProcessor::execute_and_write_block(std::vector<Receipt
return ValidationResult::kWrongLogsBloom;
}

state_.write_to_db(header.number);

return ValidationResult::kOk;
}

//! \brief Write the processor's state out via state_.write_to_db, keyed by the
//! number of the block currently held by the EVM.
void ExecutionProcessor::flush_state() {
    const auto block_number = evm_.block().header.number;
    state_.write_to_db(block_number);
}

//! \brief Notify the registered tracers at the start of block execution.
void ExecutionProcessor::notify_block_execution_start(const Block& block) {
for (auto& tracer : evm_.tracers()) {
Expand Down
7 changes: 5 additions & 2 deletions silkworm/core/execution/processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ class ExecutionProcessor {
*/
void execute_transaction(const Transaction& txn, Receipt& receipt) noexcept;

//! \brief Execute the block and write the result to the DB.
//! \brief Execute the block.
//! \remarks Warning: This method does not verify state root; pre-Byzantium receipt root isn't validated either.
//! \pre RuleSet's validate_block_header & pre_validate_block_body must return kOk.
[[nodiscard]] ValidationResult execute_and_write_block(std::vector<Receipt>& receipts) noexcept;
[[nodiscard]] ValidationResult execute_block(std::vector<Receipt>& receipts) noexcept;

//! \brief Flush IntraBlockState into cumulative State.
void flush_state();

uint64_t available_gas() const noexcept;

Expand Down
4 changes: 3 additions & 1 deletion silkworm/core/protocol/blockchain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,12 @@ ValidationResult Blockchain::execute_block(const Block& block, bool check_state_
processor.evm().state_pool = state_pool;
processor.evm().exo_evm = exo_evm;

if (const auto res{processor.execute_and_write_block(receipts_)}; res != ValidationResult::kOk) {
if (const ValidationResult res = processor.execute_block(receipts_); res != ValidationResult::kOk) {
return res;
}

processor.flush_state();

if (check_state_root) {
evmc::bytes32 state_root{state_.state_root_hash()};
if (state_root != block.header.state_root) {
Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/state/in_memory_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void InMemoryState::insert_receipts(BlockNum, const std::vector<Receipt>&) {}

void InMemoryState::insert_call_traces(BlockNum /*block_number*/, const CallTraces& /*traces*/) {}

void InMemoryState::begin_block(BlockNum block_number) {
void InMemoryState::begin_block(BlockNum block_number, size_t /*updated_accounts_count*/) {
block_number_ = block_number;
account_changes_.erase(block_number);
storage_changes_.erase(block_number);
Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/state/in_memory_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class InMemoryState : public State {

void insert_call_traces(BlockNum block_number, const CallTraces& traces) override;

void begin_block(BlockNum block_number) override;
void begin_block(BlockNum block_number, size_t updated_accounts_count) override;

void update_account(const evmc::address& address, std::optional<Account> initial,
std::optional<Account> current) override;
Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/state/intra_block_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ void IntraBlockState::set_transient_storage(const evmc::address& addr, const evm
}

void IntraBlockState::write_to_db(uint64_t block_number) {
db_.begin_block(block_number);
db_.begin_block(block_number, objects_.size());

for (const auto& [address, storage] : storage_) {
auto it1{objects_.find(address)};
Expand Down
2 changes: 1 addition & 1 deletion silkworm/core/state/state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class State : public BlockState {
/** Mark the beginning of a new block.
* Must be called prior to calling update_account/update_account_code/update_storage.
*/
virtual void begin_block(BlockNum block_number) = 0;
virtual void begin_block(BlockNum block_number, size_t updated_accounts_count) = 0;

virtual void update_account(const evmc::address& address, std::optional<Account> initial,
std::optional<Account> current) = 0;
Expand Down
Loading

0 comments on commit ba106d1

Please sign in to comment.