Patch summary (free text placed before the first "diff --git" header):

  * src/meta/backup_engine.{h,cpp}: on_backup_reply() is split up. The failure path moves
    to handle_replica_backup_failed() and the re-send path to retry_backup(). The rpc
    error and the response error are folded into one `rep_error`, so an rpc failure is
    retried without inspecting the (empty) response.
  * src/meta/meta_backup_service.h, src/replica/replica_backup.cpp: comment-only changes.
  * src/meta/test/meta_backup_test.cpp: adds a timeout case and resets the failure flag
    between cases.

Review changes applied to the added ('+') lines only:

  * dropped `inline` from the two out-of-line member definitions; the header declares
    them without `inline`. The first signature now fits on one line, so the
    backup_engine.cpp hunk headers became +194,35 and +236,47.
  * removed the stray ';' after the `if (rep_error != ERR_OK)` block.
  * fixed the grammar of added comments and documented the two new methods in
    backup_engine.h (hunk header is now +84,10; meta_backup_service.h is now +380,9).

NOTE(review): indentation and blank context lines were reconstructed from the hunk line
counts; confirm them against the target tree (or apply with whitespace ignored).
NOTE(review): `std::map>` / `std::vector>` in the meta_backup_service.h hunk look like
their template arguments were stripped; restore them from the target file before applying.
NOTE(review): the "index" blob hashes are kept from the original patch and no longer
describe the edited post-image.

diff --git a/src/meta/backup_engine.cpp b/src/meta/backup_engine.cpp
index 4df992782f..47af195257 100644
--- a/src/meta/backup_engine.cpp
+++ b/src/meta/backup_engine.cpp
@@ -194,14 +194,35 @@ void backup_engine::backup_app_partition(const gpid &pid)
     _backup_status[pid.get_partition_index()] = backup_status::ALIVE;
 }
 
