Skip to content

Commit

Permalink
improve memtracker, add missed check & remove unnecessary thenError&t…
Browse files Browse the repository at this point in the history
…ryCatch check (#5199)

* [memtracker] check code run with memoery check on all works

refine code

all code memory checked

fix lint

refine code & fix build with gcc+sanitize

* fix build break

* fix lint

* refine code

* remove debug code

* fix test fail build with debug

* fix test fail build with debug

* restore commented test

* minor

* minor
  • Loading branch information
codesigner authored Jan 6, 2023
1 parent 492e7fb commit e30bdf0
Show file tree
Hide file tree
Showing 76 changed files with 845 additions and 1,214 deletions.
1 change: 1 addition & 0 deletions cmake/nebula/SanitizerConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ if(ENABLE_ASAN)
add_compile_options(-fsanitize=address)
add_compile_options(-g)
add_compile_options(-fno-omit-frame-pointer)
add_definitions(-DENABLE_ASAN)
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
endif()

Expand Down
50 changes: 29 additions & 21 deletions src/clients/storage/StorageClientBase-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,21 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
return folly::collectAll(respFutures)
.deferValue([this, requests = std::move(requests), totalLatencies, hosts](
std::vector<folly::Try<StatusOr<Response>>>&& resps) {
// throw in MemoryCheckGuard verified
memory::MemoryCheckGuard guard;
StorageRpcResponse<Response> rpcResp(resps.size());
for (size_t i = 0; i < resps.size(); i++) {
auto& host = hosts->at(i);
auto& tryResp = resps[i];
std::optional<std::string> errMsg;
folly::Try<StatusOr<Response>>& tryResp = resps[i];
if (tryResp.hasException()) {
errMsg = std::string(tryResp.exception().what().c_str());
std::string errMsg = tryResp.exception().what().toStdString();
rpcResp.markFailure();
LOG(ERROR) << "There some RPC errors: " << errMsg;
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
} else {
auto status = std::move(tryResp).value();
StatusOr<Response> status = std::move(tryResp).value();
if (status.ok()) {
auto resp = std::move(status).value();
auto result = resp.get_result();
Expand All @@ -128,17 +133,18 @@ StorageClientBase<ClientType, ClientManagerType>::collectResponse(
// Keep the response
rpcResp.addResponse(std::move(resp));
} else {
errMsg = std::move(status).status().message();
rpcResp.markFailure();
Status s = std::move(status).status();
nebula::cpp2::ErrorCode errorCode =
s.code() == Status::Code::kGraphMemoryExceeded
? nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED
: nebula::cpp2::ErrorCode::E_RPC_FAILURE;
LOG(ERROR) << "There some RPC errors: " << s.message();
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, errorCode);
}
}

if (errMsg) {
rpcResp.markFailure();
LOG(ERROR) << "There some RPC errors: " << errMsg.value();
auto req = requests.at(host);
auto parts = getReqPartsId(req);
rpcResp.appendFailedParts(parts, nebula::cpp2::ErrorCode::E_RPC_FAILURE);
}
}

return rpcResp;
Expand All @@ -160,12 +166,16 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
auto spaceId = request.get_space_id();
return folly::via(evb)
.thenValue([remoteFunc = std::move(remoteFunc), request, evb, host, this](auto&&) {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
// NOTE: Create new channel on each thread to avoid TIMEOUT RPC error
auto client = clientsMan_->client(host, evb, false, FLAGS_storage_client_timeout_ms);
// Encoding invoke Cpp2Ops::write the request to protocol is in current thread,
// do not need to turn on in Cpp2Ops::write
return remoteFunc(client.get(), request);
})
.thenValue([spaceId, this](Response&& resp) mutable -> StatusOr<Response> {
// MemoryTrackerVerified
memory::MemoryCheckGuard guard;
auto& result = resp.get_result();
for (auto& part : result.get_failed_parts()) {
Expand Down Expand Up @@ -196,14 +206,12 @@ folly::Future<StatusOr<Response>> StorageClientBase<ClientType, ClientManagerTyp
}
return std::move(resp);
})
.thenError(folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<StatusOr<Response>>(std::bad_alloc());
})
.thenError(folly::tag_t<std::exception>{},
[](const std::exception& e) {
return folly::makeFuture<StatusOr<Response>>(std::runtime_error(e.what()));
})
.thenError(
folly::tag_t<std::bad_alloc>{},
[](const std::bad_alloc&) {
return folly::makeFuture<StatusOr<Response>>(Status::GraphMemoryExceeded(
"(%d)", static_cast<int32_t>(nebula::cpp2::ErrorCode::E_GRAPH_MEMORY_EXCEEDED)));
})
.thenError([request, host, spaceId, this](
folly::exception_wrapper&& exWrapper) mutable -> StatusOr<Response> {
stats::StatsManager::addValue(kNumRpcSentToStoragedFailed);
Expand Down
2 changes: 2 additions & 0 deletions src/codec/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ set(CODEC_TEST_LIBS
$<TARGET_OBJECTS:file_based_cluster_id_man_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:network_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:thread_obj>
$<TARGET_OBJECTS:stats_obj>
Expand Down
4 changes: 4 additions & 0 deletions src/common/base/Status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ const char *Status::toString(Code code) {
return "StatementEmpty: ";
case kSemanticError:
return "SemanticError: ";
case kGraphMemoryExceeded:
return "GraphMemoryExceeded: ";
case kKeyNotFound:
return "KeyNotFound: ";
case kPartialSuccess:
return "PartialSuccess: ";
case kStorageMemoryExceeded:
return "StorageMemoryExceeded: ";
case kSpaceNotFound:
return "SpaceNotFound: ";
case kHostNotFound:
Expand Down
5 changes: 5 additions & 0 deletions src/common/base/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,15 @@ class Status final {
// Graph engine errors
STATUS_GENERATOR(SyntaxError);
STATUS_GENERATOR(SemanticError);
STATUS_GENERATOR(GraphMemoryExceeded);

// Nothing is executed When command is comment
STATUS_GENERATOR(StatementEmpty);

// Storage engine errors
STATUS_GENERATOR(KeyNotFound);
STATUS_GENERATOR(PartialSuccess);
STATUS_GENERATOR(StorageMemoryExceeded);

// Meta engine errors
// TODO(dangleptr) we could use ErrorOr to replace SpaceNotFound here.
Expand Down Expand Up @@ -166,9 +169,11 @@ class Status final {
kSyntaxError = 201,
kStatementEmpty = 202,
kSemanticError = 203,
kGraphMemoryExceeded = 204,
// 3xx, for storage engine errors
kKeyNotFound = 301,
kPartialSuccess = 302,
kStorageMemoryExceeded = 303,
// 4xx, for meta service errors
kSpaceNotFound = 404,
kHostNotFound = 405,
Expand Down
2 changes: 2 additions & 0 deletions src/common/base/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,10 @@ nebula_add_executable(
$<TARGET_OBJECTS:expression_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:ast_match_path_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down
13 changes: 13 additions & 0 deletions src/common/datatypes/DataSetOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "common/base/Base.h"
#include "common/datatypes/CommonCpp2Ops.h"
#include "common/datatypes/DataSet.h"
#include "common/memory/MemoryTracker.h"

namespace apache {
namespace thrift {
Expand Down Expand Up @@ -47,7 +48,10 @@ inline constexpr protocol::TType Cpp2Ops<nebula::DataSet>::thriftType() {

template <class Protocol>
uint32_t Cpp2Ops<nebula::DataSet>::write(Protocol* proto, nebula::DataSet const* obj) {
// we do not turn on memory tracker here, when the DataSet object is creating & inserting, it is
// in Processor::process(), where memory tracker is turned on. so we think that is enough.
uint32_t xfer = 0;

xfer += proto->writeStructBegin("DataSet");

xfer += proto->writeFieldBegin("column_names", protocol::T_LIST, 1);
Expand All @@ -62,11 +66,20 @@ uint32_t Cpp2Ops<nebula::DataSet>::write(Protocol* proto, nebula::DataSet const*

xfer += proto->writeFieldStop();
xfer += proto->writeStructEnd();

return xfer;
}

template <class Protocol>
void Cpp2Ops<nebula::DataSet>::read(Protocol* proto, nebula::DataSet* obj) {
// memory usage during decode a StorageResponse should be mostly occupied
// by DataSet (see interface/storage.thrift), turn on memory check here.
//
// MemoryTrackerVerified:
// throw std::bad_alloc has verified, can be captured in
// StorageClientBase::getResponse's onError
nebula::memory::MemoryCheckGuard guard;

apache::thrift::detail::ProtocolReaderStructReadState<Protocol> readState;

readState.readStructBegin(proto);
Expand Down
1 change: 1 addition & 0 deletions src/common/datatypes/ValueOps-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ template <class Protocol>
uint32_t Cpp2Ops<nebula::Value>::write(Protocol* proto, nebula::Value const* obj) {
uint32_t xfer = 0;
xfer += proto->writeStructBegin("Value");
// MemoryTrackerVerified: throw bad_alloc verified

switch (obj->type()) {
case nebula::Value::Type::NULLVALUE: {
Expand Down
2 changes: 2 additions & 0 deletions src/common/datatypes/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ nebula_add_test(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down Expand Up @@ -115,6 +116,7 @@ nebula_add_test(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
LIBRARIES
Expand Down
2 changes: 2 additions & 0 deletions src/common/expression/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:expr_ctx_mock_obj>
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand All @@ -153,6 +154,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:function_manager_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
Expand Down
3 changes: 3 additions & 0 deletions src/common/geo/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ nebula_add_test(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:geo_index_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:storage_thrift_obj>
$<TARGET_OBJECTS:meta_thrift_obj>
$<TARGET_OBJECTS:common_thrift_obj>
Expand Down
3 changes: 3 additions & 0 deletions src/common/graph/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ nebula_add_test(
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:graph_obj>
$<TARGET_OBJECTS:graph_thrift_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:common_thrift_obj>
$<TARGET_OBJECTS:datatypes_obj>
$<TARGET_OBJECTS:wkt_wkb_io_obj>
Expand Down
2 changes: 2 additions & 0 deletions src/common/id/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ nebula_add_executable(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
Expand Down Expand Up @@ -67,6 +68,7 @@ nebula_add_test(
$<TARGET_OBJECTS:agg_function_manager_obj>
$<TARGET_OBJECTS:time_utils_obj>
$<TARGET_OBJECTS:datetime_parser_obj>
$<TARGET_OBJECTS:memory_obj>
$<TARGET_OBJECTS:time_obj>
$<TARGET_OBJECTS:fs_obj>
$<TARGET_OBJECTS:meta_client_stats_obj>
Expand Down
4 changes: 4 additions & 0 deletions src/common/memory/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ void MemoryTracker::free(int64_t size) {
MemoryStats::instance().free(size);
}

bool MemoryTracker::isOn() {
return MemoryStats::instance().throwOnMemoryExceeded();
}

void MemoryTracker::allocImpl(int64_t size, bool) {
MemoryStats::instance().alloc(size);
}
Expand Down
8 changes: 8 additions & 0 deletions src/common/memory/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class MemoryStats {
threadMemoryStats_.throwOnMemoryExceeded = false;
}

// return true if current thread's throwOnMemoryExceeded'
static bool throwOnMemoryExceeded() {
return threadMemoryStats_.throwOnMemoryExceeded;
}

private:
inline ALWAYS_INLINE void allocGlobal(int64_t size) {
int64_t willBe = size + used_.fetch_add(size, std::memory_order_relaxed);
Expand Down Expand Up @@ -182,6 +187,9 @@ struct MemoryTracker {
/// This function should be called after memory deallocation.
static void free(int64_t size);

/// Test state of memory tracker, return true if memory tracker is turned on, otherwise false.
static bool isOn();

private:
static void allocImpl(int64_t size, bool throw_if_memory_exceeded);
};
Expand Down
16 changes: 5 additions & 11 deletions src/common/memory/MemoryUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
}

// MemoryStats depends on jemalloc
#if ENABLE_JEMALLOC
#ifdef ENABLE_JEMALLOC
#ifndef ENABLE_ASAN
// set MemoryStats limit (MemoryTracker track-able memory)
int64_t trackable = total - FLAGS_memory_tracker_untracked_reserved_memory_mb * MiB;
if (trackable > 0) {
Expand All @@ -134,16 +135,9 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
if (FLAGS_memory_purge_enabled) {
int64_t now = time::WallClock::fastNowInSec();
if (now - kLastPurge_ > FLAGS_memory_purge_interval_seconds) {
// mallctl seems has issue with address_sanitizer, do purge only when address_sanitizer is off
#if defined(__clang__)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
#endif
#else // gcc
// jemalloc seems has issue with address_sanitizer, do purge only when address_sanitizer is
// off
mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", nullptr, nullptr, nullptr, 0);
#endif
kLastPurge_ = now;
}
}
Expand Down Expand Up @@ -173,7 +167,7 @@ StatusOr<bool> MemoryUtils::hitsHighWatermark() {
kLastPrintMemoryTrackerStats_ = now;
}
}

#endif
#endif

auto hits = (1 - available / total) > FLAGS_system_memory_high_watermark_ratio;
Expand Down
13 changes: 3 additions & 10 deletions src/common/memory/NewDelete.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,16 @@
/// Two condition need check before MemoryTracker is on
/// 1. jemalloc is used
/// MemoryTracker need jemalloc API to get accurate size of alloc/free memory.
#if ENABLE_JEMALLOC
/// 2. address_sanitizer is off
/// sanitizer has already override the new/delete operator,
/// only override new/delete operator only when address_sanitizer is off
#if defined(__clang__)
#if defined(__has_feature)
#if not __has_feature(address_sanitizer)
#ifdef ENABLE_JEMALLOC
#ifndef ENABLE_ASAN
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#else // gcc
#define ENABLE_MEMORY_TRACKER
#endif
#endif

#if defined(ENABLE_MEMORY_TRACKER)
#ifdef ENABLE_MEMORY_TRACKER
/// new
void *operator new(std::size_t size) {
nebula::memory::trackMemory(size);
Expand Down
Loading

0 comments on commit e30bdf0

Please sign in to comment.