Skip to content

Commit

Permalink
feat(bulk-load): bulk load ingestion part6 - meta handle bulk_load_re…
Browse files Browse the repository at this point in the history
…sponse during ingestion (#500)
  • Loading branch information
hycdong authored Jun 16, 2020
1 parent 6f55b21 commit 11e1bec
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 12 deletions.
12 changes: 6 additions & 6 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,10 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
_replica->_primary_states.membership.primary.to_string(),
enum_to_string(primary_state.ingest_status));

bool is_group_ingestion_finish = primary_state.ingest_status == ingestion_status::IS_SUCCEED;
bool is_group_ingestion_finish =
(primary_state.ingest_status == ingestion_status::IS_SUCCEED) &&
(_replica->_primary_states.membership.secondaries.size() + 1 ==
_replica->_primary_states.membership.max_replica_count);
for (const auto &target_address : _replica->_primary_states.membership.secondaries) {
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_address];
Expand All @@ -562,12 +565,9 @@ void replica_bulk_loader::report_group_ingestion_status(/*out*/ bulk_load_respon
target_address.to_string(),
enum_to_string(ingest_status));
response.group_bulk_load_state[target_address] = secondary_state;
is_group_ingestion_finish =
is_group_ingestion_finish && (ingest_status == ingestion_status::IS_SUCCEED);
is_group_ingestion_finish &= (ingest_status == ingestion_status::IS_SUCCEED);
}
response.__set_is_group_ingestion_finished(
is_group_ingestion_finish && (_replica->_primary_states.membership.secondaries.size() + 1 ==
_replica->_primary_states.membership.max_replica_count));
response.__set_is_group_ingestion_finished(is_group_ingestion_finish);

