Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

refactor(split): move replica split functions into replica_split_manager class #624

Merged
merged 6 commits into from
Sep 17, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion scripts/linux/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ echo "################################# start testing ##########################
if [ -z "$TEST_MODULE" ]
then
# supported test module
TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test"
TEST_MODULE="dsn_runtime_tests,dsn_utils_tests,dsn_perf_counter_test,dsn.zookeeper.tests,dsn_aio_test,dsn.failure_detector.tests,dsn_meta_state_tests,dsn_nfs_test,dsn_block_service_test,dsn.replication.simple_kv,dsn.rep_tests.simple_kv,dsn.meta.test,dsn.replica.test,dsn_http_test,dsn_replica_dup_test,dsn_replica_backup_test,dsn_replica_bulk_load_test,dsn_replica_split_test"
fi

echo "TEST_MODULE=$TEST_MODULE"
Expand Down
4 changes: 4 additions & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ set(BACKUP_SRC backup/replica_backup_manager.cpp

set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp)

set(SPLIT_SRC split/replica_split_manager.cpp)

# Source files under CURRENT project directory will be automatically included.
# You can manually set MY_PROJ_SRC to include source files under other directories.
set(MY_PROJ_SRC
${DUPLICATION_SRC}
${BACKUP_SRC}
${BULK_LOAD_SRC}
${SPLIT_SRC}
)

# Search mode for source files under CURRENT project directory?
Expand Down Expand Up @@ -50,5 +53,6 @@ dsn_add_shared_library()
add_subdirectory(duplication/test)
add_subdirectory(backup/test)
add_subdirectory(bulk_load/test)
add_subdirectory(split/test)
add_subdirectory(storage)
add_subdirectory(test)
5 changes: 4 additions & 1 deletion src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include "backup/replica_backup_manager.h"
#include "backup/cold_backup_context.h"
#include "bulk_load/replica_bulk_loader.h"
#include "split/replica_split_manager.h"

#include <dsn/utils/latency_tracer.h>
#include <dsn/cpp/json_helper.h>
Expand Down Expand Up @@ -71,8 +72,8 @@ replica::replica(
_options = &stub->options();
init_state();
_config.pid = gpid;
_partition_version = app.partition_count - 1;
_bulk_loader = make_unique<replica_bulk_loader>(this);
_split_mgr = make_unique<replica_split_manager>(this);

std::string counter_str = fmt::format("private.log.size(MB)@{}", gpid);
_counter_private_log_size.init_app_counter(
Expand Down Expand Up @@ -422,6 +423,8 @@ void replica::close()

_bulk_loader.reset();

_split_mgr.reset();

ddebug("%s: replica closed, time_used = %" PRIu64 "ms", name(), dsn_now_ms() - start_time);
}

Expand Down
92 changes: 11 additions & 81 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class replica_stub;
class replica_duplicator_manager;
class replica_backup_manager;
class replica_bulk_loader;
class replica_split_manager;

class cold_backup_context;
typedef dsn::ref_ptr<cold_backup_context> cold_backup_context_ptr;
Expand Down Expand Up @@ -200,6 +201,11 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
: 0;
}

//
// Partition Split
//
replica_split_manager *get_split_manager() const { return _split_mgr.get(); }

//
// Statistics
//
Expand Down Expand Up @@ -309,6 +315,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

bool update_configuration(const partition_configuration &config);
bool update_local_configuration(const replica_configuration &config, bool same_ballot = false);
error_code update_init_info_ballot_and_decree();

/////////////////////////////////////////////////////////////////
// group check
Expand Down Expand Up @@ -374,76 +381,6 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

std::string query_compact_state() const;

/////////////////////////////////////////////////////////////////
// partition split
// parent partition create child
void on_add_child(const group_check_request &request);

// child replica initialize config and state info
void child_init_replica(gpid parent_gpid, dsn::rpc_address primary_address, ballot init_ballot);

