Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove removePrefix interface in kvstore #1953

Merged
merged 2 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class WriteBatch {

virtual ResultCode remove(folly::StringPiece key) = 0;

virtual ResultCode removePrefix(folly::StringPiece prefix) = 0;

// Remove all keys in the range [start, end)
virtual ResultCode removeRange(folly::StringPiece start,
folly::StringPiece end) = 0;
Expand All @@ -44,7 +42,9 @@ class KVEngine {
virtual const char* getDataRoot() const = 0;

virtual std::unique_ptr<WriteBatch> startBatchWrite() = 0;
virtual ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch) = 0;

virtual ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL = true) = 0;

// Read a single key
virtual ResultCode get(const std::string& key, std::string* value) = 0;
Expand Down Expand Up @@ -84,9 +84,6 @@ class KVEngine {
virtual ResultCode removeRange(const std::string& start,
const std::string& end) = 0;

// Remove rows with the given prefix
virtual ResultCode removePrefix(const std::string& prefix) = 0;

// Add partId into current storage engine.
virtual void addPart(PartitionID partId) = 0;

Expand Down
5 changes: 0 additions & 5 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ class KVStore {
const std::string& end,
KVCallback cb) = 0;

virtual void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) = 0;

virtual void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
9 changes: 0 additions & 9 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ enum LogType : char {
OP_MULTI_PUT = 0x2,
OP_REMOVE = 0x3,
OP_MULTI_REMOVE = 0x4,
OP_REMOVE_PREFIX = 0x5,
OP_REMOVE_RANGE = 0x6,
OP_ADD_LEARNER = 0x07,
OP_TRANS_LEADER = 0x08,
Expand All @@ -30,7 +29,6 @@ enum BatchLogType : char {
OP_BATCH_PUT = 0x1,
OP_BATCH_REMOVE = 0x2,
OP_BATCH_REMOVE_RANGE = 0x3,
OP_BATCH_REMOVE_PREFIX = 0x4,
};

std::string encodeKV(const folly::StringPiece& key,
Expand Down Expand Up @@ -86,13 +84,6 @@ class BatchHolder {
batch_.emplace_back(std::move(op));
}

void removePrefix(std::string&& key) {
auto op = std::make_tuple(BatchLogType::OP_BATCH_REMOVE_PREFIX,
std::forward<std::string>(key),
"");
batch_.emplace_back(std::move(op));
}

void clear() {
batch_.clear();
}
Expand Down
14 changes: 0 additions & 14 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,20 +520,6 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,
part->asyncRemoveRange(start, end, std::move(cb));
}


void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
part->asyncRemovePrefix(prefix, std::move(cb));
}

void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
5 changes: 0 additions & 5 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ class NebulaStore : public KVStore, public Handler {
const std::string& end,
KVCallback cb) override;

void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) override;

void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
28 changes: 5 additions & 23 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "kvstore/Part.h"
#include "kvstore/LogEncoder.h"
#include "base/NebulaKeyUtils.h"
#include "kvstore/RocksEngineConfig.h"

DEFINE_int32(cluster_id, 0, "A unique id for each cluster");

Expand Down Expand Up @@ -98,16 +99,6 @@ void Part::asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb)
}


void Part::asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb) {
std::string log = encodeSingleValue(OP_REMOVE_PREFIX, prefix);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)] (AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
}


void Part::asyncRemoveRange(folly::StringPiece start,
folly::StringPiece end,
KVCallback cb) {
Expand Down Expand Up @@ -249,14 +240,6 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}
break;
}
case OP_REMOVE_PREFIX: {
auto prefix = decodeSingleValue(log);
if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()";
return false;
}
break;
}
case OP_REMOVE_RANGE: {
auto range = decodeMultiValues(log);
DCHECK_EQ(2, range.size());
Expand All @@ -276,10 +259,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
code = batch->remove(op.second.first);
} else if (op.first == BatchLogType::OP_BATCH_REMOVE_RANGE) {
code = batch->removeRange(op.second.first, op.second.second);
} else if (op.first == BatchLogType::OP_BATCH_REMOVE_PREFIX) {
code = batch->removePrefix(op.second.first);
}

