diff --git a/src/common/expression/TextSearchExpression.h b/src/common/expression/TextSearchExpression.h index 9c77f8667c9..0bd3b78477c 100644 --- a/src/common/expression/TextSearchExpression.h +++ b/src/common/expression/TextSearchExpression.h @@ -85,8 +85,8 @@ class TextSearchArgument final { std::string val_; std::string op_; int32_t fuzziness_{-2}; - int32_t limit_{100}; - int32_t timeout_{200}; + int32_t limit_{-1}; + int32_t timeout_{-1}; }; class TextSearchExpression : public Expression { diff --git a/src/common/expression/test/CMakeLists.txt b/src/common/expression/test/CMakeLists.txt index 15baabdec6e..b2884aecd67 100644 --- a/src/common/expression/test/CMakeLists.txt +++ b/src/common/expression/test/CMakeLists.txt @@ -33,7 +33,7 @@ set(expression_test_common_libs $ $ $ - $ + $ $ $ $ @@ -47,6 +47,7 @@ set(expression_test_common_libs $ $ $ + $ ) @@ -95,6 +96,7 @@ nebula_add_test( gtest ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( @@ -110,6 +112,7 @@ nebula_add_executable( boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + curl ) nebula_add_executable( diff --git a/src/common/http/HttpClient.cpp b/src/common/http/HttpClient.cpp index 5158e174627..1d3adafefe7 100644 --- a/src/common/http/HttpClient.cpp +++ b/src/common/http/HttpClient.cpp @@ -31,29 +31,61 @@ HttpResponse HttpClient::get(const std::string& url) { } HttpResponse HttpClient::get(const std::string& url, const std::vector& header) { - return sendRequest(url, "GET", header, ""); + return sendRequest(url, "GET", header, "", "", ""); +} + +HttpResponse HttpClient::get(const std::string& url, + const std::vector& header, + const std::string& username, + const std::string& password) { + return sendRequest(url, "GET", header, "", username, password); } HttpResponse HttpClient::post(const std::string& url, const std::vector& header, const std::string& body) { - return sendRequest(url, "POST", header, body); + return sendRequest(url, "POST", header, body, "", ""); +} + +HttpResponse HttpClient::post(const std::string& url, + const std::vector& header, + const std::string& body, + const std::string& username, + const std::string& password) { + return sendRequest(url, "POST", header, body, username, password); } HttpResponse HttpClient::delete_(const std::string& url, const std::vector& header) { - return sendRequest(url, "DELETE", header, ""); + return sendRequest(url, "DELETE", header, "", "", ""); +} + +HttpResponse HttpClient::delete_(const std::string& url, + const std::vector& header, + const std::string& username, + const std::string& password) { + return sendRequest(url, "DELETE", header, "", username, password); } HttpResponse HttpClient::put(const std::string& url, const std::vector& header, const std::string& body) { - return sendRequest(url, "PUT", header, body); + return sendRequest(url, "PUT", header, body, "", ""); +} + +HttpResponse HttpClient::put(const std::string& url, + const std::vector& header, + const std::string& body, + const std::string& username, + const std::string& password) { + return sendRequest(url, "PUT", header, body, username, password); } HttpResponse HttpClient::sendRequest(const std::string& url, const std::string& method, const std::vector& header, - const std::string& body) { + const std::string& body, + const std::string& username, + const std::string& password) { CurlHandle::instance(); HttpResponse resp; CURL* curl = curl_easy_init(); @@ -67,6 +99,7 @@ HttpResponse HttpClient::sendRequest(const std::string& url, setRespHeader(curl, resp.header); setRespBody(curl, resp.body); setTimeout(curl); + setAuth(curl, username, password); resp.curlCode = curl_easy_perform(curl); if (resp.curlCode != 0) { resp.curlMessage = std::string(curl_easy_strerror(resp.curlCode)); @@ -112,6 +145,15 @@ void HttpClient::setTimeout(CURL* curl) { curl_easy_setopt(curl, CURLOPT_TIMEOUT, 3); } +void HttpClient::setAuth(CURL* curl, const std::string& username, const std::string& password) { + if (!username.empty()) { + curl_easy_setopt(curl, CURLOPT_USERNAME, username.c_str()); + if (!password.empty()) { + curl_easy_setopt(curl, CURLOPT_PASSWORD, password.c_str()); + } + } +} + size_t HttpClient::onWriteData(void* ptr, size_t size, size_t nmemb, void* stream) { if (ptr == nullptr || size == 0) { return 0; diff --git a/src/common/http/HttpClient.h b/src/common/http/HttpClient.h index 2e2960c892f..9c041fcc012 100644 --- a/src/common/http/HttpClient.h +++ b/src/common/http/HttpClient.h @@ -33,18 +33,45 @@ struct HttpResponse { class HttpClient { public: static HttpClient& instance(); + virtual ~HttpClient() = default; + virtual HttpResponse get(const std::string& url); + virtual HttpResponse get(const std::string& url, const std::vector& headers); + virtual HttpResponse get(const std::string& url, + const std::vector& headers, + const std::string& username, + const std::string& password); + virtual HttpResponse post(const std::string& url, const std::vector& headers, const std::string& body); + + virtual HttpResponse post(const std::string& url, + const std::vector& headers, + const std::string& body, + const std::string& username, + const std::string& password); + virtual HttpResponse delete_(const std::string& url, const std::vector& headers); + + virtual HttpResponse delete_(const std::string& url, + const std::vector& headers, + const std::string& username, + const std::string& password); + virtual HttpResponse put(const std::string& url, const std::vector& headers, const std::string& body); + virtual HttpResponse put(const std::string& url, + const std::vector& headers, + const std::string& body, + const std::string& username, + const std::string& password); + protected: HttpClient() = default; @@ -52,7 +79,9 @@ class HttpClient { static HttpResponse sendRequest(const std::string& url, const std::string& method, const std::vector& headers, - const std::string& body); + const std::string& body, + const std::string& username, + const std::string& password); static void setUrl(CURL* curl, const std::string& url); static void setMethod(CURL* curl, const std::string& method); static curl_slist* setHeaders(CURL* curl, const std::vector& headers); @@ -60,6 +89,7 @@ class HttpClient { static void setRespBody(CURL* curl, const std::string& body); static void setRespHeader(CURL* curl, const std::string& header); static void setTimeout(CURL* curl); + static void setAuth(CURL* curl, const std::string& username, const std::string& password); static size_t onWriteData(void* ptr, size_t size, size_t nmemb, void* stream); }; diff --git a/src/common/plugin/fulltext/CMakeLists.txt b/src/common/plugin/fulltext/CMakeLists.txt index b5134beefd2..813a7fee234 100644 --- a/src/common/plugin/fulltext/CMakeLists.txt +++ b/src/common/plugin/fulltext/CMakeLists.txt @@ -2,16 +2,6 @@ # # This source code is licensed under Apache 2.0 License. -nebula_add_library( - ft_es_graph_adapter_obj OBJECT - elasticsearch/ESGraphAdapter.cpp -) - -nebula_add_library( - ft_es_storage_adapter_obj OBJECT - elasticsearch/ESStorageAdapter.cpp -) - nebula_add_library( es_adapter_obj OBJECT elasticsearch/ESAdapter.cpp diff --git a/src/common/plugin/fulltext/FTGraphAdapter.h b/src/common/plugin/fulltext/FTGraphAdapter.h deleted file mode 100644 index cf89ad82d08..00000000000 --- a/src/common/plugin/fulltext/FTGraphAdapter.h +++ /dev/null @@ -1,58 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTGraphAdapter { - public: - FTGraphAdapter() = default; - - virtual ~FTGraphAdapter() = default; - - virtual StatusOr prefix(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr wildcard(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr regexp(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - std::vector& rows) const = 0; - - virtual StatusOr fuzzy(const HttpClient& client, - const DocItem& index, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const = 0; - - virtual StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const = 0; - - virtual StatusOr dropIndex(const HttpClient& client, const std::string& index) const = 0; - - // Clear the fulltext index data and keep the index schema. - virtual StatusOr clearIndex(const HttpClient& client, const std::string& index) const = 0; - - virtual StatusOr indexExists(const HttpClient& client, const std::string& index) const = 0; -}; -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_G_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/FTStorageAdapter.h b/src/common/plugin/fulltext/FTStorageAdapter.h deleted file mode 100644 index 2c046988f8f..00000000000 --- a/src/common/plugin/fulltext/FTStorageAdapter.h +++ /dev/null @@ -1,32 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ -#define COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ - -#include "common/base/Base.h" -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTUtils.h" - -namespace nebula { -namespace plugin { - -class FTStorageAdapter { - public: - virtual ~FTStorageAdapter() = default; - - virtual StatusOr put(const HttpClient& client, const DocItem& item) const = 0; - - virtual StatusOr bulk(const HttpClient& client, - const std::vector& items) const = 0; - - protected: - FTStorageAdapter() = default; -}; - -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_S_ADAPTER_H_ diff --git a/src/common/plugin/fulltext/FTUtils.h b/src/common/plugin/fulltext/FTUtils.h deleted file mode 100644 index 72962be5569..00000000000 --- a/src/common/plugin/fulltext/FTUtils.h +++ /dev/null @@ -1,173 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef COMMON_PLUGIN_FULLTEXT_UTILS_H_ -#define COMMON_PLUGIN_FULLTEXT_UTILS_H_ - -#include - -#include -#include - -#include "common/base/Base.h" -#include "common/base/CommonMacro.h" -#include "common/datatypes/HostAddr.h" - -#define CURL_COMMAND "/usr/bin/curl" -#define XPUT " -XPUT" -#define XPOST " -XPOST" -#define XGET " -XGET" -#define XDELETE " -XDELETE" -#define CURL_CONTENT_JSON " -H \"Content-Type: application/json; charset=utf-8\"" -#define CURL_CONTENT_NDJSON " -H \"Content-Type: application/x-ndjson; charset=utf-8\"" -#define ESCAPE_SINGLE_QUOTE "\'\\\'\'" - -namespace nebula { -namespace plugin { - -enum class FT_SEARCH_OP { - kPrefix = 0, - kWildcard = 1, - kRegexp = 2, - kFuzzy = 3, -}; - -struct HttpClient { - HostAddr host; - std::string user; - std::string password; - std::string connType{"http"}; - - HttpClient() = default; - ~HttpClient() = default; - - explicit HttpClient(HttpClient&& v) noexcept - : host(std::move(v.host)), - user(std::move(v.user)), - password(std::move(v.password)), - connType(std::move(v.connType)) {} - - explicit HttpClient(const HttpClient& v) noexcept - : host(v.host), user(v.user), password(v.password), connType(v.connType) {} - - explicit HttpClient(HostAddr&& h) noexcept : host(std::move(h)) {} - - explicit HttpClient(const HostAddr& h) noexcept : host(h) {} - - HttpClient(HostAddr&& h, std::string&& u, std::string&& p) noexcept - : host(std::move(h)), user(std::move(u)), password(std::move(p)) {} - - HttpClient(const HostAddr& h, const std::string& u, const std::string& p) noexcept - : host(h), user(u), password(p) {} - - HttpClient(HostAddr&& h, std::string&& u, std::string&& p, std::string&& c) noexcept - : host(std::move(h)), user(std::move(u)), password(std::move(p)), connType(std::move(c)) {} - - HttpClient(const HostAddr& h, - const std::string& u, - const std::string& p, - const std::string& c) noexcept - : host(h), user(u), password(p), connType(std::move(c)) {} - - void clear() { - host.clear(); - user.clear(); - password.clear(); - connType.clear(); - } - - std::string toString() const { - std::stringstream os; - if (!user.empty()) { - os << " -u " << user; - if (!password.empty()) { - os << ":" << password; - } - } - os << " -k \"" << connType << "://" << host.host << ":" << host.port << "/"; - return os.str(); - } -}; - -struct DocItem { - std::string index; - std::string column; - int32_t part; - std::string val; - - ~DocItem() = default; - - explicit DocItem(DocItem&& v) noexcept - : index(std::move(v.index)), - column(std::move(v.column)), - part(std::move(v.part)), - val(std::move(v.val)) {} - - explicit DocItem(const DocItem& v) noexcept - : index(v.index), column(v.column), part(v.part), val(v.val) {} - - DocItem(std::string&& idx, std::string&& col, std::string&& v) - : index(std::move(idx)), column(std::move(col)), val(std::move(v)) {} - - DocItem(const std::string& idx, const std::string& col, const std::string& v) - : index(idx), column(col), val(v) {} - - DocItem(std::string&& idx, std::string&& col, int32_t&& p, std::string&& v) - : index(std::move(idx)), column(std::move(col)), part(std::move(p)), val(std::move(v)) {} - - DocItem(const std::string& idx, const std::string& col, const int32_t& p, const std::string& v) - : index(idx), column(col), part(p), val(v) {} -}; - -struct LimitItem { - int32_t timeout_; - int32_t maxRows_; - - ~LimitItem() = default; - - LimitItem(int32_t timeout, int32_t maxRows) : timeout_(timeout), maxRows_(maxRows) {} -}; - -struct DocIDTraits { - static std::string id(int32_t id) { - // int32_t max value is 2147483647, It takes 10 bytes to turn into a string - // for PartitionID ,TagID and EdgeType - std::stringstream ss; - ss << std::setw(10) << std::setfill('0') << id; - return ss.str(); - } - - static std::string column(const std::string& col) { - // normalized column name is 32 bytes - return proxygen::md5Encode(folly::StringPiece(col)); - } - - static std::string val(const std::string& v) { - return v; - } - - static std::string normalizedJson(const std::string& v) { - return boost::replace_all_copy(v, "'", ESCAPE_SINGLE_QUOTE); - } - - static std::string docId(const DocItem& item) { - // partId_column_value, - // The value length limit is 255 bytes - // docId structure : partId(10bytes) + schemaId(10Bytes) + - // columnName(32bytes) + encoded_val(max 344bytes) - // the max length of docId is 512 bytes, still have about 100 bytes reserved - auto encoded = proxygen::base64Encode(folly::StringPiece( - (item.val.size() > MAX_INDEX_TYPE_LENGTH) ? item.val.substr(0, MAX_INDEX_TYPE_LENGTH) - : item.val)); - std::replace(encoded.begin(), encoded.end(), '/', '_'); - std::stringstream ss; - ss << id(item.part) << column(item.column) << encoded; - return ss.str(); - } -}; -} // namespace plugin -} // namespace nebula - -#endif // COMMON_PLUGIN_FULLTEXT_UTILS_H_ diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp index cec0819a8f9..ed8de86e468 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.cpp @@ -180,36 +180,63 @@ Status ESAdapter::bulk(const ESBulk& bulk, bool refresh) { return Status::Error(folly::toJson(resp)); } -StatusOr ESAdapter::prefix(const std::string& index, const std::string& pattern) { +StatusOr ESAdapter::prefix(const std::string& index, + const std::string& pattern, + int64_t size, + int64_t timeout) { folly::dynamic body = folly::dynamic::object(); body["query"] = folly::dynamic::object(); body["query"]["prefix"] = folly::dynamic::object(); body["query"]["prefix"]["text"] = pattern; - return ESAdapter::query(index, body); + if (size > 0) { + body["size"] = size; + body["from"] = 0; + } + return ESAdapter::query(index, body, timeout); } -StatusOr ESAdapter::wildcard(const std::string& index, const std::string& pattern) { +StatusOr ESAdapter::wildcard(const std::string& index, + const std::string& pattern, + int64_t size, + int64_t timeout) { folly::dynamic body = folly::dynamic::object(); body["query"] = folly::dynamic::object(); body["query"]["wildcard"] = folly::dynamic::object("text", pattern); - return ESAdapter::query(index, body); + if (size > 0) { + body["size"] = size; + body["from"] = 0; + } + return ESAdapter::query(index, body, timeout); } -StatusOr ESAdapter::regexp(const std::string& index, const std::string& pattern) { +StatusOr ESAdapter::regexp(const std::string& index, + const std::string& pattern, + int64_t size, + int64_t timeout) { folly::dynamic body = folly::dynamic::object(); body["query"] = folly::dynamic::object(); body["query"]["regexp"] = folly::dynamic::object("text", pattern); - return ESAdapter::query(index, body); + if (size > 0) { + body["size"] = size; + body["from"] = 0; + } + return ESAdapter::query(index, body, timeout); } StatusOr ESAdapter::fuzzy(const std::string& index, const std::string& pattern, - const std::string& fuzziness) { + const std::string& fuzziness, + int64_t size, + int64_t timeout) { folly::dynamic body = folly::dynamic::object(); body["query"] = folly::dynamic::object(); body["query"]["fuzzy"] = folly::dynamic::object(); body["query"]["fuzzy"]["text"] = folly::dynamic::object("value", pattern)("fuzziness", fuzziness); - return ESAdapter::query(index, body); + if (size > 0) { + body["size"] = size; + body["from"] = 0; + } + return ESAdapter::query(index, body, timeout); } // StatusOr ESAdapter::term(const std::string& index,const std::vector& @@ -221,11 +248,13 @@ StatusOr ESAdapter::match_all(const std::string& index) { folly::dynamic body = folly::dynamic::object(); body["query"] = folly::dynamic::object(); body["query"]["match_all"] = folly::dynamic::object(); - return ESAdapter::query(index, body); + return ESAdapter::query(index, body, -1); } -StatusOr ESAdapter::query(const std::string& index, const folly::dynamic& query) { - auto result = randomClient().search(index, query); +StatusOr ESAdapter::query(const std::string& index, + const folly::dynamic& query, + int64_t timeout) { + auto result = randomClient().search(index, query, timeout); if (!result.ok()) { return std::move(result).status(); } diff --git a/src/common/plugin/fulltext/elasticsearch/ESAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h index 1dcd97c605d..5cbd4281312 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESAdapter.h +++ b/src/common/plugin/fulltext/elasticsearch/ESAdapter.h @@ -60,13 +60,14 @@ class ESAdapter { public: explicit ESAdapter(std::vector&& clients); ESAdapter() = default; - void setClients(std::vector&& clients); - Status createIndex(const std::string& name); - Status dropIndex(const std::string& name); - Status clearIndex(const std::string& name, bool refresh = false); - StatusOr isIndexExist(const std::string& name); + virtual ~ESAdapter() = default; + virtual void setClients(std::vector&& clients); + virtual Status createIndex(const std::string& name); + virtual Status dropIndex(const std::string& name); + virtual Status clearIndex(const std::string& name, bool refresh = false); + virtual StatusOr isIndexExist(const std::string& name); - Status bulk(const ESBulk& bulk, bool refresh = false); + virtual Status bulk(const ESBulk& bulk, bool refresh = false); /** * @brief @@ -88,7 +89,10 @@ class ESAdapter { * @param pattern * @return StatusOr */ - StatusOr prefix(const std::string& index, const std::string& pattern); + virtual StatusOr prefix(const std::string& index, + const std::string& pattern, + int64_t size = -1, + int64_t timeout = -1); /** * @brief @@ -111,9 +115,11 @@ class ESAdapter { * @param pattern * @return StatusOr */ - StatusOr fuzzy(const std::string& index, - const std::string& pattern, - const std::string& fuzziness); + virtual StatusOr fuzzy(const std::string& index, + const std::string& pattern, + const std::string& fuzziness, + int64_t size = -1, + int64_t timeout = -1); /** * @brief @@ -135,7 +141,10 @@ class ESAdapter { * @param pattern * @return StatusOr */ - StatusOr regexp(const std::string& index, const std::string& pattern); + virtual StatusOr regexp(const std::string& index, + const std::string& pattern, + int64_t size = -1, + int64_t timeout = -1); /** * @brief @@ -157,7 +166,10 @@ class ESAdapter { * @param pattern * @return StatusOr */ - StatusOr wildcard(const std::string& index, const std::string& pattern); + virtual StatusOr wildcard(const std::string& index, + const std::string& pattern, + int64_t size = -1, + int64_t timeout = -1); // /** // * @brief @@ -188,10 +200,12 @@ class ESAdapter { // */ // StatusOr term(const std::string& index, const std::vector& words); - StatusOr match_all(const std::string& index); - StatusOr query(const std::string& index, const folly::dynamic& query); + virtual StatusOr match_all(const std::string& index); + StatusOr query(const std::string& index, + const folly::dynamic& query, + int64_t timeout); - private: + protected: static std::string genDocID(const std::string& vid, const std::string& src, const std::string& dst, diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.cpp b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp index 3369e44cd6e..dd7d2f126cf 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESClient.cpp +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.cpp @@ -16,7 +16,7 @@ ESClient::ESClient(HttpClient& httpClient, : httpClient_(httpClient), protocol_(protocol), address_(address), - user_(user), + username_(user), password_(password) { // TODO(hs.zhang): enable protocol // TODO(hs.zhang): enable user&password @@ -57,8 +57,9 @@ ESClient::ESClient(HttpClient& httpClient, */ StatusOr ESClient::createIndex(const std::string& name, const folly::dynamic& body) { - std::string url = fmt::format("http://{}/{}", address_, name); - auto resp = httpClient_.put(url, {"Content-Type: application/json"}, folly::toJson(body)); + std::string url = fmt::format("{}://{}/{}", protocol_, address_, name); + auto resp = httpClient_.put( + url, {"Content-Type: application/json"}, folly::toJson(body), username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } @@ -67,8 +68,8 @@ StatusOr ESClient::createIndex(const std::string& name, } StatusOr ESClient::dropIndex(const std::string& name) { - std::string url = fmt::format("http://{}/{}", address_, name); - auto resp = httpClient_.delete_(url, {"Content-Type: application/json"}); + std::string url = fmt::format("{}://{}/{}", protocol_, address_, name); + auto resp = httpClient_.delete_(url, {"Content-Type: application/json"}, username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } @@ -76,8 +77,8 @@ StatusOr ESClient::dropIndex(const std::string& name) { } StatusOr ESClient::getIndex(const std::string& name) { - std::string url = fmt::format("http://{}/{}", address_, name); - auto resp = httpClient_.get(url, {"Content-Type: application/json"}); + std::string url = fmt::format("{}://{}/{}", protocol_, address_, name); + auto resp = httpClient_.get(url, {"Content-Type: application/json"}, username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } @@ -87,18 +88,28 @@ StatusOr ESClient::getIndex(const std::string& name) { StatusOr ESClient::deleteByQuery(const std::string& index, const folly::dynamic& query, bool refresh) { - std::string url = fmt::format( - "http://{}/{}/_delete_by_query?refresh={}", address_, index, refresh ? "true" : "false"); - auto resp = httpClient_.post(url, {"Content-Type: application/json"}, folly::toJson(query)); + std::string url = fmt::format("{}://{}/{}/_delete_by_query?refresh={}", + protocol_, + address_, + index, + refresh ? "true" : "false"); + auto resp = httpClient_.post( + url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } return folly::parseJson(resp.body); } -StatusOr ESClient::search(const std::string& index, const folly::dynamic& query) { - std::string url = fmt::format("http://{}/{}/_search", address_, index); - auto resp = httpClient_.post(url, {"Content-Type: application/json"}, folly::toJson(query)); +StatusOr ESClient::search(const std::string& index, + const folly::dynamic& query, + int64_t timeout) { + std::string url = fmt::format("{}://{}/{}/_search", protocol_, address_, index); + if (timeout > 0) { + url += fmt::format("?timeout={}ms", timeout); + } + auto resp = httpClient_.post( + url, {"Content-Type: application/json"}, folly::toJson(query), username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } @@ -106,13 +117,15 @@ StatusOr ESClient::search(const std::string& index, const folly: } StatusOr ESClient::bulk(const std::vector& bulk, bool refresh) { - std::string url = fmt::format("http://{}/_bulk?refresh={}", address_, refresh ? "true" : "false"); + std::string url = + fmt::format("{}://{}/_bulk?refresh={}", protocol_, address_, refresh ? "true" : "false"); std::string body; for (auto& obj : bulk) { body += folly::toJson(obj); body += "\n"; } - auto resp = httpClient_.post(url, {"Content-Type: application/x-ndjson"}, body); + auto resp = + httpClient_.post(url, {"Content-Type: application/x-ndjson"}, body, username_, password_); if (resp.curlCode != 0) { return Status::Error(fmt::format("curl error({}):{}", resp.curlCode, resp.curlMessage)); } diff --git a/src/common/plugin/fulltext/elasticsearch/ESClient.h b/src/common/plugin/fulltext/elasticsearch/ESClient.h index 6822ecd7596..d81e3331f0e 100644 --- a/src/common/plugin/fulltext/elasticsearch/ESClient.h +++ b/src/common/plugin/fulltext/elasticsearch/ESClient.h @@ -30,14 +30,16 @@ class ESClient { StatusOr deleteByQuery(const std::string& index, const folly::dynamic& query, bool refresh = false); - StatusOr search(const std::string& index, const folly::dynamic& query); + StatusOr search(const std::string& index, + const folly::dynamic& query, + int64_t timeout); StatusOr bulk(const std::vector& bulk, bool refresh = false); private: HttpClient& httpClient_; std::string protocol_; std::string address_; - std::string user_; + std::string username_; std::string password_; StatusOr sendHttpRequest(const std::string& url, diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp deleted file mode 100644 index d3395cf6c84..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.cpp +++ /dev/null @@ -1,328 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" - -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { - -std::unique_ptr ESGraphAdapter::kAdapter = - std::unique_ptr(new ESGraphAdapter()); - -StatusOr ESGraphAdapter::prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kPrefix); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kWildcard); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const { - std::string cmd = header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kRegexp); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -StatusOr ESGraphAdapter::fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const { - std::string cmd = - header(client, item, limit) + body(item, limit.maxRows_, FT_SEARCH_OP::kFuzzy, fuzziness, op); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return result(ret.value(), rows); -} - -std::string ESGraphAdapter::header() const noexcept { - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON; - return os.str(); -} - -std::string ESGraphAdapter::header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET http://127.0.0.1:9200/my_temp_index_3/_search?timeout=10ms - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XGET; - os << client.toString() << item.index << "/_search?timeout=" << limit.timeout_ << "ms" - << "\""; - return os.str(); -} - -folly::dynamic ESGraphAdapter::columnBody(const std::string& col) const noexcept { - // "term": {"column_id": "col1"} - folly::dynamic itemColumn = folly::dynamic::object("column_id", DocIDTraits::column(col)); - return folly::dynamic::object("term", itemColumn); -} - -std::string ESGraphAdapter::body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - folly::dynamic obj; - switch (type) { - case FT_SEARCH_OP::kPrefix: { - obj = prefixBody(item.val); - break; - } - case FT_SEARCH_OP::kWildcard: { - obj = wildcardBody(item.val); - break; - } - case FT_SEARCH_OP::kRegexp: { - obj = regexpBody(item.val); - break; - } - case FT_SEARCH_OP::kFuzzy: { - obj = fuzzyBody(item.val, fuzziness, op); - } - } - auto itemArray = folly::dynamic::array(columnBody(item.column), obj); - folly::dynamic itemMust = folly::dynamic::object("must", itemArray); - folly::dynamic itemBool = folly::dynamic::object("bool", itemMust); - folly::dynamic itemQuery = - folly::dynamic::object("query", itemBool)("_source", "value")("size", maxRows)("from", 0); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(itemQuery)) << "'"; - return os.str(); -} - -folly::dynamic ESGraphAdapter::prefixBody(const std::string& prefix) const noexcept { - // {"prefix": {"value": "a"}} - folly::dynamic itemValue = folly::dynamic::object("value", prefix); - return folly::dynamic::object("prefix", itemValue); -} - -folly::dynamic ESGraphAdapter::wildcardBody(const std::string& wildcard) const noexcept { - // {"wildcard": {"value": "*a"}} - folly::dynamic itemValue = folly::dynamic::object("value", wildcard); - return folly::dynamic::object("wildcard", itemValue); -} - -folly::dynamic ESGraphAdapter::regexpBody(const std::string& regexp) const noexcept { - // {"regexp": {"value": "c+"}} - folly::dynamic itemValue = folly::dynamic::object("value", regexp); - return folly::dynamic::object("regexp", itemValue); -} - -folly::dynamic ESGraphAdapter::fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept { - // {"match": {"value": {"query":"ccc aaa","fuzziness": "AUTO","operator": - // "and"}}} - folly::dynamic items = - folly::dynamic::object("query", regexp)("fuzziness", fuzziness)("operator", op); - folly::dynamic value = folly::dynamic::object("value", items); - return folly::dynamic::object("match", value); -} - -StatusOr ESGraphAdapter::createIndex(const HttpClient& client, - const std::string& index, - const std::string&) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/index_exist" - std::string cmd = createIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string&) const noexcept { - std::stringstream os; - os << header() << XPUT << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::dropIndex(const HttpClient& client, const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XDELETE "http://127.0.0.1:9200/index_exist" - std::string cmd = dropIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http DELETE Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return statusCheck(ret.value()); -} - -std::string ESGraphAdapter::dropIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XDELETE << client.toString() << index << "\""; - return os.str(); -} - -StatusOr ESGraphAdapter::clearIndex(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPOST "http://127.0.0.1:9200/${index}/_delete_by_query?refresh&slices=5" - // -d '{"query": {"match_all":{}}}' - std::string cmd = clearIndexCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http POST Failed: " << cmd; - return Status::Error("command failed : %s", cmd.c_str()); - } - return clearCheck(ret.value()); -} - -std::string ESGraphAdapter::clearIndexCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XPOST << client.toString() << index << "/_delete_by_query?refresh&slices=5\"" - << " -d '{\"query\": {\"match_all\":{}}}'"; - return os.str(); -} - -StatusOr ESGraphAdapter::indexExists(const HttpClient& client, - const std::string& index) const { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XGET "http://127.0.0.1:9200/_cat/indices/index_exist?format=json" - std::string cmd = indexExistsCmd(client, index); - auto ret = nebula::ProcessUtils::runCommand(cmd.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http GET Failed: " << cmd; - return Status::Error("Http GET Failed: : %s", cmd.c_str()); - } - return indexCheck(ret.value()); -} - -std::string ESGraphAdapter::indexExistsCmd(const HttpClient& client, - const std::string& index) const noexcept { - std::stringstream os; - os << header() << XGET << client.toString() << "_cat/indices/" << index << "?format=json" - << "\""; - return os.str(); -} - -bool ESGraphAdapter::result(const std::string& ret, std::vector& rows) const { - try { - auto root = folly::parseJson(ret); - auto rootHits = root.find("hits"); - if (rootHits != root.items().end()) { - auto subHits = rootHits->second.find("hits"); - if (subHits != rootHits->second.items().end()) { - for (auto& item : subHits->second) { - auto s = item.find("_source"); - if (s != item.items().end()) { - auto v = s->second.find("value"); - if (v != s->second.items().end()) { - rows.emplace_back(v->second.getString()); - } else { - continue; - } - } else { - continue; - } - } - } - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::statusCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("acknowledged"); - if (result != root.items().end() && result->second.isBool() && result->second.getBool()) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::indexCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (!root.isArray()) { - return false; - } - for (auto& entry : root) { - auto exists = entry.find("index"); - if (exists != entry.items().end()) { - return true; - } - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - LOG(ERROR) << "error reason : " << ret; - return false; -} - -bool ESGraphAdapter::clearCheck(const std::string& ret) const { - try { - auto root = folly::parseJson(ret); - if (root.isArray()) { - return false; - } - auto result = root.find("failures"); - if (result != root.items().end() && result->second.isArray() && result->second.size() == 0) { - return true; - } - } catch (const std::exception& ex) { - LOG(ERROR) << "result error : " << ex.what(); - } - - LOG(ERROR) << "error reason : " << ret; - return false; -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h deleted file mode 100644 index beac919d59e..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESGraphAdapter.h +++ /dev/null @@ -1,116 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef NEBULA_PLUGIN_ESGRAPHADAPTER_H -#define NEBULA_PLUGIN_ESGRAPHADAPTER_H - -#include - -#include "common/plugin/fulltext/FTGraphAdapter.h" - -namespace nebula { -namespace plugin { -class ESGraphAdapter final : public FTGraphAdapter { - FRIEND_TEST(FulltextPluginTest, ESIndexCheckTest); - FRIEND_TEST(FulltextPluginTest, ESResultTest); - FRIEND_TEST(FulltextPluginTest, ESPrefixTest); - FRIEND_TEST(FulltextPluginTest, ESWildcardTest); - FRIEND_TEST(FulltextPluginTest, ESRegexpTest); - FRIEND_TEST(FulltextPluginTest, ESFuzzyTest); - FRIEND_TEST(FulltextPluginTest, ESCreateIndexTest); - FRIEND_TEST(FulltextPluginTest, ESDropIndexTest); - FRIEND_TEST(FulltextPluginTest, ESClearIndexTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr prefix(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr wildcard(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr regexp(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - std::vector& rows) const override; - - StatusOr fuzzy(const HttpClient& client, - const DocItem& item, - const LimitItem& limit, - const folly::dynamic& fuzziness, - const std::string& op, - std::vector& rows) const override; - - StatusOr createIndex(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const override; - - StatusOr dropIndex(const HttpClient& client, const std::string& index) const override; - - // Clear the fulltext index data on es and keep the index schema. - // client: es client - // index: fulltext index name - StatusOr clearIndex(const HttpClient& client, const std::string& index) const override; - - StatusOr indexExists(const HttpClient& client, const std::string& index) const override; - - private: - ESGraphAdapter() {} - - std::string header() const noexcept; - - std::string header(const HttpClient& client, - const DocItem& item, - const LimitItem& limit) const noexcept; - - folly::dynamic columnBody(const std::string& col) const noexcept; - - std::string body(const DocItem& item, - int32_t maxRows, - FT_SEARCH_OP type, - const folly::dynamic& fuzziness = nullptr, - const std::string& op = "") const noexcept; - - folly::dynamic prefixBody(const std::string& prefix) const noexcept; - - folly::dynamic wildcardBody(const std::string& wildcard) const noexcept; - - folly::dynamic regexpBody(const std::string& regexp) const noexcept; - - folly::dynamic fuzzyBody(const std::string& regexp, - const folly::dynamic& fuzziness, - const std::string& op) const noexcept; - - bool result(const std::string& ret, std::vector& rows) const; - - bool statusCheck(const std::string& ret) const; - - bool indexCheck(const std::string& ret) const; - - // check the result - bool clearCheck(const std::string& ret) const; - - std::string createIndexCmd(const HttpClient& client, - const std::string& index, - const std::string& indexTemplate = "") const noexcept; - - std::string dropIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - // Encapsulates the clearIndex command. - // client: es client - // index: fulltext index name - std::string clearIndexCmd(const HttpClient& client, const std::string& index) const noexcept; - - std::string indexExistsCmd(const HttpClient& client, const std::string& index) const noexcept; -}; -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESGRAPHADAPTER_H diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp deleted file mode 100644 index a84e16b9fc0..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.cpp +++ /dev/null @@ -1,220 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" - -#include "common/plugin/fulltext/FTUtils.h" -#include "common/process/ProcessUtils.h" - -namespace nebula { -namespace plugin { -std::unique_ptr ESStorageAdapter::kAdapter = - std::unique_ptr(new ESStorageAdapter()); - -bool ESStorageAdapter::checkPut(const std::string& ret, const std::string& cmd) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("index1", "col1", 1, 2, "aaaa"); - // - // Command should be : - // /usr/bin/curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT - // "http://127.0.0.1:9200/index1/_doc/0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa" - // // NOLINT - // -d'{"value":"aaaa","schema_id":2,"column_id":"8c43de7b01bca674276c43e09b3ec5ba"}' - // - // If successful, the result is returned: - // { - // "_primary_term": 1, - // "_shards": { - // "failed": 0, - // "total": 2, - // "successful": 1 - // }, - // "_id": - // "0000000001_0000000002_8c43de7b01bca674276c43e09b3ec5ba_aaaa", - // "result": "created", - // "_seq_no": 0, - // "_type": "_doc", - // "_index": "index1", - // "_version": 1 - // } - try { - auto root = folly::parseJson(ret); - auto result = root.find("result"); - if (result != root.items().end() && - (result->second.getString() == "created" || result->second.getString() == "updated")) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Command : " << cmd << "failed : " << ret; - return false; -} - -bool ESStorageAdapter::checkBulk(const std::string& ret) const { - // For example : - // HostAddr localHost_{"127.0.0.1", 9200}; - // DocItem item("bulk_index", "col1", 1, 2, "row_1") - // ("bulk_index", "col1", 1, 2, "row_2") - // - // Command should be : - // curl -H "Content-Type: application/x-ndjson" -XPOST - // localhost:9200/_bulk -d ' { "index" : { "_index" : "bulk_index", "_id" : - // "1" } } { "schema_id" : 1 , "column_id" : "col1", "value" : "row_1"} { - // "index" : { "_index" : "bulk_index", "_id" : "2" } } { "schema_id" : 1 , - // "column_id" : "col1", "value" : "row_2"} - // ' - // - // If successful, the result is returned: - // { - // "took": 18, - // "errors": false, - // "items": [{ - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "1", - // "_version": 4, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 4, - // "_primary_term": 1, - // "status": 200 - // } - // }, { - // "index": { - // "_index": "bulk_index", - // "_type": "_doc", - // "_id": "2", - // "_version": 2, - // "result": "updated", - // "_shards": { - // "total": 2, - // "successful": 1, - // "failed": 0 - // }, - // "_seq_no": 5, - // "_primary_term": 1, - // "status": 200 - // } - // }] - // } - try { - auto root = folly::parseJson(ret); - auto result = root.find("errors"); - if (result != root.items().end() && result->second.isBool() && !result->second.getBool()) { - return true; - } - } catch (std::exception& e) { - LOG(ERROR) << "result error : " << e.what(); - } - VLOG(3) << "Bulk insert failed"; - VLOG(3) << ret; - return false; -} - -StatusOr ESStorageAdapter::put(const HttpClient& client, const DocItem& item) const { - auto command = putCmd(client, item); - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - LOG(ERROR) << "Http PUT Failed: " << command; - return Status::Error("command failed : %s", command.c_str()); - } - return checkPut(ret.value(), command); -} - -StatusOr ESStorageAdapter::bulk(const HttpClient& client, - const std::vector& items) const { - auto command = bulkCmd(client, items); - if (command.empty()) { - return true; - } - auto ret = nebula::ProcessUtils::runCommand(command.c_str()); - if (!ret.ok() || ret.value().empty()) { - VLOG(3) << "Http PUT Failed"; - VLOG(3) << command; - return Status::Error("bulk command failed"); - } - return checkBulk(ret.value()); -} - -std::string ESStorageAdapter::putHeader(const HttpClient& client, - const DocItem& item) const noexcept { - // curl -H "Content-Type: application/json; charset=utf-8" - // -XPUT "http://127.0.0.1:9200/my_temp_index_3/_doc/part1|tag4|col4|hello" - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_JSON << XPUT; - os << client.toString() << item.index << "/_doc/" << DocIDTraits::docId(item) << "\""; - return os.str(); -} - -std::string ESStorageAdapter::putBody(const DocItem& item) const noexcept { - // -d'{"column_id" : "col4", "value" : "hello"}' - folly::dynamic d = folly::dynamic::object("column_id", DocIDTraits::column(item.column))( - "value", DocIDTraits::val(item.val)); - std::stringstream os; - os << " -d'" << DocIDTraits::normalizedJson(folly::toJson(d)) << "'"; - return os.str(); -} - -std::string ESStorageAdapter::putCmd(const HttpClient& client, const DocItem& item) const noexcept { - std::stringstream os; - os << putHeader(client, item) << putBody(item); - return os.str(); -} - -std::string ESStorageAdapter::bulkHeader(const HttpClient& client) const noexcept { - // curl -H "Content-Type: application/x-ndjson; charset=utf-8" - // http://127.0.0.1:9200/_bulk - std::stringstream os; - os << CURL_COMMAND << CURL_CONTENT_NDJSON << XPOST; - os << client.toString() << "_bulk\""; - return os.str(); -} - -std::string ESStorageAdapter::bulkBody(const std::vector& items) const noexcept { - // -d ' - // { "index" : { "_index" : "bulk_index", "_id" : "1" } } - // { "column_id" : "col1", "value" : "row_1"} - // { "index" : { "_index" : "bulk_index", "_id" : "2" } } - // { "column_id" : "col1", "value" : "row_2"} - // ' - if (items.empty()) { - return ""; - } - std::stringstream os; - os << " -d '" - << "\n"; - for (const auto& item : items) { - folly::dynamic meta = - folly::dynamic::object("_id", DocIDTraits::docId(item))("_index", item.index); - folly::dynamic data = folly::dynamic::object("value", DocIDTraits::val(item.val))( - "column_id", DocIDTraits::column(item.column)); - os << folly::toJson(folly::dynamic::object("index", meta)) << "\n"; - os << DocIDTraits::normalizedJson(folly::toJson(data)) << "\n"; - } - os << "'"; - return os.str(); -} - -std::string ESStorageAdapter::bulkCmd(const HttpClient& client, - const std::vector& items) const noexcept { - auto body = bulkBody(items); - if (body.empty()) { - return ""; - } - std::stringstream os; - os << bulkHeader(client) << body; - return os.str(); -} - -} // namespace plugin -} // namespace nebula diff --git a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h b/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h deleted file mode 100644 index 323018724c6..00000000000 --- a/src/common/plugin/fulltext/elasticsearch/ESStorageAdapter.h +++ /dev/null @@ -1,51 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef NEBULA_PLUGIN_ESSTORAGEADAPTER_H -#define NEBULA_PLUGIN_ESSTORAGEADAPTER_H - -#include - -#include "common/base/StatusOr.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" - -namespace nebula { -namespace plugin { - -class ESStorageAdapter final : public FTStorageAdapter { - FRIEND_TEST(FulltextPluginTest, ESPutTest); - FRIEND_TEST(FulltextPluginTest, ESBulkTest); - - public: - static std::unique_ptr kAdapter; - - StatusOr put(const HttpClient& client, const DocItem& item) const override; - - StatusOr bulk(const HttpClient& client, const std::vector& items) const override; - - private: - ESStorageAdapter() {} - - std::string putHeader(const HttpClient& client, const DocItem& item) const noexcept; - - std::string putBody(const DocItem& item) const noexcept; - - std::string putCmd(const HttpClient& client, const DocItem& item) const noexcept; - - std::string bulkHeader(const HttpClient& client) const noexcept; - - std::string bulkBody(const std::vector& items) const noexcept; - - std::string bulkCmd(const HttpClient& client, const std::vector& items) const noexcept; - - bool checkPut(const std::string& ret, const std::string& cmd) const; - - bool checkBulk(const std::string& ret) const; -}; - -} // namespace plugin -} // namespace nebula - -#endif // NEBULA_PLUGIN_ESSTORAGEADAPTER_H diff --git a/src/common/plugin/fulltext/test/ElasticsearchTest.cpp b/src/common/plugin/fulltext/test/ElasticsearchTest.cpp index 9fa44f0f6d2..6e5a8c1b01c 100644 --- a/src/common/plugin/fulltext/test/ElasticsearchTest.cpp +++ b/src/common/plugin/fulltext/test/ElasticsearchTest.cpp @@ -26,22 +26,52 @@ class MockHttpClient : public HttpClient { get, (const std::string& url, const std::vector& headers), (override)); + MOCK_METHOD(HttpResponse, + get, + (const std::string& url, + const std::vector& headers, + const std::string&, + const std::string&), + (override)); MOCK_METHOD(HttpResponse, post, (const std::string& url, const std::vector& headers, const std::string& body), (override)); + MOCK_METHOD(HttpResponse, + post, + (const std::string& url, + const std::vector& headers, + const std::string& body, + const std::string&, + const std::string&), + (override)); MOCK_METHOD(HttpResponse, delete_, (const std::string& url, const std::vector& headers), (override)); + MOCK_METHOD(HttpResponse, + delete_, + (const std::string& url, + const std::vector& headers, + const std::string&, + const std::string&), + (override)); MOCK_METHOD(HttpResponse, put, (const std::string& url, const std::vector& headers, const std::string& body), (override)); + MOCK_METHOD(HttpResponse, + put, + (const std::string& url, + const std::vector& headers, + const std::string& body, + const std::string&, + const std::string&), + (override)); }; HttpResponse operator"" _http_resp(const char* s, size_t) { @@ -112,7 +142,9 @@ TEST_F(ESTest, createIndex) { EXPECT_CALL(mockHttpClient, put("http://127.0.0.1:9200/nebula_index_1", std::vector{"Content-Type: application/json"}, - _)) + _, + "", + "")) .Times(3) .WillOnce(Return(normalSuccessResp_)) .WillOnce(Return(esErrorResp_)) @@ -139,7 +171,9 @@ TEST_F(ESTest, dropIndex) { MockHttpClient mockHttpClient; EXPECT_CALL(mockHttpClient, delete_("http://127.0.0.1:9200/nebula_index_1", - std::vector{"Content-Type: application/json"})) + std::vector{"Content-Type: application/json"}, + "", + "")) .Times(3) .WillOnce(Return(normalSuccessResp_)) .WillOnce(Return(esErrorResp_)) @@ -186,7 +220,9 @@ content-length: 78 EXPECT_CALL(mockHttpClient, get("http://127.0.0.1:9200/nebula_index_1", - std::vector{"Content-Type: application/json"})) + std::vector{"Content-Type: application/json"}, + "", + "")) .Times(4) .WillOnce(Return(indexExistResp)) .WillOnce(Return(indexNotExistResp)) @@ -230,14 +266,18 @@ content-length: 78 EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_delete_by_query?refresh=false", std::vector{"Content-Type: application/json"}, - _)) + _, + "", + "")) .Times(2) .WillOnce(Return(clearSuccessResp_)) .WillOnce(Return(esErrorResp_)); EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_delete_by_query?refresh=true", std::vector{"Content-Type: application/json"}, - _)) + _, + "", + "")) .Times(1) .WillOnce(Return(curlErrorResp_)); @@ -274,7 +314,9 @@ TEST_F(ESTest, prefix) { EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_search", std::vector{"Content-Type: application/json"}, - folly::toJson(prefixBody))) + folly::toJson(prefixBody), + std::string(""), + std::string(""))) .Times(3) .WillOnce(Return(queryResultResp_)) .WillOnce(Return(esErrorResp_)) @@ -282,7 +324,7 @@ TEST_F(ESTest, prefix) { plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", ""); plugin::ESAdapter adapter({client}); { - auto result = adapter.prefix("nebula_index_1", "abc"); + auto result = adapter.prefix("nebula_index_1", "abc", -1, -1); ASSERT_TRUE(result.ok()); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -290,12 +332,12 @@ TEST_F(ESTest, prefix) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("a", "b", 10, "edge text")); } { - auto result = adapter.prefix("nebula_index_1", "abc"); + auto result = adapter.prefix("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"({"reason":"mock error"})"); } { - auto result = adapter.prefix("nebula_index_1", "abc"); + auto result = adapter.prefix("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"(curl error(7):mock error message)"); } @@ -316,7 +358,9 @@ TEST_F(ESTest, wildcard) { EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_search", std::vector{"Content-Type: application/json"}, - folly::toJson(body))) + folly::toJson(body), + "", + "")) .Times(3) .WillOnce(Return(queryResultResp_)) .WillOnce(Return(esErrorResp_)) @@ -324,7 +368,7 @@ TEST_F(ESTest, wildcard) { plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", ""); plugin::ESAdapter adapter({client}); { - auto result = adapter.wildcard("nebula_index_1", "abc"); + auto result = adapter.wildcard("nebula_index_1", "abc", -1, -1); ASSERT_TRUE(result.ok()); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -332,12 +376,12 @@ TEST_F(ESTest, wildcard) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("a", "b", 10, "edge text")); } { - auto result = adapter.wildcard("nebula_index_1", "abc"); + auto result = adapter.wildcard("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"({"reason":"mock error"})"); } { - auto result = adapter.wildcard("nebula_index_1", "abc"); + auto result = adapter.wildcard("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"(curl error(7):mock error message)"); } @@ -358,7 +402,9 @@ TEST_F(ESTest, regexp) { EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_search", std::vector{"Content-Type: application/json"}, - folly::toJson(body))) + folly::toJson(body), + "", + "")) .Times(3) .WillOnce(Return(queryResultResp_)) .WillOnce(Return(esErrorResp_)) @@ -366,7 +412,7 @@ TEST_F(ESTest, regexp) { plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", ""); plugin::ESAdapter adapter({client}); { - auto result = adapter.regexp("nebula_index_1", "abc"); + auto result = adapter.regexp("nebula_index_1", "abc", -1, -1); ASSERT_TRUE(result.ok()); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -374,12 +420,12 @@ TEST_F(ESTest, regexp) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("a", "b", 10, "edge text")); } { - auto result = adapter.regexp("nebula_index_1", "abc"); + auto result = adapter.regexp("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"({"reason":"mock error"})"); } { - auto result = adapter.regexp("nebula_index_1", "abc"); + auto result = adapter.regexp("nebula_index_1", "abc", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"(curl error(7):mock error message)"); } @@ -391,14 +437,18 @@ TEST_F(ESTest, bulk) { EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/_bulk?refresh=true", std::vector{"Content-Type: application/x-ndjson"}, - _)) // TODO(hs.zhang): Matcher + _, + "", + "")) // TODO(hs.zhang): Matcher .Times(2) .WillOnce(Return(queryResultResp_)) .WillOnce(Return(esErrorResp_)); EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/_bulk?refresh=false", std::vector{"Content-Type: application/x-ndjson"}, - _)) // TODO(hs.zhang): Matcher + _, + "", + "")) // TODO(hs.zhang): Matcher .Times(1) .WillOnce(Return(curlErrorResp_)); plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", ""); @@ -440,7 +490,9 @@ TEST_F(ESTest, fuzzy) { EXPECT_CALL(mockHttpClient, post("http://127.0.0.1:9200/nebula_index_1/_search", std::vector{"Content-Type: application/json"}, - _)) + _, + "", + "")) .Times(3) .WillOnce(Return(queryResultResp_)) .WillOnce(Return(esErrorResp_)) @@ -448,7 +500,7 @@ TEST_F(ESTest, fuzzy) { plugin::ESClient client(mockHttpClient, "http", "127.0.0.1:9200", "", ""); plugin::ESAdapter adapter({client}); { - auto result = adapter.fuzzy("nebula_index_1", "abc", "2"); + auto result = adapter.fuzzy("nebula_index_1", "abc", "2", -1, -1); ASSERT_TRUE(result.ok()); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -456,12 +508,12 @@ TEST_F(ESTest, fuzzy) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("a", "b", 10, "edge text")); } { - auto result = adapter.fuzzy("nebula_index_1", "abc", "2"); + auto result = adapter.fuzzy("nebula_index_1", "abc", "2", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"({"reason":"mock error"})"); } { - auto result = adapter.fuzzy("nebula_index_1", "abc", "2"); + auto result = adapter.fuzzy("nebula_index_1", "abc", "2", -1, -1); ASSERT_FALSE(result.ok()); ASSERT_EQ(result.status().message(), R"(curl error(7):mock error message)"); } @@ -509,7 +561,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_TRUE(result.ok()) << result.message(); } { - auto result = adapter.prefix(indexName, "a"); + auto result = adapter.prefix(indexName, "a", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -522,7 +574,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("2", "abcd")); } { - auto result = adapter.regexp(indexName, "a.*"); + auto result = adapter.regexp(indexName, "a.*", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -535,7 +587,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("2", "abcd")); } { - auto result = adapter.prefix(indexName, "abcd"); + auto result = adapter.prefix(indexName, "abcd", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 1); @@ -569,7 +621,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[4], plugin::ESQueryResult::Item("5", "NebulaGraph是一个图数据库")); } { - auto result = adapter.prefix(indexName, "NebulaGraph"); + auto result = adapter.prefix(indexName, "NebulaGraph", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -583,7 +635,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("5", "NebulaGraph是一个图数据库")); } { - auto result = adapter.regexp(indexName, "NebulaGraph.*(图数据库|database)"); + auto result = adapter.regexp(indexName, "NebulaGraph.*(图数据库|database)", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -597,7 +649,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("5", "NebulaGraph是一个图数据库")); } { - auto result = adapter.wildcard(indexName, "Nebula?raph是*"); + auto result = adapter.wildcard(indexName, "Nebula?raph是*", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 2); @@ -610,7 +662,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_EQ(esResult.items[1], plugin::ESQueryResult::Item("5", "NebulaGraph是一个图数据库")); } { - auto result = adapter.fuzzy(indexName, "Nebulagraph is a graph Database", "2"); + auto result = adapter.fuzzy(indexName, "Nebulagraph is a graph Database", "2", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 1); @@ -625,7 +677,7 @@ TEST_F(RealESTest, DISABLED_QUERY) { ASSERT_TRUE(result.ok()) << result.message(); } { - auto result = adapter.prefix(indexName, "NebulaGraph"); + auto result = adapter.prefix(indexName, "NebulaGraph", -1, -1); ASSERT_TRUE(result.ok()) << result.status().message(); auto esResult = std::move(result).value(); ASSERT_EQ(esResult.items.size(), 1); diff --git a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp b/src/common/plugin/fulltext/test/FulltextPluginTest.cpp deleted file mode 100644 index c17c5291f3d..00000000000 --- a/src/common/plugin/fulltext/test/FulltextPluginTest.cpp +++ /dev/null @@ -1,296 +0,0 @@ -/* Copyright (c) 2020 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include - -#include "common/base/Base.h" -#include "common/network/NetworkUtils.h" -#include "common/plugin/fulltext/FTUtils.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" - -namespace nebula { -namespace plugin { - -void verifyBodyStr(const std::string& actual, const folly::dynamic& expect) { - ASSERT_EQ(" -d'", actual.substr(0, 4)); - ASSERT_EQ("'", actual.substr(actual.size() - 1, 1)); - auto body = folly::parseJson(actual.substr(4, actual.size() - 5)); - ASSERT_EQ(expect, body); -} - -void verifyBodyStr(const std::string& actual, const std::vector& expect) { - std::vector lines; - folly::split("\n", actual, lines, true); - if (lines.size() > 0) { - ASSERT_LE(2, lines.size()); - ASSERT_EQ("'", lines[lines.size() - 1]); - for (size_t i = 1; i < lines.size() - 1; i++) { - auto body = folly::parseJson(lines[i]); - ASSERT_EQ(expect[i - 1], body); - } - } -} - -TEST(FulltextPluginTest, ESIndexCheckTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient client(localHost_); - auto ret = ESGraphAdapter().indexExistsCmd(client, "test_index"); - auto expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" " - "-XGET -k \"http://127.0.0.1:9200/_cat/indices/test_index?format=json\""; - ASSERT_EQ(expected, ret); -} - -TEST(FulltextPluginTest, ESCreateIndexTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient client(localHost_); - auto ret = ESGraphAdapter().createIndexCmd(client, "test_index"); - auto expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" " - "-XPUT -k \"http://127.0.0.1:9200/test_index\""; - ASSERT_EQ(expected, ret); -} - -TEST(FulltextPluginTest, ESDropIndexTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient client(localHost_); - auto ret = ESGraphAdapter().dropIndexCmd(client, "test_index"); - auto expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" " - "-XDELETE -k \"http://127.0.0.1:9200/test_index\""; - ASSERT_EQ(expected, ret); -} - -TEST(FulltextPluginTest, ESClearIndexTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient client(localHost_); - auto ret = ESGraphAdapter().clearIndexCmd(client, "test_index"); - auto expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\"" - " -XPOST -k \"http://127.0.0.1:9200/test_index/_delete_by_query?refresh&slices=5\"" - " -d '{\"query\": {\"match_all\":{}}}'"; - ASSERT_EQ(expected, ret); -} - -TEST(FulltextPluginTest, ESPutTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient hc(localHost_); - DocItem item("index1", "col1", 1, "aaaa"); - auto header = ESStorageAdapter().putHeader(hc, item); - std::string expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" " - "-XPUT -k \"http://127.0.0.1:9200/index1/_doc/" - "00000000018c43de7b01bca674276c43e09b3ec5baYWFhYQ==\""; - ASSERT_EQ(expected, header); - - auto body = ESStorageAdapter().putBody(item); - - folly::dynamic d = folly::dynamic::object("column_id", DocIDTraits::column(item.column))( - "value", DocIDTraits::val(item.val)); - verifyBodyStr(std::move(body), std::move(d)); -} - -TEST(FulltextPluginTest, ESBulkTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient hc(localHost_); - DocItem item1("index1", "col1", 1, "aaaa"); - DocItem item2("index1", "col1", 1, "bbbb"); - std::set strs; - strs.emplace("aaa"); - strs.emplace("bbb"); - std::vector items; - items.emplace_back(DocItem("index1", "col1", 1, "aaaa")); - items.emplace_back(DocItem("index1", "col1", 1, "bbbb")); - auto header = ESStorageAdapter().bulkHeader(hc); - std::string expected = - "/usr/bin/curl -H \"Content-Type: application/x-ndjson; " - "charset=utf-8\" -XPOST -k \"http://127.0.0.1:9200/_bulk\""; - ASSERT_EQ(expected, header); - - std::vector bodies; - for (const auto& item : items) { - folly::dynamic meta = - folly::dynamic::object("_id", DocIDTraits::docId(item))("_index", item.index); - folly::dynamic data = folly::dynamic::object("value", DocIDTraits::val(item.val))( - "column_id", DocIDTraits::column(item.column)); - bodies.emplace_back(folly::dynamic::object("index", std::move(meta))); - bodies.emplace_back(std::move(data)); - } - - auto body = ESStorageAdapter().bulkBody(items); - verifyBodyStr(body, std::move(bodies)); -} - -TEST(FulltextPluginTest, ESPutToTest) { - HostAddr localHost_{"127.0.0.1", 11111}; - HttpClient hc(localHost_); - DocItem item1("index1", "col1", 1, "aaaa"); - // A ElasticSearch instance needs to be turn on at here, so expected false. - auto ret = ESStorageAdapter::kAdapter->put(hc, item1); - ASSERT_FALSE(ret); -} - -TEST(FulltextPluginTest, ESResultTest) { - // { - // "took": 2, - // "timed_out": false, - // "_shards": { - // "total": 1, - // "successful": 1, - // "skipped": 0, - // "failed": 0 - // }, - // "hits": { - // "total": { - // "value": 1, - // "relation": "eq" - // }, - // "max_score": 3.3862944, - // "hits": [ - // { - // "_index": "my_temp_index_3", - // "_type": "_doc", - // "_id": "part1_tag1_col1_aaa", - // "_score": 3.3862944, - // "_source": { - // "value": "aaa" - // } - // }, - // { - // "_index": "my_temp_index_3", - // "_type": "_doc", - // "_id": "part2_tag2_col1_bbb", - // "_score": 1.0, - // "_source": { - // "value": "bbb" - // } - // } - // ] - // } - // } - { - std::string json = - R"({"took": 2,"timed_out": false,"_shards": {"total": 1,"successful": 1, - "skipped": 0,"failed": 0},"hits": {"total": {"value": 1,"relation": - "eq"},"max_score": 3.3862944,"hits": [{"_index": "my_temp_index_3", - "_type": "_doc","_id": "part1_tag1_col1_aaa","_score": 3.3862944, - "_source": {"value": "aaa"}},{"_index": "my_temp_index_3","_type": - "_doc","_id": "part2_tag2_col1_bbb","_score": 1.0, - "_source": {"value": "bbb"}}]}})"; - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient hc(localHost_); - std::vector rows; - auto ret = ESGraphAdapter().result(json, rows); - ASSERT_TRUE(ret); - std::vector expect = {"aaa", "bbb"}; - ASSERT_EQ(expect, rows); - } - - // { - // "took": 1, - // "timed_out": false, - // "_shards": { - // "total": 1, - // "successful": 1, - // "skipped": 0, - // "failed": 0 - // }, - // "hits": { - // "total": { - // "value": 0, - // "relation": "eq" - // }, - // "max_score": null, - // "hits": [] - // } - // } - { - std::string json = - R"({"took": 1,"timed_out": false,"_shards": {"total": 1,"successful": 1, - "skipped": 0,"failed": 0},"hits": {"total": - {"value": 0,"relation": "eq"},"max_score": null,"hits": []}})"; - - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient hc(localHost_); - std::vector rows; - auto ret = ESGraphAdapter().result(json, rows); - ASSERT_TRUE(ret); - ASSERT_EQ(0, rows.size()); - } - - // { - // "error": { - // "root_cause": [ - // { - // "type": "parsing_exception", - // "reason": "Unknown key for a VALUE_STRING in [_source].", - // "line": 1, - // "col": 128 - // } - // ], - // "type": "parsing_exception", - // "reason": "Unknown key for a VALUE_STRING in [_source].", - // "line": 1, - // "col": 128 - // }, - // "status": 400 - // } - { - std::string json = - R"({"error": {"root_cause": [{"type": "parsing_exception","reason": - "Unknown key for a VALUE_STRING in [_source].","line": 1,"col": 128}], - "type": "parsing_exception","reason": "Unknown key for a VALUE_STRING - in [_source].","line": 1,"col": 128},"status": 400})"; - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient hc(localHost_); - std::vector rows; - auto ret = ESGraphAdapter().result(json, rows); - ASSERT_FALSE(ret); - } -} - -// TODO: The json string is not comparable. -TEST(FulltextPluginTest, ESPrefixTest) { - HostAddr localHost_{"127.0.0.1", 9200}; - HttpClient client(localHost_); - DocItem item("index1", "col1", 1, "aa"); - LimitItem limit(10, 100); - auto header = ESGraphAdapter().header(client, item, limit); - std::string expected = - "/usr/bin/curl -H \"Content-Type: application/json; charset=utf-8\" " - "-XGET -k \"http://127.0.0.1:9200/index1/_search?timeout=10ms\""; - ASSERT_EQ(expected, header); - - auto body = ESGraphAdapter().prefixBody("aa"); - ASSERT_EQ("{\"prefix\":{\"value\":\"aa\"}}", folly::toJson(body)); -} - -TEST(FulltextPluginTest, ESWildcardTest) { - auto body = ESGraphAdapter().wildcardBody("a?a"); - ASSERT_EQ("{\"wildcard\":{\"value\":\"a?a\"}}", folly::toJson(body)); -} - -TEST(FulltextPluginTest, ESRegexpTest) { - auto body = ESGraphAdapter().regexpBody("+a"); - ASSERT_EQ("{\"regexp\":{\"value\":\"+a\"}}", folly::toJson(body)); -} - -TEST(FulltextPluginTest, ESFuzzyTest) { - auto body = ESGraphAdapter().fuzzyBody("+a", "AUTO", "OR"); - auto expected = - "{\"match\":{\"value\":{\"operator\":\"OR\"," - "\"query\":\"+a\",\"fuzziness\":\"AUTO\"}}}"; - ASSERT_EQ(folly::parseJson(expected), body); -} -} // namespace plugin -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); -} diff --git a/src/common/utils/test/CMakeLists.txt b/src/common/utils/test/CMakeLists.txt index 940999b720f..afbdc3e9de4 100644 --- a/src/common/utils/test/CMakeLists.txt +++ b/src/common/utils/test/CMakeLists.txt @@ -100,7 +100,7 @@ nebula_add_test( $ $ $ - $ + # $ LIBRARIES ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt index 4e88107b8b1..ddf35d7f916 100644 --- a/src/daemons/CMakeLists.txt +++ b/src/daemons/CMakeLists.txt @@ -54,7 +54,7 @@ set(storage_meta_deps $ $ $ - $ + $ $ $ ) @@ -147,7 +147,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ ${common_deps} @@ -234,7 +234,7 @@ nebula_add_executable( $ $ $ - $ + $ $ $ $ diff --git a/src/graph/context/test/CMakeLists.txt b/src/graph/context/test/CMakeLists.txt index b83013e5101..d01223a6696 100644 --- a/src/graph/context/test/CMakeLists.txt +++ b/src/graph/context/test/CMakeLists.txt @@ -27,7 +27,7 @@ SET(CONTEXT_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/executor/admin/SpaceExecutor.cpp b/src/graph/executor/admin/SpaceExecutor.cpp index da761521210..9aa7c4abf50 100644 --- a/src/graph/executor/admin/SpaceExecutor.cpp +++ b/src/graph/executor/admin/SpaceExecutor.cpp @@ -134,15 +134,16 @@ folly::Future DropSpaceExecutor::execute() { qctx()->rctx()->session()->setSpace(std::move(spaceInfo)); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::dropTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.dropIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(ERROR) << "Drop fulltext index `" << ftindex << "' failed: " << ftRet; } } } @@ -189,15 +190,16 @@ folly::Future ClearSpaceExecutor::execute() { return resp.status(); } if (!ftIndexes.empty()) { - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { LOG(WARNING) << "Get text search clients failed"; return Status::OK(); } + auto esAdapter = std::move(esAdapterRet).value(); for (const auto &ftindex : ftIndexes) { - auto ftRet = FTIndexUtils::clearTSIndex(tsRet.value(), ftindex); + auto ftRet = esAdapter.clearIndex(ftindex); if (!ftRet.ok()) { - LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet.status(); + LOG(WARNING) << "Clear fulltext index `" << ftindex << "' failed: " << ftRet; } } } diff --git a/src/graph/executor/maintain/FTIndexExecutor.cpp b/src/graph/executor/maintain/FTIndexExecutor.cpp index 4b32d3939c6..92d0018ace9 100644 --- a/src/graph/executor/maintain/FTIndexExecutor.cpp +++ b/src/graph/executor/maintain/FTIndexExecutor.cpp @@ -41,14 +41,14 @@ folly::Future DropFTIndexExecutor::execute() { << "' failed: " << resp.status(); return resp.status(); } - auto tsRet = FTIndexUtils::getTSClients(qctx()->getMetaClient()); - if (!tsRet.ok()) { - LOG(WARNING) << "Get text search clients failed: " << tsRet.status(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx()->getMetaClient()); + if (!esAdapterRet.ok()) { + LOG(WARNING) << "Get text search clients failed: " << esAdapterRet.status(); } - auto ftRet = FTIndexUtils::dropTSIndex(std::move(tsRet).value(), inode->getName()); + auto esAdapter = std::move(esAdapterRet).value(); + auto ftRet = esAdapter.dropIndex(inode->getName()); if (!ftRet.ok()) { - LOG(WARNING) << "Drop fulltext index '" << inode->getName() - << "' failed: " << ftRet.status(); + LOG(WARNING) << "Drop fulltext index '" << inode->getName() << "' failed: " << ftRet; } return Status::OK(); }); diff --git a/src/graph/executor/test/CMakeLists.txt b/src/graph/executor/test/CMakeLists.txt index af16fa5f3df..cbb5aee6994 100644 --- a/src/graph/executor/test/CMakeLists.txt +++ b/src/graph/executor/test/CMakeLists.txt @@ -34,7 +34,7 @@ SET(EXEC_QUERY_TEST_OBJS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/optimizer/test/CMakeLists.txt b/src/graph/optimizer/test/CMakeLists.txt index 04bd1330ba3..03c56547921 100644 --- a/src/graph/optimizer/test/CMakeLists.txt +++ b/src/graph/optimizer/test/CMakeLists.txt @@ -29,7 +29,7 @@ set(OPTIMIZER_TEST_LIB $ $ $ - $ + $ $ $ $ diff --git a/src/graph/util/FTIndexUtils.cpp b/src/graph/util/FTIndexUtils.cpp index 685c60ede1a..07d857358c5 100644 --- a/src/graph/util/FTIndexUtils.cpp +++ b/src/graph/util/FTIndexUtils.cpp @@ -25,8 +25,7 @@ bool FTIndexUtils::needTextSearch(const Expression* expr) { } } -StatusOr> FTIndexUtils::getTSClients( - meta::MetaClient* client) { +StatusOr<::nebula::plugin::ESAdapter> FTIndexUtils::getESAdapter(meta::MetaClient* client) { auto tcs = client->getServiceClientsFromCache(meta::cpp2::ExternalServiceType::ELASTICSEARCH); if (!tcs.ok()) { return tcs.status(); @@ -34,76 +33,30 @@ StatusOr> FTIndexUtils::getTSClients( if (tcs.value().empty()) { return Status::SemanticError("No text search client found"); } - std::vector tsClients; + std::vector<::nebula::plugin::ESClient> clients; for (const auto& c : tcs.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; - if (c.user_ref().has_value() && c.pwd_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); - } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - tsClients.emplace_back(std::move(hc)); - } - return tsClients; -} - -StatusOr FTIndexUtils::checkTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->indexExists(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("fulltext index get failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::dropTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->dropIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); - } - return Status::Error("drop fulltext index failed : %s", index.c_str()); -} - -StatusOr FTIndexUtils::clearTSIndex(const std::vector& tsClients, - const std::string& index) { - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto ret = - nebula::plugin::ESGraphAdapter::kAdapter->clearIndex(randomFTClient(tsClients), index); - if (!ret.ok()) { - continue; - } - return std::move(ret).value(); + std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + std::string address = c.host.toString(); + std::string user = c.user_ref().has_value() ? *c.user_ref() : ""; + std::string password = c.pwd_ref().has_value() ? *c.pwd_ref() : ""; + clients.emplace_back(HttpClient::instance(), protocol, address, user, password); } - return Status::Error("clear fulltext index failed : %s", index.c_str()); + return ::nebula::plugin::ESAdapter(std::move(clients)); } -StatusOr FTIndexUtils::rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients) { - auto vRet = textSearch(expr, index, tsClients); +StatusOr FTIndexUtils::rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter) { + auto vRet = textSearch(expr, index, esAdapter); if (!vRet.ok()) { - return Status::SemanticError("Text search error."); + return vRet.status(); } - if (vRet.value().empty()) { + auto result = std::move(vRet).value(); + if (result.items.empty()) { return nullptr; } - auto tsArg = static_cast(expr)->arg(); Expression* propExpr; if (isEdge) { @@ -112,8 +65,8 @@ StatusOr FTIndexUtils::rewriteTSFilter( propExpr = TagPropertyExpression::make(pool, tsArg->from(), tsArg->prop()); } std::vector rels; - for (const auto& row : vRet.value()) { - auto constExpr = ConstantExpression::make(pool, Value(row)); + for (auto& item : result.items) { + auto constExpr = ConstantExpression::make(pool, Value(item.text)); rels.emplace_back(RelationalExpression::makeEQ(pool, propExpr, constExpr)); } if (rels.size() == 1) { @@ -122,67 +75,64 @@ StatusOr FTIndexUtils::rewriteTSFilter( return ExpressionUtils::pushOrs(pool, rels); } -StatusOr> FTIndexUtils::textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients) { +StatusOr FTIndexUtils::textSearch( + Expression* expr, const std::string& index, ::nebula::plugin::ESAdapter& esAdapter) { auto tsExpr = static_cast(expr); - - nebula::plugin::DocItem doc(index, tsExpr->arg()->prop(), tsExpr->arg()->val()); - nebula::plugin::LimitItem limit(tsExpr->arg()->timeout(), tsExpr->arg()->limit()); - std::vector result; - // TODO (sky) : External index load balancing - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - StatusOr ret = Status::Error(); - switch (tsExpr->kind()) { - case Expression::Kind::kTSFuzzy: { - folly::dynamic fuzz = folly::dynamic::object(); - if (tsExpr->arg()->fuzziness() < 0) { - fuzz = "AUTO"; - } else { - fuzz = tsExpr->arg()->fuzziness(); - } - std::string op = tsExpr->arg()->op().empty() ? "or" : tsExpr->arg()->op(); - ret = nebula::plugin::ESGraphAdapter::kAdapter->fuzzy( - randomFTClient(tsClients), doc, limit, fuzz, op, result); - break; - } - case Expression::Kind::kTSPrefix: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->prefix( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSRegexp: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->regexp( - randomFTClient(tsClients), doc, limit, result); - break; - } - case Expression::Kind::kTSWildcard: { - ret = nebula::plugin::ESGraphAdapter::kAdapter->wildcard( - randomFTClient(tsClients), doc, limit, result); - break; - } - default: - return Status::SemanticError("text search expression error"); + std::function()> execFunc; + switch (tsExpr->kind()) { + case Expression::Kind::kTSFuzzy: { + std::string pattern = tsExpr->arg()->val(); + int fuzziness = tsExpr->arg()->fuzziness(); + int64_t size = tsExpr->arg()->limit(); + int64_t timeout = tsExpr->arg()->timeout(); + execFunc = [&index, pattern, &esAdapter, fuzziness, size, timeout]() { + return esAdapter.fuzzy( + index, pattern, fuzziness < 0 ? "AUTO" : std::to_string(fuzziness), size, timeout); + }; + break; } - if (!ret.ok()) { - continue; + case Expression::Kind::kTSPrefix: { + std::string pattern = tsExpr->arg()->val(); + int64_t size = tsExpr->arg()->limit(); + int64_t timeout = tsExpr->arg()->timeout(); + execFunc = [&index, pattern, &esAdapter, size, timeout]() { + return esAdapter.prefix(index, pattern, size, timeout); + }; + break; } - if (ret.value()) { - return result; + case Expression::Kind::kTSRegexp: { + std::string pattern = tsExpr->arg()->val(); + int64_t size = tsExpr->arg()->limit(); + int64_t timeout = tsExpr->arg()->timeout(); + execFunc = [&index, pattern, &esAdapter, size, timeout]() { + return esAdapter.regexp(index, pattern, size, timeout); + }; + break; + } + case Expression::Kind::kTSWildcard: { + std::string pattern = tsExpr->arg()->val(); + int64_t size = tsExpr->arg()->limit(); + int64_t timeout = tsExpr->arg()->timeout(); + execFunc = [&index, pattern, &esAdapter, size, timeout]() { + return esAdapter.wildcard(index, pattern, size, timeout); + }; + break; + } + default: { + return Status::SemanticError("text search expression error"); } - return Status::SemanticError( - "External index error. " - "please check the status of fulltext cluster"); } - return Status::SemanticError("scan external index failed"); -} -const nebula::plugin::HttpClient& FTIndexUtils::randomFTClient( - const std::vector& tsClients) { - auto i = folly::Random::rand32(tsClients.size() - 1); - return tsClients[i]; + auto retryCnt = FLAGS_ft_request_retry_times > 0 ? FLAGS_ft_request_retry_times : 1; + StatusOr result; + while (retryCnt-- > 0) { + result = execFunc(); + if (!result.ok()) { + continue; + } + break; + } + return result; } } // namespace graph diff --git a/src/graph/util/FTIndexUtils.h b/src/graph/util/FTIndexUtils.h index 1231f76ddcc..3266569f96f 100644 --- a/src/graph/util/FTIndexUtils.h +++ b/src/graph/util/FTIndexUtils.h @@ -7,7 +7,7 @@ #include "clients/meta/MetaClient.h" #include "common/base/StatusOr.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/util/SchemaUtil.h" #include "parser/MaintainSentences.h" @@ -19,37 +19,21 @@ class FTIndexUtils final { FTIndexUtils() = delete; // Checks if the filter expression is full-text search related static bool needTextSearch(const Expression* expr); - // Checks meta client and returns the full-text search client if there is one - static StatusOr> getTSClients(meta::MetaClient* client); - // Checks if the full-text index exists - static StatusOr checkTSIndex(const std::vector& tsClients, - const std::string& index); - // Drops the full-text index - static StatusOr dropTSIndex(const std::vector& tsClients, - const std::string& index); - - // Clears the full-text index data, but keeps the index schema - static StatusOr clearTSIndex(const std::vector& tsClients, - const std::string& index); - - // Converts TextSearchExpression into a relational expression that could be pushed down - static StatusOr rewriteTSFilter( - ObjectPool* pool, - bool isEdge, - Expression* expr, - const std::string& index, - const std::vector& tsClients); + + static StatusOr<::nebula::plugin::ESAdapter> getESAdapter(meta::MetaClient* client); + + // Converts TextSearchExpression into a relational expresion that could be pushed down + static StatusOr rewriteTSFilter(ObjectPool* pool, + bool isEdge, + Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); // Performs full-text search using elastic search adapter // Search type is defined by the expression kind - static StatusOr> textSearch( - Expression* expr, - const std::string& index, - const std::vector& tsClients); - - // Picks a random full-text search client from the given list - static const nebula::plugin::HttpClient& randomFTClient( - const std::vector& tsClients); + static StatusOr textSearch(Expression* expr, + const std::string& index, + ::nebula::plugin::ESAdapter& esAdapter); }; } // namespace graph diff --git a/src/graph/util/test/CMakeLists.txt b/src/graph/util/test/CMakeLists.txt index 5de9c601e43..09aa0bd8d6a 100644 --- a/src/graph/util/test/CMakeLists.txt +++ b/src/graph/util/test/CMakeLists.txt @@ -27,6 +27,7 @@ nebula_add_test( SOURCES ExpressionUtilsTest.cpp IdGeneratorTest.cpp + FTindexUtilsTest.cpp OBJECTS $ $ @@ -52,7 +53,6 @@ nebula_add_test( $ $ $ - $ $ $ $ @@ -72,11 +72,14 @@ nebula_add_test( $ $ $ + $ + $ LIBRARIES gtest + gmock gtest_main ${Boost_Thread_LIBRARY} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} - # curl + curl ) diff --git a/src/graph/util/test/FTindexUtilsTest.cpp b/src/graph/util/test/FTindexUtilsTest.cpp new file mode 100644 index 00000000000..f131d054993 --- /dev/null +++ b/src/graph/util/test/FTindexUtilsTest.cpp @@ -0,0 +1,162 @@ +// Copyright (c) 2022 vesoft inc. All rights reserved. +// +// This source code is licensed under Apache 2.0 License. + +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" +#include "glog/logging.h" +#include "gmock/gmock.h" +#include "graph/util/ExpressionUtils.h" +#include "graph/util/FTIndexUtils.h" +#include "gtest/gtest.h" + +DECLARE_uint32(ft_request_retry_times); + +using ::testing::_; +using ::testing::Return; + +namespace nebula::graph { + +class MockESAdapter : public plugin::ESAdapter { + public: + MOCK_METHOD(StatusOr, + fuzzy, + (const std::string&, const std::string&, const std::string&, int64_t, int64_t), + (override)); + MOCK_METHOD(StatusOr, + prefix, + (const std::string&, const std::string&, int64_t, int64_t), + (override)); + MOCK_METHOD(StatusOr, + regexp, + (const std::string&, const std::string&, int64_t, int64_t), + (override)); + MOCK_METHOD(StatusOr, + wildcard, + (const std::string&, const std::string&, int64_t, int64_t), + (override)); +}; + +class FTIndexUtilsTest : public ::testing::Test { + public: + protected: + ObjectPool pool_; + Expression* eq(Expression* left, Expression* right) { + return RelationalExpression::makeEQ(&pool_, left, right); + } + Expression* or_(const std::vector& exprs) { + return ExpressionUtils::pushOrs(&pool_, exprs); + } + Expression* tagProp(const std::string& tag, const std::string& prop) { + return TagPropertyExpression::make(&pool_, tag, prop); + } + Expression* edgeProp(const std::string& edge, const std::string& prop) { + return EdgePropertyExpression::make(&pool_, edge, prop); + } + template + Expression* constant(const T& value) { + return ConstantExpression::make(&pool_, Value(value)); + } +}; + +TEST_F(FTIndexUtilsTest, rewriteTSFilter) { + ObjectPool pool; + std::string indexName = "test_index"; + std::string tagName = "tag1"; + std::string edgeName = "edge1"; + std::string propName = "prop1"; + using Item = plugin::ESQueryResult::Item; + std::vector items = {Item("1", "abc"), Item("2", "abc1"), Item("3", "abc")}; + plugin::ESQueryResult esResult; + esResult.items = items; + { + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, prefix(indexName, "prefix_pattern", -1, -1)) + .WillOnce(Return(esResult)); + auto argument = TextSearchArgument::make(&pool, tagName, propName, "prefix_pattern"); + auto expr = TextSearchExpression::makePrefix(&pool, argument); + auto expect = or_({eq(tagProp(tagName, propName), constant("abc")), + eq(tagProp(tagName, propName), constant("abc1")), + eq(tagProp(tagName, propName), constant("abc"))}); + auto result = FTIndexUtils::rewriteTSFilter(&pool, false, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(*result.value(), *expect) << result.value()->toString() << "\t" << expect->toString(); + } + { + plugin::ESQueryResult emptyEsResult; + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, prefix(indexName, "prefix_pattern", -1, -1)) + .WillOnce(Return(emptyEsResult)); + auto argument = TextSearchArgument::make(&pool, tagName, propName, "prefix_pattern"); + auto expr = TextSearchExpression::makePrefix(&pool, argument); + auto result = FTIndexUtils::rewriteTSFilter(&pool, false, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(result.value(), nullptr); + } + { + Status status = Status::Error("mock error"); + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, prefix(indexName, "prefix_pattern", -1, -1)) + .Times(FLAGS_ft_request_retry_times) + .WillRepeatedly(Return(status)); + auto argument = TextSearchArgument::make(&pool, tagName, propName, "prefix_pattern"); + auto expr = TextSearchExpression::makePrefix(&pool, argument); + auto result = FTIndexUtils::rewriteTSFilter(&pool, false, expr, indexName, mockESAdapter); + ASSERT_FALSE(result.ok()); + ASSERT_EQ(result.status().message(), "mock error"); + } + { + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, wildcard(indexName, "wildcard_pattern", -1, -1)) + .WillOnce(Return(esResult)); + auto argument = TextSearchArgument::make(&pool, edgeName, propName, "wildcard_pattern"); + auto expr = TextSearchExpression::makeWildcard(&pool, argument); + auto expect = or_({eq(edgeProp(edgeName, propName), constant("abc")), + eq(edgeProp(edgeName, propName), constant("abc1")), + eq(edgeProp(edgeName, propName), constant("abc"))}); + auto result = FTIndexUtils::rewriteTSFilter(&pool, true, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(*result.value(), *expect) << result.value()->toString() << "\t" << expect->toString(); + } + { + plugin::ESQueryResult singleEsResult; + singleEsResult.items = {Item("a", "b", 1, "edge text")}; + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, wildcard(indexName, "wildcard_pattern", -1, -1)) + .WillOnce(Return(singleEsResult)); + auto argument = TextSearchArgument::make(&pool, edgeName, propName, "wildcard_pattern"); + auto expr = TextSearchExpression::makeWildcard(&pool, argument); + auto expect = eq(edgeProp(edgeName, propName), constant("edge text")); + auto result = FTIndexUtils::rewriteTSFilter(&pool, true, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(*result.value(), *expect) << result.value()->toString() << "\t" << expect->toString(); + } + { + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, regexp(indexName, "regexp_pattern", -1, -1)) + .WillOnce(Return(esResult)); + auto argument = TextSearchArgument::make(&pool, edgeName, propName, "regexp_pattern"); + auto expr = TextSearchExpression::makeRegexp(&pool, argument); + auto expect = or_({eq(edgeProp(edgeName, propName), constant("abc")), + eq(edgeProp(edgeName, propName), constant("abc1")), + eq(edgeProp(edgeName, propName), constant("abc"))}); + auto result = FTIndexUtils::rewriteTSFilter(&pool, true, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(*result.value(), *expect) << result.value()->toString() << "\t" << expect->toString(); + } + { + MockESAdapter mockESAdapter; + EXPECT_CALL(mockESAdapter, fuzzy(indexName, "fuzzy_pattern", "1", -1, -1)) + .WillOnce(Return(esResult)); + auto argument = TextSearchArgument::make(&pool, tagName, propName, "fuzzy_pattern"); + argument->setFuzziness(1); + auto expr = TextSearchExpression::makeFuzzy(&pool, argument); + auto expect = or_({eq(tagProp(tagName, propName), constant("abc")), + eq(tagProp(tagName, propName), constant("abc1")), + eq(tagProp(tagName, propName), constant("abc"))}); + auto result = FTIndexUtils::rewriteTSFilter(&pool, false, expr, indexName, mockESAdapter); + ASSERT_TRUE(result.ok()) << result.status().message(); + ASSERT_EQ(*result.value(), *expect) << result.value()->toString() << "\t" << expect->toString(); + } +} + +} // namespace nebula::graph diff --git a/src/graph/validator/AdminValidator.h b/src/graph/validator/AdminValidator.h index b0a2a87ec10..e9f10b234d5 100644 --- a/src/graph/validator/AdminValidator.h +++ b/src/graph/validator/AdminValidator.h @@ -7,7 +7,6 @@ #define GRAPH_VALIDATOR_ADMINVALIDATOR_H_ #include "clients/meta/MetaClient.h" -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include "graph/validator/Validator.h" #include "parser/AdminSentences.h" #include "parser/MaintainSentences.h" diff --git a/src/graph/validator/LookupValidator.cpp b/src/graph/validator/LookupValidator.cpp index 2ea3be001ce..ea88bb0c2e3 100644 --- a/src/graph/validator/LookupValidator.cpp +++ b/src/graph/validator/LookupValidator.cpp @@ -505,11 +505,7 @@ StatusOr LookupValidator::checkTSExpr(Expression* expr) { auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId()); NG_RETURN_IF_ERROR(tsi); auto tsName = tsi.value().first; - auto ret = FTIndexUtils::checkTSIndex(tsClients_, tsName); - NG_RETURN_IF_ERROR(ret); - if (!ret.value()) { - return Status::SemanticError("text search index not found : %s", tsName.c_str()); - } + auto ftFields = tsi.value().second.get_fields(); auto tsExpr = static_cast(expr); auto prop = tsExpr->arg()->prop(); @@ -606,13 +602,13 @@ Status LookupValidator::getSchemaProvider(shared_ptr // Generate text search filter, check validity and rewrite StatusOr LookupValidator::genTsFilter(Expression* filter) { - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - tsClients_ = std::move(tsRet).value(); + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); auto tsIndex = checkTSExpr(filter); NG_RETURN_IF_ERROR(tsIndex); return FTIndexUtils::rewriteTSFilter( - qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), tsClients_); + qctx_->objPool(), lookupCtx_->isEdge, filter, tsIndex.value(), esAdapter); } } // namespace graph diff --git a/src/graph/validator/LookupValidator.h b/src/graph/validator/LookupValidator.h index 9889d99971e..79ed4ce4e42 100644 --- a/src/graph/validator/LookupValidator.h +++ b/src/graph/validator/LookupValidator.h @@ -5,7 +5,6 @@ #ifndef _VALIDATOR_LOOKUP_VALIDATOR_H_ #define _VALIDATOR_LOOKUP_VALIDATOR_H_ -#include "common/plugin/fulltext/elasticsearch/ESGraphAdapter.h" #include "graph/validator/Validator.h" namespace nebula { @@ -56,7 +55,6 @@ class LookupValidator final : public Validator { void extractExprProps(); std::unique_ptr lookupCtx_; - std::vector tsClients_; ExpressionProps exprProps_; std::vector idxReturnCols_; std::vector schemaIds_; diff --git a/src/graph/validator/MaintainValidator.cpp b/src/graph/validator/MaintainValidator.cpp index b347fceda5b..02c93c173fe 100644 --- a/src/graph/validator/MaintainValidator.cpp +++ b/src/graph/validator/MaintainValidator.cpp @@ -10,6 +10,7 @@ #include "common/base/Status.h" #include "common/charset/Charset.h" #include "common/expression/ConstantExpression.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "graph/planner/plan/Admin.h" #include "graph/planner/plan/Maintain.h" #include "graph/planner/plan/Query.h" @@ -601,11 +602,12 @@ Status CreateFTIndexValidator::validateImpl() { if (containUpper) { return Status::SyntaxError("Fulltext index names cannot contain uppercase letters"); } - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); - auto tsIndex = FTIndexUtils::checkTSIndex(std::move(tsRet).value(), name); - NG_RETURN_IF_ERROR(tsIndex); - if (tsIndex.value()) { + auto esAdapterRet = FTIndexUtils::getESAdapter(qctx_->getMetaClient()); + NG_RETURN_IF_ERROR(esAdapterRet); + auto esAdapter = std::move(esAdapterRet).value(); + auto existResult = esAdapter.isIndexExist(name); + NG_RETURN_IF_ERROR(existResult); + if (existResult.value()) { return Status::Error("text search index exist : %s", name.c_str()); } auto space = vctx_->whichSpace(); @@ -621,7 +623,7 @@ Status CreateFTIndexValidator::validateImpl() { } index_.space_id_ref() = space.id; index_.depend_schema_ref() = std::move(id); - index_.fields_ref() = sentence->fields(); + index_.fields_ref()->push_back(sentence->field()); return Status::OK(); } @@ -634,8 +636,6 @@ Status CreateFTIndexValidator::toPlan() { } Status DropFTIndexValidator::validateImpl() { - auto tsRet = FTIndexUtils::getTSClients(qctx_->getMetaClient()); - NG_RETURN_IF_ERROR(tsRet); return Status::OK(); } diff --git a/src/graph/validator/test/CMakeLists.txt b/src/graph/validator/test/CMakeLists.txt index 616fa901998..d1401408bc4 100644 --- a/src/graph/validator/test/CMakeLists.txt +++ b/src/graph/validator/test/CMakeLists.txt @@ -55,7 +55,7 @@ set(VALIDATOR_TEST_LIBS $ $ $ - $ + $ $ $ $ diff --git a/src/graph/visitor/test/CMakeLists.txt b/src/graph/visitor/test/CMakeLists.txt index d3d3acd68f0..0c2b37e40a2 100644 --- a/src/graph/visitor/test/CMakeLists.txt +++ b/src/graph/visitor/test/CMakeLists.txt @@ -71,7 +71,7 @@ nebula_add_test( $ $ $ - $ + $ $ $ $ diff --git a/src/kvstore/LogEncoder.h b/src/kvstore/LogEncoder.h index 4e8b024472b..88ef21f70bc 100644 --- a/src/kvstore/LogEncoder.h +++ b/src/kvstore/LogEncoder.h @@ -200,7 +200,7 @@ class BatchHolder : public boost::noncopyable, public nebula::cpp::NonMovable { * * @return const std::vector>& */ - const std::vector>& getBatch() { + const std::vector>& getBatch() const { return batch_; } diff --git a/src/kvstore/listener/elasticsearch/ESListener.cpp b/src/kvstore/listener/elasticsearch/ESListener.cpp index 58939e4cfcb..bf5d6d83eba 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.cpp +++ b/src/kvstore/listener/elasticsearch/ESListener.cpp @@ -5,7 +5,7 @@ #include "kvstore/listener/elasticsearch/ESListener.h" -#include "common/plugin/fulltext/elasticsearch/ESStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "common/utils/NebulaKeyUtils.h" DECLARE_uint32(ft_request_retry_times); @@ -25,17 +25,18 @@ void ESListener::init() { if (!cRet.ok() || cRet.value().empty()) { LOG(FATAL) << "elasticsearch clients error"; } + std::vector esClients; for (const auto& c : cRet.value()) { - nebula::plugin::HttpClient hc; - hc.host = c.host; + auto host = c.host; + std::string user, password; if (c.user_ref().has_value()) { - hc.user = *c.user_ref(); - hc.password = *c.pwd_ref(); + user = *c.user_ref(); + password = *c.pwd_ref(); } - hc.connType = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; - esClients_.emplace_back(std::move(hc)); + std::string protocol = c.conn_type_ref().has_value() ? *c.get_conn_type() : "http"; + esClients.emplace_back(HttpClient::instance(), protocol, host.toString(), user, password); } - + esAdapter_.setClients(std::move(esClients)); auto sRet = schemaMan_->toGraphSpaceName(spaceId_); if (!sRet.ok()) { LOG(FATAL) << "space name error"; @@ -43,30 +44,93 @@ void ESListener::init() { spaceName_ = std::make_unique(sRet.value()); } -bool ESListener::apply(const std::vector& data) { - std::vector docItems; - for (const auto& kv : data) { - if (!nebula::NebulaKeyUtils::isTag(vIdLen_, kv.first) && - !nebula::NebulaKeyUtils::isEdge(vIdLen_, kv.first)) { - continue; +bool ESListener::apply(const BatchHolder& batch) { + nebula::plugin::ESBulk bulk; + auto callback = [&bulk](BatchLogType type, + const std::string& index, + const std::string& vid, + const std::string& src, + const std::string& dst, + int64_t rank, + const std::string& text) { + if (type == BatchLogType::OP_BATCH_PUT) { + bulk.put(index, vid, src, dst, rank, text); + } else if (type == BatchLogType::OP_BATCH_REMOVE) { + bulk.delete_(index, vid, src, dst, rank); + } else { + LOG(FATAL) << "Unexpect"; } - if (!appendDocItem(docItems, kv)) { + }; + for (const auto& log : batch.getBatch()) { + pickTagAndEdgeData(std::get<0>(log), std::get<1>(log), std::get<2>(log), callback); + } + if (!bulk.empty()) { + auto status = esAdapter_.bulk(bulk); + if (!status.ok()) { + LOG(ERROR) << status; return false; } - if (docItems.size() >= static_cast(FLAGS_ft_bulk_batch_size)) { - auto suc = writeData(docItems); - if (!suc) { - return suc; - } - docItems.clear(); - } - } - if (!docItems.empty()) { - return writeData(docItems); } return true; } +void ESListener::pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& callback) { + if (nebula::NebulaKeyUtils::isTag(vIdLen_, key)) { + auto tagId = NebulaKeyUtils::getTagId(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, tagId); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, value); + if (reader == nullptr) { + LOG(ERROR) << "get tag reader failed, tagID " << tagId; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString(); + std::string text = std::move(v).getStr(); + callback(type, indexName, vid, "", "", 0, text); + } else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) { + auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key); + auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType); + if (!ftIndexRes.ok()) { + return; + } + auto ftIndex = std::move(ftIndexRes).value(); + auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, value); + if (reader == nullptr) { + LOG(ERROR) << "get edge reader failed, schema ID " << edgeType; + return; + } + if (ftIndex.second.get_fields().size() > 1) { + LOG(ERROR) << "Only one field will create fulltext index"; + } + auto field = ftIndex.second.get_fields().front(); + auto v = reader->getValueByName(field); + if (v.type() != Value::Type::STRING) { + LOG(ERROR) << "Can't create fulltext index on type " << v.type(); + } + std::string indexName = ftIndex.first; + std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString(); + std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString(); + int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key); + std::string text = std::move(v).getStr(); + callback(type, indexName, "", src, dst, rank, text); + } +} + bool ESListener::persist(LogID lastId, TermID lastTerm, LogID lastApplyLogId) { if (!writeAppliedId(lastId, lastTerm, lastApplyLogId)) { LOG(FATAL) << "last apply ids write failed"; @@ -146,105 +210,6 @@ std::string ESListener::encodeAppliedId(LogID lastId, return val; } -bool ESListener::appendDocItem(std::vector& items, const KV& kv) const { - auto isEdge = NebulaKeyUtils::isEdge(vIdLen_, kv.first); - return isEdge ? appendEdgeDocItem(items, kv) : appendTagDocItem(items, kv); -} - -bool ESListener::appendEdgeDocItem(std::vector& items, const KV& kv) const { - auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, edgeType); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false; - } - auto reader = RowReaderWrapper::getEdgePropReader(schemaMan_, spaceId_, edgeType, kv.second); - if (reader == nullptr) { - VLOG(3) << "get edge reader failed, schema ID " << edgeType; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendTagDocItem(std::vector& items, const KV& kv) const { - auto tagId = NebulaKeyUtils::getTagId(vIdLen_, kv.first); - auto ftIndex = schemaMan_->getFTIndex(spaceId_, tagId); - if (!ftIndex.ok()) { - VLOG(3) << "get text search index failed"; - return (ftIndex.status() == nebula::Status::IndexNotFound()) ? true : false; - } - auto reader = RowReaderWrapper::getTagPropReader(schemaMan_, spaceId_, tagId, kv.second); - if (reader == nullptr) { - VLOG(3) << "get tag reader failed, tagID " << tagId; - return false; - } - return appendDocs(items, reader.get(), std::move(ftIndex).value()); -} - -bool ESListener::appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const { - for (const auto& field : fti.second.get_fields()) { - auto v = reader->getValueByName(field); - if (v.type() != Value::Type::STRING) { - continue; - } - items.emplace_back(DocItem(fti.first, field, partId_, std::move(v).getStr())); - } - return true; -} - -bool ESListener::writeData(const std::vector& items) const { - bool isNeedWriteOneByOne = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->bulk(esClients_[index], items); - if (!suc.ok()) { - VLOG(3) << "bulk failed. retry : " << retryCnt; - continue; - } - if (!suc.value()) { - isNeedWriteOneByOne = true; - break; - } - return true; - } - if (isNeedWriteOneByOne) { - return writeDatum(items); - } - LOG(WARNING) << idStr_ << "Failed to bulk into es."; - return false; -} - -bool ESListener::writeDatum(const std::vector& items) const { - bool done = false; - for (const auto& item : items) { - done = false; - auto retryCnt = FLAGS_ft_request_retry_times; - while (--retryCnt > 0) { - auto index = folly::Random::rand32(esClients_.size() - 1); - auto suc = nebula::plugin::ESStorageAdapter::kAdapter->put(esClients_[index], item); - if (!suc.ok()) { - VLOG(3) << "put failed. retry : " << retryCnt; - continue; - } - if (!suc.value()) { - // TODO (sky) : Record failed data - break; - } - done = true; - break; - } - if (!done) { - // means CURL fails, and no need to take the next step - LOG(INFO) << idStr_ << "Failed to put into es."; - return false; - } - } - return true; -} - void ESListener::processLogs() { std::unique_ptr iter; { @@ -256,8 +221,7 @@ void ESListener::processLogs() { } LogID lastApplyId = -1; - // the kv pair which can sync to remote safely - std::vector data; + BatchHolder batch; while (iter->valid()) { lastApplyId = iter->logId(); @@ -273,28 +237,50 @@ void ESListener::processLogs() { case OP_PUT: { auto pieces = decodeMultiValues(log); DCHECK_EQ(2, pieces.size()); - data.emplace_back(pieces[0], pieces[1]); + batch.put(pieces[0].toString(), pieces[1].toString()); break; } case OP_MULTI_PUT: { auto kvs = decodeMultiValues(log); DCHECK_EQ(0, kvs.size() % 2); for (size_t i = 0; i < kvs.size(); i += 2) { - data.emplace_back(kvs[i], kvs[i + 1]); + batch.put(kvs[i].toString(), kvs[i + 1].toString()); } break; } - case OP_REMOVE: - case OP_REMOVE_RANGE: + case OP_REMOVE: { + auto key = decodeSingleValue(log); + batch.remove(key.toString()); + break; + } + case OP_REMOVE_RANGE: { + LOG(WARNING) << "ESListener don't deal with OP_REMOVE_RANGE"; + break; + } case OP_MULTI_REMOVE: { + auto keys = decodeMultiValues(log); + for (auto key : keys) { + batch.remove(key.toString()); + } break; } case OP_BATCH_WRITE: { - auto batch = decodeBatchValue(log); - for (auto& op : batch) { + auto batchData = decodeBatchValue(log); + for (auto& op : batchData) { // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored - if (op.first == BatchLogType::OP_BATCH_PUT) { - data.emplace_back(op.second.first, op.second.second); + switch (op.first) { + case BatchLogType::OP_BATCH_PUT: { + batch.put(op.second.first.toString(), op.second.second.toString()); + break; + } + case BatchLogType::OP_BATCH_REMOVE: { + batch.remove(op.second.first.toString()); + break; + } + case BatchLogType::OP_BATCH_REMOVE_RANGE: { + batch.rangeRemove(op.second.first.toString(), op.second.second.toString()); + break; + } } } break; @@ -310,13 +296,14 @@ void ESListener::processLogs() { } } - if (static_cast(data.size()) > FLAGS_listener_commit_batch_size) { + if (static_cast(batch.size()) > FLAGS_listener_commit_batch_size) { break; } ++(*iter); } + // apply to state machine - if (lastApplyId != -1 && apply(data)) { + if (lastApplyId != -1 && apply(batch)) { std::lock_guard guard(raftLock_); lastApplyLogId_ = lastApplyId; persist(committedLogId_, term_, lastApplyLogId_); @@ -332,15 +319,14 @@ std::tuple ESListener::commitSnapshot VLOG(2) << idStr_ << "Listener is committing snapshot."; int64_t count = 0; int64_t size = 0; - std::vector data; - data.reserve(rows.size()); + BatchHolder batch; for (const auto& row : rows) { count++; size += row.size(); auto kv = decodeKV(row); - data.emplace_back(kv.first, kv.second); + batch.put(kv.first.toString(), kv.second.toString()); } - if (!apply(data)) { + if (!apply(batch)) { LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot."; return { nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, kNoSnapshotSize}; diff --git a/src/kvstore/listener/elasticsearch/ESListener.h b/src/kvstore/listener/elasticsearch/ESListener.h index 0e079735926..ba0eaa59a68 100644 --- a/src/kvstore/listener/elasticsearch/ESListener.h +++ b/src/kvstore/listener/elasticsearch/ESListener.h @@ -7,14 +7,12 @@ #define KVSTORE_LISTENER_ES_LISTENER_H_ #include "codec/RowReaderWrapper.h" -#include "common/plugin/fulltext/FTStorageAdapter.h" +#include "common/plugin/fulltext/elasticsearch/ESAdapter.h" #include "kvstore/listener/Listener.h" namespace nebula { namespace kvstore { -using nebula::plugin::DocItem; - class ESListener : public Listener { public: /** @@ -56,7 +54,7 @@ class ESListener : public Listener { * @param data Key/value to apply * @return True if succeed. False if failed. */ - bool apply(const std::vector& data); + bool apply(const BatchHolder& batch); /** * @brief Persist commitLogId commitLogTerm and lastApplyLogId @@ -106,66 +104,22 @@ class ESListener : public Listener { */ std::string encodeAppliedId(LogID lastId, TermID lastTerm, LogID lastApplyLogId) const noexcept; - /** - * @brief Convert key value to DocItem - * - * @param items DocItems to send - * @param kv Key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert edge key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendEdgeDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Convert tag key value to DocItem - * - * @param items DocItems to send - * @param kv Edge key/value to encode into DocItems - * @return Whether append DocItem succeed - */ - bool appendTagDocItem(std::vector& items, const KV& kv) const; - - /** - * @brief Add the fulltext index field to DocItem - * - * @param items DocItems to send - * @param reader Key/value's reader - * @param fti Fulltext index schema - * @return Whether append DocItem succeed - */ - bool appendDocs(std::vector& items, - RowReader* reader, - const std::pair& fti) const; - - /** - * @brief Bulk DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeData(const std::vector& items) const; - - /** - * @brief Put DocItem to es - * - * @param items DocItems to send - * @return Whether send succeed - */ - bool writeDatum(const std::vector& items) const; - private: meta::SchemaManager* schemaMan_{nullptr}; + using PickFunc = std::function; + void pickTagAndEdgeData(BatchLogType type, + const std::string& key, + const std::string& value, + const PickFunc& func); std::unique_ptr lastApplyLogFile_{nullptr}; std::unique_ptr spaceName_{nullptr}; - std::vector esClients_; + ::nebula::plugin::ESAdapter esAdapter_; int32_t vIdLen_; }; diff --git a/src/kvstore/listener/test/CMakeLists.txt b/src/kvstore/listener/test/CMakeLists.txt index b1e87a75ab2..1041cfd17e9 100644 --- a/src/kvstore/listener/test/CMakeLists.txt +++ b/src/kvstore/listener/test/CMakeLists.txt @@ -37,7 +37,6 @@ set(LISTENER_TEST_LIBS $ $ $ - $ $ $ $ @@ -45,6 +44,8 @@ set(LISTENER_TEST_LIBS $ $ $ + $ + $ ) nebula_add_test( @@ -60,4 +61,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/kvstore/listener/test/NebulaListenerTest.cpp b/src/kvstore/listener/test/NebulaListenerTest.cpp index 3d454cfc23f..34714b8165b 100644 --- a/src/kvstore/listener/test/NebulaListenerTest.cpp +++ b/src/kvstore/listener/test/NebulaListenerTest.cpp @@ -6,6 +6,8 @@ #include #include +#include + #include "common/base/Base.h" #include "common/fs/FileUtils.h" #include "common/fs/TempDir.h" @@ -44,7 +46,11 @@ class DummyListener : public Listener { : Listener(spaceId, partId, localAddr, walPath, ioPool, workers, handlers) {} std::vector data() { - return data_; + std::vector ret; + for (auto& [key, value] : data_) { + ret.emplace_back(key, value); + } + return ret; } std::tuple commitSnapshot(const std::vector& rows, @@ -57,15 +63,14 @@ class DummyListener : public Listener { int64_t size = 0; std::tuple result{ nebula::cpp2::ErrorCode::SUCCEEDED, count, size}; - std::vector data; - data.reserve(rows.size()); + BatchHolder batch; for (const auto& row : rows) { count++; size += row.size(); auto kv = decodeKV(row); - data.emplace_back(kv.first, kv.second); + batch.put(kv.first.toString(), kv.second.toString()); } - if (!apply(data)) { + if (!apply(batch)) { LOG(INFO) << idStr_ << "Failed to apply data while committing snapshot."; result = {nebula::cpp2::ErrorCode::E_RAFT_PERSIST_SNAPSHOT_FAILED, kNoSnapshotCount, @@ -117,8 +122,8 @@ class DummyListener : public Listener { } LogID lastApplyId = -1; - // the kv pair which can sync to remote safely - std::vector data; + // // the kv pair which can sync to remote safely + BatchHolder batch; while (iter->valid()) { lastApplyId = iter->logId(); @@ -134,28 +139,52 @@ class DummyListener : public Listener { case OP_PUT: { auto pieces = decodeMultiValues(log); DCHECK_EQ(2, pieces.size()); - data.emplace_back(pieces[0], pieces[1]); + batch.put(pieces[0].toString(), pieces[1].toString()); break; } case OP_MULTI_PUT: { auto kvs = decodeMultiValues(log); DCHECK_EQ(0, kvs.size() % 2); for (size_t i = 0; i < kvs.size(); i += 2) { - data.emplace_back(kvs[i], kvs[i + 1]); + batch.put(kvs[i].toString(), kvs[i + 1].toString()); } break; } - case OP_REMOVE: - case OP_REMOVE_RANGE: + case OP_REMOVE: { + auto key = decodeSingleValue(log); + batch.remove(key.toString()); + break; + } + case OP_REMOVE_RANGE: { + auto kvs = decodeMultiValues(log); + DCHECK_EQ(2, kvs.size()); + batch.rangeRemove(kvs[0].toString(), kvs[1].toString()); + break; + } case OP_MULTI_REMOVE: { + auto keys = decodeMultiValues(log); + for (auto key : keys) { + batch.remove(key.toString()); + } break; } case OP_BATCH_WRITE: { - auto batch = decodeBatchValue(log); - for (auto& op : batch) { + auto batchData = decodeBatchValue(log); + for (auto& op : batchData) { // OP_BATCH_REMOVE and OP_BATCH_REMOVE_RANGE is igored - if (op.first == BatchLogType::OP_BATCH_PUT) { - data.emplace_back(op.second.first, op.second.second); + switch (op.first) { + case BatchLogType::OP_BATCH_PUT: { + batch.put(op.second.first.toString(), op.second.second.toString()); + break; + } + case BatchLogType::OP_BATCH_REMOVE: { + batch.remove(op.second.first.toString()); + break; + } + case BatchLogType::OP_BATCH_REMOVE_RANGE: { + batch.rangeRemove(op.second.first.toString(), op.second.second.toString()); + break; + } } } break; @@ -173,7 +202,7 @@ class DummyListener : public Listener { ++(*iter); } // apply to state machine - if (lastApplyId != -1 && apply(data)) { + if (lastApplyId != -1 && apply(batch)) { std::lock_guard guard(raftLock_); lastApplyLogId_ = lastApplyId; persist(committedLogId_, term_, lastApplyLogId_); @@ -184,10 +213,30 @@ class DummyListener : public Listener { protected: void init() override {} - bool apply(const std::vector& kvs) { - for (const auto& kv : kvs) { - data_.emplace_back(kv); + bool apply(const BatchHolder& batch) { + for (auto& log : batch.getBatch()) { + switch (std::get<0>(log)) { + case BatchLogType::OP_BATCH_PUT: { + data_[std::get<1>(log)] = std::get<2>(log); + break; + } + case BatchLogType::OP_BATCH_REMOVE: { + data_.erase(std::get<1>(log)); + break; + } + case BatchLogType::OP_BATCH_REMOVE_RANGE: { + auto iter = data_.lower_bound(std::get<1>(log)); + while (iter != data_.end()) { + if (iter->first < std::get<2>(log)) { + iter = data_.erase(iter); + } else { + break; + } + } + } + } } + return true; } @@ -212,7 +261,7 @@ class DummyListener : public Listener { } private: - std::vector data_; + std::map data_; std::pair committedSnapshot_{0, 0}; int32_t snapshotBatchCount_{0}; }; @@ -448,9 +497,14 @@ TEST_P(ListenerBasicTest, SimpleTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -518,9 +572,14 @@ TEST_P(ListenerBasicTest, TransLeaderTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(200, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } @@ -552,9 +611,14 @@ TEST_P(ListenerBasicTest, CommitSnapshotTest) { auto dummy = dummies_[partId]; const auto& data = dummy->data(); CHECK_EQ(100, data.size()); + std::map expect; for (int32_t i = 0; i < static_cast(data.size()); i++) { - CHECK_EQ(folly::stringPrintf("key_%d_%d", partId, i), data[i].first); - CHECK_EQ(folly::stringPrintf("val_%d_%d", partId, i), data[i].second); + expect[fmt::format("key_{}_{}", partId, i)] = fmt::format("val_{}_{}", partId, i); + } + auto iter = expect.begin(); + for (int32_t i = 0; i < static_cast(data.size()); i++, iter++) { + CHECK_EQ(iter->first, data[i].first); + CHECK_EQ(iter->second, data[i].second); } } } diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt index 2c2e126ef48..5db2e891e36 100644 --- a/src/kvstore/test/CMakeLists.txt +++ b/src/kvstore/test/CMakeLists.txt @@ -37,7 +37,7 @@ set(KVSTORE_TEST_LIBS $ $ $ - $ + $ $ $ $ @@ -46,6 +46,7 @@ set(KVSTORE_TEST_LIBS $ $ $ + $ ) nebula_add_test( @@ -61,6 +62,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -76,6 +78,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -91,6 +94,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -106,6 +110,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -121,6 +126,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -136,6 +142,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -151,6 +158,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -166,6 +174,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -182,6 +191,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_executable( @@ -199,6 +209,7 @@ nebula_add_executable( follybenchmark gtest boost_regex + curl ) nebula_add_test( @@ -214,4 +225,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt index 357fa0db736..64ce3c56ec6 100644 --- a/src/meta/CMakeLists.txt +++ b/src/meta/CMakeLists.txt @@ -164,7 +164,7 @@ set(meta_test_deps $ $ $ - $ + $ $ $ $ @@ -173,6 +173,7 @@ set(meta_test_deps $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) diff --git a/src/meta/http/test/CMakeLists.txt b/src/meta/http/test/CMakeLists.txt index c4d059fb41b..6f54cc20ba8 100644 --- a/src/meta/http/test/CMakeLists.txt +++ b/src/meta/http/test/CMakeLists.txt @@ -17,4 +17,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/meta/test/CMakeLists.txt b/src/meta/test/CMakeLists.txt index 52146ed1915..f963bb4f1bf 100644 --- a/src/meta/test/CMakeLists.txt +++ b/src/meta/test/CMakeLists.txt @@ -15,6 +15,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -30,6 +31,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -45,6 +47,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -60,6 +63,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -69,12 +73,14 @@ nebula_add_test( MetaClientTest.cpp OBJECTS ${meta_test_deps} + $ LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -90,6 +96,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -107,6 +114,7 @@ nebula_add_test( wangle gtest gmock + curl ) @@ -145,6 +153,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) @@ -161,6 +170,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -176,6 +186,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -192,6 +203,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -208,6 +220,7 @@ nebula_add_test( wangle gtest gmock + curl ) nebula_add_test( @@ -223,6 +236,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -238,6 +252,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -253,6 +268,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -268,6 +284,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -283,4 +300,5 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/parser/MaintainSentences.cpp b/src/parser/MaintainSentences.cpp index d882e5b7873..308c3736914 100644 --- a/src/parser/MaintainSentences.cpp +++ b/src/parser/MaintainSentences.cpp @@ -495,9 +495,7 @@ std::string CreateFTIndexSentence::toString() const { buf += *schemaName_; buf += "("; std::vector fieldDefs; - for (const auto& field : fields()) { - fieldDefs.emplace_back(field); - } + fieldDefs.emplace_back(field()); std::string fields; folly::join(", ", fieldDefs, fields); buf += fields; diff --git a/src/parser/MaintainSentences.h b/src/parser/MaintainSentences.h index cc56f3cdc43..6a2c59c4730 100644 --- a/src/parser/MaintainSentences.h +++ b/src/parser/MaintainSentences.h @@ -1127,11 +1127,11 @@ class CreateFTIndexSentence final : public Sentence { CreateFTIndexSentence(bool isEdge, std::string *indexName, std::string *schemaName, - NameLabelList *fields) { + std::string *field) { isEdge_ = isEdge; indexName_.reset(indexName); schemaName_.reset(schemaName); - fields_.reset(fields); + field_.reset(field); kind_ = Kind::kCreateFTIndex; } @@ -1148,20 +1148,15 @@ class CreateFTIndexSentence final : public Sentence { return schemaName_.get(); } - std::vector fields() const { - std::vector result; - auto fields = fields_->labels(); - result.resize(fields.size()); - auto get = [](auto ptr) { return *ptr; }; - std::transform(fields.begin(), fields.end(), result.begin(), get); - return result; + std::string field() const { + return *field_; } private: bool isEdge_; std::unique_ptr indexName_; std::unique_ptr schemaName_; - std::unique_ptr fields_; + std::unique_ptr field_; }; class DropFTIndexSentence final : public Sentence { public: diff --git a/src/parser/parser.yy b/src/parser/parser.yy index 7ef811740d7..7d86179e850 100644 --- a/src/parser/parser.yy +++ b/src/parser/parser.yy @@ -2693,10 +2693,10 @@ create_edge_index_sentence ; create_fulltext_index_sentence - : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + : KW_CREATE KW_FULLTEXT KW_TAG KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(false, $5, $7, $9); } - | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label_list R_PAREN { + | KW_CREATE KW_FULLTEXT KW_EDGE KW_INDEX name_label KW_ON name_label L_PAREN name_label R_PAREN { $$ = new CreateFTIndexSentence(true, $5, $7, $9); } ; diff --git a/src/parser/test/CMakeLists.txt b/src/parser/test/CMakeLists.txt index 27762d80742..474a29b7cfc 100644 --- a/src/parser/test/CMakeLists.txt +++ b/src/parser/test/CMakeLists.txt @@ -32,7 +32,8 @@ set(PARSER_TEST_LIBS $ $ $ - $ + $ + $ $ $ $ @@ -60,7 +61,7 @@ nebula_add_test( SOURCES ParserTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( @@ -68,21 +69,21 @@ nebula_add_test( SOURCES ScannerTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_executable( NAME parser_bm SOURCES ParserBenchmark.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES follybenchmark boost_regex ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) nebula_add_test( NAME expression_parsing_test SOURCES ExpressionParsingTest.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} + LIBRARIES gtest gtest_main ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} curl ) if(ENABLE_FUZZ_TEST) diff --git a/src/parser/test/fuzzing/CMakeLists.txt b/src/parser/test/fuzzing/CMakeLists.txt index 2de1ac7fd94..c2814163cdb 100644 --- a/src/parser/test/fuzzing/CMakeLists.txt +++ b/src/parser/test/fuzzing/CMakeLists.txt @@ -7,5 +7,5 @@ nebula_add_test( FUZZER ON SOURCES ParserFuzzer.cpp OBJECTS ${PARSER_TEST_LIBS} - LIBRARIES ${THRIFT_LIBRARIES} wangle + LIBRARIES ${THRIFT_LIBRARIES} wangle curl ) diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 339fea49a7a..f71b6664948 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -53,7 +53,7 @@ set(storage_test_deps $ $ $ - $ + $ $ $ $ @@ -61,6 +61,7 @@ set(storage_test_deps $ $ $ + $ ) if(ENABLE_STANDALONE_VERSION) @@ -83,6 +84,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -98,6 +100,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -113,6 +116,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -128,6 +132,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -143,6 +148,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -158,6 +164,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -173,6 +180,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -188,6 +196,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -203,6 +212,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -218,6 +228,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -233,6 +244,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -249,6 +261,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) @@ -266,6 +279,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -281,6 +295,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -296,6 +311,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -311,6 +327,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -328,6 +345,7 @@ nebula_add_executable( gtest follybenchmark boost_regex + curl ) nebula_add_executable( @@ -343,6 +361,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -358,6 +377,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -373,6 +393,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -392,6 +413,7 @@ nebula_add_executable( follybenchmark boost_regex gtest + curl ) nebula_add_test( @@ -408,6 +430,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -447,6 +470,7 @@ nebula_add_executable( wangle boost_regex gtest + curl ) nebula_add_test( @@ -462,6 +486,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -515,6 +540,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -530,6 +556,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -545,6 +572,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) #nebula_add_executable( @@ -576,6 +604,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -591,6 +620,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -606,6 +636,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -626,6 +657,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -646,6 +678,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -666,6 +699,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -681,6 +715,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -697,6 +732,7 @@ nebula_add_executable( wangle follybenchmark boost_regex + curl ) nebula_add_test( @@ -712,6 +748,7 @@ nebula_add_test( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_test( @@ -726,6 +763,7 @@ nebula_add_test( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) nebula_add_executable( @@ -741,6 +779,7 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) nebula_add_executable( @@ -758,4 +797,5 @@ nebula_add_executable( boost_regex wangle gtest + curl ) diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt index cd300852a8a..7aa50d98d51 100644 --- a/src/tools/CMakeLists.txt +++ b/src/tools/CMakeLists.txt @@ -57,10 +57,11 @@ set(tools_test_deps $ $ $ - $ + $ $ $ $ + $ ) if(NOT ENABLE_STANDALONE_VERSION) diff --git a/src/tools/simple-kv-verify/CMakeLists.txt b/src/tools/simple-kv-verify/CMakeLists.txt index 850c4c56d09..1113afcc022 100644 --- a/src/tools/simple-kv-verify/CMakeLists.txt +++ b/src/tools/simple-kv-verify/CMakeLists.txt @@ -15,4 +15,5 @@ nebula_add_executable( ${PROXYGEN_LIBRARIES} wangle gtest + curl ) diff --git a/src/tools/storage-perf/CMakeLists.txt b/src/tools/storage-perf/CMakeLists.txt index 518aa7f7718..7be1bd82fd1 100644 --- a/src/tools/storage-perf/CMakeLists.txt +++ b/src/tools/storage-perf/CMakeLists.txt @@ -38,6 +38,7 @@ nebula_add_executable( ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} gtest + curl ) #install(