Skip to content

Commit

Permalink
refactor: Simplify code on partition_config
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jul 16, 2024
1 parent a128565 commit 5652125
Show file tree
Hide file tree
Showing 80 changed files with 1,084 additions and 1,149 deletions.
38 changes: 17 additions & 21 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,26 +302,24 @@ void partition_resolver_simple::query_config_reply(error_code err,
_app_partition_count = resp.partition_count;
_app_is_stateful = resp.is_stateful;

for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
auto &new_config = *it;

for (const auto &new_pc : resp.partitions) {
LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
new_config.pid,
new_config.ballot,
FMT_HOST_PORT_AND_IP(new_config, primary));
new_pc.pid,
new_pc.ballot,
FMT_HOST_PORT_AND_IP(new_pc, primary));

auto it2 = _config_cache.find(new_config.pid.get_partition_index());
auto it2 = _config_cache.find(new_pc.pid.get_partition_index());
if (it2 == _config_cache.end()) {
std::unique_ptr<partition_info> pi(new partition_info);
auto pi = std::make_unique<partition_info>();
pi->timeout_count = 0;
pi->config = new_config;
_config_cache.emplace(new_config.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->config.ballot < new_config.ballot) {
pi->pc = new_pc;
_config_cache.emplace(new_pc.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->pc.ballot < new_pc.ballot) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else if (!_app_is_stateful) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else {
// nothing to do
}
Expand Down Expand Up @@ -413,32 +411,30 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
}

/*search in cache*/
host_port partition_resolver_simple::get_host_port(const partition_configuration &config) const
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return config.hp_primary;
return pc.hp_primary;
}

