Skip to content

Commit

Permalink
feat(one-time backup): part-2 backup app metadata and send rpc to partition primary (#775)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 authored Mar 16, 2021
1 parent 4a9a50c commit a6b1c6c
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/common/replication_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ typedef rpc_holder<register_child_request, register_child_response> register_chi
typedef rpc_holder<notify_stop_split_request, notify_stop_split_response> notify_stop_split_rpc;
typedef rpc_holder<query_child_state_request, query_child_state_response> query_child_state_rpc;

typedef rpc_holder<backup_request, backup_response> backup_rpc;

class replication_options
{
public:
Expand Down
168 changes: 148 additions & 20 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <dsn/dist/fmt_logging.h>

#include "common/backup_utils.h"
#include "common/replication_common.h"
#include "server_state.h"

namespace dsn {
Expand All @@ -30,33 +31,29 @@ backup_engine::backup_engine(backup_service *service)

backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); }

// Looks up the app identified by `app_id` while holding the server state read lock and
// stores the lookup result in `app`.
//
// Returns ERR_OK when the app exists and its status is AS_AVAILABLE; otherwise logs an
// error and returns ERR_INVALID_STATE. `app` is assigned in both cases.
error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app)
{
    zauto_read_lock state_guard;
    auto state = _backup_service->get_state();
    state->lock_read(state_guard);

    app = state->get_app(app_id);
    const bool available = (app != nullptr) && (app->status == app_status::AS_AVAILABLE);
    if (available) {
        return ERR_OK;
    }

    derror_f("app {} is not available, couldn't do backup now.", app_id);
    return ERR_INVALID_STATE;
}

error_code backup_engine::init_backup(int32_t app_id)
{
std::shared_ptr<app_state> app;
error_code err = get_app_stat(app_id, app);
if (err != ERR_OK) {
return err;
std::string app_name;
int partition_count;
{
zauto_read_lock l;
_backup_service->get_state()->lock_read(l);
std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(app_id);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app {} is not available, couldn't do backup now.", app_id);
return ERR_INVALID_STATE;
}
app_name = app->app_name;
partition_count = app->partition_count;
}

zauto_lock lock(_lock);
_backup_status.clear();
for (int i = 0; i < app->partition_count; ++i) {
for (int i = 0; i < partition_count; ++i) {
_backup_status.emplace(i, backup_status::UNALIVE);
}
_cur_backup.app_id = app_id;
_cur_backup.app_name = app->app_name;
_cur_backup.app_name = app_name;
_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
_cur_backup.start_time_ms = _cur_backup.backup_id;
return ERR_OK;
Expand All @@ -74,9 +71,140 @@ error_code backup_engine::set_block_service(const std::string &provider)
return ERR_OK;
}

// Stub: performs no work and always reports ERR_OK.
error_code backup_engine::start()
{
    return ERR_OK;
}
// Writes `write_buffer` into the file `file_name` on the configured block service.
//
// The file is created first and then written with a single write request. Each request is
// waited on before its result is inspected, and the callbacks capture the locals `err` and
// `remote_file` by reference to hand the result back.
//
// Returns the error reported by the create request when it is not ERR_OK; otherwise the
// error reported by the write request. Either failure is logged as an error.
error_code backup_engine::write_backup_file(const std::string &file_name,
                                            const dsn::blob &write_buffer)
{
    dist::block_service::create_file_request create_file_req;
    create_file_req.ignore_metadata = true;
    create_file_req.file_name = file_name;

    dsn::error_code err;
    dist::block_service::block_file_ptr remote_file;
    _block_service
        ->create_file(create_file_req,
                      TASK_CODE_EXEC_INLINED,
                      [&err, &remote_file](const dist::block_service::create_file_response &resp) {
                          err = resp.err;
                          remote_file = resp.file_handle;
                      })
        ->wait();
    if (err != dsn::ERR_OK) {
        // Logged as an error (with the error code), like the other failure paths in this file.
        derror_f("create file {} failed, error {}", file_name, err.to_string());
        return err;
    }
    dassert_f(remote_file != nullptr,
              "create file {} succeed, but can't get handle",
              create_file_req.file_name);

    remote_file
        ->write(dist::block_service::write_request{write_buffer},
                TASK_CODE_EXEC_INLINED,
                [&err](const dist::block_service::write_response &resp) { err = resp.err; })
        ->wait();
    if (err != dsn::ERR_OK) {
        // A failed write was previously returned silently; leave a trace for operators.
        derror_f("write file {} failed, error {}", file_name, err.to_string());
    }
    return err;
}

// Serializes the metadata of the app currently being backed up and writes it to the app
// metadata file of this backup on the block service.
//
// Returns ERR_INVALID_STATE when the app does not exist or its status is not AS_AVAILABLE;
// otherwise returns the result of write_backup_file().
error_code backup_engine::backup_app_meta()
{
    dsn::blob app_info_buffer;
    {
        zauto_read_lock l;
        _backup_service->get_state()->lock_read(l);
        std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(_cur_backup.app_id);
        if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
            derror_f("app {} is not available, couldn't do backup now.", _cur_backup.app_id);
            return ERR_INVALID_STATE;
        }
        // Only the app_info part is encoded below, so copy just that sub-object instead of
        // the whole app_state while the state read lock is held.
        app_info tmp = static_cast<const app_info &>(*app);
        // Because we don't restore app envs, so no need to write app envs to backup file.
        tmp.envs.clear();
        app_info_buffer = dsn::json::json_forwarder<app_info>::encode(tmp);
    }

    const std::string file_name = cold_backup::get_app_metadata_file(
        _backup_service->backup_root(), _cur_backup.app_name, _cur_backup.app_id, _cur_backup.backup_id);
    return write_backup_file(file_name, app_info_buffer);
}