void parent_prepare_states(const std::string &dir);

// child copy parent prepare list and call child_learn_states
void child_copy_prepare_list(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
std::shared_ptr<prepare_list> plist);

// child learn states(including checkpoint, private logs, in-memory mutations)
void child_learn_states(learn_state lstate,
std::vector<mutation_ptr> mutation_list,
std::vector<std::string> plog_files,
uint64_t total_file_size,
decree last_committed_decree);

// TODO(heyuchen): total_file_size is used for split perf-counter in further pull request
// Applies mutation logs that were learned from the parent of this child.
// This stage follows after that child applies the checkpoint of parent, and begins to apply the
// mutations.
// \param last_committed_decree: parent's last_committed_decree when the checkpoint was
// generated.
error_code child_apply_private_logs(std::vector<std::string> plog_files,
std::vector<mutation_ptr> mutation_list,
uint64_t total_file_size,
decree last_committed_decree);

// child catch up parent states while executing async learn task
void child_catch_up_states();

// child send notification to primary parent when it finish async learn
void child_notify_catch_up();

// primary parent handle child catch_up request
void parent_handle_child_catch_up(const notify_catch_up_request &request,
notify_cacth_up_response &response);

// primary parent check if sync_point has been committed
// sync_point is the first decree after parent send write request to child synchronously
void parent_check_sync_point_commit(decree sync_point);

// primary parent register children on meta_server
void register_child_on_meta(ballot b);
void on_register_child_on_meta_reply(dsn::error_code ec,
const register_child_request &request,
const register_child_response &response);
// primary sends register request to meta_server
void parent_send_register_request(const register_child_request &request);

// child partition has been registered on meta_server, could be active
void child_partition_active(const partition_configuration &config);

// return true if parent status is valid
bool parent_check_states();

// parent reset child information when partition split failed
void parent_cleanup_split_context();
// child suicide when partition split failed
void child_handle_split_error(const std::string &error_msg);
// child handle error while async learn parent states
void child_handle_async_learn_error();

void init_table_level_latency_counters();

private:
Expand All @@ -459,6 +396,7 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_test;
friend class replica_backup_manager;
friend class replica_bulk_loader;
friend class replica_split_manager;

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down Expand Up @@ -537,23 +475,15 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// backup
std::unique_ptr<replica_backup_manager> _backup_mgr;

// partition split
// _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition
// _child_gpid.app_id = 0 for parent partition not in partition split and child partition
dsn::gpid _child_gpid{0, 0};
// ballot when starting partition split and split will stop if ballot changed
// _child_init_ballot = 0 if partition not in partition split
ballot _child_init_ballot{0};
// in normal cases, _partition_version = partition_count-1
// when replica reject client read write request, partition_version = -1
std::atomic<int32_t> _partition_version;

// bulk load
std::unique_ptr<replica_bulk_loader> _bulk_loader;
// if replica in bulk load ingestion 2pc, will reject other write requests
bool _is_bulk_load_ingestion{false};
uint64_t _bulk_load_ingestion_start_time_ms{0};

// partition split
std::unique_ptr<replica_split_manager> _split_mgr;

// perf counters
perf_counter_wrapper _counter_private_log_size;
perf_counter_wrapper _counter_recent_write_throttling_delay_count;
Expand Down
11 changes: 6 additions & 5 deletions src/replica/replica_chkpt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "duplication/replica_duplicator_manager.h"
#include "split/replica_split_manager.h"
#include <dsn/utility/filesystem.h>
#include <dsn/utility/chrono_literals.h>
#include <dsn/dist/replication/replication_app_base.h>
Expand Down Expand Up @@ -373,11 +374,11 @@ void replica::catch_up_with_private_logs(partition_status::type s)
get_gpid().thread_hash());
_potential_secondary_states.learn_remote_files_completed_task->enqueue();
} else if (s == partition_status::PS_PARTITION_SPLIT) {
_split_states.async_learn_task =
tasking::enqueue(LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica::child_catch_up_states, this),
get_gpid().thread_hash());
_split_states.async_learn_task = tasking::enqueue(
LPC_PARTITION_SPLIT,
tracker(),
std::bind(&replica_split_manager::child_catch_up_states, get_split_manager()),
get_gpid().thread_hash());
} else {
_secondary_states.checkpoint_completed_task =
tasking::create_task(LPC_CHECKPOINT_REPLICA_COMPLETED,
Expand Down
5 changes: 5 additions & 0 deletions src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1089,5 +1089,10 @@ void replica::replay_prepare_list()
}
}

