Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
move backup_engine to separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 committed Mar 4, 2021
1 parent 7f450c7 commit fa7eab5
Show file tree
Hide file tree
Showing 8 changed files with 401 additions and 341 deletions.
2 changes: 1 addition & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APPS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
8 changes: 4 additions & 4 deletions src/common/backup.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ struct configuration_query_restore_response

struct start_backup_apps_request
{
1:string backup_provider_type;
2:set<i32> app_ids;
1:string backup_provider_type;
2:i32 app_id;
}

struct start_backup_apps_response
{
1:dsn.error_code err;
2:string hint_message;
1:dsn.error_code err;
2:string hint_message;
}
278 changes: 278 additions & 0 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/dist/fmt_logging.h>

#include "common/backup_utils.h"
#include "server_state.h"

namespace dsn {
namespace replication {

backup_engine::backup_engine(backup_service *service)
: _backup_service(service), _block_service(nullptr)
{
}

// The delayed tasks and RPC callbacks created by this class are registered
// with _tracker and capture `this`; ask the tracker to cancel whatever is
// still outstanding before the members are destroyed.
backup_engine::~backup_engine()
{
    _tracker.cancel_outstanding_tasks();
}

// Prepares the engine for a backup of `app_id`.
//
// Resets the per-partition status table, checks that the app exists and is
// available, then records the app identity and a fresh backup id (the current
// time in milliseconds, which is also used as the start time).
//
// Returns ERR_INVALID_STATE if the app is missing or not AS_AVAILABLE,
// ERR_OK otherwise.
error_code backup_engine::init_backup(int32_t app_id)
{
    zauto_lock lock(_lock);
    _backup_status.clear();

    // Copy everything needed from the app while the state read lock is held,
    // so nothing is read from the shared app_state after the lock is released.
    std::string app_name;
    int32_t partition_count = 0;
    {
        zauto_read_lock l;
        _backup_service->get_state()->lock_read(l);
        const std::shared_ptr<app_state> app = _backup_service->get_state()->get_app(app_id);
        if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
            derror_f("app {} is not available, couldn't do backup now.", app_id);
            return ERR_INVALID_STATE;
        }
        app_name = app->app_name;
        partition_count = app->partition_count;
    }
    for (int32_t i = 0; i < partition_count; ++i) {
        _backup_status.emplace(i, backup_status::UNALIVE);
    }

