Skip to content

Commit

Permalink
merge feat(bulk-load): bulk load succeed part2 - meta handle bulk loa…
Browse files Browse the repository at this point in the history
…d succeed XiaoMi#508
  • Loading branch information
hycdong committed Jun 24, 2020
1 parent 22b9aee commit 617a059
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 85 deletions.
124 changes: 67 additions & 57 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,59 +594,56 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

if (!response.__isset.is_group_bulk_load_context_cleaned_up) {
dwarn_f(
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
return;
}
dassert_f(
response.__isset.is_group_bulk_load_context_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));

for (const auto &kv : response.group_bulk_load_state) {
if (!kv.second.__isset.is_cleaned_up) {
dwarn_f("receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
return;
}
dassert_f(kv.second.__isset.is_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
}

{
zauto_read_lock l(_lock);
if (_partitions_cleaned_up[pid]) {
dwarn_f(
"receive bulk load response from node({}) app({}) partition({}), current partition "
"has already be cleaned up",
"has already been cleaned up",
primary_addr.to_string(),
app_name,
pid);
return;
}
}

bool all_cleaned_up = response.is_group_bulk_load_context_cleaned_up;
// The replicas have cleaned up their bulk load states and removed temporary sst files
bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up;
ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, "
"is_group_bulk_load_context_cleaned = {}",
"is_group_bulk_load_context_cleaned_up = {}",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
all_cleaned_up);
group_cleaned_up);
{
zauto_write_lock l(_lock);
_partitions_cleaned_up[pid] = all_cleaned_up;
_partitions_cleaned_up[pid] = group_cleaned_up;
_partitions_bulk_load_state[pid] = response.group_bulk_load_state;
}

if (all_cleaned_up) {
int32_t count;
if (group_cleaned_up) {
int32_t count = 0;
{
zauto_write_lock l(_lock);
count = --_apps_in_progress_count[pid.get_app_id()];
Expand All @@ -657,15 +654,16 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon
zauto_read_lock l(app_lock());
app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, set bulk load failed",
dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
"storage",
app_name,
pid.get_app_id());
remove_bulk_load_dir(pid.get_app_id(), app_name);
remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
return;
}
}
ddebug_f("app({}) all partitions cleanup bulk load context", app_name);
remove_bulk_load_dir(std::move(app), true);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
}
}
}
Expand Down Expand Up @@ -757,7 +755,7 @@ void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string
zauto_write_lock l(_lock);
if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) {
_apps_cleaning_up[app_id] = true;
remove_bulk_load_dir(app_id, app_name);
remove_bulk_load_dir_on_remote_storage(app_id, app_name);
}
}

