Skip to content

Commit

Permalink
feat(split): add split_status (#641)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Oct 15, 2020
1 parent 2161300 commit 2c7148d
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 72 deletions.
9 changes: 9 additions & 0 deletions include/dsn/dist/replication/replication_enums.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ENUM_REG(replication::partition_status::PS_ERROR)
ENUM_REG(replication::partition_status::PS_PRIMARY)
ENUM_REG(replication::partition_status::PS_SECONDARY)
ENUM_REG(replication::partition_status::PS_POTENTIAL_SECONDARY)
ENUM_REG(replication::partition_status::PS_PARTITION_SPLIT)
ENUM_END2(replication::partition_status::type, partition_status)

ENUM_BEGIN2(replication::read_semantic::type,
Expand Down Expand Up @@ -98,4 +99,12 @@ ENUM_BEGIN2(replication::detect_action::type, detect_action, replication::detect
ENUM_REG(replication::detect_action::START)
ENUM_REG(replication::detect_action::STOP)
ENUM_END2(replication::detect_action::type, detect_action)

// Registers every replication::split_status value through the ENUM_* helper macros.
// NOTE(review): presumably this table is what backs enum_to_string() for split_status, and
// the third ENUM_BEGIN2 argument (NOT_SPLIT) is the fallback value -- confirm against the
// macro definitions.
ENUM_BEGIN2(replication::split_status::type, split_status, replication::split_status::NOT_SPLIT)
ENUM_REG(replication::split_status::NOT_SPLIT)
ENUM_REG(replication::split_status::SPLITTING)
ENUM_REG(replication::split_status::PAUSING)
ENUM_REG(replication::split_status::PAUSED)
ENUM_REG(replication::split_status::CANCELING)
ENUM_END2(replication::split_status::type, split_status)
}
14 changes: 14 additions & 0 deletions include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions src/common/replication_types.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,19 @@ struct restore_state
restore_state() : restore_status(dsn::ERR_OK), progress(0), reason() {}
};

// Per-app bookkeeping for partition split.
//
// When a partition split starts, `splitting_count` is set to the old partition count and
// <parent_partition_index, SPLITTING> is inserted into `status` for every parent partition.
// When a parent partition (e.g. partition[0]) finishes its split, `splitting_count`
// decreases and that partition's entry (<0, SPLITTING>) is removed from `status`.
struct split_state
{
    // count of parent partitions still splitting; starts at 0
    int32_t splitting_count{0};
    // partition_index -> split_status
    std::map<int32_t, split_status::type> status;

    split_state() = default;
};

class app_state;
class app_state_helper
{
Expand All @@ -293,6 +306,7 @@ class app_state_helper
std::vector<config_context> contexts;
dsn::message_ex *pending_response;
std::vector<restore_state> restore_states;
split_state split_states;

public:
app_state_helper() : owner(nullptr), partitions_in_progress(0)
Expand Down
23 changes: 11 additions & 12 deletions src/meta/meta_split_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,13 @@ void meta_split_service::start_partition_split(start_split_rpc rpc)
return;
}

for (const auto &partition_config : app->partitions) {
// partition already during split
if (partition_config.ballot < 0) {
response.err = ERR_BUSY;
dwarn_f("app is already during partition split, client({}) sent repeated split "
"request: app({}), new_partition_count({})",
rpc.remote_address().to_string(),
request.app_name,
request.new_partition_count);
return;
}
if (app->helpers->split_states.splitting_count > 0) {
response.err = ERR_BUSY;
auto err_msg =
fmt::format("app({}) is already executing partition split", request.app_name);
derror_f("{}", err_msg);
response.hint_msg = err_msg;
return;
}
}

Expand All @@ -101,15 +97,18 @@ void meta_split_service::do_start_partition_split(std::shared_ptr<app_state> app
app->partition_count * 2);

zauto_write_lock l(app_lock());
app->helpers->split_states.splitting_count = app->partition_count;
app->partition_count *= 2;
app->helpers->contexts.resize(app->partition_count);
app->partitions.resize(app->partition_count);

for (int i = 0; i < app->partition_count; ++i) {
app->helpers->contexts[i].config_owner = &app->partitions[i];
if (i >= app->partition_count / 2) {
if (i >= app->partition_count / 2) { // child partitions
app->partitions[i].ballot = invalid_ballot;
app->partitions[i].pid = gpid(app->app_id, i);
} else { // parent partitions
app->helpers->split_states.status[i] = split_status::SPLITTING;
}
}

Expand Down
56 changes: 31 additions & 25 deletions src/meta/test/meta_split_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,34 +109,40 @@ class meta_split_service_test : public meta_test_base
}

const std::string NAME = "split_table";
const uint32_t PARTITION_COUNT = 4;
const uint32_t NEW_PARTITION_COUNT = 8;
const uint32_t PARENT_BALLOT = 3;
const uint32_t PARENT_INDEX = 0;
const uint32_t CHILD_INDEX = 4;
const int32_t PARTITION_COUNT = 4;
const int32_t NEW_PARTITION_COUNT = 8;
const int32_t PARENT_BALLOT = 3;
const int32_t PARENT_INDEX = 0;
const int32_t CHILD_INDEX = 4;
};