-void backup_engine::on_backup_reply(error_code err,
-                                    const backup_response &response,
-                                    gpid pid,
-                                    const rpc_address &primary)
+void backup_engine::handle_replica_backup_failed(const backup_response &response, const gpid pid)
 {
     dcheck_eq(response.pid, pid);
     dcheck_eq(response.backup_id, _cur_backup.backup_id);
 
+    derror_f("backup_id({}): backup for partition {} failed, response.err: {}",
+             _cur_backup.backup_id,
+             pid.to_string(),
+             response.err.to_string());
+    zauto_lock l(_lock);
+    // if one partition fails, the whole backup plan fails.
+    _is_backup_failed = true;
+    _backup_status[pid.get_partition_index()] = backup_status::FAILED;
+}
+
+void backup_engine::retry_backup(const dsn::gpid pid)
+{
+    tasking::enqueue(LPC_DEFAULT_CALLBACK,
+                     &_tracker,
+                     [this, pid]() { backup_app_partition(pid); },
+                     0,
+                     std::chrono::seconds(1));
+}
+
+void backup_engine::on_backup_reply(const error_code err,
+                                    const backup_response &response,
+                                    const gpid pid,
+                                    const rpc_address &primary)
+{
     {
         zauto_lock l(_lock);
         // if backup of some partition failed, we would not handle response from other partitions.
@@ -215,51 +236,47 @@ void backup_engine::on_backup_reply(error_code err,
     // if backup failed, receive ERR_LOCAL_APP_FAILURE;
     // backup not completed in other cases.
     // see replica::on_cold_backup() for details.
-    int32_t partition = pid.get_partition_index();
-    if (err == dsn::ERR_OK && response.err == dsn::ERR_OK &&
-        response.progress == cold_backup_constant::PROGRESS_FINISHED) {
+
+    auto rep_error = err == ERR_OK ? response.err : err;
+
+    if (rep_error == ERR_LOCAL_APP_FAILURE) {
+        handle_replica_backup_failed(response, pid);
+        return;
+    }
+
+    if (rep_error != ERR_OK) {
+        derror_f("backup_id({}): backup request to server {} failed, error: {}, retry to "
+                 "send backup request.",
+                 _cur_backup.backup_id,
+                 primary.to_string(),
+                 rep_error.to_string());
+        retry_backup(pid);
+        return;
+    }
+
+    if (response.progress == cold_backup_constant::PROGRESS_FINISHED) {
+        dcheck_eq(response.pid, pid);
+        dcheck_eq(response.backup_id, _cur_backup.backup_id);
         ddebug_f("backup_id({}): backup for partition {} completed.",
                  _cur_backup.backup_id,
                  pid.to_string());
         {
             zauto_lock l(_lock);
-            _backup_status[partition] = backup_status::COMPLETED;
+            _backup_status[pid.get_partition_index()] = backup_status::COMPLETED;
         }
         complete_current_backup();
         return;
     }
 
-    if (response.err == ERR_LOCAL_APP_FAILURE) {
-        derror_f("backup_id({}): backup for partition {} failed.",
-                 _cur_backup.backup_id,
-                 pid.to_string());
-        zauto_lock l(_lock);
-        _is_backup_failed = true;
-        _backup_status[partition] = backup_status::FAILED;
-        return;
-    }
+    // the backup is not finished yet, so meta keeps polling by sending the request again
+    ddebug_f("backup_id({}): receive backup response for partition {} from server {}, now "
+             "progress {}, retry to send backup request.",
+             _cur_backup.backup_id,
+             pid.to_string(),
+             primary.to_string(),
+             response.progress);
 
-    if (err != ERR_OK) {
-        dwarn_f("backup_id({}): send backup request to server {} failed, rpc error: {}, retry to "
-                "send backup request.",
-                _cur_backup.backup_id,
-                primary.to_string(),
-                err.to_string());
-    } else {
-        ddebug_f(
-            "backup_id({}): receive backup response for partition {} from server {}, rpc error "
-            "{}, response error {}, retry to send backup request.",
-            _cur_backup.backup_id,
-            pid.to_string(),
-            primary.to_string(),
-            err.to_string(),
-            response.err.to_string());
-    }
-    tasking::enqueue(LPC_DEFAULT_CALLBACK,
-                     &_tracker,
-                     [this, pid]() { backup_app_partition(pid); },
-                     0,
-                     std::chrono::seconds(1));
+    retry_backup(pid);
 }
 
 void backup_engine::write_backup_info()
diff --git a/src/meta/backup_engine.h b/src/meta/backup_engine.h
index 55f1065da3..75a84b7935 100644
--- a/src/meta/backup_engine.h
+++ b/src/meta/backup_engine.h
@@ -84,6 +84,10 @@ class backup_engine
                          const rpc_address &primary);
     void write_backup_info();
     void complete_current_backup();
+    // logs the failure, then marks both the partition and the whole backup as failed
+    void handle_replica_backup_failed(const backup_response &response, const gpid pid);
+    // enqueues backup_app_partition(pid) again, passing std::chrono::seconds(1) to the task
+    void retry_backup(const dsn::gpid pid);
 
     const std::string get_policy_name() const
     {
diff --git a/src/meta/meta_backup_service.h b/src/meta/meta_backup_service.h
index e50b099ab6..7a38b1bb9d 100644
--- a/src/meta/meta_backup_service.h
+++ b/src/meta/meta_backup_service.h
@@ -380,6 +380,9 @@ class backup_service
     zlock _lock;
     std::map>
         _policy_states; // policy_name -> policy_context
+
+    // _backup_states stores the states of all one-time backups in the cluster;
+    // they are not persisted to ZK
     std::vector> _backup_states;
 
     // the root of policy metas, stored on remote_storage(zookeeper)
diff --git a/src/meta/test/meta_backup_test.cpp b/src/meta/test/meta_backup_test.cpp
index e8b528be09..ce7cbfa7c2 100644
--- a/src/meta/test/meta_backup_test.cpp
+++ b/src/meta/test/meta_backup_test.cpp
@@ -243,12 +243,26 @@ class backup_engine_test : public meta_test_base
         _backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address);
     }
 
+    void mock_on_backup_reply_when_timeout(int32_t partition_index, error_code rpc_err)
+    {
+        gpid pid = gpid(_app_id, partition_index);
+        rpc_address mock_primary_address = rpc_address("127.0.0.1", 10000 + partition_index);
+        backup_response resp;
+        _backup_engine->on_backup_reply(rpc_err, resp, pid, mock_primary_address);
+    }
+
     bool is_backup_failed() const
     {
         zauto_lock l(_backup_engine->_lock);
         return _backup_engine->_is_backup_failed;
     }
 
+    void reset_backup_engine()
+    {
+        zauto_lock l(_backup_engine->_lock);
+        _backup_engine->_is_backup_failed = false;
+    }
+
 protected:
     const std::string _policy_root;
     const std::string _backup_root;
@@ -267,28 +281,41 @@ TEST_F(backup_engine_test, test_on_backup_reply)
     ASSERT_TRUE(_backup_engine->is_in_progress());
 
     // recieve a backup finished response
+    reset_backup_engine();
     mock_on_backup_reply(/*partition_index=*/1,
                          ERR_OK,
                          ERR_OK,
                          /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
     ASSERT_TRUE(_backup_engine->is_in_progress());
 
-    // recieve a backup in-progress response
+    // receive a backup in-progress response
+    reset_backup_engine();
     mock_on_backup_reply(/*partition_index=*/2, ERR_OK, ERR_BUSY, /*progress=*/0);
     ASSERT_TRUE(_backup_engine->is_in_progress());
+    ASSERT_EQ(_backup_engine->_backup_status[2], backup_status::ALIVE);
 
-    // recieve a backup failed response
-    mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0);
-    ASSERT_TRUE(is_backup_failed());
+    // if one partition fails, the whole backup plan fails
+    {
+        // receive a backup failed response
+        reset_backup_engine();
+        mock_on_backup_reply(/*partition_index=*/3, ERR_OK, ERR_LOCAL_APP_FAILURE, /*progress=*/0);
+        ASSERT_TRUE(is_backup_failed());
 
-    // this backup is still a failure even recieved non-failure response
-    mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0);
-    ASSERT_TRUE(is_backup_failed());
-    mock_on_backup_reply(/*partition_index=*/5,
-                         ERR_OK,
-                         ERR_OK,
-                         /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
-    ASSERT_TRUE(is_backup_failed());
+        // this backup is still a failure even if a non-failure response is received
+        mock_on_backup_reply(/*partition_index=*/4, ERR_OK, ERR_BUSY, /*progress=*/0);
+        ASSERT_TRUE(is_backup_failed());
+
+        mock_on_backup_reply(/*partition_index=*/5,
+                             ERR_OK,
+                             ERR_OK,
+                             /*progress=*/cold_backup_constant::PROGRESS_FINISHED);
+        ASSERT_TRUE(is_backup_failed());
+    }
+
+    // the request from meta timed out: it is retried, so the backup is not marked as failed
+    reset_backup_engine();
+    mock_on_backup_reply_when_timeout(/*partition_index=*/5, ERR_TIMEOUT);
+    ASSERT_FALSE(is_backup_failed());
 }
 
 TEST_F(backup_engine_test, test_backup_completed)
diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp
index b9ba9860ea..2268fe5936 100644
--- a/src/replica/replica_backup.cpp
+++ b/src/replica/replica_backup.cpp
@@ -219,7 +219,7 @@ void replica::send_backup_request_to_secondary(const backup_request &request)
 {
     for (const auto &target_address : _primary_states.membership.secondaries) {
         // primary will send backup_request to secondary periodically
-        // so, we shouldn't handler the response
+        // so, we shouldn't handle the response
         rpc::call_one_way_typed(target_address, RPC_COLD_BACKUP, request, get_gpid().thread_hash());
     }
 }