Skip to content

Commit

Permalink
Merge pull request #2431 from Taraxa-project/migrations
Browse files Browse the repository at this point in the history
Migrations
  • Loading branch information
kstdl authored Apr 17, 2023
2 parents 6d40892 + 1b99150 commit 645d7e4
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 19 deletions.
1 change: 0 additions & 1 deletion libraries/cli/include/cli/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ class Config {
static constexpr const char* REBUILD_DB = "rebuild-db";
static constexpr const char* REBUILD_DB_PERIOD = "rebuild-db-period";
static constexpr const char* REVERT_TO_PERIOD = "revert-to-period";
static constexpr const char* REBUILD_DB_COLUMNS = "rebuild-db-columns";
static constexpr const char* LIGHT = "light";
static constexpr const char* HELP = "help";
static constexpr const char* VERSION = "version";
Expand Down
4 changes: 0 additions & 4 deletions libraries/cli/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ Config::Config(int argc, const char* argv[]) {
bool destroy_db = false;
bool rebuild_network = false;
bool rebuild_db = false;
bool rebuild_db_columns = false;
bool light_node = false;
bool version = false;
uint64_t rebuild_db_period = 0;
Expand Down Expand Up @@ -81,8 +80,6 @@ Config::Config(int argc, const char* argv[]) {
"rebuilding all the other "
"database tables - this could take a long "
"time");
node_command_options.add_options()(REBUILD_DB_COLUMNS, bpo::bool_switch(&rebuild_db_columns),
"Removes old DB columns ");
node_command_options.add_options()(REBUILD_DB_PERIOD, bpo::value<uint64_t>(&rebuild_db_period),
"Use with rebuild-db - Rebuild db up "
"to a specified period");
Expand Down Expand Up @@ -264,7 +261,6 @@ Config::Config(int argc, const char* argv[]) {
}
node_config_.db_config.db_revert_to_period = revert_to_period;
node_config_.db_config.rebuild_db = rebuild_db;
node_config_.db_config.rebuild_db_columns = rebuild_db_columns;
node_config_.db_config.rebuild_db_period = rebuild_db_period;

node_config_.enable_test_rpc = enable_test_rpc;
Expand Down
1 change: 0 additions & 1 deletion libraries/config/include/config/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ struct DBConfig {
PbftPeriod db_revert_to_period = 0;
bool rebuild_db = false;
PbftPeriod rebuild_db_period = 0;
bool rebuild_db_columns = false;
};

void dec_json(Json::Value const &json, DBConfig &db_config);
Expand Down
7 changes: 4 additions & 3 deletions libraries/core_libs/node/src/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "network/rpc/jsonrpc_http_processor.hpp"
#include "network/rpc/jsonrpc_ws_server.hpp"
#include "pbft/pbft_manager.hpp"
#include "storage/migration/migration_manager.hpp"
#include "transaction/gas_pricer.hpp"
#include "transaction/transaction_manager.hpp"

Expand Down Expand Up @@ -62,11 +63,9 @@ void FullNode::init() {
conf_.db_config.db_max_open_files, conf_.db_config.db_max_snapshots,
conf_.db_config.db_revert_to_period, node_addr, true);
}

db_ = std::make_shared<DbStorage>(conf_.db_path, conf_.db_config.db_snapshot_each_n_pbft_block,
conf_.db_config.db_max_open_files, conf_.db_config.db_max_snapshots,
conf_.db_config.db_revert_to_period, node_addr, false,
conf_.db_config.rebuild_db_columns);
conf_.db_config.db_revert_to_period, node_addr, false);

