Skip to content

Commit

Permalink
Storage cache support for Nebula (Part 1) (vesoft-inc#383)
Browse files Browse the repository at this point in the history
* add cachelib and storage cache class

include new files

change cmakelist

update thrift

fix build

fix build

fix build

add constructor for struct

clean and add test

add cmake related

fix test

trying cmake

add more test and cmake change

cmake trying out failed and revert
fix test bug

now fix cmake
add a test

fix poolname bug
delete a test

add lock

update conf

* update format and change name

* fix typo in cmake

* remove unneeded lib in cmake

* fix cmake

* fix comment
change to use & in put

* add some log and update signature of functions in storage cache

* add function getting cache hit count

* update config

* update config

* change the vertex pool ttl default value

* format

* fix typo

* change to use error code

* fix typo

* typo

* update error code value

* add lrt

Co-authored-by: Doodle <[email protected]>
  • Loading branch information
wenhaocs and critical27 authored Jan 5, 2022
1 parent 5e28a35 commit b165b6f
Show file tree
Hide file tree
Showing 12 changed files with 610 additions and 0 deletions.
40 changes: 40 additions & 0 deletions cmake/FindCacheLib.cmake
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# - Try to find CacheLib include dirs and libraries
#
# Usage of this module as follows:
#
#   find_package(CacheLib)            # optional: check CacheLib_FOUND afterwards
#   find_package(CacheLib REQUIRED)   # configuration fails if anything is missing
#
# Variables used by this module: none of its own. The search is performed by
# find_path()/find_library(), so the usual CMake search hints apply.
#
# Variables defined by this module:
#
#   CacheLib_FOUND        System has CacheLib, include and lib dirs found
#   CacheLib_INCLUDE_DIR  The CacheLib include directory.
#   CacheLib_LIBRARIES    The CacheLib static libraries (only set when found).
#
# Cache entries created by the individual searches:
#
#   CacheLib_ALLOCATOR, CacheLib_DATATYPE, CacheLib_SHM,
#   CacheLib_COMMON, CacheLib_NAVY

include(FindPackageHandleStandardArgs)

# The directory that contains the `cachelib` header directory.
find_path(CacheLib_INCLUDE_DIR NAMES cachelib)

# The full `lib*.a` file names are given so that only static archives match.
find_library(CacheLib_ALLOCATOR NAMES libcachelib_allocator.a)
find_library(CacheLib_DATATYPE NAMES libcachelib_datatype.a)
find_library(CacheLib_SHM NAMES libcachelib_shm.a)
find_library(CacheLib_COMMON NAMES libcachelib_common.a)
find_library(CacheLib_NAVY NAMES libcachelib_navy.a)

# Sets CacheLib_FOUND and honours the REQUIRED / QUIET arguments of
# find_package(): a missing component is fatal only when REQUIRED was given,
# and the failure message lists exactly which variables were not found.
find_package_handle_standard_args(
  CacheLib
  REQUIRED_VARS
    CacheLib_INCLUDE_DIR
    CacheLib_ALLOCATOR
    CacheLib_DATATYPE
    CacheLib_SHM
    CacheLib_COMMON
    CacheLib_NAVY
)

if(CacheLib_FOUND)
  # The order of the archives is kept as is: it is significant when linking
  # static libraries.
  set(CacheLib_LIBRARIES
    ${CacheLib_DATATYPE}
    ${CacheLib_COMMON}
    ${CacheLib_SHM}
    ${CacheLib_ALLOCATOR}
    ${CacheLib_NAVY}
  )
endif()

# Only cache entries can be marked as advanced; CacheLib_LIBRARIES is a normal
# variable, so the per-library search results are listed instead.
mark_as_advanced(
  CacheLib_INCLUDE_DIR
  CacheLib_ALLOCATOR
  CacheLib_DATATYPE
  CacheLib_SHM
  CacheLib_COMMON
  CacheLib_NAVY
)
1 change: 1 addition & 0 deletions cmake/nebula/LinkerConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ nebula_add_exe_linker_flag(-static-libstdc++)
# Each flag below is registered through the project helper
# nebula_add_exe_linker_flag (defined outside this excerpt).
nebula_add_exe_linker_flag(-static-libgcc)
nebula_add_exe_linker_flag(-no-pie)
nebula_add_exe_linker_flag(-rdynamic)
# Link against librt.
# NOTE(review): presumably required by CacheLib's shared-memory code, which was
# introduced in the same change — confirm.
nebula_add_exe_linker_flag(-lrt)

if(NOT ${CMAKE_BUILD_TYPE} STREQUAL "Debug")
add_definitions(-D_FORTIFY_SOURCE=2)
Expand Down
2 changes: 2 additions & 0 deletions cmake/nebula/ThirdPartyConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ find_package(Sodium REQUIRED)
# Breakpad is only looked up on x86_64 hosts. The variable is quoted so that an
# empty value does not turn the condition into a syntax error and so that its
# content is never re-interpreted as another variable name.
if("${CMAKE_HOST_SYSTEM_PROCESSOR}" MATCHES "x86_64")
  find_package(Breakpad REQUIRED)
endif()
# CacheLib is mandatory on every architecture.
find_package(CacheLib REQUIRED)

# Prepend the third-party library directories to the executable link line.
# lib64 is prepended last, so it ends up first in the resulting flag string.
set(CMAKE_EXE_LINKER_FLAGS "-L ${NEBULA_THIRDPARTY_ROOT}/lib ${CMAKE_EXE_LINKER_FLAGS}")
set(CMAKE_EXE_LINKER_FLAGS "-L ${NEBULA_THIRDPARTY_ROOT}/lib64 ${CMAKE_EXE_LINKER_FLAGS}")
Expand Down Expand Up @@ -143,6 +144,7 @@ set(PROXYGEN_LIBRARIES
)

# Upper-case aliases for the library variables defined by the find modules.
set(ROCKSDB_LIBRARIES ${Rocksdb_LIBRARY})
# CacheLib_LIBRARIES is the list of CacheLib static archives set by
# cmake/FindCacheLib.cmake.
set(CACHELIB_LIBRARIES ${CacheLib_LIBRARIES})

# All compression libraries
set(COMPRESSION_LIBRARIES bz2 snappy zstd z lz4)
Expand Down
14 changes: 14 additions & 0 deletions conf/nebula-storaged.conf.default
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,17 @@
--rocksdb_column_family_options={"write_buffer_size":"67108864","max_write_buffer_number":"4","max_bytes_for_level_base":"268435456"}
# rocksdb BlockBasedTableOptions in json, each name and value of option is string, given as "option_name":"option_value" separated by comma
--rocksdb_block_based_table_options={"block_size":"8192"}

############## storage cache ##############
# Whether to enable storage cache
--enable_storage_cache=false
# Total capacity reserved for the in-memory storage cache, in MB
--storage_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--storage_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--storage_cache_locks_power=5
# Vertex pool size in MB
--vertex_pool_capacity=50
# TTL in seconds for vertex items in the cache
--vertex_item_ttl=300
14 changes: 14 additions & 0 deletions conf/nebula-storaged.conf.production
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@
# Whether to garbage collect blobs during compaction
--rocksdb_enable_blob_garbage_collection=true

############## storage cache ##############
# Whether to enable storage cache
--enable_storage_cache=false
# Total capacity reserved for the in-memory storage cache, in MB
--storage_cache_capacity=0
# Number of buckets in base 2 logarithm. E.g., in case of 10, the total number of buckets will be 2^10.
--storage_cache_buckets_power=10
# Number of locks in base 2 logarithm. E.g., in case of 5, the total number of locks will be 2^5.
--storage_cache_locks_power=5
# Vertex pool size in MB
--vertex_pool_capacity=50
# TTL in seconds for vertex items in the cache
--vertex_item_ttl=300

############### misc ####################
--snapshot_part_rate_limit=10485760
--snapshot_batch_size=1048576
Expand Down
178 changes: 178 additions & 0 deletions src/common/base/CacheLibLRU.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

#ifndef COMMON_BASE_CACHELIBLRU_H_
#define COMMON_BASE_CACHELIBLRU_H_

#include <cachelib/allocator/CacheAllocator.h>

#include "common/base/Base.h"
#include "common/base/ErrorOr.h"
#include "interface/gen-cpp2/common_types.h"

namespace nebula {

using Cache = facebook::cachelib::LruAllocator;

/**
 * @brief Wrapper around a CacheLib LRU allocator (facebook::cachelib::LruAllocator) that stores
 * string keys and string values in named pools.
 *
 * initializeCache() must have succeeded before any other member function is called: the other
 * functions access the underlying cache instance without checking that it exists.
 *
 * put() and invalidateItem() hold lock_ exclusively while they modify the cache; get() does not
 * take the lock. The pool-name map itself is not protected by the lock.
 */
class CacheLibLRU {
 public:
  /**
   * @param name: name given to the cache instance
   * @param capacity: total cache size in MB
   * @param bucketsPower: number of buckets in base 2 logarithm
   * @param locksPower: number of locks in base 2 logarithm
   */
  explicit CacheLibLRU(std::string name,
                       uint32_t capacity,
                       uint32_t bucketsPower,
                       uint32_t locksPower)
      : name_(name), capacity_(capacity), bucketsPower_(bucketsPower), locksPower_(locksPower) {}

  /**
   * @brief Create the cache instance. If there is any exception, it is logged and reported through
   * the return value, so that the process is allowed to continue.
   *
   * @return nebula::cpp2::ErrorCode SUCCEEDED, or E_UNKNOWN if the configuration is rejected or
   * the cache instance cannot be created
   */
  nebula::cpp2::ErrorCode initializeCache() {
    Cache::Config config;
    try {
      config
          // size cannot exceed the maximum cache size (274'877'906'944 bytes).
          // Widen before multiplying: a 32-bit product wraps around from 4096 MB on.
          .setCacheSize(static_cast<uint64_t>(capacity_) * kBytesPerMB)
          .setCacheName(name_)
          .setAccessConfig({bucketsPower_, locksPower_})
          .validate();  // will throw if bad config
    } catch (const std::exception& e) {
      // We do not stop the service. Users should refer to the log to determine whether to restart
      // the service.
      LOG(ERROR) << "Cache configuration error: " << e.what();
      return nebula::cpp2::ErrorCode::E_UNKNOWN;
    }

    try {
      // Creating the cache is guarded as well, so that a failure here is reported to the caller
      // instead of escaping as an exception.
      nebulaCache_ = std::make_unique<Cache>(config);
    } catch (const std::exception& e) {
      LOG(ERROR) << "Cache creation error: " << e.what();
      return nebula::cpp2::ErrorCode::E_UNKNOWN;
    }

    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  /**
   * @brief Add a cache pool to the cache instance
   *
   * @param poolName: name of the new pool, must not be in use yet
   * @param poolSize: size of the pool in MB
   * @return nebula::cpp2::ErrorCode SUCCEEDED, E_EXISTED if a pool with this name was already
   * added, or E_NOT_ENOUGH_SPACE if the cache refuses to create the pool
   */
  nebula::cpp2::ErrorCode addPool(std::string poolName, uint32_t poolSize) {
    if (poolIdMap_.find(poolName) != poolIdMap_.end()) {
      LOG(ERROR) << "Cache pool creation error. Cache pool exists: " << poolName.data();
      return nebula::cpp2::ErrorCode::E_EXISTED;
    }
    try {
      // Widen before multiplying, see initializeCache().
      auto poolId = nebulaCache_->addPool(poolName, static_cast<uint64_t>(poolSize) * kBytesPerMB);
      poolIdMap_[poolName] = poolId;
    } catch (const std::exception& e) {
      LOG(ERROR) << "Adding cache pool error: " << e.what();
      return nebula::cpp2::ErrorCode::E_NOT_ENOUGH_SPACE;
    }
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  /**
   * @brief Look up a key in the cache.
   *
   * @param key
   * @return A copy of the cached value if the key is found, E_CACHE_MISS otherwise
   */
  ErrorOr<nebula::cpp2::ErrorCode, std::string> get(const std::string& key) {
    auto itemHandle = nebulaCache_->find(key);
    if (itemHandle) {
      // Copy the bytes out of the item, so that the result does not refer to cache memory.
      return std::string(reinterpret_cast<const char*>(itemHandle->getMemory()),
                         itemHandle->getSize());
    }
    VLOG(3) << "Cache miss: " << key << " Not Found";
    return nebula::cpp2::ErrorCode::E_CACHE_MISS;
  }

  /**
   * @brief Insert or update value in cache pool
   *
   * @param key
   * @param value
   * @param poolName: The pool name to insert/update cache item
   * @param ttl: passed on to the allocator when the item is allocated (presumably in seconds;
   * confirm against the CacheLib documentation)
   * @return nebula::cpp2::ErrorCode SUCCEEDED, E_POOL_NOT_FOUND if the pool was never added, or
   * E_CACHE_WRITE_FAILURE if no item could be allocated
   */
  nebula::cpp2::ErrorCode put(const std::string& key,
                              const std::string& value,
                              std::string poolName,
                              uint32_t ttl = 300) {
    auto poolIt = poolIdMap_.find(poolName);
    if (poolIt == poolIdMap_.end()) {
      LOG(ERROR) << "Cache write error. Pool does not exist: " << poolName.data();
      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
    }
    auto itemHandle = nebulaCache_->allocate(poolIt->second, key, value.size(), ttl);
    if (!itemHandle) {
      LOG(ERROR) << "Cache write error. Too many pending writes.";
      return nebula::cpp2::ErrorCode::E_CACHE_WRITE_FAILURE;
    }

    {
      // Fill the item and publish it under the exclusive lock.
      std::unique_lock<std::shared_mutex> guard(lock_);
      std::memcpy(itemHandle->getMemory(), value.data(), value.size());
      nebulaCache_->insertOrReplace(itemHandle);
    }
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  /**
   * @brief CacheLib will first search for the key. If found, remove it.
   * Note here we do not log anything if not found, as there is a good chance that an item is not
   * in the cache.
   *
   * @param key
   * @return nebula::cpp2::ErrorCode always SUCCEEDED
   */
  nebula::cpp2::ErrorCode invalidateItem(const std::string& key) {
    std::unique_lock<std::shared_mutex> guard(lock_);
    nebulaCache_->remove(key);
    return nebula::cpp2::ErrorCode::SUCCEEDED;
  }

  /**
   * @brief Get the configured size of the pool
   *
   * @param poolName
   * @return E_POOL_NOT_FOUND if the pool does not exist, otherwise the pool size as uint64_t,
   * taken from the pool statistics of the cache
   */
  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getConfiguredPoolSize(const std::string& poolName) {
    auto poolIt = poolIdMap_.find(poolName);
    if (poolIt == poolIdMap_.end()) {
      LOG(ERROR) << "Get cache pool size error. Pool does not exist: " << poolName.data();
      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
    }
    return nebulaCache_->getPoolStats(poolIt->second).poolSize;
  }

  /**
   * @brief Get the count of cache hits of a pool
   *
   * @param poolName
   * @return E_POOL_NOT_FOUND if the pool does not exist, otherwise the hit count as uint64_t,
   * taken from the pool statistics of the cache
   */
  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getPoolCacheHitCount(const std::string& poolName) {
    auto poolIt = poolIdMap_.find(poolName);
    if (poolIt == poolIdMap_.end()) {
      LOG(ERROR) << "Get cache hit count error. Pool does not exist: " << poolName.data();
      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
    }
    return nebulaCache_->getPoolStats(poolIt->second).numPoolGetHits;
  }

 private:
  // Conversion factor for the sizes that are configured in MB. It is 64 bits wide so that the
  // products computed with it do not wrap around.
  static constexpr uint64_t kBytesPerMB = 1024 * 1024;

  std::unique_ptr<Cache> nebulaCache_ = nullptr;
  // Maps the name of every pool added through addPool() to the id assigned by CacheLib.
  std::unordered_map<std::string, facebook::cachelib::PoolId> poolIdMap_;
  std::string name_;
  uint32_t capacity_ = 0;       // in MB
  uint32_t bucketsPower_ = 25;  // bucketsPower number of buckets in base 2 logarithm
  uint32_t locksPower_ = 2;     // locksPower number of locks in base 2 logarithm

  // CacheLib does not protect data at item level. We need to synchronize the access.
  mutable std::shared_mutex lock_;
};

} // namespace nebula

#endif // COMMON_BASE_CACHELIBLRU_H_
7 changes: 7 additions & 0 deletions src/interface/common.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -487,5 +487,12 @@ enum ErrorCode {
E_REQ_CONFLICT = -4007,
E_DATA_ILLEGAL = -4008,

// 5xxx for cache
E_CACHE_CONFIG_ERROR = -5001,
E_NOT_ENOUGH_SPACE = -5002,
E_CACHE_MISS = -5003,
E_POOL_NOT_FOUND = -5004,
E_CACHE_WRITE_FAILURE = -5005,

E_UNKNOWN = -8000,
} (cpp.enum_strict)
5 changes: 5 additions & 0 deletions src/storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,10 @@ nebula_add_library(
StorageServer.cpp
)

# Object library holding the storage cache implementation.
# nebula_add_library is a project helper defined outside this excerpt.
nebula_add_library(
storage_cache_obj OBJECT
cache/StorageCache.cpp
)

# Descend into the sub-directories through the project helper
# nebula_add_subdirectory (also defined outside this excerpt).
nebula_add_subdirectory(stats)
nebula_add_subdirectory(test)
Loading

0 comments on commit b165b6f

Please sign in to comment.