Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

fix(one-time backup): fix bug when backup request is timeout #990

Merged
merged 12 commits into from
Jan 26, 2022
105 changes: 67 additions & 38 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,37 @@ void backup_engine::backup_app_partition(const gpid &pid)
_backup_status[pid.get_partition_index()] = backup_status::ALIVE;
}

void backup_engine::on_backup_reply(error_code err,
const backup_response &response,
gpid pid,
const rpc_address &primary)
inline void backup_engine::handle_replica_backup_failed(const backup_response &response,
const gpid pid)
Smityz marked this conversation as resolved.
Show resolved Hide resolved
{
dcheck_eq(response.pid, pid);
dcheck_eq(response.backup_id, _cur_backup.backup_id);

derror_f("backup_id({}): backup for partition {} failed, response.err: {}",
_cur_backup.backup_id,
pid.to_string(),
response.err.to_string());
zauto_lock l(_lock);
// if one partition fail, the whole backup plan fail.
_is_backup_failed = true;
_backup_status[pid.get_partition_index()] = backup_status::FAILED;
return;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}

// Schedules another backup request for partition `pid`.
//
// A task is enqueued with code LPC_DEFAULT_CALLBACK and tracked by `_tracker`; when it
// runs it calls backup_app_partition(pid). The last argument delays the task by one
// second. The `0` is passed as the fourth argument of tasking::enqueue (presumably the
// thread hash -- confirm against the tasking API).
//
// NOTE: this definition is deliberately not marked `inline`. The member is declared
// without `inline` in backup_engine.h, and adding `inline` only on the out-of-class
// definition in the .cpp file makes the declarations disagree across translation units.
void backup_engine::retry_backup(const dsn::gpid pid)
{
    tasking::enqueue(LPC_DEFAULT_CALLBACK,
                     &_tracker,
                     [this, pid]() { backup_app_partition(pid); },
                     0,
                     std::chrono::seconds(1));
}

void backup_engine::on_backup_reply(const error_code err,
const backup_response &response,
const gpid pid,
const rpc_address &primary)
{
{
zauto_lock l(_lock);
// if backup of some partition failed, we would not handle response from other partitions.
Expand All @@ -215,51 +238,57 @@ void backup_engine::on_backup_reply(error_code err,
// if backup failed, receive ERR_LOCAL_APP_FAILURE;
// backup not completed in other cases.
// see replica::on_cold_backup() for details.
int32_t partition = pid.get_partition_index();
if (err == dsn::ERR_OK && response.err == dsn::ERR_OK &&
response.progress == cold_backup_constant::PROGRESS_FINISHED) {
if (err != ERR_OK) {
Smityz marked this conversation as resolved.
Show resolved Hide resolved
derror_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to "
"send backup request.",
_cur_backup.backup_id,
primary.to_string(),
err.to_string());

retry_backup(pid);
return;
};

if (response.err == ERR_LOCAL_APP_FAILURE) {
handle_replica_backup_failed(response, pid);
return;
}

if (response.err != ERR_OK) {
derror_f("backup_id({}): replica {} backup failed, response.err: {} , retry to send backup "
"request.",
_cur_backup.backup_id,
primary.to_string(),
response.err.to_string());

retry_backup(pid);
return;
}

if (response.progress == cold_backup_constant::PROGRESS_FINISHED) {
dcheck_eq(response.pid, pid);
dcheck_eq(response.backup_id, _cur_backup.backup_id);
ddebug_f("backup_id({}): backup for partition {} completed.",
_cur_backup.backup_id,
pid.to_string());
{
zauto_lock l(_lock);
_backup_status[partition] = backup_status::COMPLETED;
_backup_status[pid.get_partition_index()] = backup_status::COMPLETED;
}
complete_current_backup();
return;
}

if (response.err == ERR_LOCAL_APP_FAILURE) {
derror_f("backup_id({}): backup for partition {} failed.",
_cur_backup.backup_id,
pid.to_string());
zauto_lock l(_lock);
_is_backup_failed = true;
_backup_status[partition] = backup_status::FAILED;
return;
}
// backup is not finished, meta polling to send request
ddebug_f("backup_id({}): receive backup response for partition {} from server {}, now "
"progress {}, retry to send backup request.",
_cur_backup.backup_id,
pid.to_string(),
primary.to_string(),
response.progress);
Smityz marked this conversation as resolved.
Show resolved Hide resolved

if (err != ERR_OK) {
dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to "
"send backup request.",
_cur_backup.backup_id,
primary.to_string(),
err.to_string());
} else {
ddebug_f(
"backup_id({}): receive backup response for partition {} from server {}, rpc error "
"{}, response error {}, retry to send backup request.",
_cur_backup.backup_id,
pid.to_string(),
primary.to_string(),
err.to_string(),
response.err.to_string());
}
tasking::enqueue(LPC_DEFAULT_CALLBACK,
&_tracker,
[this, pid]() { backup_app_partition(pid); },
0,
std::chrono::seconds(1));
retry_backup(pid);
return;
Smityz marked this conversation as resolved.
Show resolved Hide resolved
}

void backup_engine::write_backup_info()
Expand Down
2 changes: 2 additions & 0 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class backup_engine
const rpc_address &primary);
void write_backup_info();
void complete_current_backup();
void handle_replica_backup_failed(const backup_response &response, const gpid pid);
void retry_backup(const dsn::gpid pid);

