Skip to content

Commit

Permalink
[rocksdb] use original rocksdb
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Feb 4, 2020
1 parent c5dec62 commit 6fe5b0e
Show file tree
Hide file tree
Showing 9 changed files with 486 additions and 312 deletions.
2 changes: 1 addition & 1 deletion run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ LOCAL_IP=`scripts/get_local_ip`
export REPORT_DIR="$ROOT/test_report"
export DSN_ROOT=$ROOT/DSN_ROOT
export DSN_THIRDPARTY_ROOT=$ROOT/rdsn/thirdparty/output
export LD_LIBRARY_PATH=$DSN_ROOT/lib:$DSN_THIRDPARTY_ROOT/lib:$BOOST_DIR/lib:$TOOLCHAIN_DIR/lib64:$LD_LIBRARY_PATH
export LD_LIBRARY_PATH=$DSN_ROOT/lib:$DSN_THIRDPARTY_ROOT/lib:$BOOST_DIR/lib:$LD_LIBRARY_PATH

function usage()
{
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_manual_compact_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ void pegasus_manual_compact_service::extract_manual_compact_opts(
int32_t target_level;
if (dsn::buf2int32(find->second, target_level) &&
(target_level == -1 ||
(target_level >= 1 && target_level <= _app->_db_opts.num_levels))) {
(target_level >= 1 && target_level <= _app->_data_cf_opts.num_levels))) {
options.target_level = target_level;
} else {
dwarn_replica("{}={} is invalid, use default value {}",
Expand Down
733 changes: 436 additions & 297 deletions src/server/pegasus_server_impl.cpp

Large diffs are not rendered by default.

34 changes: 29 additions & 5 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

virtual int64_t last_durable_decree() const override { return _last_durable_decree.load(); }

virtual int64_t last_flushed_decree() const override { return _db->GetLastFlushedDecree(); }
virtual int64_t last_flushed_decree() const override { return get_last_flushed_decree(); }

virtual void update_app_envs(const std::map<std::string, std::string> &envs) override;

Expand Down Expand Up @@ -293,8 +293,30 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service
return false;
}

uint64_t get_last_flushed_decree(bool read_cf_only = true) const;
uint32_t get_data_version(bool read_cf_only = true) const;
uint64_t get_last_manual_compact_finish_time(bool read_cf_only = true) const;
::dsn::error_code get_value_from_meta_cf(rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *cfh,
const std::string &key,
uint64_t *value) const;
::dsn::error_code
set_value_to_meta_cf(rocksdb::DB *db, const std::string &key, uint64_t value) const;

void release_db();

::dsn::error_code create_checkpoint_and_get_decree(const char *checkpoint_dir,
int64_t *checkpoint_decree);

::dsn::error_code flush_all_family_columns(bool wait);

private:
static const std::string COMPRESSION_HEADER;
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
static const std::string LAST_MANUAL_COMPACT_FINISH_TIME;

dsn::gpid _gpid;
std::string _primary_address;
Expand All @@ -308,14 +330,16 @@ class pegasus_server_impl : public ::dsn::apps::rrdb_service

std::shared_ptr<KeyWithTTLCompactionFilterFactory> _key_ttl_compaction_filter_factory;
std::shared_ptr<rocksdb::Statistics> _statistics;
rocksdb::BlockBasedTableOptions _tbl_opts;
rocksdb::Options _db_opts;
rocksdb::WriteOptions _wt_opts;
rocksdb::DBOptions _db_opts;
rocksdb::ColumnFamilyOptions _data_cf_opts;
rocksdb::ColumnFamilyOptions _meta_cf_opts;
rocksdb::ReadOptions _rd_opts;
std::string _usage_scenario;

rocksdb::DB *_db;
static std::shared_ptr<rocksdb::Cache> _block_cache;
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
static std::shared_ptr<rocksdb::Cache> _s_block_cache;
volatile bool _is_open;
uint32_t _pegasus_data_version;
std::atomic<int64_t> _last_durable_decree;
Expand Down
19 changes: 15 additions & 4 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
_primary_address(server->_primary_address),
_pegasus_data_version(server->_pegasus_data_version),
_db(server->_db),
_wt_opts(server->_wt_opts),
_data_cf(server->_data_cf),
_meta_cf(server->_meta_cf),
_rd_opts(server->_rd_opts),
_default_ttl(0),
_pfc_recent_expire_count(server->_pfc_recent_expire_count)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
}

int empty_put(int64_t decree)
Expand Down Expand Up @@ -561,8 +564,14 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

FAIL_POINT_INJECT_F("db_write", [](dsn::string_view) -> int { return FAIL_DB_WRITE; });

_wt_opts.given_decree = static_cast<uint64_t>(decree);
auto status = _db->Write(_wt_opts, &_batch);
rocksdb::Status status =
_batch.Put(_meta_cf, pegasus_server_impl::LAST_FLUSHED_DECREE, std::to_string(decree));
if (!status.ok()) {
derror_rocksdb("Write", status.ToString(), "decree: {}", decree);
return status.code();
}

status = _db->Write(_wt_opts, &_batch);
if (!status.ok()) {
derror_rocksdb("Write", status.ToString(), "decree: {}", decree);
}
Expand Down Expand Up @@ -720,7 +729,9 @@ class pegasus_write_service::impl : public dsn::replication::replica_base

rocksdb::WriteBatch _batch;
rocksdb::DB *_db;
rocksdb::WriteOptions &_wt_opts;
rocksdb::ColumnFamilyHandle *_data_cf;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;
rocksdb::ReadOptions &_rd_opts;
volatile uint32_t _default_ttl;
::dsn::perf_counter_wrapper &_pfc_recent_expire_count;
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/manual_compact_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class manual_compact_service_test : public pegasus_server_test_base
manual_compact_svc->extract_manual_compact_opts(envs, key_prefix, options);
}

void set_num_level(int level) { _server->_db_opts.num_levels = level; }
void set_num_level(int level) { _server->_data_cf_opts.num_levels = level; }

void check_manual_compact_state(bool ok, const std::string &msg = "")
{
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/pegasus_compression_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class pegasus_compression_options_test : public pegasus_server_test_base

pegasus_compression_options_test()
{
_server->_db_opts.num_levels = 7;
_server->_data_cf_opts.num_levels = 7;
compression_header = _server->COMPRESSION_HEADER;
}

Expand Down

0 comments on commit 6fe5b0e

Please sign in to comment.