Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(bulk-load): bulk load download part4 - replica report download status and progress #479

Merged
merged 2 commits into from
May 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 82 additions & 2 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,14 +426,94 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
bool report_metadata,
/*out*/ bulk_load_response &response)
{
// Fills `response` with this replica's bulk load state.
// Only answered while this replica is PS_PRIMARY; otherwise response.err is set to
// ERR_INVALID_STATE and nothing else is written.
// TODO(heyuchen): TBD
if (status() != partition_status::PS_PRIMARY) {
response.err = ERR_INVALID_STATE;
return;
}

// Metadata is attached only when the caller asked for it and it lists at least one file.
if (report_metadata && !_metadata.files.empty()) {
response.__set_metadata(_metadata);
}

// Only the two download phases add group progress; every other remote_status
// currently falls through to `default` and adds nothing.
switch (remote_status) {
case bulk_load_status::BLS_DOWNLOADING:
case bulk_load_status::BLS_DOWNLOADED:
report_group_download_progress(response);
break;
// TODO(heyuchen): add other status
default:
break;
}

// The local bulk load status is reported for every remote_status.
response.primary_bulk_load_status = _status;
}

// ThreadPool: THREAD_POOL_REPLICATION
// Collects the download progress/status of the primary and of every secondary listed in the
// membership into response.group_bulk_load_state, and sets response.total_download_progress
// to the summed progress divided by membership.max_replica_count.
// If this replica is not PS_PRIMARY, response.err is set to ERR_INVALID_STATE and nothing
// else is written.
void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_response &response)
{
if (status() != partition_status::PS_PRIMARY) {
dwarn_replica("replica status={}, should be {}",
enum_to_string(status()),
enum_to_string(partition_status::PS_PRIMARY));
response.err = ERR_INVALID_STATE;
return;
}

// The primary's own state comes from the loader's local progress/status values.
partition_bulk_load_state primary_state;
primary_state.__set_download_progress(_download_progress.load());
primary_state.__set_download_status(_download_status.load());
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = primary_state;
ddebug_replica("primary = {}, download progress = {}%, status = {}",
_replica->_primary_states.membership.primary.to_string(),
primary_state.download_progress,
primary_state.download_status);

int32_t total_progress = primary_state.download_progress;
for (const auto &target_address : _replica->_primary_states.membership.secondaries) {
// secondary_bulk_load_states is an unordered_map, so operator[] default-constructs an
// entry for a secondary that has no stored state yet.
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_address];
// Unset fields count as 0 progress / ERR_OK for the sum and the log line; the state
// copied into the response below is the stored one, not these defaults.
int32_t s_progress =
secondary_state.__isset.download_progress ? secondary_state.download_progress : 0;
error_code s_status =
secondary_state.__isset.download_status ? secondary_state.download_status : ERR_OK;
ddebug_replica("secondary = {}, download progress = {}%, status={}",
target_address.to_string(),
s_progress,
s_status);
response.group_bulk_load_state[target_address] = secondary_state;
total_progress += s_progress;
}

// Integer division by max_replica_count, not by the number of members visited above,
// so the result truncates (e.g. (100 + 0 + 0) / 3 == 33).
// NOTE(review): assumes max_replica_count is non-zero here — confirm.
total_progress /= _replica->_primary_states.membership.max_replica_count;
ddebug_replica("total download progress = {}%", total_progress);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use float type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

total_progress will report to meta server in bulk_load_response, defined in thrift, it seems that we don't use float in thrift before, and I think it is okay to define it as integer.

response.__set_total_download_progress(total_progress);
}

