Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear space #3989

Merged
merged 17 commits into from
Mar 14, 2022
16 changes: 10 additions & 6 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
'KW_ADD',
'KW_CREATE',
'KW_DROP',
'KW_CLEAR',
'KW_REMOVE',
'KW_IF',
'KW_NOT',
Expand Down Expand Up @@ -133,25 +134,28 @@ def get_unreserved_keyword(file_path):
if flag == 1:
if line.strip() == ';':
break
unreserved_key_words.append(re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip())
unreserved_key_words.append(
re.sub('\\s+[:|]\\s+(\\w+)\\s+.*', '\\1', line).strip())
continue

parser_file.close()
return unreserved_key_words


if __name__ == '__main__':
cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"'
cmd = 'git diff --diff-filter=ACMRTUXB HEAD -p ' + \
SCANNER_FILE_PATH + '|grep "^+"|grep -v "^+++"|grep "KW_"'
content = os.popen(cmd)
keywords=[]
for line in content.readlines():
keyword = re.sub('.*(KW_\\w+)\s*;.*','\\1',line.strip())
keywords = []
for line in content.readlines():
keyword = re.sub('.*(KW_\\w+)\s*;.*', '\\1', line.strip())
keywords.append(keyword)

if len(keywords) == 0:
exit(0)
unreserved_key_words = get_unreserved_keyword(PARSER_FILE_PATH)
new_key_words = [word for word in keywords if word not in reserved_key_words]
new_key_words = [
word for word in keywords if word not in reserved_key_words]
if len(new_key_words) == 0:
exit(0)
result = [word for word in new_key_words if word not in unreserved_key_words]
Expand Down
16 changes: 16 additions & 0 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1239,6 +1239,22 @@ folly::Future<StatusOr<bool>> MetaClient::dropSpace(std::string name, const bool
return future;
}

folly::Future<StatusOr<bool>> MetaClient::clearSpace(std::string name, const bool ifExists) {
cpp2::ClearSpaceReq req;
req.space_name_ref() = std::move(name);
req.if_exists_ref() = ifExists;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_clearSpace(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<std::vector<cpp2::HostItem>>> MetaClient::listHosts(cpp2::ListHostType tp) {
cpp2::ListHostsReq req;
req.type_ref() = tp;
Expand Down
3 changes: 3 additions & 0 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ class MetaClient {

folly::Future<StatusOr<bool>> dropSpace(std::string name, bool ifExists = false);

// clear space data, but keep the space schema.
folly::Future<StatusOr<bool>> clearSpace(std::string name, bool ifExists = false);

folly::Future<StatusOr<std::vector<cpp2::HostItem>>> listHosts(
cpp2::ListHostType type = cpp2::ListHostType::ALLOC);

Expand Down
3 changes: 3 additions & 0 deletions src/common/plugin/fulltext/FTGraphAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class FTGraphAdapter {

virtual StatusOr<bool> dropIndex(const HttpClient& client, const std::string& index) const = 0;

// Clear the fulltext index data and keep the index schema.
virtual StatusOr<bool> clearIndex(const HttpClient& client, const std::string& index) const = 0;

virtual StatusOr<bool> indexExists(const HttpClient& client, const std::string& index) const = 0;
};
} // namespace plugin
Expand Down
41 changes: 41 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,28 @@ std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client,
return os.str();
}

StatusOr<bool> ESGraphAdapter::clearIndex(const HttpClient& client,
const std::string& index) const {
// curl -H "Content-Type: application/json; charset=utf-8"
// -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5"
// -d '{"query": {"match_all":{}}}'
std::string cmd = clearIndexCmd(client, index);
auto ret = nebula::ProcessUtils::runCommand(cmd.c_str());
if (!ret.ok() || ret.value().empty()) {
LOG(ERROR) << "Http POST Failed: " << cmd;
return Status::Error("command failed : %s", cmd.c_str());
}
return clearCheck(ret.value());
}

std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client,
const std::string& index) const noexcept {
std::stringstream os;
os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\""
<< " -d '{\"query\": {\"match_all\":{}}}'";
return os.str();
}

StatusOr<bool> ESGraphAdapter::indexExists(const HttpClient& client,
const std::string& index) const {
// curl -H "Content-Type: application/json; charset=utf-8"
Expand Down Expand Up @@ -283,5 +305,24 @@ bool ESGraphAdapter::indexCheck(const std::string& ret) const {
LOG(ERROR) << "error reason : " << ret;
return false;
}

bool ESGraphAdapter::clearCheck(const std::string& ret) const {
try {
auto root = folly::parseJson(ret);
if (root.isArray()) {
return false;
}
auto result = root.find("failures");
if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) {
return true;
}
} catch (const std::exception& ex) {
LOG(ERROR) << "result error : " << ex.what();
}

LOG(ERROR) << "error reason : " << ret;
return false;
}

} // namespace plugin
} // namespace nebula
14 changes: 14 additions & 0 deletions src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ESGraphAdapter final : public FTGraphAdapter {
FRIEND_TEST(FulltextPluginTest, ESFuzzyTest);
FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest);
FRIEND_TEST(FulltextPluginTest, ESDropIndexTest);
FRIEND_TEST(FulltextPluginTest, ESClearIndexTest);