error_code replica::update_init_info_ballot_and_decree()
{
return _app->update_init_info_ballot_and_decree(this);
}

} // namespace replication
} // namespace dsn
14 changes: 8 additions & 6 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "bulk_load/replica_bulk_loader.h"
#include "duplication/duplication_sync_timer.h"
#include "backup/replica_backup_server.h"
#include "split/replica_split_manager.h"

#include <dsn/cpp/json_helper.h>
#include <dsn/utility/filesystem.h>
Expand Down Expand Up @@ -2618,16 +2619,17 @@ void replica_stub::create_child_replica(rpc_address primary_address,
ddebug_f("create child replica ({}) succeed", child_gpid);
tasking::enqueue(LPC_PARTITION_SPLIT,
child_replica->tracker(),
std::bind(&replica::child_init_replica,
child_replica,
std::bind(&replica_split_manager::child_init_replica,
child_replica->get_split_manager(),
parent_gpid,
primary_address,
init_ballot),
child_gpid.thread_hash());
} else {
dwarn_f("failed to create child replica ({}), ignore it and wait next run", child_gpid);
split_replica_error_handler(parent_gpid,
[](replica_ptr r) { r->_child_gpid.set_app_id(0); });
split_replica_error_handler(
parent_gpid,
std::bind(&replica_split_manager::parent_cleanup_split_context, std::placeholders::_1));
}
}

Expand Down Expand Up @@ -2683,7 +2685,7 @@ replica_stub::split_replica_exec(dsn::task_code code, gpid pid, local_execution
if (replica && handler) {
tasking::enqueue(code,
replica.get()->tracker(),
[handler, replica]() { handler(replica); },
[handler, replica]() { handler(replica->get_split_manager()); },
pid.thread_hash());
return ERR_OK;
}
Expand All @@ -2698,7 +2700,7 @@ void replica_stub::on_notify_primary_split_catch_up(notify_catch_up_rpc rpc)
notify_cacth_up_response &response = rpc.response();
replica_ptr replica = get_replica(request.parent_gpid);
if (replica != nullptr) {
replica->parent_handle_child_catch_up(request, response);
replica->get_split_manager()->parent_handle_child_catch_up(request, response);
} else {
response.err = ERR_OBJECT_NOT_FOUND;
}
Expand Down
5 changes: 4 additions & 1 deletion src/replica/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ typedef dsn::ref_ptr<replica_stub> replica_stub_ptr;
class duplication_sync_timer;
class replica_bulk_loader;
class replica_backup_server;
class replica_split_manager;

class replica_stub : public serverlet<replica_stub>, public ref_counter
{
public:
Expand Down Expand Up @@ -193,7 +195,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
replica_ptr
create_child_replica_if_not_found(gpid child_pid, app_info *app, const std::string &parent_dir);

typedef std::function<void(::dsn::replication::replica *rep)> local_execution;
typedef std::function<void(replica_split_manager *split_mgr)> local_execution;

// This function is used for partition split, caller(replica)
// parent/child may want child/parent to execute function during partition split
Expand Down Expand Up @@ -285,6 +287,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class replica_duplicator;
friend class replica_http_service;
friend class replica_bulk_loader;
friend class replica_split_manager;

friend class mock_replica_stub;
friend class duplication_sync_timer;
Expand Down
Loading