Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): cancel bulk load #531

Merged
merged 2 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
case bulk_load_status::BLS_PAUSING:
pause_bulk_load();
break;
case bulk_load_status::BLS_CANCELED: {
handle_bulk_load_finish(bulk_load_status::BLS_CANCELED);
} break;
// TODO(heyuchen): add other bulk load status
default:
break;
Expand Down Expand Up @@ -600,6 +603,7 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
report_group_ingestion_status(response);
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
report_group_cleaned_up(response);
break;
case bulk_load_status::BLS_PAUSING:
Expand Down Expand Up @@ -790,6 +794,7 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_state.__set_ingest_status(_replica->_app->get_ingestion_status());
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
bulk_load_state.__set_is_cleaned_up(is_cleaned_up());
break;
case bulk_load_status::BLS_PAUSING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,6 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
// istatus, is_ingestion, create_dir will be used in further tests
// - failed during downloading
// - failed during ingestion
// - cancel during downloaded
// - cancel during ingestion
// - cancel during succeed
struct test_struct
{
bulk_load_status::type local_status;
Expand All @@ -545,7 +542,25 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_SUCCEED,
false}};
false},
{bulk_load_status::BLS_DOWNLOADED,
100,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_INGESTING,
100,
ingestion_status::type::IS_RUNNING,
true,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_SUCCEED,
100,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true}};

for (auto test : tests) {
if (test.create_dir) {
Expand Down
61 changes: 42 additions & 19 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,24 +575,27 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

dassert_f(
response.__isset.is_group_bulk_load_context_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
if (!response.__isset.is_group_bulk_load_context_cleaned_up) {
dwarn_f("receive bulk load response from node({}) app({}) partition({}), "
"primary_status({}), but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
return;
}

for (const auto &kv : response.group_bulk_load_state) {
dassert_f(kv.second.__isset.is_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
if (!kv.second.__isset.is_cleaned_up) {
dwarn_f("receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
return;
}
}

{
Expand Down Expand Up @@ -911,7 +914,8 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk

// TODO(heyuchen): add other status
if (new_status == bulk_load_status::BLS_PAUSING ||
new_status == bulk_load_status::BLS_DOWNLOADING) {
new_status == bulk_load_status::BLS_DOWNLOADING ||
new_status == bulk_load_status::BLS_CANCELED) {
for (int i = 0; i < ainfo.partition_count; ++i) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
Expand Down Expand Up @@ -1175,9 +1179,28 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc)
bulk_load_status::BLS_DOWNLOADING,
true));
case bulk_load_control_type::BLC_CANCEL:
// TODO(heyuchen): TBD
if (app_status != bulk_load_status::BLS_DOWNLOADING &&
app_status != bulk_load_status::BLS_PAUSED) {
auto hint_msg = fmt::format("can not cancel bulk load for app({}) with status({})",
app_name,
dsn::enum_to_string(app_status));
derror_f("{}", hint_msg);
response.err = ERR_INVALID_STATE;
response.__set_hint_msg(hint_msg);
return;
}
case bulk_load_control_type::BLC_FORCE_CANCEL:
// TODO(heyuchen): TBD
ddebug_f("app({}) start to {} cancel bulk load, original status = {}",
app_name,
control_type == bulk_load_control_type::BLC_FORCE_CANCEL ? "force" : "",
dsn::enum_to_string(app_status));
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::update_app_status_on_remote_storage_unlocked,
this,
app_id,
bulk_load_status::BLS_CANCELED,
app_status == bulk_load_status::BLS_PAUSED));
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ TEST_F(bulk_load_service_test, control_bulk_load_test)
fail::setup();
fail::cfg("meta_update_app_status_on_remote_storage_unlocked", "return()");

// TODO(heyuchen): add restart/cancel/force_cancel test cases
struct control_test
{
bulk_load_control_type::type type;
Expand All @@ -182,7 +181,11 @@ TEST_F(bulk_load_service_test, control_bulk_load_test)
{bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADING, ERR_OK},
{bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADED, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSED, ERR_OK},
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE}};
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_DOWNLOADING, ERR_OK},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_PAUSED, ERR_OK},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_INGESTING, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_FORCE_CANCEL, bulk_load_status::BLS_SUCCEED, ERR_OK}};

for (auto test : tests) {
ASSERT_EQ(control_bulk_load(app->app_id, test.type, test.app_status), test.expected_err);
Expand Down Expand Up @@ -419,6 +422,20 @@ TEST_F(bulk_load_process_test, succeed_all_finished)
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

TEST_F(bulk_load_process_test, cancel_not_all_finished)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_CANCELED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_CANCELED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_CANCELED);
}

TEST_F(bulk_load_process_test, cancel_all_finished)
{
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_CANCELED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_CANCELED);
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed

TEST_F(bulk_load_process_test, pausing)
Expand Down