if (config.hp_last_drops.empty()) {
if (pc.hp_last_drops.empty()) {
return host_port();
}

return config.hp_last_drops[rand::next_u32(0, config.last_drops.size() - 1)];
return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
{
// partition_configuration config;
{
zauto_read_lock l(_config_lock);
auto it = _config_cache.find(partition_index);
if (it != _config_cache.end()) {
// config = it->second->config;
if (it->second->config.ballot < 0) {
if (it->second->pc.ballot < 0) {
// client query config for splitting app, child partition is not ready
return ERR_CHILD_NOT_READY;
}
hp = get_host_port(it->second->config);
hp = get_host_port(it->second->pc);
if (!hp) {
return ERR_IO_PENDING;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/client/partition_resolver_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class partition_resolver_simple : public partition_resolver
struct partition_info
{
int timeout_count;
::dsn::partition_configuration config;
::dsn::partition_configuration pc;
};
mutable dsn::zrwlock_nr _config_lock;
std::unordered_map<int, std::unique_ptr<partition_info>> _config_cache;
Expand Down Expand Up @@ -107,7 +107,7 @@ class partition_resolver_simple : public partition_resolver

private:
// local routines
host_port get_host_port(const partition_configuration &config) const;
host_port get_host_port(const partition_configuration &pc) const;
error_code get_host_port(int partition_index, /*out*/ host_port &hp);
void handle_pending_requests(std::deque<request_context_ptr> &reqs, error_code err);
void clear_all_pending_requests();
Expand Down
91 changes: 41 additions & 50 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
CHECK_EQ(partition_count, query_resp.partition_count);
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const partition_configuration &pc = query_resp.partitions[i];
const auto &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
}
Expand Down Expand Up @@ -422,8 +422,8 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
}
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(info.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(info.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
LOG_ERROR("list app({}) failed, err = {}", info.app_name, r);
return r;
Expand All @@ -433,18 +433,18 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
if (pc.hp_primary) {
replica_count++;
}
replica_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
replica_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
Expand Down Expand Up @@ -566,22 +566,21 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
for (auto &app : apps) {
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(app.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(app.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
return r;
}

for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
if (p.hp_primary) {
auto find = tmp_map.find(p.hp_primary);
for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (int j = 0; j < p.hp_secondaries.size(); j++) {
auto find = tmp_map.find(p.hp_secondaries[j]);
for (const auto &secondary : pc.hp_secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
Expand Down Expand Up @@ -723,13 +722,13 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t app_id = 0;
int32_t partition_count = 0;
int32_t max_replica_count = 0;
std::vector<partition_configuration> partitions;
dsn::error_code err = list_app(app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
dsn::error_code err = list_app(app_name, app_id, partition_count, pcs);
if (err != dsn::ERR_OK) {
return err;
}
if (!partitions.empty()) {
max_replica_count = partitions[0].max_replica_count;
if (!pcs.empty()) {
max_replica_count = pcs[0].max_replica_count;
}

// print query_cfg_response
Expand Down Expand Up @@ -765,41 +764,33 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (const auto &p : partitions) {
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
if (pc.hp_primary) {
replica_count++;
node_stat[p.hp_primary].first++;
node_stat[pc.hp_primary].first++;
total_prim_count++;
}
replica_count += p.hp_secondaries.size();
total_sec_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
replica_count += pc.hp_secondaries.size();
total_sec_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
}
tp_details.add_row(p.pid.get_partition_index());
tp_details.append_data(p.ballot);
std::stringstream oss;
oss << replica_count << "/" << p.max_replica_count;
tp_details.append_data(oss.str());
tp_details.append_data(p.hp_primary ? p.hp_primary.to_string() : "-");
oss.str("");
oss << "[";
// TODO (yingchun) join
for (int j = 0; j < p.hp_secondaries.size(); j++) {
if (j != 0)
oss << ",";
oss << p.hp_secondaries[j];
node_stat[p.hp_secondaries[j]].second++;
for (const auto &secondary : pc.hp_secondaries) {
node_stat[secondary].second++;
}
oss << "]";
tp_details.append_data(oss.str());
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
}
mtp.add(std::move(tp_details));

Expand Down Expand Up @@ -837,7 +828,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions)
std::vector<partition_configuration> &pcs)
{
RETURN_EC_NOT_OK_MSG(validate_app_name(app_name), "invalid app_name: '{}'", app_name);

Expand All @@ -859,7 +850,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,

app_id = resp.app_id;
partition_count = resp.partition_count;
partitions = resp.partitions;
pcs = resp.partitions;

return dsn::ERR_OK;
}
Expand Down Expand Up @@ -1322,8 +1313,8 @@ dsn::error_code replication_ddl_client::query_restore(int32_t restore_app_id, bo
::dsn::unmarshall(resp_task->get_response(), response);
if (response.err == ERR_OK) {
int overall_progress = 0;
for (const auto &p : response.restore_progress) {
overall_progress += p;
for (const auto &progress : response.restore_progress) {
overall_progress += progress;
}
overall_progress = overall_progress / response.restore_progress.size();
overall_progress = overall_progress / 10;
Expand Down
2 changes: 1 addition & 1 deletion src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class replication_ddl_client
dsn::error_code list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions);
std::vector<partition_configuration> &pcs);

dsn::replication::configuration_meta_control_response
control_meta_function_level(meta_function_level::type level);
Expand Down
4 changes: 2 additions & 2 deletions src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ inline bool json_decode(const dsn::json::JsonObject &in, dsn::host_port &hp)
return static_cast<bool>(hp);
}

inline void json_encode(JsonWriter &out, const dsn::partition_configuration &config);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &config);
inline void json_encode(JsonWriter &out, const dsn::partition_configuration &pc);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &pc);
inline void json_encode(JsonWriter &out, const dsn::app_info &info);
inline bool json_decode(const JsonObject &in, dsn::app_info &info);
inline void json_encode(JsonWriter &out, const dsn::replication::file_meta &f_meta);
Expand Down
22 changes: 11 additions & 11 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,26 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
}
}

/*static*/ bool replica_helper::get_replica_config(const partition_configuration &partition_config,
/*static*/ bool replica_helper::get_replica_config(const partition_configuration &pc,
const ::dsn::host_port &node,
/*out*/ replica_configuration &replica_config)
/*out*/ replica_configuration &rc)
{
replica_config.pid = partition_config.pid;
replica_config.ballot = partition_config.ballot;
replica_config.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(replica_config, primary, partition_config, primary);
rc.pid = pc.pid;
rc.ballot = pc.ballot;
rc.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);

if (node == partition_config.hp_primary) {
replica_config.status = partition_status::PS_PRIMARY;
if (node == pc.hp_primary) {
rc.status = partition_status::PS_PRIMARY;
return true;
}

if (utils::contains(partition_config.hp_secondaries, node)) {
replica_config.status = partition_status::PS_SECONDARY;
if (utils::contains(pc.hp_secondaries, node)) {
rc.status = partition_status::PS_SECONDARY;
return true;
}

replica_config.status = partition_status::PS_INACTIVE;
rc.status = partition_status::PS_INACTIVE;
return false;
}

Expand Down
8 changes: 4 additions & 4 deletions src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
// secondaries no need to be same order
for (const auto &hp : pc1.hp_secondaries) {
if (!is_secondary(pc2, hp)) {
for (const auto &pc1_secondary : pc1.hp_secondaries) {
if (!is_secondary(pc2, pc1_secondary)) {
return false;
}
}
Expand All @@ -106,9 +106,9 @@ class replica_helper
}
return false;
}
static bool get_replica_config(const partition_configuration &partition_config,
static bool get_replica_config(const partition_configuration &pc,
const ::dsn::host_port &node,
/*out*/ replica_configuration &replica_config);
/*out*/ replica_configuration &rc);

// Return true if 'server_list' is a valid comma-separated list of servers, otherwise return
// false. The result is filled into 'servers' if success.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
_is_backup_failed = true;
return;
}
partition_primary = app->partitions[pid.get_partition_index()].hp_primary;
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
}

if (!partition_primary) {
Expand Down
Loading

0 comments on commit 5652125

Please sign in to comment.