Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat(one-time backup): part-1 meta server handle rpc and init backup #772

Merged
merged 4 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
16 changes: 16 additions & 0 deletions src/common/backup.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,19 @@ struct configuration_query_restore_response
2:list<dsn.error_code> restore_status;
3:list<i32> restore_progress;
}

struct start_backup_app_request
{
1:string backup_provider_type;
2:i32 app_id;
}

struct start_backup_app_response
{
// Possible error:
// - ERR_INVALID_STATE: app is not available or is backing up
// - ERR_INVALID_PARAMETERS: backup provider type is invalid
// - ERR_SERVICE_NOT_ACTIVE: meta doesn't enable backup service
1:dsn.error_code err;
zhangyifan27 marked this conversation as resolved.
Show resolved Hide resolved
2:string hint_message;
}
86 changes: 86 additions & 0 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/dist/fmt_logging.h>

#include "common/backup_utils.h"
#include "server_state.h"

namespace dsn {
namespace replication {

backup_engine::backup_engine(backup_service *service)
: _backup_service(service), _block_service(nullptr), is_backup_failed(false)
{
}

backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); }

error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"stat" and "state" are different. The former is usually the abbr of "statistics."

Suggested change
error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app)
error_code backup_engine::get_app_state(int32_t app_id, /*out*/ std::shared_ptr<app_state> &app)

{
zauto_read_lock l;
_backup_service->get_state()->lock_read(l);
app = _backup_service->get_state()->get_app(app_id);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app {} is not available, couldn't do backup now.", app_id);
return ERR_INVALID_STATE;
}
return ERR_OK;
}

error_code backup_engine::init_backup(int32_t app_id)
{
std::shared_ptr<app_state> app;
error_code err = get_app_stat(app_id, app);
if (err != ERR_OK) {
return err;
}

zauto_lock lock(_lock);
_backup_status.clear();
for (int i = 0; i < app->partition_count; ++i) {
_backup_status.emplace(i, backup_status::UNALIVE);
}
_cur_backup.app_id = app_id;
_cur_backup.app_name = app->app_name;
_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
_cur_backup.start_time_ms = _cur_backup.backup_id;
return ERR_OK;
}

error_code backup_engine::set_block_service(const std::string &provider)
{
_provider_type = provider;
_block_service = _backup_service->get_meta_service()
->get_block_service_manager()
.get_or_create_block_filesystem(provider);
if (_block_service == nullptr) {
return ERR_INVALID_PARAMETERS;
}
return ERR_OK;
}

error_code backup_engine::start() { return ERR_OK; }

bool backup_engine::is_backing_up()
{
zauto_lock l(_lock);
return _cur_backup.end_time_ms == 0 && !is_backup_failed;
}

} // namespace replication
} // namespace dsn
82 changes: 82 additions & 0 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/cpp/json_helper.h>
#include <dsn/dist/block_service.h>
#include <dsn/tool-api/zlocks.h>

namespace dsn {
namespace replication {

enum backup_status
{
UNALIVE = 1,
ALIVE = 2,
COMPLETED = 3,
FAILED = 4
};

struct app_backup_info
{
int64_t backup_id;
int64_t start_time_ms;
int64_t end_time_ms;

int32_t app_id;
std::string app_name;

app_backup_info() : backup_id(0), start_time_ms(0), end_time_ms(0) {}

DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name)
zhangyifan27 marked this conversation as resolved.
Show resolved Hide resolved
};

class app_state;
class backup_service;

class backup_engine
{
public:
backup_engine(backup_service *service);
~backup_engine();

error_code init_backup(int32_t app_id);
error_code set_block_service(const std::string &provider);

error_code start();

int64_t get_current_backup_id() const { return _cur_backup.backup_id; }
int32_t get_backup_app_id() const { return _cur_backup.app_id; }
bool is_backing_up();

private:
error_code get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
error_code get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app);
error_code get_app_stat(int32_t app_id, /*out*/ std::shared_ptr<app_state> &app);


backup_service *_backup_service;
dist::block_service::block_filesystem *_block_service;
std::string _provider_type;
dsn::task_tracker _tracker;

// lock the following variables.
dsn::zlock _lock;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can declare _lock a mutable so that you can let is_backing_up const. Making locks mutable is a convention in C++.

