Skip to content

Commit

Permalink
refactor(meta_store): Reduce the dependency of meta_store (apache#1932)
Browse files Browse the repository at this point in the history
Make class meta_store not derive from class replica_base and move it
to pegasus_base module, then we can reuse it freely.
  • Loading branch information
acelyc111 authored Mar 7, 2024
1 parent 2c9b5ed commit 1c4fcbb
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 30 deletions.
8 changes: 4 additions & 4 deletions src/server/meta_store.cpp → src/base/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
#include <rocksdb/status.h>

#include "common/replica_envs.h"
#include "server/pegasus_server_impl.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"

namespace pegasus {
namespace server {

const std::string meta_store::DATA_COLUMN_FAMILY_NAME = "default";
const std::string meta_store::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::string meta_store::DATA_VERSION = "pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
Expand All @@ -38,10 +38,10 @@ const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load";

meta_store::meta_store(pegasus_server_impl *server,
meta_store::meta_store(const char *log_prefix,
rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *meta_cf)
: replica_base(server), _db(db), _meta_cf(meta_cf)
: _log_prefix(log_prefix), _db(db), _meta_cf(meta_cf)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
Expand Down
14 changes: 9 additions & 5 deletions src/server/meta_store.h → src/base/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <stdint.h>
#include <string>

#include "replica/replica_base.h"
#include "utils/error_code.h"

namespace rocksdb {
Expand All @@ -35,16 +34,18 @@ class DB;
namespace pegasus {
namespace server {

class pegasus_server_impl;

// Manage meta data of Pegasus, now support
// - pegasus_data_version
// - pegasus_last_flushed_decree
// - pegasus_last_manual_compact_finish_time
class meta_store : public dsn::replication::replica_base
class meta_store
{
public:
meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;

meta_store(const char *log_prefix, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);

dsn::error_code get_last_flushed_decree(uint64_t *decree) const;
uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
Expand Down Expand Up @@ -85,6 +86,8 @@ class meta_store : public dsn::replication::replica_base
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);

const char *log_prefix() const { return _log_prefix.c_str(); }

// Keys of meta data wrote into meta column family.
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
Expand All @@ -93,6 +96,7 @@ class meta_store : public dsn::replication::replica_base
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;

const std::string _log_prefix;
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;
Expand Down
25 changes: 12 additions & 13 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "absl/strings/string_view.h"
#include "base/idl_utils.h" // IWYU pragma: keep
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"
#include "base/pegasus_value_schema.h"
Expand All @@ -56,7 +57,6 @@
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "hotkey_collector.h"
#include "meta_store.h"
#include "pegasus_rpc_types.h"
#include "pegasus_server_write.h"
#include "replica_admin_types.h"
Expand Down Expand Up @@ -142,8 +142,6 @@ ::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
METRIC_VAR_DEFINE_gauge_int64(rdb_block_cache_mem_usage_bytes, pegasus_server_impl);
METRIC_VAR_DEFINE_gauge_int64(rdb_write_rate_limiter_through_bytes_per_sec, pegasus_server_impl);
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);

// should be same with items in dsn::backup_restore_constant
Expand Down Expand Up @@ -1691,7 +1689,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)

if (!has_incompatible_db_options) {
for (int i = 0; i < loaded_cf_descs.size(); ++i) {
if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
if (loaded_cf_descs[i].name == meta_store::DATA_COLUMN_FAMILY_NAME) {
loaded_data_cf_opts = loaded_cf_descs[i].options;
}
}
Expand All @@ -1710,7 +1708,8 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{meta_store::DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts},
{meta_store::META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
rocksdb::ConfigOptions config_options;
config_options.ignore_unknown_options = true;
config_options.ignore_unsupported_options = true;
Expand All @@ -1730,13 +1729,13 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
return dsn::ERR_LOCAL_APP_FAILURE;
}
CHECK_EQ_PREFIX(2, handles_opened.size());
CHECK_EQ_PREFIX(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[0]->GetName(), meta_store::DATA_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
_meta_cf = handles_opened[1];

// Create _meta_store which provide Pegasus meta data read and write.
_meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);
_meta_store = std::make_unique<meta_store>(replica_name(), _db, _meta_cf);

if (db_exist) {
auto cleanup = dsn::defer([this]() { release_db(); });
Expand Down Expand Up @@ -2192,8 +2191,8 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
// Because of RocksDB's restriction, we have to to open default column family even though
// not use it
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
{{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
status = rocksdb::DB::OpenForReadOnly(
_db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db);
if (!status.ok()) {
Expand All @@ -2204,7 +2203,7 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
CHECK_EQ_PREFIX(handles_opened.size(), 2);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME);
uint64_t last_flushed_decree =
_meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]);
*checkpoint_decree = last_flushed_decree;
Expand Down Expand Up @@ -3322,9 +3321,9 @@ ::dsn::error_code pegasus_server_impl::check_column_families(const std::string &
}

for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
if (column_family == meta_store::META_COLUMN_FAMILY_NAME) {
*missing_meta_cf = false;
} else if (column_family == DATA_COLUMN_FAMILY_NAME) {
} else if (column_family == meta_store::DATA_COLUMN_FAMILY_NAME) {
*missing_data_cf = false;
} else {
LOG_ERROR_PREFIX("unknown column family name: {}", column_family);
Expand Down
3 changes: 0 additions & 3 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,6 @@ class pegasus_server_impl : public pegasus_read_service
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;

dsn::gpid _gpid;
std::string _primary_address;
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <utility>
#include <vector>

#include "base/meta_store.h" // IWYU pragma: keep
#include "common/gpid.h"
#include "hashkey_transform.h"
#include "hotkey_collector.h"
Expand All @@ -48,7 +49,6 @@
#include "runtime/rpc/rpc_address.h"
#include "server/capacity_unit_calculator.h" // IWYU pragma: keep
#include "server/key_ttl_compaction_filter.h"
#include "server/meta_store.h" // IWYU pragma: keep
#include "server/pegasus_read_service.h"
#include "server/pegasus_server_write.h" // IWYU pragma: keep
#include "server/range_read_limiter.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include <gtest/gtest_prod.h>

#include "base/idl_utils.h"
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "logging_utils.h"
#include "meta_store.h"
#include "pegasus_server_impl.h"
#include "pegasus_write_service.h"
#include "rocksdb_wrapper.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
#include <rocksdb/slice.h>
#include <rocksdb/status.h>

#include "base/meta_store.h"
#include "base/pegasus_value_schema.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
#include "pegasus_write_service_impl.h"
#include "server/logging_utils.h"
#include "server/meta_store.h"
#include "server/pegasus_server_impl.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ set(MY_PROJ_SRC
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
"../rocksdb_wrapper.cpp"
"../compaction_filter_rule.cpp"
"../compaction_operation.cpp")

set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_LIBS
dsn_replica_server
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/pegasus_server_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
#include <string>
#include <utility>

#include "base/meta_store.h"
#include "common/replica_envs.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "pegasus_server_test_base.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
#include "runtime/serverlet.h"
#include "server/meta_store.h"
#include "server/pegasus_read_service.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
Expand Down

0 comments on commit 1c4fcbb

Please sign in to comment.