Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(bulk-load): bulk load download part4 - replica report download s…
Browse files Browse the repository at this point in the history
…tatus and progress (#479)
  • Loading branch information
hycdong authored May 29, 2020
1 parent 88dd590 commit c682bf1
Show file tree
Hide file tree
Showing 6 changed files with 180 additions and 6 deletions.
84 changes: 82 additions & 2 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -426,14 +426,94 @@ void replica_bulk_loader::report_bulk_load_states_to_meta(bulk_load_status::type
bool report_metadata,
/*out*/ bulk_load_response &response)
{
// TODO(heyuchen): TBD
if (status() != partition_status::PS_PRIMARY) {
response.err = ERR_INVALID_STATE;
return;
}

if (report_metadata && !_metadata.files.empty()) {
response.__set_metadata(_metadata);
}

switch (remote_status) {
case bulk_load_status::BLS_DOWNLOADING:
case bulk_load_status::BLS_DOWNLOADED:
report_group_download_progress(response);
break;
// TODO(heyuchen): add other status
default:
break;
}

response.primary_bulk_load_status = _status;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::report_group_download_progress(/*out*/ bulk_load_response &response)
{
if (status() != partition_status::PS_PRIMARY) {
dwarn_replica("replica status={}, should be {}",
enum_to_string(status()),
enum_to_string(partition_status::PS_PRIMARY));
response.err = ERR_INVALID_STATE;
return;
}

partition_bulk_load_state primary_state;
primary_state.__set_download_progress(_download_progress.load());
primary_state.__set_download_status(_download_status.load());
response.group_bulk_load_state[_replica->_primary_states.membership.primary] = primary_state;
ddebug_replica("primary = {}, download progress = {}%, status = {}",
_replica->_primary_states.membership.primary.to_string(),
primary_state.download_progress,
primary_state.download_status);

int32_t total_progress = primary_state.download_progress;
for (const auto &target_address : _replica->_primary_states.membership.secondaries) {
const auto &secondary_state =
_replica->_primary_states.secondary_bulk_load_states[target_address];
int32_t s_progress =
secondary_state.__isset.download_progress ? secondary_state.download_progress : 0;
error_code s_status =
secondary_state.__isset.download_status ? secondary_state.download_status : ERR_OK;
ddebug_replica("secondary = {}, download progress = {}%, status={}",
target_address.to_string(),
s_progress,
s_status);
response.group_bulk_load_state[target_address] = secondary_state;
total_progress += s_progress;
}

total_progress /= _replica->_primary_states.membership.max_replica_count;
ddebug_replica("total download progress = {}%", total_progress);
response.__set_total_download_progress(total_progress);
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::report_bulk_load_states_to_primary(
bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response)
{
// TODO(heyuchen): TBD
if (status() != partition_status::PS_SECONDARY) {
response.err = ERR_INVALID_STATE;
return;
}

partition_bulk_load_state bulk_load_state;
auto local_status = _status;
switch (remote_status) {
case bulk_load_status::BLS_DOWNLOADING:
case bulk_load_status::BLS_DOWNLOADED:
bulk_load_state.__set_download_progress(_download_progress.load());
bulk_load_state.__set_download_status(_download_status.load());
break;
// TODO(heyuchen): add other status
default:
break;
}

response.status = local_status;
response.bulk_load_state = bulk_load_state;
}

} // namespace replication
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class replica_bulk_loader : replica_base
void report_bulk_load_states_to_meta(bulk_load_status::type remote_status,
bool report_metadata,
/*out*/ bulk_load_response &response);
void report_group_download_progress(/*out*/ bulk_load_response &response);

void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ class replica_bulk_loader_test : public replica_test_base
return _bulk_loader->verify_file(f_meta, LOCAL_DIR);
}

int32_t test_report_group_download_progress(bulk_load_status::type status,
int32_t p_progress,
int32_t s1_progress,
int32_t s2_progress)
{
mock_group_progress(status, p_progress, s1_progress, s2_progress);
bulk_load_response response;
_bulk_loader->report_group_download_progress(response);
return response.total_download_progress;
}

/// mock structure functions

void
Expand Down Expand Up @@ -105,14 +116,14 @@ class replica_bulk_loader_test : public replica_test_base
void mock_primary_states()
{
mock_replica_config(partition_status::PS_PRIMARY);
primary_context p_context = _replica->get_primary_context();
partition_configuration &config = p_context.membership;
partition_configuration config;
config.max_replica_count = 3;
config.pid = PID;
config.ballot = BALLOT;
config.primary = PRIMARY;
config.secondaries.emplace_back(SECONDARY);
config.secondaries.emplace_back(SECONDARY2);
_replica->set_primary_partition_configuration(config);
}

void create_local_file(const std::string &file_name)
Expand Down Expand Up @@ -178,6 +189,51 @@ class replica_bulk_loader_test : public replica_test_base
return true;
}