// ThreadPool: THREAD_POOL_REPLICATION
// Fills `response` with this replica's bulk load status and, for the download phases,
// its download progress and download status.
// Only answered while this replica is PS_SECONDARY; otherwise response.err is set to
// ERR_INVALID_STATE and nothing else is written.
void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response)
{
// TODO(heyuchen): TBD
if (status() != partition_status::PS_SECONDARY) {
response.err = ERR_INVALID_STATE;
return;
}

// Stays default-constructed (no download fields set) unless remote_status is one of the
// download phases handled below.
partition_bulk_load_state bulk_load_state;
auto local_status = _status;
switch (remote_status) {
case bulk_load_status::BLS_DOWNLOADING:
case bulk_load_status::BLS_DOWNLOADED:
bulk_load_state.__set_download_progress(_download_progress.load());
bulk_load_state.__set_download_status(_download_status.load());
Comment on lines +505 to +508
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always report the two type of status, so why separate to two types?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that why not combine download progress and status into one structure? I used to define a structure called download_states before, but I finally separate it, it will make code more complex, besides, download_progress and download_status is more clearer than download_states.

break;
// TODO(heyuchen): add other status
default:
break;
}

// status and bulk_load_state are written for every remote_status.
response.status = local_status;
response.bulk_load_state = bulk_load_state;
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class replica_bulk_loader : replica_base
void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
bool report_metadata,
/*out*/ bulk_load_response &response);
void report_group_download_progress(/*out*/ bulk_load_response &response);

void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ class replica_bulk_loader_test : public replica_test_base
return _bulk_loader->verify_file(f_meta, LOCAL_DIR);
}

// Mocks the primary/secondary progress, runs report_group_download_progress() and
// returns the total_download_progress field of the response it filled.
int32_t test_report_group_download_progress(bulk_load_status::type status,
                                            int32_t p_progress,
                                            int32_t s1_progress,
                                            int32_t s2_progress)
{
    mock_group_progress(status, p_progress, s1_progress, s2_progress);

    bulk_load_response resp;
    _bulk_loader->report_group_download_progress(resp);

    const int32_t reported_total = resp.total_download_progress;
    return reported_total;
}

/// mock structure functions

void
Expand Down Expand Up @@ -105,14 +116,14 @@ class replica_bulk_loader_test : public replica_test_base
// Puts the mock replica into PS_PRIMARY and installs a membership made of PRIMARY plus the
// two secondaries SECONDARY and SECONDARY2, with max_replica_count = 3.
void mock_primary_states()
{
mock_replica_config(partition_status::PS_PRIMARY);
// NOTE(review): `config` is declared twice below; this looks like the removed and the added
// lines of the diff shown together — confirm against the merged file.
primary_context p_context = _replica->get_primary_context();
partition_configuration &config = p_context.membership;
partition_configuration config;
config.max_replica_count = 3;
config.pid = PID;
config.ballot = BALLOT;
config.primary = PRIMARY;
config.secondaries.emplace_back(SECONDARY);
config.secondaries.emplace_back(SECONDARY2);
// Hand the finished configuration to the replica so its primary membership is updated.
_replica->set_primary_partition_configuration(config);
}

void create_local_file(const std::string &file_name)
Expand Down Expand Up @@ -178,6 +189,51 @@ class replica_bulk_loader_test : public replica_test_base
return true;
}

// Overwrites the bulk loader's _status and _download_progress with the given values.
// `istatus` and `is_ingestion` are accepted but not used by this body.
void mock_replica_bulk_load_varieties(bulk_load_status::type status,
                                      int32_t download_progress,
                                      ingestion_status::type istatus,
                                      bool is_ingestion = false)
{
    auto &loader = *_bulk_loader;
    loader._status = status;
    loader._download_progress = download_progress;
    // TODO(heyuchen): add ingestion status
}

void mock_secondary_progress(int32_t secondary_progress1, int32_t secondary_progress2)
{
mock_primary_states();
partition_bulk_load_state state1, state2;
state1.__set_download_status(ERR_OK);
state1.__set_download_progress(secondary_progress1);
state2.__set_download_status(ERR_OK);
state2.__set_download_progress(secondary_progress2);
_replica->set_secondary_bulk_load_state(SECONDARY, state1);
_replica->set_secondary_bulk_load_state(SECONDARY2, state2);
}

// Mocks the whole group: the primary gets `p_status` and a progress value, the two
// secondaries get `s1_progress` / `s2_progress`.
// The primary progress is forced to 0 for BLS_INVALID and to 100 for BLS_DOWNLOADED;
// for any other status the caller's `p_progress` is used unchanged.
void mock_group_progress(bulk_load_status::type p_status,
                         int32_t p_progress,
                         int32_t s1_progress,
                         int32_t s2_progress)
{
    int32_t primary_progress = p_progress;
    switch (p_status) {
    case bulk_load_status::BLS_INVALID:
        primary_progress = 0;
        break;
    case bulk_load_status::BLS_DOWNLOADED:
        primary_progress = 100;
        break;
    default:
        break;
    }

    mock_replica_bulk_load_varieties(p_status, primary_progress, ingestion_status::IS_INVALID);
    mock_secondary_progress(s1_progress, s2_progress);
}

