Skip to content

Commit

Permalink
feat(dup_enhancement#9): master replica support `on_query_last_checkp…
Browse files Browse the repository at this point in the history
…oint_info` rpc for follower replica (apache#1056)
  • Loading branch information
foreverneverer committed Mar 29, 2022
1 parent 5083015 commit 2dd71c6
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 4 deletions.
2 changes: 2 additions & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ MAKE_EVENT_CODE_AIO(LPC_LERARN_REMOTE_DISK_STATE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_CONFIG_PROPOSAL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_QUERY_PN_DECREE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_QUERY_REPLICA_INFO, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_QUERY_LAST_CHECKPOINT_INFO, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_PREPARE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_DELAY_PREPARE, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_GROUP_CHECK, TASK_PRIORITY_HIGH)
Expand All @@ -161,6 +162,7 @@ MAKE_EVENT_CODE_RPC(RPC_LEARN, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_LEARN_COMPLETION_NOTIFY, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_LEARN_ADD_LEARNER, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE_RPC(RPC_REMOVE_REPLICA, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_AIO(LPC_REPLICA_COPY_LAST_CHECKPOINT_DONE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CLEAR_COLD_BACKUP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_COLD_BACKUP, TASK_PRIORITY_COMMON)
Expand Down
1 change: 1 addition & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// Duplication
//
error_code trigger_manual_emergency_checkpoint(decree old_decree);
void on_query_last_checkpoint(learn_response &response);
replica_duplicator_manager *get_duplication_manager() const { return _duplication_mgr.get(); }
bool is_duplicating() const { return _duplicating; }

Expand Down
36 changes: 36 additions & 0 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@
namespace dsn {
namespace replication {

const std::string kCheckpointFolderPrefix /*NOLINT*/ = "checkpoint";

static std::string checkpoint_folder(int64_t decree)
{
return fmt::format("{}.{}", kCheckpointFolderPrefix, decree);
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::on_checkpoint_timer()
{
Expand Down Expand Up @@ -191,6 +198,35 @@ void replica::init_checkpoint(bool is_emergency)
_stub->_counter_recent_trigger_emergency_checkpoint_count->increment();
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::on_query_last_checkpoint(/*out*/ learn_response &response)
{
_checker.only_one_thread_access();

if (_app->last_durable_decree() == 0) {
response.err = ERR_PATH_NOT_FOUND;
return;
}

blob placeholder;
int err = _app->get_checkpoint(0, placeholder, response.state);
if (err != 0) {
response.err = ERR_GET_LEARN_STATE_FAILED;
} else {
response.err = ERR_OK;
response.last_committed_decree = last_committed_decree();
// for example: base_local_dir = "./data" + "checkpoint.1024" = "./data/checkpoint.1024"
response.base_local_dir = utils::filesystem::path_combine(
_app->data_dir(), checkpoint_folder(response.state.to_decree_included));
response.address = _stub->_primary_address;
for (auto &file : response.state.files) {
// response.state.files contain file absolute path, for example:
// "./data/checkpoint.1024/1.sst" use `substr` to get the file name: 1.sst
file = file.substr(response.base_local_dir.length() + 1);
}
}
}

// run in background thread
error_code replica::background_async_checkpoint(bool is_emergency)
{
Expand Down
16 changes: 16 additions & 0 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,19 @@ void replica_stub::on_query_replica_info(query_replica_info_rpc rpc)
resp.err = ERR_OK;
}

void replica_stub::on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc)
{
const learn_request &request = rpc.request();
learn_response &response = rpc.response();

replica_ptr rep = get_replica(request.pid);
if (rep != nullptr) {
rep->on_query_last_checkpoint(response);
} else {
response.err = ERR_OBJECT_NOT_FOUND;
}
}

// ThreadPool: THREAD_POOL_DEFAULT
void replica_stub::on_query_disk_info(query_disk_info_rpc rpc)
{
Expand Down Expand Up @@ -2206,6 +2219,9 @@ void replica_stub::open_service()
RPC_QUERY_PN_DECREE, "query_decree", &replica_stub::on_query_decree);
register_rpc_handler_with_rpc_holder(
RPC_QUERY_REPLICA_INFO, "query_replica_info", &replica_stub::on_query_replica_info);
register_rpc_handler_with_rpc_holder(RPC_QUERY_LAST_CHECKPOINT_INFO,
"query_last_checkpoint_info",
&replica_stub::on_query_last_checkpoint);
register_rpc_handler_with_rpc_holder(
RPC_QUERY_DISK_INFO, "query_disk_info", &replica_stub::on_query_disk_info);
register_rpc_handler_with_rpc_holder(
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typedef rpc_holder<group_check_request, group_check_response> group_check_rpc;
typedef rpc_holder<query_replica_decree_request, query_replica_decree_response>
query_replica_decree_rpc;
typedef rpc_holder<query_replica_info_request, query_replica_info_response> query_replica_info_rpc;
typedef rpc_holder<replica_configuration, learn_response> copy_checkpoint_rpc;
typedef rpc_holder<learn_request, learn_response> query_last_checkpoint_info_rpc;
typedef rpc_holder<query_disk_info_request, query_disk_info_response> query_disk_info_rpc;
typedef rpc_holder<replica_disk_migrate_request, replica_disk_migrate_response>
replica_disk_migrate_rpc;
Expand Down Expand Up @@ -229,6 +229,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter

void on_add_new_disk(add_new_disk_rpc rpc);

// query last checkpoint info for follower in duplication process
void on_query_last_checkpoint(query_last_checkpoint_info_rpc rpc);

private:
enum replica_node_state
{
Expand Down
7 changes: 5 additions & 2 deletions src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@ class mock_replication_app_base : public replication_app_base
return ERR_OK;
}
error_code prepare_get_checkpoint(blob &) override { return ERR_NOT_IMPLEMENTED; }
error_code get_checkpoint(int64_t, const blob &, learn_state &) override
error_code get_checkpoint(int64_t learn_start,
const dsn::blob &learn_request,
dsn::replication::learn_state &state) override
{
return ERR_NOT_IMPLEMENTED;
state.to_decree_included = last_durable_decree();
return ERR_OK;
}
error_code storage_apply_checkpoint(chkpt_apply_mode, const learn_state &) override
{
Expand Down
25 changes: 24 additions & 1 deletion src/replica/test/replica_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ TEST_F(replica_test, test_replica_backup_and_restore_with_specific_path)
ASSERT_EQ(ERR_OK, err);
}

TEST_F(replica_test, trigger_manual_emergency_checkpoint)
TEST_F(replica_test, test_trigger_manual_emergency_checkpoint)
{
ASSERT_EQ(_mock_replica->trigger_manual_emergency_checkpoint(100), ERR_OK);
ASSERT_TRUE(is_checkpointing());
Expand All @@ -348,5 +348,28 @@ TEST_F(replica_test, trigger_manual_emergency_checkpoint)
_mock_replica->tracker()->wait_outstanding_tasks();
}

TEST_F(replica_test, test_query_last_checkpoint_info)
{
// test no exist gpid
auto req = std::make_unique<learn_request>();
req->pid = gpid(100, 100);
query_last_checkpoint_info_rpc rpc =
query_last_checkpoint_info_rpc(std::move(req), RPC_QUERY_LAST_CHECKPOINT_INFO);
stub->on_query_last_checkpoint(rpc);
ASSERT_EQ(rpc.response().err, ERR_OBJECT_NOT_FOUND);

learn_response resp;
// last_checkpoint hasn't exist
_mock_replica->on_query_last_checkpoint(resp);
ASSERT_EQ(resp.err, ERR_PATH_NOT_FOUND);

// query ok
_mock_replica->update_last_durable_decree(100);
_mock_replica->set_last_committed_decree(200);
_mock_replica->on_query_last_checkpoint(resp);
ASSERT_EQ(resp.last_committed_decree, 200);
ASSERT_EQ(resp.base_local_dir, "./data/checkpoint.100");
}

} // namespace replication
} // namespace dsn

0 comments on commit 2dd71c6

Please sign in to comment.