void backup_engine::backup_app_partition(const gpid &pid)
{
dsn::rpc_address partition_primary;
{
zauto_read_lock l;
_backup_service->get_state()->lock_read(l);
std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(pid.get_app_id());
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app {} is not available, couldn't do backup now.", pid.get_app_id());

zauto_lock lock(_lock);
is_backup_failed = true;
return;
}
partition_primary = app->partitions[pid.get_partition_index()].primary;
}

if (partition_primary.is_invalid()) {
dwarn_f("backup_id({}): partition {} doesn't have a primary now, retry to backup it later.",
_cur_backup.backup_id,
pid.to_string());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
&_tracker,
[this, pid]() { backup_app_partition(pid); },
0,
std::chrono::seconds(10));
return;
}

auto req = std::make_unique<backup_request>();
req->pid = pid;
policy_info backup_policy_info;
backup_policy_info.__set_backup_provider_type(_provider_type);
backup_policy_info.__set_policy_name(get_policy_name());
req->policy = backup_policy_info;
req->backup_id = _cur_backup.backup_id;
req->app_name = _cur_backup.app_name;

ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}",
_cur_backup.backup_id,
pid.to_string(),
partition_primary.to_string());
backup_rpc rpc(std::move(req), RPC_COLD_BACKUP, 10000_ms, 0, pid.thread_hash());
rpc.call(
partition_primary, &_tracker, [this, rpc, pid, partition_primary](error_code err) mutable {
on_backup_reply(err, rpc.response(), pid, partition_primary);
});

zauto_lock l(_lock);
_backup_status[pid.get_partition_index()] = backup_status::ALIVE;
}

// Completion handler for the backup request sent by backup_app_partition(): it is called
// with the RPC error code, the response, the partition id and the address the request was
// sent to.
// The body is currently empty, so the reply is ignored.
void backup_engine::on_backup_reply(error_code err,
const backup_response &response,
gpid pid,
const rpc_address &primary)
{
}

// Starts the backup: first writes the app metadata, then enqueues one
// backup_app_partition() task per entry in _backup_status.
//
// Returns the error from backup_app_meta() when it fails (no partition task is enqueued in
// that case); otherwise ERR_OK.
error_code backup_engine::start()
{
    error_code err = backup_app_meta();
    if (err != ERR_OK) {
        derror_f("backup_id({}): backup meta data for app {} failed, error {}",
                 _cur_backup.backup_id,
                 _cur_backup.app_id,
                 err.to_string());
        return err;
    }

    // Snapshot the partition count once under _lock: _backup_status is guarded by _lock and
    // tasks enqueued below may already be touching it while this loop is still running.
    // Using an int also avoids comparing the signed loop index with an unsigned size.
    int partition_count = 0;
    {
        zauto_lock l(_lock);
        partition_count = static_cast<int>(_backup_status.size());
    }

    for (int i = 0; i < partition_count; ++i) {
        tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, [this, i]() {
            backup_app_partition(gpid(_cur_backup.app_id, i));
        });
    }
    return ERR_OK;
}

bool backup_engine::is_backing_up()
bool backup_engine::is_backing_up() const
{
zauto_lock l(_lock);
return _cur_backup.end_time_ms == 0 && !is_backup_failed;
Expand Down
17 changes: 14 additions & 3 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,29 @@ class backup_engine

int64_t get_current_backup_id() const { return _cur_backup.backup_id; }
int32_t get_backup_app_id() const { return _cur_backup.app_id; }
bool is_backing_up();
bool is_backing_up() const;

private:
error_code get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app);
error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer);
error_code backup_app_meta();
void backup_app_partition(const gpid &pid);
void on_backup_reply(error_code err,
const backup_response &response,
gpid pid,
const rpc_address &primary);

const std::string get_policy_name() const
{
return "fake_policy_" + std::to_string(_cur_backup.backup_id);
}

backup_service *_backup_service;
dist::block_service::block_filesystem *_block_service;
std::string _provider_type;
dsn::task_tracker _tracker;

// lock the following variables.
dsn::zlock _lock;
mutable dsn::zlock _lock;
bool is_backup_failed;
app_backup_info _cur_backup;
// partition_id -> backup_status
Expand Down
8 changes: 8 additions & 0 deletions src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1605,6 +1605,14 @@ void backup_service::start_backup_app(start_backup_app_rpc rpc)
if (err == ERR_OK) {
int64_t backup_id = engine->get_current_backup_id();
_backup_states.emplace(backup_id, std::move(engine));
response.hint_message =
fmt::format("Backup id {} : metadata of app {} has been successfully backed up and "
"backup request has been sent to replica servers.",
backup_id,
app_id);
} else {
response.hint_message =
fmt::format("Backup failed: could not backup metadata for app {}.", app_id);
}
response.err = err;
}
Expand Down
4 changes: 2 additions & 2 deletions src/replica/backup/replica_backup_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@
#include <dsn/dist/replication/replication_types.h>
#include <dsn/cpp/rpc_holder.h>

#include "common/replication_common.h"

namespace dsn {
namespace replication {

class replica_stub;

typedef rpc_holder<backup_request, backup_response> backup_rpc;

// A server distributes the cold-backup task to the targeted replica.
class replica_backup_server
{
Expand Down

0 comments on commit a6b1c6c

Please sign in to comment.