Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the interface atomicOp to support empty string #2002

Merged
merged 2 commits into from
Mar 27, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ using nebula::thrift::ThriftClientManager;
using nebula::wal::FileBasedWal;
using nebula::wal::FileBasedWalPolicy;

using OpProcessor = folly::Function<folly::Optional<std::string>(AtomicOp op)>;

class AppendLogsIterator final : public LogIterator {
public:
AppendLogsIterator(LogID firstLogId,
TermID termId,
RaftPart::LogCache logs,
folly::Function<std::string(AtomicOp op)> opCB)
OpProcessor opCB)
: firstLogId_(firstLogId)
, termId_(termId)
, logId_(firstLogId)
Expand Down Expand Up @@ -92,7 +94,7 @@ class AppendLogsIterator final : public LogIterator {
// Process AtomicOp log
CHECK(!!opCB_);
opResult_ = opCB_(std::move(std::get<3>(tup)));
if (opResult_.size() > 0) {
if (opResult_.hasValue()) {
// AtomicOp Succeeded
return true;
} else {
Expand Down Expand Up @@ -145,7 +147,8 @@ class AppendLogsIterator final : public LogIterator {
folly::StringPiece logMsg() const override {
DCHECK(valid());
if (currLogType_ == LogType::ATOMIC_OP) {
return opResult_;
CHECK(opResult_.hasValue());
return opResult_.value();
} else {
return std::get<2>(logs_.at(idx_));
}
Expand Down Expand Up @@ -180,12 +183,12 @@ class AppendLogsIterator final : public LogIterator {
bool valid_{true};
LogType lastLogType_{LogType::NORMAL};
LogType currLogType_{LogType::NORMAL};
std::string opResult_;
folly::Optional<std::string> opResult_;
LogID firstLogId_;
TermID termId_;
LogID logId_;
RaftPart::LogCache logs_;
folly::Function<std::string(AtomicOp op)> opCB_;
OpProcessor opCB_;
};


Expand Down Expand Up @@ -637,10 +640,10 @@ folly::Future<AppendLogResult> RaftPart::appendLogAsync(ClusterID source,
firstId,
termId,
std::move(swappedOutLogs),
[this] (AtomicOp opCB) -> std::string {
[this] (AtomicOp opCB) -> folly::Optional<std::string> {
CHECK(opCB != nullptr);
auto opRet = opCB();
if (opRet.empty()) {
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(AppendLogResult::E_ATOMIC_OP_FAILURE);
}
Expand Down Expand Up @@ -921,9 +924,9 @@ void RaftPart::processAppendLogResponses(
firstLogId,
currTerm,
std::move(logs_),
[this] (AtomicOp op) -> std::string {
[this] (AtomicOp op) -> folly::Optional<std::string> {
auto opRet = op();
if (opRet.empty()) {
if (!opRet.hasValue()) {
// Failed
sendingPromise_.setOneSingleValue(
AppendLogResult::E_ATOMIC_OP_FAILURE);
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class AppendLogsIterator;
* otherwise it will return the new operation's encoded string whick should be applied atomically.
* You could implement CAS, READ-MODIFY-WRITE operations though it.
* */
using AtomicOp = folly::Function<std::string(void)>;
using AtomicOp = folly::Function<folly::Optional<std::string>(void)>;

class RaftPart : public std::enable_shared_from_this<RaftPart> {
friend class AppendLogsIterator;
Expand Down
25 changes: 25 additions & 0 deletions src/kvstore/raftex/test/LogCASTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,31 @@ TEST_F(LogCASTest, ZipCasTest) {
checkConsensus(copies_, 0, 4, msgs);
}

TEST_F(LogCASTest, EmptyTest) {
{
LOG(INFO) << "return empty string for atomic operation!";
folly::Baton<> baton;
leader_->atomicOpAsync([log = std::move(log)] () mutable {
return "";
}).then([&baton] (AppendLogResult res) {
ASSERT_EQ(AppendLogResult::SUCCEEDED, res);
baton.post();
});
baton.wait();
}
{
LOG(INFO) << "return none string for atomic operation!";
folly::Baton<> baton;
leader_->atomicOpAsync([log = std::move(log)] () mutable {
return folly::none;
}).then([&baton] (AppendLogResult res) {
ASSERT_EQ(AppendLogResult::E_ATOMIC_OP_FAILURE, res);
baton.post();
});
baton.wait();
}
}

} // namespace raftex
} // namespace nebula

Expand Down
4 changes: 2 additions & 2 deletions src/kvstore/raftex/test/TestShard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ HostAddr decodeLearner(const folly::StringPiece& log) {
return learner;
}

std::string compareAndSet(const std::string& log) {
folly::Optional<std::string> compareAndSet(const std::string& log) {
switch (log[0]) {
case 'T':
return log.substr(1);
default:
return std::string();
return folly::none;
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/test/TestShard.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::string encodeLearner(const HostAddr& addr);

HostAddr decodeLearner(const folly::StringPiece& log);

std::string compareAndSet(const std::string& log);
folly::Optional<std::string> compareAndSet(const std::string& log);

std::string encodeTransferLeader(const HostAddr& addr);

Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void AddEdgesProcessor::process(const cpp2::AddEdgesRequest& req) {
std::for_each(req.parts.begin(), req.parts.end(), [&](auto& partEdges) {
auto partId = partEdges.first;
auto atomic = [version, partId, edges = std::move(partEdges.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return addEdges(version, partId, edges);
};
auto callback = [partId, this](kvstore::ResultCode code) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) {
std::for_each(partVertices.begin(), partVertices.end(), [&](auto &pv) {
auto partId = pv.first;
auto atomic = [version, partId, vertices = std::move(pv.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return addVertices(version, partId, vertices);
};
auto callback = [partId, this](kvstore::ResultCode code) {
Expand Down
11 changes: 6 additions & 5 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &partEdges) {
auto partId = partEdges.first;
auto atomic = [spaceId, partId, edges = std::move(partEdges.second), this]()
-> std::string {
-> folly::Optional<std::string> {
return deleteEdges(spaceId, partId, edges);
};
auto callback = [spaceId, partId, this](kvstore::ResultCode code) {
Expand All @@ -74,9 +74,10 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
}
}

std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges) {
folly::Optional<std::string>
DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges) {
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& edge : edges) {
auto type = edge.edge_type;
Expand All @@ -89,7 +90,7 @@ std::string DeleteEdgesProcessor::deleteEdges(GraphSpaceID spaceId,
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << spaceId;
return "";
return folly::none;
}
bool isLatestVE = true;
while (iter->valid()) {
Expand Down
6 changes: 3 additions & 3 deletions src/storage/mutate/DeleteEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class DeleteEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
, indexMan_(indexMan) {}


std::string deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges);
folly::Optional<std::string> deleteEdges(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<cpp2::EdgeKey>& edges);

private:
meta::IndexManager* indexMan_{nullptr};
Expand Down
14 changes: 9 additions & 5 deletions src/storage/mutate/DeleteVerticesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
callingNum_ = req.parts.size();
std::for_each(req.parts.begin(), req.parts.end(), [spaceId, this](auto &pv) {
auto partId = pv.first;
auto atomic = [spaceId, partId, v = std::move(pv.second), this]() -> std::string {
auto atomic = [spaceId,
partId,
v = std::move(pv.second),
this]() -> folly::Optional<std::string> {
return deleteVertices(spaceId, partId, v);
};

Expand All @@ -75,9 +78,10 @@ void DeleteVerticesProcessor::process(const cpp2::DeleteVerticesRequest& req) {
}
}

std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices) {
folly::Optional<std::string>
DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices) {
std::unique_ptr<kvstore::BatchHolder> batchHolder = std::make_unique<kvstore::BatchHolder>();
for (auto& vertex : vertices) {
auto prefix = NebulaKeyUtils::vertexPrefix(partId, vertex);
Expand All @@ -86,7 +90,7 @@ std::string DeleteVerticesProcessor::deleteVertices(GraphSpaceID spaceId,
if (ret != kvstore::ResultCode::SUCCEEDED) {
VLOG(3) << "Error! ret = " << static_cast<int32_t>(ret)
<< ", spaceId " << spaceId;
return "";
return folly::none;
}
TagID latestVVId = -1;
while (iter->valid()) {
Expand Down
7 changes: 4 additions & 3 deletions src/storage/mutate/DeleteVerticesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ class DeleteVerticesProcessor : public BaseProcessor<cpp2::ExecResponse> {
, indexMan_(indexMan)
, vertexCache_(cache) {}

std::string deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices);
folly::Optional<std::string>
deleteVertices(GraphSpaceID spaceId,
PartitionID partId,
const std::vector<VertexID>& vertices);

private:
meta::IndexManager* indexMan_{nullptr};
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
<< ", dst: " << edgeKey.get_dst() << ", ranking: " << edgeKey.get_ranking();
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[partId, edgeKey, this] () -> std::string {
[partId, edgeKey, this] () -> folly::Optional<std::string> {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, edgeKey);
Expand All @@ -477,7 +477,7 @@ void UpdateEdgeProcessor::process(const cpp2::UpdateEdgeRequest& req) {
case FilterResult::E_ERROR:
// fallthrough
default: {
return "";
return folly::none;
}
}
},
Expand Down
4 changes: 2 additions & 2 deletions src/storage/mutate/UpdateVertexProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
<< ", partId: " << partId << ", vId: " << vId;
CHECK_NOTNULL(kvstore_);
this->kvstore_->asyncAtomicOp(this->spaceId_, partId,
[partId, vId, this] () -> std::string {
[partId, vId, this] () -> folly::Optional<std::string> {
// TODO(shylock) the AtomicOP can't return various error
// so put it in the processor
filterResult_ = checkFilter(partId, vId);
Expand All @@ -426,7 +426,7 @@ void UpdateVertexProcessor::process(const cpp2::UpdateVertexRequest& req) {
case FilterResult::E_ERROR:
// Fallthrough
default: {
return "";
return folly::none;
}
}
},
Expand Down