Skip to content

Commit

Permalink
sync thrift modification of incremental backup and restore (#4527)
Browse files Browse the repository at this point in the history
Co-authored-by: Sophie <[email protected]>
  • Loading branch information
panda-sheep and Sophie-Xie authored Aug 22, 2022
1 parent ff8daf1 commit 891b2c6
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 22 deletions.
3 changes: 2 additions & 1 deletion src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2792,7 +2792,8 @@ folly::Future<StatusOr<bool>> MetaClient::createSnapshot() {

folly::Future<StatusOr<bool>> MetaClient::dropSnapshot(const std::string& name) {
cpp2::DropSnapshotReq req;
req.name_ref() = name;
std::vector<std::string> names{name};
req.names_ref() = names;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
Expand Down
12 changes: 8 additions & 4 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,11 @@ struct Duration {
} (cpp.type = "nebula::Duration")

struct LogInfo {
1: LogID log_id;
2: TermID term_id;
1: LogID log_id,
2: TermID term_id,
3: LogID commit_log_id,
// storage part checkpoint directory name
4: binary checkpoint_path,
}

struct DirInfo {
Expand All @@ -249,9 +252,10 @@ struct DirInfo {

struct CheckpointInfo {
1: GraphSpaceID space_id,
// Only part of the leader
2: map<PartitionID, LogInfo> (cpp.template = "std::unordered_map") parts,
// storage checkpoint directory name
3: binary path,
// The datapath corresponding to the current checkpointInfo
3: binary data_path,
}

// used for drainer
Expand Down
29 changes: 26 additions & 3 deletions src/interface/meta.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ struct CreateSnapshotReq {
}

struct DropSnapshotReq {
1: binary name,
1: list<binary> names,
}

struct ListSnapshotsReq {
Expand Down Expand Up @@ -953,7 +953,7 @@ struct SpaceBackupInfo {
}

struct BackupMeta {
// space_name => SpaceBackupInfo
// spaceId => SpaceBackupInfo
1: map<common.GraphSpaceID, SpaceBackupInfo>(cpp.template = "std::unordered_map") space_backups,
// sst file
2: list<binary> meta_files,
Expand All @@ -962,11 +962,21 @@ struct BackupMeta {
4: bool full,
5: bool all_spaces,
6: i64 create_time,
7: binary base_backup_name,
8: list<common.HostAddr> storage_hosts,
// The clusterId of the current cluster
9: ClusterID cluster_id,
}

struct CreateBackupReq {
// null means all spaces
1: optional list<binary> spaces,
// When empty, it means full backup
// When there is a value, it indicates the last incremental backup
2: optional binary base_backup_name,
// The clusterId of the cluster corresponding to base_backup_name
3: optional ClusterID cluster_id,

}

struct CreateBackupResp {
Expand All @@ -985,6 +995,19 @@ struct RestoreMetaReq {
2: list<HostPair> hosts,
}

struct PartInfo {
1: common.PartitionID part_id,
2: list<common.HostAddr> hosts,
}

struct RestoreMetaResp {
1: common.ErrorCode code,
// Valid if ret equals E_LEADER_CHANGED.
2: common.HostAddr leader,
3: map<common.GraphSpaceID, list<PartInfo>>
(cpp.template = "std::unordered_map") part_hosts,
}

enum ExternalServiceType {
ELASTICSEARCH = 0x01,
} (cpp.enum_strict)
Expand Down Expand Up @@ -1271,7 +1294,7 @@ service MetaService {

// Interfaces for backup and restore
CreateBackupResp createBackup(1: CreateBackupReq req);
ExecResp restoreMeta(1: RestoreMetaReq req);
RestoreMetaResp restoreMeta(1: RestoreMetaReq req);
ListClusterInfoResp listCluster(1: ListClusterInfoReq req);
GetMetaDirInfoResp getMetaDirInfo(1: GetMetaDirInfoReq req);

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1137,7 +1137,7 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::CheckpointInfo>> NebulaStore:
}

nebula::cpp2::CheckpointInfo cpInfo;
cpInfo.path_ref() = std::move(result.value());
cpInfo.data_path_ref() = std::move(result.value());
cpInfo.parts_ref() = std::move(partsInfo);
cpInfo.space_id_ref() = spaceId;
cpInfoList.emplace_back(std::move(cpInfo));
Expand Down
4 changes: 2 additions & 2 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ folly::Future<cpp2::ListSnapshotsResp> MetaServiceHandler::future_listSnapshots(

folly::Future<cpp2::CreateBackupResp> MetaServiceHandler::future_createBackup(
const cpp2::CreateBackupReq& req) {
auto* processor = CreateBackupProcessor::instance(kvstore_, adminClient_.get());
auto* processor = CreateBackupProcessor::instance(kvstore_, adminClient_.get(), clusterId_);
RETURN_FUTURE(processor);
}

Expand Down Expand Up @@ -474,7 +474,7 @@ folly::Future<cpp2::ListListenerResp> MetaServiceHandler::future_listListener(
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp> MetaServiceHandler::future_restoreMeta(
folly::Future<cpp2::RestoreMetaResp> MetaServiceHandler::future_restoreMeta(
const cpp2::RestoreMetaReq& req) {
auto* processor = RestoreProcessor::instance(kvstore_);
RETURN_FUTURE(processor);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/MetaServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class MetaServiceHandler final : public cpp2::MetaServiceSvIf {
folly::Future<cpp2::ListListenerResp> future_listListener(
const cpp2::ListListenerReq& req) override;

folly::Future<cpp2::ExecResp> future_restoreMeta(const cpp2::RestoreMetaReq& req) override;
folly::Future<cpp2::RestoreMetaResp> future_restoreMeta(const cpp2::RestoreMetaReq& req) override;

folly::Future<cpp2::GetStatsResp> future_getStats(const cpp2::GetStatsReq& req) override;

Expand Down
12 changes: 8 additions & 4 deletions src/meta/processors/admin/CreateBackupProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ namespace meta {
*/
class CreateBackupProcessor : public BaseProcessor<cpp2::CreateBackupResp> {
public:
static CreateBackupProcessor* instance(kvstore::KVStore* kvstore, AdminClient* client) {
return new CreateBackupProcessor(kvstore, client);
static CreateBackupProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* client,
ClusterID clusterId = 0) {
return new CreateBackupProcessor(kvstore, client, clusterId);
}

void process(const cpp2::CreateBackupReq& req);

private:
CreateBackupProcessor(kvstore::KVStore* kvstore, AdminClient* client)
: BaseProcessor<cpp2::CreateBackupResp>(kvstore), client_(client) {}
CreateBackupProcessor(kvstore::KVStore* kvstore, AdminClient* client, ClusterID clusterId)
: BaseProcessor<cpp2::CreateBackupResp>(kvstore), client_(client), clusterId_(clusterId) {}

nebula::cpp2::ErrorCode cancelWriteBlocking();

Expand All @@ -35,6 +37,8 @@ class CreateBackupProcessor : public BaseProcessor<cpp2::CreateBackupResp> {

private:
AdminClient* client_;

ClusterID clusterId_{0};
};

} // namespace meta
Expand Down
3 changes: 2 additions & 1 deletion src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ namespace nebula {
namespace meta {

void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
auto& snapshot = req.get_name();
auto& snapshots = req.get_names();
auto snapshot = snapshots[0];
folly::SharedMutex::WriteHolder holder(LockUtils::snapshotLock());

// Check snapshot is exists
Expand Down
5 changes: 3 additions & 2 deletions src/meta/processors/admin/RestoreProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,16 @@ namespace meta {
* 3. machine info: machine key -> ""
*
*/
class RestoreProcessor : public BaseProcessor<cpp2::ExecResp> {
class RestoreProcessor : public BaseProcessor<cpp2::RestoreMetaResp> {
public:
static RestoreProcessor* instance(kvstore::KVStore* kvstore) {
return new RestoreProcessor(kvstore);
}
void process(const cpp2::RestoreMetaReq& req);

private:
explicit RestoreProcessor(kvstore::KVStore* kvstore) : BaseProcessor<cpp2::ExecResp>(kvstore) {}
explicit RestoreProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::RestoreMetaResp>(kvstore) {}

nebula::cpp2::ErrorCode replaceHostInPartition(kvstore::WriteBatch* batch,
std::map<HostAddr, HostAddr>& hostMap);
Expand Down
2 changes: 1 addition & 1 deletion src/meta/test/AdminClientTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf {
storage::cpp2::ResponseCommon result;
resp.code_ref() = nebula::cpp2::ErrorCode::SUCCEEDED;
nebula::cpp2::CheckpointInfo cpInfo;
cpInfo.path_ref() = "snapshot_path";
cpInfo.data_path_ref() = "snapshot_path";
resp.info_ref() = {cpInfo};
pro.setValue(std::move(resp));
return f;
Expand Down
4 changes: 2 additions & 2 deletions src/meta/test/CreateBackupProcessorTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TestStorageService : public storage::cpp2::StorageAdminServiceSvIf {
logInfo.term_id_ref() = termId;
info.emplace(1, std::move(logInfo));
nebula::cpp2::CheckpointInfo cpInfo;
cpInfo.path_ref() = "snapshot_path";
cpInfo.data_path_ref() = "snapshot_path";
cpInfo.parts_ref() = std::move(info);
cpInfo.space_id_ref() = req.get_space_ids()[0];
resp.info_ref() = {cpInfo};
Expand Down Expand Up @@ -211,7 +211,7 @@ TEST(ProcessorTest, CreateBackupTest) {
ASSERT_EQ(1, spaceBackup.get_host_backups()[0].get_checkpoints().size());

auto checkInfo = spaceBackup.get_host_backups()[0].get_checkpoints()[0];
ASSERT_EQ("snapshot_path", checkInfo.get_path());
ASSERT_EQ("snapshot_path", checkInfo.get_data_path());
ASSERT_TRUE(meta.get_full());
ASSERT_FALSE(meta.get_all_spaces());
auto parts = checkInfo.get_parts();
Expand Down

0 comments on commit 891b2c6

Please sign in to comment.