Skip to content

Commit

Permalink
Cherry pick to v3.0.0 (0110-0114) (#3730)
Browse files Browse the repository at this point in the history
* add LogMonitor to check log disk freeBytes and change log level when space is almost full (#3576)

* add LogMonitor to check the log disk is full

* fix comments

* add comments for default flags

Co-authored-by: yaphet <[email protected]>

* Ldbc test cases. (#3537)

* Add ldbc test/

* Add all cases.

* Fix some test cases.

* Fix ldbc cases.

* Fix pytest.

Co-authored-by: Yee <[email protected]>

* fix issue 3675 (#3678)

* Fix expression rewrite (#3614)

* Do not transfer the filter expression if it contains 2 lableAttribute exprs

* Fix expression overflow check

* Fix rewriteRelExpr

* Refactor rewriteRelExpr

* Fix log usage

* Check whether the minus expression contains string

* Add tck cases

* Address comments

* Fix conflicts

* Address comments

* modify metrics in conf files

* Address comments

* fix divide zone should be failed if two zones use the same host (#3699)

* Support more check about merge zone arguments (#3703)

* fix match index (#3694)

* check scheam

* add test case

* fix graph crash

* address comment'
'

* add test case

* fix error

* fix vid select error

* fix unit test error

* address comment

* fix issue 3601 (#3666)

Co-authored-by: Nivras <[email protected]>
Co-authored-by: yaphet <[email protected]>
Co-authored-by: cpw <[email protected]>
Co-authored-by: Yee <[email protected]>
Co-authored-by: Yichen Wang <[email protected]>
Co-authored-by: jimingquan <[email protected]>
Co-authored-by: hs.zhang <[email protected]>
  • Loading branch information
8 people authored Jan 17, 2022
1 parent a9d4713 commit 2fd67bc
Show file tree
Hide file tree
Showing 61 changed files with 1,927 additions and 268 deletions.
5 changes: 5 additions & 0 deletions .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ jobs:
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 60
- name: LDBC
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} ldbc
working-directory: tests/
timeout-minutes: 60
- name: Down cluster
run: |
make RM_DIR=false down
Expand Down
5 changes: 3 additions & 2 deletions conf/nebula-graphd.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@
# System memory high watermark ratio, cancel the memory checking when the ratio greater than 1.0
--system_memory_high_watermark_ratio=0.8

########## metrics ##########
--enable_space_level_metrics=false

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

--enable_space_level_metrics=false
5 changes: 3 additions & 2 deletions conf/nebula-graphd.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@
# System memory high watermark ratio, cancel the memory checking when the ratio greater than 1.0
--system_memory_high_watermark_ratio=0.8

########## metrics ##########
--enable_space_level_metrics=false

########## experimental feature ##########
# if use experimental features
--enable_experimental_feature=false

--enable_space_level_metrics=false
42 changes: 16 additions & 26 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ bool MetaClient::loadData() {
GraphSpaceID spaceId = spaceInfo.first;
std::shared_ptr<SpaceInfoCache> info = spaceInfo.second;
std::shared_ptr<SpaceInfoCache> infoDeepCopy = std::make_shared<SpaceInfoCache>(*info);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_, &infoDeepCopy->pool_);
infoDeepCopy->tagSchemas_ = buildTagSchemas(infoDeepCopy->tagItemVec_);
infoDeepCopy->edgeSchemas_ = buildEdgeSchemas(infoDeepCopy->edgeItemVec_);
infoDeepCopy->tagIndexes_ = buildIndexes(infoDeepCopy->tagIndexItemVec_);
infoDeepCopy->edgeIndexes_ = buildIndexes(infoDeepCopy->edgeIndexItemVec_);
newMetaData->localCache_[spaceId] = infoDeepCopy;
Expand Down Expand Up @@ -396,14 +396,14 @@ bool MetaClient::loadData() {
return true;
}

TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool) {
TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec) {
TagSchemas tagSchemas;
TagID lastTagId = -1;
for (auto& tagIt : tagItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(tagIt.get_version());
for (const auto& colIt : tagIt.get_schema().get_columns()) {
addSchemaField(schema.get(), colIt, pool);
addSchemaField(schema.get(), colIt);
}
// handle schema property
schema->setProp(tagIt.get_schema().get_schema_prop());
Expand All @@ -417,16 +417,15 @@ TagSchemas MetaClient::buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, Ob
return tagSchemas;
}

EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec,
ObjectPool* pool) {
EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec) {
EdgeSchemas edgeSchemas;
std::unordered_set<std::pair<GraphSpaceID, EdgeType>> edges;
EdgeType lastEdgeType = -1;
for (auto& edgeIt : edgeItemVec) {
// meta will return the different version from new to old
auto schema = std::make_shared<NebulaSchemaProvider>(edgeIt.get_version());
for (const auto& col : edgeIt.get_schema().get_columns()) {
MetaClient::addSchemaField(schema.get(), col, pool);
MetaClient::addSchemaField(schema.get(), col);
}
// handle shcem property
schema->setProp(edgeIt.get_schema().get_schema_prop());
Expand All @@ -440,32 +439,19 @@ EdgeSchemas MetaClient::buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec
return edgeSchemas;
}

void MetaClient::addSchemaField(NebulaSchemaProvider* schema,
const cpp2::ColumnDef& col,
ObjectPool* pool) {
void MetaClient::addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col) {
bool hasDef = col.default_value_ref().has_value();
auto& colType = col.get_type();
size_t len = colType.type_length_ref().has_value() ? *colType.get_type_length() : 0;
cpp2::GeoShape geoShape =
colType.geo_shape_ref().has_value() ? *colType.get_geo_shape() : cpp2::GeoShape::ANY;
bool nullable = col.nullable_ref().has_value() ? *col.get_nullable() : false;
Expression* defaultValueExpr = nullptr;
std::string encoded;
if (hasDef) {
auto encoded = *col.get_default_value();
defaultValueExpr = Expression::decode(pool, folly::StringPiece(encoded.data(), encoded.size()));

if (defaultValueExpr == nullptr) {
LOG(ERROR) << "Wrong expr default value for column name: " << col.get_name();
hasDef = false;
}
encoded = *col.get_default_value();
}

schema->addField(col.get_name(),
colType.get_type(),
len,
nullable,
hasDef ? defaultValueExpr : nullptr,
geoShape);
schema->addField(col.get_name(), colType.get_type(), len, nullable, encoded, geoShape);
}

bool MetaClient::loadSchemas(GraphSpaceID spaceId,
Expand Down Expand Up @@ -493,9 +479,9 @@ bool MetaClient::loadSchemas(GraphSpaceID spaceId,
auto edgeItemVec = edgeRet.value();
allEdgeMap[spaceId] = {};
spaceInfoCache->tagItemVec_ = tagItemVec;
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec, &spaceInfoCache->pool_);
spaceInfoCache->tagSchemas_ = buildTagSchemas(tagItemVec);
spaceInfoCache->edgeItemVec_ = edgeItemVec;
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec, &spaceInfoCache->pool_);
spaceInfoCache->edgeSchemas_ = buildEdgeSchemas(edgeItemVec);

for (auto& tagIt : tagItemVec) {
tagNameIdMap.emplace(std::make_pair(spaceId, tagIt.get_tag_name()), tagIt.get_tag_id());
Expand Down Expand Up @@ -859,6 +845,10 @@ Status MetaClient::handleResponse(const RESP& resp) {
return Status::Error("Invalid param!");
case nebula::cpp2::ErrorCode::E_WRONGCLUSTER:
return Status::Error("Wrong cluster!");
case nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH:
return Status::Error("Zone not enough!");
case nebula::cpp2::ErrorCode::E_ZONE_IS_EMPTY:
return Status::Error("Zone is empty!");
case nebula::cpp2::ErrorCode::E_STORE_FAILURE:
return Status::Error("Store failure!");
case nebula::cpp2::ErrorCode::E_STORE_SEGMENT_ILLEGAL:
Expand Down
8 changes: 3 additions & 5 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ struct SpaceInfoCache {
std::vector<cpp2::IndexItem> edgeIndexItemVec_;
Indexes edgeIndexes_;
Listeners listeners_;
// objPool used to decode when adding field
ObjectPool pool_;
std::unordered_map<PartitionID, TermID> termOfPartition_;

SpaceInfoCache() = default;
Expand Down Expand Up @@ -816,10 +814,10 @@ class MetaClient {
ServiceClientsList serviceClientList_;
};

void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col, ObjectPool* pool);
void addSchemaField(NebulaSchemaProvider* schema, const cpp2::ColumnDef& col);

TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec, ObjectPool* pool);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec, ObjectPool* pool);
TagSchemas buildTagSchemas(std::vector<cpp2::TagItem> tagItemVec);
EdgeSchemas buildEdgeSchemas(std::vector<cpp2::EdgeItem> edgeItemVec);

std::unique_ptr<thread::GenericWorker> bgThread_;
SpaceNameIdMap spaceIndexByName_;
Expand Down
6 changes: 4 additions & 2 deletions src/codec/RowWriterV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,9 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {

WriteResult r = WriteResult::SUCCEEDED;
if (field->hasDefault()) {
auto expr = field->defaultValue()->clone();
ObjectPool pool;
auto& exprStr = field->defaultValue();
auto expr = Expression::decode(&pool, folly::StringPiece(exprStr.data(), exprStr.size()));
auto defVal = Expression::eval(expr, expCtx);
switch (defVal.type()) {
case Value::Type::NULLVALUE:
Expand Down Expand Up @@ -851,7 +853,7 @@ WriteResult RowWriterV2::checkUnsetFields() noexcept {
default:
LOG(FATAL) << "Unsupported default value type: " << defVal.typeName()
<< ", default value: " << defVal
<< ", default value expr: " << field->defaultValue()->toString();
<< ", default value expr: " << field->defaultValue();
}
} else {
// Set NULL
Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ ResultSchemaProvider::ResultSchemaField::ResultSchemaField(std::string name,
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue,
std::string defaultValue,
meta::cpp2::GeoShape geoShape)
: name_(std::move(name)),
type_(type),
Expand All @@ -42,14 +42,14 @@ PropertyType ResultSchemaProvider::ResultSchemaField::type() const {
}

bool ResultSchemaProvider::ResultSchemaField::hasDefault() const {
return defaultValue_ != nullptr;
return defaultValue_ != "";
}

bool ResultSchemaProvider::ResultSchemaField::nullable() const {
return nullable_;
}

Expression* ResultSchemaProvider::ResultSchemaField::defaultValue() const {
const std::string& ResultSchemaProvider::ResultSchemaField::defaultValue() const {
return defaultValue_;
}

Expand Down
6 changes: 3 additions & 3 deletions src/codec/test/ResultSchemaProvider.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable,
int32_t offset,
size_t nullFlagPos,
Expression* defaultValue = nullptr,
std::string defaultValue = "",
meta::cpp2::GeoShape = meta::cpp2::GeoShape::ANY);

const char* name() const override;
nebula::cpp2::PropertyType type() const override;
bool nullable() const override;
bool hasDefault() const override;
Expression* defaultValue() const override;
const std::string& defaultValue() const override;
size_t size() const override;
size_t offset() const override;
size_t nullFlagPos() const override;
Expand All @@ -41,7 +41,7 @@ class ResultSchemaProvider : public meta::SchemaProviderIf {
bool nullable_;
int32_t offset_;
size_t nullFlagPos_;
Expression* defaultValue_;
std::string defaultValue_;
meta::cpp2::GeoShape geoShape_;
};

Expand Down
10 changes: 8 additions & 2 deletions src/codec/test/SchemaWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,14 @@ SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name,
nullFlagPos = numNullableFields_++;
}

columns_.emplace_back(
name.toString(), type, size, nullable, offset, nullFlagPos, defaultValue, geoShape);
columns_.emplace_back(name.toString(),
type,
size,
nullable,
offset,
nullFlagPos,
defaultValue ? defaultValue->encode() : "",
geoShape);
nameIndex_.emplace(std::make_pair(hash, columns_.size() - 1));

return *this;
Expand Down
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ nebula_add_subdirectory(ssl)
nebula_add_subdirectory(geo)
nebula_add_subdirectory(memory)
nebula_add_subdirectory(id)
nebula_add_subdirectory(log)
2 changes: 2 additions & 0 deletions src/common/graph/Response.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@
X(E_CONFLICT, -2008) \
X(E_INVALID_PARM, -2009) \
X(E_WRONGCLUSTER, -2010) \
X(E_ZONE_NOT_ENOUGH, -2011) \
X(E_ZONE_IS_EMPTY, -2012) \
\
X(E_STORE_FAILURE, -2021) \
X(E_STORE_SEGMENT_ILLEGAL, -2022) \
Expand Down
8 changes: 8 additions & 0 deletions src/common/log/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright (c) 2021 vesoft inc. All rights reserved.
#
# This source code is licensed under Apache 2.0 License.

nebula_add_library(
log_monitor_obj OBJECT
LogMonitor.cpp
)
69 changes: 69 additions & 0 deletions src/common/log/LogMonitor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#include "common/log/LogMonitor.h"

namespace nebula {

// default min_warn is 256M, disk freebytes < 256M will change LOG_LEVEL to WARNING
DEFINE_uint64(log_min_reserved_bytes_to_warn,
256 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to WARN");

// default min_error is 64M, disk freebytes < 64M will change LOG_LEVEL to ERROR
DEFINE_uint64(log_min_reserved_bytes_to_error,
64 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to ERROR");

// default min_fatal is 4M, disk freebytes < 4M will change LOG_LEVEL to FATAL
DEFINE_uint64(log_min_reserved_bytes_to_fatal,
4 * (1UL << 20),
"if freebytes in logdir less than this, will change log level to FATAL");

// default check log_disk interval is 10s
DEFINE_int32(log_disk_check_interval_secs, 10, "interval to check free space of log path");

void LogMonitor::getLogDiskFreeByte() {
boost::system::error_code ec;
auto info = boost::filesystem::space(FLAGS_log_dir, ec);
if (!ec) {
freeByte_ = info.available;
} else {
LOG(WARNING) << "Get filesystem info of logdir: " << FLAGS_log_dir << " failed";
}
}

void LogMonitor::checkAndChangeLogLevel() {
getLogDiskFreeByte();

if (FLAGS_log_min_reserved_bytes_to_fatal > FLAGS_log_min_reserved_bytes_to_error ||
FLAGS_log_min_reserved_bytes_to_fatal > FLAGS_log_min_reserved_bytes_to_warn ||
FLAGS_log_min_reserved_bytes_to_error > FLAGS_log_min_reserved_bytes_to_warn) {
LOG(ERROR) << "Get Invalid config in LogMonitor, the LogMonitor config should be "
<< "FLAGS_log_min_reserved_bytes_to_warn >"
<< "FLAGS_log_min_reserved_bytes_to_error > FLAGS_log_min_reserved_bytes_to_fatal;";
return;
}

if (freeByte_ < FLAGS_log_min_reserved_bytes_to_fatal) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_fatal
<< ", change log level to FATAL";
FLAGS_minloglevel = google::GLOG_FATAL;
} else if (freeByte_ < FLAGS_log_min_reserved_bytes_to_error) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_error
<< ", change log level to ERROR";
FLAGS_minloglevel = google::GLOG_ERROR;
} else if (freeByte_ < FLAGS_log_min_reserved_bytes_to_warn) {
LOG(ERROR) << "log disk freebyte is less than " << FLAGS_log_min_reserved_bytes_to_warn
<< ", change log level to WARNING";
FLAGS_minloglevel = google::GLOG_WARNING;
} else {
// if freeByte_ is bigger than every min_log_flag, reset the FLAGS_minloglevel to old value
if (FLAGS_minloglevel != oldMinLogLevel_) {
FLAGS_minloglevel = oldMinLogLevel_;
}
}
}

} // namespace nebula
43 changes: 43 additions & 0 deletions src/common/log/LogMonitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/
#pragma once

#include <boost/filesystem.hpp>
#include <boost/system/error_code.hpp>

#include "common/thread/GenericWorker.h"

namespace nebula {

DECLARE_uint64(log_min_reserved_bytes_to_warn);
DECLARE_uint64(log_min_reserved_bytes_to_error);
DECLARE_uint64(log_min_reserved_bytes_to_fatal);
DECLARE_int32(log_disk_check_interval_secs);

class LogMonitor {
public:
LogMonitor() : oldMinLogLevel_(FLAGS_minloglevel), freeByte_(1UL << 60) {
worker_ = std::make_shared<thread::GenericWorker>();
CHECK(worker_->start());
worker_->addRepeatTask(
FLAGS_log_disk_check_interval_secs * 1000, &LogMonitor::checkAndChangeLogLevel, this);
}

~LogMonitor() {
worker_->stop();
worker_->wait();
}

void getLogDiskFreeByte();

void checkAndChangeLogLevel();

private:
int32_t oldMinLogLevel_;
std::shared_ptr<thread::GenericWorker> worker_;
std::atomic_uint64_t freeByte_;
};

} // namespace nebula
Loading

0 comments on commit 2fd67bc

Please sign in to comment.