Skip to content

Commit

Permalink
fix issue 3601 (#3666)
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs authored and Sophie-Xie committed Jan 15, 2022
1 parent c4fdd29 commit f888fcb
Show file tree
Hide file tree
Showing 20 changed files with 148 additions and 95 deletions.
38 changes: 12 additions & 26 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ bool MetaClient::loadData() {
GraphSpaceID spaceId = spaceInfo.first;
std::shared_ptr<SpaceInfoCache> info = spaceInfo.second;
std::shared_ptr<SpaceInfoCache> infoDeepCopy = std::make_shared<SpaceInfoCache>(*info);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_);
infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_);
infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_);
newMetaData->localCache_[spaceId] = infoDeepCopy;
Expand Down Expand Up @@ -396,14 +396,14 @@ bool MetaClient::loadData() {
return true;
}

TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool) {
TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec) {
TagSchemas tagSchemas;
TagID lastTagId = -1;
for (auto& tagIt : tagItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(tagIt.get_version());
for (const auto& colIt : tagIt.get_schema().get_columns()) {
addSchemaField(schema.get(), colIt, pool);
addSchemaField(schema.get(), colIt);
}
// handle schema property
schema->setProp(tagIt.get_schema().get_schema_prop());
Expand All @@ -417,16 +417,15 @@ TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, Ob
return tagSchemas;
}

EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec,
ObjectPool* pool) {
EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec) {
EdgeSchemas edgeSchemas;
std::unordered_set<std::pair<GraphSpaceID, EdgeType>> edges;
EdgeType lastEdgeType = -1;
for (auto& edgeIt : edgeItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(edgeIt.get_version());
for (const auto& col : edgeIt.get_schema().get_columns()) {
MetaClient::addSchemaField(schema.get(), col, pool);
MetaClient::addSchemaField(schema.get(), col);
}
// handle shcem property
schema->setProp(edgeIt.get_schema().get_schema_prop());
Expand All @@ -440,32 +439,19 @@ EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec
return edgeSchemas;
}

void MetaClient::addSchemaField(NebulaSchemaProvider* schema,
const cpp2::ColumnDef& col,
ObjectPool* pool) {
void MetaClient::addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col) {
bool hasDef = col.default_value_ref().has_value();
auto& colType = col.get_type();
size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0;
cpp2::GeoShape geoShape =
colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY;
bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false;
Expression* defaultValueExpr = nullptr;
std::string encoded;
if (hasDef) {
auto encoded = *col.get_default_value();
defaultValueExpr = Expression::decode(pool, folly::StringPiece(encoded.data(), encoded.size()));

if (defaultValueExpr == nullptr) {
LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name();
hasDef = false;
}
encoded = *col.get_default_value();
}

schema->addField(col.get_name(),
colType.get_type(),
len,
nullable,
hasDef ? defaultValueExpr : nullptr,
geoShape);
schema->addField(col.get_name(), colType.get_type(), len, nullable, encoded, geoShape);
}

bool MetaClient::loadSchemas(GraphSpaceID spaceId,
Expand Down Expand Up @@ -493,9 +479,9 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId,
auto edgeItemVec = edgeRet.value();
allEdgeMap[spaceId] = {};
spaceInfoCache->tagItemVec_ = tagItemVec;
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec, &spaceInfoCache->pool_);
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec);
spaceInfoCache->edgeItemVec_ = edgeItemVec;
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec, &spaceInfoCache->pool_);
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec);

for (auto& tagIt : tagItemVec) {
tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.get_tag_name()), tagIt.get_tag_id());
Expand Down
8 changes: 3 additions & 5 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ struct SpaceInfoCache {
std::vector<cpp2::IndexItem> edgeIndexItemVec_;
Indexes edgeIndexes_;
Listeners listeners_;
// objPool used to decode when adding field
ObjectPool pool_;
std::unordered_map<PartitionID, TermID> termOfPartition_;

SpaceInfoCache() = default;
Expand Down Expand Up @@ -816,10 +814,10 @@ class MetaClient {
ServiceClientsList serviceClientList_;
};

void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool);
void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col);

TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec, ObjectPool* pool);
TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec);

