Skip to content

Commit

Permalink
Merge branch 'master' into fix-reduceUnaryNot
Browse files Browse the repository at this point in the history
  • Loading branch information
Sophie-Xie authored Dec 23, 2022
2 parents 8ed5165 + b5df05c commit b11f847
Show file tree
Hide file tree
Showing 22 changed files with 1,272 additions and 79 deletions.
29 changes: 26 additions & 3 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,8 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("There are still space on the host");
case nebula::cpp2::ErrorCode::E_RELATED_FULLTEXT_INDEX_EXISTS:
return Status::Error("Related fulltext index exists, please drop it first");
case nebula::cpp2::ErrorCode::E_HOST_CAN_NOT_BE_ADDED:
return Status::Error("Could not add a host, which is not a storage and not expired either");
default:
return Status::Error("Unknown error!");
}
Expand Down Expand Up @@ -3456,8 +3458,8 @@ StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexB
return indexes;
}

StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId) {
StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(
GraphSpaceID spaceId, int32_t schemaId, const std::string& field) {
if (!ready_) {
return Status::Error("Not ready!");
}
Expand All @@ -3467,13 +3469,34 @@ StatusOr<std::pair<std::string, cpp2::FTIndex>> MetaClient::getFTIndexBySpaceSch
auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? it.second.get_depend_schema().get_edge_type()
: it.second.get_depend_schema().get_tag_id();
if (it.second.get_space_id() == spaceId && id == schemaId) {
// There will only be one field. However, in order to minimize changes, the IDL was not modified
auto f = it.second.fields()->front();
if (it.second.get_space_id() == spaceId && id == schemaId && f == field) {
return std::make_pair(it.first, it.second);
}
}
return Status::IndexNotFound();
}

StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> MetaClient::getFTIndexFromCache(
GraphSpaceID spaceId, int32_t schemaId) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
const auto& metadata = *metadata_.load();
std::unordered_map<std::string, cpp2::FTIndex> ret;
for (auto& it : metadata.fulltextIndexMap_) {
auto id = it.second.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? it.second.get_depend_schema().get_edge_type()
: it.second.get_depend_schema().get_tag_id();
if (it.second.get_space_id() == spaceId && id == schemaId) {
ret[it.first] = it.second;
}
}
return ret;
}

