Skip to content

Commit

Permalink
Remove removePrefix interface in kvstore
Browse files Browse the repository at this point in the history
  • Loading branch information
heng committed Mar 23, 2020
1 parent 806459e commit 07717e4
Show file tree
Hide file tree
Showing 13 changed files with 37 additions and 153 deletions.
9 changes: 3 additions & 6 deletions src/kvstore/KVEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ class WriteBatch {

virtual ResultCode remove(folly::StringPiece key) = 0;

virtual ResultCode removePrefix(folly::StringPiece prefix) = 0;

// Remove all keys in the range [start, end)
virtual ResultCode removeRange(folly::StringPiece start,
folly::StringPiece end) = 0;
Expand All @@ -44,7 +42,9 @@ class KVEngine {
virtual const char* getDataRoot() const = 0;

virtual std::unique_ptr<WriteBatch> startBatchWrite() = 0;
virtual ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch) = 0;

virtual ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL = true) = 0;

// Read a single key
virtual ResultCode get(const std::string& key, std::string* value) = 0;
Expand Down Expand Up @@ -84,9 +84,6 @@ class KVEngine {
virtual ResultCode removeRange(const std::string& start,
const std::string& end) = 0;

// Remove rows with the given prefix
virtual ResultCode removePrefix(const std::string& prefix) = 0;

// Add partId into current storage engine.
virtual void addPart(PartitionID partId) = 0;

Expand Down
5 changes: 0 additions & 5 deletions src/kvstore/KVStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ class KVStore {
const std::string& end,
KVCallback cb) = 0;

virtual void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) = 0;

virtual void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
9 changes: 0 additions & 9 deletions src/kvstore/LogEncoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ enum LogType : char {
OP_MULTI_PUT = 0x2,
OP_REMOVE = 0x3,
OP_MULTI_REMOVE = 0x4,
OP_REMOVE_PREFIX = 0x5,
OP_REMOVE_RANGE = 0x6,
OP_ADD_LEARNER = 0x07,
OP_TRANS_LEADER = 0x08,
Expand All @@ -30,7 +29,6 @@ enum BatchLogType : char {
OP_BATCH_PUT = 0x1,
OP_BATCH_REMOVE = 0x2,
OP_BATCH_REMOVE_RANGE = 0x3,
OP_BATCH_REMOVE_PREFIX = 0x4,
};

std::string encodeKV(const folly::StringPiece& key,
Expand Down Expand Up @@ -86,13 +84,6 @@ class BatchHolder {
batch_.emplace_back(std::move(op));
}

void removePrefix(std::string&& key) {
auto op = std::make_tuple(BatchLogType::OP_BATCH_REMOVE_PREFIX,
std::forward<std::string>(key),
"");
batch_.emplace_back(std::move(op));
}

void clear() {
batch_.clear();
}
Expand Down
14 changes: 0 additions & 14 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,20 +520,6 @@ void NebulaStore::asyncRemoveRange(GraphSpaceID spaceId,
part->asyncRemoveRange(start, end, std::move(cb));
}


void NebulaStore::asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) {
auto ret = part(spaceId, partId);
if (!ok(ret)) {
cb(error(ret));
return;
}
auto part = nebula::value(ret);
part->asyncRemovePrefix(prefix, std::move(cb));
}

void NebulaStore::asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
5 changes: 0 additions & 5 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,6 @@ class NebulaStore : public KVStore, public Handler {
const std::string& end,
KVCallback cb) override;

void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) override;

void asyncAtomicOp(GraphSpaceID spaceId,
PartitionID partId,
raftex::AtomicOp op,
Expand Down
28 changes: 5 additions & 23 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "kvstore/Part.h"
#include "kvstore/LogEncoder.h"
#include "base/NebulaKeyUtils.h"
#include "kvstore/RocksEngineConfig.h"

DEFINE_int32(cluster_id, 0, "A unique id for each cluster");

Expand Down Expand Up @@ -98,16 +99,6 @@ void Part::asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb)
}


void Part::asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb) {
std::string log = encodeSingleValue(OP_REMOVE_PREFIX, prefix);

appendAsync(FLAGS_cluster_id, std::move(log))
.thenValue([this, callback = std::move(cb)] (AppendLogResult res) mutable {
callback(this->toResultCode(res));
});
}