// Convenience overload: BLS_INVALID mocks every replica at 0% and BLS_DOWNLOADED mocks
// every replica at 100%. Any other status mocks nothing.
void mock_group_progress(bulk_load_status::type p_status)
{
    switch (p_status) {
    case bulk_load_status::BLS_INVALID:
        mock_group_progress(p_status, 0, 0, 0);
        break;
    case bulk_load_status::BLS_DOWNLOADED:
        mock_group_progress(p_status, 100, 100, 100);
        break;
    default:
        break;
    }
}

// helper functions
// Returns the bulk loader's current _status value.
bulk_load_status::type get_bulk_load_status() const
{
    return _bulk_loader->_status;
}

Expand Down Expand Up @@ -328,5 +384,30 @@ TEST_F(replica_bulk_loader_test, verify_file_succeed)
utils::filesystem::remove_path(LOCAL_DIR);
}

// report_group_download_progress unit tests
// Table-driven check of report_group_download_progress(): each row gives the primary
// status, the three per-replica progress values and the total the response must carry.
TEST_F(replica_bulk_loader_test, report_group_download_progress_test)
{
    struct progress_case
    {
        bulk_load_status::type primary_status;
        int32_t primary_progress;
        int32_t secondary1_progress;
        int32_t secondary2_progress;
        int32_t expected_total;
    };

    const progress_case cases[] = {
        {bulk_load_status::BLS_DOWNLOADING, 10, 10, 10, 10},
        {bulk_load_status::BLS_DOWNLOADED, 100, 0, 0, 33},
        {bulk_load_status::BLS_DOWNLOADED, 100, 100, 100, 100},
    };

    for (const auto &c : cases) {
        const int32_t reported = test_report_group_download_progress(c.primary_status,
                                                                     c.primary_progress,
                                                                     c.secondary1_progress,
                                                                     c.secondary2_progress);
        ASSERT_EQ(reported, c.expected_total);
    }
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ void primary_context::cleanup_bulk_load_states()
// TODO(heyuchen): TBD
// primary will save bulk load states reported from secondaries, this function is to cleanup
// those states
secondary_bulk_load_states.erase(secondary_bulk_load_states.begin(),
secondary_bulk_load_states.end());
}

bool secondary_context::cleanup(bool force)
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class primary_context
// Used for bulk load
// group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica
node_tasks group_bulk_load_pending_replies;
// bulk_load_state of secondary replicas
std::unordered_map<rpc_address, partition_bulk_load_state> secondary_bulk_load_states;
};

class secondary_context
Expand Down
11 changes: 9 additions & 2 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,15 @@ class mock_replica : public replica
// Forwards `d` to _prepare_list->commit() with the COMMIT_TO_DECREE_HARD mode.
void prepare_list_commit_hard(decree d)
{
    _prepare_list->commit(d, COMMIT_TO_DECREE_HARD);
}
// Returns whatever _app->last_committed_decree() reports.
decree get_app_last_committed_decree()
{
    return _app->last_committed_decree();
}
// Overwrites the app's _last_committed_decree field with `d`.
void set_app_last_committed_decree(decree d)
{
    _app->_last_committed_decree = d;
}
// Returns _primary_states by value, so the caller receives a copy; changes made to the
// returned object do not reach the replica.
primary_context get_primary_context()
{
    return _primary_states;
}
// Replaces _primary_states with the given context.
void set_primary_context(primary_context context)
{
    _primary_states = context;
}
void set_primary_partition_configuration(partition_configuration &pconfig)
{
_primary_states.membership = pconfig;
}
// Stores `state` as the bulk load state of secondary `node` in _primary_states,
// replacing any state already stored for that node.
void set_secondary_bulk_load_state(const rpc_address &node,
                                   const partition_bulk_load_state &state)
{
    auto &states = _primary_states.secondary_bulk_load_states;
    states[node] = state;
}

private:
decree _max_gced_decree{invalid_decree - 1};
Expand Down