Skip to content

Commit

Permalink
Update the interface atomicOp to support empty string (#2002)
Browse files Browse the repository at this point in the history
Co-authored-by: heng <[email protected]>
Co-authored-by: Doodle <[email protected]>
  • Loading branch information
3 people authored and sherman-the-tank committed Mar 31, 2020
1 parent c187868 commit e19f317
Show file tree
Hide file tree
Showing 13 changed files with 69 additions and 35 deletions.
21 changes: 12 additions & 9 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ using nebula::thrift::ThriftClientManager;
using nebula::wal::FileBasedWal;
using nebula::wal::FileBasedWalPolicy;

using OpProcessor = folly::Function<folly::Optional<std::string>(AtomicOp op)>;

class AppendLogsIterator final : public LogIterator {
public:
AppendLogsIterator(LogID firstLogId,
TermID termId,
RaftPart::LogCache logs,
folly::Function<std::string(AtomicOp op)> opCB)
OpProcessor opCB)
: firstLogId_(firstLogId)
, termId_(termId)
, logId_(firstLogId)
Expand Down Expand Up @@ -92,7 +94,7 @@ class AppendLogsIterator final : public LogIterator {
// Process AtomicOp log
CHECK(!!opCB_);
opResult_ = opCB_(std::move(std::get<3>(tup)));
if (opResult_.size() > 0) {
if (opResult_.hasValue()) {
// AtomicOp Succeeded
return true;
} else {
Expand Down Expand Up @@ -145,7 +147,8 @@ class AppendLogsIterator final : public LogIterator {
folly::StringPiece logMsg() const override {
DCHECK(valid());
if (currLogType_ == LogType::ATOMIC_OP) {
return opResult_;
CHECK(opResult_.hasValue());
return opResult_.value();
} else {
return std::get<2>(logs_.at(idx_));
}
Expand Down Expand Up @@ -180,12 +183,12 @@ class AppendLogsIterator final : public LogIterator {
bool valid_{true};
LogType lastLogType_{LogType::NORMAL};
LogType currLogType_{LogType::NORMAL};
std::string opResult_;
folly::Optional<std::string> opResult_;
LogID firstLogId_;
TermID termId_;
LogID logId_;
RaftPart::LogCache logs_;
folly::Function<std::string(AtomicOp op)> opCB_;
OpProcessor opCB_;
};


Expand Down Expand Up @@ -637,10 +640,10 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
firstId,
termId,
std::move(swappedOutLogs),
[this] (AtomicOp opCB) -> std::string {
[this] (AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (opRet.empty()) {
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE);
}
Expand Down Expand Up @@ -921,9 +924,9 @@ void RaftPart::processAppendLogResponses(
firstLogId,
currTerm,
std::move(logs_),
[this] (AtomicOp op) -> std::string {
[this] (AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (opRet.empty()) {
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_ATOMIC_OP_FAILURE);
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class AppendLogsIterator;
* otherwise it will return the new operation's encoded string whick should be applied atomically.
* You could implement CAS, READ-MODIFY-WRITE operations though it.
* */
using AtomicOp = folly::Function<std::string(void)>;
using AtomicOp = folly::Function<folly::Optional<std::string>(void)>;

class RaftPart : public std::enable_shared_from_this<RaftPart> {
friend class AppendLogsIterator;
Expand Down
25 changes: 25 additions & 0 deletions src/kvstore/raftex/test/LogCASTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,31 @@ TEST_F(LogCASTest, ZipCasTest) {
checkConsensus(copies_, 0, 4, msgs);
}

TEST_F(LogCASTest, EmptyTest) {
{
LOG(INFO) << "return empty string for atomic operation!";
folly::Baton<> baton;
leader_->atomicOpAsync([log = std::move(log)] () mutable {
return "";
}).then([&baton] (AppendLogResult res) {
ASSERT_EQ(AppendLogResult::SUCCEEDED, res);
baton.post();
});
baton.wait();
}
{
LOG(INFO) << "return none string for atomic operation!";
folly::Baton<> baton;
leader_->atomicOpAsync([log = std::move(log)] () mutable {
return folly::none;
}).then([&baton] (AppendLogResult res) {
ASSERT_EQ(AppendLogResult::E_ATOMIC_OP_FAILURE, res);
baton.post();
});
baton.wait();
}
}

} // namespace raftex
} // namespace nebula

Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/raftex/test/TestShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ HostAddr decodeLearner(const folly::StringPiece& log) {
return learner;
}

std::string compareAndSet(const std::string& log) {
folly::Optional<std::string> compareAndSet(const std::string& log) {
switch (log[0]) {
case 'T':
return log.substr(1);
default:
return std::string();
return folly::none;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::string encodeLearner(const HostAddr& addr);

HostAddr decodeLearner(const folly::StringPiece& log);

std::string compareAndSet(const std::string& log);
folly::Optional<std::string> compareAndSet(const std::string& log);

std::string encodeTransferLeader(const HostAddr& addr);

Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) {
std::for_each(req.parts.begin(), req.parts.end(), [&](auto& partEdges) {
auto partId = partEdges.first;
auto atomic = [version, partId, edges = std::move(partEdges.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return addEdges(version, partId, edges);
};
auto callback = [partId, this](kvstore::ResultCode code) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
std::for_each(partVertices.begin(), partVertices.end(), [&](auto &pv) {
auto partId = pv.first;
auto atomic = [version, partId, vertices = std::move(pv.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return addVertices(version, partId, vertices);
};
auto callback = [partId, this](kvstore::ResultCode code) {
Expand Down
11 changes: 6 additions & 5 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &partEdges) {
auto partId = partEdges.first;
auto atomic = [spaceId, partId, edges = std::move(partEdges.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return deleteEdges(spaceId, partId, edges);
};
auto callback = [spaceId, partId, this](kvstore::ResultCode code) {
Expand All @@ -74,9 +74,10 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
}
}

std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges) {
folly::Optional<std::string>
DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges) {
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& edge : edges) {
auto type = edge.edge_type;
Expand All @@ -89,7 +90,7 @@ std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << spaceId;
return "";
return folly::none;
}
bool isLatestVE = true;
while (iter->valid()) {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/mutate/DeleteEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class DeleteEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
, indexMan_(indexMan) {}


std::string deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges);
folly::Optional<std::string> deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges);

private:
meta::IndexManager* indexMan_{nullptr};
Expand Down
14 changes: 9 additions & 5 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
callingNum_ = req.parts.size();
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &pv) {
auto partId = pv.first;
auto atomic = [spaceId, partId, v = std::move(pv.second), this]() -> std::string {
auto atomic = [spaceId,
partId,
v = std::move(pv.second),
this]() -> folly::Optional<std::string> {
return deleteVertices(spaceId, partId, v);
};

Expand All @@ -75,9 +78,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
}
}

std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices) {
folly::Optional<std::string>
DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices) {
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& vertex : vertices) {
auto prefix = NebulaKeyUtils::vertexPrefix(partId, vertex);
Expand All @@ -86,7 +90,7 @@ std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << spaceId;
return "";
return folly::none;
}
TagID latestVVId = -1;
while (iter->valid()) {
Expand Down
7 changes: 4 additions & 3 deletions src/storage/mutate/DeleteVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class DeleteVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
, indexMan_(indexMan)
, vertexCache_(cache) {}

std::string deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices);
folly::Optional<std::string>
deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices);

private:
meta::IndexManager* indexMan_{nullptr};
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
<< ", dst: " << edgeKey.get_dst() << ", ranking: " << edgeKey.get_ranking();
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[partId, edgeKey, this] () -> std::string {
[partId, edgeKey, this] () -> folly::Optional<std::string> {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, edgeKey);
Expand All @@ -477,7 +477,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
case FilterResult::E_ERROR:
// fallthrough
default: {
return "";
return folly::none;
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
<< ", partId: " << partId << ", vId: " << vId;
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[partId, vId, this] () -> std::string {
[partId, vId, this] () -> folly::Optional<std::string> {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, vId);
Expand All @@ -426,7 +426,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
case FilterResult::E_ERROR:
// Fallthrough
default: {
return "";
return folly::none;
}
}
},
Expand Down

0 comments on commit e19f317

Please sign in to comment.