Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load succeed part2 - meta handle bulk load succeed #508

Merged
merged 9 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 146 additions & 2 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,81 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

dassert_f(
response.__isset.is_group_bulk_load_context_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));

for (const auto &kv : response.group_bulk_load_state) {
dassert_f(kv.second.__isset.is_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
}

{
zauto_read_lock l(_lock);
if (_partitions_cleaned_up[pid]) {
dwarn_f(
"receive bulk load response from node({}) app({}) partition({}), current partition "
"has already been cleaned up",
primary_addr.to_string(),
app_name,
pid);
return;
}
}

// The replicas have cleaned up their bulk load states and removed temporary sst files
bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up;
hycdong marked this conversation as resolved.
Show resolved Hide resolved
ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, "
"is_group_bulk_load_context_cleaned_up = {}",
Copy link
Contributor

@neverchanje neverchanje Jun 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please tell a meaningful message in the log so that we can learn what "is_group_bulk_load_context_cleaned_up" means.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the replica group will succeed without their states cleared?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when partition finish ingesting, its status will turn ingesting -> succeed, meta will still send bulk_load_request to replica, replica will turn ingesting -> succeed and cleanup its bulk load context, replica status should be invalid when it clean up bulk load status not succeed.

primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
group_cleaned_up);
{
zauto_write_lock l(_lock);
_partitions_cleaned_up[pid] = group_cleaned_up;
_partitions_bulk_load_state[pid] = response.group_bulk_load_state;
}

if (group_cleaned_up) {
int32_t count = 0;
{
zauto_write_lock l(_lock);
count = --_apps_in_progress_count[pid.get_app_id()];
}
if (count == 0) {
std::shared_ptr<app_state> app;
{
zauto_read_lock l(app_lock());
app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
"storage",
app_name,
pid.get_app_id());
remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
return;
}
}
ddebug_f("app({}) all partitions cleanup bulk load context", app_name);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down Expand Up @@ -898,6 +971,77 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id,
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
const std::string &app_name)
{
std::string bulk_load_path = get_app_bulk_load_path(app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
std::move(bulk_load_path), [this, app_id, app_name, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path);
reset_local_bulk_load_states(app_id, app_name);
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading)
{
std::string bulk_load_path = get_app_bulk_load_path(app->app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path);
reset_local_bulk_load_states(app->app_id, app->app_name);
if (set_app_not_bulk_loading) {
update_app_not_bulk_loading_on_remote_storage(std::move(app));
}
});
}

template <typename T>
inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap)
{
for (auto iter = mymap.begin(); iter != mymap.end();) {
if (iter->first.get_app_id() == app_id) {
hycdong marked this conversation as resolved.
Show resolved Hide resolved
mymap.erase(iter++);
} else {
iter++;
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name)
{
zauto_write_lock l(_lock);
_app_bulk_load_info.erase(app_id);
_apps_in_progress_count.erase(app_id);
_apps_pending_sync_flag.erase(app_id);
erase_map_elem_by_id(app_id, _partitions_pending_sync_flag);
erase_map_elem_by_id(app_id, _partitions_bulk_load_state);
erase_map_elem_by_id(app_id, _partition_bulk_load_info);
erase_map_elem_by_id(app_id, _partitions_total_download_progress);
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
// TODO(heyuchen): add other varieties
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_app_not_bulk_loading_on_remote_storage(
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<app_state> app)
{
app_info info = *app;
info.__set_is_bulk_loading(false);

blob value = dsn::json::json_forwarder<app_info>::encode(info);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), [app, this]() {
zauto_write_lock l(app_lock());
app->is_bulk_loading = false;
ddebug_f("app({}) update app is_bulk_loading to false", app->app_name);
});
}

void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker)
{
blob value = blob();
Expand Down
17 changes: 17 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ class bulk_load_service
const std::string &app_name,
const gpid &pid);

void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name);

///
/// update bulk load states to remote storage functions
///
Expand Down Expand Up @@ -188,6 +190,19 @@ class bulk_load_service
bulk_load_status::type new_status,
bool should_send_request);

// called when app is not available or dropped during bulk load, remove bulk load directory on
// remote storage
void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name);

// called when app is available, remove bulk load directory on remote storage
// if `set_app_not_bulk_loading` = true: call function
// `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing
void remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading);

// update app's is_bulk_loading to false on remote_storage
void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr<app_state> app);

///
/// sync bulk load states from remote storage
/// called when service initialized or meta server leader switch
Expand Down Expand Up @@ -315,6 +330,8 @@ class bulk_load_service
// partition_index -> group bulk load states(node address -> state)
std::unordered_map<gpid, std::map<rpc_address, partition_bulk_load_state>>
_partitions_bulk_load_state;

std::unordered_map<gpid, bool> _partitions_cleaned_up;
};

} // namespace replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,20 @@ class bulk_load_process_test : public bulk_load_service_test
_resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED);
}

void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status)
{
create_basic_response(ERR_OK, status);

partition_bulk_load_state state, state2;
state.__set_is_cleaned_up(true);
_resp.group_bulk_load_state[PRIMARY] = state;
_resp.group_bulk_load_state[SECONDARY1] = state;

state2.__set_is_cleaned_up(all_cleaned_up);
_resp.group_bulk_load_state[SECONDARY2] = state2;
_resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
}

void test_on_partition_bulk_load_reply(int32_t in_progress_count,
bulk_load_status::type status,
error_code resp_err = ERR_OK)
Expand Down Expand Up @@ -327,6 +341,22 @@ TEST_F(bulk_load_process_test, normal_succeed)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, succeed_not_all_finished)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, succeed_all_finished)
{
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED);
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed

// TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply`

/// on_partition_ingestion_reply unit tests
Expand Down