Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat(one-time backup): part-1 meta server handle rpc and init backup
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangyifan27 committed Mar 5, 2021
1 parent 6528eb6 commit d03d910
Show file tree
Hide file tree
Showing 8 changed files with 246 additions and 1 deletion.
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ MAKE_EVENT_CODE_RPC(RPC_CM_NOTIFY_STOP_SPLIT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_CONTROL_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_QUERY_BULK_LOAD_STATUS, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_CM_START_BACKUP_APP, TASK_PRIORITY_COMMON)
#undef CURRENT_THREAD_POOL

#define CURRENT_THREAD_POOL THREAD_POOL_META_STATE
Expand Down
12 changes: 12 additions & 0 deletions src/common/backup.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,15 @@ struct configuration_query_restore_response
2:list<dsn.error_code> restore_status;
3:list<i32> restore_progress;
}

struct start_backup_app_request
{
1:string backup_provider_type;
2:i32 app_id;
}

struct start_backup_app_response
{
1:dsn.error_code err;
2:string hint_message;
}
86 changes: 86 additions & 0 deletions src/meta/backup_engine.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/dist/fmt_logging.h>

#include "common/backup_utils.h"
#include "server_state.h"

namespace dsn {
namespace replication {

backup_engine::backup_engine(backup_service *service)
: _backup_service(service), _block_service(nullptr)
{
}

backup_engine::~backup_engine() { _tracker.cancel_outstanding_tasks(); }

error_code backup_engine::get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app)
{
zauto_read_lock l;
_backup_service->get_state()->lock_read(l);
app = _backup_service->get_state()->get_app(app_id);
if (app == nullptr || app->status != app_status::AS_AVAILABLE) {
derror_f("app {} is not available, couldn't do backup now.", app_id);
return ERR_INVALID_STATE;
}
return ERR_OK;
}

error_code backup_engine::init_backup(int32_t app_id)
{
std::shared_ptr<app_state> app;
error_code err = get_app_stat(app_id, app);
if (err != ERR_OK) {
return err;
}

zauto_lock lock(_lock);
_backup_status.clear();
for (int i = 0; i < app->partition_count; ++i) {
_backup_status.emplace(i, backup_status::UNALIVE);
}
_cur_backup.app_id = app_id;
_cur_backup.app_name = app->app_name;
_cur_backup.backup_id = static_cast<int64_t>(dsn_now_ms());
_cur_backup.start_time_ms = _cur_backup.backup_id;
return ERR_OK;
}

error_code backup_engine::set_block_service(const std::string &provider)
{
_provider_type = provider;
_block_service = _backup_service->get_meta_service()
->get_block_service_manager()
.get_or_create_block_filesystem(provider);
if (_block_service == nullptr) {
return ERR_INVALID_PARAMETERS;
}
return ERR_OK;
}

error_code backup_engine::run() { return ERR_OK; }

bool backup_engine::is_backing_up()
{
zauto_lock l(_lock);
return _cur_backup.end_time_ms == 0 && !_cur_backup.is_backup_failed;
}

} // namespace replication
} // namespace dsn
83 changes: 83 additions & 0 deletions src/meta/backup_engine.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <dsn/cpp/json_helper.h>
#include <dsn/dist/block_service.h>
#include <dsn/tool-api/zlocks.h>

namespace dsn {
namespace replication {

enum backup_status
{
UNALIVE = 1,
ALIVE = 2,
COMPLETED = 3,
FAILED = 4
};

struct app_backup_info
{
int64_t backup_id;
int64_t start_time_ms;
int64_t end_time_ms;

bool is_backup_failed;

int32_t app_id;
std::string app_name;

app_backup_info() : backup_id(0), start_time_ms(0), end_time_ms(0), is_backup_failed(false) {}

DEFINE_JSON_SERIALIZATION(backup_id, start_time_ms, end_time_ms, app_id, app_name)
};

class app_state;
class backup_service;

class backup_engine
{
public:
backup_engine(backup_service *service);
~backup_engine();

error_code init_backup(int32_t app_id);
error_code set_block_service(const std::string &provider);

error_code run();

int64_t get_current_backup_id() { return _cur_backup.backup_id; }
int32_t get_backup_app_id() { return _cur_backup.app_id; }
bool is_backing_up();

private:
error_code get_app_stat(int32_t app_id, std::shared_ptr<app_state> &app);

backup_service *_backup_service;
dist::block_service::block_filesystem *_block_service;
std::string _provider_type;
dsn::task_tracker _tracker;

// lock _cur_backup and _backup_status.
dsn::zlock _lock;
app_backup_info _cur_backup;
// partition_id -> backup_status
std::map<int32_t, backup_status> _backup_status;
};

} // namespace replication
} // namespace dsn
40 changes: 40 additions & 0 deletions src/meta/meta_backup_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1569,5 +1569,45 @@ std::string backup_service::get_backup_path(const std::string &policy_name, int6
ss << _policy_meta_root << "/" << policy_name << "/" << backup_id;
return ss.str();
}

