diff --git a/src/common/replication_common.cpp b/src/common/replication_common.cpp index f8a7f07765..0dad8f0d7a 100644 --- a/src/common/replication_common.cpp +++ b/src/common/replication_common.cpp @@ -115,8 +115,6 @@ replication_options::replication_options() learn_app_max_concurrent_count = 5; - max_concurrent_uploading_file_count = 10; - cold_backup_checkpoint_reserve_minutes = 10; } @@ -510,12 +508,6 @@ void replication_options::initialize() cold_backup_root = dsn_config_get_value_string( "replication", "cold_backup_root", "", "cold backup remote storage path prefix"); - max_concurrent_uploading_file_count = - (int32_t)dsn_config_get_value_uint64("replication", - "max_concurrent_uploading_file_count", - max_concurrent_uploading_file_count, - "concurrent uploading file count"); - cold_backup_checkpoint_reserve_minutes = (int)dsn_config_get_value_uint64("replication", "cold_backup_checkpoint_reserve_minutes", diff --git a/src/common/replication_common.h b/src/common/replication_common.h index da95fe5dcb..c5ae401931 100644 --- a/src/common/replication_common.h +++ b/src/common/replication_common.h @@ -116,7 +116,6 @@ class replication_options int32_t learn_app_max_concurrent_count; std::string cold_backup_root; - int32_t max_concurrent_uploading_file_count; int32_t cold_backup_checkpoint_reserve_minutes; std::string bulk_load_provider_root; diff --git a/src/replica/CMakeLists.txt b/src/replica/CMakeLists.txt index 45cd7dc6d4..2827ea9a85 100644 --- a/src/replica/CMakeLists.txt +++ b/src/replica/CMakeLists.txt @@ -9,7 +9,8 @@ set(DUPLICATION_SRC duplication/mutation_batch.cpp ) -set(BACKUP_SRC backup/replica_backup_manager.cpp) +set(BACKUP_SRC backup/replica_backup_manager.cpp + backup/cold_backup_context.cpp) set(BULK_LOAD_SRC bulk_load/replica_bulk_loader.cpp) diff --git a/src/replica/backup/cold_backup_context.cpp b/src/replica/backup/cold_backup_context.cpp new file mode 100644 index 0000000000..2343d53eba --- /dev/null +++ b/src/replica/backup/cold_backup_context.cpp @@ -0,0 +1,1089 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cold_backup_context.h" +#include "replica/replica.h" +#include "replica/replica_stub.h" +#include "block_service/block_service_manager.h" + +#include + +namespace dsn { +namespace replication { + +const char *cold_backup_status_to_string(cold_backup_status status) +{ + switch (status) { + case ColdBackupInvalid: + return "ColdBackupInvalid"; + case ColdBackupChecking: + return "ColdBackupChecking"; + case ColdBackupChecked: + return "ColdBackupChecked"; + case ColdBackupCheckpointing: + return "ColdBackupCheckpointing"; + case ColdBackupCheckpointed: + return "ColdBackupCheckpointed"; + case ColdBackupUploading: + return "ColdBackupUploading"; + case ColdBackupPaused: + return "ColdBackupPaused"; + case ColdBackupCanceled: + return "ColdBackupCanceled"; + case ColdBackupCompleted: + return "ColdBackupCompleted"; + case ColdBackupFailed: + return "ColdBackupFailed"; + default: + dassert(false, ""); + } + return "ColdBackupXXX"; +} + +void cold_backup_context::cancel() +{ + _status.store(ColdBackupCanceled); + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_cancel_count->increment(); + } +} + +bool cold_backup_context::start_check() +{ + int invalid = ColdBackupInvalid; + if (_status.compare_exchange_strong(invalid, ColdBackupChecking)) { + _start_time_ms = dsn_now_ms(); + return true; + } else { + return false; + } +} + +bool cold_backup_context::fail_check(const char *failure_reason) +{ + int checking = ColdBackupChecking; + if (_status.compare_exchange_strong(checking, ColdBackupFailed)) { + strncpy(_reason, failure_reason, sizeof(_reason) - 1); + _reason[sizeof(_reason) - 1] = '\0'; + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); + } + return true; + } else { + return false; + } +} + +bool cold_backup_context::complete_check(bool uploaded) +{ + int checking = ColdBackupChecking; + if (uploaded) { + _progress.store(cold_backup_constant::PROGRESS_FINISHED); + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_succ_count->increment(); + } + return _status.compare_exchange_strong(checking, ColdBackupCompleted); + } else { + return _status.compare_exchange_strong(checking, ColdBackupChecked); + } +} + +bool cold_backup_context::start_checkpoint() +{ + int checked = ColdBackupChecked; + if (_status.compare_exchange_strong(checked, ColdBackupCheckpointing)) { + return true; + } else { + return false; + } +} + +bool cold_backup_context::fail_checkpoint(const char *failure_reason) +{ + int checkpointing = ColdBackupCheckpointing; + if (_status.compare_exchange_strong(checkpointing, ColdBackupFailed)) { + strncpy(_reason, failure_reason, sizeof(_reason) - 1); + _reason[sizeof(_reason) - 1] = '\0'; + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); + } + return true; + } else { + return false; + } +} + +bool cold_backup_context::complete_checkpoint() +{ + int checkpointing = ColdBackupCheckpointing; + if (_status.compare_exchange_strong(checkpointing, ColdBackupCheckpointed)) { + return true; + } else { + return false; + } +} + +bool cold_backup_context::pause_upload() +{ + int uploading = ColdBackupUploading; + if (_status.compare_exchange_strong(uploading, ColdBackupPaused)) { + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub() + ->_counter_cold_backup_recent_pause_count->increment(); + } + return true; + } else { + return false; + } +} + +bool cold_backup_context::fail_upload(const char *failure_reason) +{ + int uploading = ColdBackupUploading; + int paused = ColdBackupPaused; + if (_status.compare_exchange_strong(uploading, ColdBackupFailed) || + _status.compare_exchange_strong(paused, ColdBackupFailed)) { + strncpy(_reason, failure_reason, sizeof(_reason) - 1); + _reason[sizeof(_reason) - 1] = '\0'; + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); + } + return true; + } else { + return false; + } +} + +bool cold_backup_context::complete_upload() +{ + int uploading = ColdBackupUploading; + int paused = ColdBackupPaused; + if (_status.compare_exchange_strong(uploading, ColdBackupCompleted) || + _status.compare_exchange_strong(paused, ColdBackupCompleted)) { + _progress.store(cold_backup_constant::PROGRESS_FINISHED); + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_succ_count->increment(); + } + return true; + } else { + return false; + } +} + +// run in REPLICATION_LONG thread +void cold_backup_context::check_backup_on_remote() +{ + // check whether current checkpoint file is exist on remote, and verify whether the checkpoint + // directory is exist + std::string current_chkpt_file = cold_backup::get_current_chkpt_file( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + dist::block_service::create_file_request req; + req.file_name = current_chkpt_file; + req.ignore_metadata = false; + + // incr the ref counter, and must release_ref() after callback is execute + add_ref(); + + block_service->create_file( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, current_chkpt_file](const dist::block_service::create_file_response &resp) { + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else if (resp.err == ERR_OK) { + const dist::block_service::block_file_ptr &file_handle = resp.file_handle; + dassert(file_handle != nullptr, ""); + if (file_handle->get_md5sum().empty() && file_handle->get_size() <= 0) { + ddebug("%s: check backup on remote, current_checkpoint file %s is not exist", + name, + current_chkpt_file.c_str()); + complete_check(false); + } else { + ddebug("%s: check backup on remote, current_checkpoint file %s is exist", + name, + current_chkpt_file.c_str()); + read_current_chkpt_file(file_handle); + } + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block service create file timeout, retry after 10 seconds, file = %s", + name, + current_chkpt_file.c_str()); + + // before retry, should add_ref(), and must release_ref() after retry + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this]() { + // before retry, should check whether the status is ready for + // check + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore " + "checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else { + check_backup_on_remote(); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service create file failed, file = %s, err = %s", + name, + current_chkpt_file.c_str(), + resp.err.to_string()); + fail_check("block service create file failed"); + } + release_ref(); + }); +} + +void cold_backup_context::read_current_chkpt_file( + const dist::block_service::block_file_ptr &file_handle) +{ + dist::block_service::read_request req; + req.remote_pos = 0; + req.remote_length = -1; + + add_ref(); + + file_handle->read( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, file_handle](const dist::block_service::read_response &resp) { + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else if (resp.err == ERR_OK) { + std::string chkpt_dirname(resp.buffer.data(), resp.buffer.length()); + if (chkpt_dirname.empty()) { + complete_check(false); + } else { + ddebug("%s: after read current_checkpoint_file, check whether remote " + "checkpoint dir = %s is exist", + name, + chkpt_dirname.c_str()); + remote_chkpt_dir_exist(chkpt_dirname); + } + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: read remote file timeout, retry after 10s, file = %s", + name, + file_handle->file_name().c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, file_handle]() { + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore " + "checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else { + read_current_chkpt_file(file_handle); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: read remote file failed, file = %s, err = %s", + name, + file_handle->file_name().c_str(), + resp.err.to_string()); + fail_check("read remote file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::remote_chkpt_dir_exist(const std::string &chkpt_dirname) +{ + dist::block_service::ls_request req; + req.dir_name = cold_backup::get_replica_backup_path( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + + add_ref(); + + block_service->list_dir( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, chkpt_dirname](const dist::block_service::ls_response &resp) { + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else if (resp.err == ERR_OK) { + bool found_chkpt_dir = false; + for (const auto &entry : (*resp.entries)) { + if (entry.is_directory && entry.entry_name == chkpt_dirname) { + found_chkpt_dir = true; + break; + } + } + if (found_chkpt_dir) { + ddebug("%s: remote checkpoint dir is already exist, so upload have already " + "complete, remote_checkpoint_dirname = %s", + name, + chkpt_dirname.c_str()); + complete_check(true); + } else { + ddebug("%s: remote checkpoint dir is not exist, should re-upload checkpoint " + "dir, remote_checkpoint_dirname = %s", + name, + chkpt_dirname.c_str()); + complete_check(false); + } + } else if (resp.err == ERR_OBJECT_NOT_FOUND) { + ddebug("%s: remote checkpoint dir is not exist, should re-upload checkpoint dir, " + "remote_checkpoint_dirname = %s", + name, + chkpt_dirname.c_str()); + complete_check(false); + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block service list remote dir timeout, retry after 10s, dirname = %s", + name, + chkpt_dirname.c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, chkpt_dirname]() { + if (!is_ready_for_check()) { + ddebug("%s: backup status has changed to %s, ignore " + "checking backup on remote", + name, + cold_backup_status_to_string(status())); + ignore_check(); + } else { + remote_chkpt_dir_exist(chkpt_dirname); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service list remote dir failed, dirname = %s, err = %s", + name, + chkpt_dirname.c_str(), + resp.err.to_string()); + fail_check("list remote dir failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::upload_checkpoint_to_remote() +{ + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, ignore upload checkpoint", + name, + cold_backup_status_to_string(status())); + return; + } + + bool old_status = false; + // here, just allow one task to check upload status, and it will set _upload_status base on + // the result it has checked; But, because of upload_checkpoint_to_remote maybe call multi-times + // (for pause - uploading), so we use the atomic variant to implement + if (!_have_check_upload_status.compare_exchange_strong(old_status, true)) { + ddebug("%s: upload status has already been checked, start upload checkpoint dir directly", + name); + on_upload_chkpt_dir(); + return; + } + + // check whether cold_backup_metadata is exist and verify cold_backup_metadata if exist + std::string metadata = cold_backup::get_remote_chkpt_meta_file( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + dist::block_service::create_file_request req; + req.file_name = metadata; + req.ignore_metadata = false; + + add_ref(); + + block_service->create_file( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, metadata](const dist::block_service::create_file_response &resp) { + if (resp.err == ERR_OK) { + dassert(resp.file_handle != nullptr, ""); + if (resp.file_handle->get_md5sum().empty() && resp.file_handle->get_size() <= 0) { + _upload_status.store(UploadUncomplete); + ddebug("%s: check upload_status complete, cold_backup_metadata isn't exist, " + "start upload checkpoint dir", + name); + on_upload_chkpt_dir(); + } else { + ddebug("%s: cold_backup_metadata is exist, read it's context", name); + read_backup_metadata(resp.file_handle); + } + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block service create file timeout, retry after 10s, file = %s", + name, + metadata.c_str()); + // when create backup_metadata timeout, should reset _have_check_upload_status + // false to allow re-check + _have_check_upload_status.store(false); + add_ref(); + + tasking::enqueue( + LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this]() { + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, stop check upload status", + name, + cold_backup_status_to_string(status())); + } else { + upload_checkpoint_to_remote(); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service create file failed, file = %s, err = %s", + name, + metadata.c_str(), + resp.err.to_string()); + _have_check_upload_status.store(false); + fail_upload("block service create file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::read_backup_metadata( + const dist::block_service::block_file_ptr &file_handle) +{ + dist::block_service::read_request req; + req.remote_pos = 0; + req.remote_length = -1; + + add_ref(); + + file_handle->read( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, file_handle](const dist::block_service::read_response &resp) { + if (resp.err == ERR_OK) { + ddebug("%s: read cold_backup_metadata succeed, verify it's context, file = %s", + name, + file_handle->file_name().c_str()); + verify_backup_metadata(resp.buffer); + on_upload_chkpt_dir(); + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: read remote file timeout, retry after 10s, file = %s", + name, + file_handle->file_name().c_str()); + add_ref(); + + tasking::enqueue( + LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, file_handle] { + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, stop check upload status", + name, + cold_backup_status_to_string(status())); + _have_check_upload_status.store(false); + } else { + read_backup_metadata(file_handle); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: read remote file failed, file = %s, err = %s", + name, + file_handle->file_name().c_str(), + resp.err.to_string()); + _have_check_upload_status.store(false); + fail_upload("read remote file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::verify_backup_metadata(const blob &value) +{ + cold_backup_metadata tmp; + if (value.length() > 0 && json::json_forwarder::decode(value, tmp)) { + ddebug("%s: check upload status complete, checkpoint dir uploading has already complete", + name); + _upload_status.store(UploadComplete); + } else { + ddebug("%s: check upload status complete, checkpoint dir uploading isn't complete yet", + name); + _upload_status.store(UploadUncomplete); + } +} + +void cold_backup_context::on_upload_chkpt_dir() +{ + if (_upload_status.load() == UploadInvalid || !is_ready_for_upload()) { + ddebug("%s: replica is not ready for uploading, ignore upload, cold_backup_status(%s)", + name, + cold_backup_status_to_string(status())); + return; + } + + if (_upload_status.load() == UploadComplete) { + // TODO: if call upload_checkpint_to_remote multi times, maybe write_current_chkpt_file + // multi times + std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname(); + write_current_chkpt_file(chkpt_dirname); + return; + } + + prepare_upload(); + + // prepare_upload maybe fail, so here check status + if (!is_ready_for_upload()) { + derror("%s: backup status has changed to %s, stop upload checkpoint dir", + name, + cold_backup_status_to_string(status())); + return; + } + + if (checkpoint_files.size() <= 0) { + ddebug("%s: checkpoint dir is empty, so upload is complete and just start write " + "backup_metadata", + name); + bool old_status = false; + // using atomic variant _have_write_backup_metadata is to allow one task to + // write backup_metadata because on_upload_chkpt_dir maybe call multi-time + if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { + write_backup_metadata(); + } + } else { + ddebug("%s: start upload checkpoint dir, checkpoint dir = %s, total checkpoint file = %d", + name, + checkpoint_dir.c_str(), + checkpoint_files.size()); + std::vector files; + if (!upload_complete_or_fetch_uncomplete_files(files)) { + for (auto &file : files) { + ddebug("%s: start upload checkpoint file to remote, file = %s", name, file.c_str()); + upload_file(file); + } + } else { + ddebug("%s: upload checkpoint dir to remote complete, total_file_cnt = %d", + name, + checkpoint_files.size()); + bool old_status = false; + if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { + write_backup_metadata(); + } + } + } +} + +void cold_backup_context::prepare_upload() +{ + zauto_lock l(_lock); + // only need initialize once + if (_metadata.files.size() > 0) { + return; + } + _file_remain_cnt = checkpoint_files.size(); + + _metadata.checkpoint_decree = checkpoint_decree; + _metadata.checkpoint_timestamp = checkpoint_timestamp; + _metadata.checkpoint_total_size = checkpoint_file_total_size; + for (int32_t idx = 0; idx < checkpoint_files.size(); idx++) { + std::string &file = checkpoint_files[idx]; + file_meta f_meta; + f_meta.name = file; + std::string file_full_path = ::dsn::utils::filesystem::path_combine(checkpoint_dir, file); + int64_t file_size = checkpoint_file_sizes[idx]; + std::string file_md5; + if (::dsn::utils::filesystem::md5sum(file_full_path, file_md5) != ERR_OK) { + derror("%s: get local file size or md5 fail, file = %s", name, file_full_path.c_str()); + fail_upload("compute local file size or md5 failed"); + return; + } + f_meta.md5 = file_md5; + f_meta.size = file_size; + _metadata.files.emplace_back(f_meta); + _file_status.insert(std::make_pair(file, FileUploadUncomplete)); + _file_infos.insert(std::make_pair(file, std::make_pair(file_size, file_md5))); + } + _upload_file_size.store(0); +} + +void cold_backup_context::upload_file(const std::string &local_filename) +{ + std::string remote_chkpt_dir = cold_backup::get_remote_chkpt_dir( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + dist::block_service::create_file_request req; + req.file_name = ::dsn::utils::filesystem::path_combine(remote_chkpt_dir, local_filename); + req.ignore_metadata = false; + + add_ref(); + + block_service->create_file( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, local_filename](const dist::block_service::create_file_response &resp) { + if (resp.err == ERR_OK) { + const dist::block_service::block_file_ptr &file_handle = resp.file_handle; + dassert(file_handle != nullptr, ""); + int64_t local_file_size = _file_infos.at(local_filename).first; + std::string md5 = _file_infos.at(local_filename).second; + std::string full_path_local_file = + ::dsn::utils::filesystem::path_combine(checkpoint_dir, local_filename); + if (md5 == file_handle->get_md5sum() && + local_file_size == file_handle->get_size()) { + ddebug("%s: checkpoint file already exist on remote, file = %s", + name, + full_path_local_file.c_str()); + on_upload_file_complete(local_filename); + } else { + ddebug("%s: start upload checkpoint file to remote, file = %s", + name, + full_path_local_file.c_str()); + on_upload(file_handle, full_path_local_file); + } + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block service create file timeout, retry after 10s, file = %s", + name, + local_filename.c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, local_filename]() { + // TODO: status change from ColdBackupUploading to + // ColdBackupPaused, and upload file timeout, but when callback + // is executed it catches the status(ColdBackupPaused) + // now, if status back to ColdBackupUploading very soon, and + // call upload_checkpoint_to_remote() here, + // upload_checkpoint_to_remote() maybe acquire the _lock first, + // then stop give back file(upload timeout), the file is still + // in uploading this file will not be uploaded until you call + // upload_checkpoint_to_remote() after it's given back + if (!is_ready_for_upload()) { + std::string full_path_local_file = + ::dsn::utils::filesystem::path_combine(checkpoint_dir, + local_filename); + ddebug("%s: backup status has changed to %s, stop upload " + "checkpoint file to remote, file = %s", + name, + cold_backup_status_to_string(status()), + full_path_local_file.c_str()); + file_upload_uncomplete(local_filename); + } else { + upload_file(local_filename); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service create file failed, file = %s, err = %s", + name, + local_filename.c_str(), + resp.err.to_string()); + fail_upload("create file failed"); + } + if (resp.err != ERR_OK && _owner_replica != nullptr) { + _owner_replica->get_replica_stub() + ->_counter_cold_backup_recent_upload_file_fail_count->increment(); + } + release_ref(); + return; + }); +} + +void cold_backup_context::on_upload(const dist::block_service::block_file_ptr &file_handle, + const std::string &full_path_local_file) +{ + dist::block_service::upload_request req; + req.input_local_name = full_path_local_file; + + add_ref(); + + file_handle->upload( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, file_handle, full_path_local_file]( + const dist::block_service::upload_response &resp) { + if (resp.err == ERR_OK) { + std::string local_filename = + ::dsn::utils::filesystem::get_file_name(full_path_local_file); + dassert(_file_infos.at(local_filename).first == + static_cast(resp.uploaded_size), + ""); + ddebug("%s: upload checkpoint file complete, file = %s", + name, + full_path_local_file.c_str()); + on_upload_file_complete(local_filename); + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: upload checkpoint file timeout, retry after 10s, file = %s", + name, + full_path_local_file.c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, file_handle, full_path_local_file]() { + if (!is_ready_for_upload()) { + derror("%s: backup status has changed to %s, stop upload " + "checkpoint file to remote, file = %s", + name, + cold_backup_status_to_string(status()), + full_path_local_file.c_str()); + std::string local_filename = + ::dsn::utils::filesystem::get_file_name( + full_path_local_file); + file_upload_uncomplete(local_filename); + } else { + on_upload(file_handle, full_path_local_file); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: upload checkpoint file to remote failed, file = %s, err = %s", + name, + full_path_local_file.c_str(), + resp.err.to_string()); + fail_upload("upload checkpoint file to remote failed"); + } + if (resp.err != ERR_OK && _owner_replica != nullptr) { + _owner_replica->get_replica_stub() + ->_counter_cold_backup_recent_upload_file_fail_count->increment(); + } + release_ref(); + return; + }); +} + +void cold_backup_context::write_backup_metadata() +{ + if (_upload_status.load() == UploadComplete) { + ddebug("%s: upload have already done, no need write metadata again", name); + return; + } + std::string metadata = cold_backup::get_remote_chkpt_meta_file( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + dist::block_service::create_file_request req; + req.file_name = metadata; + req.ignore_metadata = true; + + add_ref(); + + block_service->create_file( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, metadata](const dist::block_service::create_file_response &resp) { + if (resp.err == ERR_OK) { + dassert(resp.file_handle != nullptr, ""); + blob buffer = json::json_forwarder::encode(_metadata); + // hold itself until callback is executed + add_ref(); + ddebug("%s: create backup metadata file succeed, start to write file, file = %s", + name, + metadata.c_str()); + this->on_write(resp.file_handle, buffer, [this](bool succeed) { + if (succeed) { + std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname(); + _upload_status.store(UploadComplete); + ddebug("%s: write backup metadata complete, write current checkpoint file", + name); + write_current_chkpt_file(chkpt_dirname); + } + // NOTICE: write file fail will internal error be processed in on_write() + release_ref(); + }); + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block service create file timeout, retry after 10s, file = %s", + name, + metadata.c_str()); + add_ref(); + + tasking::enqueue( + LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this]() { + if (!is_ready_for_upload()) { + _have_write_backup_metadata.store(false); + derror( + "%s: backup status has changed to %s, stop write backup_metadata", + name, + cold_backup_status_to_string(status())); + } else { + write_backup_metadata(); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service create file failed, file = %s, err = %s", + name, + metadata.c_str(), + resp.err.to_string()); + _have_write_backup_metadata.store(false); + fail_upload("create file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::write_current_chkpt_file(const std::string &value) +{ + // before we write current checkpoint file, we can release the memory occupied by _metadata, + // _file_status and _file_infos, because even if write current checkpoint file failed, the + // backup_metadata is uploading succeed, so we will not re-upload + _metadata.files.clear(); + _file_infos.clear(); + _file_status.clear(); + + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, stop write current checkpoint file", + name, + cold_backup_status_to_string(status())); + return; + } + + std::string current_chkpt_file = cold_backup::get_current_chkpt_file( + backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); + dist::block_service::create_file_request req; + req.file_name = current_chkpt_file; + req.ignore_metadata = true; + + add_ref(); + + block_service->create_file( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, value, current_chkpt_file](const dist::block_service::create_file_response &resp) { + if (resp.err == ERR_OK) { + dassert(resp.file_handle != nullptr, ""); + auto len = value.length(); + std::shared_ptr buf = utils::make_shared_array(len); + ::memcpy(buf.get(), value.c_str(), len); + blob write_buf(std::move(buf), static_cast(len)); + ddebug("%s: create current checkpoint file succeed, start write file ,file = %s", + name, + current_chkpt_file.c_str()); + add_ref(); + this->on_write(resp.file_handle, write_buf, [this](bool succeed) { + if (succeed) { + complete_upload(); + } + release_ref(); + }); + } else if (resp.err == ERR_TIMEOUT) { + derror("%s: block file create file timeout, retry after 10s, file = %s", + name, + current_chkpt_file.c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, value]() { + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, stop write " + "current checkpoint file", + name, + cold_backup_status_to_string(status())); + } else { + write_current_chkpt_file(value); + } + + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + derror("%s: block service create file failed, file = %s, err = %s", + name, + current_chkpt_file.c_str(), + resp.err.to_string()); + fail_upload("create file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::on_write(const dist::block_service::block_file_ptr &file_handle, + const blob &value, + const std::function &callback) +{ + dassert(file_handle != nullptr, ""); + dist::block_service::write_request req; + req.buffer = value; + + add_ref(); + + file_handle->write( + std::move(req), + LPC_BACKGROUND_COLD_BACKUP, + [this, value, file_handle, callback](const dist::block_service::write_response &resp) { + if (resp.err == ERR_OK) { + ddebug("%s: write remote file succeed, file = %s", + name, + file_handle->file_name().c_str()); + callback(true); + } else if (resp.err == ERR_TIMEOUT) { + ddebug("%s: write remote file timeout, retry after 10s, file = %s", + name, + file_handle->file_name().c_str()); + add_ref(); + + tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, + nullptr, + [this, file_handle, value, callback]() { + if (!is_ready_for_upload()) { + ddebug("%s: backup status has changed to %s, stop write " + "remote file, file = %s", + name, + cold_backup_status_to_string(status()), + file_handle->file_name().c_str()); + } else { + on_write(file_handle, value, callback); + } + release_ref(); + }, + 0, + std::chrono::seconds(10)); + } else { + // here, must call the callback to release_ref + callback(false); + derror("%s: write remote file failed, file = %s, err = %s", + name, + file_handle->file_name().c_str(), + resp.err.to_string()); + fail_upload("write remote file failed"); + } + release_ref(); + return; + }); +} + +void cold_backup_context::on_upload_file_complete(const std::string &local_filename) +{ + const int64_t &f_size = _file_infos.at(local_filename).first; + _upload_file_size.fetch_add(f_size); + file_upload_complete(local_filename); + if (_owner_replica != nullptr) { + _owner_replica->get_replica_stub() + ->_counter_cold_backup_recent_upload_file_succ_count->increment(); + _owner_replica->get_replica_stub()->_counter_cold_backup_recent_upload_file_size->add( + f_size); + } + // update progress + // int a = 10; int b = 3; then b/a = 0; + // double a = 10; double b = 3; then b/a = 0.3 + auto total = static_cast(checkpoint_file_total_size); + auto complete_size = static_cast(_upload_file_size.load()); + + if (total <= complete_size) { + ddebug("%s: upload checkpoint to remote complete, checkpoint dir = %s, total file size = " + "%" PRId64 ", file count = %d", + name, + checkpoint_dir.c_str(), + static_cast(total), + checkpoint_files.size()); + bool old_status = false; + if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { + write_backup_metadata(); + } + return; + } else { + dassert(total != 0.0, "total = %" PRId64 "", total); + update_progress(static_cast(complete_size / total * 1000)); + ddebug("%s: the progress of upload checkpoint is %d", name, _progress.load()); + } + if (is_ready_for_upload()) { + std::vector upload_files; + upload_complete_or_fetch_uncomplete_files(upload_files); + for (auto &file : upload_files) { + ddebug("%s: start upload checkpoint file to remote, file = %s", name, file.c_str()); + upload_file(file); + } + } +} + +bool cold_backup_context::upload_complete_or_fetch_uncomplete_files(std::vector &files) +{ + bool upload_complete = false; + + zauto_lock l(_lock); + if (_file_remain_cnt > 0 && _cur_upload_file_cnt < _max_concurrent_uploading_file_cnt) { + for (const auto &_pair : _file_status) { + if (_pair.second == file_status::FileUploadUncomplete) { + files.emplace_back(_pair.first); + _file_remain_cnt -= 1; + _file_status[_pair.first] = file_status::FileUploading; + _cur_upload_file_cnt += 1; + } + if (_file_remain_cnt <= 0 || + _cur_upload_file_cnt >= _max_concurrent_uploading_file_cnt) { + break; + } + } + } + if (_file_remain_cnt <= 0 && _cur_upload_file_cnt <= 0) { + upload_complete = true; + } + return upload_complete; +} + +void cold_backup_context::file_upload_uncomplete(const std::string &filename) +{ + zauto_lock l(_lock); + + dassert(_cur_upload_file_cnt >= 1, "cur_upload_file_cnt = %d", _cur_upload_file_cnt); + _cur_upload_file_cnt -= 1; + _file_remain_cnt += 1; + _file_status[filename] = file_status::FileUploadUncomplete; +} + +void cold_backup_context::file_upload_complete(const std::string &filename) +{ + zauto_lock l(_lock); + + dassert(_cur_upload_file_cnt >= 1, "cur_upload_file_cnt = %d", _cur_upload_file_cnt); + _cur_upload_file_cnt -= 1; + _file_status[filename] = file_status::FileUploadComplete; +} + +} // namespace replication +} // namespace dsn diff --git a/src/replica/backup/cold_backup_context.h b/src/replica/backup/cold_backup_context.h new file mode 100644 index 0000000000..fec3028a12 --- /dev/null +++ b/src/replica/backup/cold_backup_context.h @@ -0,0 +1,379 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +#include "common/replication_common.h" + +class replication_service_test_app; + +namespace dsn { +namespace replication { + +class replica; + +// +// ColdBackupInvalid +// | +// V +// |<------ ColdBackupChecking ---------------------------------->| +// | | | +// | V | +// | ColdBackupChecked ----------------------------------->| +// | | | +// | V | +// ColdBackupCompleted <---| ColdBackupCheckpointing ----------------------------->| +// | | | | +// | | V |---> +// ColdBackupCanceled +// | | ColdBackupCheckpointed ------------------------------>| +// | | | | +// | | V | +// | |<------ ColdBackupUploading <======> ColdBackupPaused ------>| +// | | | | +// | |____________________________| | +// | | | +// | V | +// | ColdBackupFailed -------------------->| +// | | +// |---------------------------------------------------------------------------->| +// +enum cold_backup_status +{ + ColdBackupInvalid = 0, + ColdBackupChecking, + ColdBackupChecked, + ColdBackupCheckpointing, + ColdBackupCheckpointed, + ColdBackupUploading, + ColdBackupPaused, + ColdBackupCanceled, + ColdBackupCompleted, + ColdBackupFailed +}; +const char *cold_backup_status_to_string(cold_backup_status status); + +struct cold_backup_metadata +{ + int64_t checkpoint_decree; + int64_t checkpoint_timestamp; + std::vector files; + int64_t checkpoint_total_size; + DEFINE_JSON_SERIALIZATION(checkpoint_decree, checkpoint_timestamp, files, checkpoint_total_size) +}; + +// +// the process of uploading the checkpoint directory to block filesystem: +// 1, upload all the file of the checkpoint to block filesystem +// 2, write a cold_backup_metadata to block filesystem(which includes all the file's name, size +// and md5 and so on) +// 3, write a current_checkpoint file to block filesystem, which is used to mark which +// checkpoint is invalid +// + +// +// the process of check whether uploading is finished on block filesystem: +// 1, check whether the current checkpoint file exist, if exist continue, otherwise not finish +// 2, read the context of the current checkpoint file, the context of this file is the valid +// checkpoint dirname on block filesystem +// 3, verify whether the checkpoint dirname is exist, if exist uploading is already finished, +// otherwise uploading is not finished +// + +class cold_backup_context : public ref_counter +{ +public: + explicit cold_backup_context(replica *r_, + const backup_request &request_, + int max_upload_file_cnt) + : request(request_), + block_service(nullptr), + checkpoint_decree(0), + checkpoint_timestamp(0), + durable_decree_when_checkpoint(-1), + checkpoint_file_total_size(0), + _status(ColdBackupInvalid), + _progress(0), + _upload_file_size(0), + _have_check_upload_status(false), + _have_write_backup_metadata(false), + _upload_status(UploadInvalid), + _max_concurrent_uploading_file_cnt(max_upload_file_cnt), + _cur_upload_file_cnt(0), + _file_remain_cnt(0), + _owner_replica(r_), + _start_time_ms(0) + { + sprintf(name, + "backup{%d.%d.%s.%" PRId64 "}", + request.pid.get_app_id(), + request.pid.get_partition_index(), + request.policy.policy_name.c_str(), + request.backup_id); + memset(_reason, 0, sizeof(_reason)); + } + + ~cold_backup_context() {} + + // cancel backup. + // {*} --> ColdBackupCanceled + // + // Will be called in replication thread. + void cancel(); + + // start checking backup on remote. + // ColdBackupInvalid --> ColdBackupChecking + // Returns: + // - true if status is successfully changed to ColdBackupChecking. + bool start_check(); + + // ignore checking backup on remote and switch backward status. + // ColdBackupChecking --> ColdBackupInvalid + // Returns: + // - true if status is successfully changed to ColdBackupInvalid. + bool ignore_check() + { + int checking = ColdBackupChecking; + return _status.compare_exchange_strong(checking, ColdBackupInvalid); + } + + // mark failed when checking backup on remote. + // ColdBackupChecking --> ColdBackupFailed + // Returns: + // - true if status is successfully changed to ColdBackupFailed. + bool fail_check(const char *failure_reason); + + // complete checking backup on remote. + // ColdBackupChecking --> { ColdBackupChecked | ColdBackupCompleted } + // Returns: + // - true if status is successfully changed to ColdBackupChecked or ColdBackupCompleted. + bool complete_check(bool uploaded); + + // start generating checkpoint. + // ColdBackupChecked --> ColdBackupCheckpointing + // Returns: + // - true if status is successfully changed to ColdBackupCheckpointing. + bool start_checkpoint(); + + // ignore generating checkpoint and switch backward status. + // ColdBackupCheckpointing --> ColdBackupChecked + // Returns: + // - true if status is successfully changed to ColdBackupChecked. + bool ignore_checkpoint() + { + int checkpointing = ColdBackupCheckpointing; + return _status.compare_exchange_strong(checkpointing, ColdBackupChecked); + } + + // mark failed when generating checkpoint. + // ColdBackupCheckpointing --> ColdBackupFailed + // Returns: + // - true if status is successfully changed to ColdBackupFailed. + bool fail_checkpoint(const char *failure_reason); + + // complete generating checkpoint. + // ColdBackupCheckpointing --> ColdBackupCheckpointed + // Returns: + // - true if status is successfully changed to ColdBackupCheckpointed. + bool complete_checkpoint(); + + // start uploading checkpoint to remote. + // { ColdBackupCheckpointed | ColdBackupPaused } --> ColdBackupUploading + // + // Will be called in replication thread. + // Returns: + // - true if status is successfully changed to ColdBackupUploading. + bool start_upload() + { + int checkpointed = ColdBackupCheckpointed; + int paused = ColdBackupPaused; + return _status.compare_exchange_strong(checkpointed, ColdBackupUploading) || + _status.compare_exchange_strong(paused, ColdBackupUploading); + } + + // pause uploading checkpoint to remote. + // ColdBackupUploading --> ColdBackupPaused + // Returns: + // - true if status is successfully changed to ColdBackupPaused. + bool pause_upload(); + + // mark failed when uploading checkpoint to remote. + // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupFailed + // Returns: + // - true if status is successfully changed to ColdBackupFailed. + bool fail_upload(const char *failure_reason); + + // complete uploading checkpoint to remote. + // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupCompleted + // Returns: + // - true if status is successfully changed to ColdBackupCompleted. + bool complete_upload(); + + // update progress. + // Progress should be in range of [0, 1000]. + void update_progress(int progress) + { + dassert(progress >= 0 && progress <= cold_backup_constant::PROGRESS_FINISHED, + "invalid progress %d", + progress); + _progress.store(progress); + } + + // check if it is ready for checking. + bool is_ready_for_check() const { return _status.load() == ColdBackupChecking; } + + // check if it is ready for checkpointing. + bool is_checkpointing() const { return _status.load() == ColdBackupCheckpointing; } + + // check if it is ready for uploading. + bool is_ready_for_upload() const { return _status.load() == ColdBackupUploading; } + + // get current status. + cold_backup_status status() const { return (cold_backup_status)_status.load(); } + + // get current progress. + int progress() const { return _progress.load(); } + + // get failure reason. + const char *reason() const { return _reason; } + + // check if backup is aleady exist on remote. + // Preconditions: + // - name/request are set + // - checkpoint_dir/checkpoint_decree/checkpoint_files are not set + // - status is one of { ColdBackupChecking, ColdBackupCanceled } + // Will be called in background thread. + void check_backup_on_remote(); + + // upload backup checkpoint to remote. + // Preconditions: + // - name/request are set + // - checkpoint_dir/checkpoint_decree/checkpoint_files are set + // - status is one of { ColdBackupUploading, ColdBackupPaused, ColdBackupCanceled } + // Will be called in background thread. + void upload_checkpoint_to_remote(); + + uint64_t get_start_time_ms() { return _start_time_ms; } + + uint64_t get_upload_file_size() { return _upload_file_size.load(); } + + int64_t get_checkpoint_total_size() { return checkpoint_file_total_size; } + +private: + void read_current_chkpt_file(const dist::block_service::block_file_ptr &file_handle); + void remote_chkpt_dir_exist(const std::string &chkpt_dirname); + + void read_backup_metadata(const dist::block_service::block_file_ptr &file_handle); + // value is a json string, verify it's validity + // validity means uploading checkpoint directory complete, so just write_current_chkpt_file + // otherwise, upload checkpoint directory + void verify_backup_metadata(const blob &value); + // after upload_checkpoint_directory ---> write_backup_metadata --> write_current_chkpt_file --> + // notify meta + void write_backup_metadata(); + + void write_current_chkpt_file(const std::string &value); + // write value to file, if succeed then callback(true), else callback(false) + void on_write(const dist::block_service::block_file_ptr &file_handle, + const blob &value, + const std::function &callback); + void prepare_upload(); + void on_upload_chkpt_dir(); + void upload_file(const std::string &local_filename); + void on_upload(const dist::block_service::block_file_ptr &file_handle, + const std::string &full_path_local_file); + void on_upload_file_complete(const std::string &local_filename); + + // functions access the structure protected by _lock + // return: + // -- true, uploading is complete + // -- false, uploading is not complete; and put uncomplete file into 'files' + bool upload_complete_or_fetch_uncomplete_files(std::vector &files); + void file_upload_uncomplete(const std::string &filename); + void file_upload_complete(const std::string &filename); + +public: + /// the following variables are public, and will only be set once, and will not be changed once + /// set. + char name[256]; // backup{...} + // all logging should print the name + backup_request request; + dist::block_service::block_filesystem *block_service; + std::string backup_root; + decree checkpoint_decree; + int64_t checkpoint_timestamp; + decree durable_decree_when_checkpoint; + std::string checkpoint_dir; + std::vector checkpoint_files; + std::vector checkpoint_file_sizes; + int64_t checkpoint_file_total_size; + +private: + friend class ::replication_service_test_app; + + /// state variables + std::atomic_int _status; + std::atomic_int _progress; // [0,1000], 1000 means completed + char _reason[1024]; // failure reason + + std::atomic_llong _upload_file_size; + // TODO: if chechpoint directory has many files, cold_backup_metadata may + // occupy large amount of memory + // for example, if a single file occupy 32B, then 1,000,000 files may occupy 32MB + cold_backup_metadata _metadata; + + enum upload_status + { + UploadInvalid = 0, + UploadUncomplete, + UploadComplete + }; + enum file_status + { + FileUploadUncomplete = 0, + FileUploading, + FileUploadComplete + }; + + // two atomic variants is to ensure check_upload_status and write_backup_metadata just be + // executed once + std::atomic_bool _have_check_upload_status; + std::atomic_bool _have_write_backup_metadata; + + std::atomic_int _upload_status; + + int32_t _max_concurrent_uploading_file_cnt; + // filename -> + std::map> _file_infos; + + zlock _lock; // lock the structure below + std::map _file_status; + int32_t _cur_upload_file_cnt; + int32_t _file_remain_cnt; + + replica *_owner_replica; + uint64_t _start_time_ms; +}; + +typedef dsn::ref_ptr cold_backup_context_ptr; + +} // namespace replication +} // namespace dsn diff --git a/src/replica/backup/replica_backup_manager.cpp b/src/replica/backup/replica_backup_manager.cpp index 4f6787d922..599af6454f 100644 --- a/src/replica/backup/replica_backup_manager.cpp +++ b/src/replica/backup/replica_backup_manager.cpp @@ -3,6 +3,7 @@ // can be found in the LICENSE file in the root directory of this source tree. #include "replica_backup_manager.h" +#include "cold_backup_context.h" #include #include diff --git a/src/replica/replica.cpp b/src/replica/replica.cpp index 13c1b9c2c9..246f6ae841 100644 --- a/src/replica/replica.cpp +++ b/src/replica/replica.cpp @@ -30,6 +30,7 @@ #include "replica_stub.h" #include "duplication/replica_duplicator_manager.h" #include "backup/replica_backup_manager.h" +#include "backup/cold_backup_context.h" #include "bulk_load/replica_bulk_loader.h" #include diff --git a/src/replica/replica.h b/src/replica/replica.h index a260532543..91182d8ec2 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -65,6 +65,10 @@ class replica_duplicator_manager; class replica_backup_manager; class replica_bulk_loader; +class cold_backup_context; +typedef dsn::ref_ptr cold_backup_context_ptr; +class cold_backup_metadata; + namespace test { class test_checker; } @@ -339,7 +343,6 @@ class replica : public serverlet, public ref_counter, public replica_ba void send_backup_request_to_secondary(const backup_request &request); // set all cold_backup_state cancel/pause void set_backup_context_cancel(); - void set_backup_context_pause(); void clear_cold_backup_state(); ///////////////////////////////////////////////////////////////// diff --git a/src/replica/replica_backup.cpp b/src/replica/replica_backup.cpp index 8d849685f0..b6b800d6e0 100644 --- a/src/replica/replica_backup.cpp +++ b/src/replica/replica_backup.cpp @@ -4,18 +4,23 @@ #include #include #include +#include #include "block_service/block_service_manager.h" #include "backup/replica_backup_manager.h" +#include "backup/cold_backup_context.h" #include "replica.h" -#include "mutation.h" -#include "mutation_log.h" #include "replica_stub.h" namespace dsn { namespace replication { +DSN_DEFINE_uint64("replication", + max_concurrent_uploading_file_count, + 10, + "concurrent uploading file count to block service"); + void replica::on_cold_backup(const backup_request &request, /*out*/ backup_response &response) { _checker.only_one_thread_access(); @@ -23,7 +28,7 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo const std::string &policy_name = request.policy.policy_name; auto backup_id = request.backup_id; cold_backup_context_ptr new_context( - new cold_backup_context(this, request, _options->max_concurrent_uploading_file_count)); + new cold_backup_context(this, request, FLAGS_max_concurrent_uploading_file_count)); ddebug_replica("{}: received cold backup request, partition_status = {}", new_context->name, @@ -55,11 +60,7 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo backup_context->backup_root = _options->cold_backup_root; } - dassert(backup_context != nullptr, ""); - dassert(backup_context->request.policy.policy_name == policy_name, - "%s VS %s", - backup_context->request.policy.policy_name.c_str(), - policy_name.c_str()); + dcheck_eq_replica(backup_context->request.policy.policy_name, policy_name); cold_backup_status backup_status = backup_context->status(); if (backup_context->request.backup_id < backup_id || backup_status == ColdBackupCanceled) { @@ -76,6 +77,8 @@ void replica::on_cold_backup(const backup_request &request, /*out*/ backup_respo get_gpid().thread_hash(), std::chrono::seconds(100)); } else { + // TODO(wutao1): deleting cold backup context should be + // extracted as a function like try_delete_cold_backup_context; // clear obsoleted backup context firstly ddebug("%s: clear obsoleted cold backup context, old_backup_id = %" PRId64 ", old_backup_status = %s", @@ -692,16 +695,6 @@ void replica::set_backup_context_cancel() } } -void replica::set_backup_context_pause() -{ - for (auto &pair : _cold_backup_contexts) { - pair.second->pause_upload(); - ddebug("%s: pause backup progress, backup_request = %s", - name(), - boost::lexical_cast(pair.second->request).c_str()); - } -} - void replica::clear_cold_backup_state() { _cold_backup_contexts.clear(); } -} -} // namespace +} // namespace replication +} // namespace dsn diff --git a/src/replica/replica_context.cpp b/src/replica/replica_context.cpp index 3437f4d014..c306b3702c 100644 --- a/src/replica/replica_context.cpp +++ b/src/replica/replica_context.cpp @@ -237,1066 +237,6 @@ bool potential_secondary_context::is_cleaned() nullptr == catchup_with_private_log_task && nullptr == completion_notify_task; } -const char *cold_backup_status_to_string(cold_backup_status status) -{ - switch (status) { - case ColdBackupInvalid: - return "ColdBackupInvalid"; - case ColdBackupChecking: - return "ColdBackupChecking"; - case ColdBackupChecked: - return "ColdBackupChecked"; - case ColdBackupCheckpointing: - return "ColdBackupCheckpointing"; - case ColdBackupCheckpointed: - return "ColdBackupCheckpointed"; - case ColdBackupUploading: - return "ColdBackupUploading"; - case ColdBackupPaused: - return "ColdBackupPaused"; - case ColdBackupCanceled: - return "ColdBackupCanceled"; - case ColdBackupCompleted: - return "ColdBackupCompleted"; - case ColdBackupFailed: - return "ColdBackupFailed"; - default: - dassert(false, ""); - } - return "ColdBackupXXX"; -} - -void cold_backup_context::cancel() -{ - _status.store(ColdBackupCanceled); - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_cancel_count->increment(); - } -} - -bool cold_backup_context::start_check() -{ - int invalid = ColdBackupInvalid; - if (_status.compare_exchange_strong(invalid, ColdBackupChecking)) { - _start_time_ms = dsn_now_ms(); - return true; - } else { - return false; - } -} - -bool cold_backup_context::fail_check(const char *failure_reason) -{ - int checking = ColdBackupChecking; - if (_status.compare_exchange_strong(checking, ColdBackupFailed)) { - strncpy(_reason, failure_reason, sizeof(_reason) - 1); - _reason[sizeof(_reason) - 1] = '\0'; - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); - } - return true; - } else { - return false; - } -} - -bool cold_backup_context::complete_check(bool uploaded) -{ - int checking = ColdBackupChecking; - if (uploaded) { - _progress.store(cold_backup_constant::PROGRESS_FINISHED); - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_succ_count->increment(); - } - return _status.compare_exchange_strong(checking, ColdBackupCompleted); - } else { - return _status.compare_exchange_strong(checking, ColdBackupChecked); - } -} - -bool cold_backup_context::start_checkpoint() -{ - int checked = ColdBackupChecked; - if (_status.compare_exchange_strong(checked, ColdBackupCheckpointing)) { - return true; - } else { - return false; - } -} - -bool cold_backup_context::fail_checkpoint(const char *failure_reason) -{ - int checkpointing = ColdBackupCheckpointing; - if (_status.compare_exchange_strong(checkpointing, ColdBackupFailed)) { - strncpy(_reason, failure_reason, sizeof(_reason) - 1); - _reason[sizeof(_reason) - 1] = '\0'; - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); - } - return true; - } else { - return false; - } -} - -bool cold_backup_context::complete_checkpoint() -{ - int checkpointing = ColdBackupCheckpointing; - if (_status.compare_exchange_strong(checkpointing, ColdBackupCheckpointed)) { - return true; - } else { - return false; - } -} - -bool cold_backup_context::pause_upload() -{ - int uploading = ColdBackupUploading; - if (_status.compare_exchange_strong(uploading, ColdBackupPaused)) { - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub() - ->_counter_cold_backup_recent_pause_count->increment(); - } - return true; - } else { - return false; - } -} - -bool cold_backup_context::fail_upload(const char *failure_reason) -{ - int uploading = ColdBackupUploading; - int paused = ColdBackupPaused; - if (_status.compare_exchange_strong(uploading, ColdBackupFailed) || - _status.compare_exchange_strong(paused, ColdBackupFailed)) { - strncpy(_reason, failure_reason, sizeof(_reason) - 1); - _reason[sizeof(_reason) - 1] = '\0'; - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_fail_count->increment(); - } - return true; - } else { - return false; - } -} - -bool cold_backup_context::complete_upload() -{ - int uploading = ColdBackupUploading; - int paused = ColdBackupPaused; - if (_status.compare_exchange_strong(uploading, ColdBackupCompleted) || - _status.compare_exchange_strong(paused, ColdBackupCompleted)) { - _progress.store(cold_backup_constant::PROGRESS_FINISHED); - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_succ_count->increment(); - } - return true; - } else { - return false; - } -} - -// run in REPLICATION_LONG thread -void cold_backup_context::check_backup_on_remote() -{ - // check whether current checkpoint file is exist on remote, and verify whether the checkpoint - // directory is exist - std::string current_chkpt_file = cold_backup::get_current_chkpt_file( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - dist::block_service::create_file_request req; - req.file_name = current_chkpt_file; - req.ignore_metadata = false; - - // incr the ref counter, and must release_ref() after callback is execute - add_ref(); - - block_service->create_file( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, current_chkpt_file](const dist::block_service::create_file_response &resp) { - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else if (resp.err == ERR_OK) { - const dist::block_service::block_file_ptr &file_handle = resp.file_handle; - dassert(file_handle != nullptr, ""); - if (file_handle->get_md5sum().empty() && file_handle->get_size() <= 0) { - ddebug("%s: check backup on remote, current_checkpoint file %s is not exist", - name, - current_chkpt_file.c_str()); - complete_check(false); - } else { - ddebug("%s: check backup on remote, current_checkpoint file %s is exist", - name, - current_chkpt_file.c_str()); - read_current_chkpt_file(file_handle); - } - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block service create file timeout, retry after 10 seconds, file = %s", - name, - current_chkpt_file.c_str()); - - // before retry, should add_ref(), and must release_ref() after retry - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this]() { - // before retry, should check whether the status is ready for - // check - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore " - "checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else { - check_backup_on_remote(); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service create file failed, file = %s, err = %s", - name, - current_chkpt_file.c_str(), - resp.err.to_string()); - fail_check("block service create file failed"); - } - release_ref(); - }); -} - -void cold_backup_context::read_current_chkpt_file( - const dist::block_service::block_file_ptr &file_handle) -{ - dist::block_service::read_request req; - req.remote_pos = 0; - req.remote_length = -1; - - add_ref(); - - file_handle->read( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, file_handle](const dist::block_service::read_response &resp) { - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else if (resp.err == ERR_OK) { - std::string chkpt_dirname(resp.buffer.data(), resp.buffer.length()); - if (chkpt_dirname.empty()) { - complete_check(false); - } else { - ddebug("%s: after read current_checkpoint_file, check whether remote " - "checkpoint dir = %s is exist", - name, - chkpt_dirname.c_str()); - remote_chkpt_dir_exist(chkpt_dirname); - } - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: read remote file timeout, retry after 10s, file = %s", - name, - file_handle->file_name().c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, file_handle]() { - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore " - "checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else { - read_current_chkpt_file(file_handle); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: read remote file failed, file = %s, err = %s", - name, - file_handle->file_name().c_str(), - resp.err.to_string()); - fail_check("read remote file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::remote_chkpt_dir_exist(const std::string &chkpt_dirname) -{ - dist::block_service::ls_request req; - req.dir_name = cold_backup::get_replica_backup_path( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - - add_ref(); - - block_service->list_dir( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, chkpt_dirname](const dist::block_service::ls_response &resp) { - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else if (resp.err == ERR_OK) { - bool found_chkpt_dir = false; - for (const auto &entry : (*resp.entries)) { - if (entry.is_directory && entry.entry_name == chkpt_dirname) { - found_chkpt_dir = true; - break; - } - } - if (found_chkpt_dir) { - ddebug("%s: remote checkpoint dir is already exist, so upload have already " - "complete, remote_checkpoint_dirname = %s", - name, - chkpt_dirname.c_str()); - complete_check(true); - } else { - ddebug("%s: remote checkpoint dir is not exist, should re-upload checkpoint " - "dir, remote_checkpoint_dirname = %s", - name, - chkpt_dirname.c_str()); - complete_check(false); - } - } else if (resp.err == ERR_OBJECT_NOT_FOUND) { - ddebug("%s: remote checkpoint dir is not exist, should re-upload checkpoint dir, " - "remote_checkpoint_dirname = %s", - name, - chkpt_dirname.c_str()); - complete_check(false); - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block service list remote dir timeout, retry after 10s, dirname = %s", - name, - chkpt_dirname.c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, chkpt_dirname]() { - if (!is_ready_for_check()) { - ddebug("%s: backup status has changed to %s, ignore " - "checking backup on remote", - name, - cold_backup_status_to_string(status())); - ignore_check(); - } else { - remote_chkpt_dir_exist(chkpt_dirname); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service list remote dir failed, dirname = %s, err = %s", - name, - chkpt_dirname.c_str(), - resp.err.to_string()); - fail_check("list remote dir failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::upload_checkpoint_to_remote() -{ - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, ignore upload checkpoint", - name, - cold_backup_status_to_string(status())); - return; - } - - bool old_status = false; - // here, just allow one task to check upload status, and it will set _upload_status base on - // the result it has checked; But, because of upload_checkpoint_to_remote maybe call multi-times - // (for pause - uploading), so we use the atomic variant to implement - if (!_have_check_upload_status.compare_exchange_strong(old_status, true)) { - ddebug("%s: upload status has already been checked, start upload checkpoint dir directly", - name); - on_upload_chkpt_dir(); - return; - } - - // check whether cold_backup_metadata is exist and verify cold_backup_metadata if exist - std::string metadata = cold_backup::get_remote_chkpt_meta_file( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - dist::block_service::create_file_request req; - req.file_name = metadata; - req.ignore_metadata = false; - - add_ref(); - - block_service->create_file( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, metadata](const dist::block_service::create_file_response &resp) { - if (resp.err == ERR_OK) { - dassert(resp.file_handle != nullptr, ""); - if (resp.file_handle->get_md5sum().empty() && resp.file_handle->get_size() <= 0) { - _upload_status.store(UploadUncomplete); - ddebug("%s: check upload_status complete, cold_backup_metadata isn't exist, " - "start upload checkpoint dir", - name); - on_upload_chkpt_dir(); - } else { - ddebug("%s: cold_backup_metadata is exist, read it's context", name); - read_backup_metadata(resp.file_handle); - } - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block service create file timeout, retry after 10s, file = %s", - name, - metadata.c_str()); - // when create backup_metadata timeout, should reset _have_check_upload_status - // false to allow re-check - _have_check_upload_status.store(false); - add_ref(); - - tasking::enqueue( - LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this]() { - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, stop check upload status", - name, - cold_backup_status_to_string(status())); - } else { - upload_checkpoint_to_remote(); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service create file failed, file = %s, err = %s", - name, - metadata.c_str(), - resp.err.to_string()); - _have_check_upload_status.store(false); - fail_upload("block service create file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::read_backup_metadata( - const dist::block_service::block_file_ptr &file_handle) -{ - dist::block_service::read_request req; - req.remote_pos = 0; - req.remote_length = -1; - - add_ref(); - - file_handle->read( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, file_handle](const dist::block_service::read_response &resp) { - if (resp.err == ERR_OK) { - ddebug("%s: read cold_backup_metadata succeed, verify it's context, file = %s", - name, - file_handle->file_name().c_str()); - verify_backup_metadata(resp.buffer); - on_upload_chkpt_dir(); - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: read remote file timeout, retry after 10s, file = %s", - name, - file_handle->file_name().c_str()); - add_ref(); - - tasking::enqueue( - LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, file_handle] { - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, stop check upload status", - name, - cold_backup_status_to_string(status())); - _have_check_upload_status.store(false); - } else { - read_backup_metadata(file_handle); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: read remote file failed, file = %s, err = %s", - name, - file_handle->file_name().c_str(), - resp.err.to_string()); - _have_check_upload_status.store(false); - fail_upload("read remote file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::verify_backup_metadata(const blob &value) -{ - cold_backup_metadata tmp; - if (value.length() > 0 && json::json_forwarder::decode(value, tmp)) { - ddebug("%s: check upload status complete, checkpoint dir uploading has already complete", - name); - _upload_status.store(UploadComplete); - } else { - ddebug("%s: check upload status complete, checkpoint dir uploading isn't complete yet", - name); - _upload_status.store(UploadUncomplete); - } -} - -void cold_backup_context::on_upload_chkpt_dir() -{ - if (_upload_status.load() == UploadInvalid || !is_ready_for_upload()) { - ddebug("%s: replica is not ready for uploading, ignore upload, cold_backup_status(%s)", - name, - cold_backup_status_to_string(status())); - return; - } - - if (_upload_status.load() == UploadComplete) { - // TODO: if call upload_checkpint_to_remote multi times, maybe write_current_chkpt_file - // multi times - std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname(); - write_current_chkpt_file(chkpt_dirname); - return; - } - - prepare_upload(); - - // prepare_upload maybe fail, so here check status - if (!is_ready_for_upload()) { - derror("%s: backup status has changed to %s, stop upload checkpoint dir", - name, - cold_backup_status_to_string(status())); - return; - } - - if (checkpoint_files.size() <= 0) { - ddebug("%s: checkpoint dir is empty, so upload is complete and just start write " - "backup_metadata", - name); - bool old_status = false; - // using atomic variant _have_write_backup_metadata is to allow one task to - // write backup_metadata because on_upload_chkpt_dir maybe call multi-time - if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { - write_backup_metadata(); - } - } else { - ddebug("%s: start upload checkpoint dir, checkpoint dir = %s, total checkpoint file = %d", - name, - checkpoint_dir.c_str(), - checkpoint_files.size()); - std::vector files; - if (!upload_complete_or_fetch_uncomplete_files(files)) { - for (auto &file : files) { - ddebug("%s: start upload checkpoint file to remote, file = %s", name, file.c_str()); - upload_file(file); - } - } else { - ddebug("%s: upload checkpoint dir to remote complete, total_file_cnt = %d", - name, - checkpoint_files.size()); - bool old_status = false; - if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { - write_backup_metadata(); - } - } - } -} - -void cold_backup_context::prepare_upload() -{ - zauto_lock l(_lock); - // only need initialize once - if (_metadata.files.size() > 0) { - return; - } - _file_remain_cnt = checkpoint_files.size(); - - _metadata.checkpoint_decree = checkpoint_decree; - _metadata.checkpoint_timestamp = checkpoint_timestamp; - _metadata.checkpoint_total_size = checkpoint_file_total_size; - for (int32_t idx = 0; idx < checkpoint_files.size(); idx++) { - std::string &file = checkpoint_files[idx]; - file_meta f_meta; - f_meta.name = file; - std::string file_full_path = ::dsn::utils::filesystem::path_combine(checkpoint_dir, file); - int64_t file_size = checkpoint_file_sizes[idx]; - std::string file_md5; - if (::dsn::utils::filesystem::md5sum(file_full_path, file_md5) != ERR_OK) { - derror("%s: get local file size or md5 fail, file = %s", name, file_full_path.c_str()); - fail_upload("compute local file size or md5 failed"); - return; - } - f_meta.md5 = file_md5; - f_meta.size = file_size; - _metadata.files.emplace_back(f_meta); - _file_status.insert(std::make_pair(file, FileUploadUncomplete)); - _file_infos.insert(std::make_pair(file, std::make_pair(file_size, file_md5))); - } - _upload_file_size.store(0); -} - -void cold_backup_context::upload_file(const std::string &local_filename) -{ - std::string remote_chkpt_dir = cold_backup::get_remote_chkpt_dir( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - dist::block_service::create_file_request req; - req.file_name = ::dsn::utils::filesystem::path_combine(remote_chkpt_dir, local_filename); - req.ignore_metadata = false; - - add_ref(); - - block_service->create_file( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, local_filename](const dist::block_service::create_file_response &resp) { - if (resp.err == ERR_OK) { - const dist::block_service::block_file_ptr &file_handle = resp.file_handle; - dassert(file_handle != nullptr, ""); - int64_t local_file_size = _file_infos.at(local_filename).first; - std::string md5 = _file_infos.at(local_filename).second; - std::string full_path_local_file = - ::dsn::utils::filesystem::path_combine(checkpoint_dir, local_filename); - if (md5 == file_handle->get_md5sum() && - local_file_size == file_handle->get_size()) { - ddebug("%s: checkpoint file already exist on remote, file = %s", - name, - full_path_local_file.c_str()); - on_upload_file_complete(local_filename); - } else { - ddebug("%s: start upload checkpoint file to remote, file = %s", - name, - full_path_local_file.c_str()); - on_upload(file_handle, full_path_local_file); - } - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block service create file timeout, retry after 10s, file = %s", - name, - local_filename.c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, local_filename]() { - // TODO: status change from ColdBackupUploading to - // ColdBackupPaused, and upload file timeout, but when callback - // is executed it catches the status(ColdBackupPaused) - // now, if status back to ColdBackupUploading very soon, and - // call upload_checkpoint_to_remote() here, - // upload_checkpoint_to_remote() maybe acquire the _lock first, - // then stop give back file(upload timeout), the file is still - // in uploading this file will not be uploaded until you call - // upload_checkpoint_to_remote() after it's given back - if (!is_ready_for_upload()) { - std::string full_path_local_file = - ::dsn::utils::filesystem::path_combine(checkpoint_dir, - local_filename); - ddebug("%s: backup status has changed to %s, stop upload " - "checkpoint file to remote, file = %s", - name, - cold_backup_status_to_string(status()), - full_path_local_file.c_str()); - file_upload_uncomplete(local_filename); - } else { - upload_file(local_filename); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service create file failed, file = %s, err = %s", - name, - local_filename.c_str(), - resp.err.to_string()); - fail_upload("create file failed"); - } - if (resp.err != ERR_OK && _owner_replica != nullptr) { - _owner_replica->get_replica_stub() - ->_counter_cold_backup_recent_upload_file_fail_count->increment(); - } - release_ref(); - return; - }); -} - -void cold_backup_context::on_upload(const dist::block_service::block_file_ptr &file_handle, - const std::string &full_path_local_file) -{ - dist::block_service::upload_request req; - req.input_local_name = full_path_local_file; - - add_ref(); - - file_handle->upload( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, file_handle, full_path_local_file]( - const dist::block_service::upload_response &resp) { - if (resp.err == ERR_OK) { - std::string local_filename = - ::dsn::utils::filesystem::get_file_name(full_path_local_file); - dassert(_file_infos.at(local_filename).first == - static_cast(resp.uploaded_size), - ""); - ddebug("%s: upload checkpoint file complete, file = %s", - name, - full_path_local_file.c_str()); - on_upload_file_complete(local_filename); - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: upload checkpoint file timeout, retry after 10s, file = %s", - name, - full_path_local_file.c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, file_handle, full_path_local_file]() { - if (!is_ready_for_upload()) { - derror("%s: backup status has changed to %s, stop upload " - "checkpoint file to remote, file = %s", - name, - cold_backup_status_to_string(status()), - full_path_local_file.c_str()); - std::string local_filename = - ::dsn::utils::filesystem::get_file_name( - full_path_local_file); - file_upload_uncomplete(local_filename); - } else { - on_upload(file_handle, full_path_local_file); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: upload checkpoint file to remote failed, file = %s, err = %s", - name, - full_path_local_file.c_str(), - resp.err.to_string()); - fail_upload("upload checkpoint file to remote failed"); - } - if (resp.err != ERR_OK && _owner_replica != nullptr) { - _owner_replica->get_replica_stub() - ->_counter_cold_backup_recent_upload_file_fail_count->increment(); - } - release_ref(); - return; - }); -} - -void cold_backup_context::write_backup_metadata() -{ - if (_upload_status.load() == UploadComplete) { - ddebug("%s: upload have already done, no need write metadata again", name); - return; - } - std::string metadata = cold_backup::get_remote_chkpt_meta_file( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - dist::block_service::create_file_request req; - req.file_name = metadata; - req.ignore_metadata = true; - - add_ref(); - - block_service->create_file( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, metadata](const dist::block_service::create_file_response &resp) { - if (resp.err == ERR_OK) { - dassert(resp.file_handle != nullptr, ""); - blob buffer = json::json_forwarder::encode(_metadata); - // hold itself until callback is executed - add_ref(); - ddebug("%s: create backup metadata file succeed, start to write file, file = %s", - name, - metadata.c_str()); - this->on_write(resp.file_handle, buffer, [this](bool succeed) { - if (succeed) { - std::string chkpt_dirname = cold_backup::get_remote_chkpt_dirname(); - _upload_status.store(UploadComplete); - ddebug("%s: write backup metadata complete, write current checkpoint file", - name); - write_current_chkpt_file(chkpt_dirname); - } - // NOTICE: write file fail will internal error be processed in on_write() - release_ref(); - }); - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block service create file timeout, retry after 10s, file = %s", - name, - metadata.c_str()); - add_ref(); - - tasking::enqueue( - LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this]() { - if (!is_ready_for_upload()) { - _have_write_backup_metadata.store(false); - derror( - "%s: backup status has changed to %s, stop write backup_metadata", - name, - cold_backup_status_to_string(status())); - } else { - write_backup_metadata(); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service create file failed, file = %s, err = %s", - name, - metadata.c_str(), - resp.err.to_string()); - _have_write_backup_metadata.store(false); - fail_upload("create file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::write_current_chkpt_file(const std::string &value) -{ - // before we write current checkpoint file, we can release the memory occupied by _metadata, - // _file_status and _file_infos, because even if write current checkpoint file failed, the - // backup_metadata is uploading succeed, so we will not re-upload - _metadata.files.clear(); - _file_infos.clear(); - _file_status.clear(); - - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, stop write current checkpoint file", - name, - cold_backup_status_to_string(status())); - return; - } - - std::string current_chkpt_file = cold_backup::get_current_chkpt_file( - backup_root, request.policy.policy_name, request.app_name, request.pid, request.backup_id); - dist::block_service::create_file_request req; - req.file_name = current_chkpt_file; - req.ignore_metadata = true; - - add_ref(); - - block_service->create_file( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, value, current_chkpt_file](const dist::block_service::create_file_response &resp) { - if (resp.err == ERR_OK) { - dassert(resp.file_handle != nullptr, ""); - auto len = value.length(); - std::shared_ptr buf = utils::make_shared_array(len); - ::memcpy(buf.get(), value.c_str(), len); - blob write_buf(std::move(buf), static_cast(len)); - ddebug("%s: create current checkpoint file succeed, start write file ,file = %s", - name, - current_chkpt_file.c_str()); - add_ref(); - this->on_write(resp.file_handle, write_buf, [this](bool succeed) { - if (succeed) { - complete_upload(); - } - release_ref(); - }); - } else if (resp.err == ERR_TIMEOUT) { - derror("%s: block file create file timeout, retry after 10s, file = %s", - name, - current_chkpt_file.c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, value]() { - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, stop write " - "current checkpoint file", - name, - cold_backup_status_to_string(status())); - } else { - write_current_chkpt_file(value); - } - - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - derror("%s: block service create file failed, file = %s, err = %s", - name, - current_chkpt_file.c_str(), - resp.err.to_string()); - fail_upload("create file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::on_write(const dist::block_service::block_file_ptr &file_handle, - const blob &value, - const std::function &callback) -{ - dassert(file_handle != nullptr, ""); - dist::block_service::write_request req; - req.buffer = value; - - add_ref(); - - file_handle->write( - std::move(req), - LPC_BACKGROUND_COLD_BACKUP, - [this, value, file_handle, callback](const dist::block_service::write_response &resp) { - if (resp.err == ERR_OK) { - ddebug("%s: write remote file succeed, file = %s", - name, - file_handle->file_name().c_str()); - callback(true); - } else if (resp.err == ERR_TIMEOUT) { - ddebug("%s: write remote file timeout, retry after 10s, file = %s", - name, - file_handle->file_name().c_str()); - add_ref(); - - tasking::enqueue(LPC_BACKGROUND_COLD_BACKUP, - nullptr, - [this, file_handle, value, callback]() { - if (!is_ready_for_upload()) { - ddebug("%s: backup status has changed to %s, stop write " - "remote file, file = %s", - name, - cold_backup_status_to_string(status()), - file_handle->file_name().c_str()); - } else { - on_write(file_handle, value, callback); - } - release_ref(); - }, - 0, - std::chrono::seconds(10)); - } else { - // here, must call the callback to release_ref - callback(false); - derror("%s: write remote file failed, file = %s, err = %s", - name, - file_handle->file_name().c_str(), - resp.err.to_string()); - fail_upload("write remote file failed"); - } - release_ref(); - return; - }); -} - -void cold_backup_context::on_upload_file_complete(const std::string &local_filename) -{ - const int64_t &f_size = _file_infos.at(local_filename).first; - _upload_file_size.fetch_add(f_size); - file_upload_complete(local_filename); - if (_owner_replica != nullptr) { - _owner_replica->get_replica_stub() - ->_counter_cold_backup_recent_upload_file_succ_count->increment(); - _owner_replica->get_replica_stub()->_counter_cold_backup_recent_upload_file_size->add( - f_size); - } - // update progress - // int a = 10; int b = 3; then b/a = 0; - // double a = 10; double b = 3; then b/a = 0.3 - auto total = static_cast(checkpoint_file_total_size); - auto complete_size = static_cast(_upload_file_size.load()); - - if (total <= complete_size) { - ddebug("%s: upload checkpoint to remote complete, checkpoint dir = %s, total file size = " - "%" PRId64 ", file count = %d", - name, - checkpoint_dir.c_str(), - static_cast(total), - checkpoint_files.size()); - bool old_status = false; - if (_have_write_backup_metadata.compare_exchange_strong(old_status, true)) { - write_backup_metadata(); - } - return; - } else { - dassert(total != 0.0, "total = %" PRId64 "", total); - update_progress(static_cast(complete_size / total * 1000)); - ddebug("%s: the progress of upload checkpoint is %d", name, _progress.load()); - } - if (is_ready_for_upload()) { - std::vector upload_files; - upload_complete_or_fetch_uncomplete_files(upload_files); - for (auto &file : upload_files) { - ddebug("%s: start upload checkpoint file to remote, file = %s", name, file.c_str()); - upload_file(file); - } - } -} - -bool cold_backup_context::upload_complete_or_fetch_uncomplete_files(std::vector &files) -{ - bool upload_complete = false; - - zauto_lock l(_lock); - if (_file_remain_cnt > 0 && _cur_upload_file_cnt < _max_concurrent_uploading_file_cnt) { - for (const auto &_pair : _file_status) { - if (_pair.second == file_status::FileUploadUncomplete) { - files.emplace_back(_pair.first); - _file_remain_cnt -= 1; - _file_status[_pair.first] = file_status::FileUploading; - _cur_upload_file_cnt += 1; - } - if (_file_remain_cnt <= 0 || - _cur_upload_file_cnt >= _max_concurrent_uploading_file_cnt) { - break; - } - } - } - if (_file_remain_cnt <= 0 && _cur_upload_file_cnt <= 0) { - upload_complete = true; - } - return upload_complete; -} - -void cold_backup_context::file_upload_uncomplete(const std::string &filename) -{ - zauto_lock l(_lock); - - dassert(_cur_upload_file_cnt >= 1, "cur_upload_file_cnt = %d", _cur_upload_file_cnt); - _cur_upload_file_cnt -= 1; - _file_remain_cnt += 1; - _file_status[filename] = file_status::FileUploadUncomplete; -} - -void cold_backup_context::file_upload_complete(const std::string &filename) -{ - zauto_lock l(_lock); - - dassert(_cur_upload_file_cnt >= 1, "cur_upload_file_cnt = %d", _cur_upload_file_cnt); - _cur_upload_file_cnt -= 1; - _file_status[filename] = file_status::FileUploadComplete; -} - bool partition_split_context::cleanup(bool force) { CLEANUP_TASK(async_learn_task, force) diff --git a/src/replica/replica_context.h b/src/replica/replica_context.h index d88f4a966b..354d510ecb 100644 --- a/src/replica/replica_context.h +++ b/src/replica/replica_context.h @@ -224,351 +224,6 @@ class potential_secondary_context ::dsn::task_ptr completion_notify_task; }; -// -// ColdBackupInvalid -// | -// V -// |<------ ColdBackupChecking ---------------------------------->| -// | | | -// | V | -// | ColdBackupChecked ----------------------------------->| -// | | | -// | V | -// ColdBackupCompleted <---| ColdBackupCheckpointing ----------------------------->| -// | | | | -// | | V |---> -// ColdBackupCanceled -// | | ColdBackupCheckpointed ------------------------------>| -// | | | | -// | | V | -// | |<------ ColdBackupUploading <======> ColdBackupPaused ------>| -// | | | | -// | |____________________________| | -// | | | -// | V | -// | ColdBackupFailed -------------------->| -// | | -// |---------------------------------------------------------------------------->| -// -enum cold_backup_status -{ - ColdBackupInvalid = 0, - ColdBackupChecking, - ColdBackupChecked, - ColdBackupCheckpointing, - ColdBackupCheckpointed, - ColdBackupUploading, - ColdBackupPaused, - ColdBackupCanceled, - ColdBackupCompleted, - ColdBackupFailed -}; -const char *cold_backup_status_to_string(cold_backup_status status); - -struct cold_backup_metadata -{ - int64_t checkpoint_decree; - int64_t checkpoint_timestamp; - std::vector files; - int64_t checkpoint_total_size; - DEFINE_JSON_SERIALIZATION(checkpoint_decree, checkpoint_timestamp, files, checkpoint_total_size) -}; - -// -// the process of uploading the checkpoint directory to block filesystem: -// 1, upload all the file of the checkpoint to block filesystem -// 2, write a cold_backup_metadata to block filesystem(which includes all the file's name, size -// and md5 and so on) -// 3, write a current_checkpoint file to block filesystem, which is used to mark which -// checkpoint is invalid -// - -// -// the process of check whether uploading is finished on block filesystem: -// 1, check whether the current checkpoint file exist, if exist continue, otherwise not finish -// 2, read the context of the current checkpoint file, the context of this file is the valid -// checkpoint dirname on block filesystem -// 3, verify whether the checkpoint dirname is exist, if exist uploading is already finished, -// otherwise uploading is not finished -// - -class cold_backup_context : public ref_counter -{ -public: - explicit cold_backup_context(replica *r_, - const backup_request &request_, - int max_upload_file_cnt) - : request(request_), - block_service(nullptr), - checkpoint_decree(0), - checkpoint_timestamp(0), - durable_decree_when_checkpoint(-1), - checkpoint_file_total_size(0), - _status(ColdBackupInvalid), - _progress(0), - _upload_file_size(0), - _have_check_upload_status(false), - _have_write_backup_metadata(false), - _upload_status(UploadInvalid), - _max_concurrent_uploading_file_cnt(max_upload_file_cnt), - _cur_upload_file_cnt(0), - _file_remain_cnt(0), - _owner_replica(r_), - _start_time_ms(0) - { - sprintf(name, - "backup{%d.%d.%s.%" PRId64 "}", - request.pid.get_app_id(), - request.pid.get_partition_index(), - request.policy.policy_name.c_str(), - request.backup_id); - memset(_reason, 0, sizeof(_reason)); - } - - ~cold_backup_context() {} - - // cancel backup. - // {*} --> ColdBackupCanceled - // - // Will be called in replication thread. - void cancel(); - - // start checking backup on remote. - // ColdBackupInvalid --> ColdBackupChecking - // Returns: - // - true if status is successfully changed to ColdBackupChecking. - bool start_check(); - - // ignore checking backup on remote and switch backward status. - // ColdBackupChecking --> ColdBackupInvalid - // Returns: - // - true if status is successfully changed to ColdBackupInvalid. - bool ignore_check() - { - int checking = ColdBackupChecking; - return _status.compare_exchange_strong(checking, ColdBackupInvalid); - } - - // mark failed when checking backup on remote. - // ColdBackupChecking --> ColdBackupFailed - // Returns: - // - true if status is successfully changed to ColdBackupFailed. - bool fail_check(const char *failure_reason); - - // complete checking backup on remote. - // ColdBackupChecking --> { ColdBackupChecked | ColdBackupCompleted } - // Returns: - // - true if status is successfully changed to ColdBackupChecked or ColdBackupCompleted. - bool complete_check(bool uploaded); - - // start generating checkpoint. - // ColdBackupChecked --> ColdBackupCheckpointing - // Returns: - // - true if status is successfully changed to ColdBackupCheckpointing. - bool start_checkpoint(); - - // ignore generating checkpoint and switch backward status. - // ColdBackupCheckpointing --> ColdBackupChecked - // Returns: - // - true if status is successfully changed to ColdBackupChecked. - bool ignore_checkpoint() - { - int checkpointing = ColdBackupCheckpointing; - return _status.compare_exchange_strong(checkpointing, ColdBackupChecked); - } - - // mark failed when generating checkpoint. - // ColdBackupCheckpointing --> ColdBackupFailed - // Returns: - // - true if status is successfully changed to ColdBackupFailed. - bool fail_checkpoint(const char *failure_reason); - - // complete generating checkpoint. - // ColdBackupCheckpointing --> ColdBackupCheckpointed - // Returns: - // - true if status is successfully changed to ColdBackupCheckpointed. - bool complete_checkpoint(); - - // start uploading checkpoint to remote. - // { ColdBackupCheckpointed | ColdBackupPaused } --> ColdBackupUploading - // - // Will be called in replication thread. - // Returns: - // - true if status is successfully changed to ColdBackupUploading. - bool start_upload() - { - int checkpointed = ColdBackupCheckpointed; - int paused = ColdBackupPaused; - return _status.compare_exchange_strong(checkpointed, ColdBackupUploading) || - _status.compare_exchange_strong(paused, ColdBackupUploading); - } - - // pause uploading checkpoint to remote. - // ColdBackupUploading --> ColdBackupPaused - // Returns: - // - true if status is successfully changed to ColdBackupPaused. - bool pause_upload(); - - // mark failed when uploading checkpoint to remote. - // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupFailed - // Returns: - // - true if status is successfully changed to ColdBackupFailed. - bool fail_upload(const char *failure_reason); - - // complete uploading checkpoint to remote. - // { ColdBackupUploading | ColdBackupPaused } --> ColdBackupCompleted - // Returns: - // - true if status is successfully changed to ColdBackupCompleted. - bool complete_upload(); - - // update progress. - // Progress should be in range of [0, 1000]. - void update_progress(int progress) - { - dassert(progress >= 0 && progress <= cold_backup_constant::PROGRESS_FINISHED, - "invalid progress %d", - progress); - _progress.store(progress); - } - - // check if it is ready for checking. - bool is_ready_for_check() const { return _status.load() == ColdBackupChecking; } - - // check if it is ready for checkpointing. - bool is_checkpointing() const { return _status.load() == ColdBackupCheckpointing; } - - // check if it is ready for uploading. - bool is_ready_for_upload() const { return _status.load() == ColdBackupUploading; } - - // get current status. - cold_backup_status status() const { return (cold_backup_status)_status.load(); } - - // get current progress. - int progress() const { return _progress.load(); } - - // get failure reason. - const char *reason() const { return _reason; } - - // check if backup is aleady exist on remote. - // Preconditions: - // - name/request are set - // - checkpoint_dir/checkpoint_decree/checkpoint_files are not set - // - status is one of { ColdBackupChecking, ColdBackupCanceled } - // Will be called in background thread. - void check_backup_on_remote(); - - // upload backup checkpoint to remote. - // Preconditions: - // - name/request are set - // - checkpoint_dir/checkpoint_decree/checkpoint_files are set - // - status is one of { ColdBackupUploading, ColdBackupPaused, ColdBackupCanceled } - // Will be called in background thread. - void upload_checkpoint_to_remote(); - - uint64_t get_start_time_ms() { return _start_time_ms; } - - uint64_t get_upload_file_size() { return _upload_file_size.load(); } - - int64_t get_checkpoint_total_size() { return checkpoint_file_total_size; } - -private: - void read_current_chkpt_file(const dist::block_service::block_file_ptr &file_handle); - void remote_chkpt_dir_exist(const std::string &chkpt_dirname); - - void read_backup_metadata(const dist::block_service::block_file_ptr &file_handle); - // value is a json string, verify it's validity - // validity means uploading checkpoint directory complete, so just write_current_chkpt_file - // otherwise, upload checkpoint directory - void verify_backup_metadata(const blob &value); - // after upload_checkpoint_directory ---> write_backup_metadata --> write_current_chkpt_file --> - // notify meta - void write_backup_metadata(); - - void write_current_chkpt_file(const std::string &value); - // write value to file, if succeed then callback(true), else callback(false) - void on_write(const dist::block_service::block_file_ptr &file_handle, - const blob &value, - const std::function &callback); - void prepare_upload(); - void on_upload_chkpt_dir(); - void upload_file(const std::string &local_filename); - void on_upload(const dist::block_service::block_file_ptr &file_handle, - const std::string &full_path_local_file); - void on_upload_file_complete(const std::string &local_filename); - - // functions access the structure protected by _lock - // return: - // -- true, uploading is complete - // -- false, uploading is not complete; and put uncomplete file into 'files' - bool upload_complete_or_fetch_uncomplete_files(std::vector &files); - void file_upload_uncomplete(const std::string &filename); - void file_upload_complete(const std::string &filename); - -public: - /// the following variables are public, and will only be set once, and will not be changed once - /// set. - char name[256]; // backup{...} - // all logging should print the name - backup_request request; - dist::block_service::block_filesystem *block_service; - std::string backup_root; - decree checkpoint_decree; - int64_t checkpoint_timestamp; - decree durable_decree_when_checkpoint; - std::string checkpoint_dir; - std::vector checkpoint_files; - std::vector checkpoint_file_sizes; - int64_t checkpoint_file_total_size; - -private: - friend class ::replication_service_test_app; - - /// state variables - std::atomic_int _status; - std::atomic_int _progress; // [0,1000], 1000 means completed - char _reason[1024]; // failure reason - - std::atomic_llong _upload_file_size; - // TODO: if chechpoint directory has many files, cold_backup_metadata may - // occupy large amount of memory - // for example, if a single file occupy 32B, then 1,000,000 files may occupy 32MB - cold_backup_metadata _metadata; - - enum upload_status - { - UploadInvalid = 0, - UploadUncomplete, - UploadComplete - }; - enum file_status - { - FileUploadUncomplete = 0, - FileUploading, - FileUploadComplete - }; - - // two atomic variants is to ensure check_upload_status and write_backup_metadata just be - // executed once - std::atomic_bool _have_check_upload_status; - std::atomic_bool _have_write_backup_metadata; - - std::atomic_int _upload_status; - - int32_t _max_concurrent_uploading_file_cnt; - // filename -> - std::map> _file_infos; - - zlock _lock; // lock the structure below - std::map _file_status; - int32_t _cur_upload_file_cnt; - int32_t _file_remain_cnt; - - replica *_owner_replica; - uint64_t _start_time_ms; -}; - -typedef dsn::ref_ptr cold_backup_context_ptr; - class partition_split_context { public: diff --git a/src/replica/replica_restore.cpp b/src/replica/replica_restore.cpp index 29a72e8aca..4bc3a86659 100644 --- a/src/replica/replica_restore.cpp +++ b/src/replica/replica_restore.cpp @@ -13,6 +13,7 @@ #include "mutation_log.h" #include "replica_stub.h" #include "block_service/block_service_manager.h" +#include "backup/cold_backup_context.h" using namespace dsn::dist::block_service; diff --git a/src/replica/test/cold_backup_context_test.cpp b/src/replica/test/cold_backup_context_test.cpp index 46e63f435a..4c105fee09 100644 --- a/src/replica/test/cold_backup_context_test.cpp +++ b/src/replica/test/cold_backup_context_test.cpp @@ -1,5 +1,6 @@ #include #include "backup_block_service_mock.h" +#include "replica/backup/cold_backup_context.h" ref_ptr current_chkpt_file = new block_file_mock("", 0, ""); ref_ptr backup_metadata_file = new block_file_mock("", 0, "");