diff --git a/src/meta/MetaServiceHandler.cpp b/src/meta/MetaServiceHandler.cpp index 13293c16709..b4801650e7f 100644 --- a/src/meta/MetaServiceHandler.cpp +++ b/src/meta/MetaServiceHandler.cpp @@ -376,13 +376,13 @@ MetaServiceHandler::future_listConfigs(const cpp2::ListConfigsReq &req) { folly::Future MetaServiceHandler::future_createSnapshot(const cpp2::CreateSnapshotReq& req) { - auto* processor = CreateSnapshotProcessor::instance(kvstore_); + auto* processor = CreateSnapshotProcessor::instance(kvstore_, adminClient_.get()); RETURN_FUTURE(processor); } folly::Future MetaServiceHandler::future_dropSnapshot(const cpp2::DropSnapshotReq& req) { - auto* processor = DropSnapshotProcessor::instance(kvstore_); + auto* processor = DropSnapshotProcessor::instance(kvstore_, adminClient_.get()); RETURN_FUTURE(processor); } diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.cpp b/src/meta/processors/admin/CreateSnapshotProcessor.cpp index fd4504b39a2..7212b9e6019 100644 --- a/src/meta/processors/admin/CreateSnapshotProcessor.cpp +++ b/src/meta/processors/admin/CreateSnapshotProcessor.cpp @@ -40,7 +40,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) { } // step 2 : Blocking all writes action for storage engines. - auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_ON); + auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_ON); if (signRet != cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Send blocking sign to storage engine error"; handleErrorCode(signRet); @@ -50,7 +50,7 @@ void CreateSnapshotProcessor::process(const cpp2::CreateSnapshotReq&) { } // step 3 : Create checkpoint for all storage engines and meta engine. - auto csRet = Snapshot::instance(kvstore_)->createSnapshot(snapshot); + auto csRet = Snapshot::instance(kvstore_, client_)->createSnapshot(snapshot); if (csRet != cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Checkpoint create error on storage engine"; handleErrorCode(csRet); @@ -101,7 +101,7 @@ std::string CreateSnapshotProcessor::genSnapshotName() { } cpp2::ErrorCode CreateSnapshotProcessor::cancelWriteBlocking() { - auto signRet = Snapshot::instance(kvstore_)->blockingWrites(SignType::BLOCK_OFF); + auto signRet = Snapshot::instance(kvstore_, client_)->blockingWrites(SignType::BLOCK_OFF); if (signRet != cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Cancel write blocking error"; return signRet; diff --git a/src/meta/processors/admin/CreateSnapshotProcessor.h b/src/meta/processors/admin/CreateSnapshotProcessor.h index e9b2f7bd8eb..9389e8a2d1a 100644 --- a/src/meta/processors/admin/CreateSnapshotProcessor.h +++ b/src/meta/processors/admin/CreateSnapshotProcessor.h @@ -9,24 +9,30 @@ #include #include "meta/processors/BaseProcessor.h" +#include "meta/processors/admin/AdminClient.h" namespace nebula { namespace meta { class CreateSnapshotProcessor : public BaseProcessor { public: - static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore) { - return new CreateSnapshotProcessor(kvstore); + static CreateSnapshotProcessor* instance(kvstore::KVStore* kvstore, + AdminClient* client) { + return new CreateSnapshotProcessor(kvstore, client); } void process(const cpp2::CreateSnapshotReq& req); cpp2::ErrorCode cancelWriteBlocking(); private: - explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + explicit CreateSnapshotProcessor(kvstore::KVStore* kvstore, + AdminClient* client) + : BaseProcessor(kvstore), client_(client) {} std::string genSnapshotName(); + +private: + AdminClient* client_; }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/DropSnapshotProcessor.cpp b/src/meta/processors/admin/DropSnapshotProcessor.cpp index b0c210620d2..4b4d3cbb8b6 100644 --- a/src/meta/processors/admin/DropSnapshotProcessor.cpp +++ b/src/meta/processors/admin/DropSnapshotProcessor.cpp @@ -30,8 +30,8 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { } auto hosts = MetaServiceUtils::parseSnapshotHosts(val); - auto peers = NetworkUtils::toHosts(hosts); - if (!peers.ok()) { + auto peersRet = NetworkUtils::toHosts(hosts); + if (!peersRet.ok()) { LOG(ERROR) << "Get checkpoint hosts error"; handleErrorCode(cpp2::ErrorCode::E_SNAPSHOT_FAILURE); onFinished(); @@ -39,7 +39,8 @@ void DropSnapshotProcessor::process(const cpp2::DropSnapshotReq& req) { } std::vector data; - auto dsRet = Snapshot::instance(kvstore_)->dropSnapshot(snapshot, std::move(peers.value())); + auto peers = peersRet.value(); + auto dsRet = Snapshot::instance(kvstore_, client_)->dropSnapshot(snapshot, std::move(peers)); if (dsRet != cpp2::ErrorCode::SUCCEEDED) { LOG(ERROR) << "Drop snapshot error on storage engine"; // Need update the snapshot status to invalid, maybe some storage engine drop done. diff --git a/src/meta/processors/admin/DropSnapshotProcessor.h b/src/meta/processors/admin/DropSnapshotProcessor.h index ae001f7c3f5..d20d95caa12 100644 --- a/src/meta/processors/admin/DropSnapshotProcessor.h +++ b/src/meta/processors/admin/DropSnapshotProcessor.h @@ -9,20 +9,27 @@ #include #include "meta/processors/BaseProcessor.h" +#include "meta/processors/admin/AdminClient.h" namespace nebula { namespace meta { class DropSnapshotProcessor : public BaseProcessor { public: - static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore) { - return new DropSnapshotProcessor(kvstore); + static DropSnapshotProcessor* instance(kvstore::KVStore* kvstore, + AdminClient* client) { + return new DropSnapshotProcessor(kvstore, client); } + void process(const cpp2::DropSnapshotReq& req); private: - explicit DropSnapshotProcessor(kvstore::KVStore* kvstore) - : BaseProcessor(kvstore) {} + explicit DropSnapshotProcessor(kvstore::KVStore* kvstore, + AdminClient* client) + : BaseProcessor(kvstore), client_(client) {} + +private: + AdminClient* client_; }; } // namespace meta } // namespace nebula diff --git a/src/meta/processors/admin/SnapShot.h b/src/meta/processors/admin/SnapShot.h index fa6b14df555..b2f248550e3 100644 --- a/src/meta/processors/admin/SnapShot.h +++ b/src/meta/processors/admin/SnapShot.h @@ -19,9 +19,8 @@ namespace meta { class Snapshot { public: - static Snapshot* instance(kvstore::KVStore* kv) { - static std::unique_ptr client(new AdminClient(kv)); - static std::unique_ptr snapshot(new Snapshot(kv, std::move(client))); + static Snapshot* instance(kvstore::KVStore* kv, AdminClient* client) { + static std::unique_ptr snapshot(new Snapshot(kv, client)); return snapshot.get(); } @@ -37,9 +36,7 @@ class Snapshot { getLeaderParts(HostLeaderMap *hostLeaderMap, GraphSpaceID spaceId); private: - Snapshot(kvstore::KVStore* kv, std::unique_ptr client) - : kv_(kv) - , client_(std::move(client)) { + Snapshot(kvstore::KVStore* kv, AdminClient* client) : kv_(kv), client_(client) { executor_.reset(new folly::CPUThreadPoolExecutor(1)); } @@ -47,7 +44,7 @@ class Snapshot { private: kvstore::KVStore* kv_{nullptr}; - std::unique_ptr client_{nullptr}; + AdminClient* client_{nullptr}; std::unique_ptr executor_; }; diff --git a/src/storage/admin/CreateCheckpointProcessor.cpp b/src/storage/admin/CreateCheckpointProcessor.cpp index 816d7c9e375..6dadc431333 100644 --- a/src/storage/admin/CreateCheckpointProcessor.cpp +++ b/src/storage/admin/CreateCheckpointProcessor.cpp @@ -10,18 +10,10 @@ namespace nebula { namespace storage { void CreateCheckpointProcessor::process(const cpp2::CreateCPRequest& req) { - if (FLAGS_store_type != "nebula") { - cpp2::ResultCode thriftRet; - thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT); - codes_.emplace_back(thriftRet); - onFinished(); - return; - } - + CHECK_NOTNULL(kvstore_); auto spaceId = req.get_space_id(); auto& name = req.get_name(); - auto* store = dynamic_cast(kvstore_); - auto retCode = store->createCheckpoint(spaceId, std::move(name)); + auto retCode = kvstore_->createCheckpoint(spaceId, std::move(name)); if (retCode != kvstore::ResultCode::SUCCEEDED) { cpp2::ResultCode thriftRet; thriftRet.set_code(to(retCode)); diff --git a/src/storage/admin/DropCheckpointProcessor.cpp b/src/storage/admin/DropCheckpointProcessor.cpp index 5c7f588dfb7..54633c38739 100644 --- a/src/storage/admin/DropCheckpointProcessor.cpp +++ b/src/storage/admin/DropCheckpointProcessor.cpp @@ -10,18 +10,10 @@ namespace nebula { namespace storage { void DropCheckpointProcessor::process(const cpp2::DropCPRequest& req) { - if (FLAGS_store_type != "nebula") { - cpp2::ResultCode thriftRet; - thriftRet.set_code(cpp2::ErrorCode::E_FAILED_TO_CHECKPOINT); - codes_.emplace_back(thriftRet); - onFinished(); - return; - } - + CHECK_NOTNULL(kvstore_); auto spaceId = req.get_space_id(); auto& name = req.get_name(); - auto* store = dynamic_cast(kvstore_); - auto retCode = store->dropCheckpoint(spaceId, std::move(name)); + auto retCode = kvstore_->dropCheckpoint(spaceId, std::move(name)); if (retCode != kvstore::ResultCode::SUCCEEDED) { cpp2::ResultCode thriftRet; thriftRet.set_code(to(retCode));