diff --git a/src/kvstore/KVEngine.h b/src/kvstore/KVEngine.h index 578c249ffd4..52cc0c13b43 100644 --- a/src/kvstore/KVEngine.h +++ b/src/kvstore/KVEngine.h @@ -23,8 +23,6 @@ class WriteBatch { virtual ResultCode remove(folly::StringPiece key) = 0; - virtual ResultCode removePrefix(folly::StringPiece prefix) = 0; - // Remove all keys in the range [start, end) virtual ResultCode removeRange(folly::StringPiece start, folly::StringPiece end) = 0; @@ -44,7 +42,9 @@ class KVEngine { virtual const char* getDataRoot() const = 0; virtual std::unique_ptr startBatchWrite() = 0; - virtual ResultCode commitBatchWrite(std::unique_ptr batch) = 0; + + virtual ResultCode commitBatchWrite(std::unique_ptr batch, + bool disableWAL = true) = 0; // Read a single key virtual ResultCode get(const std::string& key, std::string* value) = 0; @@ -84,9 +84,6 @@ class KVEngine { virtual ResultCode removeRange(const std::string& start, const std::string& end) = 0; - // Remove rows with the given prefix - virtual ResultCode removePrefix(const std::string& prefix) = 0; - // Add partId into current storage engine. virtual void addPart(PartitionID partId) = 0; diff --git a/src/kvstore/KVStore.h b/src/kvstore/KVStore.h index be06b0dbab5..6c7694e69de 100644 --- a/src/kvstore/KVStore.h +++ b/src/kvstore/KVStore.h @@ -153,11 +153,6 @@ class KVStore { const std::string& end, KVCallback cb) = 0; - virtual void asyncRemovePrefix(GraphSpaceID spaceId, - PartitionID partId, - const std::string& prefix, - KVCallback cb) = 0; - virtual void asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, raftex::AtomicOp op, diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h index 3c42d143386..7c0e646d34b 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -17,7 +17,6 @@ enum LogType : char { OP_MULTI_PUT = 0x2, OP_REMOVE = 0x3, OP_MULTI_REMOVE = 0x4, - OP_REMOVE_PREFIX = 0x5, OP_REMOVE_RANGE = 0x6, OP_ADD_LEARNER = 0x07, OP_TRANS_LEADER = 0x08, @@ -30,7 +29,6 @@ enum BatchLogType : char { OP_BATCH_PUT = 0x1, OP_BATCH_REMOVE = 0x2, OP_BATCH_REMOVE_RANGE = 0x3, - OP_BATCH_REMOVE_PREFIX = 0x4, }; std::string encodeKV(const folly::StringPiece& key, @@ -86,13 +84,6 @@ class BatchHolder { batch_.emplace_back(std::move(op)); } - void removePrefix(std::string&& key) { - auto op = std::make_tuple(BatchLogType::OP_BATCH_REMOVE_PREFIX, - std::forward(key), - ""); - batch_.emplace_back(std::move(op)); - } - void clear() { batch_.clear(); } diff --git a/src/kvstore/NebulaStore.cpp b/src/kvstore/NebulaStore.cpp index 82bc6c5ab44..b610b769caf 100644 --- a/src/kvstore/NebulaStore.cpp +++ b/src/kvstore/NebulaStore.cpp @@ -520,20 +520,6 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId, part->asyncRemoveRange(start, end, std::move(cb)); } - -void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId, - PartitionID partId, - const std::string& prefix, - KVCallback cb) { - auto ret = part(spaceId, partId); - if (!ok(ret)) { - cb(error(ret)); - return; - } - auto part = nebula::value(ret); - part->asyncRemovePrefix(prefix, std::move(cb)); -} - void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, raftex::AtomicOp op, diff --git a/src/kvstore/NebulaStore.h b/src/kvstore/NebulaStore.h index 03c3ace396e..e879b5b4bd7 100644 --- a/src/kvstore/NebulaStore.h +++ b/src/kvstore/NebulaStore.h @@ -171,11 +171,6 @@ class NebulaStore : public KVStore, public Handler { const std::string& end, KVCallback cb) override; - void asyncRemovePrefix(GraphSpaceID spaceId, - PartitionID partId, - const std::string& prefix, - KVCallback cb) override; - void asyncAtomicOp(GraphSpaceID spaceId, PartitionID partId, raftex::AtomicOp op, diff --git a/src/kvstore/Part.cpp b/src/kvstore/Part.cpp index 56d693cf28b..215b2ddd032 100644 --- a/src/kvstore/Part.cpp +++ b/src/kvstore/Part.cpp @@ -7,6 +7,7 @@ #include "kvstore/Part.h" #include "kvstore/LogEncoder.h" #include "base/NebulaKeyUtils.h" +#include "kvstore/RocksEngineConfig.h" DEFINE_int32(cluster_id, 0, "A unique id for each cluster"); @@ -98,16 +99,6 @@ void Part::asyncMultiRemove(const std::vector& keys, KVCallback cb) } -void Part::asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb) { - std::string log = encodeSingleValue(OP_REMOVE_PREFIX, prefix); - - appendAsync(FLAGS_cluster_id, std::move(log)) - .thenValue([this, callback = std::move(cb)] (AppendLogResult res) mutable { - callback(this->toResultCode(res)); - }); -} - - void Part::asyncRemoveRange(folly::StringPiece start, folly::StringPiece end, KVCallback cb) { @@ -249,14 +240,6 @@ bool Part::commitLogs(std::unique_ptr iter) { } break; } - case OP_REMOVE_PREFIX: { - auto prefix = decodeSingleValue(log); - if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) { - LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()"; - return false; - } - break; - } case OP_REMOVE_RANGE: { auto range = decodeMultiValues(log); DCHECK_EQ(2, range.size()); @@ -276,10 +259,7 @@ bool Part::commitLogs(std::unique_ptr iter) { code = batch->remove(op.second.first); } else if (op.first == BatchLogType::OP_BATCH_REMOVE_RANGE) { code = batch->removeRange(op.second.first, op.second.second); - } else if (op.first == BatchLogType::OP_BATCH_REMOVE_PREFIX) { - code = batch->removePrefix(op.second.first); } - if (code != ResultCode::SUCCEEDED) { LOG(ERROR) << idStr_ << "Failed to call WriteBatch"; return false; @@ -325,7 +305,8 @@ bool Part::commitLogs(std::unique_ptr iter) { return false; } } - return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED; + return engine_->commitBatchWrite(std::move(batch), FLAGS_rocksdb_disable_wal) + == ResultCode::SUCCEEDED; } std::pair Part::commitSnapshot(const std::vector& rows, @@ -350,7 +331,8 @@ std::pair Part::commitSnapshot(const std::vector& return std::make_pair(0, 0); } } - if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) { + // For snapshot, we open the rocksdb's wal to avoid loss data if crash. + if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch), false)) { LOG(ERROR) << idStr_ << "Put failed in commit"; return std::make_pair(0, 0); } diff --git a/src/kvstore/Part.h b/src/kvstore/Part.h index d4b1dca47dd..90cd98f0984 100644 --- a/src/kvstore/Part.h +++ b/src/kvstore/Part.h @@ -45,7 +45,6 @@ class Part : public raftex::RaftPart { void asyncRemove(folly::StringPiece key, KVCallback cb); void asyncMultiRemove(const std::vector& keys, KVCallback cb); - void asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb); void asyncRemoveRange(folly::StringPiece start, folly::StringPiece end, KVCallback cb); diff --git a/src/kvstore/RocksEngine.cpp b/src/kvstore/RocksEngine.cpp index db15060c043..6c954c04658 100644 --- a/src/kvstore/RocksEngine.cpp +++ b/src/kvstore/RocksEngine.cpp @@ -27,10 +27,9 @@ namespace { class RocksWriteBatch : public WriteBatch { private: rocksdb::WriteBatch batch_; - rocksdb::DB* db_{nullptr}; public: - explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {} + RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {} virtual ~RocksWriteBatch() = default; @@ -50,25 +49,6 @@ class RocksWriteBatch : public WriteBatch { } } - ResultCode removePrefix(folly::StringPiece prefix) override { - rocksdb::Slice pre(prefix.begin(), prefix.size()); - rocksdb::ReadOptions options; - std::unique_ptr iter(db_->NewIterator(options)); - iter->Seek(pre); - while (iter->Valid()) { - if (iter->key().starts_with(pre)) { - if (!batch_.Delete(iter->key()).ok()) { - return ResultCode::ERR_UNKNOWN; - } - } else { - // Done - break; - } - iter->Next(); - } - return ResultCode::SUCCEEDED; - } - // Remove all keys in the range [start, end) ResultCode removeRange(folly::StringPiece start, folly::StringPiece end) override { if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) { @@ -127,13 +107,13 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId, std::unique_ptr RocksEngine::startBatchWrite() { - return std::make_unique(db_.get()); + return std::make_unique(); } -ResultCode RocksEngine::commitBatchWrite(std::unique_ptr batch) { +ResultCode RocksEngine::commitBatchWrite(std::unique_ptr batch, bool disableWAL) { rocksdb::WriteOptions options; - options.disableWAL = FLAGS_rocksdb_disable_wal; + options.disableWAL = disableWAL; auto* b = static_cast(batch.get()); rocksdb::Status status = db_->Write(options, b->data()); if (status.ok()) { @@ -285,8 +265,6 @@ ResultCode RocksEngine::removeRange(const std::string& start, const std::string& end) { rocksdb::WriteOptions options; options.disableWAL = FLAGS_rocksdb_disable_wal; - // TODO(sye) Given the RocksDB version we are using, - // we should avoud using DeleteRange auto status = db_->DeleteRange(options, db_->DefaultColumnFamily(), start, end); if (status.ok()) { return ResultCode::SUCCEEDED; @@ -296,41 +274,10 @@ ResultCode RocksEngine::removeRange(const std::string& start, } } - -ResultCode RocksEngine::removePrefix(const std::string& prefix) { - rocksdb::Slice pre(prefix.data(), prefix.size()); - rocksdb::ReadOptions readOptions; - rocksdb::WriteBatch batch; - std::unique_ptr iter(db_->NewIterator(readOptions)); - iter->Seek(pre); - while (iter->Valid()) { - if (iter->key().starts_with(pre)) { - auto status = batch.Delete(iter->key()); - if (!status.ok()) { - return ResultCode::ERR_UNKNOWN; - } - } else { - // Done - break; - } - iter->Next(); - } - - rocksdb::WriteOptions writeOptions; - writeOptions.disableWAL = FLAGS_rocksdb_disable_wal; - if (db_->Write(writeOptions, &batch).ok()) { - return ResultCode::SUCCEEDED; - } else { - return ResultCode::ERR_UNKNOWN; - } -} - - std::string RocksEngine::partKey(PartitionID partId) { return NebulaKeyUtils::systemPartKey(partId); } - void RocksEngine::addPart(PartitionID partId) { auto ret = put(partKey(partId), ""); if (ret == ResultCode::SUCCEEDED) { diff --git a/src/kvstore/RocksEngine.h b/src/kvstore/RocksEngine.h index 11d1a3275da..0026f10a5b6 100644 --- a/src/kvstore/RocksEngine.h +++ b/src/kvstore/RocksEngine.h @@ -108,7 +108,9 @@ class RocksEngine : public KVEngine { } std::unique_ptr startBatchWrite() override; - ResultCode commitBatchWrite(std::unique_ptr batch) override; + + ResultCode commitBatchWrite(std::unique_ptr batch, + bool disableWAL) override; /********************* * Data retrieval @@ -143,8 +145,6 @@ class RocksEngine : public KVEngine { ResultCode removeRange(const std::string& start, const std::string& end) override; - ResultCode removePrefix(const std::string& prefix) override; - /********************* * Non-data operation ********************/ diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index c55e2f01fb2..7cf5800c8c5 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -156,7 +156,7 @@ class HBaseStore : public KVStore { void asyncRemovePrefix(GraphSpaceID spaceId, PartitionID partId, const std::string& prefix, - KVCallback cb) override; + KVCallback cb); void asyncAtomicOp(GraphSpaceID, PartitionID, diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index 0b4c446bb96..52129da45be 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -68,8 +68,6 @@ class BaseProcessor { void doRemoveRange(GraphSpaceID spaceId, PartitionID partId, std::string start, std::string end); - void doRemovePrefix(GraphSpaceID spaceId, PartitionID partId, std::string prefix); - kvstore::ResultCode doRange(GraphSpaceID spaceId, PartitionID partId, std::string start, std::string end, std::unique_ptr* iter); diff --git a/src/storage/BaseProcessor.inl b/src/storage/BaseProcessor.inl index 555ba4f28cf..8fbf8fc2a4e 100644 --- a/src/storage/BaseProcessor.inl +++ b/src/storage/BaseProcessor.inl @@ -104,16 +104,6 @@ void BaseProcessor::doRemoveRange(GraphSpaceID spaceId, }); } -template -void BaseProcessor::doRemovePrefix(GraphSpaceID spaceId, - PartitionID partId, - std::string prefix) { - this->kvstore_->asyncRemovePrefix( - spaceId, partId, prefix, [spaceId, partId, this](kvstore::ResultCode code) { - handleAsync(spaceId, partId, code); - }); -} - template kvstore::ResultCode BaseProcessor::doRange(GraphSpaceID spaceId, PartitionID partId, diff --git a/src/storage/mutate/DeleteVerticesProcessor.cpp b/src/storage/mutate/DeleteVerticesProcessor.cpp index 590742ad709..39b51dcdf78 100644 --- a/src/storage/mutate/DeleteVerticesProcessor.cpp +++ b/src/storage/mutate/DeleteVerticesProcessor.cpp @@ -30,29 +30,31 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) { const auto& vertices = pv->second; for (auto v = vertices.begin(); v != vertices.end(); v++) { auto prefix = NebulaKeyUtils::vertexPrefix(part, *v); + std::unique_ptr iter; + auto ret = this->kvstore_->prefix(spaceId, part, prefix, &iter); + if (ret != kvstore::ResultCode::SUCCEEDED) { + VLOG(3) << "Error! ret = " << static_cast(ret) + << ", spaceID " << spaceId; + this->onFinished(); + return; + } - // Evict vertices from cache - if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { - std::unique_ptr iter; - auto ret = this->kvstore_->prefix(spaceId, part, prefix, &iter); - if (ret != kvstore::ResultCode::SUCCEEDED) { - VLOG(3) << "Error! ret = " << static_cast(ret) - << ", spaceID " << spaceId; - this->onFinished(); - return; - } - - while (iter->valid()) { - auto key = iter->key(); - if (NebulaKeyUtils::isVertex(key)) { - auto tag = NebulaKeyUtils::getTagId(key); + std::vector keys; + keys.reserve(32); + while (iter->valid()) { + auto key = iter->key(); + if (NebulaKeyUtils::isVertex(key)) { + auto tag = NebulaKeyUtils::getTagId(key); + // Evict vertices from cache + if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) { VLOG(3) << "Evict vertex cache for VID " << *v << ", TagID " << tag; vertexCache_->evict(std::make_pair(*v, tag), part); } - iter->next(); + keys.emplace_back(key.str()); } + iter->next(); } - doRemovePrefix(spaceId, part, std::move(prefix)); + doRemove(spaceId, part, std::move(keys)); } } } else {