From b0f0237aaf4d119248e1ccbe0edc8e7bddffd5fc Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 11 Sep 2020 11:31:06 +0800 Subject: [PATCH 1/3] refactor: refactor replica split --- scripts/linux/build.sh | 2 +- src/replica/CMakeLists.txt | 4 + .../replica_split_manager.cpp} | 371 ++++++++++-------- .../partition_split/replica_split_manager.h | 134 +++++++ .../partition_split/test/CMakeLists.txt | 21 + .../partition_split/test/config-test.ini | 71 ++++ src/replica/partition_split/test/main.cpp | 52 +++ .../test/replica_split_test.cpp | 174 ++++---- src/replica/partition_split/test/run.sh | 11 + src/replica/replica.cpp | 5 +- src/replica/replica.h | 92 +---- src/replica/replica_chkpt.cpp | 11 +- src/replica/replica_config.cpp | 5 + src/replica/replica_stub.cpp | 14 +- src/replica/replica_stub.h | 5 +- src/replica/test/mock_utils.h | 2 - 16 files changed, 633 insertions(+), 341 deletions(-) rename src/replica/{replica_split.cpp => partition_split/replica_split_manager.cpp} (65%) create mode 100644 src/replica/partition_split/replica_split_manager.h create mode 100644 src/replica/partition_split/test/CMakeLists.txt create mode 100644 src/replica/partition_split/test/config-test.ini create mode 100644 src/replica/partition_split/test/main.cpp rename src/replica/{ => partition_split}/test/replica_split_test.cpp (70%) create mode 100755 src/replica/partition_split/test/run.sh diff --git a/scripts/linux/build.sh b/scripts/linux/build.sh index 44e183f999..8908978a63 100755 --- a/scripts/linux/build.sh +++ b/scripts/linux/build.sh @@ -153,7 +153,7 @@ echo "################################# start testing ########################## if [ -z "$TEST_MODULE" ] then # supported test module - TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test" + TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test,dsn_replica_split_test" fi echo "TEST_MODULE=$TEST_MODULE" diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 2827ea9a85..802d54cd11 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -14,12 +14,15 @@ set(BACKUP_SRC backup/replica_backup_manager.cpp set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp) +set(SPLIT_SRC partition_split/replica_split_manager.cpp) + # Source files under CURRENT project directory will be automatically included. # You can manually set MY_PROJ_SRC to include source files under other directories. set(MY_PROJ_SRC ${DUPLICATION_SRC} ${BACKUP_SRC} ${BULK_LOAD_SRC} + ${SPLIT_SRC} ) # Search mode for source files under CURRENT project directory? @@ -48,5 +51,6 @@ dsn_add_shared_library() add_subdirectory(duplication/test) add_subdirectory(backup/test) add_subdirectory(bulk_load/test) +add_subdirectory(partition_split/test) add_subdirectory(storage) add_subdirectory(test) diff --git a/src/replica/replica_split.cpp b/src/replica/partition_split/replica_split_manager.cpp similarity index 65% rename from src/replica/replica_split.cpp rename to src/replica/partition_split/replica_split_manager.cpp index 05fd21167f..529cc08922 100644 --- a/src/replica/replica_split.cpp +++ b/src/replica/partition_split/replica_split_manager.cpp @@ -1,6 +1,20 @@ -// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "replica_split_manager.h" #include #include @@ -8,17 +22,22 @@ #include #include -#include "replica.h" -#include "replica_stub.h" - namespace dsn { namespace replication { +replica_split_manager::replica_split_manager(replica *r) + : replica_base(r), _replica(r), _stub(r->get_replica_stub()) +{ + _partition_version.store(_replica->_app_info.partition_count - 1); +} + +replica_split_manager::~replica_split_manager() {} + // ThreadPool: THREAD_POOL_REPLICATION -void replica::on_add_child(const group_check_request &request) // on parent partition +void replica_split_manager::on_add_child(const group_check_request &request) // on parent partition { if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY && - (status() != partition_status::PS_INACTIVE || !_inactive_is_transient)) { + (status() != partition_status::PS_INACTIVE || !_replica->_inactive_is_transient)) { dwarn_replica("receive add child request with wrong status {}, ignore this request", enum_to_string(status())); return; @@ -41,11 +60,11 @@ void replica::on_add_child(const group_check_request &request) // on parent part return; } - if (child_gpid.get_partition_index() < _app_info.partition_count) { + if (child_gpid.get_partition_index() < _replica->_app_info.partition_count) { dwarn_replica("receive old add child request, child gpid is ({}), " "local partition count is {}, ignore this request", child_gpid, - _app_info.partition_count); + _replica->_app_info.partition_count); return; } @@ -64,58 +83,61 @@ void replica::on_add_child(const group_check_request &request) // on parent part tracker(), std::bind(&replica_stub::create_child_replica, _stub, - _config.primary, - _app_info, + _replica->_config.primary, + _replica->_app_info, _child_init_ballot, _child_gpid, get_gpid(), - _dir), + _replica->_dir), get_gpid().thread_hash()); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_init_replica(gpid parent_gpid, - rpc_address primary_address, - ballot init_ballot) // on child partition +void replica_split_manager::child_init_replica(gpid parent_gpid, + rpc_address primary_address, + ballot init_ballot) // on child partition { FAIL_POINT_INJECT_F("replica_child_init_replica", [](dsn::string_view) {}); if (status() != partition_status::PS_INACTIVE) { dwarn_replica("wrong status {}", enum_to_string(status())); _stub->split_replica_error_handler( - parent_gpid, std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1)); + parent_gpid, + std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1)); return; } // update replica config - _config.ballot = init_ballot; - _config.primary = primary_address; - _config.status = partition_status::PS_PARTITION_SPLIT; + _replica->_config.ballot = init_ballot; + _replica->_config.primary = primary_address; + _replica->_config.status = partition_status::PS_PARTITION_SPLIT; // init split states - _split_states.parent_gpid = parent_gpid; - _split_states.is_prepare_list_copied = false; - _split_states.is_caught_up = false; + _replica->_split_states.parent_gpid = parent_gpid; + _replica->_split_states.is_prepare_list_copied = false; + _replica->_split_states.is_caught_up = false; ddebug_replica("init ballot is {}, parent gpid is ({})", init_ballot, parent_gpid); - dsn::error_code ec = _stub->split_replica_exec( - LPC_PARTITION_SPLIT, - _split_states.parent_gpid, - std::bind(&replica::parent_prepare_states, std::placeholders::_1, _app->learn_dir())); + dsn::error_code ec = + _stub->split_replica_exec(LPC_PARTITION_SPLIT, + _replica->_split_states.parent_gpid, + std::bind(&replica_split_manager::parent_prepare_states, + std::placeholders::_1, + _replica->_app->learn_dir())); if (ec != ERR_OK) { child_handle_split_error("parent not exist when execute parent_prepare_states"); } } // ThreadPool: THREAD_POOL_REPLICATION -bool replica::parent_check_states() // on parent partition +bool replica_split_manager::parent_check_states() // on parent partition { FAIL_POINT_INJECT_F("replica_parent_check_states", [](dsn::string_view) { return true; }); if (_child_init_ballot != get_ballot() || _child_gpid.get_app_id() == 0 || (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY && - (status() != partition_status::PS_INACTIVE || !_inactive_is_transient))) { + (status() != partition_status::PS_INACTIVE || !_replica->_inactive_is_transient))) { dwarn_replica("parent wrong states: status({}), init_ballot({}) VS current_ballot({}), " "child_gpid({})", enum_to_string(status()), @@ -124,7 +146,7 @@ bool replica::parent_check_states() // on parent partition _child_gpid); _stub->split_replica_error_handler( _child_gpid, - std::bind(&replica::child_handle_split_error, + std::bind(&replica_split_manager::child_handle_split_error, std::placeholders::_1, "wrong parent states when execute parent_check_states")); parent_cleanup_split_context(); @@ -134,7 +156,7 @@ bool replica::parent_check_states() // on parent partition } // ThreadPool: THREAD_POOL_REPLICATION -void replica::parent_prepare_states(const std::string &dir) // on parent partition +void replica_split_manager::parent_prepare_states(const std::string &dir) // on parent partition { FAIL_POINT_INJECT_F("replica_parent_prepare_states", [](dsn::string_view) {}); @@ -145,7 +167,7 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti learn_state parent_states; int64_t checkpoint_decree; // generate checkpoint - dsn::error_code ec = _app->copy_checkpoint_to_dir(dir.c_str(), &checkpoint_decree); + dsn::error_code ec = _replica->_app->copy_checkpoint_to_dir(dir.c_str(), &checkpoint_decree); if (ec == ERR_OK) { ddebug_replica("prepare checkpoint succeed: checkpoint dir = {}, checkpoint decree = {}", dir, @@ -158,7 +180,7 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti derror_replica("prepare checkpoint failed, error = {}", ec.to_string()); tasking::enqueue(LPC_PARTITION_SPLIT, tracker(), - std::bind(&replica::parent_prepare_states, this, dir), + std::bind(&replica_split_manager::parent_prepare_states, this, dir), get_gpid().thread_hash(), std::chrono::seconds(1)); return; @@ -168,11 +190,12 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti std::vector files; uint64_t total_file_size = 0; // get mutation and private log - _private_log->get_parent_mutations_and_logs( + _replica->_private_log->get_parent_mutations_and_logs( get_gpid(), checkpoint_decree + 1, invalid_ballot, mutation_list, files, total_file_size); // get prepare list - std::shared_ptr plist = std::make_shared(this, *_prepare_list); + std::shared_ptr plist = + std::make_shared(_replica, *_replica->_prepare_list); plist->truncate(last_committed_decree()); dassert_replica( @@ -192,7 +215,7 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti ec = _stub->split_replica_exec(LPC_PARTITION_SPLIT, _child_gpid, - std::bind(&replica::child_copy_prepare_list, + std::bind(&replica_split_manager::child_copy_prepare_list, std::placeholders::_1, parent_states, mutation_list, @@ -205,32 +228,34 @@ void replica::parent_prepare_states(const std::string &dir) // on parent partiti } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_copy_prepare_list(learn_state lstate, - std::vector mutation_list, - std::vector plog_files, - uint64_t total_file_size, - std::shared_ptr plist) // on child partition +void replica_split_manager::child_copy_prepare_list( + learn_state lstate, + std::vector mutation_list, + std::vector plog_files, + uint64_t total_file_size, + std::shared_ptr plist) // on child partition { if (status() != partition_status::PS_PARTITION_SPLIT) { dwarn_replica("wrong status, status is {}", enum_to_string(status())); _stub->split_replica_error_handler( - _split_states.parent_gpid, - std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1)); + _replica->_split_states.parent_gpid, + std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1)); child_handle_split_error("wrong child status when execute child_copy_prepare_list"); return; } // learning parent states is time-consuming, should execute in THREAD_POOL_REPLICATION_LONG decree last_committed_decree = plist->last_committed_decree(); - _split_states.async_learn_task = tasking::enqueue(LPC_PARTITION_SPLIT_ASYNC_LEARN, - tracker(), - std::bind(&replica::child_learn_states, - this, - lstate, - mutation_list, - plog_files, - total_file_size, - last_committed_decree)); + _replica->_split_states.async_learn_task = + tasking::enqueue(LPC_PARTITION_SPLIT_ASYNC_LEARN, + tracker(), + std::bind(&replica_split_manager::child_learn_states, + this, + lstate, + mutation_list, + plog_files, + total_file_size, + last_committed_decree)); ddebug_replica("start to copy parent prepare list, last_committed_decree={}, prepare list min " "decree={}, max decree={}", @@ -239,29 +264,29 @@ void replica::child_copy_prepare_list(learn_state lstate, plist->max_decree()); // copy parent prepare list - plist->set_committer(std::bind(&replica::execute_mutation, this, std::placeholders::_1)); - delete _prepare_list; - _prepare_list = new prepare_list(this, *plist); - for (decree d = last_committed_decree + 1; d <= _prepare_list->max_decree(); ++d) { - mutation_ptr mu = _prepare_list->get_mutation_by_decree(d); + plist->set_committer(std::bind(&replica::execute_mutation, _replica, std::placeholders::_1)); + delete _replica->_prepare_list; + _replica->_prepare_list = new prepare_list(this, *plist); + for (decree d = last_committed_decree + 1; d <= _replica->_prepare_list->max_decree(); ++d) { + mutation_ptr mu = _replica->_prepare_list->get_mutation_by_decree(d); dassert_replica(mu != nullptr, "can not find mutation, dercee={}", d); mu->data.header.pid = get_gpid(); _stub->_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr); - _private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr); + _replica->_private_log->append(mu, LPC_WRITE_REPLICATION_LOG_COMMON, tracker(), nullptr); // set mutation has been logged in private log if (!mu->is_logged()) { mu->set_logged(); } } - _split_states.is_prepare_list_copied = true; + _replica->_split_states.is_prepare_list_copied = true; } // ThreadPool: THREAD_POOL_REPLICATION_LONG -void replica::child_learn_states(learn_state lstate, - std::vector mutation_list, - std::vector plog_files, - uint64_t total_file_size, - decree last_committed_decree) // on child partition +void replica_split_manager::child_learn_states(learn_state lstate, + std::vector mutation_list, + std::vector plog_files, + uint64_t total_file_size, + decree last_committed_decree) // on child partition { FAIL_POINT_INJECT_F("replica_child_learn_states", [](dsn::string_view) {}); @@ -288,7 +313,7 @@ void replica::child_learn_states(learn_state lstate, }); // apply parent checkpoint - err = _app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate); + err = _replica->_app->apply_checkpoint(replication_app_base::chkpt_apply_mode::learn, lstate); if (err != ERR_OK) { derror_replica("failed to apply checkpoint, error={}", err); return; @@ -303,13 +328,13 @@ void replica::child_learn_states(learn_state lstate, } // generate a checkpoint synchronously - err = _app->sync_checkpoint(); + err = _replica->_app->sync_checkpoint(); if (err != ERR_OK) { derror_replica("failed to generate checkpoint synchrounously, error={}", err); return; } - err = _app->update_init_info_ballot_and_decree(this); + err = _replica->update_init_info_ballot_and_decree(); if (err != ERR_OK) { derror_replica("update_init_info_ballot_and_decree failed, error={}", err); return; @@ -319,16 +344,17 @@ void replica::child_learn_states(learn_state lstate, tasking::enqueue(LPC_PARTITION_SPLIT, tracker(), - std::bind(&replica::child_catch_up_states, this), + std::bind(&replica_split_manager::child_catch_up_states, this), get_gpid().thread_hash()); - _split_states.async_learn_task = nullptr; + _replica->_split_states.async_learn_task = nullptr; } // ThreadPool: THREAD_POOL_REPLICATION_LONG -error_code replica::child_apply_private_logs(std::vector plog_files, - std::vector mutation_list, - uint64_t total_file_size, - decree last_committed_decree) // on child partition +error_code +replica_split_manager::child_apply_private_logs(std::vector plog_files, + std::vector mutation_list, + uint64_t total_file_size, + decree last_committed_decree) // on child partition { FAIL_POINT_INJECT_F("replica_child_apply_private_logs", [](dsn::string_view arg) { return error_code::try_get(arg.data(), ERR_OK); @@ -343,11 +369,12 @@ error_code replica::child_apply_private_logs(std::vector plog_files int64_t offset; // temp prepare_list used for apply states prepare_list plist(this, - _app->last_committed_decree(), - _options->max_mutation_count_in_prepare_list, + _replica->_app->last_committed_decree(), + _replica->_options->max_mutation_count_in_prepare_list, [this](mutation_ptr &mu) { - if (mu->data.header.decree == _app->last_committed_decree() + 1) { - _app->apply_mutation(mu); + if (mu->data.header.decree == + _replica->_app->last_committed_decree() + 1) { + _replica->_app->apply_mutation(mu); } }); @@ -371,13 +398,13 @@ error_code replica::child_apply_private_logs(std::vector plog_files dwarn_replica( "replay private_log files failed, file count={}, app last_committed_decree={}", plog_files.size(), - _app->last_committed_decree()); + _replica->_app->last_committed_decree()); return ec; } ddebug_replica("replay private_log files succeed, file count={}, app last_committed_decree={}", plog_files.size(), - _app->last_committed_decree()); + _replica->_app->last_committed_decree()); // apply in-memory mutations if replay private logs succeed int count = 0; @@ -400,13 +427,13 @@ error_code replica::child_apply_private_logs(std::vector plog_files ddebug_replica( "apply in-memory mutations succeed, mutation count={}, app last_committed_decree={}", count, - _app->last_committed_decree()); + _replica->_app->last_committed_decree()); return ec; } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_catch_up_states() // on child partition +void replica_split_manager::child_catch_up_states() // on child partition { FAIL_POINT_INJECT_F("replica_child_catch_up_states", [](dsn::string_view) {}); @@ -419,24 +446,24 @@ void replica::child_catch_up_states() // on child partition // - child prepare_list last_committed_decree = parent prepare_list last_committed_decree, also // is catch_up goal_decree // - local_decree is child local last_committed_decree which is the last decree in async-learn. - decree goal_decree = _prepare_list->last_committed_decree(); - decree local_decree = _app->last_committed_decree(); + decree goal_decree = _replica->_prepare_list->last_committed_decree(); + decree local_decree = _replica->_app->last_committed_decree(); // there are mutations written to parent during async-learn // child does not catch up parent, there are still some mutations child not learn if (local_decree < goal_decree) { - if (local_decree >= _prepare_list->min_decree()) { + if (local_decree >= _replica->_prepare_list->min_decree()) { // all missing mutations are all in prepare list dwarn_replica("there are some in-memory mutations should be learned, app " "last_committed_decree={}, " "goal decree={}, prepare_list min_decree={}", local_decree, goal_decree, - _prepare_list->min_decree()); + _replica->_prepare_list->min_decree()); for (decree d = local_decree + 1; d <= goal_decree; ++d) { - auto mu = _prepare_list->get_mutation_by_decree(d); + auto mu = _replica->_prepare_list->get_mutation_by_decree(d); dassert(mu != nullptr, ""); - error_code ec = _app->apply_mutation(mu); + error_code ec = _replica->_app->apply_mutation(mu); if (ec != ERR_OK) { child_handle_split_error("child_catchup failed because apply mutation failed"); return; @@ -449,13 +476,13 @@ void replica::child_catch_up_states() // on child partition "there are some private logs should be learned, app last_committed_decree=" "{}, prepare_list min_decree={}, please wait", local_decree, - _prepare_list->min_decree()); - _split_states.async_learn_task = tasking::enqueue( + _replica->_prepare_list->min_decree()); + _replica->_split_states.async_learn_task = tasking::enqueue( LPC_CATCHUP_WITH_PRIVATE_LOGS, tracker(), [this]() { - catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT); - _split_states.async_learn_task = nullptr; + _replica->catch_up_with_private_logs(partition_status::PS_PARTITION_SPLIT); + _replica->_split_states.async_learn_task = nullptr; }, get_gpid().thread_hash()); return; @@ -463,60 +490,63 @@ void replica::child_catch_up_states() // on child partition } ddebug_replica("child catch up parent states, goal decree={}, local decree={}", - _prepare_list->last_committed_decree(), - _app->last_committed_decree()); - _split_states.is_caught_up = true; + _replica->_prepare_list->last_committed_decree(), + _replica->_app->last_committed_decree()); + _replica->_split_states.is_caught_up = true; child_notify_catch_up(); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_notify_catch_up() // on child partition +void replica_split_manager::child_notify_catch_up() // on child partition { FAIL_POINT_INJECT_F("replica_child_notify_catch_up", [](dsn::string_view) {}); std::unique_ptr request = make_unique(); - request->parent_gpid = _split_states.parent_gpid; + request->parent_gpid = _replica->_split_states.parent_gpid; request->child_gpid = get_gpid(); request->child_ballot = get_ballot(); request->child_address = _stub->_primary_address; ddebug_replica("send notification to primary: {}@{}, ballot={}", - _split_states.parent_gpid, - _config.primary.to_string(), + _replica->_split_states.parent_gpid, + _replica->_config.primary.to_string(), get_ballot()); notify_catch_up_rpc rpc(std::move(request), RPC_SPLIT_NOTIFY_CATCH_UP); - rpc.call(_config.primary, + rpc.call(_replica->_config.primary, tracker(), [this, rpc](error_code ec) mutable { const auto &response = rpc.response(); if (ec == ERR_TIMEOUT) { dwarn_replica("notify primary catch up timeout, please wait and retry"); - tasking::enqueue(LPC_PARTITION_SPLIT, - tracker(), - std::bind(&replica::child_notify_catch_up, this), - get_gpid().thread_hash(), - std::chrono::seconds(1)); + tasking::enqueue( + LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_split_manager::child_notify_catch_up, this), + get_gpid().thread_hash(), + std::chrono::seconds(1)); return; } if (ec != ERR_OK || response.err != ERR_OK) { error_code err = (ec == ERR_OK) ? response.err : ec; dwarn_replica("failed to notify primary catch up, error={}", err.to_string()); _stub->split_replica_error_handler( - _split_states.parent_gpid, - std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1)); + _replica->_split_states.parent_gpid, + std::bind(&replica_split_manager::parent_cleanup_split_context, + std::placeholders::_1)); child_handle_split_error("notify_primary_split_catch_up"); return; } ddebug_replica("notify primary catch up succeed"); }, - _split_states.parent_gpid.thread_hash()); + _replica->_split_states.parent_gpid.thread_hash()); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::parent_handle_child_catch_up(const notify_catch_up_request &request, - notify_cacth_up_response &response) // on primary parent +void replica_split_manager::parent_handle_child_catch_up( + const notify_catch_up_request &request, + notify_cacth_up_response &response) // on primary parent { if (status() != partition_status::PS_PRIMARY) { derror_replica("status is {}", enum_to_string(status())); @@ -547,29 +577,29 @@ void replica::parent_handle_child_catch_up(const notify_catch_up_request &reques request.child_address.to_string(), request.child_ballot); - _primary_states.caught_up_children.insert(request.child_address); - // _primary_states.statuses is a map structure: rpc address -> partition_status + _replica->_primary_states.caught_up_children.insert(request.child_address); + // _replica->_primary_states.statuses is a map structure: rpc address -> partition_status // it stores replica's rpc address and partition_status of this replica group - for (auto &iter : _primary_states.statuses) { - if (_primary_states.caught_up_children.find(iter.first) == - _primary_states.caught_up_children.end()) { + for (auto &iter : _replica->_primary_states.statuses) { + if (_replica->_primary_states.caught_up_children.find(iter.first) == + _replica->_primary_states.caught_up_children.end()) { // there are child partitions not caught up its parent return; } } ddebug_replica("all child partitions catch up"); - _primary_states.caught_up_children.clear(); - _primary_states.sync_send_write_request = true; + _replica->_primary_states.caught_up_children.clear(); + _replica->_primary_states.sync_send_write_request = true; // sync_point is the first decree after parent send write request to child synchronously // when sync_point commit, parent consider child has all data it should have during async-learn - decree sync_point = _prepare_list->max_decree() + 1; - if (!_options->empty_write_disabled) { + decree sync_point = _replica->_prepare_list->max_decree() + 1; + if (!_replica->_options->empty_write_disabled) { // empty wirte here to commit sync_point - mutation_ptr mu = new_mutation(invalid_decree); + mutation_ptr mu = _replica->new_mutation(invalid_decree); mu->add_client_request(RPC_REPLICATION_WRITE_EMPTY, nullptr); - init_prepare(mu, false); + _replica->init_prepare(mu, false); dassert_replica(sync_point == mu->data.header.decree, "sync_point should be equal to mutation's decree, {} vs {}", sync_point, @@ -577,75 +607,77 @@ void replica::parent_handle_child_catch_up(const notify_catch_up_request &reques }; // check if sync_point has been committed - tasking::enqueue(LPC_PARTITION_SPLIT, - tracker(), - std::bind(&replica::parent_check_sync_point_commit, this, sync_point), - get_gpid().thread_hash(), - std::chrono::seconds(1)); + tasking::enqueue( + LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_split_manager::parent_check_sync_point_commit, this, sync_point), + get_gpid().thread_hash(), + std::chrono::seconds(1)); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::parent_check_sync_point_commit(decree sync_point) // on primary parent +void replica_split_manager::parent_check_sync_point_commit(decree sync_point) // on primary parent { FAIL_POINT_INJECT_F("replica_parent_check_sync_point_commit", [](dsn::string_view) {}); ddebug_replica("sync_point = {}, app last_committed_decree = {}", sync_point, - _app->last_committed_decree()); - if (_app->last_committed_decree() >= sync_point) { + _replica->_app->last_committed_decree()); + if (_replica->_app->last_committed_decree() >= sync_point) { // TODO(heyuchen): TBD // update child replica group partition_count } else { dwarn_replica("sync_point has not been committed, please wait and retry"); - tasking::enqueue(LPC_PARTITION_SPLIT, - tracker(), - std::bind(&replica::parent_check_sync_point_commit, this, sync_point), - get_gpid().thread_hash(), - std::chrono::seconds(1)); + tasking::enqueue( + LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_split_manager::parent_check_sync_point_commit, this, sync_point), + get_gpid().thread_hash(), + std::chrono::seconds(1)); } } // ThreadPool: THREAD_POOL_REPLICATION -void replica::register_child_on_meta(ballot b) // on primary parent +void replica_split_manager::register_child_on_meta(ballot b) // on primary parent { if (status() != partition_status::PS_PRIMARY) { dwarn_replica("failed to register child, status = {}", enum_to_string(status())); return; } - if (_primary_states.reconfiguration_task != nullptr) { + if (_replica->_primary_states.reconfiguration_task != nullptr) { dwarn_replica("under reconfiguration, delay and retry to register child"); - _primary_states.register_child_task = + _replica->_primary_states.register_child_task = tasking::enqueue(LPC_PARTITION_SPLIT, tracker(), - std::bind(&replica::register_child_on_meta, this, b), + std::bind(&replica_split_manager::register_child_on_meta, this, b), get_gpid().thread_hash(), std::chrono::seconds(1)); return; } - partition_configuration child_config = _primary_states.membership; + partition_configuration child_config = _replica->_primary_states.membership; child_config.ballot++; child_config.last_committed_decree = 0; child_config.last_drops.clear(); - child_config.pid.set_partition_index(_app_info.partition_count + + child_config.pid.set_partition_index(_replica->_app_info.partition_count + get_gpid().get_partition_index()); register_child_request request; - request.app = _app_info; + request.app = _replica->_app_info; request.child_config = child_config; - request.parent_config = _primary_states.membership; + request.parent_config = _replica->_primary_states.membership; request.primary_address = _stub->_primary_address; // reject client request - update_local_configuration_with_no_ballot_change(partition_status::PS_INACTIVE); - set_inactive_state_transient(true); + _replica->update_local_configuration_with_no_ballot_change(partition_status::PS_INACTIVE); + _replica->set_inactive_state_transient(true); _partition_version = -1; parent_send_register_request(request); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::parent_send_register_request( +void replica_split_manager::parent_send_register_request( const register_child_request &request) // on primary parent { FAIL_POINT_INJECT_F("replica_parent_send_register_request", [](dsn::string_view) {}); @@ -660,30 +692,30 @@ void replica::parent_send_register_request( rpc_address meta_address(_stub->_failure_detector->get_servers()); std::unique_ptr req = make_unique(request); register_child_rpc rpc(std::move(req), RPC_CM_REGISTER_CHILD_REPLICA); - _primary_states.register_child_task = + _replica->_primary_states.register_child_task = rpc.call(meta_address, tracker(), [this, rpc](error_code ec) mutable { on_register_child_on_meta_reply(ec, rpc.request(), rpc.response()); }, - _split_states.parent_gpid.thread_hash()); + _replica->_split_states.parent_gpid.thread_hash()); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::on_register_child_on_meta_reply( +void replica_split_manager::on_register_child_on_meta_reply( dsn::error_code ec, const register_child_request &request, const register_child_response &response) // on primary parent { FAIL_POINT_INJECT_F("replica_on_register_child_on_meta_reply", [](dsn::string_view) {}); - _checker.only_one_thread_access(); + _replica->_checker.only_one_thread_access(); // primary parent is under reconfiguration, whose status should be PS_INACTIVE if (partition_status::PS_INACTIVE != status() || !_stub->is_connected()) { dwarn_replica("status wrong or stub is not connected, status = {}", enum_to_string(status())); - _primary_states.register_child_task = nullptr; + _replica->_primary_states.register_child_task = nullptr; // TODO(heyuchen): TBD - clear other split tasks in primary context return; } @@ -704,12 +736,12 @@ void replica::on_register_child_on_meta_reply( // we need not resend register request if child has been registered if (err != ERR_CHILD_REGISTERED) { - _primary_states.register_child_task = - tasking::enqueue(LPC_DELAY_UPDATE_CONFIG, - tracker(), - std::bind(&replica::parent_send_register_request, this, request), - get_gpid().thread_hash(), - std::chrono::seconds(1)); + _replica->_primary_states.register_child_task = tasking::enqueue( + LPC_DELAY_UPDATE_CONFIG, + tracker(), + std::bind(&replica_split_manager::parent_send_register_request, this, request), + get_gpid().thread_hash(), + std::chrono::seconds(1)); return; } } @@ -722,10 +754,10 @@ void replica::on_register_child_on_meta_reply( get_ballot(), enum_to_string(status())); - dcheck_eq_replica(_app_info.partition_count * 2, response.app.partition_count); + dcheck_eq_replica(_replica->_app_info.partition_count * 2, response.app.partition_count); _stub->split_replica_exec(LPC_PARTITION_SPLIT, response.child_config.pid, - std::bind(&replica::child_partition_active, + std::bind(&replica_split_manager::child_partition_active, std::placeholders::_1, response.child_config)); @@ -734,33 +766,36 @@ void replica::on_register_child_on_meta_reply( // parent register child succeed or child partition has already resgitered // in both situation, we should reset resgiter child task and child_gpid - _primary_states.register_child_task = nullptr; + _replica->_primary_states.register_child_task = nullptr; _child_gpid.set_app_id(0); if (response.parent_config.ballot >= get_ballot()) { ddebug_replica("response ballot = {}, local ballot = {}, should update configuration", response.parent_config.ballot, get_ballot()); - update_configuration(response.parent_config); + _replica->update_configuration(response.parent_config); } } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_partition_active(const partition_configuration &config) // on child +void replica_split_manager::child_partition_active( + const partition_configuration &config) // on child { ddebug_replica("child partition become active"); - _primary_states.last_prepare_decree_on_new_primary = _prepare_list->max_decree(); - update_configuration(config); + _replica->_primary_states.last_prepare_decree_on_new_primary = + _replica->_prepare_list->max_decree(); + _replica->update_configuration(config); } // ThreadPool: THREAD_POOL_REPLICATION -void replica::parent_cleanup_split_context() // on parent partition +void replica_split_manager::parent_cleanup_split_context() // on parent partition { _child_gpid.set_app_id(0); _child_init_ballot = 0; } // ThreadPool: THREAD_POOL_REPLICATION -void replica::child_handle_split_error(const std::string &error_msg) // on child partition +void replica_split_manager::child_handle_split_error( + const std::string &error_msg) // on child partition { if (status() != partition_status::PS_ERROR) { dwarn_replica("partition split failed because {}", error_msg); @@ -771,13 +806,13 @@ void replica::child_handle_split_error(const std::string &error_msg) // on child } // ThreadPool: THREAD_POOL_REPLICATION_LONG -void replica::child_handle_async_learn_error() // on child partition +void replica_split_manager::child_handle_async_learn_error() // on child partition { _stub->split_replica_error_handler( - _split_states.parent_gpid, - std::bind(&replica::parent_cleanup_split_context, std::placeholders::_1)); + _replica->_split_states.parent_gpid, + std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1)); child_handle_split_error("meet error when execute child_learn_states"); - _split_states.async_learn_task = nullptr; + _replica->_split_states.async_learn_task = nullptr; } } // namespace replication diff --git a/src/replica/partition_split/replica_split_manager.h b/src/replica/partition_split/replica_split_manager.h new file mode 100644 index 0000000000..19188ffa69 --- /dev/null +++ b/src/replica/partition_split/replica_split_manager.h @@ -0,0 +1,134 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "replica/replica.h" +#include "replica/replica_context.h" +#include "replica/replica_stub.h" + +namespace dsn { +namespace replication { + +class replica_split_manager : replica_base +{ +public: + explicit replica_split_manager(replica *r); + ~replica_split_manager(); + + int32_t get_partition_version() const { return _partition_version.load(); } + gpid get_child_gpid() const { return _child_gpid; } + void set_child_gpid(gpid pid) { _child_gpid = pid; } + +private: + // parent partition create child + void on_add_child(const group_check_request &request); + + // child replica initialize config and state info + void child_init_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); + + void parent_prepare_states(const std::string &dir); + + // child copy parent prepare list and call child_learn_states + void child_copy_prepare_list(learn_state lstate, + std::vector mutation_list, + std::vector plog_files, + uint64_t total_file_size, + std::shared_ptr plist); + + // child learn states(including checkpoint, private logs, in-memory mutations) + void child_learn_states(learn_state lstate, + std::vector mutation_list, + std::vector plog_files, + uint64_t total_file_size, + decree last_committed_decree); + + // TODO(heyuchen): total_file_size is used for split perf-counter in further pull request + // Applies mutation logs that were learned from the parent of this child. + // This stage follows after that child applies the checkpoint of parent, and begins to apply the + // mutations. + // \param last_committed_decree: parent's last_committed_decree when the checkpoint was + // generated. + error_code child_apply_private_logs(std::vector plog_files, + std::vector mutation_list, + uint64_t total_file_size, + decree last_committed_decree); + + // child catch up parent states while executing async learn task + void child_catch_up_states(); + + // child send notification to primary parent when it finish async learn + void child_notify_catch_up(); + + // primary parent handle child catch_up request + void parent_handle_child_catch_up(const notify_catch_up_request &request, + notify_cacth_up_response &response); + + // primary parent check if sync_point has been committed + // sync_point is the first decree after parent send write request to child synchronously + void parent_check_sync_point_commit(decree sync_point); + + // primary parent register children on meta_server + void register_child_on_meta(ballot b); + void on_register_child_on_meta_reply(dsn::error_code ec, + const register_child_request &request, + const register_child_response &response); + // primary sends register request to meta_server + void parent_send_register_request(const register_child_request &request); + + // child partition has been registered on meta_server, could be active + void child_partition_active(const partition_configuration &config); + + // return true if parent status is valid + bool parent_check_states(); + + // parent reset child information when partition split failed + void parent_cleanup_split_context(); + // child suicide when partition split failed + void child_handle_split_error(const std::string &error_msg); + // child handle error while async learn parent states + void child_handle_async_learn_error(); + + // + // helper functions + // + partition_status::type status() const { return _replica->status(); } + ballot get_ballot() const { return _replica->get_ballot(); } + decree last_committed_decree() const { return _replica->last_committed_decree(); } + task_tracker *tracker() { return _replica->tracker(); } + +private: + replica *_replica; + replica_stub *_stub; + + friend class replica; + friend class replica_stub; + friend class replica_split_test; + + // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition + // _child_gpid.app_id = 0 for parent partition not in partition split and child partition + dsn::gpid _child_gpid{0, 0}; + // ballot when starting partition split and split will stop if ballot changed + // _child_init_ballot = 0 if partition not in partition split + ballot _child_init_ballot{0}; + // in normal cases, _partition_version = partition_count-1 + // when replica reject client read write request, partition_version = -1 + std::atomic _partition_version; +}; + +} // namespace replication +} // namespace dsn diff --git a/src/replica/partition_split/test/CMakeLists.txt b/src/replica/partition_split/test/CMakeLists.txt new file mode 100644 index 0000000000..ee7ca688b7 --- /dev/null +++ b/src/replica/partition_split/test/CMakeLists.txt @@ -0,0 +1,21 @@ +set(MY_PROJ_NAME dsn_replica_split_test) + +set(MY_PROJ_SRC "") + +set(MY_SRC_SEARCH_MODE "GLOB") + +set(MY_PROJ_LIBS dsn_meta_server + dsn_replica_server + dsn_replication_common + dsn_runtime + gtest +) + +set(MY_BOOST_LIBS Boost::system Boost::filesystem Boost::regex) + +set(MY_BINPLACES + config-test.ini + run.sh +) + +dsn_add_test() diff --git a/src/replica/partition_split/test/config-test.ini b/src/replica/partition_split/test/config-test.ini new file mode 100644 index 0000000000..64949591a2 --- /dev/null +++ b/src/replica/partition_split/test/config-test.ini @@ -0,0 +1,71 @@ +[apps..default] +run = true +count = 1 +;network.client.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 +;network.client.RPC_CHANNEL_UDP = dsn::tools::sim_network_provider, 65536 +;network.server.0.RPC_CHANNEL_TCP = dsn::tools::sim_network_provider, 65536 + +[apps.replica] +type = replica +run = true +count = 1 +ports = 54321 +pools = THREAD_POOL_DEFAULT,THREAD_POOL_REPLICATION_LONG,THREAD_POOL_REPLICATION,THREAD_POOL_SLOG + +[core] +;tool = simulator +tool = nativerun + +;toollets = tracer, profiler +;fault_injector +pause_on_start = false + +logging_start_level = LOG_LEVEL_DEBUG +logging_factory_name = dsn::tools::simple_logger + + +[tools.simple_logger] +fast_flush = true +short_header = false +stderr_start_level = LOG_LEVEL_WARNING + +[tools.simulator] +random_seed = 1465902258 + +[tools.screen_logger] +short_header = false + +[network] +; how many network threads for network library (used by asio) +io_service_worker_count = 2 + +; specification for each thread pool +[threadpool..default] +worker_count = 4 + +[threadpool.THREAD_POOL_DEFAULT] +name = default +partitioned = false +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 2 + +[threadpool.THREAD_POOL_SLOG] + name = slog + worker_count = 1 + +[threadpool.THREAD_POOL_REPLICATION] +name = replica +partitioned = true +worker_priority = THREAD_xPRIORITY_NORMAL +worker_count = 3 + +[threadpool.THREAD_POOL_REPLICATION_LONG] +name = replica_long + +[task..default] +is_trace = true +is_profile = true +allow_inline = false +rpc_call_channel = RPC_CHANNEL_TCP +rpc_message_header_format = dsn +rpc_timeout_milliseconds = 5000 diff --git a/src/replica/partition_split/test/main.cpp b/src/replica/partition_split/test/main.cpp new file mode 100644 index 0000000000..ab7851b9a6 --- /dev/null +++ b/src/replica/partition_split/test/main.cpp @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +int g_test_count = 0; +int g_test_ret = 0; + +class gtest_app : public dsn::service_app +{ +public: + gtest_app(const dsn::service_app_info *info) : ::dsn::service_app(info) {} + + dsn::error_code start(const std::vector &args) override + { + g_test_ret = RUN_ALL_TESTS(); + g_test_count = 1; + return dsn::ERR_OK; + } + + dsn::error_code stop(bool) override { return dsn::ERR_OK; } +}; + +GTEST_API_ int main(int argc, char **argv) +{ + testing::InitGoogleTest(&argc, argv); + + dsn::service_app::register_factory("replica"); + + dsn_run_config("config-test.ini", false); + while (g_test_count == 0) { + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + + dsn_exit(g_test_ret); +} diff --git a/src/replica/test/replica_split_test.cpp b/src/replica/partition_split/test/replica_split_test.cpp similarity index 70% rename from src/replica/test/replica_split_test.cpp rename to src/replica/partition_split/test/replica_split_test.cpp index 081c963834..df6389e9f3 100644 --- a/src/replica/test/replica_split_test.cpp +++ b/src/replica/partition_split/test/replica_split_test.cpp @@ -1,11 +1,25 @@ -// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved. -// This source code is licensed under the Apache License Version 2.0, which -// can be found in the LICENSE file in the root directory of this source tree. +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "replica/partition_split/replica_split_manager.h" +#include "replica/test/replica_test_base.h" #include - #include -#include "replica_test_base.h" namespace dsn { namespace replication { @@ -18,8 +32,9 @@ class replica_split_test : public testing::Test _stub = make_unique(); _stub->set_state_connected(); mock_app_info(); - _parent = _stub->generate_replica( + _parent_replica = _stub->generate_replica( _app_info, _parent_pid, partition_status::PS_PRIMARY, _init_ballot); + _parent_split_mgr = make_unique(_parent_replica.get()); mock_group_check_request(); } @@ -44,7 +59,7 @@ class replica_split_test : public testing::Test void mock_notify_catch_up_request() { - _parent->set_child_gpid(_child_pid); + _parent_split_mgr->_child_gpid = _child_pid; _catch_up_req.child_gpid = _child_pid; _catch_up_req.parent_gpid = _parent_pid; _catch_up_req.child_ballot = _init_ballot; @@ -69,9 +84,10 @@ class replica_split_test : public testing::Test void generate_child(partition_status::type status) { - _child = _stub->generate_replica(_app_info, _child_pid, status, _init_ballot); - _parent->set_child_gpid(_child_pid); - _parent->set_init_child_ballot(_init_ballot); + _child_replica = _stub->generate_replica(_app_info, _child_pid, status, _init_ballot); + _child_split_mgr = make_unique(_child_replica.get()); + _parent_split_mgr->_child_gpid = _child_pid; + _parent_split_mgr->_child_init_ballot = _init_ballot; } void mock_shared_log() @@ -110,15 +126,15 @@ class replica_split_test : public testing::Test void mock_parent_states() { mock_shared_log(); - mock_private_log(_parent_pid, _parent, true); - mock_prepare_list(_parent, true); + mock_private_log(_parent_pid, _parent_replica, true); + mock_prepare_list(_parent_replica, true); } void mock_child_split_context(gpid parent_gpid, bool is_prepare_list_copied, bool is_caught_up) { - _child->_split_states.parent_gpid = parent_gpid; - _child->_split_states.is_prepare_list_copied = is_prepare_list_copied; - _child->_split_states.is_caught_up = is_caught_up; + _child_replica->_split_states.parent_gpid = parent_gpid; + _child_replica->_split_states.is_prepare_list_copied = is_prepare_list_copied; + _child_replica->_split_states.is_caught_up = is_caught_up; } void mock_mutation_list(decree min_decree) @@ -134,26 +150,31 @@ class replica_split_test : public testing::Test void mock_parent_primary_context(bool will_all_caught_up) { - _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 1)] = + _parent_replica->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 1)] = partition_status::PS_PRIMARY; - _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 2)] = + _parent_replica->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 2)] = partition_status::PS_SECONDARY; - _parent->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 3)] = + _parent_replica->_primary_states.statuses[dsn::rpc_address("127.0.0.1", 3)] = partition_status::PS_SECONDARY; - _parent->_primary_states.caught_up_children.insert(dsn::rpc_address("127.0.0.1", 2)); + _parent_replica->_primary_states.caught_up_children.insert( + dsn::rpc_address("127.0.0.1", 2)); if (will_all_caught_up) { - _parent->_primary_states.caught_up_children.insert(dsn::rpc_address("127.0.0.1", 3)); + _parent_replica->_primary_states.caught_up_children.insert( + dsn::rpc_address("127.0.0.1", 3)); } - _parent->_primary_states.sync_send_write_request = false; + _parent_replica->_primary_states.sync_send_write_request = false; } - bool get_sync_send_write_request() { return _parent->_primary_states.sync_send_write_request; } + bool get_sync_send_write_request() + { + return _parent_replica->_primary_states.sync_send_write_request; + } void mock_child_async_learn_states(mock_replica_ptr plist_rep, bool add_to_plog, decree min_decree) { mock_shared_log(); - mock_private_log(_child_pid, _child, false); + mock_private_log(_child_pid, _child_replica, false); mock_prepare_list(plist_rep, add_to_plog); // mock_learn_state _mock_learn_state.to_decree_included = _decree; @@ -168,89 +189,88 @@ class replica_split_test : public testing::Test void cleanup_child_split_context() { - _child->_split_states.cleanup(true); - _child->tracker()->wait_outstanding_tasks(); + _child_replica->_split_states.cleanup(true); + _child_replica->tracker()->wait_outstanding_tasks(); } - partition_split_context get_split_context() { return _child->_split_states; } + partition_split_context get_split_context() { return _child_replica->_split_states; } primary_context get_replica_primary_context(mock_replica_ptr rep) { return rep->_primary_states; } - bool is_parent_not_in_split() { return (_parent->_child_gpid.get_app_id() == 0); } - - int32_t get_partition_version(mock_replica_ptr rep) { return rep->_partition_version.load(); } + bool is_parent_not_in_split() { return (_parent_split_mgr->_child_gpid.get_app_id() == 0); } void test_on_add_child() { - _parent->on_add_child(_group_check_req); - _parent->tracker()->wait_outstanding_tasks(); + _parent_split_mgr->on_add_child(_group_check_req); + _parent_replica->tracker()->wait_outstanding_tasks(); } - bool test_parent_check_states() { return _parent->parent_check_states(); } + bool test_parent_check_states() { return _parent_split_mgr->parent_check_states(); } void test_child_copy_prepare_list() { - mock_child_async_learn_states(_parent, false, _decree); - std::shared_ptr plist = std::make_shared(_parent, *_mock_plist); - _child->child_copy_prepare_list(_mock_learn_state, - _mutation_list, - _private_log_files, - _total_file_size, - std::move(plist)); - _child->tracker()->wait_outstanding_tasks(); + mock_child_async_learn_states(_parent_replica, false, _decree); + std::shared_ptr plist = + std::make_shared(_parent_replica, *_mock_plist); + _child_split_mgr->child_copy_prepare_list(_mock_learn_state, + _mutation_list, + _private_log_files, + _total_file_size, + std::move(plist)); + _child_replica->tracker()->wait_outstanding_tasks(); } void test_child_learn_states() { - mock_child_async_learn_states(_child, true, _decree); - _child->child_learn_states( + mock_child_async_learn_states(_child_replica, true, _decree); + _child_split_mgr->child_learn_states( _mock_learn_state, _mutation_list, _private_log_files, _total_file_size, _decree); - _child->tracker()->wait_outstanding_tasks(); + _child_replica->tracker()->wait_outstanding_tasks(); } void test_child_apply_private_logs() { - mock_child_async_learn_states(_child, true, 0); - _child->child_apply_private_logs( + mock_child_async_learn_states(_child_replica, true, 0); + _child_split_mgr->child_apply_private_logs( _private_log_files, _mutation_list, _total_file_size, _decree); - _child->tracker()->wait_outstanding_tasks(); + _child_replica->tracker()->wait_outstanding_tasks(); } void test_child_catch_up_states(decree local_decree, decree goal_decree, decree min_decree) { - mock_child_async_learn_states(_child, true, 0); - _child->set_app_last_committed_decree(local_decree); + mock_child_async_learn_states(_child_replica, true, 0); + _child_replica->set_app_last_committed_decree(local_decree); if (local_decree < goal_decree) { // set prepare_list's start_decree = {min_decree} - _child->prepare_list_truncate(min_decree); + _child_replica->prepare_list_truncate(min_decree); // set prepare_list's last_committed_decree = {goal_decree} - _child->prepare_list_commit_hard(goal_decree); + _child_replica->prepare_list_commit_hard(goal_decree); } - _child->child_catch_up_states(); - _child->tracker()->wait_outstanding_tasks(); + _child_split_mgr->child_catch_up_states(); + _child_replica->tracker()->wait_outstanding_tasks(); } dsn::error_code test_parent_handle_child_catch_up() { notify_cacth_up_response resp; - _parent->parent_handle_child_catch_up(_catch_up_req, resp); - _parent->tracker()->wait_outstanding_tasks(); + _parent_split_mgr->parent_handle_child_catch_up(_catch_up_req, resp); + _parent_replica->tracker()->wait_outstanding_tasks(); return resp.err; } void test_register_child_on_meta() { - _parent->register_child_on_meta(_init_ballot); - _parent->tracker()->wait_outstanding_tasks(); + _parent_split_mgr->register_child_on_meta(_init_ballot); + _parent_replica->tracker()->wait_outstanding_tasks(); } void test_on_register_child_rely(partition_status::type status, dsn::error_code resp_err) { mock_register_child_request(); - _parent->_config.status = status; + _parent_replica->_config.status = status; register_child_response resp; resp.err = resp_err; @@ -259,15 +279,17 @@ class replica_split_test : public testing::Test resp.parent_config = _register_req.parent_config; resp.child_config = _register_req.child_config; - _parent->on_register_child_on_meta_reply(ERR_OK, _register_req, resp); - _parent->tracker()->wait_outstanding_tasks(); + _parent_split_mgr->on_register_child_on_meta_reply(ERR_OK, _register_req, resp); + _parent_replica->tracker()->wait_outstanding_tasks(); } public: std::unique_ptr _stub; - mock_replica_ptr _parent; - mock_replica_ptr _child; + mock_replica_ptr _parent_replica; + mock_replica_ptr _child_replica; + std::unique_ptr _parent_split_mgr; + std::unique_ptr _child_split_mgr; dsn::app_info _app_info; dsn::gpid _parent_pid = gpid(2, 1); @@ -297,7 +319,7 @@ TEST_F(replica_split_test, add_child_wrong_ballot) TEST_F(replica_split_test, add_child_with_child_existed) { - _parent->set_child_gpid(_child_pid); + _parent_split_mgr->set_child_gpid(_child_pid); test_on_add_child(); ASSERT_EQ(_stub->get_replica(_child_pid), nullptr); } @@ -318,7 +340,7 @@ TEST_F(replica_split_test, add_child_succeed) TEST_F(replica_split_test, parent_check_states_with_wrong_status) { generate_child(partition_status::PS_PARTITION_SPLIT); - _parent->set_partition_status(partition_status::PS_POTENTIAL_SECONDARY); + _parent_replica->set_partition_status(partition_status::PS_POTENTIAL_SECONDARY); fail::setup(); fail::cfg("replica_stub_split_replica_exec", "return()"); @@ -344,7 +366,7 @@ TEST_F(replica_split_test, copy_prepare_list_with_wrong_status) test_child_copy_prepare_list(); fail::teardown(); - cleanup_prepare_list(_parent); + cleanup_prepare_list(_parent_replica); // TODO(heyuchen): child should be equal to error(after implement child_handle_split_error) } @@ -361,10 +383,10 @@ TEST_F(replica_split_test, copy_prepare_list_succeed) partition_split_context split_context = get_split_context(); ASSERT_EQ(split_context.is_prepare_list_copied, true); - ASSERT_EQ(_child->get_plist()->count(), _max_count); + ASSERT_EQ(_child_replica->get_plist()->count(), _max_count); - cleanup_prepare_list(_parent); - cleanup_prepare_list(_child); + cleanup_prepare_list(_parent_replica); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -379,7 +401,7 @@ TEST_F(replica_split_test, learn_states_succeed) test_child_learn_states(); fail::teardown(); - cleanup_prepare_list(_child); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -394,7 +416,7 @@ TEST_F(replica_split_test, learn_states_with_replay_private_log_error) test_child_learn_states(); fail::teardown(); - cleanup_prepare_list(_child); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -409,7 +431,7 @@ TEST_F(replica_split_test, child_apply_private_logs_succeed) test_child_apply_private_logs(); fail::teardown(); - cleanup_prepare_list(_child); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -426,7 +448,7 @@ TEST_F(replica_split_test, catch_up_succeed_with_all_states_learned) partition_split_context split_context = get_split_context(); ASSERT_EQ(split_context.is_caught_up, true); - cleanup_prepare_list(_child); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -444,7 +466,7 @@ TEST_F(replica_split_test, catch_up_succeed_with_learn_in_memory_mutations) partition_split_context split_context = get_split_context(); ASSERT_EQ(split_context.is_caught_up, true); - cleanup_prepare_list(_child); + cleanup_prepare_list(_child_replica); cleanup_child_split_context(); } @@ -496,8 +518,8 @@ TEST_F(replica_split_test, register_child_test) test_register_child_on_meta(); fail::teardown(); - ASSERT_EQ(_parent->status(), partition_status::PS_INACTIVE); - ASSERT_EQ(get_partition_version(_parent), -1); + ASSERT_EQ(_parent_replica->status(), partition_status::PS_INACTIVE); + ASSERT_EQ(_parent_split_mgr->get_partition_version(), -1); } TEST_F(replica_split_test, register_child_reply_with_wrong_status) @@ -506,7 +528,7 @@ TEST_F(replica_split_test, register_child_reply_with_wrong_status) mock_child_split_context(_parent_pid, true, true); test_on_register_child_rely(partition_status::PS_PRIMARY, ERR_OK); - primary_context parent_primary_states = get_replica_primary_context(_parent); + primary_context parent_primary_states = get_replica_primary_context(_parent_replica); ASSERT_EQ(parent_primary_states.register_child_task, nullptr); } @@ -517,7 +539,7 @@ TEST_F(replica_split_test, register_child_reply_with_child_registered) test_on_register_child_rely(partition_status::PS_INACTIVE, ERR_CHILD_REGISTERED); - primary_context parent_primary_states = get_replica_primary_context(_parent); + primary_context parent_primary_states = get_replica_primary_context(_parent_replica); ASSERT_EQ(parent_primary_states.register_child_task, nullptr); ASSERT_TRUE(is_parent_not_in_split()); } diff --git a/src/replica/partition_split/test/run.sh b/src/replica/partition_split/test/run.sh new file mode 100755 index 0000000000..90baf39a67 --- /dev/null +++ b/src/replica/partition_split/test/run.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +./dsn_replica_split_test + +if [ $? -ne 0 ]; then + tail -n 100 data/log/log.1.txt + if [ -f core ]; then + gdb ./dsn_replica_split_test core -ex "bt" + fi + exit 1 +fi diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 246f6ae841..85b7781256 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -32,6 +32,7 @@ #include "backup/replica_backup_manager.h" #include "backup/cold_backup_context.h" #include "bulk_load/replica_bulk_loader.h" +#include "partition_split/replica_split_manager.h" #include #include @@ -71,8 +72,8 @@ replica::replica( _options = &stub->options(); init_state(); _config.pid = gpid; - _partition_version = app.partition_count - 1; _bulk_loader = make_unique(this); + _split_mgr = make_unique(this); std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid); _counter_private_log_size.init_app_counter( @@ -422,6 +423,8 @@ void replica::close() _bulk_loader.reset(); + _split_mgr.reset(); + ddebug("%s: replica closed, time_used = %" PRIu64 "ms", name(), dsn_now_ms() - start_time); } diff --git a/src/replica/replica.h b/src/replica/replica.h index 91182d8ec2..83dd756d19 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -64,6 +64,7 @@ class replica_stub; class replica_duplicator_manager; class replica_backup_manager; class replica_bulk_loader; +class replica_split_manager; class cold_backup_context; typedef dsn::ref_ptr cold_backup_context_ptr; @@ -200,6 +201,11 @@ class replica : public serverlet, public ref_counter, public replica_ba : 0; } + // + // Partition Split + // + replica_split_manager *get_split_manager() const { return _split_mgr.get(); } + // // Statistics // @@ -309,6 +315,7 @@ class replica : public serverlet, public ref_counter, public replica_ba bool update_configuration(const partition_configuration &config); bool update_local_configuration(const replica_configuration &config, bool same_ballot = false); + error_code update_init_info_ballot_and_decree(); ///////////////////////////////////////////////////////////////// // group check @@ -374,76 +381,6 @@ class replica : public serverlet, public ref_counter, public replica_ba std::string query_compact_state() const; - ///////////////////////////////////////////////////////////////// - // partition split - // parent partition create child - void on_add_child(const group_check_request &request); - - // child replica initialize config and state info - void child_init_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot); - - void parent_prepare_states(const std::string &dir); - - // child copy parent prepare list and call child_learn_states - void child_copy_prepare_list(learn_state lstate, - std::vector mutation_list, - std::vector plog_files, - uint64_t total_file_size, - std::shared_ptr plist); - - // child learn states(including checkpoint, private logs, in-memory mutations) - void child_learn_states(learn_state lstate, - std::vector mutation_list, - std::vector plog_files, - uint64_t total_file_size, - decree last_committed_decree); - - // TODO(heyuchen): total_file_size is used for split perf-counter in further pull request - // Applies mutation logs that were learned from the parent of this child. - // This stage follows after that child applies the checkpoint of parent, and begins to apply the - // mutations. - // \param last_committed_decree: parent's last_committed_decree when the checkpoint was - // generated. - error_code child_apply_private_logs(std::vector plog_files, - std::vector mutation_list, - uint64_t total_file_size, - decree last_committed_decree); - - // child catch up parent states while executing async learn task - void child_catch_up_states(); - - // child send notification to primary parent when it finish async learn - void child_notify_catch_up(); - - // primary parent handle child catch_up request - void parent_handle_child_catch_up(const notify_catch_up_request &request, - notify_cacth_up_response &response); - - // primary parent check if sync_point has been committed - // sync_point is the first decree after parent send write request to child synchronously - void parent_check_sync_point_commit(decree sync_point); - - // primary parent register children on meta_server - void register_child_on_meta(ballot b); - void on_register_child_on_meta_reply(dsn::error_code ec, - const register_child_request &request, - const register_child_response &response); - // primary sends register request to meta_server - void parent_send_register_request(const register_child_request &request); - - // child partition has been registered on meta_server, could be active - void child_partition_active(const partition_configuration &config); - - // return true if parent status is valid - bool parent_check_states(); - - // parent reset child information when partition split failed - void parent_cleanup_split_context(); - // child suicide when partition split failed - void child_handle_split_error(const std::string &error_msg); - // child handle error while async learn parent states - void child_handle_async_learn_error(); - void init_table_level_latency_counters(); private: @@ -459,6 +396,7 @@ class replica : public serverlet, public ref_counter, public replica_ba friend class replica_test; friend class replica_backup_manager; friend class replica_bulk_loader; + friend class replica_split_manager; // replica configuration, updated by update_local_configuration ONLY replica_configuration _config; @@ -537,23 +475,15 @@ class replica : public serverlet, public ref_counter, public replica_ba // backup std::unique_ptr _backup_mgr; - // partition split - // _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition - // _child_gpid.app_id = 0 for parent partition not in partition split and child partition - dsn::gpid _child_gpid{0, 0}; - // ballot when starting partition split and split will stop if ballot changed - // _child_init_ballot = 0 if partition not in partition split - ballot _child_init_ballot{0}; - // in normal cases, _partition_version = partition_count-1 - // when replica reject client read write request, partition_version = -1 - std::atomic _partition_version; - // bulk load std::unique_ptr _bulk_loader; // if replica in bulk load ingestion 2pc, will reject other write requests bool _is_bulk_load_ingestion{false}; uint64_t _bulk_load_ingestion_start_time_ms{0}; + // partition split + std::unique_ptr _split_mgr; + // perf counters perf_counter_wrapper _counter_private_log_size; perf_counter_wrapper _counter_recent_write_throttling_delay_count; diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp index dfefdd5859..a34f206d32 100644 --- a/src/replica/replica_chkpt.cpp +++ b/src/replica/replica_chkpt.cpp @@ -38,6 +38,7 @@ #include "mutation_log.h" #include "replica_stub.h" #include "duplication/replica_duplicator_manager.h" +#include "partition_split/replica_split_manager.h" #include #include #include @@ -373,11 +374,11 @@ void replica::catch_up_with_private_logs(partition_status::type s) get_gpid().thread_hash()); _potential_secondary_states.learn_remote_files_completed_task->enqueue(); } else if (s == partition_status::PS_PARTITION_SPLIT) { - _split_states.async_learn_task = - tasking::enqueue(LPC_PARTITION_SPLIT, - tracker(), - std::bind(&replica::child_catch_up_states, this), - get_gpid().thread_hash()); + _split_states.async_learn_task = tasking::enqueue( + LPC_PARTITION_SPLIT, + tracker(), + std::bind(&replica_split_manager::child_catch_up_states, get_split_manager()), + get_gpid().thread_hash()); } else { _secondary_states.checkpoint_completed_task = tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED, diff --git a/src/replica/replica_config.cpp b/src/replica/replica_config.cpp index 0bed75fbc9..07e76be842 100644 --- a/src/replica/replica_config.cpp +++ b/src/replica/replica_config.cpp @@ -1089,5 +1089,10 @@ void replica::replay_prepare_list() } } +error_code replica::update_init_info_ballot_and_decree() +{ + return _app->update_init_info_ballot_and_decree(this); +} + } // namespace replication } // namespace dsn diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 22d3ccd35d..56be153d9c 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -40,6 +40,7 @@ #include "bulk_load/replica_bulk_loader.h" #include "duplication/duplication_sync_timer.h" #include "backup/replica_backup_manager.h" +#include "partition_split/replica_split_manager.h" #include #include @@ -2667,16 +2668,17 @@ void replica_stub::create_child_replica(rpc_address primary_address, ddebug_f("create child replica ({}) succeed", child_gpid); tasking::enqueue(LPC_PARTITION_SPLIT, child_replica->tracker(), - std::bind(&replica::child_init_replica, - child_replica, + std::bind(&replica_split_manager::child_init_replica, + child_replica->get_split_manager(), parent_gpid, primary_address, init_ballot), child_gpid.thread_hash()); } else { dwarn_f("failed to create child replica ({}), ignore it and wait next run", child_gpid); - split_replica_error_handler(parent_gpid, - [](replica_ptr r) { r->_child_gpid.set_app_id(0); }); + split_replica_error_handler( + parent_gpid, + std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1)); } } @@ -2732,7 +2734,7 @@ replica_stub::split_replica_exec(dsn::task_code code, gpid pid, local_execution if (replica && handler) { tasking::enqueue(code, replica.get()->tracker(), - [handler, replica]() { handler(replica); }, + [handler, replica]() { handler(replica->get_split_manager()); }, pid.thread_hash()); return ERR_OK; } @@ -2747,7 +2749,7 @@ void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc) notify_cacth_up_response &response = rpc.response(); replica_ptr replica = get_replica(request.parent_gpid); if (replica != nullptr) { - replica->parent_handle_child_catch_up(request, response); + replica->get_split_manager()->parent_handle_child_catch_up(request, response); } else { response.err = ERR_OBJECT_NOT_FOUND; } diff --git a/src/replica/replica_stub.h b/src/replica/replica_stub.h index 74c5c93dae..d6ee82c5c7 100644 --- a/src/replica/replica_stub.h +++ b/src/replica/replica_stub.h @@ -74,6 +74,8 @@ typedef dsn::ref_ptr replica_stub_ptr; class duplication_sync_timer; class replica_bulk_loader; +class replica_split_manager; + class replica_stub : public serverlet, public ref_counter { public: @@ -194,7 +196,7 @@ class replica_stub : public serverlet, public ref_counter replica_ptr create_child_replica_if_not_found(gpid child_pid, app_info *app, const std::string &parent_dir); - typedef std::function local_execution; + typedef std::function local_execution; // This function is used for partition split, caller(replica) // parent/child may want child/parent to execute function during partition split @@ -286,6 +288,7 @@ class replica_stub : public serverlet, public ref_counter friend class replica_duplicator; friend class replica_http_service; friend class replica_bulk_loader; + friend class replica_split_manager; friend class mock_replica_stub; friend class duplication_sync_timer; diff --git a/src/replica/test/mock_utils.h b/src/replica/test/mock_utils.h index 9c7fe9d5c2..e976977c2b 100644 --- a/src/replica/test/mock_utils.h +++ b/src/replica/test/mock_utils.h @@ -135,8 +135,6 @@ class mock_replica : public replica /// helper functions void set_replica_config(replica_configuration &config) { _config = config; } void set_partition_status(partition_status::type status) { _config.status = status; } - void set_child_gpid(gpid pid) { _child_gpid = pid; } - void set_init_child_ballot(ballot b) { _child_init_ballot = b; } void set_last_committed_decree(decree d) { _prepare_list->reset(d); } prepare_list *get_plist() { return _prepare_list; } void prepare_list_truncate(decree d) { _prepare_list->truncate(d); } From 81d94d339b794b7a3c14f30290de3b12a248355e Mon Sep 17 00:00:00 2001 From: heyuchen Date: Fri, 11 Sep 2020 14:52:33 +0800 Subject: [PATCH 2/3] update directory name --- src/replica/CMakeLists.txt | 4 ++-- src/replica/replica.cpp | 2 +- src/replica/replica_chkpt.cpp | 2 +- src/replica/replica_stub.cpp | 2 +- .../{partition_split => split}/replica_split_manager.cpp | 0 .../{partition_split => split}/replica_split_manager.h | 0 src/replica/{partition_split => split}/test/CMakeLists.txt | 0 src/replica/{partition_split => split}/test/config-test.ini | 0 src/replica/{partition_split => split}/test/main.cpp | 0 .../{partition_split => split}/test/replica_split_test.cpp | 2 +- src/replica/{partition_split => split}/test/run.sh | 0 11 files changed, 6 insertions(+), 6 deletions(-) rename src/replica/{partition_split => split}/replica_split_manager.cpp (100%) rename src/replica/{partition_split => split}/replica_split_manager.h (100%) rename src/replica/{partition_split => split}/test/CMakeLists.txt (100%) rename src/replica/{partition_split => split}/test/config-test.ini (100%) rename src/replica/{partition_split => split}/test/main.cpp (100%) rename src/replica/{partition_split => split}/test/replica_split_test.cpp (99%) rename src/replica/{partition_split => split}/test/run.sh (100%) diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 802d54cd11..6de5e0e48b 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -14,7 +14,7 @@ set(BACKUP_SRC backup/replica_backup_manager.cpp set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp) -set(SPLIT_SRC partition_split/replica_split_manager.cpp) +set(SPLIT_SRC split/replica_split_manager.cpp) # Source files under CURRENT project directory will be automatically included. # You can manually set MY_PROJ_SRC to include source files under other directories. @@ -51,6 +51,6 @@ dsn_add_shared_library() add_subdirectory(duplication/test) add_subdirectory(backup/test) add_subdirectory(bulk_load/test) -add_subdirectory(partition_split/test) +add_subdirectory(split/test) add_subdirectory(storage) add_subdirectory(test) diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 85b7781256..e0de73e2e5 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -32,7 +32,7 @@ #include "backup/replica_backup_manager.h" #include "backup/cold_backup_context.h" #include "bulk_load/replica_bulk_loader.h" -#include "partition_split/replica_split_manager.h" +#include "split/replica_split_manager.h" #include #include diff --git a/src/replica/replica_chkpt.cpp b/src/replica/replica_chkpt.cpp index a34f206d32..96e5ec1107 100644 --- a/src/replica/replica_chkpt.cpp +++ b/src/replica/replica_chkpt.cpp @@ -38,7 +38,7 @@ #include "mutation_log.h" #include "replica_stub.h" #include "duplication/replica_duplicator_manager.h" -#include "partition_split/replica_split_manager.h" +#include "split/replica_split_manager.h" #include #include #include diff --git a/src/replica/replica_stub.cpp b/src/replica/replica_stub.cpp index 56be153d9c..e057ec4e5c 100644 --- a/src/replica/replica_stub.cpp +++ b/src/replica/replica_stub.cpp @@ -40,7 +40,7 @@ #include "bulk_load/replica_bulk_loader.h" #include "duplication/duplication_sync_timer.h" #include "backup/replica_backup_manager.h" -#include "partition_split/replica_split_manager.h" +#include "split/replica_split_manager.h" #include #include diff --git a/src/replica/partition_split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp similarity index 100% rename from src/replica/partition_split/replica_split_manager.cpp rename to src/replica/split/replica_split_manager.cpp diff --git a/src/replica/partition_split/replica_split_manager.h b/src/replica/split/replica_split_manager.h similarity index 100% rename from src/replica/partition_split/replica_split_manager.h rename to src/replica/split/replica_split_manager.h diff --git a/src/replica/partition_split/test/CMakeLists.txt b/src/replica/split/test/CMakeLists.txt similarity index 100% rename from src/replica/partition_split/test/CMakeLists.txt rename to src/replica/split/test/CMakeLists.txt diff --git a/src/replica/partition_split/test/config-test.ini b/src/replica/split/test/config-test.ini similarity index 100% rename from src/replica/partition_split/test/config-test.ini rename to src/replica/split/test/config-test.ini diff --git a/src/replica/partition_split/test/main.cpp b/src/replica/split/test/main.cpp similarity index 100% rename from src/replica/partition_split/test/main.cpp rename to src/replica/split/test/main.cpp diff --git a/src/replica/partition_split/test/replica_split_test.cpp b/src/replica/split/test/replica_split_test.cpp similarity index 99% rename from src/replica/partition_split/test/replica_split_test.cpp rename to src/replica/split/test/replica_split_test.cpp index df6389e9f3..387f9020fa 100644 --- a/src/replica/partition_split/test/replica_split_test.cpp +++ b/src/replica/split/test/replica_split_test.cpp @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#include "replica/partition_split/replica_split_manager.h" +#include "replica/split/replica_split_manager.h" #include "replica/test/replica_test_base.h" #include diff --git a/src/replica/partition_split/test/run.sh b/src/replica/split/test/run.sh similarity index 100% rename from src/replica/partition_split/test/run.sh rename to src/replica/split/test/run.sh From b64c0b495922b4e2a6ceb7e25012cca2d2d1db3f Mon Sep 17 00:00:00 2001 From: heyuchen Date: Wed, 16 Sep 2020 14:09:59 +0800 Subject: [PATCH 3/3] small refactor --- src/replica/split/replica_split_manager.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replica/split/replica_split_manager.cpp b/src/replica/split/replica_split_manager.cpp index 529cc08922..07afabe849 100644 --- a/src/replica/split/replica_split_manager.cpp +++ b/src/replica/split/replica_split_manager.cpp @@ -14,6 +14,7 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. + #include "replica_split_manager.h" #include