Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk_load): meta server adds bulk load ingestion concurrent count restriction (#829)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong committed May 7, 2021
1 parent 836a944 commit c620db0
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 39 deletions.
106 changes: 78 additions & 28 deletions src/meta/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ DSN_DEFINE_uint32("meta_server",
"failed");
DSN_TAG_VARIABLE(bulk_load_max_rollback_times, FT_MUTABLE);

// Upper bound on how many partitions of a single app may execute bulk-load
// ingestion at the same time (enforced in partition_ingestion via
// _apps_ingesting_count). Tagged FT_MUTABLE, so it appears to be updatable
// at runtime through the flag mechanism — confirm against DSN_TAG_VARIABLE docs.
DSN_DEFINE_uint32("meta_server",
bulk_load_ingestion_concurrent_count,
4,
"max partition_count executing ingestion at the same time");
DSN_TAG_VARIABLE(bulk_load_ingestion_concurrent_count, FT_MUTABLE);

bulk_load_service::bulk_load_service(meta_service *meta_svc, const std::string &bulk_load_dir)
: _meta_svc(meta_svc), _state(meta_svc->get_server_state()), _bulk_load_root(bulk_load_dir)
{
Expand Down Expand Up @@ -197,6 +203,7 @@ void bulk_load_service::do_start_app_bulk_load(std::shared_ptr<app_state> app,
zauto_write_lock l(_lock);
_bulk_load_app_id.insert(app->app_id);
_apps_in_progress_count[app->app_id] = app->partition_count;
_apps_ingesting_count[app->app_id] = 0;
}
create_app_bulk_load_dir(
app->app_name, app->app_id, app->partition_count, std::move(rpc));
Expand Down Expand Up @@ -565,6 +572,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
app_name,
pid,
kv.first.to_string());
decrease_app_ingestion_count(pid);
handle_bulk_load_failed(pid.get_app_id());
return;
}
Expand All @@ -584,6 +592,7 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,

if (response.is_group_ingestion_finished) {
ddebug_f("app({}) partition({}) ingestion files succeed", app_name, pid);
decrease_app_ingestion_count(pid);
update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
}
}
Expand Down Expand Up @@ -982,6 +991,11 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
_app_bulk_load_info[app_id] = ainfo;
_apps_pending_sync_flag[app_id] = false;
_apps_in_progress_count[app_id] = partition_count;
// when rollback from ingesting, ingesting_count should be reset
if (old_status == bulk_load_status::BLS_INGESTING &&
new_status == bulk_load_status::BLS_DOWNLOADING) {
_apps_ingesting_count[app_id] = 0;
}
}

ddebug_f("update app({}) status from {} to {}",
Expand All @@ -991,12 +1005,7 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk

if (new_status == bulk_load_status::BLS_INGESTING) {
for (int i = 0; i < partition_count; ++i) {
tasking::enqueue(LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion,
this,
ainfo.app_name,
gpid(app_id, i)));
partition_ingestion(ainfo.app_name, gpid(app_id, i));
}
}

Expand All @@ -1011,10 +1020,41 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
}
}

// ThreadPool: THREAD_POOL_DEFAULT
// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::partition_ingestion(const std::string &app_name, const gpid &pid)
{
FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion", [](dsn::string_view) {});
FAIL_POINT_INJECT_F("meta_bulk_load_partition_ingestion", [=](dsn::string_view) {
if (_apps_ingesting_count[pid.get_app_id()] < FLAGS_bulk_load_ingestion_concurrent_count) {
_apps_ingesting_count[pid.get_app_id()]++;
}
});

{
zauto_read_lock l(_lock);
if (_apps_ingesting_count[pid.get_app_id()] >= FLAGS_bulk_load_ingestion_concurrent_count) {
dwarn_f("app({}) has already {} partitions executing ingestion, partition({}) will "
"wait and try it later",
app_name,
_apps_ingesting_count[pid.get_app_id()],
pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
std::chrono::seconds(5));
return;
}
}

auto app_status = get_app_bulk_load_status(pid.get_app_id());
if (app_status != bulk_load_status::BLS_INGESTING) {
dwarn_f("app({}) current status is {}, partition({}), ignore it",
app_name,
dsn::enum_to_string(app_status),
pid);
return;
}

rpc_address primary_addr;
{
Expand All @@ -1024,20 +1064,15 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
app_name,
pid.get_app_id());
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(
&bulk_load_service::handle_app_unavailable, this, pid.get_app_id(), app_name));

handle_app_unavailable(pid.get_app_id(), app_name);
return;
}
primary_addr = app->partitions[pid.get_partition_index()].primary;
}

if (primary_addr.is_invalid()) {
dwarn_f("app({}) partition({}) primary is invalid, try it later", app_name, pid);
tasking::enqueue(LPC_BULK_LOAD_INGESTION,
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, app_name, pid),
pid.thread_hash(),
Expand All @@ -1049,20 +1084,36 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
derror_f("app({}) partition({}) doesn't have bulk load metadata, set bulk load failed",
app_name,
pid);
tasking::enqueue(
LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::handle_bulk_load_failed, this, pid.get_app_id()));
handle_bulk_load_failed(pid.get_app_id());
return;
}

tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::send_ingestion_request, this, app_name, pid, primary_addr));
{
zauto_write_lock l(_lock);
_apps_ingesting_count[pid.get_app_id()]++;
ddebug_f("send ingest_request to node({}), app({}) partition({}), ingestion_count({})",
primary_addr.to_string(),
app_name,
pid,
_apps_ingesting_count[pid.get_app_id()]);
}
}

// ThreadPool: THREAD_POOL_DEFAULT
void bulk_load_service::send_ingestion_request(const std::string &app_name,
const gpid &pid,
const rpc_address &primary_addr)
{
ingestion_request req;
req.app_name = app_name;
{
zauto_read_lock l(_lock);
req.metadata = _partition_bulk_load_info[pid].metadata;
}

// create a client request, whose gpid field in header should be pid
message_ex *msg = message_ex::create_request(dsn::apps::RPC_RRDB_RRDB_BULK_LOAD,
0,
Expand All @@ -1077,10 +1128,6 @@ void bulk_load_service::partition_ingestion(const std::string &app_name, const g
[this, app_name, pid](error_code err, ingestion_response &&resp) {
on_partition_ingestion_reply(err, std::move(resp), app_name, pid);
});
ddebug_f("send ingest_request to node({}), app({}) partition({})",
primary_addr.to_string(),
app_name,
pid);
_meta_svc->send_request(msg, primary_addr, rpc_callback);
}

Expand All @@ -1090,6 +1137,10 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
const std::string &app_name,
const gpid &pid)
{
if (err != ERR_OK || resp.err != ERR_OK || resp.rocksdb_error != ERR_OK) {
decrease_app_ingestion_count(pid);
}

if (err == ERR_NO_NEED_OPERATE) {
dwarn_f(
"app({}) partition({}) has already executing ingestion, ignore this repeated request",
Expand Down Expand Up @@ -1194,6 +1245,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
_apps_rolling_back.erase(app_id);
_apps_rollback_count.erase(app_id);
_apps_ingesting_count.erase(app_id);
_apps_cleaning_up.erase(app_id);
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
Expand Down Expand Up @@ -1666,6 +1718,7 @@ void bulk_load_service::do_continue_app_bulk_load(
zauto_write_lock l(_lock);
_apps_in_progress_count[app_id] = in_progress_partition_count;
_apps_rollback_count[app_id] = 0;
_apps_ingesting_count[app_id] = 0;
}

// if app is paused, no need to send bulk_load_request, just return
Expand Down Expand Up @@ -1700,10 +1753,7 @@ void bulk_load_service::do_continue_app_bulk_load(
gpid pid = gpid(app_id, i);
partition_bulk_load(ainfo.app_name, pid);
if (app_status == bulk_load_status::BLS_INGESTING) {
tasking::enqueue(
LPC_BULK_LOAD_INGESTION,
_meta_svc->tracker(),
std::bind(&bulk_load_service::partition_ingestion, this, ainfo.app_name, pid));
partition_ingestion(ainfo.app_name, pid);
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/meta/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace dsn {
namespace replication {

DSN_DECLARE_uint32(bulk_load_max_rollback_times);
DSN_DECLARE_uint32(bulk_load_ingestion_concurrent_count);

///
/// bulk load path on remote storage:
Expand Down Expand Up @@ -159,6 +160,10 @@ class bulk_load_service
// create ingestion_request and send it to primary
void partition_ingestion(const std::string &app_name, const gpid &pid);

void send_ingestion_request(const std::string &app_name,
const gpid &pid,
const rpc_address &primary_addr);

void on_partition_ingestion_reply(error_code err,
const ingestion_response &&resp,
const std::string &app_name,
Expand Down Expand Up @@ -357,6 +362,15 @@ class bulk_load_service
return (_bulk_load_app_id.find(app_id) != _bulk_load_app_id.end());
}

// Decrease the count of partitions currently executing ingestion for the app
// that owns `pid`. Called when a partition's ingestion finishes or fails, so
// waiting partitions can proceed under
// FLAGS_bulk_load_ingestion_concurrent_count.
// Thread-safe: acquires `_lock` in write mode.
inline void decrease_app_ingestion_count(const gpid &pid)
{
    zauto_write_lock l(_lock);
    const auto app_id = pid.get_app_id();
    // Single hash lookup (find) instead of find + operator[]. Also clamp at
    // zero: several error paths (handle_app_ingestion and
    // on_partition_ingestion_reply) may both decrease for the same partition,
    // and a negative counter would defeat the concurrency restriction.
    auto iter = _apps_ingesting_count.find(app_id);
    if (iter != _apps_ingesting_count.end() && iter->second > 0) {
        --iter->second;
    }
}

private:
friend class bulk_load_service_test;
friend class meta_bulk_load_http_test;
Expand Down Expand Up @@ -395,6 +409,8 @@ class bulk_load_service
std::unordered_map<app_id, bool> _apps_rolling_back;
// Used for restrict bulk load rollback count
std::unordered_map<app_id, int32_t> _apps_rollback_count;
// app_id -> ingesting partition count
std::unordered_map<app_id, int32_t> _apps_ingesting_count;
};

} // namespace replication
Expand Down
Loading

0 comments on commit c620db0

Please sign in to comment.