    // run(), backup_app_meta() and backup_app_partition() all read the app
    // id/name from _cur_backup, so they must be recorded here.
    _cur_backup.app_id = app_id;
    _cur_backup.app_name = app_name;
    _cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
    _cur_backup.start_time_ms = _cur_backup.backup_id;
    return ERR_OK;
}

// Remembers `provider` as the backup provider type and asks the meta
// service's block service manager for the matching block filesystem.
//
// Returns ERR_INVALID_PARAMETERS when no filesystem is obtained for
// `provider`, ERR_OK otherwise. Note that _provider_type is overwritten
// even when the lookup fails.
error_code backup_engine::set_block_service(const std::string &provider)
{
    _provider_type = provider;

    auto &&fs_manager = _backup_service->get_meta_service()->get_block_service_manager();
    _block_service = fs_manager.get_or_create_block_filesystem(provider);

    return _block_service == nullptr ? ERR_INVALID_PARAMETERS : ERR_OK;
}

// Writes `write_buffer` to `file_name` on the configured block service.
//
// Both the create and the write are issued with TASK_CODE_EXEC_INLINED and
// waited on, so this call blocks until the remote operation has finished.
//
// Returns ERR_INVALID_STATE if no block service has been set, the error
// reported by the create or the write if one of them fails, and the result
// of the write otherwise.
error_code backup_engine::write_backup_file(const std::string &file_name,
                                            const dsn::blob &write_buffer)
{
    // _block_service is null until set_block_service() succeeds.
    if (_block_service == nullptr) {
        derror_f("write file {} failed, block service is not set", file_name);
        return ERR_INVALID_STATE;
    }

    dist::block_service::create_file_request create_file_req;
    create_file_req.ignore_metadata = true;
    create_file_req.file_name = file_name;

    dsn::error_code err;
    dist::block_service::block_file_ptr remote_file;
    _block_service
        ->create_file(create_file_req,
                      TASK_CODE_EXEC_INLINED,
                      [&err, &remote_file](const dist::block_service::create_file_response &resp) {
                          err = resp.err;
                          remote_file = resp.file_handle;
                      })
        ->wait();
    if (err != dsn::ERR_OK) {
        derror_f("create file {} failed, error {}", file_name, err.to_string());
        return err;
    }
    dassert_f(remote_file != nullptr,
              "create file {} succeed, but can't get handle",
              create_file_req.file_name);

    remote_file
        ->write(dist::block_service::write_request{write_buffer},
                TASK_CODE_EXEC_INLINED,
                [&err](const dist::block_service::write_response &resp) { err = resp.err; })
        ->wait();
    if (err != dsn::ERR_OK) {
        derror_f("write file {} failed, error {}", file_name, err.to_string());
    }
    return err;
}

// Serializes the app_info of the app being backed up (with its envs removed)
// to JSON and writes it to the app metadata file of the current backup.
//
// Returns ERR_INVALID_STATE if the app is missing or not AS_AVAILABLE,
// otherwise whatever write_backup_file() returns.
error_code backup_engine::backup_app_meta()
{
    dsn::blob encoded_meta;
    {
        zauto_read_lock state_guard;
        _backup_service->get_state()->lock_read(state_guard);

        auto app = _backup_service->get_state()->get_app(_cur_backup.app_id);
        const bool available = (app != nullptr) && (app->status == app_status::AS_AVAILABLE);
        if (!available) {
            return ERR_INVALID_STATE;
        }

        // do not write app envs to backup file
        app_state meta_copy = *app;
        meta_copy.envs.clear();
        encoded_meta = dsn::json::json_forwarder<app_info>::encode(meta_copy);
    }

    const std::string meta_file = cold_backup::get_app_metadata_file(_backup_service->backup_root(),
                                                                     _cur_backup.app_name,
                                                                     _cur_backup.app_id,
                                                                     _cur_backup.backup_id);
    return write_backup_file(meta_file, encoded_meta);
}

void backup_engine::backup_app_partition(const gpid &pid)
{
dsn::rpc_address partition_primary;
{
zauto_read_lock l;
_backup_service->get_state()->lock_read(l);
const std::shared_ptr<app_state> &app =
_backup_service->get_state()->get_app(pid.get_app_id());

if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("backup_id({}): app {} is unavailable, could not do backups for it now.",
_cur_backup.backup_id,
pid.get_app_id());
return;
}
partition_primary = app->partitions[pid.get_partition_index()].primary;
}

if (partition_primary.is_invalid()) {
dwarn_f("backup_id({}): partition {} doesn't have a primary now, retry to backup it later.",
_cur_backup.backup_id,
pid.to_string());
tasking::enqueue(LPC_DEFAULT_CALLBACK,
&_tracker,
[this, pid]() { backup_app_partition(pid); },
0,
std::chrono::seconds(10));
return;
}

backup_request req;
req.pid = pid;
policy_info backup_policy_info;
backup_policy_info.__set_backup_provider_type(_provider_type);
backup_policy_info.__set_policy_name(get_policy_name());
req.policy = backup_policy_info;
req.backup_id = _cur_backup.backup_id;
req.app_name = _cur_backup.app_name;
dsn::message_ex *request =
dsn::message_ex::create_request(RPC_COLD_BACKUP, 0, pid.thread_hash());
dsn::marshall(request, req);
dsn::rpc_response_task_ptr rpc_callback = rpc::create_rpc_response_task(
request,
&_tracker,
[this, pid, partition_primary](error_code err, backup_response &&response) {
on_backup_reply(err, std::move(response), pid, partition_primary);
});
ddebug_f("backup_id({}): send backup request to partition {}, target_addr = {}",
_cur_backup.backup_id,
pid.to_string(),
partition_primary.to_string());
_backup_service->get_meta_service()->send_request(request, partition_primary, rpc_callback);

zauto_lock l(_lock);
_backup_status[pid.get_partition_index()] = backup_status::ALIVE;
}

