Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: partition config #115

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 17 additions & 21 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,26 +302,24 @@ void partition_resolver_simple::query_config_reply(error_code err,
_app_partition_count = resp.partition_count;
_app_is_stateful = resp.is_stateful;

for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
auto &new_config = *it;

for (const auto &new_pc : resp.partitions) {
LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
new_config.pid,
new_config.ballot,
FMT_HOST_PORT_AND_IP(new_config, primary));
new_pc.pid,
new_pc.ballot,
FMT_HOST_PORT_AND_IP(new_pc, primary));

auto it2 = _config_cache.find(new_config.pid.get_partition_index());
auto it2 = _config_cache.find(new_pc.pid.get_partition_index());
if (it2 == _config_cache.end()) {
std::unique_ptr<partition_info> pi(new partition_info);
auto pi = std::make_unique<partition_info>();
pi->timeout_count = 0;
pi->config = new_config;
_config_cache.emplace(new_config.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->config.ballot < new_config.ballot) {
pi->pc = new_pc;
_config_cache.emplace(new_pc.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->pc.ballot < new_pc.ballot) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else if (!_app_is_stateful) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else {
// nothing to do
}
Expand Down Expand Up @@ -413,32 +411,30 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
}

/*search in cache*/
host_port partition_resolver_simple::get_host_port(const partition_configuration &config) const
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return config.hp_primary;
return pc.hp_primary;
}