StatusOr<cpp2::FTIndex> MetaClient::getFTIndexByNameFromCache(GraphSpaceID spaceId,
const std::string& name) {
if (!ready_) {
Expand Down
8 changes: 6 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,12 @@ class MetaClient : public BaseMetaClient {
StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> getFTIndexBySpaceFromCache(
GraphSpaceID spaceId);

StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexBySpaceSchemaFromCache(
GraphSpaceID spaceId, int32_t schemaId);
StatusOr<std::pair<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId,
const std::string& field);

StatusOr<std::unordered_map<std::string, cpp2::FTIndex>> getFTIndexFromCache(GraphSpaceID spaceId,
int32_t schemaId);

StatusOr<cpp2::FTIndex> getFTIndexByNameFromCache(GraphSpaceID spaceId, const std::string& name);

Expand Down
30 changes: 30 additions & 0 deletions src/codec/Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,35 @@ inline std::string toHexStr(folly::StringPiece str) {
return buf;
}

// If v is longer than maxLen, return a safepoint to be cut which contains legal utf-8 characters,
// and make sure the returned size is less than or equal to maxLen
inline size_t utf8CutSize(folly::StringPiece v, size_t maxLen) {
DCHECK_GT(v.size(), maxLen);
size_t len = 0;
size_t curLen = 0; // current length of utf-8 character
while (len < maxLen) {
auto tmp = static_cast<unsigned char>(v[len]);
if (tmp >= 0xFC) {
curLen = 6;
} else if (tmp >= 0xF8) {
curLen = 5;
} else if (tmp >= 0xF0) {
curLen = 4;
} else if (tmp >= 0xE0) {
curLen = 3;
} else if (tmp >= 0xC0) {
curLen = 2;
} else {
curLen = 1;
}
if (len + curLen <= maxLen) {
len += curLen;
} else {
break;
}
}
return len;
}

} // namespace nebula
#endif // CODEC_COMMON_H_
3 changes: 2 additions & 1 deletion src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <cmath>

#include "codec/Common.h"
#include "common/time/TimeUtils.h"
#include "common/time/WallClock.h"
#include "common/utils/DefaultValueContext.h"
Expand Down Expand Up @@ -679,7 +680,7 @@ WriteResult RowWriterV2::write(ssize_t index, folly::StringPiece v) noexcept {
case PropertyType::FIXED_STRING: {
// In-place string. If the pass-in string is longer than the pre-defined
// fixed length, the string will be truncated to the fixed length
size_t len = v.size() > field->size() ? field->size() : v.size();
size_t len = v.size() > field->size() ? utf8CutSize(v, field->size()) : v.size();
strncpy(&buf_[offset], v.data(), len);
if (len < field->size()) {
memset(&buf_[offset + len], 0, field->size() - len);
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/SchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class SchemaManager {
StatusOr<std::pair<bool, int32_t>> getSchemaIDByName(GraphSpaceID space,
folly::StringPiece schemaName);

virtual StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
virtual StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) = 0;

protected:
Expand Down
6 changes: 3 additions & 3 deletions src/common/meta/ServerBasedSchemaManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,9 @@ ServerBasedSchemaManager::getServiceClients(meta::cpp2::ExternalServiceType type
return std::move(ret).value();
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> ServerBasedSchemaManager::getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) {
auto ret = metaClient_->getFTIndexBySpaceSchemaFromCache(spaceId, schemaId);
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>>
ServerBasedSchemaManager::getFTIndex(GraphSpaceID spaceId, int32_t schemaId) {
auto ret = metaClient_->getFTIndexFromCache(spaceId, schemaId);
if (!ret.ok()) {
return ret.status();
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/ServerBasedSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class ServerBasedSchemaManager : public SchemaManager {
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type) override;

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t schemaId) override;

void init(MetaClient *client);
Expand Down
13 changes: 3 additions & 10 deletions src/graph/validator/LookupValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -501,19 +501,12 @@ StatusOr<Expression*> LookupValidator::checkConstExpr(Expression* expr,

// Check does test search contains properties search in test search expression
StatusOr<std::string> LookupValidator::checkTSExpr(Expression* expr) {
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();
auto metaClient = qctx_->getMetaClient();
auto tsi = metaClient->getFTIndexBySpaceSchemaFromCache(spaceId(), schemaId());
auto tsi = metaClient->getFTIndexFromCache(spaceId(), schemaId(), prop);
NG_RETURN_IF_ERROR(tsi);
auto tsName = tsi.value().first;

auto ftFields = tsi.value().second.get_fields();
auto tsExpr = static_cast<TextSearchExpression*>(expr);
auto prop = tsExpr->arg()->prop();

auto iter = std::find(ftFields.begin(), ftFields.end(), prop);
if (iter == ftFields.end()) {
return Status::SemanticError("Column %s not found in %s", prop.c_str(), tsName.c_str());
}
return tsName;
}

Expand Down
4 changes: 2 additions & 2 deletions src/graph/validator/test/MockSchemaManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ class MockSchemaManager final : public nebula::meta::SchemaManager {
LOG(FATAL) << "Unimplemented.";
}

StatusOr<std::pair<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(GraphSpaceID,
int32_t) override {
StatusOr<std::unordered_map<std::string, nebula::meta::cpp2::FTIndex>> getFTIndex(
GraphSpaceID, int32_t) override {
LOG(FATAL) << "Unimplemented";
return Status::Error("Unimplemented");
}
Expand Down
1 change: 1 addition & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ enum ErrorCode {
E_NO_VALID_HOST = -2026, // Lack of valid hosts
E_CORRUPTED_BALANCE_PLAN = -2027, // A data balancing plan that has been corrupted
E_NO_INVALID_BALANCE_PLAN = -2028, // No invalid balance plan
E_HOST_CAN_NOT_BE_ADDED = -2029, // the host can not be added for it's not a storage host


// Authentication Failure
Expand Down
52 changes: 28 additions & 24 deletions src/kvstore/listener/elasticsearch/ESListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,20 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get tag reader failed, tagID " << tagId;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
}
std::string indexName = ftIndex.first;
std::string vid = NebulaKeyUtils::getVertexId(vIdLen_, key).toString();
std::string text = std::move(v).getStr();
callback(type, indexName, vid, "", "", 0, text);
} else if (nebula::NebulaKeyUtils::isEdge(vIdLen_, key)) {
auto edgeType = NebulaKeyUtils::getEdgeType(vIdLen_, key);
auto ftIndexRes = schemaMan_->getFTIndex(spaceId_, edgeType);
Expand All @@ -114,20 +116,22 @@ void ESListener::pickTagAndEdgeData(BatchLogType type,
LOG(ERROR) << "get edge reader failed, schema ID " << edgeType;
return;
}
if (ftIndex.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = ftIndex.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
for (auto& index : ftIndex) {
if (index.second.get_fields().size() > 1) {
LOG(ERROR) << "Only one field will create fulltext index";
}
auto field = index.second.get_fields().front();
auto v = reader->getValueByName(field);
if (v.type() != Value::Type::STRING) {
LOG(ERROR) << "Can't create fulltext index on type " << v.type();
}
std::string indexName = index.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
std::string indexName = ftIndex.first;
std::string src = NebulaKeyUtils::getSrcId(vIdLen_, key).toString();
std::string dst = NebulaKeyUtils::getDstId(vIdLen_, key).toString();
int64_t rank = NebulaKeyUtils::getRank(vIdLen_, key);
std::string text = std::move(v).getStr();
callback(type, indexName, "", src, dst, rank, text);
}
}

Expand Down
20 changes: 12 additions & 8 deletions src/meta/processors/BaseProcessor-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ ErrorOr<nebula::cpp2::ErrorCode, std::vector<cpp2::IndexItem>> BaseProcessor<RES
}

template <typename RESP>
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge) {
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<std::string, cpp2::FTIndex>>
BaseProcessor<RESP>::getFTIndex(GraphSpaceID spaceId, int32_t tagOrEdge) {
const auto& indexPrefix = MetaKeyUtils::fulltextIndexPrefix();
auto iterRet = doPrefix(indexPrefix);
if (!nebula::ok(iterRet)) {
Expand All @@ -422,18 +422,18 @@ ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> BaseProcessor<RESP>::getFTIndex(
return retCode;
}
auto indexIter = nebula::value(iterRet).get();

std::unordered_map<std::string, cpp2::FTIndex> ret;
while (indexIter->valid()) {
auto index = MetaKeyUtils::parsefulltextIndex(indexIter->val());
auto id = index.get_depend_schema().getType() == nebula::cpp2::SchemaID::Type::edge_type
? index.get_depend_schema().get_edge_type()
: index.get_depend_schema().get_tag_id();
if (spaceId == index.get_space_id() && tagOrEdge == id) {
return index;
ret[indexIter->key().toString()] = index;
}
indexIter->next();
}
return nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND;
return ret;
}

template <typename RESP>
Expand Down Expand Up @@ -464,14 +464,18 @@ nebula::cpp2::ErrorCode BaseProcessor<RESP>::indexCheck(

template <typename RESP>
nebula::cpp2::ErrorCode BaseProcessor<RESP>::ftIndexCheck(
const std::vector<std::string>& cols, const std::vector<cpp2::AlterSchemaItem>& alterItems) {
const std::unordered_map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems) {
std::set<std::string> cols;
for (auto& [indexName, index] : ftIndices) {
cols.insert(index.fields_ref()->front());
}
for (const auto& item : alterItems) {
if (*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::CHANGE ||
*item.op_ref() == nebula::meta::cpp2::AlterSchemaOp::DROP) {
const auto& itemCols = item.get_schema().get_columns();
for (const auto& iCol : itemCols) {
auto it =
std::find_if(cols.begin(), cols.end(), [&](const auto& c) { return c == iCol.name; });
auto it = cols.find(iCol.name);
if (it != cols.end()) {
LOG(INFO) << "fulltext index conflict";
return nebula::cpp2::ErrorCode::E_RELATED_FULLTEXT_INDEX_EXISTS;
Expand Down
9 changes: 5 additions & 4 deletions src/meta/processors/BaseProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,9 @@ class BaseProcessor {
* @param alterItems
* @return ErrorCode::E_RELATED_FULLTEXT_INDEX_EXISTS if contains
*/
nebula::cpp2::ErrorCode ftIndexCheck(const std::vector<std::string>& cols,
const std::vector<cpp2::AlterSchemaItem>& alterItems);
nebula::cpp2::ErrorCode ftIndexCheck(
const std::unordered_map<std::string, cpp2::FTIndex>& ftIndices,
const std::vector<cpp2::AlterSchemaItem>& alterItems);

/**
* @brief List all tag/edge index for given space and tag/edge id.
Expand All @@ -374,8 +375,8 @@ class BaseProcessor {
* @param tagOrEdge
* @return ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex>
*/
ErrorOr<nebula::cpp2::ErrorCode, cpp2::FTIndex> getFTIndex(GraphSpaceID spaceId,
int32_t tagOrEdge);
ErrorOr<nebula::cpp2::ErrorCode, std::unordered_map<std::string, cpp2::FTIndex>> getFTIndex(
GraphSpaceID spaceId, int32_t tagOrEdge);

/**
* @brief Check if index on given fields alredy exist.
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void AlterEdgeProcessor::process(const cpp2::AlterEdgeReq& req) {
auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), edgeItems);
auto ftStatus = ftIndexCheck(fti, edgeItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
2 changes: 1 addition & 1 deletion src/meta/processors/schema/AlterTagProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ void AlterTagProcessor::process(const cpp2::AlterTagReq& req) {
auto ftIdxRet = getFTIndex(spaceId, tagId);
if (nebula::ok(ftIdxRet)) {
auto fti = std::move(nebula::value(ftIdxRet));
auto ftStatus = ftIndexCheck(fti.get_fields(), tagItems);
auto ftStatus = ftIndexCheck(fti, tagItems);
if (ftStatus != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(ftStatus);
onFinished();
Expand Down
16 changes: 8 additions & 8 deletions src/meta/processors/schema/DropEdgeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ void DropEdgeProcessor::process(const cpp2::DropEdgeReq& req) {

auto ftIdxRet = getFTIndex(spaceId, edgeType);
if (nebula::ok(ftIdxRet)) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}

if (nebula::error(ftIdxRet) != nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND) {
if (!nebula::value(ftIdxRet).empty()) {
LOG(INFO) << "Drop edge error, fulltext index conflict, "
<< "please delete fulltext index first.";
handleErrorCode(nebula::cpp2::ErrorCode::E_RELATED_INDEX_EXISTS);
onFinished();
return;
}
} else {
handleErrorCode(nebula::error(ftIdxRet));
onFinished();
return;
Expand Down
Loading

0 comments on commit b11f847

Please sign in to comment.