const std::string get_policy_name() const
{
Expand Down
2 changes: 2 additions & 0 deletions src/meta/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ class backup_service
zlock _lock;
std::map<std::string, std::shared_ptr<policy_context>>
_policy_states; // policy_name -> policy_context

// _backup_states stores the states of all one-time backups in the cluster; it is not persisted to ZK
std::vector<std::shared_ptr<backup_engine>> _backup_states;

// the root of policy metas, stored on remote_storage(zookeeper)
Expand Down
51 changes: 39 additions & 12 deletions src/meta/test/meta_backup_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,12 +243,26 @@ class backup_engine_test : public meta_test_base
_backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address);
}

void mock_on_backup_reply_when_timeout(int32_t partition_index, error_code rpc_err)
{
gpid pid = gpid(_app_id, partition_index);
rpc_address mock_primary_address = rpc_address("127.0.0.1", 10000 + partition_index);
backup_response resp;
_backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address);
}

// Returns the engine's `_is_backup_failed` flag, read while holding the engine's `_lock`.
bool is_backup_failed() const
{
    auto &engine = *_backup_engine;
    zauto_lock guard(engine._lock);
    const bool failed = engine._is_backup_failed;
    return failed;
}

// Clears the engine's `_is_backup_failed` flag (under the engine's `_lock`) so that a
// following test step starts from a non-failed state.
void reset_backup_engine()
{
    auto &engine = *_backup_engine;
    zauto_lock guard(engine._lock);
    engine._is_backup_failed = false;
}

protected:
const std::string _policy_root;
const std::string _backup_root;
Expand All @@ -267,28 +281,41 @@ TEST_F(backup_engine_test, test_on_backup_reply)
ASSERT_TRUE(_backup_engine->is_in_progress());

// receive a backup finished response
reset_backup_engine();
mock_on_backup_reply(/*partition_index=*/1,
ERR_OK,
ERR_OK,
/*progress=*/cold_backup_constant::PROGRESS_FINISHED);
ASSERT_TRUE(_backup_engine->is_in_progress());

// recieve a backup in-progress response
// receive a backup in-progress response
reset_backup_engine();
mock_on_backup_reply(/*partition_index=*/2, ERR_OK, ERR_BUSY, /*progress=*/0);
ASSERT_TRUE(_backup_engine->is_in_progress());
ASSERT_EQ(_backup_engine->_backup_status[2], backup_status::ALIVE);

// recieve a backup failed response
mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0);
ASSERT_TRUE(is_backup_failed());
// if one partition fails, the whole backup plan fails
{
// receive a backup failed response
reset_backup_engine();
mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0);
ASSERT_TRUE(is_backup_failed());

// this backup is still a failure even recieved non-failure response
mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0);
ASSERT_TRUE(is_backup_failed());
mock_on_backup_reply(/*partition_index=*/5,
ERR_OK,
ERR_OK,
/*progress=*/cold_backup_constant::PROGRESS_FINISHED);
ASSERT_TRUE(is_backup_failed());
// this backup is still a failure even received non-failure response
mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0);
ASSERT_TRUE(is_backup_failed());

mock_on_backup_reply(/*partition_index=*/5,
ERR_OK,
ERR_OK,
/*progress=*/cold_backup_constant::PROGRESS_FINISHED);
ASSERT_TRUE(is_backup_failed());
}

// the backup request sent by meta timed out
reset_backup_engine();
mock_on_backup_reply_when_timeout(/*partition_index=*/5, ERR_TIMEOUT);
ASSERT_FALSE(is_backup_failed());
}

TEST_F(backup_engine_test, test_backup_completed)
Expand Down
2 changes: 1 addition & 1 deletion src/replica/replica_backup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ void replica::send_backup_request_to_secondary(const backup_request &request)
{
for (const auto &target_address : _primary_states.membership.secondaries) {
// primary will send backup_request to secondary periodically
// so, we shouldn't handler the response
// so, we shouldn't handle the response
rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash());
}
}
Expand Down