Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge branch 'master' into aio
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 authored Jul 8, 2020
2 parents c24c19d + 24f5524 commit f62eb69
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 5 deletions.
47 changes: 42 additions & 5 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,33 @@ void replica_bulk_loader::on_group_bulk_load_reply(error_code err,

_replica->_primary_states.group_bulk_load_pending_replies.erase(req.target_address);

// TODO(heyuchen): TBD
// if error happened, reset secondary bulk_load_state
// otherwise, set secondary bulk_load_states from resp
if (err != ERR_OK) {
derror_replica("failed to receive group_bulk_load_reply from {}, error = {}",
req.target_address.to_string(),
err.to_string());
_replica->_primary_states.reset_node_bulk_load_states(req.target_address);
return;
}

if (resp.err != ERR_OK) {
derror_replica("receive group_bulk_load response from {} failed, error = {}",
req.target_address.to_string(),
resp.err.to_string());
_replica->_primary_states.reset_node_bulk_load_states(req.target_address);
return;
}

if (req.config.ballot != get_ballot()) {
derror_replica("recevied wrong group_bulk_load response from {}, request ballot = {}, "
"current ballot = {}",
req.target_address.to_string(),
req.config.ballot,
get_ballot());
_replica->_primary_states.reset_node_bulk_load_states(req.target_address);
return;
}

_replica->_primary_states.secondary_bulk_load_states[req.target_address] = resp.bulk_load_state;
}

// ThreadPool: THREAD_POOL_REPLICATION
Expand Down Expand Up @@ -484,7 +508,9 @@ void replica_bulk_loader::handle_bulk_load_finish(bulk_load_status::type new_sta
}

if (status() == partition_status::PS_PRIMARY) {
// TODO(heyuchen): cleanup _primary_states.secondary_bulk_load_states
for (const auto &target_address : _replica->_primary_states.membership.secondaries) {
_replica->_primary_states.reset_node_bulk_load_states(target_address);
}
}

ddebug_replica("bulk load finished, old_status = {}, new_status = {}",
Expand Down Expand Up @@ -543,7 +569,7 @@ void replica_bulk_loader::clear_bulk_load_states()
_replica->_is_bulk_load_ingestion = false;
_replica->_app->set_ingestion_status(ingestion_status::IS_INVALID);

// TODO(heyuchen): clear other states
// TODO(heyuchen): clear other states for perf-counter

_status = bulk_load_status::BLS_INVALID;
}
Expand Down Expand Up @@ -812,5 +838,16 @@ void replica_bulk_loader::report_bulk_load_states_to_primary(
response.bulk_load_state = bulk_load_state;
}

// ThreadPool: THREAD_POOL_REPLICATION
void replica_bulk_loader::clear_bulk_load_states_if_needed(partition_status::type new_status)
{
partition_status::type old_status = status();
if ((new_status == partition_status::PS_PRIMARY ||
new_status == partition_status::PS_SECONDARY) &&
new_status != old_status) {
clear_bulk_load_states();
}
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ class replica_bulk_loader : replica_base
void report_bulk_load_states_to_primary(bulk_load_status::type remote_status,
/*out*/ group_bulk_load_response &response);

// called by `update_local_configuration` to do possible states cleaning up
void clear_bulk_load_states_if_needed(partition_status::type new_status);

///
/// bulk load path on remote file provider:
/// <bulk_load_root>/<cluster_name>/<app_name>/{bulk_load_info}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,17 @@ class replica_bulk_loader_test : public replica_test_base
return response.is_group_bulk_load_paused;
}

void test_on_group_bulk_load_reply(bulk_load_status::type req_status,
ballot req_ballot,
error_code resp_error = ERR_OK,
error_code rpc_error = ERR_OK)
{
create_group_bulk_load_request(req_status, req_ballot);
group_bulk_load_response resp;
resp.err = resp_error;
_bulk_loader->on_group_bulk_load_reply(rpc_error, _group_req, resp);
}

/// mock structure functions

void
Expand Down Expand Up @@ -322,6 +333,15 @@ class replica_bulk_loader_test : public replica_test_base
_replica->set_secondary_bulk_load_state(SECONDARY2, state2);
}

void mock_group_ingestion_states(ingestion_status::type s1_status,
ingestion_status::type s2_status,
bool is_empty_prepare_sent = true)
{
mock_replica_bulk_load_varieties(
bulk_load_status::BLS_INGESTING, 100, ingestion_status::IS_SUCCEED);
mock_secondary_ingestion_states(s1_status, s2_status, is_empty_prepare_sent);
}

