Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
acelyc111 committed Jun 24, 2024
1 parent 500a749 commit cbc4273
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 67 deletions.
7 changes: 4 additions & 3 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1202,7 +1202,9 @@ bool bulk_load_service::check_ever_ingestion_succeed(const partition_configurati
}

std::vector<host_port> current_nodes;
current_nodes.emplace_back(pc.hp_primary1);
dsn::host_port primary;
GET_HOST_PORT(pc, primary1, primary);
current_nodes.emplace_back(primary);
std::vector<host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
Expand Down Expand Up @@ -1275,12 +1277,11 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g

host_port primary;
GET_HOST_PORT(pc, primary1, primary);
ballot meta_ballot = pc.ballot;
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, meta_ballot),
&bulk_load_service::send_ingestion_request, this, app_name, pid, primary, pc.ballot),
0,
std::chrono::seconds(bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL));
}
Expand Down
7 changes: 5 additions & 2 deletions src/meta/test/balancer_simulator/balancer_simulator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ void generate_balanced_apps(/*out*/ app_mapper &apps,

for (auto &pc : app->pcs) {
temp.clear();
while (pc.hp_secondaries1.size() + 1 < pc.max_replica_count) {
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(pc, secondaries1, secondaries);
while (secondaries.size() + 1 < pc.max_replica_count) {
const auto &n = pq2.pop();
if (!is_member(pc, n)) {
pc.hp_secondaries1.push_back(n);
secondaries.push_back(n);
nodes[n].put_partition(pc.pid, false);
}
temp.push_back(n);
Expand Down Expand Up @@ -154,6 +156,7 @@ void random_move_primary(app_mapper &apps, node_mapper &nodes, int primary_move_
for (auto &pc : app.pcs) {
int n = random32(1, space_size) / 100;
if (n < primary_move_ratio) {
CHECK(pc.hp_primary1, "");
int indice = random32(0, 1);
nodes[pc.hp_primary1].remove_partition(pc.pid, true);
std::swap(pc.primary1, pc.secondaries1[indice]);
Expand Down
69 changes: 40 additions & 29 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,29 +188,32 @@ void replica_bulk_loader::broadcast_group_bulk_load(const bulk_load_request &met

LOG_INFO_PREFIX("start to broadcast group bulk load");

for (const auto &hp : _replica->_primary_states.pc.hp_secondaries1) {
if (hp == _stub->primary_host_port())
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
if (secondary == _stub->primary_host_port()) {
continue;
}

auto request = std::make_unique<group_bulk_load_request>();
request->app_name = _replica->_app_info.app_name;
const auto &addr = dsn::dns_resolver::instance().resolve_address(hp);
SET_IP_AND_HOST_PORT(*request, target, addr, hp);
const auto &addr = dsn::dns_resolver::instance().resolve_address(secondary);
SET_IP_AND_HOST_PORT(*request, target, addr, secondary);
_replica->_primary_states.get_replica_config(partition_status::PS_SECONDARY,
request->config);
request->cluster_name = meta_req.cluster_name;
request->provider_name = meta_req.remote_provider_name;
request->meta_bulk_load_status = meta_req.meta_bulk_load_status;
request->remote_root_path = meta_req.remote_root_path;

LOG_INFO_PREFIX("send group_bulk_load_request to {}({})", hp, addr);
LOG_INFO_PREFIX("send group_bulk_load_request to {}({})", secondary, addr);

group_bulk_load_rpc rpc(
std::move(request), RPC_GROUP_BULK_LOAD, 0_ms, 0, get_gpid().thread_hash());
auto callback_task = rpc.call(addr, tracker(), [this, rpc](error_code err) mutable {
on_group_bulk_load_reply(err, rpc.request(), rpc.response());
});
_replica->_primary_states.group_bulk_load_pending_replies[hp] = callback_task;
_replica->_primary_states.group_bulk_load_pending_replies[secondary] = callback_task;
}
}

Expand Down Expand Up @@ -740,8 +743,10 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta
}

if (status() == partition_status::PS_PRIMARY) {
for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) {
_replica->_primary_states.reset_node_bulk_load_states(target_hp);
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
_replica->_primary_states.reset_node_bulk_load_states(secondary);
}
}

Expand Down Expand Up @@ -938,16 +943,18 @@ void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_respo
primary_state.download_status);

int32_t total_progress = primary_state.download_progress;
for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) {
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_hp];
_replica->_primary_states.secondary_bulk_load_states[secondary];
int32_t s_progress =
secondary_state.__isset.download_progress ? secondary_state.download_progress : 0;
error_code s_status =
secondary_state.__isset.download_status ? secondary_state.download_status : ERR_OK;
LOG_INFO_PREFIX(
"secondary = {}, download progress = {}%, status={}", target_hp, s_progress, s_status);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state);
"secondary = {}, download progress = {}%, status={}", secondary, s_progress, s_status);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state);
total_progress += s_progress;
}