void Part::asyncRemoveRange(folly::StringPiece start,
folly::StringPiece end,
KVCallback cb) {
Expand Down Expand Up @@ -249,14 +240,6 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
}
break;
}
case OP_REMOVE_PREFIX: {
auto prefix = decodeSingleValue(log);
if (batch->removePrefix(prefix) != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::removePrefix()";
return false;
}
break;
}
case OP_REMOVE_RANGE: {
auto range = decodeMultiValues(log);
DCHECK_EQ(2, range.size());
Expand All @@ -276,10 +259,7 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
code = batch->remove(op.second.first);
} else if (op.first == BatchLogType::OP_BATCH_REMOVE_RANGE) {
code = batch->removeRange(op.second.first, op.second.second);
} else if (op.first == BatchLogType::OP_BATCH_REMOVE_PREFIX) {
code = batch->removePrefix(op.second.first);
}

if (code != ResultCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch";
return false;
Expand Down Expand Up @@ -325,7 +305,8 @@ bool Part::commitLogs(std::unique_ptr<LogIterator> iter) {
return false;
}
}
return engine_->commitBatchWrite(std::move(batch)) == ResultCode::SUCCEEDED;
return engine_->commitBatchWrite(std::move(batch), FLAGS_rocksdb_disable_wal)
== ResultCode::SUCCEEDED;
}