// if group ingestion finish, recover wirte immediately
if (is_group_ingestion_finish) {
Expand Down
63 changes: 57 additions & 6 deletions src/dist/replication/meta_server/meta_bulk_load_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ void bulk_load_service::on_partition_bulk_load_reply(error_code err,
int32_t interval = bulk_load_constant::BULK_LOAD_REQUEST_INTERVAL;

if (err != ERR_OK) {
derror_f("app({}), partition({}) failed to recevie bulk load response, error = {}",
derror_f("app({}), partition({}) failed to receive bulk load response, error = {}",
app_name,
pid,
err.to_string());
Expand Down Expand Up @@ -447,7 +447,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons

if (!response.__isset.total_download_progress) {
dwarn_f(
"recevie bulk load response from node({}) app({}), partition({}), primary_status({}), "
"receive bulk load response from node({}) app({}), partition({}), primary_status({}), "
"but total_download_progress is not set",
primary_addr.to_string(),
app_name,
Expand All @@ -460,7 +460,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons
const auto &bulk_load_states = kv.second;
if (!bulk_load_states.__isset.download_progress ||
!bulk_load_states.__isset.download_status) {
dwarn_f("recevie bulk load response from node({}) app({}), partition({}), "
dwarn_f("receive bulk load response from node({}) app({}), partition({}), "
"primary_status({}), but node({}) progress or status is not set",
primary_addr.to_string(),
app_name,
Expand Down Expand Up @@ -489,7 +489,7 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons

// update download progress
int32_t total_progress = response.total_download_progress;
ddebug_f("recevie bulk load response from node({}) app({}) partition({}), primary_status({}), "
ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary_status({}), "
"total_download_progress = {}",
primary_addr.to_string(),
app_name,
Expand All @@ -514,8 +514,58 @@ void bulk_load_service::handle_app_downloading(const bulk_load_response &respons
void bulk_load_service::handle_app_ingestion(const bulk_load_response &response,
const rpc_address &primary_addr)
{
// TODO(heyuchen): TBD
// called by `on_partition_bulk_load_reply` when app status is ingesting
const std::string &app_name = response.app_name;
const gpid &pid = response.pid;

if (!response.__isset.is_group_ingestion_finished) {
dwarn_f("receive bulk load response from node({}) app({}) partition({}), "
"primary_status({}), but is_group_ingestion_finished is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status));
return;
}

for (const auto &kv : response.group_bulk_load_state) {
const auto &bulk_load_states = kv.second;
if (!bulk_load_states.__isset.ingest_status) {
dwarn_f("receive bulk load response from node({}) app({}) partition({}), "
"primary_status({}), but node({}) ingestion_status is not set",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
kv.first.to_string());
return;
}

if (bulk_load_states.ingest_status == ingestion_status::IS_FAILED) {
derror_f("app({}) partition({}) on node({}) ingestion failed",
app_name,
pid,
kv.first.to_string());
handle_bulk_load_failed(pid.get_app_id());
return;
}
}

ddebug_f("receive bulk load response from node({}) app({}) partition({}), primary_status({}), "
"is_group_ingestion_finished = {}",
primary_addr.to_string(),
app_name,
pid,
dsn::enum_to_string(response.primary_bulk_load_status),
response.is_group_ingestion_finished);
{
zauto_write_lock l(_lock);
_partitions_bulk_load_state[pid] = response.group_bulk_load_state;
}

if (response.is_group_ingestion_finished) {
ddebug_f("app({}) partition({}) ingestion files succeed", app_name, pid);
update_partition_status_on_remote_storage(app_name, pid, bulk_load_status::BLS_SUCCEED);
}
}

// ThreadPool: THREAD_POOL_META_STATE
Expand Down Expand Up @@ -635,6 +685,7 @@ void bulk_load_service::update_partition_status_on_remote_storage_reply(
switch (new_status) {
case bulk_load_status::BLS_DOWNLOADED:
case bulk_load_status::BLS_INGESTING:
case bulk_load_status::BLS_SUCCEED:
if (old_status != new_status && --_apps_in_progress_count[pid.get_app_id()] == 0) {
update_app_status_on_remote_storage_unlocked(pid.get_app_id(), new_status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,20 @@ class bulk_load_process_test : public bulk_load_service_test
_resp.__set_metadata(metadata);
}

void mock_response_ingestion_status(ingestion_status::type secondary_istatus)
{
create_basic_response(ERR_OK, bulk_load_status::BLS_INGESTING);

partition_bulk_load_state state, state2;
state.__set_ingest_status(ingestion_status::IS_SUCCEED);
state2.__set_ingest_status(secondary_istatus);

_resp.group_bulk_load_state[PRIMARY] = state;
_resp.group_bulk_load_state[SECONDARY1] = state;
_resp.group_bulk_load_state[SECONDARY2] = state2;
_resp.__set_is_group_ingestion_finished(secondary_istatus == ingestion_status::IS_SUCCEED);
}

void test_on_partition_bulk_load_reply(int32_t in_progress_count,
bulk_load_status::type status,
error_code resp_err = ERR_OK)
Expand Down Expand Up @@ -297,6 +311,22 @@ TEST_F(bulk_load_process_test, start_ingesting)
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

TEST_F(bulk_load_process_test, ingestion_running)
{
mock_response_ingestion_status(ingestion_status::IS_RUNNING);
test_on_partition_bulk_load_reply(_partition_count, bulk_load_status::BLS_INGESTING);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_INGESTING);
}

// TODO(heyuchen): add ingestion_error unit tests after implement function `handle_app_failed`

TEST_F(bulk_load_process_test, normal_succeed)
{
mock_response_ingestion_status(ingestion_status::IS_SUCCEED);
test_on_partition_bulk_load_reply(1, bulk_load_status::BLS_INGESTING);
ASSERT_EQ(get_app_bulk_load_status(_app_id), bulk_load_status::BLS_SUCCEED);
}

// TODO(heyuchen): add other unit tests for `on_partition_bulk_load_reply`

/// on_partition_ingestion_reply unit tests
Expand Down

0 comments on commit 11e1bec

Please sign in to comment.