Skip to content

Commit

Permalink
using admin client in MetaServiceHandler (#1864)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaphet authored Mar 25, 2020
1 parent 6499c64 commit c2ea941
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 43 deletions.
4 changes: 2 additions & 2 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ MetaServiceHandler::future_listConfigs(const cpp2::ListConfigsReq &req) {

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createSnapshot(const cpp2::CreateSnapshotReq& req) {
auto* processor = CreateSnapshotProcessor::instance(kvstore_);
auto* processor = CreateSnapshotProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_dropSnapshot(const cpp2::DropSnapshotReq& req) {
auto* processor = DropSnapshotProcessor::instance(kvstore_);
auto* processor = DropSnapshotProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

// step 2 : Blocking all writes action for storage engines.
auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_ON);
auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
if (signRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Send blocking sign to storage engine error";
handleErrorCode(signRet);
Expand All @@ -50,7 +50,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

// step 3 : Create checkpoint for all storage engines and meta engine.
auto csRet = Snapshot::instance(kvstore_)->createSnapshot(snapshot);
auto csRet = Snapshot::instance(kvstore_, client_)->createSnapshot(snapshot);
if (csRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Checkpoint create error on storage engine";
handleErrorCode(csRet);
Expand Down Expand Up @@ -101,7 +101,7 @@ std::string CreateSnapshotProcessor::genSnapshotName() {
}

cpp2::ErrorCode CreateSnapshotProcessor::cancelWriteBlocking() {
auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_OFF);
auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
if (signRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Cancel write blocking error";
return signRet;
Expand Down
14 changes: 10 additions & 4 deletions src/meta/processors/admin/CreateSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,30 @@

#include <gtest/gtest_prod.h>
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

class CreateSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore) {
return new CreateSnapshotProcessor(kvstore);
static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* client) {
return new CreateSnapshotProcessor(kvstore, client);
}
void process(const cpp2::CreateSnapshotReq& req);

cpp2::ErrorCode cancelWriteBlocking();

private:
explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore,
AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}

std::string genSnapshotName();

private:
AdminClient* client_;
};
} // namespace meta
} // namespace nebula
Expand Down
7 changes: 4 additions & 3 deletions src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
}

auto hosts = MetaServiceUtils::parseSnapshotHosts(val);
auto peers = NetworkUtils::toHosts(hosts);
if (!peers.ok()) {
auto peersRet = NetworkUtils::toHosts(hosts);
if (!peersRet.ok()) {
LOG(ERROR) << "Get checkpoint hosts error";
handleErrorCode(cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

std::vector<kvstore::KV> data;
auto dsRet = Snapshot::instance(kvstore_)->dropSnapshot(snapshot, std::move(peers.value()));
auto peers = peersRet.value();
auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(peers));
if (dsRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Drop snapshot error on storage engine";
// Need update the snapshot status to invalid, maybe some storage engine drop done.
Expand Down
15 changes: 11 additions & 4 deletions src/meta/processors/admin/DropSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@

#include <gtest/gtest_prod.h>
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

class DropSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore) {
return new DropSnapshotProcessor(kvstore);
static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* client) {
return new DropSnapshotProcessor(kvstore, client);
}

void process(const cpp2::DropSnapshotReq& req);

private:
explicit DropSnapshotProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
explicit DropSnapshotProcessor(kvstore::KVStore* kvstore,
AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}

private:
AdminClient* client_;
};
} // namespace meta
} // namespace nebula
Expand Down
11 changes: 4 additions & 7 deletions src/meta/processors/admin/SnapShot.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ namespace meta {

class Snapshot {
public:
static Snapshot* instance(kvstore::KVStore* kv) {
static std::unique_ptr<AdminClient> client(new AdminClient(kv));
static std::unique_ptr<Snapshot> snapshot(new Snapshot(kv, std::move(client)));
static Snapshot* instance(kvstore::KVStore* kv, AdminClient* client) {
static std::unique_ptr<Snapshot> snapshot(new Snapshot(kv, client));
return snapshot.get();
}

Expand All @@ -37,17 +36,15 @@ class Snapshot {
getLeaderParts(HostLeaderMap *hostLeaderMap, GraphSpaceID spaceId);

private:
Snapshot(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
: kv_(kv)
, client_(std::move(client)) {
Snapshot(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) {
executor_.reset(new folly::CPUThreadPoolExecutor(1));
}

bool getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode);

private:
kvstore::KVStore* kv_{nullptr};
std::unique_ptr<AdminClient> client_{nullptr};
AdminClient* client_{nullptr};
std::unique_ptr<folly::Executor> executor_;
};

Expand Down
12 changes: 2 additions & 10 deletions src/storage/admin/CreateCheckpointProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ namespace nebula {
namespace storage {

void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) {
if (FLAGS_store_type != "nebula") {
cpp2::ResultCode thriftRet;
thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT);
codes_.emplace_back(thriftRet);
onFinished();
return;
}

CHECK_NOTNULL(kvstore_);
auto spaceId = req.get_space_id();
auto& name = req.get_name();
auto* store = dynamic_cast<kvstore::NebulaStore*>(kvstore_);
auto retCode = store->createCheckpoint(spaceId, std::move(name));
auto retCode = kvstore_->createCheckpoint(spaceId, std::move(name));
if (retCode != kvstore::ResultCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(to(retCode));
Expand Down
12 changes: 2 additions & 10 deletions src/storage/admin/DropCheckpointProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ namespace nebula {
namespace storage {

void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) {
if (FLAGS_store_type != "nebula") {
cpp2::ResultCode thriftRet;
thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT);
codes_.emplace_back(thriftRet);
onFinished();
return;
}

CHECK_NOTNULL(kvstore_);
auto spaceId = req.get_space_id();
auto& name = req.get_name();
auto* store = dynamic_cast<kvstore::NebulaStore*>(kvstore_);
auto retCode = store->dropCheckpoint(spaceId, std::move(name));
auto retCode = kvstore_->dropCheckpoint(spaceId, std::move(name));
if (retCode != kvstore::ResultCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(to(retCode));
Expand Down

0 comments on commit c2ea941

Please sign in to comment.