Skip to content

Commit

Permalink
Clear space (#3989)
Browse files Browse the repository at this point in the history
* temp commit

* temp commit

* Complete the code on the graph side

* temp commit

* requires careful review

* complete the code

* Compiled successfully

* add some comment

* fix review

* temp commit

* temp commit

* add tck

* fix format

* fix review

* add some tck

* fix tck

* fix comment
  • Loading branch information
zhaohaifei authored Mar 14, 2022
1 parent 25997c4 commit c9f046e
Show file tree
Hide file tree
Showing 50 changed files with 826 additions and 54 deletions.
16 changes: 10 additions & 6 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
'KW_ADD',
'KW_CREATE',
'KW_DROP',
'KW_CLEAR',
'KW_REMOVE',
'KW_IF',
'KW_NOT',
Expand Down Expand Up @@ -133,25 +134,28 @@ def get_unreserved_keyword(file_path):
if flag == 1:
if line.strip() == ';':
break
unreserved_key_words.append(re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip())
unreserved_key_words.append(
re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip())
continue

parser_file.close()
return unreserved_key_words


if __name__ == '__main__':
cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"'
cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + \
SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"'
content = os.popen(cmd)
keywords=[]
for line in content.readlines():
keyword = re.sub('.*(KW_\\w+)\s*;.*','\\1',line.strip())
keywords = []
for line in content.readlines():
keyword = re.sub('.*(KW_\\w+)\s*;.*', '\\1', line.strip())
keywords.append(keyword)

if len(keywords) == 0:
exit(0)
unreserved_key_words = get_unreserved_keyword(PARSER_FILE_PATH)
new_key_words = [word for word in keywords if word not in reserved_key_words]
new_key_words = [
word for word in keywords if word not in reserved_key_words]
if len(new_key_words) == 0:
exit(0)
result = [word for word in new_key_words if word not in unreserved_key_words]
Expand Down
16 changes: 16 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,22 @@ folly::Future<StatusOr<bool>> MetaClient::dropSpace(std::string name, const bool
return future;
}

folly::Future<StatusOr<bool>> MetaClient::clearSpace(std::string name, const bool ifExists) {
cpp2::ClearSpaceReq req;
req.space_name_ref() = std::move(name);
req.if_exists_ref() = ifExists;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_clearSpace(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts(cpp2::ListHostType tp) {
cpp2::ListHostsReq req;
req.type_ref() = tp;
Expand Down
3 changes: 3 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class MetaClient {

folly::Future<StatusOr<bool>> dropSpace(std::string name, bool ifExists = false);

// clear space data, but keep the space schema.
folly::Future<StatusOr<bool>> clearSpace(std::string name, bool ifExists = false);

folly::Future<StatusOr<std::vector<cpp2::HostItem>>> listHosts(
cpp2::ListHostType type = cpp2::ListHostType::ALLOC);

Expand Down
3 changes: 3 additions & 0 deletions src/common/plugin/fulltext/FTGraphAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class FTGraphAdapter {

virtual StatusOr<bool> dropIndex(const HttpClient& client, const std::string& index) const = 0;

// Clear the fulltext index data and keep the index schema.
virtual StatusOr<bool> clearIndex(const HttpClient& client, const std::string& index) const = 0;

virtual StatusOr<bool> indexExists(const HttpClient& client, const std::string& index) const = 0;
};
} // namespace plugin
Expand Down
41 changes: 41 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,28 @@ std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client,
return os.str();
}

StatusOr<bool> ESGraphAdapter::clearIndex(const HttpClient& client,
const std::string& index) const {
// curl -H "Content-Type: application/json; charset=utf-8"
// -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5"
// -d '{"query": {"match_all":{}}}'
std::string cmd = clearIndexCmd(client, index);
auto ret = nebula::ProcessUtils::runCommand(cmd.c_str());
if (!ret.ok() || ret.value().empty()) {
LOG(ERROR) << "Http POST Failed: " << cmd;
return Status::Error("command failed : %s", cmd.c_str());
}
return clearCheck(ret.value());
}

std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client,
const std::string& index) const noexcept {
std::stringstream os;
os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\""
<< " -d '{\"query\": {\"match_all\":{}}}'";
return os.str();
}

StatusOr<bool> ESGraphAdapter::indexExists(const HttpClient& client,
const std::string& index) const {
// curl -H "Content-Type: application/json; charset=utf-8"
Expand Down Expand Up @@ -283,5 +305,24 @@ bool ESGraphAdapter::indexCheck(const std::string& ret) const {
LOG(ERROR) << "error reason : " << ret;
return false;
}

bool ESGraphAdapter::clearCheck(const std::string& ret) const {
try {
auto root = folly::parseJson(ret);
if (root.isArray()) {
return false;
}
auto result = root.find("failures");
if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) {
return true;
}
} catch (const std::exception& ex) {
LOG(ERROR) << "result error : " << ex.what();
}

LOG(ERROR) << "error reason : " << ret;
return false;
}

} // namespace plugin
} // namespace nebula
14 changes: 14 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ESGraphAdapter final : public FTGraphAdapter {
FRIEND_TEST(FulltextPluginTest, ESFuzzyTest);
FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest);
FRIEND_TEST(FulltextPluginTest, ESDropIndexTest);
FRIEND_TEST(FulltextPluginTest, ESClearIndexTest);

