Skip to content

Commit

Permalink
db::Buffer::MemoryLimitError for batch execution
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Jun 13, 2024
1 parent 5579b2f commit 4d3ea7a
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
19 changes: 14 additions & 5 deletions silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,

try {
auto txn = db::RWTxnUnmanaged{mdbx_txn};
const auto db_path{txn.db().get_path()};

db::Buffer state_buffer{txn};
state_buffer.set_memory_limit(batch_size);
Expand All @@ -551,7 +550,7 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
boost::circular_buffer<Block> prefetched_blocks{/*buffer_capacity=*/kMaxPrefetchedBlocks};

while (block_number <= max_block) {
while (state_buffer.current_batch_state_size() < max_batch_size && block_number <= max_block) {
while (block_number <= max_block) {
if (prefetched_blocks.empty()) {
const auto num_blocks{std::min(size_t(max_block - block_number + 1), kMaxPrefetchedBlocks)};
SILK_TRACE << "Prefetching " << num_blocks << " blocks start";
Expand All @@ -566,7 +565,12 @@ int silkworm_execute_blocks_ephemeral(SilkwormHandle handle, MDBX_txn* mdbx_txn,
}
const Block& block{prefetched_blocks.front()};

last_exec_result = block_executor.execute_single(block, state_buffer);
try {
last_exec_result = block_executor.execute_single(block, state_buffer);
} catch (const db::Buffer::MemoryLimitError&) {
// batch done
break;
}
if (last_exec_result != ValidationResult::kOk) {
// firstly, persist the work done so far, then return SILKWORM_INVALID_BLOCK
break;
Expand Down Expand Up @@ -656,14 +660,19 @@ int silkworm_execute_blocks_perpetual(SilkwormHandle handle, MDBX_env* mdbx_env,
ValidationResult last_exec_result = ValidationResult::kOk;

while (block_number <= max_block) {
while (state_buffer.current_batch_state_size() < max_batch_size && block_number <= max_block) {
while (block_number <= max_block) {
block_buffer.pop_back(&block);
if (!block) {
return SILKWORM_BLOCK_NOT_FOUND;
}
SILKWORM_ASSERT(block->header.number == block_number);

last_exec_result = block_executor.execute_single(*block, state_buffer);
try {
last_exec_result = block_executor.execute_single(*block, state_buffer);
} catch (const db::Buffer::MemoryLimitError&) {
// batch done
break;
}
if (last_exec_result != ValidationResult::kOk) {
// firstly, persist the work done so far, then return SILKWORM_INVALID_BLOCK
break;
Expand Down
5 changes: 4 additions & 1 deletion silkworm/db/buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,11 @@ size_t flat_hash_map_memory_size_after_inserts(const TFlatHashMap& map, size_t i
}

void Buffer::begin_block(uint64_t block_number, size_t updated_accounts_count) {
if (current_batch_state_size() > memory_limit_) {
throw MemoryLimitError();
}
if (flat_hash_map_memory_size_after_inserts(accounts_, updated_accounts_count) > memory_limit_) {
// TODO: error
throw MemoryLimitError();
}

block_number_ = block_number;
Expand Down
6 changes: 6 additions & 0 deletions silkworm/db/buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <limits>
#include <optional>
#include <stdexcept>
#include <vector>

#include <absl/container/btree_map.h>
Expand Down Expand Up @@ -147,6 +148,11 @@ class Buffer : public State {
//! \brief Persists *state* accrued contents into db
void write_state_to_db();

//! \brief Thrown when the buffer's accrued batch state exceeds the configured memory limit,
//! signalling callers that the current batch is complete and should be persisted.
class MemoryLimitError : public std::runtime_error {
  public:
    MemoryLimitError()
        : std::runtime_error{"db::Buffer::MemoryLimitError"} {}
};

private:
RWTxn& txn_;
db::DataModel access_layer_;
Expand Down
29 changes: 13 additions & 16 deletions silkworm/node/stagedsync/stages/stage_execution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,12 @@ Stage::Result Execution::execute_batch(db::RWTxn& txn, BlockNum max_block_num, A

std::vector<Receipt> receipts;

// Transform batch_size limit into Ggas
size_t gas_max_history_size{batch_size_ * 1_Kibi / 2}; // 512MB -> 256Ggas roughly
size_t gas_max_batch_size{gas_max_history_size * 20}; // 256Ggas -> 5Tgas roughly
size_t gas_batch_size{0};

{
std::unique_lock progress_lock(progress_mtx_);
lap_time_ = std::chrono::steady_clock::now();
}

while (true) {
while (block_num_ <= max_block_num) {
if (prefetched_blocks_.empty()) {
throw_if_stopping();
prefetch_blocks(txn, block_num_, max_block_num);
Expand Down Expand Up @@ -258,7 +253,12 @@ Stage::Result Execution::execute_batch(db::RWTxn& txn, BlockNum max_block_num, A
return Stage::Result::kInvalidBlock;
}

processor.flush_state();
try {
processor.flush_state();
} catch (const db::Buffer::MemoryLimitError&) {
// batch done
break;
}

if (block_num_ >= prune_receipts_threshold) {
buffer.insert_receipts(block_num_, receipts);
Expand All @@ -274,21 +274,18 @@ Stage::Result Execution::execute_batch(db::RWTxn& txn, BlockNum max_block_num, A
++processed_blocks_;
processed_transactions_ += block.transactions.size();
processed_gas_ += block.header.gas_used;
gas_batch_size += block.header.gas_used;
progress_lock.unlock();

prefetched_blocks_.pop_front();

// Flush whole buffer if time to
if (gas_batch_size >= gas_max_batch_size || block_num_ >= max_block_num) {
log::Trace(log_prefix_, {"buffer", "state", "size", human_size(buffer.current_batch_state_size())});
buffer.write_to_db();
break;
}

++block_num_;
}

// update block_num_ to point to the last successfully executed block
block_num_--;

log::Trace(log_prefix_, {"buffer", "state", "size", human_size(buffer.current_batch_state_size())});
buffer.write_to_db();

} catch (const StageError& ex) {
log::Error(log_prefix_,
{"function", std::string(__FUNCTION__), "exception", std::string(ex.what())});
Expand Down

0 comments on commit 4d3ea7a

Please sign in to comment.