std::unique_ptr<thread::GenericWorker> bgThread_;
SpaceNameIdMap spaceIndexByName_;
Expand Down
6 changes: 4 additions & 2 deletions src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,9 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {

WriteResult r = WriteResult::SUCCEEDED;
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
ObjectPool pool;
auto& exprStr = field->defaultValue();
auto expr = Expression::decode(&pool, folly::StringPiece(exprStr.data(), exprStr.size()));
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::NULLVALUE:
Expand Down Expand Up @@ -851,7 +853,7 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
<< ", default value expr: " << field->defaultValue()->toString();
<< ", default value expr: " << field->defaultValue();
}
} else {
// Set NULL
Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ResultSchemaProvider::ResultSchemaField::ResultSchemaField(std::string name,
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue,
std::string defaultValue,
meta::cpp2::GeoShape geoShape)
: name_(std::move(name)),
type_(type),
Expand All @@ -42,14 +42,14 @@ PropertyType ResultSchemaProvider::ResultSchemaField::type() const {
}

bool ResultSchemaProvider::ResultSchemaField::hasDefault() const {
return defaultValue_ != nullptr;
return defaultValue_ != "";
}

bool ResultSchemaProvider::ResultSchemaField::nullable() const {
return nullable_;
}

Expression* ResultSchemaProvider::ResultSchemaField::defaultValue() const {
const std::string& ResultSchemaProvider::ResultSchemaField::defaultValue() const {
return defaultValue_;
}

Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue = nullptr,
std::string defaultValue = "",
meta::cpp2::GeoShape = meta::cpp2::GeoShape::ANY);

const char* name() const override;
nebula::cpp2::PropertyType type() const override;
bool nullable() const override;
bool hasDefault() const override;
Expression* defaultValue() const override;
const std::string& defaultValue() const override;
size_t size() const override;
size_t offset() const override;
size_t nullFlagPos() const override;
Expand All @@ -41,7 +41,7 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable_;
int32_t offset_;
size_t nullFlagPos_;
Expression* defaultValue_;
std::string defaultValue_;
meta::cpp2::GeoShape geoShape_;
};

Expand Down
10 changes: 8 additions & 2 deletions src/codec/test/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@ SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name,
nullFlagPos = numNullableFields_++;
}

columns_.emplace_back(
name.toString(), type, size, nullable, offset, nullFlagPos, defaultValue, geoShape);
columns_.emplace_back(name.toString(),
type,
size,
nullable,
offset,
nullFlagPos,
defaultValue ? defaultValue->encode() : "",
geoShape);
nameIndex_.emplace(std::make_pair(hash, columns_.size() - 1));