Expand Down Expand Up @@ -1052,7 +1050,8 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir(int32_t app_id, const std::string &app_name)
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id,
const std::string &app_name)
{
std::string bulk_load_path = get_app_bulk_load_path(app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
Expand All @@ -1063,35 +1062,30 @@ void bulk_load_service::remove_bulk_load_dir(int32_t app_id, const std::string &
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir(std::shared_ptr<app_state> app, bool need_set_app_flag)
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading)
{
std::string bulk_load_path = get_app_bulk_load_path(app->app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
std::move(bulk_load_path), [this, app, need_set_app_flag, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {}", app->app_name, bulk_load_path);
std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path);
reset_local_bulk_load_states(app->app_id, app->app_name);
if (need_set_app_flag) {
update_app_is_bulk_loading(std::move(app), false);
if (set_app_not_bulk_loading) {
update_app_not_bulk_loading_on_remote_storage(std::move(app));
}
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_app_is_bulk_loading(std::shared_ptr<app_state> app,
bool is_bulk_loading)
template <typename T>
inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap)
{
app_info info = *app;
info.__set_is_bulk_loading(is_bulk_loading);

blob value = dsn::json::json_forwarder<app_info>::encode(info);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), [app, is_bulk_loading, this]() {
{
zauto_write_lock l(app_lock());
app->is_bulk_loading = is_bulk_loading;
}
ddebug_f("app({}) update app is_bulk_loading to {}", app->app_name, is_bulk_loading);
});
for (auto iter = mymap.begin(); iter != mymap.end();) {
if (iter->first.get_app_id() == app_id) {
mymap.erase(iter++);
} else {
iter++;
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
Expand All @@ -1111,6 +1105,22 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
ddebug_f("reset local app({}) bulk load context", app_name);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_app_not_bulk_loading_on_remote_storage(
std::shared_ptr<app_state> app)
{
app_info info = *app;
info.__set_is_bulk_loading(false);

blob value = dsn::json::json_forwarder<app_info>::encode(info);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), [app, this]() {
zauto_write_lock l(app_lock());
app->is_bulk_loading = false;
ddebug_f("app({}) update app is_bulk_loading to false", app->app_name);
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
{
Expand Down Expand Up @@ -1294,9 +1304,9 @@ void bulk_load_service::continue_bulk_load(
derror_f(
"app(name={},app_id={}) is not existed or not available", ainfo.app_name, ainfo.app_id);
if (app == nullptr) {
remove_bulk_load_dir(ainfo.app_id, ainfo.app_name);
remove_bulk_load_dir_on_remote_storage(ainfo.app_id, ainfo.app_name);
} else {
remove_bulk_load_dir(std::move(app), true);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
}
return;
}
Expand All @@ -1308,7 +1318,7 @@ void bulk_load_service::continue_bulk_load(
ainfo,
partition_bulk_load_info_map,
different_status_pidx_set)) {
remove_bulk_load_dir(std::move(app), true);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
return;
}

Expand Down Expand Up @@ -1623,7 +1633,7 @@ void bulk_load_service::check_app_bulk_load_consistency(std::shared_ptr<app_stat
app->app_name,
app_path,
is_app_bulk_loading);
update_app_is_bulk_loading(std::move(app), false);
update_app_not_bulk_loading_on_remote_storage(std::move(app));
return;
}
if (err == ERR_OK && !is_app_bulk_loading) {
Expand All @@ -1632,7 +1642,7 @@ void bulk_load_service::check_app_bulk_load_consistency(std::shared_ptr<app_stat
app->app_name,
app_path,
is_app_bulk_loading);
remove_bulk_load_dir(std::move(app), false);
remove_bulk_load_dir_on_remote_storage(std::move(app), false);
return;
}
}
Expand Down
26 changes: 10 additions & 16 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,6 @@ struct bulk_load_info
DEFINE_JSON_SERIALIZATION(app_id, app_name, partition_count)
};

template <typename T>
inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap)
{
for (auto iter = mymap.begin(); iter != mymap.end();) {
if (iter->first.get_app_id() == app_id) {
mymap.erase(iter++);
}
}
}

class bulk_load_service
{
public:
Expand Down Expand Up @@ -215,14 +205,18 @@ class bulk_load_service
bulk_load_status::type new_status,
bool should_send_request);

// `need_set_app_flag` = true: update app's is_bulk_loading to false on remote_storage
void remove_bulk_load_dir(std::shared_ptr<app_state> app, bool need_set_app_flag);

void remove_bulk_load_dir(int32_t app_id, const std::string &app_name);
// called when app is not available or dropped during bulk load, remove bulk load directory on
// remote storage
void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name);

// update app's is_bulk_loading to <is_bulk_loading> on remote_storage
void update_app_is_bulk_loading(std::shared_ptr<app_state> app, bool is_bulk_loading);
// called when app is available, remove bulk load directory on remote storage
// if `set_app_not_bulk_loading` = true: call function
// `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing
void remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading);

// update app's is_bulk_loading to false on remote_storage
void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr<app_state> app);
///
/// sync bulk load states from remote storage
/// called when service initialized or meta server leader switch
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,17 +496,18 @@ class bulk_load_process_test : public bulk_load_service_test
_resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED);
}

void mock_response_cleanup_flag(bool finish_cleanup, bulk_load_status::type status)
void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status)
{
create_basic_response(ERR_OK, status);

partition_bulk_load_state state;
partition_bulk_load_state state, state2;
state.__set_is_cleaned_up(true);
_resp.group_bulk_load_state[PRIMARY] = state;
_resp.group_bulk_load_state[SECONDARY1] = state;
state.__set_is_cleaned_up(finish_cleanup);
_resp.group_bulk_load_state[SECONDARY2] = state;
_resp.__set_is_group_bulk_load_context_cleaned_up(finish_cleanup);

state2.__set_is_cleaned_up(all_cleaned_up);
_resp.group_bulk_load_state[SECONDARY2] = state2;
_resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
}

void mock_response_paused(bool is_group_paused)
Expand Down Expand Up @@ -629,21 +630,27 @@ TEST_F(bulk_load_process_test, normal_succeed)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, half_cleanup)
TEST_F(bulk_load_process_test, succeed_not_all_finished)
{
mock_response_cleanup_flag(false, bulk_load_status::BLS_FAILED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, cleanup_succeed)
TEST_F(bulk_load_process_test, succeed_all_finished)
{
mock_response_cleanup_flag(true, bulk_load_status::BLS_SUCCEED);
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED);

ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

TEST_F(bulk_load_process_test, half_cleanup)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_FAILED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_FAILED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, pausing)
{
mock_response_paused(false);
Expand Down

0 comments on commit 617a059

Please sign in to comment.