if (db_->hasMinorVersionChanged()) {
LOG(log_si_) << "Minor DB version has changed. Rebuilding Db";
Expand All @@ -82,6 +81,8 @@ void FullNode::init() {
if (db_->getDagBlocksCount() == 0) {
db_->setGenesisHash(conf_.genesis.genesisHash());
}

storage::migration::Manager(db_).applyAll();
}
LOG(log_nf_) << "DB initialized ...";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once
#include "storage/storage.hpp"

namespace taraxa::storage::migration {
// Base class for a one-shot, idempotent db migration. A concrete migration
// implements migrate() and is recorded in the `migrations` column once applied,
// so it runs at most once per database.
class Base {
 public:
  // Takes shared ownership of the db handle and opens the write batch that
  // collects every change this migration makes.
  // `explicit`: a raw shared_ptr<DbStorage> should never silently convert into a migration.
  explicit Base(std::shared_ptr<DbStorage> db) : db_(std::move(db)), batch_(db_->createWriteBatch()) {}
  virtual ~Base() = default;

  // Unique name of the migration; used as the key in the `migrations` column
  virtual std::string id() = 0;
  // We need to specify version here, so in case of major version change(db reindex) we won't apply unneeded migrations
  virtual uint32_t dbVersion() = 0;

  // A migration counts as applied once its id exists in the `migrations` column
  bool isApplied() { return db_->lookup_int<bool>(id(), DB::Columns::migrations).has_value(); }

  // Runs the migration, then commits its changes together with the "applied"
  // marker in one write batch. Skipped when the on-disk major version does not
  // match dbVersion().
  void apply() {
    if (db_->getMajorVersion() != dbVersion()) {
      return;
    }
    migrate();
    setApplied();
    db_->commitWriteBatch(batch_);
  }

 protected:
  // Method with custom logic. All db changes should be made using `batch_`
  virtual void migrate() = 0;

  // Stages the "applied" marker into the same batch as the migration's changes
  void setApplied() { db_->insert(batch_, DB::Columns::migrations, id(), true); }

  std::shared_ptr<DbStorage> db_;
  DB::Batch batch_;
};
} // namespace taraxa::storage::migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#pragma once
#include "storage/migration/migration_base.hpp"

namespace taraxa::storage::migration {
// Owns the ordered list of registered migrations and applies the pending ones.
class Manager {
 public:
  explicit Manager(std::shared_ptr<DbStorage> db, const addr_t& node_addr = {});

  // Instantiates migration T with the db handle and appends it to the queue
  template <typename T>
  void registerMigration() {
    migrations_.emplace_back(std::make_shared<T>(db_));
  }

  // Runs every registered migration that has not been applied yet
  void applyAll();

 private:
  std::shared_ptr<DbStorage> db_;
  std::vector<std::shared_ptr<migration::Base>> migrations_;
  LOG_OBJECTS_DEFINE
};
} // namespace taraxa::storage::migration
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once
#include <libdevcore/Common.h>

#include "final_chain/final_chain.hpp"
#include "storage/migration/migration_base.hpp"
#include "transaction/transaction.hpp"

namespace taraxa::storage::migration {
// Migration that re-encodes the per-block transaction-hash lists from the
// legacy flat byte layout to RLP (see the matching .cpp for details).
class TransactionHashes : public migration::Base {
 public:
  // `explicit` to match Base: no implicit shared_ptr -> migration conversion
  explicit TransactionHashes(std::shared_ptr<DbStorage> db);
  std::string id() override;
  uint32_t dbVersion() override;
  void migrate() override;
};
} // namespace taraxa::storage::migration
8 changes: 7 additions & 1 deletion libraries/core_libs/storage/include/storage/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {

// do not change/move
COLUMN(default_column);
// migrations
COLUMN(migrations);
// Contains full data for an executed PBFT block including PBFT block, cert votes, dag blocks and transactions
COLUMN_W_COMP(period_data, getIntComparator<PbftPeriod>());
COLUMN(genesis);
Expand Down Expand Up @@ -141,6 +143,7 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {
const uint32_t kDbSnapshotsMaxCount = 0;
std::set<PbftPeriod> snapshots_;

uint32_t kMajorVersion_;
bool minor_version_changed_ = false;

auto handle(Column const& col) const { return handles_[col.ordinal_]; }
Expand All @@ -150,7 +153,7 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {
public:
explicit DbStorage(fs::path const& base_path, uint32_t db_snapshot_each_n_pbft_block = 0, uint32_t max_open_files = 0,
uint32_t db_max_snapshots = 0, PbftPeriod db_revert_to_period = 0, addr_t node_addr = addr_t(),
bool rebuild = false, bool rebuild_columns = false);
bool rebuild = false);
~DbStorage();

DbStorage(const DbStorage&) = delete;
Expand All @@ -173,6 +176,9 @@ class DbStorage : public std::enable_shared_from_this<DbStorage> {
void disableSnapshots();
void enableSnapshots();

uint32_t getMajorVersion() const;
std::unique_ptr<rocksdb::Iterator> getColumnIterator(const Column& c);

// Genesis
void setGenesisHash(const h256& genesis_hash);
std::optional<h256> getGenesisHash();
Expand Down
20 changes: 20 additions & 0 deletions libraries/core_libs/storage/src/migration/migration_manager.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include "storage/migration/migration_manager.hpp"

#include "storage/migration/transaction_hashes.hpp"

namespace taraxa::storage::migration {
// Builds the manager and registers all known migrations.
// `db` is taken by value and moved into the member (avoids an extra
// shared_ptr refcount bump compared to copying it).
// NOTE(review): `node_addr` appears unused here unless LOG_OBJECTS_CREATE
// consumes it — confirm against the macro definition.
Manager::Manager(std::shared_ptr<DbStorage> db, const addr_t& node_addr) : db_(std::move(db)) {
  LOG_OBJECTS_CREATE("MIGRATIONS");
  registerMigration<migration::TransactionHashes>();
}

void Manager::applyAll() {
for (const auto& m : migrations_) {
if (!m->isApplied()) {
LOG(log_nf_) << "Applying migration " << m->id();
m->apply();
LOG(log_nf_) << "Migration applied " << m->id();
}
}
}
} // namespace taraxa::storage::migration
37 changes: 37 additions & 0 deletions libraries/core_libs/storage/src/migration/transaction_hashes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#include "storage/migration/transaction_hashes.hpp"

namespace taraxa::storage::migration {
// Read-only view over the legacy value format: a flat concatenation of
// fixed-size (32-byte) transaction hashes with no length prefix or separators.
struct OldTransactionsHashes {
  std::string data_;
  size_t num_hashes_;

  explicit OldTransactionsHashes(std::string serialized)
      : data_(std::move(serialized)), num_hashes_(data_.size() / dev::h256::size) {}

  // Returns the idx-th hash; caller must keep idx < count()
  dev::h256 get(size_t idx) const {
    const auto* bytes = reinterpret_cast<const uint8_t*>(data_.data() + idx * dev::h256::size);
    return dev::h256(bytes, dev::h256::ConstructFromPointer);
  }
  size_t count() const { return num_hashes_; }
};

// Forwards the db handle into Base by move — `db` was taken by value, so
// copying it into the base would pay an extra atomic refcount operation.
TransactionHashes::TransactionHashes(std::shared_ptr<DbStorage> db) : migration::Base(std::move(db)) {}

// Key under which this migration is recorded in the migrations column
std::string TransactionHashes::id() { return "TransactionHashes"; }

// Only valid for databases at major version 1 (see Base::apply)
uint32_t TransactionHashes::dbVersion() { return 1; }

void TransactionHashes::migrate() {
auto it = db_->getColumnIterator(DB::Columns::final_chain_transaction_hashes_by_blk_number);

// Get and save data in new format for all blocks
for (it->SeekToFirst(); it->Valid(); it->Next()) {
::taraxa::TransactionHashes new_data;
auto old_data = std::make_unique<OldTransactionsHashes>(it->value().ToString());
new_data.reserve(old_data->count());
for (size_t i = 0; i < new_data.capacity(); ++i) {
new_data.emplace_back(old_data->get(i));
}
db_->insert(batch_, DB::Columns::final_chain_transaction_hashes_by_blk_number, it->key(), dev::rlp(new_data));
}
}
} // namespace taraxa::storage::migration
25 changes: 16 additions & 9 deletions libraries/core_libs/storage/src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ static constexpr uint16_t DAG_BLOCKS_POS_IN_PERIOD_DATA = 2;
static constexpr uint16_t TRANSACTIONS_POS_IN_PERIOD_DATA = 3;

DbStorage::DbStorage(fs::path const& path, uint32_t db_snapshot_each_n_pbft_block, uint32_t max_open_files,
uint32_t db_max_snapshots, PbftPeriod db_revert_to_period, addr_t node_addr, bool rebuild,
bool rebuild_columns)
uint32_t db_max_snapshots, PbftPeriod db_revert_to_period, addr_t node_addr, bool rebuild)
: path_(path),
handles_(Columns::all.size()),
kDbSnapshotsEachNblock(db_snapshot_each_n_pbft_block),
Expand Down Expand Up @@ -61,9 +60,7 @@ DbStorage::DbStorage(fs::path const& path, uint32_t db_snapshot_each_n_pbft_bloc
});
LOG_OBJECTS_CREATE("DBS");

if (rebuild_columns) {
rebuildColumns(options);
}
rebuildColumns(options);

// Iterate over the db folders and populate snapshot set
loadSnapshots();
Expand All @@ -80,15 +77,15 @@ DbStorage::DbStorage(fs::path const& path, uint32_t db_snapshot_each_n_pbft_bloc
dag_blocks_count_.store(getStatusField(StatusDbField::DagBlkCount));
dag_edge_count_.store(getStatusField(StatusDbField::DagEdgeCount));

uint32_t major_version = getStatusField(StatusDbField::DbMajorVersion);
kMajorVersion_ = getStatusField(StatusDbField::DbMajorVersion);
uint32_t minor_version = getStatusField(StatusDbField::DbMinorVersion);
if (major_version == 0 && minor_version == 0) {
if (kMajorVersion_ == 0 && minor_version == 0) {
saveStatusField(StatusDbField::DbMajorVersion, TARAXA_DB_MAJOR_VERSION);
saveStatusField(StatusDbField::DbMinorVersion, TARAXA_DB_MINOR_VERSION);
} else {
if (major_version != TARAXA_DB_MAJOR_VERSION) {
if (kMajorVersion_ != TARAXA_DB_MAJOR_VERSION) {
throw DbException(string("Database version mismatch. Version on disk ") +
getFormattedVersion({major_version, minor_version}) +
getFormattedVersion({kMajorVersion_, minor_version}) +
" Node version:" + getFormattedVersion({TARAXA_DB_MAJOR_VERSION, TARAXA_DB_MINOR_VERSION}));
} else if (minor_version != TARAXA_DB_MINOR_VERSION) {
minor_version_changed_ = true;
Expand All @@ -100,6 +97,10 @@ void DbStorage::rebuildColumns(const rocksdb::Options& options) {
std::unique_ptr<rocksdb::DB> db;
std::vector<std::string> column_families;
rocksdb::DB::ListColumnFamilies(options, db_path_.string(), &column_families);
if (column_families.empty()) {
LOG(log_wr_) << "DB isn't initialized in rebuildColumns. Skip it";
return;
}

std::vector<rocksdb::ColumnFamilyDescriptor> descriptors;
descriptors.reserve(column_families.size());
Expand Down Expand Up @@ -262,6 +263,12 @@ DbStorage::~DbStorage() {
checkStatus(db_->Close());
}

uint32_t DbStorage::getMajorVersion() const { return kMajorVersion_; }

std::unique_ptr<rocksdb::Iterator> DbStorage::getColumnIterator(const Column& c) {
return std::unique_ptr<rocksdb::Iterator>(db_->NewIterator(read_options_, handle(c)));
}

void DbStorage::checkStatus(rocksdb::Status const& status) {
if (status.ok()) return;
throw DbException(string("Db error. Status code: ") + std::to_string(status.code()) +
Expand Down

0 comments on commit 645d7e4

Please sign in to comment.