public:
static std::unique_ptr<FTGraphAdapter> kAdapter;
Expand Down Expand Up @@ -53,6 +54,11 @@ class ESGraphAdapter final : public FTGraphAdapter {

StatusOr<bool> dropIndex(const HttpClient& client, const std::string& index) const override;

// Clear the fulltext index data on es and keep the index schema.
// client: es client
// index: fulltext index name
StatusOr<bool> clearIndex(const HttpClient& client, const std::string& index) const override;

StatusOr<bool> indexExists(const HttpClient& client, const std::string& index) const override;

private:
Expand Down Expand Up @@ -88,12 +94,20 @@ class ESGraphAdapter final : public FTGraphAdapter {

bool indexCheck(const std::string& ret) const;

// check the result
bool clearCheck(const std::string& ret) const;

std::string createIndexCmd(const HttpClient& client,
const std::string& index,
const std::string& indexTemplate = "") const noexcept;

std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept;

// Encapsulates the clearIndex command.
// client: es client
// index: fulltext index name
std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept;

std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept;
};
} // namespace plugin
Expand Down
11 changes: 11 additions & 0 deletions src/common/plugin/fulltext/test/FulltextPluginTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ TEST(FulltextPluginTest, ESDropIndexTest) {
ASSERT_EQ(expected, ret);
}

TEST(FulltextPluginTest, ESClearIndexTest) {
HostAddr localHost_{"127.0.0.1", 9200};
HttpClient client(localHost_);
auto ret = ESGraphAdapter().clearIndexCmd(client, "test_index");
auto expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\""
" -XPOST -k \"http://127.0.0.1:9200/test_index/_delete_by_query?refresh&slices=5\""
" -d '{\"query\": {\"match_all\":{}}}'";
ASSERT_EQ(expected, ret);
}

TEST(FulltextPluginTest, ESPutTest) {
HostAddr localHost_{"127.0.0.1", 9200};
HttpClient hc(localHost_);
Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kDropSpace: {
return pool->add(new DropSpaceExecutor(node, qctx));
}
case PlanNode::Kind::kClearSpace: {
return pool->add(new ClearSpaceExecutor(node, qctx));
}
case PlanNode::Kind::kShowCreateSpace: {
return pool->add(new ShowCreateSpaceExecutor(node, qctx));
}
Expand Down
44 changes: 44 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,50 @@ void DropSpaceExecutor::unRegisterSpaceLevelMetrics(const std::string &spaceName
}
}

folly::Future<Status> ClearSpaceExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto *csNode = asNode<ClearSpace>(node());

// prepare text search index.
std::vector<std::string> ftIndexes;
auto spaceIdRet = qctx()->getMetaClient()->getSpaceIdByNameFromCache(csNode->getSpaceName());
if (spaceIdRet.ok()) {
auto ftIndexesRet = qctx()->getMetaClient()->getFTIndexBySpaceFromCache(spaceIdRet.value());
NG_RETURN_IF_ERROR(ftIndexesRet);
auto map = std::move(ftIndexesRet).value();
auto get = [](const auto &ptr) { return ptr.first; };
std::transform(map.begin(), map.end(), std::back_inserter(ftIndexes), get);
} else {
LOG(WARNING) << "Get space ID failed when prepare text index: " << csNode->getSpaceName();
}

return qctx()
->getMetaClient()
->clearSpace(csNode->getSpaceName(), csNode->getIfExists())
.via(runner())
.thenValue([this, csNode, spaceIdRet, ftIndexes = std::move(ftIndexes)](StatusOr<bool> resp) {
if (!resp.ok()) {
LOG(ERROR) << "Clear space `" << csNode->getSpaceName() << "' failed: " << resp.status();
return resp.status();
}
if (!ftIndexes.empty()) {
auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient());
if (!tsRet.ok()) {
LOG(WARNING) << "Get text search clients failed";
return Status::OK();
}
for (const auto &ftindex : ftIndexes) {
auto ftRet = FTIndexUtils::clearTSIndex(std::move(tsRet).value(), ftindex);
if (!ftRet.ok()) {
LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status();
}
}
}
return Status::OK();
});
}

