This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(split): add splitting_replicas while on_config_sync #653

Merged: 18 commits, Nov 25, 2020

Changes from 9 commits
10 changes: 9 additions & 1 deletion include/dsn/dist/replication/replication_types.h

Some generated files are not rendered by default.

4,916 changes: 2,486 additions & 2,430 deletions src/common/replication_types.cpp

Large diffs are not rendered by default.

27 changes: 21 additions & 6 deletions src/meta/server_state.cpp
@@ -767,6 +767,7 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc)

bool reject_this_request = false;
response.__isset.gc_replicas = false;
std::map<gpid, split_status::type> splitting_replicas;
ddebug("got config sync request from %s, stored_replicas_count(%d)",
request.node.to_string(),
(int)request.stored_replicas.size());
@@ -800,6 +801,14 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc)
response.partitions[i].info = *app;
response.partitions[i].config = app->partitions[pid.get_partition_index()];
response.partitions[i].host_node = request.node;
// set splitting_replicas
const split_state &app_split_states = app->helpers->split_states;
if (app_split_states.splitting_count > 0) {
auto iter = app_split_states.status.find(pid.get_partition_index());
if (iter != app_split_states.status.end()) {
splitting_replicas[pid] = iter->second;
}
}
++i;
return true;
});
@@ -891,12 +900,18 @@ void server_state::on_config_sync(configuration_query_by_node_rpc rpc)
if (reject_this_request) {
response.err = ERR_BUSY;
response.partitions.clear();
}
ddebug("send config sync response to %s, err(%s), partitions_count(%d), gc_replicas_count(%d)",
request.node.to_string(),
response.err.to_string(),
(int)response.partitions.size(),
(int)response.gc_replicas.size());
splitting_replicas.clear();
}
if (!splitting_replicas.empty()) {
response.__set_splitting_replicas(splitting_replicas);
}
ddebug_f("send config sync response to {}, err({}), partitions_count({}), "
"gc_replicas_count({}), splitting_replicas({})",
request.node.to_string(),
response.err,
response.partitions.size(),
response.gc_replicas.size(),
splitting_replicas.size());
}

bool server_state::query_configuration_by_gpid(dsn::gpid id,
56 changes: 56 additions & 0 deletions src/meta/test/meta_split_service_test.cpp
@@ -97,6 +97,15 @@ class meta_split_service_test : public meta_test_base
return rpc.response().err;
}

int32_t on_config_sync(configuration_query_by_node_request req)
{
auto request = make_unique<configuration_query_by_node_request>(req);
configuration_query_by_node_rpc rpc(std::move(request), RPC_CM_CONFIG_SYNC);
_ss->on_config_sync(rpc);
wait_all();
return rpc.response().splitting_replicas.size();
}

void mock_app_partition_split_context()
{
app->partition_count = NEW_PARTITION_COUNT;
@@ -202,5 +211,52 @@ TEST_F(meta_split_service_test, register_child_test)
}
}

// config sync unit tests
TEST_F(meta_split_service_test, on_config_sync_test)
{
create_app("not_splitting_app");
auto not_splitting_app = find_app("not_splitting_app");
gpid pid1 = gpid(app->app_id, PARENT_INDEX);
gpid pid2 = gpid(not_splitting_app->app_id, 0);
// mock meta server node state
node_state node;
node.put_partition(pid1, true);
node.put_partition(pid2, true);
mock_node_state(NODE, node);
// mock request
replica_info info1, info2;
info1.pid = pid1;
info2.pid = pid2;
configuration_query_by_node_request req;
req.node = NODE;
req.__isset.stored_replicas = true;
req.stored_replicas.emplace_back(info1);
req.stored_replicas.emplace_back(info2);

// Test case:
// - partition is splitting
// - partition is not splitting
// - TODO(heyuchen): partition split is paused({false, true, 1})
struct config_sync_test
{
bool mock_child_registered;
bool mock_parent_paused;
int32_t expected_count;
} tests[] = {{false, false, 1}, {true, false, 0}};

for (const auto &test : tests) {
mock_app_partition_split_context();
if (test.mock_child_registered) {
mock_child_registered();
}
if (test.mock_parent_paused) {
// TODO(heyuchen): TBD
}
ASSERT_EQ(on_config_sync(req), test.expected_count);
}

drop_app("not_splitting_app");
}

} // namespace replication
} // namespace dsn
4 changes: 3 additions & 1 deletion src/replica/replica.h
@@ -129,7 +129,9 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
// messages and tools from/for meta server
//
void on_config_proposal(configuration_update_request &proposal);
void on_config_sync(const app_info &info, const partition_configuration &config);
void on_config_sync(const app_info &info,
const partition_configuration &config,
split_status::type meta_split_status);
void on_cold_backup(const backup_request &request, /*out*/ backup_response &response);

//
14 changes: 10 additions & 4 deletions src/replica/replica_config.cpp
@@ -39,6 +39,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include "bulk_load/replica_bulk_loader.h"
#include "split/replica_split_manager.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>
@@ -1019,7 +1020,9 @@ bool replica::update_local_configuration_with_no_ballot_change(partition_status:
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica::on_config_sync(const app_info &info, const partition_configuration &config)
void replica::on_config_sync(const app_info &info,
const partition_configuration &config,
split_status::type meta_split_status)
{
dinfo_replica("configuration sync");
// no outdated update
@@ -1029,9 +1032,12 @@ void replica::on_config_sync(const app_info &info, const partition_configuration
update_app_envs(info.envs);
_duplicating = info.duplicating;

if (status() == partition_status::PS_PRIMARY ||
nullptr != _primary_states.reconfiguration_task) {
// nothing to do as primary always holds the truth
if (status() == partition_status::PS_PRIMARY) {
if (nullptr != _primary_states.reconfiguration_task) {
// already under reconfiguration, skip configuration sync
} else if (info.partition_count != _app_info.partition_count) {
_split_mgr->trigger_primary_parent_split(info.partition_count, meta_split_status);
}
} else {
if (_is_initializing) {
// in initializing, when replica still primary, need to inc ballot
39 changes: 28 additions & 11 deletions src/replica/replica_stub.cpp
@@ -1324,10 +1324,13 @@ void replica_stub::on_node_query_reply(error_code err,
return;
}

ddebug("process query node partitions response for resp.err = ERR_OK, "
"partitions_count(%d), gc_replicas_count(%d)",
(int)resp.partitions.size(),
(int)resp.gc_replicas.size());
auto splitting_count = resp.__isset.splitting_replicas ? resp.splitting_replicas.size() : 0;

ddebug_f("process query node partitions response for resp.err = ERR_OK, "
"partitions_count({}), gc_replicas_count({}), splitting_replicas({})",
resp.partitions.size(),
resp.gc_replicas.size(),
splitting_count);

replicas rs;
{
@@ -1336,11 +1339,20 @@
}

for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
auto meta_split_status = split_status::NOT_SPLIT;
Contributor: Maybe you can calculate meta_split_status in the meta server, so we don't need to calculate it in each replica server.

Author: If an app is not splitting, splitting_replicas in configuration_query_by_node_response won't be set. The config_sync RPC happens every 30 seconds for all replica servers, and partition split is not a frequent action, so for most cases this reduces the network cost.
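The network-cost argument rests on how Thrift serializes optional fields. The sketch below is a simplified, hypothetical illustration of the __set_/__isset pattern that the generated configuration_query_by_node_response is assumed to follow (it is not the actual generated code, and the real class has many more fields): the serializer only writes a field whose __isset flag is true, so leaving splitting_replicas unset for non-splitting apps adds nothing to the response.

```cpp
#include <map>
// gpid and split_status::type come from the project's generated
// replication_types.h; they are used here only to mirror the PR's field type.

// Simplified, hypothetical sketch of the thrift-generated optional-field pattern.
struct configuration_query_by_node_response_sketch
{
    std::map<gpid, split_status::type> splitting_replicas;

    struct isset_flags
    {
        bool splitting_replicas = false; // other flags elided
    } __isset;

    void __set_splitting_replicas(const std::map<gpid, split_status::type> &val)
    {
        splitting_replicas = val;
        // The serializer skips this field while the flag is false, so the
        // common non-splitting case adds no bytes to the config sync response.
        __isset.splitting_replicas = true;
    }
};
```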

if (splitting_count > 0) {
auto iter = resp.splitting_replicas.find(it->config.pid);
if (iter != resp.splitting_replicas.end()) {
meta_split_status = iter->second;
}
}
rs.erase(it->config.pid);
tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
&_tracker,
std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
it->config.pid.thread_hash());
tasking::enqueue(
LPC_QUERY_NODE_CONFIGURATION_SCATTER,
&_tracker,
std::bind(
&replica_stub::on_node_query_reply_scatter, this, this, *it, meta_split_status),
it->config.pid.thread_hash());
}

// for rps not exist on meta_servers
@@ -1377,7 +1389,11 @@ void replica_stub::set_meta_server_connected_for_test(
for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
tasking::enqueue(LPC_QUERY_NODE_CONFIGURATION_SCATTER,
&_tracker,
std::bind(&replica_stub::on_node_query_reply_scatter, this, this, *it),
std::bind(&replica_stub::on_node_query_reply_scatter,
this,
this,
*it,
split_status::NOT_SPLIT),
it->config.pid.thread_hash());
}
}
@@ -1393,11 +1409,12 @@ void replica_stub::set_replica_state_subscriber_for_test(replica_state_subscribe
// replica_stub::close
// ThreadPool: THREAD_POOL_REPLICATION
void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
const configuration_update_request &req)
const configuration_update_request &req,
split_status::type meta_split_status)
{
replica_ptr replica = get_replica(req.config.pid);
if (replica != nullptr) {
replica->on_config_sync(req.info, req.config);
replica->on_config_sync(req.info, req.config, meta_split_status);
@neverchanje (Contributor, Nov 19, 2020): It's ugly to add an argument just for one requirement; it makes the function's responsibility obscure. I'd prefer adding meta_split_status to configuration_update_request, even though that is ugly too. Think about it: if we keep doing this, we end up with dozens of parameters in a function. And I admit this code is shit, so we can't make it worse.

Author: Done
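The change adopted after this review is not visible in the 9-commit view shown here. As a rough sketch only, assuming the later commits add a meta_split_status field to configuration_update_request (the field name is an assumption), the scatter path could drop the extra parameter along these lines; this is a fragment in the context of replica_stub.cpp, not the code in this diff:

```cpp
// Hypothetical post-review shape: the split status is assumed to travel inside
// configuration_update_request, so the function keeps its two-argument signature.
void replica_stub::on_node_query_reply_scatter(replica_stub_ptr this_,
                                               const configuration_update_request &req)
{
    replica_ptr replica = get_replica(req.config.pid);
    if (replica != nullptr) {
        // Read the split status from the request instead of a separate argument.
        replica->on_config_sync(req.info, req.config, req.meta_split_status);
    }
    // Handling for replicas missing on this node is elided; see the original
    // function earlier in this diff.
}
```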

} else {
if (req.config.primary == _primary_address) {
ddebug("%s@%s: replica not exists on replica server, which is primary, remove it "
3 changes: 2 additions & 1 deletion src/replica/replica_stub.h
@@ -239,7 +239,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
void on_meta_server_disconnected_scatter(replica_stub_ptr this_, gpid id);
void on_node_query_reply(error_code err, dsn::message_ex *request, dsn::message_ex *response);
void on_node_query_reply_scatter(replica_stub_ptr this_,
const configuration_update_request &config);
const configuration_update_request &config,
split_status::type meta_split_status);
void on_node_query_reply_scatter2(replica_stub_ptr this_, gpid id);
void remove_replica_on_meta_server(const app_info &info, const partition_configuration &config);
::dsn::task_ptr begin_open_replica(const app_info &app,
42 changes: 42 additions & 0 deletions src/replica/split/replica_split_manager.cpp
@@ -1103,6 +1103,48 @@ void replica_split_manager::child_handle_async_learn_error() // on child partiti
_replica->_split_states.async_learn_task = nullptr;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::trigger_primary_parent_split(
const int32_t meta_partition_count,
split_status::type meta_split_status) // on primary parent partition
{
dcheck_eq_replica(status(), partition_status::PS_PRIMARY);
dcheck_eq_replica(_replica->_app_info.partition_count * 2, meta_partition_count);
ddebug_replica("app({}) partition count changed, local({}) VS meta({}), split_status local({}) "
"VS meta({})",
_replica->_app_info.app_name,
_replica->_app_info.partition_count,
meta_partition_count,
enum_to_string(_split_status),
enum_to_string(meta_split_status));

if (meta_split_status == split_status::SPLITTING) {
if (!_replica->_primary_states.learners.empty() ||
_replica->_primary_states.membership.secondaries.size() + 1 <
_replica->_primary_states.membership.max_replica_count) {
dwarn_replica(
"there are {} learners or not have enough secondaries(count is {}), wait for "
"next round",
_replica->_primary_states.learners.size(),
_replica->_primary_states.membership.secondaries.size());
return;
}

group_check_request add_child_request;
add_child_request.app = _replica->_app_info;
_replica->_primary_states.get_replica_config(status(), add_child_request.config);
auto child_gpid =
gpid(get_gpid().get_app_id(),
get_gpid().get_partition_index() + _replica->_app_info.partition_count);
add_child_request.__set_child_gpid(child_gpid);
parent_start_split(add_child_request);
// TODO(heyuchen): broadcast group check request to secondaries to start split
return;
}

// TODO(heyuchen): add other split_status check
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_split_manager::trigger_secondary_parent_split(
const group_check_request &request,
5 changes: 5 additions & 0 deletions src/replica/split/replica_split_manager.h
@@ -125,6 +125,11 @@ class replica_split_manager : replica_base
// child handle error while async learn parent states
void child_handle_async_learn_error();

// called by `on_config_sync` in `replica_config.cpp`
// primary parent start or stop split according to meta_split_status
void trigger_primary_parent_split(const int32_t meta_partition_count,
split_status::type meta_split_status);

// called by `on_group_check` in `replica_check.cpp`
// secondary parent check whether should start or stop split
void trigger_secondary_parent_split(const group_check_request &request,