Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta_store): Reduce the dependency of meta_store #1932

Merged
merged 5 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/server/meta_store.cpp → src/base/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
#include <rocksdb/status.h>

#include "common/replica_envs.h"
#include "server/pegasus_server_impl.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"

namespace pegasus {
namespace server {

const std::string meta_store::DATA_COLUMN_FAMILY_NAME = "default";
const std::string meta_store::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::string meta_store::DATA_VERSION = "pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
Expand All @@ -38,10 +38,10 @@ const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load";

meta_store::meta_store(pegasus_server_impl *server,
meta_store::meta_store(const char *log_prefix,
rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *meta_cf)
: replica_base(server), _db(db), _meta_cf(meta_cf)
: _log_prefix(log_prefix), _db(db), _meta_cf(meta_cf)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
Expand Down
14 changes: 9 additions & 5 deletions src/server/meta_store.h → src/base/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include <stdint.h>
#include <string>

#include "replica/replica_base.h"
#include "utils/error_code.h"

namespace rocksdb {
Expand All @@ -35,16 +34,18 @@ class DB;
namespace pegasus {
namespace server {

class pegasus_server_impl;

// Manage meta data of Pegasus, now support
// - pegasus_data_version
// - pegasus_last_flushed_decree
// - pegasus_last_manual_compact_finish_time
class meta_store : public dsn::replication::replica_base
class meta_store
{
public:
meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;

meta_store(const char *log_prefix, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);

dsn::error_code get_last_flushed_decree(uint64_t *decree) const;
uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
Expand Down Expand Up @@ -85,6 +86,8 @@ class meta_store : public dsn::replication::replica_base
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);

const char *log_prefix() const { return _log_prefix.c_str(); }

// Keys of meta data wrote into meta column family.
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
Expand All @@ -93,6 +96,7 @@ class meta_store : public dsn::replication::replica_base
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;

const std::string _log_prefix;
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;
Expand Down
25 changes: 12 additions & 13 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "absl/strings/string_view.h"
#include "base/idl_utils.h" // IWYU pragma: keep
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"
#include "base/pegasus_value_schema.h"
Expand All @@ -56,7 +57,6 @@
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "hotkey_collector.h"
#include "meta_store.h"
#include "pegasus_rpc_types.h"
#include "pegasus_server_write.h"
#include "replica_admin_types.h"
Expand Down Expand Up @@ -142,8 +142,6 @@ ::dsn::task_ptr pegasus_server_impl::_update_server_rdb_stat;
METRIC_VAR_DEFINE_gauge_int64(rdb_block_cache_mem_usage_bytes, pegasus_server_impl);
METRIC_VAR_DEFINE_gauge_int64(rdb_write_rate_limiter_through_bytes_per_sec, pegasus_server_impl);
const std::string pegasus_server_impl::COMPRESSION_HEADER = "per_level:";
const std::string pegasus_server_impl::DATA_COLUMN_FAMILY_NAME = "default";
const std::string pegasus_server_impl::META_COLUMN_FAMILY_NAME = "pegasus_meta_cf";
const std::chrono::seconds pegasus_server_impl::kServerStatUpdateTimeSec = std::chrono::seconds(10);

// should be same with items in dsn::backup_restore_constant
Expand Down Expand Up @@ -1691,7 +1689,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)

if (!has_incompatible_db_options) {
for (int i = 0; i < loaded_cf_descs.size(); ++i) {
if (loaded_cf_descs[i].name == DATA_COLUMN_FAMILY_NAME) {
if (loaded_cf_descs[i].name == meta_store::DATA_COLUMN_FAMILY_NAME) {
loaded_data_cf_opts = loaded_cf_descs[i].options;
}
}
Expand All @@ -1710,7 +1708,8 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
}

std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts}, {META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
{{meta_store::DATA_COLUMN_FAMILY_NAME, _table_data_cf_opts},
{meta_store::META_COLUMN_FAMILY_NAME, _meta_cf_opts}});
rocksdb::ConfigOptions config_options;
config_options.ignore_unknown_options = true;
config_options.ignore_unsupported_options = true;
Expand All @@ -1730,13 +1729,13 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
return dsn::ERR_LOCAL_APP_FAILURE;
}
CHECK_EQ_PREFIX(2, handles_opened.size());
CHECK_EQ_PREFIX(handles_opened[0]->GetName(), DATA_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[0]->GetName(), meta_store::DATA_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME);
_data_cf = handles_opened[0];
_meta_cf = handles_opened[1];

// Create _meta_store which provide Pegasus meta data read and write.
_meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);
_meta_store = std::make_unique<meta_store>(replica_name(), _db, _meta_cf);

if (db_exist) {
auto cleanup = dsn::defer([this]() { release_db(); });
Expand Down Expand Up @@ -2192,8 +2191,8 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
// Because of RocksDB's restriction, we have to to open default column family even though
// not use it
std::vector<rocksdb::ColumnFamilyDescriptor> column_families(
{{DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
{{meta_store::DATA_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()},
{meta_store::META_COLUMN_FAMILY_NAME, rocksdb::ColumnFamilyOptions()}});
status = rocksdb::DB::OpenForReadOnly(
_db_opts, checkpoint_dir, column_families, &handles_opened, &snapshot_db);
if (!status.ok()) {
Expand All @@ -2204,7 +2203,7 @@ ::dsn::error_code pegasus_server_impl::copy_checkpoint_to_dir_unsafe(const char
return ::dsn::ERR_LOCAL_APP_FAILURE;
}
CHECK_EQ_PREFIX(handles_opened.size(), 2);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), META_COLUMN_FAMILY_NAME);
CHECK_EQ_PREFIX(handles_opened[1]->GetName(), meta_store::META_COLUMN_FAMILY_NAME);
uint64_t last_flushed_decree =
_meta_store->get_decree_from_readonly_db(snapshot_db, handles_opened[1]);
*checkpoint_decree = last_flushed_decree;
Expand Down Expand Up @@ -3322,9 +3321,9 @@ ::dsn::error_code pegasus_server_impl::check_column_families(const std::string &
}

for (const auto &column_family : column_families) {
if (column_family == META_COLUMN_FAMILY_NAME) {
if (column_family == meta_store::META_COLUMN_FAMILY_NAME) {
*missing_meta_cf = false;
} else if (column_family == DATA_COLUMN_FAMILY_NAME) {
} else if (column_family == meta_store::DATA_COLUMN_FAMILY_NAME) {
*missing_data_cf = false;
} else {
LOG_ERROR_PREFIX("unknown column family name: {}", column_family);
Expand Down
3 changes: 0 additions & 3 deletions src/server/pegasus_server_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -476,9 +476,6 @@ class pegasus_server_impl : public pegasus_read_service
private:
static const std::chrono::seconds kServerStatUpdateTimeSec;
static const std::string COMPRESSION_HEADER;
// Column family names.
static const std::string DATA_COLUMN_FAMILY_NAME;
static const std::string META_COLUMN_FAMILY_NAME;

dsn::gpid _gpid;
std::string _primary_address;
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <utility>
#include <vector>

#include "base/meta_store.h" // IWYU pragma: keep
#include "common/gpid.h"
#include "hashkey_transform.h"
#include "hotkey_collector.h"
Expand All @@ -48,7 +49,6 @@
#include "runtime/rpc/rpc_address.h"
#include "server/capacity_unit_calculator.h" // IWYU pragma: keep
#include "server/key_ttl_compaction_filter.h"
#include "server/meta_store.h" // IWYU pragma: keep
#include "server/pegasus_read_service.h"
#include "server/pegasus_server_write.h" // IWYU pragma: keep
#include "server/range_read_limiter.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include <gtest/gtest_prod.h>

#include "base/idl_utils.h"
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "logging_utils.h"
#include "meta_store.h"
#include "pegasus_server_impl.h"
#include "pegasus_write_service.h"
#include "rocksdb_wrapper.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/rocksdb_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
#include <rocksdb/slice.h>
#include <rocksdb/status.h>

#include "base/meta_store.h"
#include "base/pegasus_value_schema.h"
#include "pegasus_key_schema.h"
#include "pegasus_utils.h"
#include "pegasus_write_service_impl.h"
#include "server/logging_utils.h"
#include "server/meta_store.h"
#include "server/pegasus_server_impl.h"
#include "server/pegasus_write_service.h"
#include "utils/autoref_ptr.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ set(MY_PROJ_SRC
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
"../rocksdb_wrapper.cpp"
"../compaction_filter_rule.cpp"
"../compaction_operation.cpp")

set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_LIBS
dsn_replica_server
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/pegasus_server_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
#include <string>
#include <utility>

#include "base/meta_store.h"
#include "common/replica_envs.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "pegasus_server_test_base.h"
#include "rrdb/rrdb.code.definition.h"
#include "rrdb/rrdb_types.h"
#include "runtime/serverlet.h"
#include "server/meta_store.h"
#include "server/pegasus_read_service.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
Expand Down
Loading