folly::Future<Status> ShowSpacesExecutor::execute() {
SCOPED_TIMER(&execTime_);

Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ class DropSpaceExecutor final : public Executor {
folly::Future<Status> execute() override;
};

class ClearSpaceExecutor final : public Executor {
public:
ClearSpaceExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("ClearSpaceExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

class ShowSpacesExecutor final : public Executor {
public:
ShowSpacesExecutor(const PlanNode *node, QueryContext *qctx)
Expand Down
7 changes: 7 additions & 0 deletions src/graph/planner/plan/Admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ std::unique_ptr<PlanNodeDescription> DropSpace::explain() const {
return desc;
}

std::unique_ptr<PlanNodeDescription> ClearSpace::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("spaceName", spaceName_, desc.get());
addDescription("ifExists", util::toJson(ifExists_), desc.get());
return desc;
}

std::unique_ptr<PlanNodeDescription> DescSpace::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("spaceName", spaceName_, desc.get());
Expand Down
31 changes: 31 additions & 0 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,37 @@ class DropSpace final : public SingleDependencyNode {
bool ifExists_;
};

class ClearSpace final : public SingleDependencyNode {
public:
static ClearSpace* make(QueryContext* qctx,
PlanNode* input,
std::string spaceName,
bool ifExists) {
return qctx->objPool()->add(new ClearSpace(qctx, input, std::move(spaceName), ifExists));
}

std::unique_ptr<PlanNodeDescription> explain() const override;

const std::string& getSpaceName() const {
return spaceName_;
}

bool getIfExists() const {
return ifExists_;
}

private:
ClearSpace(QueryContext* qctx, PlanNode* input, std::string spaceName, bool ifExists)
: SingleDependencyNode(qctx, Kind::kClearSpace, input) {
spaceName_ = std::move(spaceName);
ifExists_ = ifExists;
}

private:
std::string spaceName_;
bool ifExists_;
};

class AlterSpace final : public SingleDependencyNode {
public:
static AlterSpace* make(QueryContext* qctx,
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "ShowCreateEdgeIndex";
case Kind::kDropSpace:
return "DropSpace";
case Kind::kClearSpace:
return "ClearSpace";
case Kind::kDropTag:
return "DropTag";
case Kind::kDropEdge:
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class PlanNode {
kShowCreateTag,
kShowCreateEdge,
kDropSpace,
kClearSpace,
kDropTag,
kDropEdge,
kAlterSpace,
Expand Down
5 changes: 3 additions & 2 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace graph {

/**
* Read space : kUse, kDescribeSpace
* Write space : kCreateSpace, kDropSpace, kCreateSnapshot, kDropSnapshot
* kBalance, kAdmin, kConfig, kIngest, kDownload
* Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot,
* kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload
* Read schema : kDescribeTag, kDescribeEdge,
* kDescribeTagIndex, kDescribeEdgeIndex
* Write schema : kCreateTag, kAlterTag, kCreateEdge,
Expand Down Expand Up @@ -53,6 +53,7 @@ namespace graph {
case Sentence::Kind::kAlterSpace:
case Sentence::Kind::kCreateSpaceAs:
case Sentence::Kind::kDropSpace:
case Sentence::Kind::kClearSpace:
case Sentence::Kind::kCreateSnapshot:
case Sentence::Kind::kDropSnapshot:
case Sentence::Kind::kAddHosts:
Expand Down
14 changes: 14 additions & 0 deletions src/graph/util/FTIndexUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ StatusOr<bool> FTIndexUtils::dropTSIndex(const std::vector<nebula::plugin::HttpC
return Status::Error("drop fulltext index failed : %s", index.c_str());
}

StatusOr<bool> FTIndexUtils::clearTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index) {
auto retryCnt = FLAGS_ft_request_retry_times;
while (--retryCnt > 0) {
auto ret =
nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index);
if (!ret.ok()) {
continue;
}
return std::move(ret).value();
}
return Status::Error("clear fulltext index failed : %s", index.c_str());
}

StatusOr<Expression*> FTIndexUtils::rewriteTSFilter(
ObjectPool* pool,
bool isEdge,
Expand Down
4 changes: 4 additions & 0 deletions src/graph/util/FTIndexUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class FTIndexUtils final {
static StatusOr<bool> dropTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index);

// Clears the full-text index data, but keeps the index schema
static StatusOr<bool> clearTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index);

// Converts TextSearchExpression into a relational expresion that could be pushed down
static StatusOr<Expression*> rewriteTSFilter(
ObjectPool* pool,
Expand Down
Loading

0 comments on commit c9f046e

Please sign in to comment.