std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>& rows,
Expand All @@ -350,7 +331,8 @@ std::pair<int64_t, int64_t> Part::commitSnapshot(const std::vector<std::string>&
return std::make_pair(0, 0);
}
}
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch))) {
// For snapshot, we open the rocksdb's wal to avoid loss data if crash.
if (ResultCode::SUCCEEDED != engine_->commitBatchWrite(std::move(batch), false)) {
LOG(ERROR) << idStr_ << "Put failed in commit";
return std::make_pair(0, 0);
}
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class Part : public raftex::RaftPart {

void asyncRemove(folly::StringPiece key, KVCallback cb);
void asyncMultiRemove(const std::vector<std::string>& keys, KVCallback cb);
void asyncRemovePrefix(folly::StringPiece prefix, KVCallback cb);
void asyncRemoveRange(folly::StringPiece start,
folly::StringPiece end,
KVCallback cb);
Expand Down
61 changes: 4 additions & 57 deletions src/kvstore/RocksEngine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ namespace {
class RocksWriteBatch : public WriteBatch {
private:
rocksdb::WriteBatch batch_;
rocksdb::DB* db_{nullptr};

public:
explicit RocksWriteBatch(rocksdb::DB* db) : batch_(FLAGS_rocksdb_batch_size), db_(db) {}
RocksWriteBatch() : batch_(FLAGS_rocksdb_batch_size) {}

virtual ~RocksWriteBatch() = default;

Expand All @@ -50,25 +49,6 @@ class RocksWriteBatch : public WriteBatch {
}
}

ResultCode removePrefix(folly::StringPiece prefix) override {
rocksdb::Slice pre(prefix.begin(), prefix.size());
rocksdb::ReadOptions options;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(options));
iter->Seek(pre);
while (iter->Valid()) {
if (iter->key().starts_with(pre)) {
if (!batch_.Delete(iter->key()).ok()) {
return ResultCode::ERR_UNKNOWN;
}
} else {
// Done
break;
}
iter->Next();
}
return ResultCode::SUCCEEDED;
}

// Remove all keys in the range [start, end)
ResultCode removeRange(folly::StringPiece start, folly::StringPiece end) override {
if (batch_.DeleteRange(toSlice(start), toSlice(end)).ok()) {
Expand Down Expand Up @@ -127,13 +107,13 @@ RocksEngine::RocksEngine(GraphSpaceID spaceId,


std::unique_ptr<WriteBatch> RocksEngine::startBatchWrite() {
return std::make_unique<RocksWriteBatch>(db_.get());
return std::make_unique<RocksWriteBatch>();
}


ResultCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch> batch) {
ResultCode RocksEngine::commitBatchWrite(std::unique_ptr<WriteBatch> batch, bool disableWAL) {
rocksdb::WriteOptions options;
options.disableWAL = FLAGS_rocksdb_disable_wal;
options.disableWAL = disableWAL;
auto* b = static_cast<RocksWriteBatch*>(batch.get());
rocksdb::Status status = db_->Write(options, b->data());
if (status.ok()) {
Expand Down Expand Up @@ -285,8 +265,6 @@ ResultCode RocksEngine::removeRange(const std::string& start,
const std::string& end) {
rocksdb::WriteOptions options;
options.disableWAL = FLAGS_rocksdb_disable_wal;
// TODO(sye) Given the RocksDB version we are using,
// we should avoud using DeleteRange
auto status = db_->DeleteRange(options, db_->DefaultColumnFamily(), start, end);
if (status.ok()) {
return ResultCode::SUCCEEDED;
Expand All @@ -296,41 +274,10 @@ ResultCode RocksEngine::removeRange(const std::string& start,
}
}


ResultCode RocksEngine::removePrefix(const std::string& prefix) {
rocksdb::Slice pre(prefix.data(), prefix.size());
rocksdb::ReadOptions readOptions;
rocksdb::WriteBatch batch;
std::unique_ptr<rocksdb::Iterator> iter(db_->NewIterator(readOptions));
iter->Seek(pre);
while (iter->Valid()) {
if (iter->key().starts_with(pre)) {
auto status = batch.Delete(iter->key());
if (!status.ok()) {
return ResultCode::ERR_UNKNOWN;
}
} else {
// Done
break;
}
iter->Next();
}

rocksdb::WriteOptions writeOptions;
writeOptions.disableWAL = FLAGS_rocksdb_disable_wal;
if (db_->Write(writeOptions, &batch).ok()) {
return ResultCode::SUCCEEDED;
} else {
return ResultCode::ERR_UNKNOWN;
}
}


std::string RocksEngine::partKey(PartitionID partId) {
return NebulaKeyUtils::systemPartKey(partId);
}


void RocksEngine::addPart(PartitionID partId) {
auto ret = put(partKey(partId), "");
if (ret == ResultCode::SUCCEEDED) {
Expand Down
8 changes: 5 additions & 3 deletions src/kvstore/RocksEngine.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ class RocksEngine : public KVEngine {
}

std::unique_ptr<WriteBatch> startBatchWrite() override;
ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch) override;

ResultCode commitBatchWrite(std::unique_ptr<WriteBatch> batch,
bool disableWAL) override;

/*********************
* Data retrieval
Expand Down Expand Up @@ -143,8 +145,6 @@ class RocksEngine : public KVEngine {
ResultCode removeRange(const std::string& start,
const std::string& end) override;

ResultCode removePrefix(const std::string& prefix) override;

/*********************
* Non-data operation
********************/
Expand Down Expand Up @@ -176,6 +176,8 @@ class RocksEngine : public KVEngine {
private:
std::string partKey(PartitionID partId);

std::string nextKey(const std::string& currKey) const;

private:
std::string dataPath_;
std::unique_ptr<rocksdb::DB> db_{nullptr};
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/plugins/hbase/HBaseStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ class HBaseStore : public KVStore {
void asyncRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
const std::string& prefix,
KVCallback cb) override;
KVCallback cb);

void asyncAtomicOp(GraphSpaceID,
PartitionID,
Expand Down
2 changes: 0 additions & 2 deletions src/storage/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ class BaseProcessor {
void doRemoveRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end);

void doRemovePrefix(GraphSpaceID spaceId, PartitionID partId, std::string prefix);

kvstore::ResultCode doRange(GraphSpaceID spaceId, PartitionID partId, std::string start,
std::string end, std::unique_ptr<kvstore::KVIterator>* iter);

Expand Down
10 changes: 0 additions & 10 deletions src/storage/BaseProcessor.inl
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,6 @@ void BaseProcessor<RESP>::doRemoveRange(GraphSpaceID spaceId,
});
}

template <typename RESP>
void BaseProcessor<RESP>::doRemovePrefix(GraphSpaceID spaceId,
PartitionID partId,
std::string prefix) {
this->kvstore_->asyncRemovePrefix(
spaceId, partId, prefix, [spaceId, partId, this](kvstore::ResultCode code) {
handleAsync(spaceId, partId, code);
});
}

template<typename RESP>
kvstore::ResultCode BaseProcessor<RESP>::doRange(GraphSpaceID spaceId,
PartitionID partId,
Expand Down
Loading

0 comments on commit 07717e4

Please sign in to comment.