Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into aio
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Jul 7, 2020
2 parents 19b96dd + 60761bb commit 0b5693e
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 25 deletions.
5 changes: 5 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ error_code replica_bulk_loader::do_bulk_load(const std::string &app_name,
case bulk_load_status::BLS_PAUSING:
pause_bulk_load();
break;
case bulk_load_status::BLS_CANCELED: {
handle_bulk_load_finish(bulk_load_status::BLS_CANCELED);
} break;
// TODO(heyuchen): add other bulk load status
default:
break;
Expand Down Expand Up @@ -600,6 +603,7 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
report_group_ingestion_status(response);
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
report_group_cleaned_up(response);
break;
case bulk_load_status::BLS_PAUSING:
Expand Down Expand Up @@ -790,6 +794,7 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_state.__set_ingest_status(_replica->_app->get_ingestion_status());
break;
case bulk_load_status::BLS_SUCCEED:
case bulk_load_status::BLS_CANCELED:
bulk_load_state.__set_is_cleaned_up(is_cleaned_up());
break;
case bulk_load_status::BLS_PAUSING:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,6 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
// istatus, is_ingestion, create_dir will be used in further tests
// - failed during downloading
// - failed during ingestion
// - cancel during downloaded
// - cancel during ingestion
// - cancel during succeed
struct test_struct
{
bulk_load_status::type local_status;
Expand All @@ -545,7 +542,25 @@ TEST_F(replica_bulk_loader_test, bulk_load_finish_test)
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_SUCCEED,
false}};
false},
{bulk_load_status::BLS_DOWNLOADED,
100,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_INGESTING,
100,
ingestion_status::type::IS_RUNNING,
true,
bulk_load_status::BLS_CANCELED,
true},
{bulk_load_status::BLS_SUCCEED,
100,
ingestion_status::IS_INVALID,
false,
bulk_load_status::BLS_CANCELED,
true}};

for (auto test : tests) {
if (test.create_dir) {
Expand Down
61 changes: 42 additions & 19 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,24 +575,27 @@ void bulk_load_service::handle_bulk_load_finish(const bulk_load_response &respon
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

dassert_f(
response.__isset.is_group_bulk_load_context_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
if (!response.__isset.is_group_bulk_load_context_cleaned_up) {
dwarn_f("receive bulk load response from node({}) app({}) partition({}), "
"primary_status({}), but is_group_bulk_load_context_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
return;
}

for (const auto &kv : response.group_bulk_load_state) {
dassert_f(kv.second.__isset.is_cleaned_up,
"receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
if (!kv.second.__isset.is_cleaned_up) {
dwarn_f("receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) is_cleaned_up is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
return;
}
}

{
Expand Down Expand Up @@ -911,7 +914,8 @@ void bulk_load_service::update_app_status_on_remote_storage_reply(const app_bulk

// TODO(heyuchen): add other status
if (new_status == bulk_load_status::BLS_PAUSING ||
new_status == bulk_load_status::BLS_DOWNLOADING) {
new_status == bulk_load_status::BLS_DOWNLOADING ||
new_status == bulk_load_status::BLS_CANCELED) {
for (int i = 0; i < ainfo.partition_count; ++i) {
update_partition_status_on_remote_storage(
ainfo.app_name, gpid(app_id, i), new_status, should_send_request);
Expand Down Expand Up @@ -1175,9 +1179,28 @@ void bulk_load_service::on_control_bulk_load(control_bulk_load_rpc rpc)
bulk_load_status::BLS_DOWNLOADING,
true));
case bulk_load_control_type::BLC_CANCEL:
// TODO(heyuchen): TBD
if (app_status != bulk_load_status::BLS_DOWNLOADING &&
app_status != bulk_load_status::BLS_PAUSED) {
auto hint_msg = fmt::format("can not cancel bulk load for app({}) with status({})",
app_name,
dsn::enum_to_string(app_status));
derror_f("{}", hint_msg);
response.err = ERR_INVALID_STATE;
response.__set_hint_msg(hint_msg);
return;
}
case bulk_load_control_type::BLC_FORCE_CANCEL:
// TODO(heyuchen): TBD
ddebug_f("app({}) start to {} cancel bulk load, original status = {}",
app_name,
control_type == bulk_load_control_type::BLC_FORCE_CANCEL ? "force" : "",
dsn::enum_to_string(app_status));
tasking::enqueue(LPC_META_STATE_NORMAL,
_meta_svc->tracker(),
std::bind(&bulk_load_service::update_app_status_on_remote_storage_unlocked,
this,
app_id,
bulk_load_status::BLS_CANCELED,
app_status == bulk_load_status::BLS_PAUSED));
default:
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ TEST_F(bulk_load_service_test, control_bulk_load_test)
fail::setup();
fail::cfg("meta_update_app_status_on_remote_storage_unlocked", "return()");

// TODO(heyuchen): add restart/cancel/force_cancel test cases
struct control_test
{
bulk_load_control_type::type type;
Expand All @@ -182,7 +181,11 @@ TEST_F(bulk_load_service_test, control_bulk_load_test)
{bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADING, ERR_OK},
{bulk_load_control_type::BLC_PAUSE, bulk_load_status::BLS_DOWNLOADED, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSED, ERR_OK},
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE}};
{bulk_load_control_type::BLC_RESTART, bulk_load_status::BLS_PAUSING, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_DOWNLOADING, ERR_OK},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_PAUSED, ERR_OK},
{bulk_load_control_type::BLC_CANCEL, bulk_load_status::BLS_INGESTING, ERR_INVALID_STATE},
{bulk_load_control_type::BLC_FORCE_CANCEL, bulk_load_status::BLS_SUCCEED, ERR_OK}};

for (auto test : tests) {
ASSERT_EQ(control_bulk_load(app->app_id, test.type, test.app_status), test.expected_err);
Expand Down Expand Up @@ -419,6 +422,20 @@ TEST_F(bulk_load_process_test, succeed_all_finished)
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

TEST_F(bulk_load_process_test, cancel_not_all_finished)
{
mock_response_cleaned_up_flag(false, bulk_load_status::BLS_CANCELED);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_CANCELED);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_CANCELED);
}

TEST_F(bulk_load_process_test, cancel_all_finished)
{
mock_response_cleaned_up_flag(true, bulk_load_status::BLS_CANCELED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_CANCELED);
ASSERT_FALSE(app_is_bulk_loading(APP_NAME));
}

// TODO(heyuchen): add half cleanup test while failed

TEST_F(bulk_load_process_test, pausing)
Expand Down

0 comments on commit 0b5693e

Please sign in to comment.