public:
static std::unique_ptr<FTGraphAdapter> kAdapter;
Expand Down Expand Up @@ -53,6 +54,11 @@ class ESGraphAdapter final : public FTGraphAdapter {

StatusOr<bool> dropIndex(const HttpClient& client, const std::string& index) const override;

// Clear the fulltext index data on es and keep the index schema.
// client: es client
// index: fulltext index name
StatusOr<bool> clearIndex(const HttpClient& client, const std::string& index) const override;

StatusOr<bool> indexExists(const HttpClient& client, const std::string& index) const override;

private:
Expand Down Expand Up @@ -88,12 +94,20 @@ class ESGraphAdapter final : public FTGraphAdapter {

bool indexCheck(const std::string& ret) const;

// check the result
bool clearCheck(const std::string& ret) const;

std::string createIndexCmd(const HttpClient& client,
const std::string& index,
const std::string& indexTemplate = "") const noexcept;

std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept;

// Encapsulates the clearIndex command.
// client: es client
// index: fulltext index name
std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept;

std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept;
};
} // namespace plugin
Expand Down
11 changes: 11 additions & 0 deletions src/common/plugin/fulltext/test/FulltextPluginTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ TEST(FulltextPluginTest, ESDropIndexTest) {
ASSERT_EQ(expected, ret);
}

TEST(FulltextPluginTest, ESClearIndexTest) {
HostAddr localHost_{"127.0.0.1", 9200};
HttpClient client(localHost_);
auto ret = ESGraphAdapter().clearIndexCmd(client, "test_index");
auto expected =
"/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\""
" -XPOST -k \"http://127.0.0.1:9200/test_index/_delete_by_query?refresh&slices=5\""
" -d '{\"query\": {\"match_all\":{}}}'";
ASSERT_EQ(expected, ret);
}

TEST(FulltextPluginTest, ESPutTest) {
HostAddr localHost_{"127.0.0.1", 9200};
HttpClient hc(localHost_);
Expand Down
3 changes: 3 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kDropSpace: {
return pool->add(new DropSpaceExecutor(node, qctx));
}
case PlanNode::Kind::kClearSpace: {
return pool->add(new ClearSpaceExecutor(node, qctx));
}
case PlanNode::Kind::kShowCreateSpace: {
return pool->add(new ShowCreateSpaceExecutor(node, qctx));
}
Expand Down
44 changes: 44 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,50 @@ void DropSpaceExecutor::unRegisterSpaceLevelMetrics(const std::string &spaceName
}
}

folly::Future<Status> ClearSpaceExecutor::execute() {
SCOPED_TIMER(&execTime_);

auto *csNode = asNode<ClearSpace>(node());

// prepare text search index.
std::vector<std::string> ftIndexes;
auto spaceIdRet = qctx()->getMetaClient()->getSpaceIdByNameFromCache(csNode->getSpaceName());
if (spaceIdRet.ok()) {
auto ftIndexesRet = qctx()->getMetaClient()->getFTIndexBySpaceFromCache(spaceIdRet.value());
NG_RETURN_IF_ERROR(ftIndexesRet);
auto map = std::move(ftIndexesRet).value();
auto get = [](const auto &ptr) { return ptr.first; };
std::transform(map.begin(), map.end(), std::back_inserter(ftIndexes), get);
} else {
LOG(WARNING) << "Get space ID failed when prepare text index: " << csNode->getSpaceName();
}

return qctx()
->getMetaClient()
->clearSpace(csNode->getSpaceName(), csNode->getIfExists())
.via(runner())
.thenValue([this, csNode, spaceIdRet, ftIndexes = std::move(ftIndexes)](StatusOr<bool> resp) {
if (!resp.ok()) {
LOG(ERROR) << "Clear space `" << csNode->getSpaceName() << "' failed: " << resp.status();
return resp.status();
}
if (!ftIndexes.empty()) {
auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient());
if (!tsRet.ok()) {
LOG(WARNING) << "Get text search clients failed";
return Status::OK();
}
for (const auto &ftindex : ftIndexes) {
auto ftRet = FTIndexUtils::clearTSIndex(std::move(tsRet).value(), ftindex);
if (!ftRet.ok()) {
LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status();
}
}
}
return Status::OK();
});
}