void mock_group_cleanup_flag(bulk_load_status::type primary_status,
bool s1_cleaned_up = true,
bool s2_cleaned_up = true)
Expand All @@ -343,6 +363,19 @@ class replica_bulk_loader_test : public replica_test_base
bulk_load_status::type get_bulk_load_status() const { return _bulk_loader->_status; }
bool is_cleaned_up() { return _bulk_loader->is_cleaned_up(); }
int32_t get_download_progress() { return _bulk_loader->_download_progress.load(); }
bool is_secondary_bulk_load_state_reset()
{
const partition_bulk_load_state &state = _replica->get_secondary_bulk_load_state(SECONDARY);
bool is_download_state_reset =
(state.__isset.download_progress && state.__isset.download_status &&
state.download_progress == 0 && state.download_status == ERR_OK);
bool is_ingestion_status_reset =
(state.__isset.ingest_status && state.ingest_status == ingestion_status::IS_INVALID);
bool is_cleanup_flag_reset = (state.__isset.is_cleaned_up && !state.is_cleaned_up);
bool is_paused_flag_reset = (state.__isset.is_paused && !state.is_paused);
return is_download_state_reset && is_ingestion_status_reset && is_cleanup_flag_reset &&
is_paused_flag_reset;
}

public:
std::unique_ptr<mock_replica> _replica;
Expand Down Expand Up @@ -748,5 +781,58 @@ TEST_F(replica_bulk_loader_test, report_group_is_paused_test)
}
}

// on_group_bulk_load_reply unit tests
TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloading_error)
{
mock_group_progress(bulk_load_status::BLS_DOWNLOADING, 30, 30, 60);
test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADING, BALLOT, ERR_BUSY);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_downloaded_error)
{
mock_group_progress(bulk_load_status::BLS_DOWNLOADED);
test_on_group_bulk_load_reply(bulk_load_status::BLS_DOWNLOADED, BALLOT, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_ingestion_error)
{
mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_INGESTING, BALLOT - 1, ERR_OK, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_succeed_error)
{
mock_group_cleanup_flag(bulk_load_status::BLS_SUCCEED);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_SUCCEED, BALLOT - 1, ERR_OK, ERR_INVALID_STATE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_failed_error)
{
mock_group_ingestion_states(ingestion_status::IS_RUNNING, ingestion_status::IS_SUCCEED);
test_on_group_bulk_load_reply(bulk_load_status::BLS_FAILED, BALLOT, ERR_OK, ERR_TIMEOUT);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_pausing_error)
{
mock_group_progress(bulk_load_status::BLS_PAUSED, 100, 50, 10);
test_on_group_bulk_load_reply(
bulk_load_status::BLS_PAUSING, BALLOT, ERR_OK, ERR_NETWORK_FAILURE);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

TEST_F(replica_bulk_loader_test, on_group_bulk_load_reply_rpc_error)
{
mock_group_cleanup_flag(bulk_load_status::BLS_INVALID, true, false);
test_on_group_bulk_load_reply(bulk_load_status::BLS_CANCELED, BALLOT, ERR_OBJECT_NOT_FOUND);
ASSERT_TRUE(is_secondary_bulk_load_state_reset());
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "mutation.h"
#include "mutation_log.h"
#include "replica_stub.h"
#include "bulk_load/replica_bulk_loader.h"
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>
Expand Down Expand Up @@ -749,6 +750,8 @@ bool replica::update_local_configuration(const replica_configuration &config,
max_prepared_decree(),
last_committed_decree());

_bulk_loader->clear_bulk_load_states_if_needed(config.status);

// Notice: there has five ways that primary can change its partition_status
// 1, primary change partition config, such as add/remove secondary
// 2, downgrage to secondary because of load balance
Expand Down
9 changes: 9 additions & 0 deletions src/dist/replication/lib/replica_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,15 @@ bool primary_context::check_exist(::dsn::rpc_address node, partition_status::typ
}
}

void primary_context::reset_node_bulk_load_states(const rpc_address &node)
{
secondary_bulk_load_states[node].__set_download_progress(0);
secondary_bulk_load_states[node].__set_download_status(ERR_OK);
secondary_bulk_load_states[node].__set_ingest_status(ingestion_status::IS_INVALID);
secondary_bulk_load_states[node].__set_is_cleaned_up(false);
secondary_bulk_load_states[node].__set_is_paused(false);
}

void primary_context::cleanup_bulk_load_states()
{
secondary_bulk_load_states.erase(secondary_bulk_load_states.begin(),
Expand Down
3 changes: 3 additions & 0 deletions src/dist/replication/lib/replica_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class primary_context

void do_cleanup_pending_mutations(bool clean_pending_mutations = true);

// reset bulk load states in secondary_bulk_load_states by node address
void reset_node_bulk_load_states(const rpc_address &node);

void cleanup_bulk_load_states();

public:
Expand Down
4 changes: 4 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ class mock_replica : public replica
{
_primary_states.membership = pconfig;
}
partition_bulk_load_state get_secondary_bulk_load_state(const rpc_address &node)
{
return _primary_states.secondary_bulk_load_states[node];
}
void set_secondary_bulk_load_state(const rpc_address &node,
const partition_bulk_load_state &state)
{
Expand Down

0 comments on commit f62eb69

Please sign in to comment.