// TODO(heyuchen): refactor start split unit tests
TEST_F(meta_split_service_test, start_split_with_not_existed_app)
// start split unit tests
TEST_F(meta_split_service_test, start_split_test)
{
auto err = start_partition_split("table_not_exist", PARTITION_COUNT);
ASSERT_EQ(err, ERR_APP_NOT_EXIST);
}

TEST_F(meta_split_service_test, start_split_with_wrong_params)
{
auto app = find_app(NAME);
auto err = start_partition_split(NAME, PARTITION_COUNT);
ASSERT_EQ(err, ERR_INVALID_PARAMETERS);
ASSERT_EQ(app->partition_count, PARTITION_COUNT);
}

TEST_F(meta_split_service_test, start_split_succeed)
{
auto app = find_app(NAME);
auto err = start_partition_split(NAME, NEW_PARTITION_COUNT);
ASSERT_EQ(err, ERR_OK);
ASSERT_EQ(app->partition_count, NEW_PARTITION_COUNT);
// Test cases:
// - app does not exist
// - wrong partition_count
// - app is already splitting
// - start split succeeds
struct start_test
{
std::string app_name;
int32_t new_partition_count;
bool need_mock_splitting;
error_code expected_err;
int32_t expected_partition_count;
} tests[] = {{"table_not_exist", PARTITION_COUNT, false, ERR_APP_NOT_EXIST, PARTITION_COUNT},
{NAME, PARTITION_COUNT, false, ERR_INVALID_PARAMETERS, PARTITION_COUNT},
{NAME, NEW_PARTITION_COUNT, true, ERR_BUSY, PARTITION_COUNT},
{NAME, NEW_PARTITION_COUNT, false, ERR_OK, NEW_PARTITION_COUNT}};

for (auto test : tests) {
auto app = find_app(NAME);
app->helpers->split_states.splitting_count = test.need_mock_splitting ? PARTITION_COUNT : 0;
ASSERT_EQ(start_partition_split(test.app_name, test.new_partition_count),
test.expected_err);
ASSERT_EQ(app->partition_count, test.expected_partition_count);
}
}