folly::Future<Status> ShowSpacesExecutor::execute() {
SCOPED_TIMER(&execTime_);

Expand Down
8 changes: 8 additions & 0 deletions src/graph/executor/admin/SpaceExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ class DropSpaceExecutor final : public Executor {
folly::Future<Status> execute() override;
};

class ClearSpaceExecutor final : public Executor {
public:
ClearSpaceExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("ClearSpaceExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

class ShowSpacesExecutor final : public Executor {
public:
ShowSpacesExecutor(const PlanNode *node, QueryContext *qctx)
Expand Down
7 changes: 7 additions & 0 deletions src/graph/planner/plan/Admin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ std::unique_ptr<PlanNodeDescription> DropSpace::explain() const {
return desc;
}

std::unique_ptr<PlanNodeDescription> ClearSpace::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("spaceName", spaceName_, desc.get());
addDescription("ifExists", util::toJson(ifExists_), desc.get());
return desc;
}

std::unique_ptr<PlanNodeDescription> DescSpace::explain() const {
auto desc = SingleDependencyNode::explain();
addDescription("spaceName", spaceName_, desc.get());
Expand Down
31 changes: 31 additions & 0 deletions src/graph/planner/plan/Admin.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,37 @@ class DropSpace final : public SingleDependencyNode {
bool ifExists_;
};

class ClearSpace final : public SingleDependencyNode {
public:
static ClearSpace* make(QueryContext* qctx,
PlanNode* input,
std::string spaceName,
bool ifExists) {
return qctx->objPool()->add(new ClearSpace(qctx, input, std::move(spaceName), ifExists));
}

std::unique_ptr<PlanNodeDescription> explain() const override;

const std::string& getSpaceName() const {
return spaceName_;
}

bool getIfExists() const {
return ifExists_;
}

private:
ClearSpace(QueryContext* qctx, PlanNode* input, std::string spaceName, bool ifExists)
: SingleDependencyNode(qctx, Kind::kClearSpace, input) {
spaceName_ = std::move(spaceName);
ifExists_ = ifExists;
}

private:
std::string spaceName_;
bool ifExists_;
};

class AlterSpace final : public SingleDependencyNode {
public:
static AlterSpace* make(QueryContext* qctx,
Expand Down
2 changes: 2 additions & 0 deletions src/graph/planner/plan/PlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ const char* PlanNode::toString(PlanNode::Kind kind) {
return "ShowCreateEdgeIndex";
case Kind::kDropSpace:
return "DropSpace";
case Kind::kClearSpace:
return "ClearSpace";
case Kind::kDropTag:
return "DropTag";
case Kind::kDropEdge:
Expand Down
1 change: 1 addition & 0 deletions src/graph/planner/plan/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ class PlanNode {
kShowCreateTag,
kShowCreateEdge,
kDropSpace,
kClearSpace,
kDropTag,
kDropEdge,
kAlterSpace,
Expand Down
5 changes: 3 additions & 2 deletions src/graph/service/PermissionCheck.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ namespace graph {

/**
* Read space : kUse, kDescribeSpace
* Write space : kCreateSpace, kDropSpace, kCreateSnapshot, kDropSnapshot
* kBalance, kAdmin, kConfig, kIngest, kDownload
* Write space : kCreateSpace, kDropSpace, kClearSpace, kCreateSnapshot,
* kDropSnapshot, kBalance, kAdmin, kConfig, kIngest, kDownload
* Read schema : kDescribeTag, kDescribeEdge,
* kDescribeTagIndex, kDescribeEdgeIndex
* Write schema : kCreateTag, kAlterTag, kCreateEdge,
Expand Down Expand Up @@ -53,6 +53,7 @@ namespace graph {
case Sentence::Kind::kAlterSpace:
case Sentence::Kind::kCreateSpaceAs:
case Sentence::Kind::kDropSpace:
case Sentence::Kind::kClearSpace:
case Sentence::Kind::kCreateSnapshot:
case Sentence::Kind::kDropSnapshot:
case Sentence::Kind::kAddHosts:
Expand Down
14 changes: 14 additions & 0 deletions src/graph/util/FTIndexUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ StatusOr<bool> FTIndexUtils::dropTSIndex(const std::vector<nebula::plugin::HttpC
return Status::Error("drop fulltext index failed : %s", index.c_str());
}

StatusOr<bool> FTIndexUtils::clearTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index) {
auto retryCnt = FLAGS_ft_request_retry_times;
while (--retryCnt > 0) {
auto ret =
nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index);
if (!ret.ok()) {
continue;
}
return std::move(ret).value();
}
return Status::Error("clear fulltext index failed : %s", index.c_str());
}

StatusOr<Expression*> FTIndexUtils::rewriteTSFilter(
ObjectPool* pool,
bool isEdge,
Expand Down
4 changes: 4 additions & 0 deletions src/graph/util/FTIndexUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ class FTIndexUtils final {
static StatusOr<bool> dropTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index);

// Clears the full-text index data, but keeps the index schema
static StatusOr<bool> clearTSIndex(const std::vector<nebula::plugin::HttpClient>& tsClients,
const std::string& index);

// Converts TextSearchExpression into a relational expresion that could be pushed down
static StatusOr<Expression*> rewriteTSFilter(
ObjectPool* pool,
Expand Down
Loading