Expand Down Expand Up @@ -978,19 +985,20 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary1),
enum_to_string(primary_state.ingest_status));

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
bool is_group_ingestion_finish =
(primary_state.ingest_status == ingestion_status::IS_SUCCEED) &&
(_replica->_primary_states.pc.hp_secondaries1.size() + 1 ==
_replica->_primary_states.pc.max_replica_count);
for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) {
(secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count);
for (const auto &secondary : secondaries) {
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_hp];
_replica->_primary_states.secondary_bulk_load_states[secondary];
ingestion_status::type ingest_status = secondary_state.__isset.ingest_status
? secondary_state.ingest_status
: ingestion_status::IS_INVALID;
LOG_INFO_PREFIX(
"secondary = {}, ingestion status={}", target_hp, enum_to_string(ingest_status));
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state);
"secondary = {}, ingestion status={}", secondary, enum_to_string(ingest_status));
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state);
is_group_ingestion_finish &= (ingest_status == ingestion_status::IS_SUCCEED);
}
response.__set_is_group_ingestion_finished(is_group_ingestion_finish);
Expand Down Expand Up @@ -1025,17 +1033,18 @@ void replica_bulk_loader::report_group_cleaned_up(bulk_load_response &response)
FMT_HOST_PORT_AND_IP(_replica->_primary_states.pc, primary1),
primary_state.is_cleaned_up);

bool group_flag =
(primary_state.is_cleaned_up) && (_replica->_primary_states.pc.hp_secondaries1.size() + 1 ==
_replica->_primary_states.pc.max_replica_count);
for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) {
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
bool group_flag = (primary_state.is_cleaned_up) &&
(secondaries.size() + 1 == _replica->_primary_states.pc.max_replica_count);
for (const auto &secondary : secondaries) {
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_hp];
_replica->_primary_states.secondary_bulk_load_states[secondary];
bool is_cleaned_up =
secondary_state.__isset.is_cleaned_up ? secondary_state.is_cleaned_up : false;
LOG_INFO_PREFIX(
"secondary = {}, bulk load states cleaned_up = {}", target_hp, is_cleaned_up);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state);
"secondary = {}, bulk load states cleaned_up = {}", secondary, is_cleaned_up);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state);
group_flag &= is_cleaned_up;
}
LOG_INFO_PREFIX("group bulk load states cleaned_up = {}", group_flag);
Expand Down Expand Up @@ -1067,12 +1076,14 @@ void replica_bulk_loader::report_group_is_paused(bulk_load_response &response)
bool group_is_paused =
primary_state.is_paused && (_replica->_primary_states.pc.hp_secondaries1.size() + 1 ==
_replica->_primary_states.pc.max_replica_count);
for (const auto &target_hp : _replica->_primary_states.pc.hp_secondaries1) {
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
partition_bulk_load_state secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_hp];
_replica->_primary_states.secondary_bulk_load_states[secondary];
bool is_paused = secondary_state.__isset.is_paused ? secondary_state.is_paused : false;
LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", target_hp, is_paused);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, target_hp, secondary_state);
LOG_INFO_PREFIX("secondary = {}, bulk_load is_paused = {}", secondary, is_paused);
SET_VALUE_FROM_HOST_PORT(response, group_bulk_load_state, secondary, secondary_state);
group_is_paused &= is_paused;
}
LOG_INFO_PREFIX("group bulk load is_paused = {}", group_is_paused);
Expand Down
15 changes: 11 additions & 4 deletions src/replica/duplication/replica_follower.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,9 @@ error_code replica_follower::update_master_replica_config(error_code err, query_
return ERR_INCONSISTENT_STATE;
}

