Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor(split): move replica split functions into replica_split_manager class #624

Merged
merged 6 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/linux/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ echo "################################# start testing ##########################
if [ -z "$TEST_MODULE" ]
then
# supported test module
TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test"
TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test,dsn_replica_split_test"
fi

echo "TEST_MODULE=$TEST_MODULE"
Expand Down
4 changes: 4 additions & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ set(BACKUP_SRC backup/replica_backup_manager.cpp

set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp)

set(SPLIT_SRC split/replica_split_manager.cpp)

# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC
${DUPLICATION_SRC}
${BACKUP_SRC}
${BULK_LOAD_SRC}
${SPLIT_SRC}
)

# Search mode for source files under CURRENT project directory?
Expand Down Expand Up @@ -50,5 +53,6 @@ dsn_add_shared_library()
add_subdirectory(duplication/test)
add_subdirectory(backup/test)
add_subdirectory(bulk_load/test)
add_subdirectory(split/test)
add_subdirectory(storage)
add_subdirectory(test)
5 changes: 4 additions & 1 deletion src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "backup/replica_backup_manager.h"
#include "backup/cold_backup_context.h"
#include "bulk_load/replica_bulk_loader.h"
#include "split/replica_split_manager.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/cpp/json_helper.h>
Expand Down Expand Up @@ -71,8 +72,8 @@ replica::replica(
_options = &stub->options();
init_state();
_config.pid = gpid;
_partition_version = app.partition_count - 1;
_bulk_loader = make_unique<replica_bulk_loader>(this);
_split_mgr = make_unique<replica_split_manager>(this);

std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid);
_counter_private_log_size.init_app_counter(
Expand Down Expand Up @@ -422,6 +423,8 @@ void replica::close()

_bulk_loader.reset();

_split_mgr.reset();

ddebug("%s: replica closed, time_used = %" PRIu64 "ms", name(), dsn_now_ms() - start_time);
}

Expand Down
92 changes: 11 additions & 81 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class replica_stub;
class replica_duplicator_manager;
class replica_backup_manager;
class replica_bulk_loader;
class replica_split_manager;

class cold_backup_context;
typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
Expand Down Expand Up @@ -200,6 +201,11 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
: 0;
}

//
// Partition Split
//
// Returns a non-owning pointer to this replica's split manager; ownership stays with
// _split_mgr. The returned pointer is null once _split_mgr has been reset.
replica_split_manager *get_split_manager() const { return _split_mgr.get(); }

//
// Statistics
//
Expand Down Expand Up @@ -309,6 +315,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

bool update_configuration(const partition_configuration &config);
bool update_local_configuration(const replica_configuration &config, bool same_ballot = false);
error_code update_init_info_ballot_and_decree();

/////////////////////////////////////////////////////////////////
// group check
Expand Down Expand Up @@ -374,76 +381,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

std::string query_compact_state() const;

/////////////////////////////////////////////////////////////////
// partition split
// parent partition create child
void on_add_child(const group_check_request &request);

// child replica initialize config and state info
void child_init_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot);

void parent_prepare_states(const std::string &dir);

// child copy parent prepare list and call child_learn_states
void child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);

// child learn states(including checkpoint, private logs, in-memory mutations)
void child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
// Applies mutation logs that were learned from the parent of this child.
// This stage runs after the child has applied the parent's checkpoint; here the child
// begins to apply the mutations.
// \param last_committed_decree: parent's last_committed_decree when the checkpoint was
// generated.
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child sends a notification to the primary parent when it finishes async learn
void child_notify_catch_up();

// primary parent handle child catch_up request
void parent_handle_child_catch_up(const notify_catch_up_request &request,
notify_cacth_up_response &response);

// primary parent check if sync_point has been committed
// sync_point is the first decree after parent send write request to child synchronously
void parent_check_sync_point_commit(decree sync_point);

// primary parent register children on meta_server
void register_child_on_meta(ballot b);
void on_register_child_on_meta_reply(dsn::error_code ec,
const register_child_request &request,
const register_child_response &response);
// primary sends register request to meta_server
void parent_send_register_request(const register_child_request &request);

// child partition has been registered on meta_server, could be active
void child_partition_active(const partition_configuration &config);

// return true if parent status is valid
bool parent_check_states();

// parent reset child information when partition split failed
void parent_cleanup_split_context();
// child suicide when partition split failed
void child_handle_split_error(const std::string &error_msg);
// child handle error while async learn parent states
void child_handle_async_learn_error();

void init_table_level_latency_counters();