    bool is_backing_up() const;
Suggested change
dsn::zlock _lock;
mutable dsn::zlock _lock;

bool is_backup_failed;
app_backup_info _cur_backup;
// partition_id -> backup_status
std::map<int32_t, backup_status> _backup_status;
};

} // namespace replication
} // namespace dsn
40 changes: 40 additions & 0 deletions src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1569,5 +1569,45 @@ std::string backup_service::get_backup_path(const std::string &policy_name, int6
ss << _policy_meta_root << "/" << policy_name << "/" << backup_id;
return ss.str();
}

void backup_service::start_backup_app(start_backup_app_rpc rpc)
{
const start_backup_app_request &request = rpc.request();
start_backup_app_response &response = rpc.response();

int32_t app_id = request.app_id;
std::shared_ptr<backup_engine> engine = std::make_shared<backup_engine>(this);
zhangyifan27 marked this conversation as resolved.
Show resolved Hide resolved
error_code err = engine->init_backup(app_id);
if (err != ERR_OK) {
response.err = err;
response.hint_message = fmt::format("Backup failed: invalid app id {}.", app_id);
return;
}

err = engine->set_block_service(request.backup_provider_type);
if (err != ERR_OK) {
response.err = err;
response.hint_message = fmt::format("Backup failed: invalid backup_provider_type {}.",
request.backup_provider_type);
return;
}

for (const auto &it : _backup_states) {
const auto &tmp_engine = it.second;
if (app_id == tmp_engine->get_backup_app_id() && tmp_engine->is_backing_up()) {
response.err = ERR_INVALID_STATE;
response.hint_message = fmt::format("Backup failed: app {} is backing up now.", app_id);
return;
}
}

err = engine->run();
if (err == ERR_OK) {
int64_t backup_id = engine->get_current_backup_id();
_backup_states.emplace(backup_id, std::move(engine));
}
response.err = err;
}

} // namespace replication
} // namespace dsn
7 changes: 6 additions & 1 deletion src/meta/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <gtest/gtest_prod.h>

#include "backup_engine.h"
#include "meta_data.h"

namespace dsn {
Expand All @@ -42,6 +43,7 @@ typedef rpc_holder<configuration_query_backup_policy_request,
typedef rpc_holder<configuration_modify_backup_policy_request,
configuration_modify_backup_policy_response>
configuration_modify_backup_policy_rpc;
typedef rpc_holder<start_backup_app_request, start_backup_app_response> start_backup_app_rpc;

struct backup_info_status
{
Expand Down Expand Up @@ -339,6 +341,7 @@ class backup_service
void add_backup_policy(dsn::message_ex* msg);
void query_backup_policy(query_backup_policy_rpc rpc);
void modify_backup_policy(configuration_modify_backup_policy_rpc rpc);
void start_backup_app(start_backup_app_rpc rpc);

// compose the absolute path(AP) for policy
// input:
Expand Down Expand Up @@ -375,10 +378,12 @@ class backup_service
meta_service *_meta_svc;
server_state *_state;

// _lock is only used to lock _policy_states
// lock _policy_states and _backup_states.
zlock _lock;
std::map<std::string, std::shared_ptr<policy_context>>
_policy_states; // policy_name -> policy_context
// backup_id -> backup_engine
std::unordered_map<int32_t, std::shared_ptr<backup_engine>> _backup_states;

// the root of policy metas, stored on remote_storage(zookeeper)
std::string _policy_meta_root;
Expand Down
15 changes: 15 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ void meta_service::register_rpc_handlers()
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_BULK_LOAD_STATUS,
"query_bulk_load_status",
&meta_service::on_query_bulk_load_status);
register_rpc_handler_with_rpc_holder(
RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app);
}

int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
Expand Down Expand Up @@ -1123,5 +1125,18 @@ void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
_bulk_load_svc->on_query_bulk_load_status(std::move(rpc));
}

void meta_service::on_start_backup_app(start_backup_app_rpc rpc)
{
if (!check_status(rpc)) {
return;
}
if (_backup_handler == nullptr) {
derror_f("meta doesn't enable backup service");
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
return;
}
_backup_handler->start_backup_app(std::move(rpc));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ class meta_service : public serverlet<meta_service>
// meta control
void on_control_meta_level(configuration_meta_control_rpc rpc);
void on_start_recovery(configuration_recovery_rpc rpc);

// backup/restore
void on_start_backup_app(start_backup_app_rpc rpc);
void on_start_restore(dsn::message_ex *req);
void on_add_backup_policy(dsn::message_ex *req);
void on_query_backup_policy(query_backup_policy_rpc policy_rpc);
Expand Down