if (dsn_unlikely(!resp.partitions[0].hp_primary1)) {
dsn::host_port primary;
GET_HOST_PORT(resp.partitions[0], primary1, primary);
if (dsn_unlikely(!primary)) {
LOG_ERROR_PREFIX("master[{}] partition address is invalid", master_replica_name());
return ERR_INVALID_STATE;
}
Expand All @@ -203,9 +205,14 @@ void replica_follower::copy_master_replica_checkpoint()
dsn::message_ex *msg =
dsn::message_ex::create_request(RPC_QUERY_LAST_CHECKPOINT_INFO, 0, _pc.pid.thread_hash());
dsn::marshall(msg, request);
rpc::call(_pc.primary1, msg, &_tracker, [&](error_code err, learn_response &&resp) mutable {
nfs_copy_checkpoint(err, std::move(resp));
});
dsn::host_port primary;
GET_HOST_PORT(_pc, primary1, primary);
rpc::call(dsn::dns_resolver::instance().resolve_address(primary),
msg,
&_tracker,
[&](error_code err, learn_response &&resp) mutable {
nfs_copy_checkpoint(err, std::move(resp));
});
}

// ThreadPool: THREAD_POOL_DEFAULT
Expand Down
5 changes: 4 additions & 1 deletion src/replica/duplication/replica_follower.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/gpid.h"
#include "dsn.layer2_types.h"
#include "replica/replica_base.h"
#include "runtime/rpc/dns_resolver.h"
#include "runtime/rpc/rpc_host_port.h"
#include "runtime/task/task_tracker.h"
#include "utils/error_code.h"
Expand Down Expand Up @@ -78,7 +79,9 @@ class replica_follower : replica_base
std::string master_replica_name()
{
std::string app_info = fmt::format("{}.{}", _master_cluster_name, _master_app_name);
if (_pc.hp_primary1) {
dsn::host_port primary;
GET_HOST_PORT(_pc, primary1, primary);
if (primary) {
return fmt::format("{}({}|{})", app_info, FMT_HOST_PORT_AND_IP(_pc, primary1), _pc.pid);
}
return app_info;
Expand Down
18 changes: 10 additions & 8 deletions src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries);
if (request->rpc_code() == dsn::apps::RPC_RRDB_RRDB_BULK_LOAD) {
auto cur_bulk_load_status = _bulk_loader->get_bulk_load_status();
if (cur_bulk_load_status != bulk_load_status::BLS_DOWNLOADED &&
Expand All @@ -200,16 +202,15 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
LOG_INFO_PREFIX("receive bulk load ingestion request");

// bulk load ingestion request requires that all secondaries should be alive
if (static_cast<int>(_primary_states.pc.hp_secondaries1.size()) + 1 <
_primary_states.pc.max_replica_count) {
if (static_cast<int32_t>(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
}
_is_bulk_load_ingestion = true;
_bulk_load_ingestion_start_time_ms = dsn_now_ms();
}

if (static_cast<int>(_primary_states.pc.hp_secondaries1.size()) + 1 <
if (static_cast<int32_t>(secondaries.size()) + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count)) {
response_client_write(request, ERR_NOT_ENOUGH_MEMBER);
return;
Expand Down Expand Up @@ -257,6 +258,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
mu->set_is_sync_to_child(_primary_states.sync_send_write_request);

// check bounded staleness
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries);
if (mu->data.header.decree > last_committed_decree() + FLAGS_staleness_for_commit) {
err = ERR_CAPACITY_EXCEEDED;
goto ErrOut;
Expand All @@ -269,8 +272,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
break;
}
LOG_INFO_PREFIX("try to prepare bulk load mutation({})", mu->name());
if (static_cast<int>(_primary_states.pc.hp_secondaries1.size()) + 1 <
_primary_states.pc.max_replica_count) {
if (static_cast<int32_t>(secondaries.size()) + 1 < _primary_states.pc.max_replica_count) {
err = ERR_NOT_ENOUGH_MEMBER;
break;
}
Expand All @@ -282,7 +284,7 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c
// stop prepare if there are too few replicas unless it's a reconciliation
// for reconciliation, we should ensure every prepared mutation to be committed
// please refer to PacificA paper
if (static_cast<int>(_primary_states.pc.hp_secondaries1.size()) + 1 <
if (static_cast<int32_t>(secondaries.size()) + 1 <
_options->app_mutation_2pc_min_replica_count(_app_info.max_replica_count) &&
!reconciliation) {
err = ERR_NOT_ENOUGH_MEMBER;
Expand All @@ -299,8 +301,8 @@ void replica::init_prepare(mutation_ptr &mu, bool reconciliation, bool pop_all_c

// remote prepare
mu->set_prepare_ts();
mu->set_left_secondary_ack_count((unsigned int)_primary_states.pc.hp_secondaries1.size());
for (const auto &secondary : _primary_states.pc.hp_secondaries1) {
mu->set_left_secondary_ack_count(static_cast<unsigned int>(secondaries.size()));
for (const auto &secondary : secondaries) {
send_prepare_message(secondary,
partition_status::PS_SECONDARY,
mu,
Expand Down
9 changes: 7 additions & 2 deletions src/replica/replica_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,10 +255,15 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo

void replica::send_backup_request_to_secondary(const backup_request &request)
{
for (const auto &target_address : _primary_states.pc.secondaries1) {
std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_primary_states.pc, secondaries1, secondaries);
for (const auto &secondary : secondaries) {
// primary will send backup_request to secondary periodically
// so, we shouldn't handle the response
rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash());
rpc::call_one_way_typed(dsn::dns_resolver::instance().resolve_address(secondary),
RPC_COLD_BACKUP,
request,
get_gpid().thread_hash());
}
}

Expand Down
39 changes: 23 additions & 16 deletions src/replica/split/replica_split_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -775,14 +775,18 @@ void replica_split_manager::update_child_group_partition_count(
return;
}

if (!_replica->_primary_states.learners.empty() ||
_replica->_primary_states.pc.hp_secondaries1.size() + 1 <
_replica->_primary_states.pc.max_replica_count) {
LOG_ERROR_PREFIX("there are {} learners or not have enough secondaries(count is {})",
_replica->_primary_states.learners.size(),
_replica->_primary_states.pc.hp_secondaries1.size());
parent_handle_split_error(
"update_child_group_partition_count failed, have learner or lack of secondary", true);
if (!_replica->_primary_states.learners.empty()) {
LOG_ERROR_PREFIX("there are {} learners", _replica->_primary_states.learners.size());
parent_handle_split_error("update_child_group_partition_count failed, have learner", true);
return;
}

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) {
LOG_ERROR_PREFIX("there are not enough secondaries(count is {})", secondaries.size());
parent_handle_split_error("update_child_group_partition_count failed, lack of secondary",
true);
return;
}

Expand Down Expand Up @@ -1221,14 +1225,17 @@ void replica_split_manager::trigger_primary_parent_split(

_meta_split_status = meta_split_status;
if (meta_split_status == split_status::SPLITTING) {
if (!_replica->_primary_states.learners.empty() ||
_replica->_primary_states.pc.hp_secondaries1.size() + 1 <
_replica->_primary_states.pc.max_replica_count) {
LOG_WARNING_PREFIX(
"there are {} learners or not have enough secondaries(count is {}), wait for "
"next round",
_replica->_primary_states.learners.size(),
_replica->_primary_states.pc.hp_secondaries1.size());
if (!_replica->_primary_states.learners.empty()) {
LOG_WARNING_PREFIX("there are {} learners, wait for next round",
_replica->_primary_states.learners.size());
return;
}

std::vector<dsn::host_port> secondaries;
GET_HOST_PORTS(_replica->_primary_states.pc, secondaries1, secondaries);
if (secondaries.size() + 1 < _replica->_primary_states.pc.max_replica_count) {
LOG_WARNING_PREFIX("there are not enough secondaries(count is {}), wait for next round",
secondaries.size());
return;
}

Expand Down
Loading

0 comments on commit cbc4273

Please sign in to comment.