// Starts the backup: writes the app metadata file first, then enqueues one
// backup_app_partition() task per entry of the partition status table.
//
// Returns the error from backup_app_meta() if the metadata could not be
// written (no partition task is enqueued in that case), ERR_OK otherwise.
error_code backup_engine::run()
{
    error_code meta_result = backup_app_meta();
    if (meta_result != ERR_OK) {
        derror_f("backup_id({}): backup meta data for app {} failed, error {}",
                 _cur_backup.backup_id,
                 _cur_backup.app_id,
                 meta_result.to_string());
        return meta_result;
    }

    int partition_index = 0;
    while (partition_index < _backup_status.size()) {
        const int index = partition_index;
        auto backup_one_partition = [this, index]() {
            backup_app_partition(gpid(_cur_backup.app_id, index));
        };
        tasking::enqueue(LPC_DEFAULT_CALLBACK, &_tracker, backup_one_partition);
        ++partition_index;
    }
    return ERR_OK;
}

// Handles the reply (or RPC failure) of the backup request sent for `pid`
// to `primary`.
//
// err      - result of the RPC itself.
// response - reply of the replica; only inspected as a reply when err is ERR_OK.
//
// Outcomes:
// - the backup has already been marked failed: the reply is ignored;
// - the replica reports ERR_OK with PROGRESS_FINISHED: the partition becomes
//   COMPLETED and complete_current_backup() is invoked;
// - the replica reports ERR_LOCAL_APP_FAILURE: the partition becomes FAILED
//   and the whole backup is marked failed;
// - anything else (including an RPC error): the request is re-sent after
//   one second.
void backup_engine::on_backup_reply(error_code err,
                                    backup_response &&response,
                                    gpid pid,
                                    const rpc_address &primary)
{
    ddebug_f("backup_id({}): receive backup response for partition {} from server {}, rpc error "
             "{}, response error {}.",
             _cur_backup.backup_id,
             pid.to_string(),
             primary.to_string(),
             err.to_string(),
             response.err.to_string());

    {
        // is_backup_failed is written under _lock, so read it under _lock too.
        zauto_lock l(_lock);
        if (_cur_backup.is_backup_failed) {
            return;
        }
    }

    // if backup completed, receive ERR_OK;
    // if backup failed, receive ERR_LOCAL_APP_FAILURE;
    // receive ERR_BUSY or ERR_INVALID_STATE in other cases.
    // see replica::on_cold_backup() for details.
    const bool rpc_ok = (err == dsn::ERR_OK);
    const bool completed = rpc_ok && response.err == dsn::ERR_OK &&
                           response.progress == cold_backup_constant::PROGRESS_FINISHED;
    const bool failed = rpc_ok && response.err == ERR_LOCAL_APP_FAILURE;

    if (completed || failed) {
        // Only validate the reply when it is about to be acted upon. When the
        // RPC itself failed the response was never filled in by a replica, and
        // asserting on its fields would abort the process on a mere timeout.
        dassert_f(response.pid == pid,
                  "backup partition id {} vs {} don't match",
                  response.pid.to_string(),
                  pid.to_string());
        dassert_f(response.backup_id == _cur_backup.backup_id,
                  "backup id {} vs {} don't match",
                  response.backup_id,
                  _cur_backup.backup_id);
    }

    const int32_t partition = pid.get_partition_index();
    if (completed) {
        {
            zauto_lock l(_lock);
            _backup_status[partition] = backup_status::COMPLETED;
        }
        // takes _lock itself, so it must be called with the lock released.
        complete_current_backup();
        return;
    }
    if (failed) {
        derror_f("backup_id({}): backup for partition {} failed.",
                 _cur_backup.backup_id,
                 pid.to_string());
        zauto_lock l(_lock);
        _cur_backup.is_backup_failed = true;
        _backup_status[partition] = backup_status::FAILED;
        return;
    }

    ddebug_f("backup_id({}): retry to send backup request for partition {}.",
             _cur_backup.backup_id,
             pid.to_string());
    tasking::enqueue(LPC_DEFAULT_CALLBACK,
                     &_tracker,
                     [this, pid]() { backup_app_partition(pid); },
                     0,
                     std::chrono::seconds(1));
}

void backup_engine::complete_current_backup()
{
zauto_lock l(_lock);
for (int i = 0; i < _backup_status.size(); ++i) {
if (_backup_status[i] != backup_status::COMPLETED) {
return;
}
}

_cur_backup.end_time_ms = dsn_now_ms();
std::string file_name =
cold_backup::get_backup_info_file(_backup_service->backup_root(), _cur_backup.backup_id);
blob buf = dsn::json::json_forwarder<app_backup_info>::encode(_cur_backup);
// TODO(zhangyifan): handle errors if writing backup info failed.
write_backup_file(file_name, buf);
}

} // namespace replication
} // namespace dsn
93 changes: 93 additions & 0 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/cpp/json_helper.h>
#include <dsn/dist/block_service.h>
#include <dsn/tool-api/zlocks.h>

