Skip to content

Commit

Permalink
ignore existed index (#3392)
Browse files Browse the repository at this point in the history
* ignore existed index

* remove a info log

* fix memory leak
  • Loading branch information
critical27 authored Dec 16, 2021
1 parent 75f1112 commit f14aa0e
Show file tree
Hide file tree
Showing 19 changed files with 335 additions and 57 deletions.
8 changes: 6 additions & 2 deletions src/clients/storage/StorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists) {
bool ifNotExists,
bool ignoreExistedIndex) {
auto cbStatus = getIdFromNewVertex(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -133,6 +134,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
auto& req = requests[host];
req.set_space_id(param.space);
req.set_if_not_exists(ifNotExists);
req.set_ignore_existed_index(ignoreExistedIndex);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
Expand All @@ -149,7 +151,8 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addVertices(
StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists) {
bool ifNotExists,
bool ignoreExistedIndex) {
auto cbStatus = getIdFromNewEdge(param.space);
if (!cbStatus.ok()) {
return folly::makeFuture<StorageRpcResponse<cpp2::ExecResponse>>(
Expand All @@ -170,6 +173,7 @@ StorageRpcRespFuture<cpp2::ExecResponse> StorageClient::addEdges(const CommonReq
auto& req = requests[host];
req.set_space_id(param.space);
req.set_if_not_exists(ifNotExists);
req.set_ignore_existed_index(ignoreExistedIndex);
req.set_parts(std::move(c.second));
req.set_prop_names(propNames);
req.set_common(common);
Expand Down
6 changes: 4 additions & 2 deletions src/clients/storage/StorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,14 @@ class StorageClient : public StorageClientBase<cpp2::GraphStorageServiceAsyncCli
const CommonRequestParam& param,
std::vector<cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> propNames,
bool ifNotExists);
bool ifNotExists,
bool ignoreExistedIndex);

StorageRpcRespFuture<cpp2::ExecResponse> addEdges(const CommonRequestParam& param,
std::vector<cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists);
bool ifNotExists,
bool ignoreExistedIndex);

StorageRpcRespFuture<cpp2::ExecResponse> deleteEdges(const CommonRequestParam& param,
std::vector<storage::cpp2::EdgeKey> edges);
Expand Down
1 change: 0 additions & 1 deletion src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ void StorageClientBase<ClientType>::getResponseImpl(
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
auto spaceId = request.second.get_space_id();
auto partsId = getReqPartsId(request.second);
LOG(INFO) << "Send request to storage " << host;
remoteFunc(client.get(), request.second)
.via(evb)
.thenValue([spaceId, pro, this](Response&& resp) mutable {
Expand Down
12 changes: 10 additions & 2 deletions src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ folly::Future<Status> InsertVerticesExecutor::insertVertices() {
ivNode->getSpace(), qctx()->rctx()->session()->id(), plan->id(), plan->isProfileEnabled());
return qctx()
->getStorageClient()
->addVertices(param, ivNode->getVertices(), ivNode->getPropNames(), ivNode->getIfNotExists())
->addVertices(param,
ivNode->getVertices(),
ivNode->getPropNames(),
ivNode->getIfNotExists(),
ivNode->getIgnoreExistedIndex())
.via(runner())
.ensure([addVertTime]() {
VLOG(1) << "Add vertices time: " << addVertTime.elapsedInUSec() << "us";
Expand All @@ -52,7 +56,11 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
param.useExperimentalFeature = FLAGS_enable_experimental_feature;
return qctx()
->getStorageClient()
->addEdges(param, ieNode->getEdges(), ieNode->getPropNames(), ieNode->getIfNotExists())
->addEdges(param,
ieNode->getEdges(),
ieNode->getPropNames(),
ieNode->getIfNotExists(),
ieNode->getIgnoreExistedIndex())
.via(runner())
.ensure(
[addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; })
Expand Down
37 changes: 30 additions & 7 deletions src/graph/planner/plan/Mutate.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@ class InsertVertices final : public SingleDependencyNode {
GraphSpaceID spaceId,
std::vector<storage::cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> tagPropNames,
bool ifNotExists) {
return qctx->objPool()->add(new InsertVertices(
qctx, input, spaceId, std::move(vertices), std::move(tagPropNames), ifNotExists));
bool ifNotExists,
bool ignoreExistedIndex) {
return qctx->objPool()->add(new InsertVertices(qctx,
input,
spaceId,
std::move(vertices),
std::move(tagPropNames),
ifNotExists,
ignoreExistedIndex));
}

std::unique_ptr<PlanNodeDescription> explain() const override;
Expand All @@ -39,24 +45,29 @@ class InsertVertices final : public SingleDependencyNode {

bool getIfNotExists() const { return ifNotExists_; }

bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; }

private:
InsertVertices(QueryContext* qctx,
PlanNode* input,
GraphSpaceID spaceId,
std::vector<storage::cpp2::NewVertex> vertices,
std::unordered_map<TagID, std::vector<std::string>> tagPropNames,
bool ifNotExists)
bool ifNotExists,
bool ignoreExistedIndex)
: SingleDependencyNode(qctx, Kind::kInsertVertices, input),
spaceId_(spaceId),
vertices_(std::move(vertices)),
tagPropNames_(std::move(tagPropNames)),
ifNotExists_(ifNotExists) {}
ifNotExists_(ifNotExists),
ignoreExistedIndex_(ignoreExistedIndex) {}

private:
GraphSpaceID spaceId_{-1};
std::vector<storage::cpp2::NewVertex> vertices_;
std::unordered_map<TagID, std::vector<std::string>> tagPropNames_;
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
};

class InsertEdges final : public SingleDependencyNode {
Expand All @@ -67,9 +78,16 @@ class InsertEdges final : public SingleDependencyNode {
std::vector<storage::cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
bool ignoreExistedIndex,
bool useChainInsert = false) {
return qctx->objPool()->add(new InsertEdges(
qctx, input, spaceId, std::move(edges), std::move(propNames), ifNotExists, useChainInsert));
return qctx->objPool()->add(new InsertEdges(qctx,
input,
spaceId,
std::move(edges),
std::move(propNames),
ifNotExists,
ignoreExistedIndex,
useChainInsert));
}

std::unique_ptr<PlanNodeDescription> explain() const override;
Expand All @@ -80,6 +98,8 @@ class InsertEdges final : public SingleDependencyNode {

bool getIfNotExists() const { return ifNotExists_; }

bool getIgnoreExistedIndex() const { return ignoreExistedIndex_; }

GraphSpaceID getSpace() const { return spaceId_; }

bool useChainInsert() const { return useChainInsert_; }
Expand All @@ -91,19 +111,22 @@ class InsertEdges final : public SingleDependencyNode {
std::vector<storage::cpp2::NewEdge> edges,
std::vector<std::string> propNames,
bool ifNotExists,
bool ignoreExistedIndex,
bool useChainInsert)
: SingleDependencyNode(qctx, Kind::kInsertEdges, input),
spaceId_(spaceId),
edges_(std::move(edges)),
propNames_(std::move(propNames)),
ifNotExists_(ifNotExists),
ignoreExistedIndex_(ignoreExistedIndex),
useChainInsert_(useChainInsert) {}

private:
GraphSpaceID spaceId_{-1};
std::vector<storage::cpp2::NewEdge> edges_;
std::vector<std::string> propNames_;
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
// if this enabled, add edge request will only sent to
// outbound edges. (toss)
bool useChainInsert_{false};
Expand Down
12 changes: 10 additions & 2 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ Status InsertVerticesValidator::validateImpl() {
}

Status InsertVerticesValidator::toPlan() {
auto doNode = InsertVertices::make(
qctx_, nullptr, spaceId_, std::move(vertices_), std::move(tagPropNames_), ifNotExists_);
auto doNode = InsertVertices::make(qctx_,
nullptr,
spaceId_,
std::move(vertices_),
std::move(tagPropNames_),
ifNotExists_,
ignoreExistedIndex_);
root_ = doNode;
tail_ = root_;
return Status::OK();
Expand All @@ -32,6 +37,7 @@ Status InsertVerticesValidator::toPlan() {
Status InsertVerticesValidator::check() {
auto sentence = static_cast<InsertVerticesSentence *>(sentence_);
ifNotExists_ = sentence->isIfNotExists();
ignoreExistedIndex_ = sentence->ignoreExistedIndex();
rows_ = sentence->rows();
if (rows_.empty()) {
return Status::SemanticError("VALUES cannot be empty");
Expand Down Expand Up @@ -150,6 +156,7 @@ Status InsertEdgesValidator::toPlan() {
std::move(edges_),
std::move(entirePropNames_),
ifNotExists_,
ignoreExistedIndex_,
useChainInsert);
root_ = doNode;
tail_ = root_;
Expand All @@ -159,6 +166,7 @@ Status InsertEdgesValidator::toPlan() {
Status InsertEdgesValidator::check() {
auto sentence = static_cast<InsertEdgesSentence *>(sentence_);
ifNotExists_ = sentence->isIfNotExists();
ignoreExistedIndex_ = sentence->ignoreExistedIndex();
auto edgeStatus = qctx_->schemaMng()->toEdgeType(spaceId_, *sentence->edge());
NG_RETURN_IF_ERROR(edgeStatus);
edgeType_ = edgeStatus.value();
Expand Down
2 changes: 2 additions & 0 deletions src/graph/validator/MutateValidator.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class InsertVerticesValidator final : public Validator {
std::vector<std::pair<TagID, TagSchema>> schemas_;
uint16_t propSize_{0};
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
std::vector<storage::cpp2::NewVertex> vertices_;
};

Expand All @@ -54,6 +55,7 @@ class InsertEdgesValidator final : public Validator {
private:
GraphSpaceID spaceId_{-1};
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
EdgeType edgeType_{-1};
std::shared_ptr<const meta::SchemaProviderIf> schema_;
std::vector<std::string> propNames_;
Expand Down
7 changes: 5 additions & 2 deletions src/interface/storage.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ struct AddVerticesRequest {
(cpp.template = "std::unordered_map") prop_names,
// if true, when (vertexID,tagID) already exists, do nothing
4: bool if_not_exists,
5: optional RequestCommon common,
5: bool ignore_existed_index = false,
6: optional RequestCommon common,
}

struct AddEdgesRequest {
Expand All @@ -356,7 +357,9 @@ struct AddEdgesRequest {
3: list<binary> prop_names,
// if true, when edge already exists, do nothing
4: bool if_not_exists,
5: optional RequestCommon common,
// If true, existed index won't be removed
5: bool ignore_existed_index = false,
6: optional RequestCommon common,
}

/*
Expand Down
18 changes: 16 additions & 2 deletions src/parser/MutateSentences.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,14 @@ class VertexRowList final {

class InsertVerticesSentence final : public Sentence {
public:
InsertVerticesSentence(VertexTagList *tagList, VertexRowList *rows, bool ifNotExists) {
InsertVerticesSentence(VertexTagList *tagList,
VertexRowList *rows,
bool ifNotExists,
bool ignoreExistedIndex) {
tagList_.reset(tagList);
rows_.reset(rows);
ifNotExists_ = ifNotExists;
ignoreExistedIndex_ = ignoreExistedIndex;
kind_ = Kind::kInsertVertices;
}

Expand All @@ -151,8 +155,11 @@ class InsertVerticesSentence final : public Sentence {

bool isIfNotExists() const { return ifNotExists_; }

bool ignoreExistedIndex() const { return ignoreExistedIndex_; }

private:
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
std::unique_ptr<VertexTagList> tagList_;
std::unique_ptr<VertexRowList> rows_;
};
Expand Down Expand Up @@ -209,11 +216,15 @@ class EdgeRowList final {

class InsertEdgesSentence final : public Sentence {
public:
explicit InsertEdgesSentence(std::string *edge, EdgeRowList *rows, bool ifNotExists)
explicit InsertEdgesSentence(std::string *edge,
EdgeRowList *rows,
bool ifNotExists,
bool ignoreExistedIndex)
: Sentence(Kind::kInsertEdges) {
edge_.reset(edge);
rows_.reset(rows);
ifNotExists_ = ifNotExists;
ignoreExistedIndex_ = ignoreExistedIndex;
}

const std::string *edge() const { return edge_.get(); }
Expand All @@ -231,6 +242,8 @@ class InsertEdgesSentence final : public Sentence {

bool isIfNotExists() const { return ifNotExists_; }

bool ignoreExistedIndex() const { return ignoreExistedIndex_; }

void setDefaultPropNames() { isDefaultPropNames_ = true; }

bool isDefaultPropNames() const { return isDefaultPropNames_; }
Expand All @@ -240,6 +253,7 @@ class InsertEdgesSentence final : public Sentence {
private:
bool isDefaultPropNames_{false};
bool ifNotExists_{false};
bool ignoreExistedIndex_{false};
std::unique_ptr<std::string> edge_;
std::unique_ptr<PropertyList> properties_;
std::unique_ptr<EdgeRowList> rows_;
Expand Down
26 changes: 16 additions & 10 deletions src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%token KW_BOOL KW_INT8 KW_INT16 KW_INT32 KW_INT64 KW_INT KW_FLOAT KW_DOUBLE
%token KW_STRING KW_FIXED_STRING KW_TIMESTAMP KW_DATE KW_TIME KW_DATETIME
%token KW_GO KW_AS KW_TO KW_USE KW_SET KW_FROM KW_WHERE KW_ALTER
%token KW_MATCH KW_INSERT KW_VALUE KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX KW_VERTICES
%token KW_MATCH KW_INSERT KW_VALUE KW_VALUES KW_YIELD KW_RETURN KW_CREATE KW_VERTEX KW_VERTICES KW_IGNORE_EXISTED_INDEX
%token KW_EDGE KW_EDGES KW_STEPS KW_OVER KW_UPTO KW_REVERSELY KW_SPACE KW_DELETE KW_FIND
%token KW_TAG KW_TAGS KW_UNION KW_INTERSECT KW_MINUS
%token KW_NO KW_OVERWRITE KW_IN KW_DESCRIBE KW_DESC KW_SHOW KW_HOST KW_HOSTS KW_PART KW_PARTS KW_ADD
Expand Down Expand Up @@ -391,6 +391,7 @@ static constexpr size_t kCommentLengthLimit = 256;
%type <boolval> opt_if_not_exists
%type <boolval> opt_if_exists
%type <boolval> opt_with_properties
%type <boolval> opt_ignore_existed_index

%left QM COLON
%left KW_OR KW_XOR
Expand Down Expand Up @@ -2274,6 +2275,11 @@ opt_create_schema_prop_list
}
;

opt_ignore_existed_index
: %empty { $$=false; }
| KW_IGNORE_EXISTED_INDEX { $$=true; }
;

create_schema_prop_list
: create_schema_prop_item {
$$ = new SchemaPropList();
Expand Down Expand Up @@ -2782,8 +2788,8 @@ assignment_sentence
;

insert_vertex_sentence
: KW_INSERT KW_VERTEX opt_if_not_exists vertex_tag_list KW_VALUES vertex_row_list {
$$ = new InsertVerticesSentence($4, $6, $3);
: KW_INSERT KW_VERTEX opt_if_not_exists opt_ignore_existed_index vertex_tag_list KW_VALUES vertex_row_list {
$$ = new InsertVerticesSentence($5, $7, $3, $4);
}
;

Expand Down Expand Up @@ -2860,19 +2866,19 @@ value_list
;

insert_edge_sentence
: KW_INSERT KW_EDGE opt_if_not_exists name_label KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($4, $6, $3);
: KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($5, $7, $3, $4);
sentence->setDefaultPropNames();
$$ = sentence;
}
| KW_INSERT KW_EDGE opt_if_not_exists name_label L_PAREN R_PAREN KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($4, $8, $3);
| KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label L_PAREN R_PAREN KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($5, $9, $3, $4);
sentence->setProps(new PropertyList());
$$ = sentence;
}
| KW_INSERT KW_EDGE opt_if_not_exists name_label L_PAREN prop_list R_PAREN KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($4, $9, $3);
sentence->setProps($6);
| KW_INSERT KW_EDGE opt_if_not_exists opt_ignore_existed_index name_label L_PAREN prop_list R_PAREN KW_VALUES edge_row_list {
auto sentence = new InsertEdgesSentence($5, $10, $3, $4);
sentence->setProps($7);
$$ = sentence;
}
;
Expand Down
Loading

0 comments on commit f14aa0e

Please sign in to comment.