private:
Expand All @@ -459,6 +396,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_test;
friend class replica_backup_manager;
friend class replica_bulk_loader;
friend class replica_split_manager;

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -537,23 +475,15 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// backup
std::unique_ptr<replica_backup_manager> _backup_mgr;

// partition split
// _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition
// _child_gpid.app_id = 0 for parent partition not in partition split and child partition
dsn::gpid _child_gpid{0, 0};
// ballot when starting partition split and split will stop if ballot changed
// _child_init_ballot = 0 if partition not in partition split
ballot _child_init_ballot{0};
// in normal cases, _partition_version = partition_count-1
// when replica reject client read write request, partition_version = -1
std::atomic<int32_t> _partition_version;

// bulk load
std::unique_ptr<replica_bulk_loader> _bulk_loader;
// if replica in bulk load ingestion 2pc, will reject other write requests
bool _is_bulk_load_ingestion{false};
uint64_t _bulk_load_ingestion_start_time_ms{0};

// partition split
std::unique_ptr<replica_split_manager> _split_mgr;

// perf counters
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
Expand Down
11 changes: 6 additions & 5 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "split/replica_split_manager.h"
#include <dsn/utility/filesystem.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -373,11 +374,11 @@ void replica::catch_up_with_private_logs(partition_status::type s)
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
_split_states.async_learn_task = tasking::enqueue(
LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica_split_manager::child_catch_up_states, get_split_manager()),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1089,5 +1089,10 @@ void replica::replay_prepare_list()
}
}

// Thin forwarding wrapper: hands this replica to the underlying app object's
// update_init_info_ballot_and_decree() and returns whatever error code it reports.
error_code replica::update_init_info_ballot_and_decree()
{
    const error_code result = _app->update_init_info_ballot_and_decree(this);
    return result;
}

} // namespace replication
} // namespace dsn
14 changes: 8 additions & 6 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "bulk_load/replica_bulk_loader.h"
#include "duplication/duplication_sync_timer.h"
#include "backup/replica_backup_server.h"
#include "split/replica_split_manager.h"

#include <dsn/cpp/json_helper.h>
#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -2618,16 +2619,17 @@ void replica_stub::create_child_replica(rpc_address primary_address,
ddebug_f("create child replica ({}) succeed", child_gpid);
tasking::enqueue(LPC_PARTITION_SPLIT,
child_replica->tracker(),
std::bind(&replica::child_init_replica,
child_replica,
std::bind(&replica_split_manager::child_init_replica,
child_replica->get_split_manager(),
parent_gpid,
primary_address,
init_ballot),
child_gpid.thread_hash());
} else {
dwarn_f("failed to create child replica ({}), ignore it and wait next run", child_gpid);
split_replica_error_handler(parent_gpid,
[](replica_ptr r) { r->_child_gpid.set_app_id(0); });
split_replica_error_handler(
parent_gpid,
std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1));
}
}

Expand Down Expand Up @@ -2683,7 +2685,7 @@ replica_stub::split_replica_exec(dsn::task_code code, gpid pid, local_execution
if (replica && handler) {
tasking::enqueue(code,
replica.get()->tracker(),
[handler, replica]() { handler(replica); },
[handler, replica]() { handler(replica->get_split_manager()); },
pid.thread_hash());
return ERR_OK;
}
Expand All @@ -2698,7 +2700,7 @@ void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc)
notify_cacth_up_response &response = rpc.response();
replica_ptr replica = get_replica(request.parent_gpid);
if (replica != nullptr) {
replica->parent_handle_child_catch_up(request, response);
replica->get_split_manager()->parent_handle_child_catch_up(request, response);
} else {
response.err = ERR_OBJECT_NOT_FOUND;
}
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;
class duplication_sync_timer;
class replica_bulk_loader;
class replica_backup_server;
class replica_split_manager;

class replica_stub : public serverlet<replica_stub>, public ref_counter
{
public:
Expand Down Expand Up @@ -193,7 +195,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
replica_ptr
create_child_replica_if_not_found(gpid child_pid, app_info *app, const std::string &parent_dir);

typedef std::function<void(::dsn::replication::replica *rep)> local_execution;
typedef std::function<void(replica_split_manager *split_mgr)> local_execution;

// This function is used for partition split, caller(replica)
// parent/child may want child/parent to execute function during partition split
Expand Down Expand Up @@ -285,6 +287,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_duplicator;
friend class replica_http_service;
friend class replica_bulk_loader;
friend class replica_split_manager;

friend class mock_replica_stub;
friend class duplication_sync_timer;
Expand Down
Loading