if (config.hp_last_drops.empty()) {
if (pc.hp_last_drops.empty()) {
return host_port();
}

return config.hp_last_drops[rand::next_u32(0, config.last_drops.size() - 1)];
return pc.hp_last_drops[rand::next_u32(0, pc.last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
{
// partition_configuration config;
{
zauto_read_lock l(_config_lock);
auto it = _config_cache.find(partition_index);
if (it != _config_cache.end()) {
// config = it->second->config;
if (it->second->config.ballot < 0) {
if (it->second->pc.ballot < 0) {
// client query config for splitting app, child partition is not ready
return ERR_CHILD_NOT_READY;
}
hp = get_host_port(it->second->config);
hp = get_host_port(it->second->pc);
if (!hp) {
return ERR_IO_PENDING;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/client/partition_resolver_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class partition_resolver_simple : public partition_resolver
struct partition_info
{
int timeout_count;
::dsn::partition_configuration config;
::dsn::partition_configuration pc;
};
mutable dsn::zrwlock_nr _config_lock;
std::unordered_map<int, std::unique_ptr<partition_info>> _config_cache;
Expand Down Expand Up @@ -107,7 +107,7 @@ class partition_resolver_simple : public partition_resolver

private:
// local routines
host_port get_host_port(const partition_configuration &config) const;
host_port get_host_port(const partition_configuration &pc) const;
error_code get_host_port(int partition_index, /*out*/ host_port &hp);
void handle_pending_requests(std::deque<request_context_ptr> &reqs, error_code err);
void clear_all_pending_requests();
Expand Down
91 changes: 41 additions & 50 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
CHECK_EQ(partition_count, query_resp.partition_count);
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const partition_configuration &pc = query_resp.partitions[i];
const auto &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
}
Expand Down Expand Up @@ -422,8 +422,8 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
}
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(info.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(info.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
LOG_ERROR("list app({}) failed, err = {}", info.app_name, r);
return r;
Expand All @@ -433,18 +433,18 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
if (pc.hp_primary) {
replica_count++;
}
replica_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
replica_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
Expand Down Expand Up @@ -566,22 +566,21 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
for (auto &app : apps) {
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(app.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(app.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
return r;
}

for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
if (p.hp_primary) {
auto find = tmp_map.find(p.hp_primary);
for (const auto &pc : pcs) {
if (pc.hp_primary) {
auto find = tmp_map.find(pc.hp_primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (int j = 0; j < p.hp_secondaries.size(); j++) {
auto find = tmp_map.find(p.hp_secondaries[j]);
for (const auto &secondary : pc.hp_secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
Expand Down Expand Up @@ -723,13 +722,13 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t app_id = 0;
int32_t partition_count = 0;
int32_t max_replica_count = 0;
std::vector<partition_configuration> partitions;
dsn::error_code err = list_app(app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
dsn::error_code err = list_app(app_name, app_id, partition_count, pcs);
if (err != dsn::ERR_OK) {
return err;
}
if (!partitions.empty()) {
max_replica_count = partitions[0].max_replica_count;
if (!pcs.empty()) {
max_replica_count = pcs[0].max_replica_count;
}

// print query_cfg_response
Expand Down Expand Up @@ -765,41 +764,33 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (const auto &p : partitions) {
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
if (pc.hp_primary) {
replica_count++;
node_stat[p.hp_primary].first++;
node_stat[pc.hp_primary].first++;
total_prim_count++;
}
replica_count += p.hp_secondaries.size();
total_sec_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
replica_count += pc.hp_secondaries.size();
total_sec_count += pc.hp_secondaries.size();
if (pc.hp_primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
}
tp_details.add_row(p.pid.get_partition_index());
tp_details.append_data(p.ballot);
std::stringstream oss;
oss << replica_count << "/" << p.max_replica_count;
tp_details.append_data(oss.str());
tp_details.append_data(p.hp_primary ? p.hp_primary.to_string() : "-");
oss.str("");
oss << "[";
// TODO (yingchun) join
for (int j = 0; j < p.hp_secondaries.size(); j++) {
if (j != 0)
oss << ",";
oss << p.hp_secondaries[j];
node_stat[p.hp_secondaries[j]].second++;
for (const auto &secondary : pc.hp_secondaries) {
node_stat[secondary].second++;
}
oss << "]";
tp_details.append_data(oss.str());
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(pc.hp_primary ? pc.hp_primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(pc.hp_secondaries, ",")));
}
mtp.add(std::move(tp_details));

Expand Down Expand Up @@ -837,7 +828,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions)
std::vector<partition_configuration> &pcs)
{
RETURN_EC_NOT_OK_MSG(validate_app_name(app_name), "invalid app_name: '{}'", app_name);

Expand All @@ -859,7 +850,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,

app_id = resp.app_id;
partition_count = resp.partition_count;
partitions = resp.partitions;
pcs = resp.partitions;

return dsn::ERR_OK;
}
Expand Down Expand Up @@ -1322,8 +1313,8 @@ dsn::error_code replication_ddl_client::query_restore(int32_t restore_app_id, bo
::dsn::unmarshall(resp_task->get_response(), response);
if (response.err == ERR_OK) {
int overall_progress = 0;
for (const auto &p : response.restore_progress) {
overall_progress += p;
for (const auto &progress : response.restore_progress) {
overall_progress += progress;
}
overall_progress = overall_progress / response.restore_progress.size();
overall_progress = overall_progress / 10;
Expand Down
2 changes: 1 addition & 1 deletion src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class replication_ddl_client
dsn::error_code list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions);
std::vector<partition_configuration> &pcs);

dsn::replication::configuration_meta_control_response
control_meta_function_level(meta_function_level::type level);
Expand Down
4 changes: 2 additions & 2 deletions src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ inline bool json_decode(const dsn::json::JsonObject &in, dsn::host_port &hp)
return static_cast<bool>(hp);
}

inline void json_encode(JsonWriter &out, const dsn::partition_configuration &config);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &config);
inline void json_encode(JsonWriter &out, const dsn::partition_configuration &pc);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &pc);
inline void json_encode(JsonWriter &out, const dsn::app_info &info);
inline bool json_decode(const JsonObject &in, dsn::app_info &info);
inline void json_encode(JsonWriter &out, const dsn::replication::file_meta &f_meta);
Expand Down
22 changes: 11 additions & 11 deletions src/common/replication_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,26 @@ int32_t replication_options::app_mutation_2pc_min_replica_count(int32_t app_max_
}
}

/*static*/ bool replica_helper::get_replica_config(const partition_configuration &partition_config,
/*static*/ bool replica_helper::get_replica_config(const partition_configuration &pc,
const ::dsn::host_port &node,
/*out*/ replica_configuration &replica_config)
/*out*/ replica_configuration &rc)
{
replica_config.pid = partition_config.pid;
replica_config.ballot = partition_config.ballot;
replica_config.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(replica_config, primary, partition_config, primary);
rc.pid = pc.pid;
rc.ballot = pc.ballot;
rc.learner_signature = invalid_signature;
SET_OBJ_IP_AND_HOST_PORT(rc, primary, pc, primary);

if (node == partition_config.hp_primary) {
replica_config.status = partition_status::PS_PRIMARY;
if (node == pc.hp_primary) {
rc.status = partition_status::PS_PRIMARY;
return true;
}

if (utils::contains(partition_config.hp_secondaries, node)) {
replica_config.status = partition_status::PS_SECONDARY;
if (utils::contains(pc.hp_secondaries, node)) {
rc.status = partition_status::PS_SECONDARY;
return true;
}

replica_config.status = partition_status::PS_INACTIVE;
rc.status = partition_status::PS_INACTIVE;
return false;
}

Expand Down
8 changes: 4 additions & 4 deletions src/common/replication_other_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ inline bool is_partition_config_equal(const partition_configuration &pc1,
const partition_configuration &pc2)
{
// secondaries no need to be same order
for (const auto &hp : pc1.hp_secondaries) {
if (!is_secondary(pc2, hp)) {
for (const auto &pc1_secondary : pc1.hp_secondaries) {
if (!is_secondary(pc2, pc1_secondary)) {
return false;
}
}
Expand All @@ -106,9 +106,9 @@ class replica_helper
}
return false;
}
static bool get_replica_config(const partition_configuration &partition_config,
static bool get_replica_config(const partition_configuration &pc,
const ::dsn::host_port &node,
/*out*/ replica_configuration &replica_config);
/*out*/ replica_configuration &rc);

// Return true if 'server_list' is a valid comma-separated list of servers, otherwise return
// false. The result is filled into 'servers' if success.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ void backup_engine::backup_app_partition(const gpid &pid)
_is_backup_failed = true;
return;
}
partition_primary = app->partitions[pid.get_partition_index()].hp_primary;
partition_primary = app->pcs[pid.get_partition_index()].hp_primary;
}

if (!partition_primary) {
Expand Down
Loading
Loading