Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
handle bulk load failed and app unavailable (#532)
Browse files Browse the repository at this point in the history
  • Loading branch information
hycdong authored Jul 7, 2020
1 parent 60761bb commit aa37a2b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 21 deletions.
13 changes: 8 additions & 5 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,13 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
case bulk_load_status::BLS_PAUSING:
pause_bulk_load();
break;
case bulk_load_status::BLS_CANCELED: {
case bulk_load_status::BLS_CANCELED:
handle_bulk_load_finish(bulk_load_status::BLS_CANCELED);
} break;
// TODO(heyuchen): add other bulk load status
break;
case bulk_load_status::BLS_FAILED:
handle_bulk_load_finish(bulk_load_status::BLS_FAILED);
// TODO(heyuchen): add perf-counter here
break;
default:
break;
}
Expand Down Expand Up @@ -604,12 +607,12 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
case bulk_load_status::BLS_FAILED:
report_group_cleaned_up(response);
break;
case bulk_load_status::BLS_PAUSING:
report_group_is_paused(response);
break;
// TODO(heyuchen): add other status
default:
break;
}
Expand Down Expand Up @@ -795,12 +798,12 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
case bulk_load_status::BLS_FAILED:
bulk_load_state.__set_is_cleaned_up(is_cleaned_up());
break;
case bulk_load_status::BLS_PAUSING:
bulk_load_state.__set_is_paused(local_status == bulk_load_status::BLS_PAUSED);
break;
// TODO(heyuchen): add other status
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,13 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
// Test cases
// - bulk load succeed
// - double bulk load finish
// TODO(heyuchen): add following cases
// istatus, is_ingestion, create_dir will be used in further tests
// - cancel during downloaded
// - cancel during ingestion
// - cancel during succeed
// - failed during downloading
// - failed during ingestion
// Tip: the bulk load dir is removed once bulk load finishes, so some cases must create the dir
// beforehand
struct test_struct
{
bulk_load_status::type local_status;
Expand Down Expand Up @@ -560,6 +563,18 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_DOWNLOADING,
10,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_FAILED,
true},
{bulk_load_status::BLS_INGESTING,
100,
ingestion_status::type::IS_FAILED,
false,
bulk_load_status::BLS_FAILED,
true}};

for (auto test : tests) {
Expand Down
22 changes: 14 additions & 8 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,15 +712,21 @@ void bulk_load_service::try_rollback_to_downloading(const std::string &app_name,
// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::handle_bulk_load_failed(int32_t app_id)
{
// TODO(heyuchen): TBD
// replica meets serious error during bulk load, such as file on remote storage is damaged
// should stop bulk load process, set bulk load failed
zauto_write_lock l(_lock);
if (!_apps_cleaning_up[app_id]) {
_apps_cleaning_up[app_id] = true;
update_app_status_on_remote_storage_unlocked(app_id, bulk_load_status::BLS_FAILED);
}
}

// ThreadPool: THREAD_POOL_META_STATE
void bulk_load_service::handle_app_unavailable(int32_t app_id, const std::string &app_name)
{
// TODO(heyuchen): TBD
zauto_write_lock l(_lock);
if (is_app_bulk_loading_unlocked(app_id) && !_apps_cleaning_up[app_id]) {
_apps_cleaning_up[app_id] = true;
remove_bulk_load_dir_on_remote_storage(app_id, app_name);
}
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down Expand Up @@ -799,7 +805,6 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
dsn::enum_to_string(old_status),
dsn::enum_to_string(new_status));

// TODO(heyuchen): add other status
switch (new_status) {
case bulk_load_status::BLS_DOWNLOADED:
case bulk_load_status::BLS_INGESTING:
Expand All @@ -822,6 +827,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
}
break;
default:
// do nothing in other status
break;
}
}
Expand Down Expand Up @@ -912,10 +918,10 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk
}
}

// TODO(heyuchen): add other status
if (new_status == bulk_load_status::BLS_PAUSING ||
new_status == bulk_load_status::BLS_DOWNLOADING ||
new_status == bulk_load_status::BLS_CANCELED) {
new_status == bulk_load_status::BLS_CANCELED ||
new_status == bulk_load_status::BLS_FAILED) {
for (int i = 0; i < ainfo.partition_count; ++i) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
Expand Down Expand Up @@ -1088,7 +1094,7 @@ void bulk_load_service::reset_local_bulk_load_states(int32_t app_id, const std::
erase_map_elem_by_id(app_id, _partition_bulk_load_info);
erase_map_elem_by_id(app_id, _partitions_total_download_progress);
erase_map_elem_by_id(app_id, _partitions_cleaned_up);
// TODO(heyuchen): add other varieties
_apps_cleaning_up.erase(app_id);
_bulk_load_app_id.erase(app_id);
ddebug_f("reset local app({}) bulk load context", app_name);
}
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/meta_server/meta_bulk_load_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ class bulk_load_service
_partitions_bulk_load_state;

std::unordered_map<gpid, bool> _partitions_cleaned_up;
// Used when bulk load fails or the app becomes unavailable, to avoid duplicate cleanup
std::unordered_map<app_id, bool> _apps_cleaning_up;
};

} // namespace replication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,12 @@ class bulk_load_process_test : public bulk_load_service_test