namespace dsn {
namespace replication {

// Backup progress of a single partition, as stored in
// backup_engine::_backup_status.
enum backup_status
{
    // initial value given to every partition by backup_engine::init_backup()
    UNALIVE = 1,
    // set by backup_engine::backup_app_partition() when a backup request is
    // sent to the partition's primary
    ALIVE = 2,
    // set by backup_engine::on_backup_reply() when the reply reports
    // PROGRESS_FINISHED without error
    COMPLETED = 3,
    // set by backup_engine::on_backup_reply() when the reply reports
    // ERR_LOCAL_APP_FAILURE
    FAILED = 4
};

// Description of one backup of one app. Everything except is_backup_failed
// is part of the JSON serialization (see DEFINE_JSON_SERIALIZATION below).
struct app_backup_info
{
    // identifies the backup; 0 until a backup has been initialized
    int64_t backup_id;
    // 0 until the corresponding moment has been recorded
    int64_t start_time_ms;
    int64_t end_time_ms;

    // runtime-only flag, not serialized
    bool is_backup_failed;

    int32_t app_id;
    std::string app_name;

    // app_id is zero-initialized as well: it is serialized and read by the
    // backup engine, so it must never hold an indeterminate value.
    app_backup_info()
        : backup_id(0), start_time_ms(0), end_time_ms(0), is_backup_failed(false), app_id(0)
    {
    }

    DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name)
};

class backup_service;

// Drives a single backup of one app: writes the app metadata, asks the
// primary of every partition to back itself up, tracks the per-partition
// status and finally writes the backup info file.
class backup_engine
{
public:
    // `service` is stored as a raw pointer and is not owned by the engine.
    backup_engine(backup_service *service);
    ~backup_engine();

    // Validates the app and initializes the state of a new backup.
    // Returns ERR_INVALID_STATE if the app is missing or unavailable.
    error_code init_backup(int32_t app_id);
    // Selects the block filesystem used to store the backup.
    // Returns ERR_INVALID_PARAMETERS if none is obtained for `provider`.
    error_code set_block_service(const std::string &provider);

    // Creates `file_name` on the block service and writes `write_buffer` to
    // it, blocking until done.
    error_code write_backup_file(const std::string &file_name, const dsn::blob &write_buffer);
    // Writes the app metadata file of the current backup.
    error_code backup_app_meta();
    // Sends a backup request for `pid` to its primary, or re-schedules itself
    // when the partition has no primary.
    void backup_app_partition(const gpid &pid);
    // Callback for the request sent by backup_app_partition().
    void on_backup_reply(dsn::error_code err,
                         backup_response &&response,
                         gpid pid,
                         const rpc_address &primary);

    // Writes the app metadata and schedules the backup of every partition.
    error_code run();

    // if all partitions have been backed up, write backup_info file.
    void complete_current_backup();

    int64_t get_current_backup_id() { return _cur_backup.backup_id; }
    int32_t get_backup_app_id() { return _cur_backup.app_id; }
    // A backup is in progress while it has neither finished nor failed.
    // end_time_ms is only assigned by complete_current_backup(), once every
    // partition is COMPLETED, so "not finished" means end_time_ms == 0.
    // Note: this is also true for an engine on which init_backup() has not
    // been called yet.
    bool is_backing_up() { return _cur_backup.end_time_ms == 0 && !_cur_backup.is_backup_failed; }

private:
    // Policy name placed in backup requests, derived from the backup id.
    std::string get_policy_name() { return "fake_policy_" + std::to_string(_cur_backup.backup_id); }

    backup_service *_backup_service;
    // null until set_block_service() succeeds
    dist::block_service::block_filesystem *_block_service;
    std::string _provider_type;
    // tracks the delayed tasks and RPC callbacks created by this engine
    dsn::task_tracker _tracker;

    // lock _cur_backup and _backup_status.
    dsn::zlock _lock;
    app_backup_info _cur_backup;
    // partition_id -> backup_status
    std::map<int32_t, backup_status> _backup_status;
};

} // namespace replication
} // namespace dsn
Loading

0 comments on commit fa7eab5

Please sign in to comment.