Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk-load): bulk load succeed part2 - meta handle bulk load succ…
Browse files Browse the repository at this point in the history
…eed (#508)
  • Loading branch information
hycdong authored Jun 24, 2020
1 parent 07618f3 commit ed267e2
Show file tree
Hide file tree
Showing 3 changed files with 193 additions and 2 deletions.
148 changes: 146 additions & 2 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,81 @@ void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is succeed, failed, canceled
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

dassert_f(
response.__isset.is_group_bulk_load_context_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));

for (const auto &kv : response.group_bulk_load_state) {
dassert_f(kv.second.__isset.is_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
}

{
zauto_read_lock l(_lock);
if (_partitions_cleaned_up[pid]) {
dwarn_f(
"receive bulk load response from node({}) app({}) partition({}), current partition "
"has already been cleaned up",
primary_addr.to_string(),
app_name,
pid);
return;
}
}

// The replicas have cleaned up their bulk load states and removed temporary sst files
bool group_cleaned_up = response.is_group_bulk_load_context_cleaned_up;
ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary status = {}, "
"is_group_bulk_load_context_cleaned_up = {}",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
group_cleaned_up);
{
zauto_write_lock l(_lock);
_partitions_cleaned_up[pid] = group_cleaned_up;
_partitions_bulk_load_state[pid] = response.group_bulk_load_state;
}

if (group_cleaned_up) {
int32_t count = 0;
{
zauto_write_lock l(_lock);
count = --_apps_in_progress_count[pid.get_app_id()];
}
if (count == 0) {
std::shared_ptr<app_state> app;
{
zauto_read_lock l(app_lock());
app = _state->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
dwarn_f("app(name={}, id={}) is not existed, remove bulk load dir on remote "
"storage",
app_name,
pid.get_app_id());
remove_bulk_load_dir_on_remote_storage(pid.get_app_id(), app_name);
return;
}
}
ddebug_f("app({}) all partitions cleanup bulk load context", app_name);
remove_bulk_load_dir_on_remote_storage(std::move(app), true);
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down Expand Up @@ -898,6 +971,77 @@ void bulk_load_service::on_partition_ingestion_reply(error_code err,
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(int32_t app_id,
const std::string &app_name)
{
std::string bulk_load_path = get_app_bulk_load_path(app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
std::move(bulk_load_path), [this, app_id, app_name, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {} succeed", app_name, bulk_load_path);
reset_local_bulk_load_states(app_id, app_name);
});
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading)
{
std::string bulk_load_path = get_app_bulk_load_path(app->app_id);
_meta_svc->get_meta_storage()->delete_node_recursively(
std::move(bulk_load_path), [this, app, set_app_not_bulk_loading, bulk_load_path]() {
ddebug_f("remove app({}) bulk load dir {} succeed", app->app_name, bulk_load_path);
reset_local_bulk_load_states(app->app_id, app->app_name);
if (set_app_not_bulk_loading) {
update_app_not_bulk_loading_on_remote_storage(std::move(app));
}
});
}

template <typename T>
inline void erase_map_elem_by_id(int32_t app_id, std::unordered_map<gpid, T> &mymap)
{
for (auto iter = mymap.begin(); iter != mymap.end();) {
if (iter->first.get_app_id() == app_id) {
mymap.erase(iter++);
} else {
iter++;
}
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::string &app_name)
{
zauto_write_lock l(_lock);
_app_bulk_load_info.erase(app_id);
_apps_in_progress_count.erase(app_id);
_apps_pending_sync_flag.erase(app_id);
erase_map_elem_by_id(app_id, _partitions_pending_sync_flag);
erase_map_elem_by_id(app_id, _partitions_bulk_load_state);
erase_map_elem_by_id(app_id, _partition_bulk_load_info);
erase_map_elem_by_id(app_id, _partitions_total_download_progress);
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
// TODO(heyuchen): add other varieties
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::update_app_not_bulk_loading_on_remote_storage(
std::shared_ptr<app_state> app)
{
app_info info = *app;
info.__set_is_bulk_loading(false);

blob value = dsn::json::json_forwarder<app_info>::encode(info);
_meta_svc->get_meta_storage()->set_data(
_state->get_app_path(*app), std::move(value), [app, this]() {
zauto_write_lock l(app_lock());
app->is_bulk_loading = false;
ddebug_f("app({}) update app is_bulk_loading to false", app->app_name);
});
}

void bulk_load_service::create_bulk_load_root_dir(error_code &err, task_tracker &tracker)
{
blob value = blob();
Expand Down
17 changes: 17 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ class bulk_load_service
const std::string &app_name,
const gpid &pid);

void reset_local_bulk_load_states(int32_t app_id, const std::string &app_name);

///
/// update bulk load states to remote storage functions
///
Expand Down Expand Up @@ -188,6 +190,19 @@ class bulk_load_service
bulk_load_status::type new_status,
bool should_send_request);

// called when app is not available or dropped during bulk load, remove bulk load directory on
// remote storage
void remove_bulk_load_dir_on_remote_storage(int32_t app_id, const std::string &app_name);

// called when app is available, remove bulk load directory on remote storage
// if `set_app_not_bulk_loading` = true: call function
// `update_app_not_bulk_loading_on_remote_storage` to set app not bulk_loading after removing
void remove_bulk_load_dir_on_remote_storage(std::shared_ptr<app_state> app,
bool set_app_not_bulk_loading);

// update app's is_bulk_loading to false on remote_storage
void update_app_not_bulk_loading_on_remote_storage(std::shared_ptr<app_state> app);

///
/// sync bulk load states from remote storage
/// called when service initialized or meta server leader switch
Expand Down Expand Up @@ -315,6 +330,8 @@ class bulk_load_service
// partition_index -> group bulk load states(node address -> state)
std::unordered_map<gpid, std::map<rpc_address, partition_bulk_load_state>>
_partitions_bulk_load_state;

std::unordered_map<gpid, bool> _partitions_cleaned_up;
};

} // namespace replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,20 @@ class bulk_load_process_test : public bulk_load_service_test
_resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED);
}

void mock_response_cleaned_up_flag(bool all_cleaned_up, bulk_load_status::type status)
{
create_basic_response(ERR_OK, status);

partition_bulk_load_state state, state2;
state.__set_is_cleaned_up(true);
_resp.group_bulk_load_state[PRIMARY] = state;
_resp.group_bulk_load_state[SECONDARY1] = state;

state2.__set_is_cleaned_up(all_cleaned_up);
_resp.group_bulk_load_state[SECONDARY2] = state2;
_resp.__set_is_group_bulk_load_context_cleaned_up(all_cleaned_up);
}

void test_on_partition_bulk_load_reply(int32_t in_progress_count,
bulk_load_status::type status,
error_code resp_err = ERR_OK)
Expand Down Expand Up @@ -327,6 +341,22 @@ TEST_F(bulk_load_process_test, normal_succeed)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, succeed_not_all_finished)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_SUCCEED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

TEST_F(bulk_load_process_test, succeed_all_finished)
{
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_SUCCEED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_SUCCEED);
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed

// TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply`

/// on_partition_ingestion_reply unit tests
Expand Down

0 comments on commit ed267e2

Please sign in to comment.