void backup_service::start_backup_app(start_backup_app_rpc rpc)
{
const start_backup_app_request &request = rpc.request();
start_backup_app_response &response = rpc.response();

int32_t app_id = request.app_id;
std::shared_ptr<backup_engine> engine = std::make_shared<backup_engine>(this);
error_code err = engine->init_backup(app_id);
if (err != ERR_OK) {
response.err = err;
response.hint_message = fmt::format("Backup failed: invalid app id {}.", app_id);
return;
}

err = engine->set_block_service(request.backup_provider_type);
if (err != ERR_OK) {
response.err = err;
response.hint_message = fmt::format("Backup failed: invalid backup_provider_type {}.",
request.backup_provider_type);
return;
}

for (const auto &it : _backup_states) {
const auto &tmp_engine = it.second;
if (app_id == tmp_engine->get_backup_app_id() && tmp_engine->is_backing_up()) {
response.err = ERR_INVALID_STATE;
response.hint_message = fmt::format("Backup failed: app {} is backing up now.", app_id);
return;
}
}

err = engine->run();
if (err == ERR_OK) {
int64_t backup_id = engine->get_current_backup_id();
_backup_states.emplace(backup_id, std::move(engine));
}
response.err = err;
}

} // namespace replication
} // namespace dsn
7 changes: 6 additions & 1 deletion src/meta/meta_backup_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <dsn/perf_counter/perf_counter_wrapper.h>
#include <gtest/gtest_prod.h>

#include "backup_engine.h"
#include "meta_data.h"

namespace dsn {
Expand All @@ -42,6 +43,7 @@ typedef rpc_holder<configuration_query_backup_policy_request,
typedef rpc_holder<configuration_modify_backup_policy_request,
configuration_modify_backup_policy_response>
configuration_modify_backup_policy_rpc;
typedef rpc_holder<start_backup_app_request, start_backup_app_response> start_backup_app_rpc;

struct backup_info_status
{
Expand Down Expand Up @@ -339,6 +341,7 @@ class backup_service
void add_backup_policy(dsn::message_ex* msg);
void query_backup_policy(query_backup_policy_rpc rpc);
void modify_backup_policy(configuration_modify_backup_policy_rpc rpc);
void start_backup_app(start_backup_app_rpc rpc);

// compose the absolute path(AP) for policy
// input:
Expand Down Expand Up @@ -375,10 +378,12 @@ class backup_service
meta_service *_meta_svc;
server_state *_state;

// _lock is only used to lock _policy_states
// lock _policy_states and _backup_states.
zlock _lock;
std::map<std::string, std::shared_ptr<policy_context>>
_policy_states; // policy_name -> policy_context
// backup_id -> backup_engine
std::unordered_map<int32_t, std::shared_ptr<backup_engine>> _backup_states;

// the root of policy metas, stored on remote_storage(zookeeper)
std::string _policy_meta_root;
Expand Down
15 changes: 15 additions & 0 deletions src/meta/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ void meta_service::register_rpc_handlers()
register_rpc_handler_with_rpc_holder(RPC_CM_QUERY_BULK_LOAD_STATUS,
"query_bulk_load_status",
&meta_service::on_query_bulk_load_status);
register_rpc_handler_with_rpc_holder(
RPC_CM_START_BACKUP_APP, "start_backup_app", &meta_service::on_start_backup_app);
}

int meta_service::check_leader(dsn::message_ex *req, dsn::rpc_address *forward_address)
Expand Down Expand Up @@ -1123,5 +1125,18 @@ void meta_service::on_query_bulk_load_status(query_bulk_load_rpc rpc)
_bulk_load_svc->on_query_bulk_load_status(std::move(rpc));
}

void meta_service::on_start_backup_app(start_backup_app_rpc rpc)
{
if (!check_status(rpc)) {
return;
}
if (_backup_handler == nullptr) {
derror_f("meta doesn't enable backup service");
rpc.response().err = ERR_SERVICE_NOT_ACTIVE;
return;
}
_backup_handler->start_backup_app(std::move(rpc));
}

} // namespace replication
} // namespace dsn
3 changes: 3 additions & 0 deletions src/meta/meta_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ class meta_service : public serverlet<meta_service>
// meta control
void on_control_meta_level(configuration_meta_control_rpc rpc);
void on_start_recovery(configuration_recovery_rpc rpc);

// backup/restore
void on_start_backup_app(start_backup_app_rpc rpc);
void on_start_restore(dsn::message_ex *req);
void on_add_backup_policy(dsn::message_ex *req);
void on_query_backup_policy(query_backup_policy_rpc policy_rpc);
Expand Down

0 comments on commit d03d910

Please sign in to comment.