Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(update_replication_factor#12): update the mutation_2pc_min_replica_count base the max count of table #1035

Merged
merged 15 commits into from
Jul 12, 2022
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,11 @@ package-lock.json

#go
go-client/bin

# rdsn
rdsn/builder
rdsn/thirdparty/build
rdsn/thirdparty/src
rdsn/thirdparty/output
rdsn/cmake-build-debug
rdsn/test_reports
13 changes: 12 additions & 1 deletion rdsn/src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ void replication_options::initialize()
"replication",
"mutation_2pc_min_replica_count",
mutation_2pc_min_replica_count,
"minimum number of alive replicas under which write is allowed");
"minimum number of alive replicas under which write is allowed. it's valid if larger than "
"0, otherwise, the final value is based on app_max_replica_count");

group_check_disabled = dsn_config_get_value_bool("replication",
"group_check_disabled",
Expand Down Expand Up @@ -421,6 +422,16 @@ void replication_options::sanity_check()
staleness_for_commit);
}

int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_replica_count) const
{
dcheck_gt(app_max_replica_count, 0);
if (mutation_2pc_min_replica_count > 0) { // >0 means use the user config
return mutation_2pc_min_replica_count;
} else { // otherwise, the value based on the table max_replica_count
return app_max_replica_count <= 2 ? 1 : app_max_replica_count / 2 + 1;
}
}

/*static*/ bool replica_helper::remove_node(::dsn::rpc_address node,
/*inout*/ std::vector<::dsn::rpc_address> &nodeList)
{
Expand Down
1 change: 1 addition & 0 deletions rdsn/src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class replication_options
~replication_options();

void initialize();
int32_t app_mutation_2pc_min_replica_count(int32_t app_max_replica_count) const;
static bool get_data_dir_and_tag(const std::string &config_dirs_str,
const std::string &default_dir,
const std::string &app_name,
Expand Down
8 changes: 4 additions & 4 deletions rdsn/src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ namespace replication {
partition_guardian::partition_guardian(meta_service *svc) : _svc(svc)
{
if (svc != nullptr) {
_mutation_2pc_min_replica_count = svc->get_options().mutation_2pc_min_replica_count;
_replica_assign_delay_ms_for_dropouts =
svc->get_meta_options()._lb_opts.replica_assign_delay_ms_for_dropouts;
config_context::MAX_REPLICA_COUNT_IN_GRROUP =
svc->get_meta_options()._lb_opts.max_replicas_in_group;
} else {
_mutation_2pc_min_replica_count = 0;
_replica_assign_delay_ms_for_dropouts = 0;
}

Expand Down Expand Up @@ -477,8 +475,10 @@ pc_status partition_guardian::on_missing_secondary(meta_view &view, const dsn::g

configuration_proposal_action action;
bool is_emergency = false;
if (cc.config_owner->max_replica_count > _mutation_2pc_min_replica_count &&
replica_count(pc) < _mutation_2pc_min_replica_count) {
if (cc.config_owner->max_replica_count >
_svc->get_options().app_mutation_2pc_min_replica_count(pc.max_replica_count) &&
replica_count(pc) <
_svc->get_options().app_mutation_2pc_min_replica_count(pc.max_replica_count)) {
// ATTENTION:
// when max_replica_count == 2, even if there is only 1 replica alive now, we will still
// wait for replica_assign_delay_ms_for_dropouts before recover the second replica.
Expand Down
1 change: 0 additions & 1 deletion rdsn/src/meta/partition_guardian.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ class partition_guardian
// ]
dsn_handle_t _ctrl_assign_secondary_black_list = nullptr;

int32_t _mutation_2pc_min_replica_count;
dsn_handle_t _ctrl_assign_delay_ms = nullptr;
uint64_t _replica_assign_delay_ms_for_dropouts;

Expand Down
6 changes: 4 additions & 2 deletions rdsn/src/meta/server_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1446,7 +1446,8 @@ void server_state::update_configuration_locally(
partition_configuration &old_cfg = app.partitions[gpid.get_partition_index()];
partition_configuration &new_cfg = config_request->config;

int min_2pc_count = _meta_svc->get_options().mutation_2pc_min_replica_count;
int min_2pc_count =
_meta_svc->get_options().app_mutation_2pc_min_replica_count(app.max_replica_count);
health_status old_health_status = partition_health_status(old_cfg, min_2pc_count);
health_status new_health_status = partition_health_status(new_cfg, min_2pc_count);

Expand Down Expand Up @@ -2375,8 +2376,9 @@ void server_state::update_partition_perf_counter()
{
int counters[HS_MAX_VALUE];
::memset(counters, 0, sizeof(counters));
int min_2pc_count = _meta_svc->get_options().mutation_2pc_min_replica_count;
auto func = [&](const std::shared_ptr<app_state> &app) {
int min_2pc_count =
_meta_svc->get_options().app_mutation_2pc_min_replica_count(app->max_replica_count);
for (unsigned int i = 0; i != app->partition_count; ++i) {
health_status st = partition_health_status(app->partitions[i], min_2pc_count);
counters[st]++;
Expand Down
4 changes: 2 additions & 2 deletions rdsn/src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
}

if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->mutation_2pc_min_replica_count) {
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}
Expand Down Expand Up @@ -223,7 +223,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
// for reconciliation, we should ensure every prepared mutation to be committed
// please refer to PacificA paper
if (static_cast<int>(_primary_states.membership.secondaries.size()) + 1 <
_options->mutation_2pc_min_replica_count &&
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) &&
!reconciliation) {
err = ERR_NOT_ENOUGH_MEMBER;
goto ErrOut;
Expand Down
2 changes: 1 addition & 1 deletion rdsn/src/replica/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1068,7 +1068,7 @@ bool replica::update_local_configuration(const replica_configuration &config,
}

if (_primary_states.membership.secondaries.size() + 1 <
_options->mutation_2pc_min_replica_count) {
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
std::vector<mutation_ptr> queued;
_primary_states.write_queue.clear(queued);
for (auto &m : queued) {
Expand Down
5 changes: 3 additions & 2 deletions scripts/format_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,18 @@ cd $root_dir

linenoise=./src/shell/linenoise
sds=./src/shell/sds
thirdparty=./rdsn/thirdparty

if [ $# -eq 0 ]; then
echo "formating all .h/.cpp files in $root_dir ..."
find . -type f -not \( -wholename "$linenoise/*" -o -wholename "$sds/*" \) \
find . -type f -not \( -wholename "$linenoise/*" -o -wholename "$sds/*" -o -wholename "$thirdparty/*" \) \
-regextype posix-egrep -regex ".*\.(cpp|h)" | xargs clang-format-3.9 -i -style=file
elif [ $1 = "-h" ]; then
echo "USAGE: ./format-files.sh [<relative_path>] -- format .h/.cpp files in $root_dir/relative_path"
echo " ./format-files.sh means format all .h/.cpp files in $root_dir"
else
echo "formating all .h/.cpp files in $root_dir/$1 ..."
find ./$1 -type f -not \( -wholename "$linenoise/*" -o -wholename "$sds/*" \) \
find ./$1 -type f -not \( -wholename "$linenoise/*" -o -wholename "$sds/*" -o -wholename "$thirdparty/*" \) \
-regextype posix-egrep -regex ".*\.(cpp|h)" | xargs clang-format-3.9 -i -style=file
fi