return *this;
Expand Down
4 changes: 2 additions & 2 deletions src/common/meta/NebulaSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void NebulaSchemaProvider::addField(folly::StringPiece name,
PropertyType type,
size_t fixedStrLen,
bool nullable,
Expression* defaultValue,
std::string defaultValue,
cpp2::GeoShape geoShape) {
size_t size = fieldSize(type, fixedStrLen);

Expand All @@ -116,7 +116,7 @@ void NebulaSchemaProvider::addField(folly::StringPiece name,
fields_.emplace_back(name.toString(),
type,
nullable,
defaultValue != nullptr,
defaultValue != "",
defaultValue,
size,
offset,
Expand Down
8 changes: 4 additions & 4 deletions src/common/meta/NebulaSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class NebulaSchemaProvider : public SchemaProviderIf {
nebula::cpp2::PropertyType type,
bool nullable,
bool hasDefault,
Expression* defaultValue,
std::string defaultValue,
size_t size,
size_t offset,
size_t nullFlagPos,
Expand Down Expand Up @@ -56,7 +56,7 @@ class NebulaSchemaProvider : public SchemaProviderIf {
return hasDefault_;
}

Expression* defaultValue() const override {
const std::string& defaultValue() const override {
return defaultValue_;
}

Expand All @@ -82,7 +82,7 @@ class NebulaSchemaProvider : public SchemaProviderIf {
nebula::cpp2::PropertyType type_;
bool nullable_;
bool hasDefault_;
Expression* defaultValue_;
std::string defaultValue_;
size_t size_;
size_t offset_;
size_t nullFlagPos_;
Expand Down Expand Up @@ -113,7 +113,7 @@ class NebulaSchemaProvider : public SchemaProviderIf {
nebula::cpp2::PropertyType type,
size_t fixedStrLen = 0,
bool nullable = false,
Expression* defaultValue = nullptr,
std::string defaultValue = "",
cpp2::GeoShape geoShape = cpp2::GeoShape::ANY);

static std::size_t fieldSize(nebula::cpp2::PropertyType type, std::size_t fixedStrLimit);
Expand Down
2 changes: 1 addition & 1 deletion src/common/meta/SchemaProviderIf.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SchemaProviderIf {
virtual nebula::cpp2::PropertyType type() const = 0;
virtual bool nullable() const = 0;
virtual bool hasDefault() const = 0;
virtual Expression* defaultValue() const = 0;
virtual const std::string& defaultValue() const = 0;
// This method returns the number of bytes the field will occupy
// when the field is persisted on the storage medium
// For the variant length string, the size will return 8
Expand Down
5 changes: 4 additions & 1 deletion src/common/utils/IndexKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <thrift/lib/cpp2/protocol/Serializer.h>

#include "common/expression/Expression.h"
#include "common/geo/GeoIndex.h"
#include "common/utils/DefaultValueContext.h"

Expand Down Expand Up @@ -212,7 +213,9 @@ StatusOr<Value> IndexKeyUtils::readValueWithLatestSche(RowReader* reader,
}
if (field->hasDefault()) {
DefaultValueContext expCtx;
auto expr = field->defaultValue()->clone();
ObjectPool pool;
auto& exprStr = field->defaultValue();
auto expr = Expression::decode(&pool, folly::StringPiece(exprStr.data(), exprStr.size()));
return Expression::eval(expr, expCtx);
} else if (field->nullable()) {
return NullType::__NULL__;
Expand Down
15 changes: 15 additions & 0 deletions src/common/utils/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ nebula_add_test(
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
LIBRARIES
gtest
${THRIFT_LIBRARIES}
Expand All @@ -33,6 +38,11 @@ nebula_add_test(
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
LIBRARIES
gtest
${THRIFT_LIBRARIES}
Expand All @@ -56,6 +66,11 @@ nebula_add_test(
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
LIBRARIES
gtest
${THRIFT_LIBRARIES}
Expand Down
8 changes: 4 additions & 4 deletions src/graph/util/SchemaUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ Status SchemaUtil::validateProps(const std::vector<SchemaPropItem *> &schemaProp

// static
std::shared_ptr<const meta::NebulaSchemaProvider> SchemaUtil::generateSchemaProvider(
ObjectPool *pool, const SchemaVer ver, const meta::cpp2::Schema &schema) {
const SchemaVer ver, const meta::cpp2::Schema &schema) {
auto schemaPtr = std::make_shared<meta::NebulaSchemaProvider>(ver);
for (auto col : schema.get_columns()) {
bool hasDef = col.default_value_ref().has_value();
Expression *defaultValueExpr = nullptr;
std::string exprStr;
if (hasDef) {
defaultValueExpr = Expression::decode(pool, *col.default_value_ref());
exprStr = *col.default_value_ref();
}
schemaPtr->addField(col.get_name(),
col.get_type().get_type(),
col.type.type_length_ref().value_or(0),
col.nullable_ref().value_or(false),
hasDef ? defaultValueExpr : nullptr,
exprStr,
col.type.geo_shape_ref().value_or(meta::cpp2::GeoShape::ANY));
}
return schemaPtr;
Expand Down
2 changes: 1 addition & 1 deletion src/graph/util/SchemaUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SchemaUtil final {
meta::cpp2::Schema& schema);

static std::shared_ptr<const meta::NebulaSchemaProvider> generateSchemaProvider(
ObjectPool* pool, const SchemaVer ver, const meta::cpp2::Schema& schema);
const SchemaVer ver, const meta::cpp2::Schema& schema);

static Status setTTLDuration(SchemaPropItem* schemaProp, meta::cpp2::Schema& schema);

Expand Down
6 changes: 2 additions & 4 deletions src/graph/validator/MaintainValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,7 @@ Status CreateTagValidator::validateImpl() {
NG_RETURN_IF_ERROR(validateColumns(sentence->columnSpecs(), schema));
NG_RETURN_IF_ERROR(SchemaUtil::validateProps(sentence->getSchemaProps(), schema));
// Save the schema in validateContext
auto pool = qctx_->objPool();
auto schemaPro = SchemaUtil::generateSchemaProvider(pool, 0, schema);
auto schemaPro = SchemaUtil::generateSchemaProvider(0, schema);
vctx_->addSchema(name, schemaPro);
createCtx_->name = std::move(name);
createCtx_->schema = std::move(schema);
Expand All @@ -180,8 +179,7 @@ Status CreateEdgeValidator::validateImpl() {
NG_RETURN_IF_ERROR(validateColumns(sentence->columnSpecs(), schema));
NG_RETURN_IF_ERROR(SchemaUtil::validateProps(sentence->getSchemaProps(), schema));
// Save the schema in validateContext
auto pool = qctx_->objPool();
auto schemaPro = SchemaUtil::generateSchemaProvider(pool, 0, schema);
auto schemaPro = SchemaUtil::generateSchemaProvider(0, schema);
vctx_->addSchema(name, schemaPro);
createCtx_->name = std::move(name);
createCtx_->schema = std::move(schema);
Expand Down
9 changes: 6 additions & 3 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,12 @@ Status InsertEdgesValidator::prepareEdges() {
auto iter = std::find(propNames_.begin(), propNames_.end(), propName);
if (iter == propNames_.end()) {
if (field->hasDefault()) {
auto *defaultValue = field->defaultValue();
DCHECK(!!defaultValue);
auto v = defaultValue->eval(QueryExpressionContext()(nullptr));
auto &defaultValue = field->defaultValue();
DCHECK(!defaultValue.empty());
ObjectPool pool;
auto expr = Expression::decode(
&pool, folly::StringPiece(defaultValue.data(), defaultValue.size()));
auto v = expr->eval(QueryExpressionContext()(nullptr));
entirePropValues.emplace_back(v);
} else {
if (!field->nullable()) {
Expand Down
Loading

0 comments on commit f888fcb

Please sign in to comment.