Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using admin client in MetaServiceHandler #1864

Merged
merged 2 commits into from
Mar 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/meta/MetaServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ MetaServiceHandler::future_listConfigs(const cpp2::ListConfigsReq &req) {

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_createSnapshot(const cpp2::CreateSnapshotReq& req) {
auto* processor = CreateSnapshotProcessor::instance(kvstore_);
auto* processor = CreateSnapshotProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResp>
MetaServiceHandler::future_dropSnapshot(const cpp2::DropSnapshotReq& req) {
auto* processor = DropSnapshotProcessor::instance(kvstore_);
auto* processor = DropSnapshotProcessor::instance(kvstore_, adminClient_.get());
RETURN_FUTURE(processor);
}

Expand Down
6 changes: 3 additions & 3 deletions src/meta/processors/admin/CreateSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

// step 2 : Blocking all writes action for storage engines.
auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_ON);
auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON);
if (signRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Send blocking sign to storage engine error";
handleErrorCode(signRet);
Expand All @@ -50,7 +50,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) {
}

// step 3 : Create checkpoint for all storage engines and meta engine.
auto csRet = Snapshot::instance(kvstore_)->createSnapshot(snapshot);
auto csRet = Snapshot::instance(kvstore_, client_)->createSnapshot(snapshot);
if (csRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Checkpoint create error on storage engine";
handleErrorCode(csRet);
Expand Down Expand Up @@ -101,7 +101,7 @@ std::string CreateSnapshotProcessor::genSnapshotName() {
}

cpp2::ErrorCode CreateSnapshotProcessor::cancelWriteBlocking() {
auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_OFF);
auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF);
if (signRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Cancel write blocking error";
return signRet;
Expand Down
14 changes: 10 additions & 4 deletions src/meta/processors/admin/CreateSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,30 @@

#include <gtest/gtest_prod.h>
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

class CreateSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore) {
return new CreateSnapshotProcessor(kvstore);
static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* client) {
return new CreateSnapshotProcessor(kvstore, client);
}
void process(const cpp2::CreateSnapshotReq& req);

cpp2::ErrorCode cancelWriteBlocking();

private:
explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore,
AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}

std::string genSnapshotName();

private:
AdminClient* client_;
};
} // namespace meta
} // namespace nebula
Expand Down
7 changes: 4 additions & 3 deletions src/meta/processors/admin/DropSnapshotProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,17 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) {
}

auto hosts = MetaServiceUtils::parseSnapshotHosts(val);
auto peers = NetworkUtils::toHosts(hosts);
if (!peers.ok()) {
auto peersRet = NetworkUtils::toHosts(hosts);
if (!peersRet.ok()) {
LOG(ERROR) << "Get checkpoint hosts error";
handleErrorCode(cpp2::ErrorCode::E_SNAPSHOT_FAILURE);
onFinished();
return;
}

std::vector<kvstore::KV> data;
auto dsRet = Snapshot::instance(kvstore_)->dropSnapshot(snapshot, std::move(peers.value()));
auto peers = peersRet.value();
auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(peers));
if (dsRet != cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Drop snapshot error on storage engine";
// Need update the snapshot status to invalid, maybe some storage engine drop done.
Expand Down
15 changes: 11 additions & 4 deletions src/meta/processors/admin/DropSnapshotProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,27 @@

#include <gtest/gtest_prod.h>
#include "meta/processors/BaseProcessor.h"
#include "meta/processors/admin/AdminClient.h"

namespace nebula {
namespace meta {

class DropSnapshotProcessor : public BaseProcessor<cpp2::ExecResp> {
public:
static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore) {
return new DropSnapshotProcessor(kvstore);
static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore,
AdminClient* client) {
return new DropSnapshotProcessor(kvstore, client);
}

void process(const cpp2::DropSnapshotReq& req);

private:
explicit DropSnapshotProcessor(kvstore::KVStore* kvstore)
: BaseProcessor<cpp2::ExecResp>(kvstore) {}
explicit DropSnapshotProcessor(kvstore::KVStore* kvstore,
AdminClient* client)
: BaseProcessor<cpp2::ExecResp>(kvstore), client_(client) {}

private:
AdminClient* client_;
};
} // namespace meta
} // namespace nebula
Expand Down
11 changes: 4 additions & 7 deletions src/meta/processors/admin/SnapShot.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ namespace meta {

class Snapshot {
public:
static Snapshot* instance(kvstore::KVStore* kv) {
static std::unique_ptr<AdminClient> client(new AdminClient(kv));
static std::unique_ptr<Snapshot> snapshot(new Snapshot(kv, std::move(client)));
static Snapshot* instance(kvstore::KVStore* kv, AdminClient* client) {
static std::unique_ptr<Snapshot> snapshot(new Snapshot(kv, client));
return snapshot.get();
}

Expand All @@ -37,17 +36,15 @@ class Snapshot {
getLeaderParts(HostLeaderMap *hostLeaderMap, GraphSpaceID spaceId);

private:
Snapshot(kvstore::KVStore* kv, std::unique_ptr<AdminClient> client)
: kv_(kv)
, client_(std::move(client)) {
Snapshot(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) {
executor_.reset(new folly::CPUThreadPoolExecutor(1));
}

bool getAllSpaces(std::vector<GraphSpaceID>& spaces, kvstore::ResultCode& retCode);

private:
kvstore::KVStore* kv_{nullptr};
std::unique_ptr<AdminClient> client_{nullptr};
AdminClient* client_{nullptr};
std::unique_ptr<folly::Executor> executor_;
};

Expand Down
12 changes: 2 additions & 10 deletions src/storage/admin/CreateCheckpointProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ namespace nebula {
namespace storage {

void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) {
if (FLAGS_store_type != "nebula") {
cpp2::ResultCode thriftRet;
thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT);
codes_.emplace_back(thriftRet);
onFinished();
return;
}

CHECK_NOTNULL(kvstore_);
auto spaceId = req.get_space_id();
auto& name = req.get_name();
auto* store = dynamic_cast<kvstore::NebulaStore*>(kvstore_);
auto retCode = store->createCheckpoint(spaceId, std::move(name));
auto retCode = kvstore_->createCheckpoint(spaceId, std::move(name));
if (retCode != kvstore::ResultCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(to(retCode));
Expand Down
12 changes: 2 additions & 10 deletions src/storage/admin/DropCheckpointProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,10 @@ namespace nebula {
namespace storage {

void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) {
if (FLAGS_store_type != "nebula") {
cpp2::ResultCode thriftRet;
thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT);
codes_.emplace_back(thriftRet);
onFinished();
return;
}

CHECK_NOTNULL(kvstore_);
auto spaceId = req.get_space_id();
auto& name = req.get_name();
auto* store = dynamic_cast<kvstore::NebulaStore*>(kvstore_);
auto retCode = store->dropCheckpoint(spaceId, std::move(name));
auto retCode = kvstore_->dropCheckpoint(spaceId, std::move(name));
if (retCode != kvstore::ResultCode::SUCCEEDED) {
cpp2::ResultCode thriftRet;
thriftRet.set_code(to(retCode));
Expand Down