Skip to content

Commit

Permalink
refactor(FQDN): feather refator on idl/dsn.layer2.thrift
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 24, 2024
1 parent 7e95f31 commit 4df8807
Show file tree
Hide file tree
Showing 84 changed files with 1,671 additions and 1,506 deletions.
12 changes: 6 additions & 6 deletions idl/dsn.layer2.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ struct partition_configuration
1:dsn.gpid pid;
2:i64 ballot;
3:i32 max_replica_count;
4:dsn.rpc_address primary;
5:list<dsn.rpc_address> secondaries;
6:list<dsn.rpc_address> last_drops;
4:dsn.rpc_address primary1;
5:list<dsn.rpc_address> secondaries1;
6:list<dsn.rpc_address> last_drops1;
7:i64 last_committed_decree;
8:i32 partition_flags;
9:optional dsn.host_port hp_primary;
10:optional list<dsn.host_port> hp_secondaries;
11:optional list<dsn.host_port> hp_last_drops;
9:optional dsn.host_port hp_primary1;
10:optional list<dsn.host_port> hp_secondaries1;
11:optional list<dsn.host_port> hp_last_drops1;
}

struct query_cfg_request
Expand Down
40 changes: 20 additions & 20 deletions src/client/partition_resolver_simple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,26 +301,24 @@ void partition_resolver_simple::query_config_reply(error_code err,
_app_partition_count = resp.partition_count;
_app_is_stateful = resp.is_stateful;

for (auto it = resp.partitions.begin(); it != resp.partitions.end(); ++it) {
auto &new_config = *it;

for (auto &new_pc : resp.partitions) {
LOG_DEBUG_PREFIX("query config reply, gpid = {}, ballot = {}, primary = {}",
new_config.pid,
new_config.ballot,
FMT_HOST_PORT_AND_IP(new_config, primary));
new_pc.pid,
new_pc.ballot,
FMT_HOST_PORT_AND_IP(new_pc, primary1));

auto it2 = _config_cache.find(new_config.pid.get_partition_index());
auto it2 = _config_cache.find(new_pc.pid.get_partition_index());
if (it2 == _config_cache.end()) {
std::unique_ptr<partition_info> pi(new partition_info);
pi->timeout_count = 0;
pi->config = new_config;
_config_cache.emplace(new_config.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->config.ballot < new_config.ballot) {
pi->pc = new_pc;
_config_cache.emplace(new_pc.pid.get_partition_index(), std::move(pi));
} else if (_app_is_stateful && it2->second->pc.ballot < new_pc.ballot) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else if (!_app_is_stateful) {
it2->second->timeout_count = 0;
it2->second->config = new_config;
it2->second->pc = new_pc;
} else {
// nothing to do
}
Expand Down Expand Up @@ -412,32 +410,34 @@ void partition_resolver_simple::handle_pending_requests(std::deque<request_conte
}

/*search in cache*/
host_port partition_resolver_simple::get_host_port(const partition_configuration &config) const
host_port partition_resolver_simple::get_host_port(const partition_configuration &pc) const
{
if (_app_is_stateful) {
return config.hp_primary;
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
return primary;
}

if (config.hp_last_drops.empty()) {
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops1, last_drops);
if (last_drops.empty()) {
return host_port();
}

return config.hp_last_drops[rand::next_u32(0, config.last_drops.size() - 1)];
return last_drops[rand::next_u32(0, last_drops.size() - 1)];
}

error_code partition_resolver_simple::get_host_port(int partition_index, /*out*/ host_port &hp)
{
// partition_configuration config;
{
zauto_read_lock l(_config_lock);
auto it = _config_cache.find(partition_index);
if (it != _config_cache.end()) {
// config = it->second->config;
if (it->second->config.ballot < 0) {
if (it->second->pc.ballot < 0) {
// client query config for splitting app, child partition is not ready
return ERR_CHILD_NOT_READY;
}
hp = get_host_port(it->second->config);
hp = get_host_port(it->second->pc);
if (!hp) {
return ERR_IO_PENDING;
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/client/partition_resolver_simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class partition_resolver_simple : public partition_resolver
struct partition_info
{
int timeout_count;
::dsn::partition_configuration config;
::dsn::partition_configuration pc;
};
mutable dsn::zrwlock_nr _config_lock;
std::unordered_map<int, std::unique_ptr<partition_info>> _config_cache;
Expand Down Expand Up @@ -107,7 +107,7 @@ class partition_resolver_simple : public partition_resolver

private:
// local routines
host_port get_host_port(const partition_configuration &config) const;
host_port get_host_port(const partition_configuration &pc) const;
error_code get_host_port(int partition_index, /*out*/ host_port &hp);
void handle_pending_requests(std::deque<request_context_ptr> &reqs, error_code err);
void clear_all_pending_requests();
Expand Down
109 changes: 64 additions & 45 deletions src/client/replication_ddl_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,20 @@ dsn::error_code replication_ddl_client::wait_app_ready(const std::string &app_na
CHECK_EQ(partition_count, query_resp.partition_count);
int ready_count = 0;
for (int i = 0; i < partition_count; i++) {
const partition_configuration &pc = query_resp.partitions[i];
if (pc.hp_primary && (pc.hp_secondaries.size() + 1 >= max_replica_count)) {
ready_count++;
const auto &pc = query_resp.partitions[i];
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (!primary) {
continue;
}

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (secondaries.size() + 1 < max_replica_count) {
continue;
}

ready_count++;
}
if (ready_count == partition_count) {
std::cout << app_name << " is ready now: (" << ready_count << "/" << partition_count
Expand Down Expand Up @@ -422,8 +432,8 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
}
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(info.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(info.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
LOG_ERROR("list app({}) failed, err = {}", info.app_name, r);
return r;
Expand All @@ -433,18 +443,22 @@ dsn::error_code replication_ddl_client::list_apps(const dsn::app_status::type st
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
replica_count++;
}
replica_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
replica_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
Expand Down Expand Up @@ -566,22 +580,26 @@ dsn::error_code replication_ddl_client::list_nodes(const dsn::replication::node_
for (auto &app : apps) {
int32_t app_id;
int32_t partition_count;
std::vector<partition_configuration> partitions;
r = list_app(app.app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
r = list_app(app.app_name, app_id, partition_count, pcs);
if (r != dsn::ERR_OK) {
return r;
}

for (int i = 0; i < partitions.size(); i++) {
const dsn::partition_configuration &p = partitions[i];
if (p.hp_primary) {
auto find = tmp_map.find(p.hp_primary);
for (const auto &pc : pcs) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (int j = 0; j < p.hp_secondaries.size(); j++) {
auto find = tmp_map.find(p.hp_secondaries[j]);

std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
Expand Down Expand Up @@ -723,13 +741,13 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t app_id = 0;
int32_t partition_count = 0;
int32_t max_replica_count = 0;
std::vector<partition_configuration> partitions;
dsn::error_code err = list_app(app_name, app_id, partition_count, partitions);
std::vector<partition_configuration> pcs;
dsn::error_code err = list_app(app_name, app_id, partition_count, pcs);
if (err != dsn::ERR_OK) {
return err;
}
if (!partitions.empty()) {
max_replica_count = partitions[0].max_replica_count;
if (!pcs.empty()) {
max_replica_count = pcs[0].max_replica_count;
}

// print query_cfg_response
Expand Down Expand Up @@ -765,38 +783,39 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int fully_healthy = 0;
int write_unhealthy = 0;
int read_unhealthy = 0;
for (const auto &p : partitions) {
for (const auto &pc : pcs) {
int replica_count = 0;
if (p.hp_primary) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (primary) {
replica_count++;
node_stat[p.hp_primary].first++;
node_stat[primary].first++;
total_prim_count++;
}
replica_count += p.hp_secondaries.size();
total_sec_count += p.hp_secondaries.size();
if (p.hp_primary) {
if (replica_count >= p.max_replica_count)
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
replica_count += secondaries.size();
total_sec_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count)
fully_healthy++;
else if (replica_count < 2)
write_unhealthy++;
} else {
write_unhealthy++;
read_unhealthy++;
}
tp_details.add_row(p.pid.get_partition_index());
tp_details.append_data(p.ballot);
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
std::stringstream oss;
oss << replica_count << "/" << p.max_replica_count;
oss << replica_count << "/" << pc.max_replica_count;
tp_details.append_data(oss.str());
tp_details.append_data(p.hp_primary ? p.hp_primary.to_string() : "-");
tp_details.append_data(primary ? primary.to_string() : "-");
oss.str("");
oss << "[";
// TODO (yingchun) join
for (int j = 0; j < p.hp_secondaries.size(); j++) {
if (j != 0)
oss << ",";
oss << p.hp_secondaries[j];
node_stat[p.hp_secondaries[j]].second++;
oss << fmt::format("{}", fmt::join(secondaries, ","));
for (const auto &secondary : secondaries) {
node_stat[secondary].second++;
}
oss << "]";
tp_details.append_data(oss.str());
Expand Down Expand Up @@ -837,7 +856,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
dsn::error_code replication_ddl_client::list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions)
std::vector<partition_configuration> &pcs)
{
RETURN_EC_NOT_OK_MSG(validate_app_name(app_name), "invalid app_name: '{}'", app_name);

Expand All @@ -859,7 +878,7 @@ dsn::error_code replication_ddl_client::list_app(const std::string &app_name,

app_id = resp.app_id;
partition_count = resp.partition_count;
partitions = resp.partitions;
pcs = resp.partitions;

return dsn::ERR_OK;
}
Expand Down Expand Up @@ -1320,8 +1339,8 @@ dsn::error_code replication_ddl_client::query_restore(int32_t restore_app_id, bo
::dsn::unmarshall(resp_task->get_response(), response);
if (response.err == ERR_OK) {
int overall_progress = 0;
for (const auto &p : response.restore_progress) {
overall_progress += p;
for (const auto &progress : response.restore_progress) {
overall_progress += progress;
}
overall_progress = overall_progress / response.restore_progress.size();
overall_progress = overall_progress / 10;
Expand Down
2 changes: 1 addition & 1 deletion src/client/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class replication_ddl_client
dsn::error_code list_app(const std::string &app_name,
int32_t &app_id,
int32_t &partition_count,
std::vector<partition_configuration> &partitions);
std::vector<partition_configuration> &pcs);

dsn::replication::configuration_meta_control_response
control_meta_function_level(meta_function_level::type level);
Expand Down
16 changes: 8 additions & 8 deletions src/common/json_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ inline bool json_decode(const dsn::json::JsonObject &in, dsn::host_port &hp)
return static_cast<bool>(hp);
}

inline void json_encode(JsonWriter &out, const dsn::partition_configuration &config);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &config);
inline void json_encode(JsonWriter &out, const dsn::partition_configuration &pc);
inline bool json_decode(const JsonObject &in, dsn::partition_configuration &pc);
inline void json_encode(JsonWriter &out, const dsn::app_info &info);
inline bool json_decode(const JsonObject &in, dsn::app_info &info);
inline void json_encode(JsonWriter &out, const dsn::replication::file_meta &f_meta);
Expand Down Expand Up @@ -717,14 +717,14 @@ NON_MEMBER_JSON_SERIALIZATION(dsn::partition_configuration,
pid,
ballot,
max_replica_count,
primary,
secondaries,
last_drops,
primary1,
secondaries1,
last_drops1,
last_committed_decree,
partition_flags,
hp_primary,
hp_secondaries,
hp_last_drops)
hp_primary1,
hp_secondaries1,
hp_last_drops1)

NON_MEMBER_JSON_SERIALIZATION(dsn::app_info,
status,
Expand Down
Loading

0 comments on commit 4df8807

Please sign in to comment.