void mock_replica_bulk_load_varieties(bulk_load_status::type status,
int32_t download_progress,
ingestion_status::type istatus,
bool is_ingestion = false)
{
_bulk_loader->_status = status;
_bulk_loader->_download_progress = download_progress;
// TODO(heyuchen): add ingestion status
}

void mock_secondary_progress(int32_t secondary_progress1, int32_t secondary_progress2)
{
mock_primary_states();
partition_bulk_load_state state1, state2;
state1.__set_download_status(ERR_OK);
state1.__set_download_progress(secondary_progress1);
state2.__set_download_status(ERR_OK);
state2.__set_download_progress(secondary_progress2);
_replica->set_secondary_bulk_load_state(SECONDARY, state1);
_replica->set_secondary_bulk_load_state(SECONDARY2, state2);
}

void mock_group_progress(bulk_load_status::type p_status,
int32_t p_progress,
int32_t s1_progress,
int32_t s2_progress)
{
if (p_status == bulk_load_status::BLS_INVALID) {
p_progress = 0;
} else if (p_status == bulk_load_status::BLS_DOWNLOADED) {
p_progress = 100;
}
mock_replica_bulk_load_varieties(p_status, p_progress, ingestion_status::IS_INVALID);
mock_secondary_progress(s1_progress, s2_progress);
}

void mock_group_progress(bulk_load_status::type p_status)
{
if (p_status == bulk_load_status::BLS_INVALID) {
mock_group_progress(p_status, 0, 0, 0);
} else if (p_status == bulk_load_status::BLS_DOWNLOADED) {
mock_group_progress(p_status, 100, 100, 100);
}
}

// helper functions
bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; }

Expand Down Expand Up @@ -328,5 +384,30 @@ TEST_F(replica_bulk_loader_test, verify_file_succeed)
utils::filesystem::remove_path(LOCAL_DIR);
}

// report_group_download_progress unit tests
TEST_F(replica_bulk_loader_test, report_group_download_progress_test)
{
struct test_struct
{
bulk_load_status::type primary_status;
int32_t primary_progress;
int32_t secondary1_progress;
int32_t secondary2_progress;
int32_t total_progress;
} tests[]{
{bulk_load_status::BLS_DOWNLOADING, 10, 10, 10, 10},
{bulk_load_status::BLS_DOWNLOADED, 100, 0, 0, 33},
{bulk_load_status::BLS_DOWNLOADED, 100, 100, 100, 100},
};

for (auto test : tests) {
ASSERT_EQ(test_report_group_download_progress(test.primary_status,
test.primary_progress,
test.secondary1_progress,
test.secondary2_progress),
test.total_progress);
}
}

} // namespace replication
} // namespace dsn
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ void primary_context::cleanup_bulk_load_states()
// TODO(heyuchen): TBD
// primary will save bulk load states reported from secondaries, this function is to cleanup
// those states
secondary_bulk_load_states.erase(secondary_bulk_load_states.begin(),
secondary_bulk_load_states.end());
}

bool secondary_context::cleanup(bool force)
Expand Down
2 changes: 2 additions & 0 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,8 @@ class primary_context
// Used for bulk load
// group bulk_load response tasks of RPC_GROUP_BULK_LOAD for each secondary replica
node_tasks group_bulk_load_pending_replies;
// bulk_load_state of secondary replicas
std::unordered_map<rpc_address, partition_bulk_load_state> secondary_bulk_load_states;
};

class secondary_context
Expand Down
11 changes: 9 additions & 2 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,15 @@ class mock_replica : public replica
void prepare_list_commit_hard(decree d) { _prepare_list->commit(d, COMMIT_TO_DECREE_HARD); }
decree get_app_last_committed_decree() { return _app->last_committed_decree(); }
void set_app_last_committed_decree(decree d) { _app->_last_committed_decree = d; }
primary_context get_primary_context() { return _primary_states; }
void set_primary_context(primary_context context) { _primary_states = context; }
void set_primary_partition_configuration(partition_configuration &pconfig)
{
_primary_states.membership = pconfig;
}
void set_secondary_bulk_load_state(const rpc_address &node,
const partition_bulk_load_state &state)
{
_primary_states.secondary_bulk_load_states[node] = state;
}

private:
decree _max_gced_decree{invalid_decree - 1};
Expand Down

0 comments on commit c682bf1

Please sign in to comment.