Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 23, 2024
1 parent 3d16c0b commit 500a749
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 120 deletions.
8 changes: 6 additions & 2 deletions src/meta/meta_bulk_load_ingestion_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,12 @@ void ingestion_context::partition_node_info::create(const partition_configuratio
{
pid = pc.pid;
std::unordered_set<host_port> current_nodes;
current_nodes.insert(pc.hp_primary1);
for (const auto &secondary : pc.hp_secondaries1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
current_nodes.insert(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.insert(secondary);
}
for (const auto &node : current_nodes) {
Expand Down
32 changes: 18 additions & 14 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ bool bulk_load_service::check_partition_status(
}

pc = app->pcs[pid.get_partition_index()];
if (!pc.hp_primary1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (!primary) {
LOG_WARNING("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
Expand All @@ -380,7 +382,9 @@ bool bulk_load_service::check_partition_status(
return false;
}

if (pc.hp_secondaries1.size() < pc.max_replica_count - 1) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (secondaries.size() < pc.max_replica_count - 1) {
bulk_load_status::type p_status;
{
zauto_read_lock l(_lock);
Expand Down Expand Up @@ -1199,7 +1203,9 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati

std::vector<host_port> current_nodes;
current_nodes.emplace_back(pc.hp_primary1);
for (const auto &secondary : pc.hp_secondaries1) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
current_nodes.emplace_back(secondary);
}

Expand Down Expand Up @@ -1267,18 +1273,16 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
return;
}

const auto &primary_addr = pc.hp_primary1;
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
ballot meta_ballot = pc.ballot;
tasking::enqueue(LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::send_ingestion_request,
this,
app_name,
pid,
primary_addr,
meta_ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}

// ThreadPool: THREAD_POOL_DEFAULT
Expand Down
7 changes: 5 additions & 2 deletions src/meta/meta_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,13 @@ bool construct_replica(meta_view view, const gpid &pid, int max_replica_count)
// we put max_replica_count-1 recent replicas to last_drops, in case of the DDD-state when the
// only primary dead
// when add node to pc.last_drops1, we don't remove it from our cc.drop_list
CHECK(pc.hp_last_drops1.empty(), "last_drops of partition({}) must be empty", pid);
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops1, last_drops);
CHECK(last_drops.empty(), "last_drops of partition({}) must be empty", pid);
for (auto iter = drop_list.rbegin(); iter != drop_list.rend(); ++iter) {
if (pc.hp_last_drops1.size() + 1 >= max_replica_count)
if (last_drops.size() + 1 >= max_replica_count) {
break;
}
// similar to cc.drop_list, pc.last_drop is also a stack structure
HEAD_INSERT_IP_AND_HOST_PORT_BY_DNS(pc, last_drops1, iter->node);
LOG_INFO("construct for ({}), select {} into last_drops, ballot({}), "
Expand Down
27 changes: 18 additions & 9 deletions src/meta/meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,11 @@ inline config_context *get_config_context(app_mapper &apps, const dsn::gpid &gpi

inline int replica_count(const partition_configuration &pc)
{
int ans = pc.hp_primary1 ? 1 : 0;
return ans + pc.hp_secondaries1.size();
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
return (primary ? 1 : 0) + secondaries.size();
}

enum health_status
Expand All @@ -503,19 +506,25 @@ enum health_status
inline health_status partition_health_status(const partition_configuration &pc,
int mutation_2pc_min_replica_count)
{
if (!pc.hp_primary1) {
if (pc.hp_secondaries1.empty())
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (!primary) {
if (secondaries.empty()) {
return HS_DEAD;
else
} else {
return HS_UNREADABLE;
}
} else {
int n = pc.hp_secondaries1.size() + 1;
if (n < mutation_2pc_min_replica_count)
int n = secondaries.size() + 1;
if (n < mutation_2pc_min_replica_count) {
return HS_UNWRITABLE;
else if (n < pc.max_replica_count)
} else if (n < pc.max_replica_count) {
return HS_WRITABLE_ILL;
else
} else {
return HS_HEALTHY;
}
}
}

Expand Down
64 changes: 35 additions & 29 deletions src/meta/meta_http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,38 +143,35 @@ void meta_http_service::get_app_handler(const http_request &req, http_response &
int read_unhealthy = 0;
for (const auto &pc : response.partitions) {
int replica_count = 0;
if (pc.hp_primary1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (primary) {
replica_count++;
node_stat[pc.hp_primary1].first++;
node_stat[primary].first++;
total_prim_count++;
}
replica_count += pc.hp_secondaries1.size();
total_sec_count += pc.hp_secondaries1.size();
if (pc.hp_primary1) {
if (replica_count >= pc.max_replica_count)
replica_count += secondaries.size();
total_sec_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
}
tp_details.add_row(pc.pid.get_partition_index());
tp_details.append_data(pc.ballot);
std::stringstream oss;
oss << replica_count << "/" << pc.max_replica_count;
tp_details.append_data(oss.str());
tp_details.append_data(pc.hp_primary1 ? pc.hp_primary1.to_string() : "-");
oss.str("");
oss << "[";
for (int j = 0; j < pc.hp_secondaries1.size(); j++) {
if (j != 0)
oss << ",";
oss << pc.hp_secondaries1[j];
node_stat[pc.hp_secondaries1[j]].second++;
tp_details.append_data(fmt::format("{}/{}", replica_count, pc.max_replica_count));
tp_details.append_data(primary ? primary.to_string() : "-");
tp_details.append_data(fmt::format("[{}]", fmt::join(secondaries, ",")));
for (const auto &secondary : secondaries) {
node_stat[secondary].second++;
}
oss << "]";
tp_details.append_data(oss.str());
}
mtp.add(std::move(tp_details));

Expand Down Expand Up @@ -325,15 +322,20 @@ void meta_http_service::list_app_handler(const http_request &req, http_response
for (int i = 0; i < response.partitions.size(); i++) {
const auto &pc = response.partitions[i];
int replica_count = 0;
if (pc.hp_primary1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (primary) {
replica_count++;
}
replica_count += pc.hp_secondaries1.size();
if (pc.hp_primary1) {
if (replica_count >= pc.max_replica_count)
replica_count += secondaries.size();
if (primary) {
if (replica_count >= pc.max_replica_count) {
fully_healthy++;
else if (replica_count < 2)
} else if (replica_count < 2) {
write_unhealthy++;
}
} else {
write_unhealthy++;
read_unhealthy++;
Expand Down Expand Up @@ -415,14 +417,18 @@ void meta_http_service::list_node_handler(const http_request &req, http_response

for (int i = 0; i < response_app.partitions.size(); i++) {
const auto &pc = response_app.partitions[i];
if (pc.hp_primary1) {
auto find = tmp_map.find(pc.hp_primary1);
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (primary) {
auto find = tmp_map.find(primary);
if (find != tmp_map.end()) {
find->second.primary_count++;
}
}
for (int j = 0; j < pc.hp_secondaries1.size(); j++) {
auto find = tmp_map.find(pc.hp_secondaries1[j]);
for (const auto &secondary : secondaries) {
auto find = tmp_map.find(secondary);
if (find != tmp_map.end()) {
find->second.secondary_count++;
}
Expand Down
54 changes: 34 additions & 20 deletions src/meta/partition_guardian.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,19 @@ pc_status partition_guardian::cure(meta_view view,
CHECK(acts.empty(), "");

pc_status status;
if (!pc.hp_primary1)
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
if (!primary) {
status = on_missing_primary(view, gpid);
else if (static_cast<int>(pc.hp_secondaries1.size()) + 1 < pc.max_replica_count)
} else if (static_cast<int>(secondaries.size()) + 1 < pc.max_replica_count) {
status = on_missing_secondary(view, gpid);
else if (static_cast<int>(pc.hp_secondaries1.size()) >= pc.max_replica_count)
} else if (static_cast<int>(secondaries.size()) >= pc.max_replica_count) {
status = on_redundant_secondary(view, gpid);
else
} else {
status = pc_status::healthy;
}

if (!acts.empty()) {
action = *acts.front();
Expand Down Expand Up @@ -125,8 +130,9 @@ void partition_guardian::reconfig(meta_view view, const configuration_update_req
if (request.type == config_type::CT_DROP_PARTITION) {
cc->serving.clear();

const std::vector<host_port> &config_dropped = request.config.hp_last_drops1;
for (const auto &drop_node : config_dropped) {
std::vector<host_port> last_drops;
GET_HOST_PORTS(request.config, last_drops1, last_drops);
for (const auto &drop_node : last_drops) {
cc->record_drop_history(drop_node);
}
} else {
Expand Down Expand Up @@ -162,6 +168,8 @@ bool partition_guardian::from_proposals(meta_view &view,
host_port target;
host_port node;
GET_HOST_PORT(action, target, target);
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
std::string reason;
if (!target) {
reason = "action target is invalid";
Expand All @@ -188,10 +196,10 @@ bool partition_guardian::from_proposals(meta_view &view,

switch (action.type) {
case config_type::CT_ASSIGN_PRIMARY:
is_action_valid = (node == target && !pc.primary1 && !is_secondary(pc, node));
is_action_valid = (node == target && !primary && !is_secondary(pc, node));
break;
case config_type::CT_UPGRADE_TO_PRIMARY:
is_action_valid = (node == target && !pc.primary1 && is_secondary(pc, node));
is_action_valid = (node == target && !primary && is_secondary(pc, node));
break;
case config_type::CT_ADD_SECONDARY:
case config_type::CT_ADD_SECONDARY_FOR_LB:
Expand Down Expand Up @@ -246,11 +254,15 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi

action.type = config_type::CT_INVALID;
// try to upgrade a secondary to primary if the primary is missing
if (!pc.hp_secondaries1.empty()) {
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
std::vector<host_port> last_drops;
GET_HOST_PORTS(pc, last_drops1, last_drops);
if (!secondaries.empty()) {
RESET_IP_AND_HOST_PORT(action, node);
for (const auto &hp_secondary : pc.hp_secondaries1) {
const auto ns = get_node_state(*(view.nodes), hp_secondary, false);
CHECK_NOTNULL(ns, "invalid secondary: {}", hp_secondary);
for (const auto &secondary : secondaries) {
const auto ns = get_node_state(*(view.nodes), secondary, false);
CHECK_NOTNULL(ns, "invalid secondary: {}", secondary);
if (dsn_unlikely(!ns->alive())) {
continue;
}
Expand Down Expand Up @@ -283,7 +295,7 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
}
// if nothing in the last_drops, it means that this is a newly created partition, so let's
// just find a node and assign primary for it.
else if (pc.hp_last_drops1.empty()) {
else if (last_drops.empty()) {
dsn::host_port min_primary_server;
newly_partitions *min_primary_server_np = nullptr;

Expand Down Expand Up @@ -337,10 +349,10 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
dr.last_prepared_decree);
}

for (int i = 0; i < pc.hp_last_drops1.size(); ++i) {
for (int i = 0; i < last_drops.size(); ++i) {
int dropped_index = -1;
for (int k = 0; k < cc.dropped.size(); k++) {
if (cc.dropped[k].node == pc.hp_last_drops1[i]) {
if (cc.dropped[k].node == last_drops[i]) {
dropped_index = k;
break;
}
Expand All @@ -352,13 +364,13 @@ pc_status partition_guardian::on_missing_primary(meta_view &view, const dsn::gpi
dropped_index);
}

if (pc.hp_last_drops1.size() == 1) {
if (last_drops.size() == 1) {
LOG_WARNING("{}: the only node({}) is dead, waiting it to come back",
gpid_name,
FMT_HOST_PORT_AND_IP(pc, last_drops1.back()));
SET_OBJ_IP_AND_HOST_PORT(action, node, pc, last_drops1.back());
} else {
std::vector<dsn::host_port> nodes(pc.hp_last_drops1.end() - 2, pc.hp_last_drops1.end());
std::vector<dsn::host_port> nodes(last_drops.end() - 2, last_drops.end());
std::vector<dropped_replica> collected_info(2);
bool ready = true;

Expand Down Expand Up @@ -667,9 +679,11 @@ pc_status partition_guardian::on_redundant_secondary(meta_view &view, const dsn:
const node_mapper &nodes = *(view.nodes);
const partition_configuration &pc = *get_config(*(view.apps), gpid);
int target = 0;
int load = nodes.find(pc.hp_secondaries1.front())->second.partition_count();
for (int i = 0; i != pc.hp_secondaries1.size(); ++i) {
int l = nodes.find(pc.hp_secondaries1[i])->second.partition_count();
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
int load = nodes.find(secondaries.front())->second.partition_count();
for (int i = 0; i != secondaries.size(); ++i) {
int l = nodes.find(secondaries[i])->second.partition_count();
if (l > load) {
load = l;
target = i;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/server_load_balancer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ void server_load_balancer::register_proposals(meta_view view,
continue;
}

if (!pc.hp_primary1) {
host_port primary;
GET_HOST_PORT(pc, primary1, primary);
if (!primary) {
resp.err = ERR_INVALID_PARAMETERS;
return;
}
Expand Down
Loading

0 comments on commit 500a749

Please sign in to comment.