Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(update_replication_factor#12): update the mutation_2pc_min_replica_count base the max count of table #1035

Merged
merged 15 commits into from
Jul 12, 2022
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,11 @@ package-lock.json

#go
go-client/bin

# rdsn
rdsn/builder
rdsn/thirdparty/build
rdsn/thirdparty/src
rdsn/thirdparty/output
rdsn/cmake-build-debug
rdsn/test_reports
10 changes: 10 additions & 0 deletions rdsn/src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1306,5 +1306,15 @@ void meta_service::on_set_max_replica_count(configuration_set_max_replica_count_
server_state::sStateHash);
}

int32_t meta_service::mutation_2pc_min_replica_count(int32_t app_max_replica_count) const
{
dassert_f(_app_info.max_replica_count > 0);
if (_opts.mutation_2pc_min_replica_count > 0) {// >0 means use the user config
return _opts.mutation_2pc_min_replica_count;
} else {// otherwise based on the table max_replica_count
return app_max_replica_count <=2 ? 1 : app_max_replica_count / 2 + 1;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions rdsn/src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class meta_service : public serverlet<meta_service>
return metas.substr(0, metas.length() - 1);
}

int32_t mutation_2pc_min_replica_count(int32_t app_max_replica_count) const;

private:
void register_rpc_handlers();
void register_ctrl_commands();
Expand Down
7 changes: 3 additions & 4 deletions rdsn/src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ namespace replication {
partition_guardian::partition_guardian(meta_service *svc) : _svc(svc)
{
if (svc != nullptr) {
_mutation_2pc_min_replica_count = svc->get_options().mutation_2pc_min_replica_count;
_replica_assign_delay_ms_for_dropouts =
svc->get_meta_options()._lb_opts.replica_assign_delay_ms_for_dropouts;
config_context::MAX_REPLICA_COUNT_IN_GRROUP =
svc->get_meta_options()._lb_opts.max_replicas_in_group;
} else {
_mutation_2pc_min_replica_count = 0;
_replica_assign_delay_ms_for_dropouts = 0;
}

Expand Down Expand Up @@ -477,8 +475,9 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g

configuration_proposal_action action;
bool is_emergency = false;
if (cc.config_owner->max_replica_count > _mutation_2pc_min_replica_count &&
replica_count(pc) < _mutation_2pc_min_replica_count) {
if (cc.config_owner->max_replica_count >
_svc->mutation_2pc_min_replica_count(pc.max_replica_count) &&
replica_count(pc) < _svc->mutation_2pc_min_replica_count(pc.max_replica_count)) {
// ATTENTION:
// when max_replica_count == 2, even if there is only 1 replica alive now, we will still
// wait for replica_assign_delay_ms_for_dropouts before recover the second replica.
Expand Down
1 change: 0 additions & 1 deletion rdsn/src/meta/partition_guardian.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class partition_guardian
// ]
dsn_handle_t _ctrl_assign_secondary_black_list = nullptr;

int32_t _mutation_2pc_min_replica_count;
dsn_handle_t _ctrl_assign_delay_ms = nullptr;
uint64_t _replica_assign_delay_ms_for_dropouts;

Expand Down
4 changes: 2 additions & 2 deletions rdsn/src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,7 @@ void server_state::update_configuration_locally(
partition_configuration &old_cfg = app.partitions[gpid.get_partition_index()];
partition_configuration &new_cfg = config_request->config;

int min_2pc_count = _meta_svc->get_options().mutation_2pc_min_replica_count;
int min_2pc_count = _meta_svc->mutation_2pc_min_replica_count(old_cfg.max_replica_count);
health_status old_health_status = partition_health_status(old_cfg, min_2pc_count);
health_status new_health_status = partition_health_status(new_cfg, min_2pc_count);

Expand Down Expand Up @@ -2375,8 +2375,8 @@ void server_state::update_partition_perf_counter()
{
int counters[HS_MAX_VALUE];
::memset(counters, 0, sizeof(counters));
int min_2pc_count = _meta_svc->get_options().mutation_2pc_min_replica_count;
auto func = [&](const std::shared_ptr<app_state> &app) {
int min_2pc_count = _meta_svc->mutation_2pc_min_replica_count(app->max_replica_count);
for (unsigned int i = 0; i != app->partition_count; ++i) {
health_status st = partition_health_status(app->partitions[i], min_2pc_count);
counters[st]++;
Expand Down
10 changes: 10 additions & 0 deletions rdsn/src/replica/replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,5 +581,15 @@ error_code replica::store_app_info(app_info &info, const std::string &path)
return err;
}

int32_t replica::mutation_2pc_min_replica_count() const
{
dassert_f(_app_info.max_replica_count > 0);
if (_options->mutation_2pc_min_replica_count > 0) {// >0 means use the user config
return _options->mutation_2pc_min_replica_count;
} else {// otherwise based on the table max_replica_count
return _app_info.max_replica_count <=2 ? 1 : _app_info.max_replica_count / 2 + 1;
}
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions rdsn/src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba

void update_app_max_replica_count(int32_t max_replica_count);

int32_t mutation_2pc_min_replica_count() const;

private:
friend class ::dsn::replication::test::test_checker;
friend class ::dsn::replication::mutation_queue;
Expand Down
4 changes: 2 additions & 2 deletions rdsn/src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
}

if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->mutation_2pc_min_replica_count) {
mutation_2pc_min_replica_count()) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}
Expand Down Expand Up @@ -223,7 +223,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
// for reconciliation, we should ensure every prepared mutation to be committed
// please refer to PacificA paper
if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->mutation_2pc_min_replica_count &&
mutation_2pc_min_replica_count() &&
!reconciliation) {
err = ERR_NOT_ENOUGH_MEMBER;
goto ErrOut;
Expand Down
3 changes: 1 addition & 2 deletions rdsn/src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
init_prepare(next, false);
}

if (_primary_states.membership.secondaries.size() + 1 <
_options->mutation_2pc_min_replica_count) {
if (_primary_states.membership.secondaries.size() + 1 < mutation_2pc_min_replica_count()) {
std::vector<mutation_ptr> queued;
_primary_states.write_queue.clear(queued);
for (auto &m : queued) {
Expand Down