Skip to content

Commit

Permalink
To support delete in toss (vesoft-inc#440)
Browse files Browse the repository at this point in the history
#### What type of PR is this?
- [ ] bug
- [X ] feature
- [ ] enhancement

#### What does this PR do?


#### Which issue(s)/PR(s) this PR relates to?

support delete out/in edges in TOSS. 

  
#### Special notes for your reviewer, ex. impact of this fix, etc:


#### Additional context:


#### Checklist:
- [ ] Documentation affected (Please add the label if documentation needs to be modified.)
- [ ] Incompatible (If it is incompatible, please describe it and add corresponding label.)
- [ ] Need to cherry-pick (If need to cherry-pick to some branches, please label the destination version(s).)
- [ ] Performance impacted: Consumes more CPU/Memory


#### Release notes:
Please confirm whether to reflect in release notes and how to describe:
>                                                                 `


Migrated from vesoft-inc#3374

Co-authored-by: [email protected] <[email protected]>
  • Loading branch information
nebula-bots and liuyu85cn authored Jan 6, 2022
1 parent 5e1afc7 commit cff8082
Show file tree
Hide file tree
Showing 59 changed files with 2,150 additions and 599 deletions.
45 changes: 44 additions & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
Expand Down Expand Up @@ -189,5 +190,47 @@ InternalStorageClient::getPartLeader(
return clusters;
}

void InternalStorageClient::chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto optLeader = getLeader(req.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(2) << "leader host: " << leader;

cpp2::ChainDeleteEdgesRequest chainReq;
chainReq.space_id_ref() = req.get_space_id();
chainReq.parts_ref() = req.get_parts();
chainReq.txn_id_ref() = txnId;
chainReq.term_ref() = termId;
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainDeleteEdgesRequest& r) {
return client->future_chainDeleteEdges(r);
});

std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainDeleteEdges(req, txnId, termId, std::move(p));
} else {
p.setValue(code);
}
return;
});
}

} // namespace storage
} // namespace nebula
6 changes: 6 additions & 0 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ class InternalStorageClient
std::unordered_map<PartitionID, std::vector<std::string>> data,
folly::EventBase* evb = nullptr);

virtual void chainDeleteEdges(cpp2::DeleteEdgesRequest& req,
const std::string& txnId,
TermID termId,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::deleteEdges(

return collectResponse(param.evb,
std::move(requests),
[](ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return client->future_deleteEdges(r);
[useToss = param.useExperimentalFeature](
ThriftClientType* client, const cpp2::DeleteEdgesRequest& r) {
return useToss ? client->future_chainDeleteEdges(r)
: client->future_deleteEdges(r);
});
}

Expand Down
2 changes: 2 additions & 0 deletions src/common/utils/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ enum class NebulaKeyType : uint32_t {
kOperation = 0x00000005,
kKeyValue = 0x00000006,
kVertex = 0x00000007,
kPrime = 0x00000008, // used in TOSS, if we write a lock succeed
kDoublePrime = 0x00000009, // used in TOSS, if we get RPC back from remote.
};

enum class NebulaSystemKeyType : uint32_t {
Expand Down
2 changes: 2 additions & 0 deletions src/graph/executor/mutate/DeleteExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "graph/context/QueryContext.h"
#include "graph/executor/mutate/DeleteExecutor.h"
#include "graph/planner/plan/Mutate.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"

using nebula::storage::StorageClient;
Expand Down Expand Up @@ -208,6 +209,7 @@ folly::Future<Status> DeleteEdgesExecutor::deleteEdges() {
auto plan = qctx()->plan();
StorageClient::CommonRequestParam param(
spaceId, qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->deleteEdges(param, std::move(edgeKeys))
Expand Down
12 changes: 11 additions & 1 deletion src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,7 @@ service GraphStorageService {

UpdateResponse chainUpdateEdge(1: UpdateEdgeRequest req);
ExecResponse chainAddEdges(1: AddEdgesRequest req);
ExecResponse chainDeleteEdges(1: DeleteEdgesRequest req);

KVGetResponse get(1: KVGetRequest req);
ExecResponse put(1: KVPutRequest req);
Expand Down Expand Up @@ -884,7 +885,6 @@ struct ChainAddEdgesRequest {
3: list<binary> prop_names,
// if true, when edge already exists, do nothing
4: bool if_not_exists,
// 5: map<common.PartitionID, i64> term_of_parts,
5: i64 term
6: optional i64 edge_version
// 6: optional map<common.PartitionID, list<i64>>(
Expand All @@ -900,10 +900,20 @@ struct ChainUpdateEdgeRequest {
5: required list<common.PartitionID> parts,
}

struct ChainDeleteEdgesRequest {
1: common.GraphSpaceID space_id,
// partId => edgeKeys
2: map<common.PartitionID, list<EdgeKey>>
(cpp.template = "std::unordered_map") parts,
3: binary txn_id
4: i64 term,
}

service InternalStorageService {
ExecResponse chainAddEdges(1: ChainAddEdgesRequest req);
UpdateResponse chainUpdateEdge(1: ChainUpdateEdgeRequest req);

// Interfaces for log storage
ExecResponse syncData(1: SyncDataRequest req);
ExecResponse chainDeleteEdges(1: ChainDeleteEdgesRequest req);
}
4 changes: 2 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
VLOG(2) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down Expand Up @@ -295,7 +295,7 @@ std::tuple<nebula::cpp2::ErrorCode, LogID, TermID> Part::commitLogs(
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
VLOG(2) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
Expand Down
1 change: 0 additions & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2026,7 +2026,6 @@ void RaftPart::checkRemoteListeners(const std::set<HostAddr>& expected) {
}
}
}

bool RaftPart::leaseValid() {
std::lock_guard<std::mutex> g(raftLock_);
if (hosts_.empty()) {
Expand Down
12 changes: 8 additions & 4 deletions src/mock/MockData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ std::vector<VertexID> MockData::mockPlayerVerticeIds() {
return ret;
}

std::vector<EdgeData> MockData::mockEdges(bool upper) {
std::vector<EdgeData> MockData::mockEdges(bool upper, bool hasInEdges) {
std::vector<EdgeData> ret;
// Use serve data, positive edgeType is 101, reverse edgeType is -101
for (auto& serve : serves_) {
Expand Down Expand Up @@ -788,7 +788,9 @@ std::vector<EdgeData> MockData::mockEdges(bool upper) {
positiveEdge.props_ = std::move(props);
auto reverseData = getReverseEdge(positiveEdge);
ret.emplace_back(std::move(positiveEdge));
ret.emplace_back(std::move(reverseData));
if (hasInEdges) {
ret.emplace_back(std::move(reverseData));
}
}
return ret;
}
Expand Down Expand Up @@ -947,11 +949,13 @@ nebula::storage::cpp2::DeleteVerticesRequest MockData::mockDeleteVerticesReq(int
return req;
}

nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper, int32_t parts) {
nebula::storage::cpp2::AddEdgesRequest MockData::mockAddEdgesReq(bool upper,
int32_t parts,
bool hasInEdges) {
nebula::storage::cpp2::AddEdgesRequest req;
req.space_id_ref() = 1;
req.if_not_exists_ref() = true;
auto retRecs = mockEdges(upper);
auto retRecs = mockEdges(upper, hasInEdges);
for (auto& rec : retRecs) {
nebula::storage::cpp2::NewEdge newEdge;
nebula::storage::cpp2::EdgeKey edgeKey;
Expand Down
6 changes: 4 additions & 2 deletions src/mock/MockData.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ class MockData {
static std::vector<std::pair<PartitionID, std::string>> mockPlayerIndexKeys(bool upper = false);

// generate serve edge
static std::vector<EdgeData> mockEdges(bool upper = false);
// param: includeInEdges, if the return set has both out and in edges
static std::vector<EdgeData> mockEdges(bool upper = false, bool includeInEdges = true);

static std::vector<std::pair<PartitionID, std::string>> mockServeIndexKeys();

Expand Down Expand Up @@ -169,7 +170,8 @@ class MockData {
int32_t parts = 6);

static nebula::storage::cpp2::AddEdgesRequest mockAddEdgesReq(bool upper = false,
int32_t parts = 6);
int32_t parts = 6,
bool hasInEdges = true);

static nebula::storage::cpp2::DeleteVerticesRequest mockDeleteVerticesReq(int32_t parts = 6);

Expand Down
13 changes: 9 additions & 4 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,22 @@ nebula_add_library(
storage_transaction_executor OBJECT
transaction/TransactionManager.cpp
transaction/ConsistUtil.cpp
transaction/ChainUpdateEdgeProcessorLocal.cpp
transaction/ChainUpdateEdgeProcessorRemote.cpp
transaction/ChainUpdateEdgeLocalProcessor.cpp
transaction/ChainUpdateEdgeRemoteProcessor.cpp
transaction/ChainResumeProcessor.cpp
transaction/ChainAddEdgesGroupProcessor.cpp
transaction/ChainAddEdgesProcessorLocal.cpp
transaction/ChainAddEdgesProcessorRemote.cpp
transaction/ChainAddEdgesLocalProcessor.cpp
transaction/ChainAddEdgesRemoteProcessor.cpp
transaction/ResumeAddEdgeProcessor.cpp
transaction/ResumeAddEdgeRemoteProcessor.cpp
transaction/ResumeUpdateProcessor.cpp
transaction/ResumeUpdateRemoteProcessor.cpp
transaction/ChainProcessorFactory.cpp
transaction/ChainDeleteEdgesGroupProcessor.cpp
transaction/ChainDeleteEdgesLocalProcessor.cpp
transaction/ChainDeleteEdgesRemoteProcessor.cpp
transaction/ChainDeleteEdgesResumeProcessor.cpp
transaction/ChainDeleteEdgesResumeRemoteProcessor.cpp
)

nebula_add_library(
Expand Down
11 changes: 9 additions & 2 deletions src/storage/GraphStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@
#include "storage/query/ScanEdgeProcessor.h"
#include "storage/query/ScanVertexProcessor.h"
#include "storage/transaction/ChainAddEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeProcessorLocal.h"
#include "storage/transaction/ChainDeleteEdgesGroupProcessor.h"
#include "storage/transaction/ChainUpdateEdgeLocalProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -112,7 +113,7 @@ folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_updateEdg

folly::Future<cpp2::UpdateResponse> GraphStorageServiceHandler::future_chainUpdateEdge(
const cpp2::UpdateEdgeRequest& req) {
auto* proc = ChainUpdateEdgeProcessorLocal::instance(env_);
auto* proc = ChainUpdateEdgeLocalProcessor::instance(env_);
RETURN_FUTURE(proc);
}

Expand Down Expand Up @@ -160,6 +161,12 @@ folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainAddEdg
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesGroupProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> GraphStorageServiceHandler::future_put(
const cpp2::KVPutRequest& req) {
auto* processor = PutProcessor::instance(env_);
Expand Down
3 changes: 3 additions & 0 deletions src/storage/GraphStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class GraphStorageServiceHandler final : public cpp2::GraphStorageServiceSvIf {

folly::Future<cpp2::ScanResponse> future_scanVertex(const cpp2::ScanVertexRequest& req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::DeleteEdgesRequest& req) override;

folly::Future<cpp2::ScanResponse> future_scanEdge(const cpp2::ScanEdgeRequest& req) override;

folly::Future<cpp2::GetUUIDResp> future_getUUID(const cpp2::GetUUIDReq& req) override;
Expand Down
15 changes: 11 additions & 4 deletions src/storage/InternalStorageServiceHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
#include "storage/InternalStorageServiceHandler.h"

#include "storage/kv/SyncDataProcessor.h"
#include "storage/transaction/ChainAddEdgesProcessorRemote.h"
#include "storage/transaction/ChainUpdateEdgeProcessorRemote.h"
#include "storage/transaction/ChainAddEdgesRemoteProcessor.h"
#include "storage/transaction/ChainDeleteEdgesRemoteProcessor.h"
#include "storage/transaction/ChainUpdateEdgeRemoteProcessor.h"

#define RETURN_FUTURE(processor) \
auto f = processor->getFuture(); \
Expand Down Expand Up @@ -38,13 +39,19 @@ InternalStorageServiceHandler::InternalStorageServiceHandler(StorageEnv* env) :

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainAddEdges(
const cpp2::ChainAddEdgesRequest& req) {
auto* processor = ChainAddEdgesProcessorRemote::instance(env_);
auto* processor = ChainAddEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::UpdateResponse> InternalStorageServiceHandler::future_chainUpdateEdge(
const cpp2::ChainUpdateEdgeRequest& req) {
auto* processor = ChainUpdateEdgeProcessorRemote::instance(env_);
auto* processor = ChainUpdateEdgeRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

folly::Future<cpp2::ExecResponse> InternalStorageServiceHandler::future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& req) {
auto* processor = ChainDeleteEdgesRemoteProcessor::instance(env_);
RETURN_FUTURE(processor);
}

Expand Down
3 changes: 3 additions & 0 deletions src/storage/InternalStorageServiceHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class InternalStorageServiceHandler final : public cpp2::InternalStorageServiceS

folly::Future<cpp2::ExecResponse> future_syncData(const cpp2::SyncDataRequest& req) override;

folly::Future<cpp2::ExecResponse> future_chainDeleteEdges(
const cpp2::ChainDeleteEdgesRequest& p_req);

private:
StorageEnv* env_{nullptr};
std::shared_ptr<folly::Executor> readerPool_;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ extern ProcessorCounters kAddEdgesCounters;

class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
friend class TransactionManager;
friend class ChainAddEdgesProcessorLocal;
friend class ChainAddEdgesLocalProcessor;

public:
static AddEdgesProcessor* instance(StorageEnv* env,
Expand Down
23 changes: 21 additions & 2 deletions src/storage/mutate/DeleteEdgesProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,22 @@ void DeleteEdgesProcessor::process(const cpp2::DeleteEdgesRequest& req) {
handleAsync(spaceId_, partId, code);
continue;
}
doRemove(spaceId_, partId, std::move(keys));
stats::StatsManager::addValue(kNumEdgesDeleted, keys.size());

HookFuncPara para;
if (tossHookFunc_) {
para.keys.emplace(&keys);
(*tossHookFunc_)(para);
}
if (para.result) {
env_->kvstore_->asyncAppendBatch(
spaceId_,
partId,
std::move(para.result.value()),
[partId, this](nebula::cpp2::ErrorCode rc) { handleAsync(spaceId_, partId, rc); });
} else {
doRemove(spaceId_, partId, std::move(keys));
stats::StatsManager::addValue(kNumEdgesDeleted, keys.size());
}
}
} else {
for (auto& part : partEdges) {
Expand Down Expand Up @@ -198,6 +212,11 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> DeleteEdgesProcessor::deleteEdges(
}
}

if (tossHookFunc_) {
HookFuncPara para;
para.batch.emplace(batchHolder.get());
(*tossHookFunc_)(para);
}
return encodeBatchValue(batchHolder->getBatch());
}

Expand Down
Loading

0 comments on commit cff8082

Please sign in to comment.