Skip to content

Commit

Permalink
feat(local_partition_split): init
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Mar 1, 2024
1 parent 4a05fae commit 9d0f586
Show file tree
Hide file tree
Showing 13 changed files with 553 additions and 25 deletions.
14 changes: 11 additions & 3 deletions src/server/meta_store.cpp → src/base/meta_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,22 @@
#include <rocksdb/db.h>
#include <rocksdb/status.h>

#include "base/pegasus_const.h"
#include "common/replica_envs.h"
#include "server/pegasus_server_impl.h"
#include "utils/fmt_logging.h"
#include "utils/string_conv.h"

DSN_DEFINE_string("pegasus.server",
get_meta_store_type,
"manifest",
"Where to get meta data, now support 'manifest' and 'metacf'");
DSN_DEFINE_validator(get_meta_store_type, [](const char *type) {
return strcmp(type, "manifest") == 0 || strcmp(type, "metacf") == 0;
});

namespace pegasus {
namespace server {

const std::string meta_store::DATA_VERSION = "pegasus_data_version";
const std::string meta_store::LAST_FLUSHED_DECREE = "pegasus_last_flushed_decree";
const std::string meta_store::LAST_MANUAL_COMPACT_FINISH_TIME =
Expand All @@ -38,10 +46,10 @@ const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_NORMAL = "normal";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE = "prefer_write";
const std::string meta_store::ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD = "bulk_load";

meta_store::meta_store(pegasus_server_impl *server,
meta_store::meta_store(const char *log_prefix,
rocksdb::DB *db,
rocksdb::ColumnFamilyHandle *meta_cf)
: replica_base(server), _db(db), _meta_cf(meta_cf)
: _name(log_prefix), _db(db), _meta_cf(meta_cf)
{
// disable write ahead logging as replication handles logging instead now
_wt_opts.disableWAL = true;
Expand Down
15 changes: 13 additions & 2 deletions src/server/meta_store.h → src/base/meta_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,17 @@ class pegasus_server_impl;
// - pegasus_data_version
// - pegasus_last_flushed_decree
// - pegasus_last_manual_compact_finish_time
class meta_store : public dsn::replication::replica_base
class meta_store
{
public:
meta_store(pegasus_server_impl *server, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);
enum class meta_store_type
{
kManifestOnly = 0,
kMetaCFOnly,
kBothManifestAndMetaCF,
};

meta_store(const char *log_prefix, rocksdb::DB *db, rocksdb::ColumnFamilyHandle *meta_cf);

dsn::error_code get_last_flushed_decree(uint64_t *decree) const;
uint64_t get_decree_from_readonly_db(rocksdb::DB *db,
Expand Down Expand Up @@ -85,6 +92,9 @@ class meta_store : public dsn::replication::replica_base
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_latest_options);
FRIEND_TEST(pegasus_server_impl_test, test_open_db_with_app_envs);

// tricky: the 'replica_name()' is used for logging.
const char *replica_name() const { return _name.c_str(); }

// Keys of meta data wrote into meta column family.
static const std::string DATA_VERSION;
static const std::string LAST_FLUSHED_DECREE;
Expand All @@ -93,6 +103,7 @@ class meta_store : public dsn::replication::replica_base
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_PREFER_WRITE;
static const std::string ROCKSDB_ENV_USAGE_SCENARIO_BULK_LOAD;

const std::string _name;
rocksdb::DB *_db;
rocksdb::ColumnFamilyHandle *_meta_cf;
rocksdb::WriteOptions _wt_opts;
Expand Down
5 changes: 5 additions & 0 deletions src/client/partition_resolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ partition_resolver_ptr partition_resolver::get_resolver(const char *cluster_name
return partition_resolver_manager::instance().find_or_create(cluster_name, meta_list, app_name);
}

int partition_resolver::get_partition_index(int partition_count, uint64_t partition_hash)
{
return partition_hash % static_cast<uint64_t>(partition_count);
}

DEFINE_TASK_CODE(LPC_RPC_DELAY_CALL, TASK_PRIORITY_COMMON, THREAD_POOL_DEFAULT)

static inline bool error_retry(error_code err)
Expand Down
20 changes: 9 additions & 11 deletions src/client/partition_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ class partition_resolver : public ref_counter
get_resolver(const char *cluster_name,
const std::vector<dsn::rpc_address> &meta_list,
const char *app_name);
/**
* get zero-based partition index
*
* \param partition_count number of partitions.
* \param partition_hash the partition hash.
*
* \return zero-based partition index.
*/
static int get_partition_index(int partition_count, uint64_t partition_hash);

template <typename TReq, typename TCallback>
dsn::rpc_response_task_ptr call_op(dsn::task_code code,
Expand Down Expand Up @@ -131,17 +140,6 @@ class partition_resolver : public ref_counter
*/
virtual void on_access_failure(int partition_index, error_code err) = 0;

/**
* get zero-based partition index
*
* \param partition_count number of partitions.
* \param partition_hash the partition hash.
*
* \return zero-based partition index.
*/

virtual int get_partition_index(int partition_count, uint64_t partition_hash) = 0;

std::string _cluster_name;
std::string _app_name;
rpc_address _meta_server;
Expand Down
4 changes: 0 additions & 4 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -448,9 +448,5 @@ error_code partition_resolver_simple::get_address(int partition_index, /*out*/ r
}
}

int partition_resolver_simple::get_partition_index(int partition_count, uint64_t partition_hash)
{
return partition_hash % static_cast<uint64_t>(partition_count);
}
} // namespace replication
} // namespace dsn
2 changes: 0 additions & 2 deletions src/client/partition_resolver_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ class partition_resolver_simple : public partition_resolver

virtual void on_access_failure(int partition_index, error_code err) override;

virtual int get_partition_index(int partition_count, uint64_t partition_hash) override;

int get_partition_count() const { return _app_partition_count; }

private:
Expand Down
4 changes: 2 additions & 2 deletions src/server/pegasus_server_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "absl/strings/string_view.h"
#include "base/idl_utils.h" // IWYU pragma: keep
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "base/pegasus_utils.h"
#include "base/pegasus_value_schema.h"
Expand All @@ -56,7 +57,6 @@
#include "consensus_types.h"
#include "dsn.layer2_types.h"
#include "hotkey_collector.h"
#include "meta_store.h"
#include "pegasus_rpc_types.h"
#include "pegasus_server_write.h"
#include "replica_admin_types.h"
Expand Down Expand Up @@ -1736,7 +1736,7 @@ dsn::error_code pegasus_server_impl::start(int argc, char **argv)
_meta_cf = handles_opened[1];

// Create _meta_store which provide Pegasus meta data read and write.
_meta_store = std::make_unique<meta_store>(this, _db, _meta_cf);
_meta_store = dsn::make_unique<meta_store>(replica_name(), _db, _meta_cf);

if (db_exist) {
auto cleanup = dsn::defer([this]() { release_db(); });
Expand Down
3 changes: 3 additions & 0 deletions src/server/pegasus_server_impl_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
#include <utility>
#include <vector>

#include "base/meta_store.h"
#include "common/gpid.h"
#include "hashkey_transform.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_write.h"
#include "hotkey_collector.h"
#include "pegasus_event_listener.h"
#include "pegasus_server_impl.h"
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <gtest/gtest_prod.h>

#include "base/idl_utils.h"
#include "base/meta_store.h"
#include "base/pegasus_key_schema.h"
#include "logging_utils.h"
#include "meta_store.h"
Expand Down
2 changes: 1 addition & 1 deletion src/server/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ set(MY_PROJ_SRC
"../capacity_unit_calculator.cpp"
"../pegasus_mutation_duplicator.cpp"
"../hotspot_partition_calculator.cpp"
"../meta_store.cpp"
"../hotkey_collector.cpp"
"../rocksdb_wrapper.cpp"
"../compaction_filter_rule.cpp"
"../compaction_operation.cpp")

set(MY_SRC_SEARCH_MODE "GLOB")
set(MY_PROJ_LIBS
dsn_replica_server
Expand Down
5 changes: 5 additions & 0 deletions src/shell/commands.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,3 +284,8 @@ bool clear_bulk_load(command_executor *e, shell_context *sc, arguments args);
// == detect hotkey (see 'commands/detect_hotkey.cpp') == //

bool detect_hotkey(command_executor *e, shell_context *sc, arguments args);

// == local partition split (see 'commands/local_partition_split.cpp') == //
extern const std::string local_partition_split_help;
bool local_partition_split(command_executor *e, shell_context *sc, arguments args);

Loading

0 comments on commit 9d0f586

Please sign in to comment.