/// on_partition_bulk_load_reply unit tests

// TODO(heyuchen):
// add `downloading_fs_error` unit tests after implement function `handle_bulk_load_failed`
// add `downloading_corrupt` unit tests after implement function `handle_bulk_load_failed`
// Drives on_partition_bulk_load_reply with BLS_DOWNLOADING and ERR_FS_INTERNAL, then expects
// the app bulk load status to become BLS_FAILED.
TEST_F(bulk_load_process_test, downloading_fs_error)
{
    const auto reply_status = bulk_load_status::BLS_DOWNLOADING;
    test_on_partition_bulk_load_reply(_partition_count, reply_status, ERR_FS_INTERNAL);

    const auto app_status = get_app_bulk_load_status(_app_id);
    ASSERT_EQ(app_status, bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, downloading_busy)
{
Expand All @@ -361,6 +364,13 @@ TEST_F(bulk_load_process_test, downloading_busy)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_DOWNLOADING);
}

// Mocks a download progress response carrying ERR_CORRUPTION, replays the BLS_DOWNLOADING
// reply for every partition and expects the app bulk load status to become BLS_FAILED.
TEST_F(bulk_load_process_test, downloading_corrupt)
{
    mock_response_progress(ERR_CORRUPTION, false);

    const auto reply_status = bulk_load_status::BLS_DOWNLOADING;
    test_on_partition_bulk_load_reply(_partition_count, reply_status);

    const auto app_status = get_app_bulk_load_status(_app_id);
    ASSERT_EQ(app_status, bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, downloading_report_metadata)
{
mock_response_bulk_load_metadata();
Expand Down Expand Up @@ -399,7 +409,12 @@ TEST_F(bulk_load_process_test, ingestion_running)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

// TODO(heyuchen): add ingestion_error unit tests after implement function `handle_app_failed`
// Mocks a response whose ingestion status is IS_FAILED, replays the BLS_INGESTING reply for
// every partition and expects the app bulk load status to become BLS_FAILED.
TEST_F(bulk_load_process_test, ingestion_error)
{
    const auto mocked_ingestion = ingestion_status::IS_FAILED;
    mock_response_ingestion_status(mocked_ingestion);

    const auto reply_status = bulk_load_status::BLS_INGESTING;
    test_on_partition_bulk_load_reply(_partition_count, reply_status);

    const auto app_status = get_app_bulk_load_status(_app_id);
    ASSERT_EQ(app_status, bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, normal_succeed)
{
Expand Down Expand Up @@ -436,7 +451,19 @@ TEST_F(bulk_load_process_test, cancel_all_finished)
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed
// With the mocked cleaned-up flag set to false, a BLS_FAILED reply must leave the app bulk
// load status at BLS_FAILED.
TEST_F(bulk_load_process_test, failed_not_all_finished)
{
    const auto failed = bulk_load_status::BLS_FAILED;

    mock_response_cleaned_up_flag(false, failed);
    test_on_partition_bulk_load_reply(_partition_count, failed);

    const auto app_status = get_app_bulk_load_status(_app_id);
    ASSERT_EQ(app_status, failed);
}

// With the mocked cleaned-up flag set to true and an in-progress count of 1, a BLS_FAILED
// reply must leave the app no longer bulk loading.
TEST_F(bulk_load_process_test, failed_all_finished)
{
    const auto failed = bulk_load_status::BLS_FAILED;

    mock_response_cleaned_up_flag(true, failed);
    test_on_partition_bulk_load_reply(1, failed);

    const bool still_bulk_loading = app_is_bulk_loading(APP_NAME);
    ASSERT_FALSE(still_bulk_loading);
}

TEST_F(bulk_load_process_test, pausing)
{
Expand All @@ -457,7 +484,6 @@ TEST_F(bulk_load_process_test, pause_succeed)
/// on_partition_ingestion_reply unit tests
// TODO(heyuchen):
// add ingest_rpc_error unit tests after implementing function `rollback_downloading`
// add ingest_wrong unit tests after implement function `handle_app_failed`

TEST_F(bulk_load_process_test, ingest_empty_write_error)
{
Expand All @@ -467,6 +493,14 @@ TEST_F(bulk_load_process_test, ingest_empty_write_error)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

// Builds an ingestion context with mock_ingestion_context(ERR_OK, 1, _partition_count), feeds
// the resulting response to on_partition_ingestion_reply, waits for pending tasks and expects
// the app bulk load status to become BLS_FAILED.
TEST_F(bulk_load_process_test, ingest_wrong)
{
    mock_ingestion_context(ERR_OK, 1, _partition_count);

    test_on_partition_ingestion_reply(_ingestion_resp, gpid(_app_id, _pidx));
    wait_all();

    const auto app_status = get_app_bulk_load_status(_app_id);
    ASSERT_EQ(app_status, bulk_load_status::BLS_FAILED);
}

TEST_F(bulk_load_process_test, ingest_succeed)
{
mock_ingestion_context(ERR_OK, 0, 1);
Expand Down

0 comments on commit aa37a2b

Please sign in to comment.