// TODO(heyuchen): refactor register unit tests
Expand Down
32 changes: 22 additions & 10 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ replica_split_manager::replica_split_manager(replica *r)
replica_split_manager::~replica_split_manager() {}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::on_add_child(const group_check_request &request) // on parent partition
void replica_split_manager::parent_start_split(
const group_check_request &request) // on parent partition
{
if (status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY &&
(status() != partition_status::PS_INACTIVE || !_replica->_inactive_is_transient)) {
Expand All @@ -53,14 +54,12 @@ void replica_split_manager::on_add_child(const group_check_request &request) //
return;
}

gpid child_gpid = request.child_gpid;
if (_child_gpid == child_gpid) {
dwarn_replica("child replica({}) is already existed, might be partition splitting, ignore "
"this request",
child_gpid);
if (_split_status == split_status::SPLITTING) {
dwarn_replica("partition is already splitting, ignore this request");
return;
}

gpid child_gpid = request.child_gpid;
if (child_gpid.get_partition_index() < _replica->_app_info.partition_count) {
dwarn_replica(
"receive old add child request, child_gpid={}, partition_count={}, ignore this request",
Expand All @@ -69,6 +68,11 @@ void replica_split_manager::on_add_child(const group_check_request &request) //
return;
}

// TODO(heyuchen): if partition is primary, reset split related variables

_partition_version.store(_replica->_app_info.partition_count - 1);

_split_status = split_status::SPLITTING;
_child_gpid = child_gpid;
_child_init_ballot = get_ballot();

Expand Down Expand Up @@ -137,12 +141,15 @@ bool replica_split_manager::parent_check_states() // on parent partition
{
FAIL_POINT_INJECT_F("replica_parent_check_states", [](dsn::string_view) { return true; });

if (_child_init_ballot != get_ballot() || _child_gpid.get_app_id() == 0 ||
if (_split_status != split_status::SPLITTING || _child_init_ballot != get_ballot() ||
_child_gpid.get_app_id() == 0 ||
(status() != partition_status::PS_PRIMARY && status() != partition_status::PS_SECONDARY &&
(status() != partition_status::PS_INACTIVE || !_replica->_inactive_is_transient))) {
dwarn_replica("parent wrong states: status({}), init_ballot({}) VS current_ballot({}), "
dwarn_replica("parent wrong states: status({}), split_status({}), init_ballot({}) VS "
"current_ballot({}), "
"child_gpid({})",
enum_to_string(status()),
enum_to_string(_split_status),
_child_init_ballot,
get_ballot(),
_child_gpid);
Expand Down Expand Up @@ -541,8 +548,12 @@ void replica_split_manager::parent_handle_child_catch_up(
const notify_catch_up_request &request,
notify_cacth_up_response &response) // on primary parent
{
if (status() != partition_status::PS_PRIMARY) {
derror_replica("status is {}", enum_to_string(status()));
if (status() != partition_status::PS_PRIMARY || _split_status != split_status::SPLITTING) {
derror_replica(
"wrong partition status or wrong split status, partition_status={}, split_status={}",
enum_to_string(status()),
enum_to_string(_split_status));

response.err = ERR_INVALID_STATE;
return;
}
Expand Down Expand Up @@ -798,6 +809,7 @@ void replica_split_manager::parent_cleanup_split_context() // on parent partitio
{
_child_gpid.set_app_id(0);
_child_init_ballot = 0;
_split_status = split_status::NOT_SPLIT;
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down
6 changes: 4 additions & 2 deletions src/replica/split/replica_split_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class replica_split_manager : replica_base
void set_child_gpid(gpid pid) { _child_gpid = pid; }

private:
// parent partition create child
void on_add_child(const group_check_request &request);
// parent partition start split
void parent_start_split(const group_check_request &request);

// child replica initialize config and state info
void child_init_replica(gpid parent_gpid, rpc_address primary_address, ballot init_ballot);
Expand Down Expand Up @@ -119,6 +119,8 @@ class replica_split_manager : replica_base
friend class replica_stub;
friend class replica_split_test;

split_status::type _split_status{split_status::NOT_SPLIT};

// _child_gpid = gpid({app_id},{pidx}+{old_partition_count}) for parent partition
// _child_gpid.app_id = 0 for parent partition not in partition split and child partition
gpid _child_gpid{0, 0};
Expand Down
Loading

0 comments on commit 2c7148d

Please sign in to comment.