Skip to content

Commit

Permalink
feat(disk_balance): add do_disk_migrate_replica to support migrate …
Browse files Browse the repository at this point in the history
…origin data (#664)
  • Loading branch information
foreverneverer authored Nov 25, 2020
1 parent f80fba8 commit c999305
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 40 deletions.
2 changes: 1 addition & 1 deletion include/dsn/dist/replication/replication.codes.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ MAKE_EVENT_CODE(LPC_QUERY_CONFIGURATION_ALL, TASK_PRIORITY_HIGH)
MAKE_EVENT_CODE(LPC_MEM_RELEASE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_CREATE_CHILD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_QUERY_DISK_INFO, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_REPLICA_DISK_MIGRATE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_DETECT_HOTKEY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_ANALYZE_HOTKEY, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_BULK_LOAD_INGESTION, TASK_PRIORITY_HIGH)
Expand Down Expand Up @@ -158,7 +159,6 @@ MAKE_EVENT_CODE_RPC(RPC_SPLIT_NOTIFY_CATCH_UP, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_SPLIT_UPDATE_CHILD_PARTITION_COUNT, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_GROUP_BULK_LOAD, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE_RPC(RPC_REPLICA_DISK_MIGRATE, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_LOW, TASK_PRIORITY_LOW)
MAKE_EVENT_CODE(LPC_REPLICATION_COMMON, TASK_PRIORITY_COMMON)
MAKE_EVENT_CODE(LPC_REPLICATION_HIGH, TASK_PRIORITY_HIGH)
Expand Down
1 change: 1 addition & 0 deletions include/dsn/dist/replication/replication_app_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ class replication_app_base : public replica_base
friend class replica;
friend class replica_stub;
friend class mock_replica;
friend class replica_disk_migrator;

::dsn::error_code open_internal(replica *r);
::dsn::error_code
Expand Down
2 changes: 2 additions & 0 deletions src/replica/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class replica : public serverlet<replica>, public ref_counter, public replica_ba
friend class replica_bulk_loader;
friend class replica_split_manager;
friend class replica_disk_migrator;
friend class replica_disk_test;
friend class replica_disk_migrate_test;

// replica configuration, updated by update_local_configuration ONLY
replica_configuration _config;
Expand Down
200 changes: 181 additions & 19 deletions src/replica/replica_disk_migrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,60 @@
#include "replica/replica_stub.h"
#include "replica_disk_migrator.h"

#include <boost/algorithm/string/replace.hpp>
#include <dsn/utility/filesystem.h>
#include <dsn/dist/fmt_logging.h>
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/utility/fail_point.h>

namespace dsn {
namespace replication {

// Suffix appended to the target replica dir name while data is being copied into it
// (see init_target_dir).
const std::string replica_disk_migrator::kReplicaDirTempSuffix = ".disk.balance.tmp";
// Path, relative to the target replica dir, of the directory the checkpoint is copied into
// (see init_target_dir).
const std::string replica_disk_migrator::kDataDirFolder = "data/rdb/";
// File name, inside the target replica dir, that the app info is stored to
// (see migrate_replica_app_info).
const std::string replica_disk_migrator::kAppInfo = ".app-info";

// Binds the migrator to replica `r`: the pointer is handed to replica_base and also kept in
// _replica for later use.
replica_disk_migrator::replica_disk_migrator(replica *r) : replica_base(r), _replica(r) {}

replica_disk_migrator::~replica_disk_migrator() = default;

// THREAD_POOL_REPLICATION
void replica_disk_migrator::on_migrate_replica(const replica_disk_migrate_request &req,
/*out*/ replica_disk_migrate_response &resp)
// THREAD_POOL_DEFAULT
void replica_disk_migrator::on_migrate_replica(replica_disk_migrate_rpc rpc)
{
if (!check_migration_args(req, resp)) {
return;
}
tasking::enqueue(
LPC_REPLICATION_COMMON,
_replica->tracker(),
[=]() {

_status = disk_migration_status::MOVING;
ddebug_replica(
"received replica disk migrate request(origin={}, target={}), update status from {}=>{}",
req.origin_disk,
req.target_disk,
enum_to_string(disk_migration_status::IDLE),
enum_to_string(status()));
if (!check_migration_args(rpc)) {
return;
}

tasking::enqueue(
LPC_REPLICATION_LONG_COMMON, _replica->tracker(), [=]() { migrate_replica(req); });
_status = disk_migration_status::MOVING;
ddebug_replica(
"received replica disk migrate request(origin={}, target={}), update status "
"from {}=>{}",
rpc.request().origin_disk,
rpc.request().target_disk,
enum_to_string(disk_migration_status::IDLE),
enum_to_string(status()));

const auto request = rpc.request();
tasking::enqueue(LPC_REPLICATION_LONG_COMMON, _replica->tracker(), [=]() {
migrate_replica(request);
});
},
get_gpid().thread_hash());
}

bool replica_disk_migrator::check_migration_args(const replica_disk_migrate_request &req,
/*out*/ replica_disk_migrate_response &resp)
// THREAD_POOL_REPLICATION
bool replica_disk_migrator::check_migration_args(replica_disk_migrate_rpc rpc)
{
_replica->_checker.only_one_thread_access();

const replica_disk_migrate_request &req = rpc.request();
replica_disk_migrate_response &resp = rpc.response();

// TODO(jiashuo1) may need manager control migration flow
if (status() != disk_migration_status::IDLE) {
std::string err_msg =
Expand Down Expand Up @@ -148,12 +168,154 @@ bool replica_disk_migrator::check_migration_args(const replica_disk_migrate_requ
return true;
}

// TODO(jiashuo1)
// THREAD_POOL_REPLICATION_LONG
void replica_disk_migrator::migrate_replica(const replica_disk_migrate_request &req) {}
// Copies the replica to the target disk in three ordered steps (prepare target dir, copy
// checkpoint, store app info). When all of them succeed the status becomes MOVED and
// close_current_replica() is invoked.
void replica_disk_migrator::migrate_replica(const replica_disk_migrate_request &req)
{
    // Guard: the copy may only run while the migration status is MOVING.
    const bool is_moving = (status() == disk_migration_status::MOVING);
    if (!is_moving) {
        derror_replica("disk migration(origin={}, target={}), err = Invalid migration status({})",
                       req.origin_disk,
                       req.target_disk,
                       enum_to_string(status()));
        reset_status();
        return;
    }

    // Every step logs and resets the status by itself when it fails, so a failure here only
    // needs to stop the pipeline.
    if (!init_target_dir(req)) {
        return;
    }
    if (!migrate_replica_checkpoint(req)) {
        return;
    }
    if (!migrate_replica_app_info(req)) {
        return;
    }

    _status = disk_migration_status::MOVED;
    ddebug_replica("disk migration(origin={}, target={}) copy data complete, update status "
                   "from {}=>{}, ready to "
                   "close origin replica({})",
                   req.origin_disk,
                   req.target_disk,
                   enum_to_string(disk_migration_status::MOVING),
                   enum_to_string(status()),
                   _replica->dir());

    close_current_replica();
}

// THREAD_POOL_REPLICATION_LONG
//
// Prepares the temporary directory on the target disk that will receive the replica data.
// On success `_target_replica_dir` and `_target_data_dir` are set and the data dir exists.
// On any failure the migration status is reset and false is returned.
bool replica_disk_migrator::init_target_dir(const replica_disk_migrate_request &req)
{
    FAIL_POINT_INJECT_F("init_target_dir", [this](string_view) -> bool {
        reset_status();
        return false;
    });
    // replica_dir: /root/origin_disk_tag/gpid.app_type
    std::string replica_dir = _replica->dir();
    // using origin dir to init new dir
    boost::replace_first(replica_dir, req.origin_disk, req.target_disk);
    if (utils::filesystem::directory_exists(replica_dir)) {
        derror_replica("migration target replica dir({}) has existed", replica_dir);
        reset_status();
        return false;
    }

    // _target_replica_dir = /root/target_disk_tag/gpid.app_type.disk.balance.tmp, it will update to
    // /root/target_disk_tag/gpid.app_type in replica_disk_migrator::update_replica_dir finally
    _target_replica_dir = fmt::format("{}{}", replica_dir, kReplicaDirTempSuffix);
    if (utils::filesystem::directory_exists(_target_replica_dir)) {
        dwarn_replica("disk migration(origin={}, target={}) target replica dir({}) has existed, "
                      "delete it now",
                      req.origin_disk,
                      req.target_disk,
                      _target_replica_dir);
        // A stale temp dir that cannot be removed must abort the migration: otherwise the new
        // checkpoint would be copied on top of leftover files.
        if (!utils::filesystem::remove_path(_target_replica_dir)) {
            derror_replica(
                "disk migration(origin={}, target={}) delete target replica dir({}) failed",
                req.origin_disk,
                req.target_disk,
                _target_replica_dir);
            reset_status();
            return false;
        }
    }

    // _target_data_dir = /root/target_disk_tag/gpid.app_type.disk.balance.tmp/data/rdb, it will
    // update to /root/target_disk_tag/gpid.app_type/data/rdb in
    // replica_disk_migrator::update_replica_dir finally
    _target_data_dir = utils::filesystem::path_combine(_target_replica_dir, kDataDirFolder);
    if (!utils::filesystem::create_directory(_target_data_dir)) {
        derror_replica(
            "disk migration(origin={}, target={}) create target temp data dir({}) failed",
            req.origin_disk,
            req.target_disk,
            _target_data_dir);
        reset_status();
        return false;
    }

    return true;
}

// THREAD_POOL_REPLICATION_LONG
//
// Syncs a checkpoint of the replica app and copies it into `_target_data_dir`.
// Returns true on success. On failure the migration status is reset and false is returned;
// when the copy itself fails, the temporary target replica dir is deleted as well.
bool replica_disk_migrator::migrate_replica_checkpoint(const replica_disk_migrate_request &req)
{
    FAIL_POINT_INJECT_F("migrate_replica_checkpoint", [this](string_view) -> bool {
        reset_status();
        return false;
    });

    const auto sync_checkpoint_err = _replica->get_app()->sync_checkpoint();
    if (sync_checkpoint_err != ERR_OK) {
        derror_replica("disk migration(origin={}, target={}) sync_checkpoint failed({})",
                       req.origin_disk,
                       req.target_disk,
                       sync_checkpoint_err.to_string());
        reset_status();
        return false;
    }

    // The decree of the copied checkpoint is not needed here, so no output pointer is passed.
    const auto copy_checkpoint_err = _replica->get_app()->copy_checkpoint_to_dir(
        _target_data_dir.c_str(), nullptr /*last_decree*/);
    if (copy_checkpoint_err != ERR_OK) {
        derror_replica("disk migration(origin={}, target={}) copy checkpoint to dir({}) "
                       "failed(error={}), the dir({}) will be deleted",
                       req.origin_disk,
                       req.target_disk,
                       _target_data_dir,
                       copy_checkpoint_err.to_string(),
                       _target_replica_dir);
        // Clean up before the status goes back to IDLE, so that a newly accepted migration
        // cannot start working on the dir while it is still being removed.
        utils::filesystem::remove_path(_target_replica_dir);
        reset_status();
        return false;
    }

    return true;
}

// THREAD_POOL_REPLICATION_LONG
//
// Stores the replica's init info and app info into `_target_replica_dir`.
// Returns true when both are stored; otherwise logs the error, resets the migration status
// and returns false.
bool replica_disk_migrator::migrate_replica_app_info(const replica_disk_migrate_request &req)
{
    FAIL_POINT_INJECT_F("migrate_replica_app_info", [this](string_view) -> bool {
        reset_status();
        return false;
    });
    replica_init_info init_info = _replica->get_app()->init_info();
    const auto store_init_info_err = init_info.store(_target_replica_dir);
    if (store_init_info_err != ERR_OK) {
        derror_replica("disk migration(origin={}, target={}) stores app init info failed({})",
                       req.origin_disk,
                       req.target_disk,
                       store_init_info_err.to_string());
        reset_status();
        return false;
    }

    replica_app_info info(&_replica->_app_info);
    const auto path = utils::filesystem::path_combine(_target_replica_dir, kAppInfo);
    // Store exactly once and check that result (the file used to be written twice, with the
    // first result ignored).
    const auto store_info_err = info.store(path.c_str());
    if (store_info_err != ERR_OK) {
        derror_replica("disk migration(origin={}, target={}) stores app info failed({})",
                       req.origin_disk,
                       req.target_disk,
                       store_info_err.to_string());
        reset_status();
        return false;
    }

    return true;
}

// TODO(jiashuo1)
// THREAD_POOL_REPLICATION_LONG
void replica_disk_migrator::close_current_replica() {}

// TODO(jiashuo1)
// run in replica::close_replica of THREAD_POOL_REPLICATION_LONG
void replica_disk_migrator::update_replica_dir() {}
} // namespace replication
} // namespace dsn
22 changes: 16 additions & 6 deletions src/replica/replica_disk_migrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,38 @@ class replica_disk_migrator : replica_base
explicit replica_disk_migrator(replica *r);
~replica_disk_migrator();

void on_migrate_replica(const replica_disk_migrate_request &req,
/*out*/ replica_disk_migrate_response &resp);
void on_migrate_replica(replica_disk_migrate_rpc rpc);

disk_migration_status::type status() const { return _status; }

void set_status(const disk_migration_status::type &status) { _status = status; }

private:
bool check_migration_args(const replica_disk_migrate_request &req,
/*out*/ replica_disk_migrate_response &resp);
bool check_migration_args(replica_disk_migrate_rpc rpc);

// TODO(jiashuo1)
void migrate_replica(const replica_disk_migrate_request &req);

// TODO(jiashuo1)
bool init_target_dir(const replica_disk_migrate_request &req);
bool migrate_replica_checkpoint(const replica_disk_migrate_request &req);
bool migrate_replica_app_info(const replica_disk_migrate_request &req);

void close_current_replica();
void update_replica_dir();

void reset_status() { _status = disk_migration_status::IDLE; }

private:
const static std::string kReplicaDirTempSuffix;
const static std::string kDataDirFolder;
const static std::string kAppInfo;

replica *_replica;

std::string _target_replica_dir; // /root/ssd_tag/gpid.pegasus/
std::string _target_data_dir; // /root/ssd_tag/gpid.pegasus/data/rdb
disk_migration_status::type _status{disk_migration_status::IDLE};

friend class replica;
friend class replica_disk_migrate_test;
};

Expand Down
19 changes: 7 additions & 12 deletions src/replica/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,7 @@ void replica_stub::on_disk_migrate(replica_disk_migrate_rpc rpc)

replica_ptr rep = get_replica(request.pid);
if (rep != nullptr) {
rep->disk_migrator()->on_migrate_replica(request, response);
rep->disk_migrator()->on_migrate_replica(rpc); // THREAD_POOL_DEFAULT
} else {
response.err = ERR_OBJECT_NOT_FOUND;
}
Expand Down Expand Up @@ -1624,21 +1624,16 @@ void replica_stub::on_gc()
//
// How to trigger memtable flush?
// we add a parameter `is_emergency' in dsn_app_async_checkpoint() function, when set true,
// the undering
// storage system should flush memtable as soon as possiable.
// the underlying storage system should flush memtable as soon as possible.
//
// When to trigger memtable flush?
// 1. Using `[replication].checkpoint_max_interval_hours' option, we can set max interval time
// of two
// adjacent checkpoints; If the time interval is arrived, then emergency checkpoint will be
// triggered.
// of two adjacent checkpoints; If the time interval is arrived, then emergency checkpoint
// will be triggered.
// 2. Using `[replication].log_shared_file_count_limit' option, we can set max file count of
// shared log;
// If the limit is exceeded, then emergency checkpoint will be triggered; Instead of
// triggering all
// replicas to do checkpoint, we will only trigger a few of necessary replicas which block
// garbage
// collection of the oldest log file.
// shared log; If the limit is exceeded, then emergency checkpoint will be triggered; Instead
// of triggering all replicas to do checkpoint, we will only trigger a few of necessary
// replicas which block garbage collection of the oldest log file.
//
if (_log != nullptr) {
replica_log_info_map gc_condition;
Expand Down
6 changes: 5 additions & 1 deletion src/replica/test/mock_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ class mock_replication_app_base : public replication_app_base
error_code copy_checkpoint_to_dir(const char *checkpoint_dir,
/*output*/ int64_t *last_decree) override
{
*last_decree = _decree;
if (last_decree != nullptr) {
*last_decree = _decree;
}

utils::filesystem::create_file(fmt::format("{}/checkpoint.file", checkpoint_dir));
return ERR_OK;
}
int on_request(message_ex *request) override { return 0; }
Expand Down
Loading

0 comments on commit c999305

Please sign in to comment.