if (code != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch";
return false;
Expand Down Expand Up @@ -325,7 +305,8 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
return false;
}
}
return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
return engine_->commitBatchWrite(std::move(batch), FLAGS_rocksdb_disable_wal)
== ResultCode::SUCCEEDED;
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
Expand All @@ -350,7 +331,8 @@ std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>&
return std::make_pair(0, 0);
}
}
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) {
// For snapshot, we open the rocksdb's wal to avoid loss data if crash.
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch), false)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class Part : public raftex::RaftPart {

void asyncRemove(folly::StringPiece key, KVCallback cb);
void asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb);
void asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb);
void asyncRemoveRange(folly::StringPiece start,
folly::StringPiece end,
KVCallback cb);
Expand Down
61 changes: 4 additions & 57 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ namespace {
class RocksWriteBatch : public WriteBatch {
private:
rocksdb::WriteBatch batch_;
rocksdb::DB* db_{nullptr};

public:
explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {}
RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {}

virtual ~RocksWriteBatch() = default;

Expand All @@ -50,25 +49,6 @@ class RocksWriteBatch : public WriteBatch {
}
}

ResultCode removePrefix(folly::StringPiece prefix) override {
rocksdb::Slice pre(prefix.begin(), prefix.size());
rocksdb::ReadOptions options;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
iter->Seek(pre);
while (iter->Valid()) {
if (iter->key().starts_with(pre)) {
if (!batch_.Delete(iter->key()).ok()) {
return ResultCode::ERR_UNKNOWN;
}
} else {
// Done
break;
}
iter->Next();
}
return ResultCode::SUCCEEDED;
}

// Remove all keys in the range [start, end)
ResultCode removeRange(folly::StringPiece start, folly::StringPiece end) override {
if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) {
Expand Down Expand Up @@ -127,13 +107,13 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,


std::unique_ptr<WriteBatch> RocksEngine::startBatchWrite() {
return std::make_unique<RocksWriteBatch>(db_.get());
return std::make_unique<RocksWriteBatch>();
}


ResultCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch> batch) {
ResultCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch> batch, bool disableWAL) {
rocksdb::WriteOptions options;
options.disableWAL = FLAGS_rocksdb_disable_wal;
options.disableWAL = disableWAL;
auto* b = static_cast<RocksWriteBatch*>(batch.get());
rocksdb::Status status = db_->Write(options, b->data());
if (status.ok()) {
Expand Down Expand Up @@ -285,8 +265,6 @@ ResultCode RocksEngine::removeRange(const std::string& start,
const std::string& end) {
rocksdb::WriteOptions options;
options.disableWAL = FLAGS_rocksdb_disable_wal;
// TODO(sye) Given the RocksDB version we are using,
// we should avoud using DeleteRange
auto status = db_->DeleteRange(options, db_->DefaultColumnFamily(), start, end);
if (status.ok()) {
return ResultCode::SUCCEEDED;
Expand All @@ -296,41 +274,10 @@ ResultCode RocksEngine::removeRange(const std::string& start,
}
}


ResultCode RocksEngine::removePrefix(const std::string& prefix) {
rocksdb::Slice pre(prefix.data(), prefix.size());
rocksdb::ReadOptions readOptions;
rocksdb::WriteBatch batch;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(readOptions));
iter->Seek(pre);
while (iter->Valid()) {
if (iter->key().starts_with(pre)) {
auto status = batch.Delete(iter->key());
if (!status.ok()) {
return ResultCode::ERR_UNKNOWN;
}
} else {
// Done
break;
}
iter->Next();
}

rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = FLAGS_rocksdb_disable_wal;
if (db_->Write(writeOptions, &batch).ok()) {
return ResultCode::SUCCEEDED;
} else {
return ResultCode::ERR_UNKNOWN;
}
}


std::string RocksEngine::partKey(PartitionID partId) {
return NebulaKeyUtils::systemPartKey(partId);
}


void RocksEngine::addPart(PartitionID partId) {
auto ret = put(partKey(partId), "");
if (ret == ResultCode::SUCCEEDED) {
Expand Down
6 changes: 3 additions & 3 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class RocksEngine : public KVEngine {
}

std::unique_ptr<WriteBatch> startBatchWrite() override;
ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch) override;

ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL) override;

/*********************
* Data retrieval
Expand Down Expand Up @@ -143,8 +145,6 @@ class RocksEngine : public KVEngine {
ResultCode removeRange(const std::string& start,
const std::string& end) override;

ResultCode removePrefix(const std::string& prefix) override;

/*********************
* Non-data operation
********************/
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class HBaseStore : public KVStore {
void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) override;
KVCallback cb);

void asyncAtomicOp(GraphSpaceID,
PartitionID,
Expand Down
2 changes: 0 additions & 2 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class BaseProcessor {
void doRemoveRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end);

void doRemovePrefix(GraphSpaceID spaceId, PartitionID partId, std::string prefix);

kvstore::ResultCode doRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end, std::unique_ptr<kvstore::KVIterator>* iter);

Expand Down
10 changes: 0 additions & 10 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ void BaseProcessor<RESP>::doRemoveRange(GraphSpaceID spaceId,
});
}

template <typename RESP>
void BaseProcessor<RESP>::doRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
std::string prefix) {
this->kvstore_->asyncRemovePrefix(
spaceId, partId, prefix, [spaceId, partId, this](kvstore::ResultCode code) {
handleAsync(spaceId, partId, code);
});
}

template<typename RESP>
kvstore::ResultCode BaseProcessor<RESP>::doRange(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
36 changes: 19 additions & 17 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,29 +30,31 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
const auto& vertices = pv->second;
for (auto v = vertices.begin(); v != vertices.end(); v++) {
auto prefix = NebulaKeyUtils::vertexPrefix(part, *v);
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = this->kvstore_->prefix(spaceId, part, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceID " << spaceId;
this->onFinished();
return;
}

// Evict vertices from cache
if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) {
std::unique_ptr<kvstore::KVIterator> iter;
auto ret = this->kvstore_->prefix(spaceId, part, prefix, &iter);
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceID " << spaceId;
this->onFinished();
return;
}

while (iter->valid()) {
auto key = iter->key();
if (NebulaKeyUtils::isVertex(key)) {
auto tag = NebulaKeyUtils::getTagId(key);
std::vector<std::string> keys;
keys.reserve(32);
while (iter->valid()) {
auto key = iter->key();
if (NebulaKeyUtils::isVertex(key)) {
auto tag = NebulaKeyUtils::getTagId(key);
// Evict vertices from cache
if (FLAGS_enable_vertex_cache && vertexCache_ != nullptr) {
VLOG(3) << "Evict vertex cache for VID " << *v << ", TagID " << tag;
vertexCache_->evict(std::make_pair(*v, tag), part);
}
iter->next();
keys.emplace_back(key.str());
}
iter->next();
}
doRemovePrefix(spaceId, part, std::move(prefix));
doRemove(spaceId, part, std::move(keys));
}
}
} else {
Expand Down