From c20297f8a5abbddbf9c9398b9ae0fd14024b4304 Mon Sep 17 00:00:00 2001
From: haowen <19355821+wenhaocs@users.noreply.github.com>
Date: Tue, 2 Aug 2022 18:28:12 -0700
Subject: [PATCH] Add non-volatile cache to reduce the cost of storage (#1099)

* runnable navycache

add nvm cache lookup

draft done

add some comments

rename file and add comments

finish core funcs

unit test done

adjust the dram size and now nvm cache test can pass

return nullptr when itemHandle is nullptr on Lookup()

changed the interface

multiple rocksdb share the same secondary instance on the same node

for debug

set the default secondary cache size to 0 so that meta will not use it

change gflag type

add UUL

add bucket power and lock power to conf

move impl from CacheLibLRU.h to cpp and clean

* type and config

* remove comment

* cmake

* add override

* change the default cache file size to 0

* set cache size unsigned long

* rebase and add comments

* rebase

Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
---
 conf/nebula-standalone.conf.default     |  12 ++
 conf/nebula-storaged.conf.default       |  12 ++
 conf/nebula-storaged.conf.production    |  12 ++
 src/common/base/CMakeLists.txt          |   5 +
 src/common/base/CacheLibLRU.cpp         | 137 +++++++++++++++++++
 src/common/base/CacheLibLRU.h           | 172 +++++++++--------------
 src/daemons/CMakeLists.txt              |   1 +
 src/drainer/test/CMakeLists.txt         |   1 +
 src/kvstore/RocksEngineConfig.cpp       |  13 +-
 src/kvstore/cache/CMakeLists.txt        |   1 +
 src/kvstore/cache/NVCache.cpp           | 113 ++++++++++++++++
 src/kvstore/cache/NVCache.h             |  95 +++++++++++++
 src/kvstore/cache/NVCacheResultHandle.h |  73 ++++++++++
 src/kvstore/cache/NVUtils.h             |  42 ++++++
 src/kvstore/cache/StorageCache.cpp      |  15 ++-
 src/kvstore/cache/StorageCache.h        |   4 +-
 src/kvstore/cache/test/CMakeLists.txt   |  17 +++
 src/kvstore/cache/test/NVCacheTest.cpp  | 147 ++++++++++++++++++++
 src/kvstore/test/CMakeLists.txt         |   1 +
 src/kvstore/test/RocksEngineTest.cpp    |  70 ++++++++++
 src/meta/CMakeLists.txt                 |   1 +
 src/storage/test/CMakeLists.txt         |   1 +
 src/tools/CMakeLists.txt                |   1 +
 23 files changed, 830 insertions(+), 116 deletions(-)
 create mode 100644 src/common/base/CacheLibLRU.cpp
 create mode 100644 src/kvstore/cache/NVCache.cpp
 create mode 100644 src/kvstore/cache/NVCache.h
 create mode 100644 src/kvstore/cache/NVCacheResultHandle.h
 create mode 100644 src/kvstore/cache/NVUtils.h
 create mode 100644 src/kvstore/cache/test/NVCacheTest.cpp

diff --git a/conf/nebula-standalone.conf.default b/conf/nebula-standalone.conf.default
index d7f05dafb85..57c584a3fac 100644
--- a/conf/nebula-standalone.conf.default
+++ b/conf/nebula-standalone.conf.default
@@ -159,3 +159,15 @@
 --meta_data_path=data/meta
 --default_replica_factor=1
 --default_parts_num=100
+
+############## non-volatile cache ##############
+# Cache file location
+--nv_cache_path=/tmp/cache
+# Cache file size in MB
+--nv_cache_size=0
+# DRAM part size of non-volatile cache in MB
+--nv_dram_size=50
+# DRAM part bucket power. The value is a logarithm with a base of 2. Valid values are 0-32.
+--nv_bucket_power=20
+# DRAM part lock power. The value is a logarithm with a base of 2. The recommended value is max(1, nv_bucket_power - 10).
+--nv_lock_power=10
diff --git a/conf/nebula-storaged.conf.default b/conf/nebula-storaged.conf.default
index 73a74c43a9b..c20bdb3c20d 100644
--- a/conf/nebula-storaged.conf.default
+++ b/conf/nebula-storaged.conf.default
@@ -156,3 +156,15 @@
 --rebuild_index_part_rate_limit=4194304
 # The amount of data sent in each batch when leader synchronizes rebuilding index
 --rebuild_index_batch_size=1048576
+
+############## non-volatile cache ##############
+# Cache file location
+--nv_cache_path=/tmp/cache
+# Cache file size in MB
+--nv_cache_size=0
+# DRAM part size of non-volatile cache in MB
+--nv_dram_size=50
+# DRAM part bucket power. The value is a logarithm with a base of 2. Valid values are 0-32.
+--nv_bucket_power=20
+# DRAM part lock power. The value is a logarithm with a base of 2. The recommended value is max(1, nv_bucket_power - 10).
+--nv_lock_power=10
diff --git a/conf/nebula-storaged.conf.production b/conf/nebula-storaged.conf.production
index cda67b60192..733baaf8d9d 100644
--- a/conf/nebula-storaged.conf.production
+++ b/conf/nebula-storaged.conf.production
@@ -137,6 +137,18 @@
 # TTL in seconds for empty key items in the cache
 --empty_key_item_ttl=300
 
+############## non-volatile cache ##############
+# Cache file location
+--nv_cache_path=/tmp/cache
+# Cache file size in MB
+--nv_cache_size=0
+# DRAM part size of non-volatile cache in MB
+--nv_dram_size=50
+# DRAM part bucket power. The value is a logarithm with a base of 2. Valid values are 0-32.
+--nv_bucket_power=20
+# DRAM part lock power. The value is a logarithm with a base of 2. The recommended value is max(1, nv_bucket_power - 10).
+--nv_lock_power=10
+
 ############### misc ####################
 # Whether remove outdated space data
 --auto_remove_invalid_space=true
diff --git a/src/common/base/CMakeLists.txt b/src/common/base/CMakeLists.txt
index 95539f0266e..0810e1176e5 100644
--- a/src/common/base/CMakeLists.txt
+++ b/src/common/base/CMakeLists.txt
@@ -17,4 +17,9 @@ nebula_add_library(
     ${gdb_debug_script}
 )
 
+nebula_add_library(
+    cache_obj OBJECT
+    CacheLibLRU.cpp
+)
+
 nebula_add_subdirectory(test)
diff --git a/src/common/base/CacheLibLRU.cpp b/src/common/base/CacheLibLRU.cpp
new file mode 100644
index 00000000000..3eac92b4fa2
--- /dev/null
+++ b/src/common/base/CacheLibLRU.cpp
@@ -0,0 +1,137 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "common/base/CacheLibLRU.h"
+
+#include
+#include
+
+namespace nebula {
+
+nebula::cpp2::ErrorCode CacheLibLRU::initializeCache() {
+  VLOG(4) << "Using the following cache config"
+          << folly::toPrettyJson(folly::toDynamic(allocConfig_.serialize()));
+  try {
+    allocConfig_.validate();  // will throw if bad config
+    nebulaCache_ = std::make_unique<Cache>(allocConfig_);
+  } catch (const std::exception& e) {
+    // We do not stop the service. Users should refer to the log to determine whether to restart
+    // the service.
+    LOG(ERROR) << "Cache configuration error: " << e.what();
+    return nebula::cpp2::ErrorCode::E_UNKNOWN;
+  }
+
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+nebula::cpp2::ErrorCode CacheLibLRU::addPool(std::string poolName, uint32_t poolSize) {
+  if (poolIdMap_.find(poolName) != poolIdMap_.end()) {
+    LOG(ERROR) << "Cache pool creation error. Cache pool exists: " << poolName.data();
+    return nebula::cpp2::ErrorCode::E_EXISTED;
+  }
+  try {
+    auto poolId = nebulaCache_->addPool(poolName, poolSize * 1024UL * 1024UL);
+    poolIdMap_[poolName] = poolId;
+  } catch (const std::exception& e) {
+    LOG(ERROR) << "Adding cache pool error: " << e.what();
+    return nebula::cpp2::ErrorCode::E_NOT_ENOUGH_SPACE;
+  }
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+nebula::cpp2::ErrorCode CacheLibLRU::get(const std::string& key, std::string* value) {
+  folly::SharedMutex::ReadHolder rHolder(lock_);
+  auto itemHandle = nebulaCache_->find(key);
+  if (itemHandle) {
+    value->assign(reinterpret_cast<const char*>(itemHandle->getMemory()), itemHandle->getSize());
+    return nebula::cpp2::ErrorCode::SUCCEEDED;
+  }
+  VLOG(4) << "Cache miss: " << key << " Not Found";
+  return nebula::cpp2::ErrorCode::E_CACHE_MISS;
+}
+
+Cache::ItemHandle CacheLibLRU::find(const std::string& key) {
+  return nebulaCache_->find(key);
+}
+
+nebula::cpp2::ErrorCode CacheLibLRU::put(const std::string& key,
+                                         const std::string& value,
+                                         std::string poolName,
+                                         uint32_t ttl) {
+  if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
+    LOG(ERROR) << "Cache write error. Pool does not exist: " << poolName.data();
+    return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
+  }
+  auto itemHandle = nebulaCache_->allocate(poolIdMap_[poolName], key, value.size(), ttl);
+  if (!itemHandle) {
+    LOG(ERROR) << "Cache write error. Too many pending writes.";
+    return nebula::cpp2::ErrorCode::E_CACHE_WRITE_FAILURE;
+  }
+
+  {
+    folly::SharedMutex::WriteHolder wHolder(lock_);
+    std::memcpy(itemHandle->getMemory(), value.data(), value.size());
+    nebulaCache_->insertOrReplace(itemHandle);
+  }
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+ErrorOr<nebula::cpp2::ErrorCode, Cache::ItemHandle> CacheLibLRU::allocateItem(
+    const std::string& key, size_t size, std::string poolName, uint32_t ttl) {
+  if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
+    LOG(ERROR) << "Cache write error. Pool does not exist: " << poolName;
+    return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
+  }
+  auto itemHandle = nebulaCache_->allocate(poolIdMap_[poolName], key, size, ttl);
+  if (!itemHandle) {
+    LOG(ERROR) << "Cache write error. Too many pending writes.";
+    return nebula::cpp2::ErrorCode::E_CACHE_WRITE_FAILURE;
+  }
+  return itemHandle;
+}
+
+Cache::ItemHandle CacheLibLRU::insertOrReplace(Cache::ItemHandle& handle) {
+  return nebulaCache_->insertOrReplace(handle);
+}
+
+void CacheLibLRU::erase(const std::string& key) {
+  nebulaCache_->remove(key);
+}
+
+nebula::cpp2::ErrorCode CacheLibLRU::invalidateItem(const std::string& key) {
+  folly::SharedMutex::WriteHolder wHolder(lock_);
+  VLOG(4) << "Invalidate vertex key: " << folly::hexlify(key);
+  nebulaCache_->remove(key);
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+nebula::cpp2::ErrorCode CacheLibLRU::invalidateItems(const std::vector<std::string>& keys) {
+  folly::SharedMutex::WriteHolder wHolder(lock_);
+  for (const auto& key : keys) {
+    VLOG(4) << "Invalidate vertex key: " << folly::hexlify(key);
+    nebulaCache_->remove(key);
+  }
+  return nebula::cpp2::ErrorCode::SUCCEEDED;
+}
+
+ErrorOr<nebula::cpp2::ErrorCode, uint64_t> CacheLibLRU::getConfiguredPoolSize(
+    const std::string& poolName) {
+  if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
+    LOG(ERROR) << "Get cache pool size error. Pool does not exist: " << poolName.data();
+    return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
+  }
+  return nebulaCache_->getPoolStats(poolIdMap_[poolName]).poolSize;
+}
+
+ErrorOr<nebula::cpp2::ErrorCode, uint64_t> CacheLibLRU::getPoolCacheHitCount(
+    const std::string& poolName) {
+  if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
+    LOG(ERROR) << "Get cache hit count error. Pool does not exist: " << poolName.data();
+    return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
+  }
+  return nebulaCache_->getPoolStats(poolIdMap_[poolName]).numPoolGetHits;
+}
+
+}  // namespace nebula
diff --git a/src/common/base/CacheLibLRU.h b/src/common/base/CacheLibLRU.h
index 5759f062d93..16991a80475 100644
--- a/src/common/base/CacheLibLRU.h
+++ b/src/common/base/CacheLibLRU.h
@@ -1,4 +1,4 @@
-/* Copyright (c) 2021 vesoft inc. All rights reserved.
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
 *
 * This source code is licensed under Apache 2.0 License.
 */
@@ -16,42 +16,19 @@ namespace nebula {
 
 using Cache = facebook::cachelib::LruAllocator;
 
+/**
+ * @brief Provide a cache instance based on CacheLib
+ */
 class CacheLibLRU {
  public:
-  explicit CacheLibLRU(std::string name, uint32_t capacity, uint32_t cacheEntriesPower)
-      : name_(name), capacity_(capacity), cacheEntriesPower_(cacheEntriesPower) {}
+  explicit CacheLibLRU(Cache::Config& config) : allocConfig_(config) {}
 
   /**
-   * @brief Create cache instance. If there is any exception, we will allow the process continue.
+   * @brief Create a cachelib instance from config
    *
-   * @param poolName: the pool to allocate cache space
    * @return nebula::cpp2::ErrorCode
   */
-  nebula::cpp2::ErrorCode initializeCache() {
-    Cache::Config config;
-    if (cacheEntriesPower_ > kMaxCacheEntriesPower) {
-      LOG(WARNING) << "Estimated number of cache entries exceeds the cache limit. Nebula will trim "
-                      "the the maximum allowed: "
-                   << kMaxCacheEntriesPower;
-      cacheEntriesPower_ = kMaxCacheEntriesPower;
-    }
-    try {
-      config
-          // size cannot exceed the maximum cache size (274'877'906'944 bytes)
-          .setCacheSize(capacity_ * 1024UL * 1024UL)
-          .setCacheName(name_)
-          .setAccessConfig(std::pow(2, cacheEntriesPower_))
-          .validate();  // will throw if bad config
-    } catch (const std::exception& e) {
-      // We do not stop the service. Users should refer to the log to determine whether to restart
-      // the service.
-      LOG(ERROR) << "Cache configuration error: " << e.what();
-      return nebula::cpp2::ErrorCode::E_UNKNOWN;
-    }
-    nebulaCache_ = std::make_unique<Cache>(config);
-
-    return nebula::cpp2::ErrorCode::SUCCEEDED;
-  }
+  nebula::cpp2::ErrorCode initializeCache();
 
   /**
    * @brief add cache pool into cache instance
   *
   * @param poolName
   * @param poolSize
   * @return nebula::cpp2::ErrorCode
   */
-  nebula::cpp2::ErrorCode addPool(std::string poolName, uint32_t poolSize) {
-    if (poolIdMap_.find(poolName) != poolIdMap_.end()) {
-      LOG(ERROR) << "Cache pool creation error. Cache pool exists: " << poolName.data();
-      return nebula::cpp2::ErrorCode::E_EXISTED;
-    }
-    try {
-      auto poolId = nebulaCache_->addPool(poolName, poolSize * 1024 * 1024);
-      poolIdMap_[poolName] = poolId;
-    } catch (const std::exception& e) {
-      LOG(ERROR) << "Adding cache pool error: " << e.what();
-      return nebula::cpp2::ErrorCode::E_NOT_ENOUGH_SPACE;
-    }
-    return nebula::cpp2::ErrorCode::SUCCEEDED;
-  }
+  nebula::cpp2::ErrorCode addPool(std::string poolName, uint32_t poolSize);
 
+  /**** The following get, put, invalidateItem(s) will manage the race condition via a lock ****/
   /**
-   * @brief Get key from cache. Return true if found.
+   * @brief Get key from cache. Returns E_CACHE_MISS if not found. Will manage race condition internally.
   *
   * @param key
   * @return Error (cache miss) or value
   */
-  nebula::cpp2::ErrorCode get(const std::string& key, std::string* value) {
-    folly::SharedMutex::ReadHolder rHolder(lock_);
-    auto itemHandle = nebulaCache_->find(key);
-    if (itemHandle) {
-      value->assign(reinterpret_cast<const char*>(itemHandle->getMemory()), itemHandle->getSize());
-      return nebula::cpp2::ErrorCode::SUCCEEDED;
-    }
-    VLOG(3) << "Cache miss: " << key << " Not Found";
-    return nebula::cpp2::ErrorCode::E_CACHE_MISS;
-  }
+  nebula::cpp2::ErrorCode get(const std::string& key, std::string* value);
 
   /**
-   * @brief Insert or update value in cache pool
+   * @brief Insert or update value in cache pool. Will manage race condition internally.
   *
   * @param key
   * @param value
   * @param
@@ -104,54 +60,25 @@ class CacheLibLRU {
   nebula::cpp2::ErrorCode put(const std::string& key,
                               const std::string& value,
                               std::string poolName,
-                              uint32_t ttl = 300) {
-    if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
-      LOG(ERROR) << "Cache write error. Pool does not exist: " << poolName.data();
-      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
-    }
-    auto itemHandle = nebulaCache_->allocate(poolIdMap_[poolName], key, value.size(), ttl);
-    if (!itemHandle) {
-      LOG(ERROR) << "Cache write error. Too many pending writes.";
-      return nebula::cpp2::ErrorCode::E_CACHE_WRITE_FAILURE;
-    }
-
-    {
-      folly::SharedMutex::WriteHolder wHolder(lock_);
-      std::memcpy(itemHandle->getMemory(), value.data(), value.size());
-      nebulaCache_->insertOrReplace(itemHandle);
-    }
-    return nebula::cpp2::ErrorCode::SUCCEEDED;
-  }
+                              uint32_t ttl = 300);
 
   /**
-   * @brief CacheLib will first search for the key. If found, remove it.
-   * Note here we do not log anything if not found, as it can have a good chance that an item is not
-   * in the cache.
+   * @brief CacheLib will first search for the key. If found, remove it. Will manage race condition
+   * internally. Note here we do not log anything if not found, as there is a good chance that an
+   * item is not in the cache.
   *
   * @param key
   * @return nebula::cpp2::ErrorCode
   */
-  nebula::cpp2::ErrorCode invalidateItem(const std::string& key) {
-    folly::SharedMutex::WriteHolder wHolder(lock_);
-    VLOG(3) << "Invalidate vertex key: " << folly::hexlify(key);
-    nebulaCache_->remove(key);
-    return nebula::cpp2::ErrorCode::SUCCEEDED;
-  }
+  nebula::cpp2::ErrorCode invalidateItem(const std::string& key);
 
   /**
-   * @brief CacheLib remove multiple items
+   * @brief Remove multiple items from the cache. Will manage race condition internally.
   *
   * @param keys
   * @return nebula::cpp2::ErrorCode
   */
-  nebula::cpp2::ErrorCode invalidateItems(const std::vector<std::string>& keys) {
-    folly::SharedMutex::WriteHolder wHolder(lock_);
-    for (auto key : keys) {
-      VLOG(3) << "Invalidate vertex key: " << folly::hexlify(key);
-      nebulaCache_->remove(key);
-    }
-    return nebula::cpp2::ErrorCode::SUCCEEDED;
-  }
+  nebula::cpp2::ErrorCode invalidateItems(const std::vector<std::string>& keys);
 
   /**
   * @brief Get the configured size of the pool
   *
   * @param poolName
   * @return Error (pool not existing) or unit64_t
   */
-  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getConfiguredPoolSize(const std::string& poolName) {
-    if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
-      LOG(ERROR) << "Get cache pool size error. Pool does not exist: " << poolName.data();
-      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
-    }
-    return nebulaCache_->getPoolStats(poolIdMap_[poolName]).poolSize;
-  }
+  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getConfiguredPoolSize(const std::string& poolName);
 
   /**
   * @brief Get the count of cache hit of a pool
   *
   * @param poolName
   * @return Error (pool not existing) or unit64_t
   */
-  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getPoolCacheHitCount(const std::string& poolName) {
-    if (poolIdMap_.find(poolName) == poolIdMap_.end()) {
-      LOG(ERROR) << "Get cache hit count error. Pool does not exist: " << poolName.data();
-      return nebula::cpp2::ErrorCode::E_POOL_NOT_FOUND;
-    }
-    return nebulaCache_->getPoolStats(poolIdMap_[poolName]).numPoolGetHits;
-  }
+  ErrorOr<nebula::cpp2::ErrorCode, uint64_t> getPoolCacheHitCount(const std::string& poolName);
+
+  /**** The following find, allocate, insert, erase will not handle race conditions ****/
+  /**
+   * @brief Get the ItemHandle of the key. Callers must handle race conditions.
+   *
+   * @param key
+   * @return Cache::ItemHandle
+   */
+  Cache::ItemHandle find(const std::string& key);
+
+  /**
+   * @brief Allocate an ItemHandle for the key. Callers must handle race conditions.
+   *
+   * @param key
+   * @param size
+   * @param poolName
+   * @param ttl
+   * @return ErrorOr<nebula::cpp2::ErrorCode, Cache::ItemHandle>
+   */
+  ErrorOr<nebula::cpp2::ErrorCode, Cache::ItemHandle> allocateItem(const std::string& key,
+                                                                   size_t size,
+                                                                   std::string poolName,
+                                                                   uint32_t ttl = 0);
+
+  /**
+   * @brief Insert the itemHandle into the cache. Callers must handle race conditions.
+   *
+   * @param handle
+   * @return Cache::ItemHandle
+   */
+  Cache::ItemHandle insertOrReplace(Cache::ItemHandle& handle);
+
+  /**
+   * @brief Erase the key from the cache. Callers must handle race conditions.
+   *
+   * @param key
+   */
+  void erase(const std::string& key);
 
  private:
-  static constexpr uint32_t kMaxCacheEntriesPower = 31;  // the cap by cachelib
-  std::unique_ptr<Cache> nebulaCache_ = nullptr;
+  std::unique_ptr<Cache> nebulaCache_{nullptr};
   std::unordered_map<std::string, facebook::cachelib::PoolId> poolIdMap_;
-  std::string name_;
-  uint32_t capacity_ = 0;            // in MB
-  uint32_t cacheEntriesPower_ = 20;  // estimated number of cache entries in base 2 logarithm
+  Cache::Config allocConfig_{};
 
   // CacheLib does not protect data at item level. We need to synchronize the access.
   folly::SharedMutex lock_;
diff --git a/src/daemons/CMakeLists.txt b/src/daemons/CMakeLists.txt
index 12f33096bfc..2991d40923d 100644
--- a/src/daemons/CMakeLists.txt
+++ b/src/daemons/CMakeLists.txt
@@ -48,6 +48,7 @@ set(storage_meta_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
diff --git a/src/drainer/test/CMakeLists.txt b/src/drainer/test/CMakeLists.txt
index 8ea4954df3c..d5ff56fa9d2 100644
--- a/src/drainer/test/CMakeLists.txt
+++ b/src/drainer/test/CMakeLists.txt
@@ -23,6 +23,7 @@ set(drainer_test_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
diff --git a/src/kvstore/RocksEngineConfig.cpp b/src/kvstore/RocksEngineConfig.cpp
index a1a88b51a94..1b044db1364 100644
--- a/src/kvstore/RocksEngineConfig.cpp
+++ b/src/kvstore/RocksEngineConfig.cpp
@@ -19,6 +19,7 @@
 #include "common/fs/FileUtils.h"
 #include "common/utils/NebulaKeyUtils.h"
 #include "kvstore/EventListener.h"
+#include "kvstore/cache/NVCache.h"
 
 // [WAL]
 DEFINE_bool(rocksdb_disable_wal, false, "Whether to disable the WAL in rocksdb");
@@ -310,8 +311,16 @@ rocksdb::Status initRocksdbOptions(rocksdb::Options& baseOpts,
   if (FLAGS_rocksdb_block_cache <= 0) {
     bbtOpts.no_block_cache = true;
   } else {
-    static std::shared_ptr<rocksdb::Cache> blockCache =
-        rocksdb::NewLRUCache(FLAGS_rocksdb_block_cache * 1024 * 1024, FLAGS_cache_bucket_exp);
+    rocksdb::LRUCacheOptions cacheOpts(
+        FLAGS_rocksdb_block_cache * 1024 * 1024, FLAGS_cache_bucket_exp, false, 0.5);
+    if (FLAGS_nv_cache_size > 0) {
+      static std::shared_ptr<NVCache> secondaryCache = std::make_shared<NVCache>();
+      if (secondaryCache->init()) {
+        cacheOpts.secondary_cache = secondaryCache;
+      }
+    }
+
+    static std::shared_ptr<rocksdb::Cache> blockCache = rocksdb::NewLRUCache(cacheOpts);
     bbtOpts.block_cache = blockCache;
   }
diff --git a/src/kvstore/cache/CMakeLists.txt b/src/kvstore/cache/CMakeLists.txt
index bf2fcbe6566..4028b7bf5bc 100644
--- a/src/kvstore/cache/CMakeLists.txt
+++ b/src/kvstore/cache/CMakeLists.txt
@@ -1,6 +1,7 @@
 nebula_add_library(
     storage_cache_obj OBJECT
     StorageCache.cpp
+    NVCache.cpp
 )
 
 nebula_add_subdirectory(test)
diff --git a/src/kvstore/cache/NVCache.cpp b/src/kvstore/cache/NVCache.cpp
new file mode 100644
index 00000000000..7594febcc92
--- /dev/null
+++ b/src/kvstore/cache/NVCache.cpp
@@ -0,0 +1,113 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include "kvstore/cache/NVCache.h"
+
+DEFINE_uint32(navy_item_ttl, 300, "TTL for vertex item in the cache");
+
+DEFINE_string(nv_cache_path, "", "Non volatile cache path");
+DEFINE_uint64(nv_cache_size, 0, "Non volatile cache size in MB");
+DEFINE_uint32(nv_dram_size, 0, "DRAM part size of the NV cache in MB");
+DEFINE_uint32(nv_bucket_power, 10, "DRAM part bucket power of the NV cache, a logarithm with a base of 2");
+DEFINE_uint32(nv_lock_power, 1, "DRAM part lock power of the NV cache, a logarithm with a base of 2");
+
+namespace nebula {
+namespace kvstore {
+
+NVCache::NVCache() {
+  dramCapacity_ = FLAGS_nv_dram_size;
+  nvCapacity_ = FLAGS_nv_cache_size;
+  nvPath_ = FLAGS_nv_cache_path;
+
+  auto allocConfig = getNVConfig();
+  allocConfig.setCacheName(kNVCacheName);
+
+  cacheInternal_ = std::make_unique<CacheLibLRU>(allocConfig);
+}
+
+bool NVCache::init() {
+  LOG(INFO) << "Start nvm cache on " << FLAGS_nv_cache_path;
+  auto ret = cacheInternal_->initializeCache();
+  if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(WARNING) << "Cache initialization failed.";
+    return false;
+  }
+
+  LOG(INFO) << "Create a DRAM pool: " << kNVPool;
+  ret = cacheInternal_->addPool(kNVPool, (uint32_t)(FLAGS_nv_dram_size * 0.8));
+  if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
+    LOG(WARNING) << "Adding cache pool failed.";
+    return false;
+  }
+  return true;
+}
+
+std::unique_ptr<rocksdb::SecondaryCacheResultHandle> NVCache::Lookup(
+    const rocksdb::Slice& key, const rocksdb::Cache::CreateCallback& create_cb, bool wait) {
+  folly::SharedMutex::ReadHolder rHolder(lock_);
+  std::unique_ptr<rocksdb::SecondaryCacheResultHandle> resultHandle = nullptr;
+  // Note that itemHandle is essentially a ReadItemHandleImpl struct. It is never nullptr.
+  auto itemHandle = cacheInternal_->find(key.ToString());
+
+  VLOG(4) << "Key:" << key.ToString() << " Went to nvm: " << itemHandle.wentToNvm()
+          << " Is expired: " << itemHandle.wasExpired();
+
+  if (wait) {
+    itemHandle.wait();
+  }
+
+  if (itemHandle) {
+    resultHandle.reset(new NVCacheResultHandle(std::move(itemHandle), create_cb));
+  }
+
+  return resultHandle;
+}
+
+rocksdb::Status NVCache::Insert(const rocksdb::Slice& key,
+                                void* value,
+                                const rocksdb::Cache::CacheItemHelper* helper) {
+  // value pointer is checked inside the callback
+  size_t size = (*helper->size_cb)(value);
+
+  // allocate memory for out buffer
+  auto res = cacheInternal_->allocateItem(key.ToString(), size, kNVPool);
+  if (!nebula::ok(res)) {
+    return rocksdb::Status::Corruption("Cache write error");
+  }
+  auto itemHandle = nebula::value(std::move(res));
+
+  {
+    folly::SharedMutex::WriteHolder wHolder(lock_);
+    // the SaveTo callback will convert value into Block*
+    auto s = (*helper->saveto_cb)(value, 0, size, itemHandle->getMemory());
+    if (!s.ok()) {
+      return s;
+    }
+    cacheInternal_->insertOrReplace(itemHandle);
+  }
+  return rocksdb::Status::OK();
+}
+
+void NVCache::Erase(const rocksdb::Slice& key) {
+  folly::SharedMutex::WriteHolder wHolder(lock_);
+  cacheInternal_->erase(key.ToString());
+}
+
+void NVCache::WaitAll(std::vector<rocksdb::SecondaryCacheResultHandle*> handles) {
+  for (auto h : handles) {
+    h->Wait();
+  }
+}
+
+std::string NVCache::GetPrintableOptions() const {
+  std::string ret;
+  ret.append(folly::stringPrintf("DRAM size: %u\n", dramCapacity_));
+  ret.append(folly::stringPrintf("NVM size: %u\n", nvCapacity_));
+  ret.append(folly::stringPrintf("NVM path: %s\n", nvPath_.c_str()));
+  return ret;
+}
+
+}  // namespace kvstore
+}  // namespace nebula
diff --git a/src/kvstore/cache/NVCache.h b/src/kvstore/cache/NVCache.h
new file mode 100644
index 00000000000..c560a4cdd3f
--- /dev/null
+++ b/src/kvstore/cache/NVCache.h
@@ -0,0 +1,95 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef STORAGE_CACHE_NVCACHE_H
+#define STORAGE_CACHE_NVCACHE_H
+
+#include
+#include
+#include
+
+#include "common/base/Base.h"
+#include "common/base/CacheLibLRU.h"
+#include "kvstore/cache/NVCacheResultHandle.h"
+#include "kvstore/cache/NVUtils.h"
+
+namespace nebula {
+namespace kvstore {
+
+static const char kNVCacheName[] = "__NVCache__";
+static const char kNVPool[] = "NVPool";
+
+class NVCache : public rocksdb::SecondaryCache {
+ public:
+  NVCache();
+
+  ~NVCache() override {
+    LOG(INFO) << "Destroy NVM cache";
+  }
+
+  bool init();
+
+  const char* Name() const override {
+    return kNVCacheName;
+  }
+
+  /**
+   * @brief Lookup the data for the given key in NV Cache
+   *
+   * @param key
+   * @param create_cb Used to create the object. The current implementation in RocksDB:
+   * using CreateCallback = std::function<Status(const void* buf, size_t size, void** out_obj, size_t* charge)>;
+   * create_cb_ will convert the buf into out_obj.
+   * It is provided by GetCreateCallback in table/block_based/block_like_traits.h.
+   *
+   * @param wait Whether to wait for the return
+   * @param is_in_sec_cache (not present in the older RocksDB version we currently use)
+   * @return std::unique_ptr<rocksdb::SecondaryCacheResultHandle>
+   */
+  std::unique_ptr<rocksdb::SecondaryCacheResultHandle> Lookup(
+      const rocksdb::Slice& key,
+      const rocksdb::Cache::CreateCallback& create_cb,
+      bool wait) override;
+
+  /**
+   * @brief Insert the given value into NV cache
+   *
+   * @param key
+   * @param value
+   * @param helper Helper function used to get size and extract value written to NV Cache
+   * struct CacheItemHelper {
+   *   SizeCallback size_cb;
+   *   SaveToCallback saveto_cb;
+   *   DeleterFn del_cb;
+   *   ...
+   * };
+   * It is provided by GetCacheItemHelperForRole in table/block_based/block_like_traits.h.
+   * @return rocksdb::Status
+   */
+  rocksdb::Status Insert(const rocksdb::Slice& key,
+                         void* value,
+                         const rocksdb::Cache::CacheItemHelper* helper) override;
+
+  void Erase(const rocksdb::Slice& key) override;
+
+  // Wait for a collection of handles to become ready
+  void WaitAll(std::vector<rocksdb::SecondaryCacheResultHandle*> handles) override;
+
+  std::string GetPrintableOptions() const override;
+
+ private:
+  uint32_t dramCapacity_ = 0;  // in MB
+  uint32_t nvCapacity_ = 0;    // in MB
+  std::string nvPath_{};
+  std::unique_ptr<CacheLibLRU> cacheInternal_{nullptr};
+  // CacheLib does not protect data at item level. We need to synchronize the access.
+  folly::SharedMutex lock_;
+};
+
+}  // namespace kvstore
+}  // namespace nebula
+
+#endif  // STORAGE_CACHE_NVCACHE_H
diff --git a/src/kvstore/cache/NVCacheResultHandle.h b/src/kvstore/cache/NVCacheResultHandle.h
new file mode 100644
index 00000000000..141344805ba
--- /dev/null
+++ b/src/kvstore/cache/NVCacheResultHandle.h
@@ -0,0 +1,73 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#ifndef STORAGE_CACHE_NVCACHERESULTHANDLE_H
+#define STORAGE_CACHE_NVCACHERESULTHANDLE_H
+
+#include
+
+#include "common/base/CacheLibLRU.h"
+
+namespace nebula {
+namespace kvstore {
+
+class NVCacheResultHandle : public rocksdb::SecondaryCacheResultHandle {
+ public:
+  // Need to pass in the ItemHandle from cachelib in order to support more complex functions
+  // like wait and isReady.
+  NVCacheResultHandle(Cache::ItemHandle&& itemHandle,
+                      const rocksdb::Cache::CreateCallback& create_cb)
+      : itemHandle_(std::move(itemHandle)), create_cb_(std::move(create_cb)) {}
+
+  ~NVCacheResultHandle() = default;
+
+  // Explicitly check whether data has been read from navy
+  bool IsReady() override {
+    return itemHandle_.isReady();
+  }
+
+  // Explicitly wait for the data to get promoted from navy
+  void Wait() override {
+    itemHandle_.wait();
+  }
+
+  /**
+   * @brief Return the value converted by create_cb_.
+   * In the current implementation provided by RocksDB, charge is set to be the same as the input
+   * buffer size.
+   * Note that "==" and "->" have been overloaded in cachelib and they will wait for the data to be
+   * ready implicitly.
+   * @return void*
+   */
+  void* Value() override {
+    void* value = nullptr;
+    size_t charge = 0;
+
+    // The == operator is overloaded. It will compare the internal *item with the readItemHandle.
+    // It will retrieve the internal item and wait implicitly.
+    if (itemHandle_) {
+      auto status = create_cb_(itemHandle_->getMemory(), itemHandle_->getSize(), &value, &charge);
+      if (!status.ok()) {
+        return value;
+      }
+    }
+    return value;
+  }
+
+  // It will wait for the data to be ready implicitly
+  size_t Size() override {
+    return itemHandle_->getSize();
+  }
+
+ private:
+  Cache::ItemHandle itemHandle_;
+  const rocksdb::Cache::CreateCallback create_cb_;
+};
+
+}  // namespace kvstore
+}  // namespace nebula
+
+#endif
diff --git a/src/kvstore/cache/NVUtils.h b/src/kvstore/cache/NVUtils.h
new file mode 100644
index 00000000000..2684a5f64d5
--- /dev/null
+++ b/src/kvstore/cache/NVUtils.h
@@ -0,0 +1,42 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#pragma once
+
+#include
+#include
+
+DECLARE_uint32(navy_item_ttl);
+
+DECLARE_string(nv_cache_path);
+DECLARE_uint64(nv_cache_size);
+DECLARE_uint32(nv_dram_size);
+DECLARE_uint32(nv_bucket_power);
+DECLARE_uint32(nv_lock_power);
+
+namespace nebula {
+namespace kvstore {
+
+inline facebook::cachelib::LruAllocator::Config getNVConfig() {
+  facebook::cachelib::navy::NavyConfig navyConfig{};
+  navyConfig.setSimpleFile(FLAGS_nv_cache_path,
+                           FLAGS_nv_cache_size * 1024ULL * 1024ULL /*fileSize*/,
+                           false /*truncateFile*/);
+  navyConfig.setBlockSize(8192);
+  navyConfig.blockCache().setRegionSize(16 * 1024 * 1024);
+
+  facebook::cachelib::LruAllocator::Config allocConfig;
+  facebook::cachelib::LruAllocator::NvmCacheConfig nvmConfig;
+  nvmConfig.navyConfig = navyConfig;
+  allocConfig.enableNvmCache(nvmConfig);
+
+  allocConfig.setCacheSize(FLAGS_nv_dram_size * 1024 * 1024);
+
+  allocConfig.setAccessConfig({FLAGS_nv_bucket_power, FLAGS_nv_lock_power});
+  return allocConfig;
+}
+
+}  // namespace kvstore
+}  // namespace nebula
diff --git a/src/kvstore/cache/StorageCache.cpp b/src/kvstore/cache/StorageCache.cpp
index d8f1875dc6f..abf8ea06a4a 100644
--- a/src/kvstore/cache/StorageCache.cpp
+++ b/src/kvstore/cache/StorageCache.cpp
@@ -31,8 +31,19 @@ namespace kvstore {
 
 StorageCache::StorageCache() {
   capacity_ = FLAGS_storage_cache_capacity;
-  cacheInternal_ = std::make_unique<CacheLibLRU>(
-      kStorageCacheName, capacity_, FLAGS_storage_cache_entries_power);
+  cacheEntriesPower_ = FLAGS_storage_cache_entries_power;
+  if (cacheEntriesPower_ > kMaxCacheEntriesPower) {
+    LOG(WARNING) << "Estimated number of cache entries exceeds the cache limit. Nebula will trim "
+                    "it to the maximum allowed: "
+                 << kMaxCacheEntriesPower;
+    cacheEntriesPower_ = kMaxCacheEntriesPower;
+  }
+  facebook::cachelib::LruAllocator::Config allocConfig;
+  allocConfig.setCacheSize(FLAGS_storage_cache_capacity * 1024UL * 1024UL)
+      .setCacheName(kStorageCacheName)
+      .setAccessConfig(std::pow(2, cacheEntriesPower_));
+
+  cacheInternal_ = std::make_unique<CacheLibLRU>(allocConfig);
 }
 
 bool StorageCache::init() {
diff --git a/src/kvstore/cache/StorageCache.h b/src/kvstore/cache/StorageCache.h
index 60244da0d12..d06e67b34b3 100644
--- a/src/kvstore/cache/StorageCache.h
+++ b/src/kvstore/cache/StorageCache.h
@@ -134,7 +134,9 @@ class StorageCache {
                       std::vector<std::string>& keysToRemove);
 
  private:
-  uint32_t capacity_ = 0;  // in MB
+  static constexpr uint32_t kMaxCacheEntriesPower = 31;  // the cap by cachelib
+  uint32_t capacity_ = 0;            // in MB
+  uint32_t cacheEntriesPower_ = 20;  // estimated number of cache entries in base 2 logarithm
   std::unique_ptr<CacheLibLRU> cacheInternal_{nullptr};
   std::shared_ptr vertexPool_{nullptr};
   std::shared_ptr emptyKeyPool_{nullptr};
diff --git a/src/kvstore/cache/test/CMakeLists.txt b/src/kvstore/cache/test/CMakeLists.txt
index 344556c1e4c..dde16a86dd2 100644
--- a/src/kvstore/cache/test/CMakeLists.txt
+++ b/src/kvstore/cache/test/CMakeLists.txt
@@ -6,6 +6,7 @@ set(cache_test_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
@@ -47,3 +48,19 @@ nebula_add_executable(
     wangle
     gtest
 )
+
+nebula_add_executable(
+    NAME
+        nv_cache_test
+    SOURCES
+        NVCacheTest.cpp
+    OBJECTS
+        ${cache_test_deps}
+    LIBRARIES
+        ${CACHELIB_LIBRARIES}
+        ${THRIFT_LIBRARIES}
+        ${ROCKSDB_LIBRARIES}
+        ${PROXYGEN_LIBRARIES}
+        wangle
+        gtest
+)
diff --git a/src/kvstore/cache/test/NVCacheTest.cpp b/src/kvstore/cache/test/NVCacheTest.cpp
new file mode 100644
index 00000000000..d7444d78337
--- /dev/null
+++ b/src/kvstore/cache/test/NVCacheTest.cpp
@@ -0,0 +1,147 @@
+/* Copyright (c) 2022 vesoft inc. All rights reserved.
+ *
+ * This source code is licensed under Apache 2.0 License.
+ */
+
+#include
+#include
+
+#include "common/fs/TempDir.h"
+#include "kvstore/cache/NVCache.h"
+
+namespace nebula {
+namespace kvstore {
+
+class NVCacheTest : public ::testing::Test {
+ public:
+  std::unique_ptr<NVCache> createCache() {
+    return std::make_unique<NVCache>();
+  }
+
+  std::string cacheKey = "test key";
+  std::string cacheValue = "test prop";
+  std::string longText = std::string(4096, 'a');  // 4KB data
+
+ protected:
+  class TestItem {
+   public:
+    TestItem(const char* buf, size_t size) : buf_(new char[size]), size_(size) {
+      memcpy(buf_.get(), buf, size);
+    }
+    ~TestItem() {}
+
+    char* Buf() {
+      return buf_.get();
+    }
+    size_t Size() {
+      return size_;
+    }
+
+   private:
+    std::unique_ptr<char[]> buf_;
+    size_t size_;
+  };
+
+  static size_t SizeCallback(void* obj) {
+    return reinterpret_cast<TestItem*>(obj)->Size();
+  }
+
+  static rocksdb::Status SaveToCallback(void* fromObj,
+                                        size_t fromOffset,
+                                        size_t length,
+                                        void* out) {
+    TestItem* item = reinterpret_cast<TestItem*>(fromObj);
+    const char* buf = item->Buf();
+    EXPECT_EQ(length, item->Size());
+    EXPECT_EQ(fromOffset, 0);
+    memcpy(out, buf, length);
+    return rocksdb::Status::OK();
+  }
+
+  static void DeletionCallback(const rocksdb::Slice& /*key*/, void* obj) {
+    delete reinterpret_cast<TestItem*>(obj);
+    obj = nullptr;
+  }
+
+  static rocksdb::Cache::CacheItemHelper helper_;
+
+  rocksdb::Cache::CreateCallback testItemCreator =
+      [&](const void* buf, size_t size, void** out_obj, size_t* charge) -> rocksdb::Status {
+    *out_obj = reinterpret_cast<void*>(new TestItem(reinterpret_cast<const char*>(buf), size));
+    *charge = size;
+    return rocksdb::Status::OK();
+  };
+};
+
+rocksdb::Cache::CacheItemHelper NVCacheTest::helper_(NVCacheTest::SizeCallback,
+                                                     NVCacheTest::SaveToCallback,
+                                                     NVCacheTest::DeletionCallback);
+
+TEST_F(NVCacheTest, SimpleTest) {
+  FLAGS_nv_cache_size = 100;
+  FLAGS_nv_dram_size = 50;
+  fs::TempDir rootPath("/tmp/navy.XXXXXX");
+  FLAGS_nv_cache_path = folly::stringPrintf("%s/%s", rootPath.path(), "CACHE");
+
+  auto cache = createCache();
+  EXPECT_TRUE(cache->init());
+
+  NVCacheTest::TestItem item1(cacheValue.c_str(), cacheValue.size() + 1);
+  // put vertex
+  auto status = cache->Insert(cacheKey, &item1, &NVCacheTest::helper_);
+  EXPECT_TRUE(status.ok());
+
+  // get value
+  std::unique_ptr<rocksdb::SecondaryCacheResultHandle> handle =
+      cache->Lookup(cacheKey, NVCacheTest::testItemCreator, true);
+  EXPECT_NE(handle, nullptr);
+
+  std::unique_ptr<TestItem> val =
+      std::unique_ptr<TestItem>(static_cast<TestItem*>(handle->Value()));
+  ASSERT_NE(val, nullptr);
+  EXPECT_EQ(memcmp(val->Buf(), cacheValue.c_str(), cacheValue.size() + 1), 0);
+}
+
+TEST_F(NVCacheTest, ConcurrentWrites) {
+  FLAGS_nv_cache_size = 100;
+  FLAGS_nv_dram_size = 50;
+  fs::TempDir rootPath("/tmp/navy.XXXXXX");
+  FLAGS_nv_cache_path = folly::stringPrintf("%s/%s", rootPath.path(), "CACHE");
+
+  auto cache = createCache();
+  EXPECT_TRUE(cache->init());
+
+  for (auto i = 0; i < 10240; i++) {
+    std::string key = std::to_string(i);
+    std::string val = longText + key;
+    NVCacheTest::TestItem item1(val.c_str(), val.size() + 1);
+    // put vertex
+    auto status = cache->Insert(key, &item1, &NVCacheTest::helper_);
+    EXPECT_TRUE(status.ok());
+  }
+
+  // get value
+  for (auto i = 0; i < 10240; i++) {
+    std::string key = std::to_string(i);
+    std::unique_ptr<rocksdb::SecondaryCacheResultHandle> handle =
+        cache->Lookup(key, NVCacheTest::testItemCreator, true);
+
+    ASSERT_NE(handle, nullptr);
+
+    std::unique_ptr<TestItem> val =
+        std::unique_ptr<TestItem>(static_cast<TestItem*>(handle->Value()));
+    ASSERT_NE(val, nullptr);
+    std::string expectedVal = longText + key;
+    EXPECT_EQ(memcmp(val->Buf(), expectedVal.c_str(), expectedVal.size() + 1), 0);
+  }
+}
+
+}  // namespace kvstore
+}  // namespace nebula
+
+int main(int argc, char** argv) {
+  testing::InitGoogleTest(&argc, argv);
+  folly::init(&argc, &argv, true);
+  google::SetStderrLogging(google::INFO);
+  return RUN_ALL_TESTS();
+}
diff --git a/src/kvstore/test/CMakeLists.txt b/src/kvstore/test/CMakeLists.txt
index c3832a8e59f..0f6c7d3b815 100644
--- a/src/kvstore/test/CMakeLists.txt
+++ b/src/kvstore/test/CMakeLists.txt
@@ -18,6 +18,7 @@ set(KVSTORE_TEST_LIBS
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
diff --git a/src/kvstore/test/RocksEngineTest.cpp b/src/kvstore/test/RocksEngineTest.cpp
index 5d4017d387e..80dc4335735 100644
--- a/src/kvstore/test/RocksEngineTest.cpp
+++ b/src/kvstore/test/RocksEngineTest.cpp
@@ -13,6 +13,7 @@
 #include "common/utils/NebulaKeyUtils.h"
 #include "kvstore/RocksEngine.h"
 #include "kvstore/RocksEngineConfig.h"
+#include "kvstore/cache/NVCache.h"
 
 namespace nebula {
 namespace kvstore {
@@ -79,6 +80,75 @@ TEST_P(RocksEngineTest, RangeTest) {
   checkRange(1, 15, 10, 5);
 }
 
+TEST_P(RocksEngineTest, SimpleTestWithSecondaryCache) {
+  fs::TempDir rootPath1("/tmp/rocksdb_engine_RangeTest.XXXXXX");
+  fs::TempDir rootPath2("/tmp/rocksdb_engine_RangeTest.XXXXXX");
+  fs::TempDir cachePath("/tmp/secondary_cache.XXXXXX");
+  FLAGS_nv_cache_path = folly::stringPrintf("%s/%s", cachePath.path(), "CACHE");
+  FLAGS_nv_cache_size = 50000;
+  FLAGS_nv_dram_size = 50;
+  FLAGS_rocksdb_block_cache = 4;
+
+  std::string longText(4096, 'a');  // 4KB data
+
+  // Create 2 rocksdb instances. These 2 instances should use the same secondary cache instance.
+  auto engine1 = std::make_unique<RocksEngine>(kSpaceId, kPartId, kDefaultVIdLen, rootPath1.path());
+  auto engine2 = std::make_unique<RocksEngine>(kSpaceId, kPartId, kDefaultVIdLen, rootPath2.path());
+
+  auto checkValue = [&](RocksEngine* engine, int32_t engineId, int32_t keyId) {
+    std::string key = folly::stringPrintf("key_%d_%d", engineId, keyId);
+    std::string value;
+
+    auto ret = engine->get(key, &value);
+    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, ret);
+    EXPECT_EQ(folly::stringPrintf("%s_%d", longText.c_str(), keyId), value);
+  };
+
+  // put data into engine1 and read out
+  {
+    std::vector<KV> data;
+    for (int32_t i = 0; i < 10240; i++) {
+      data.emplace_back(folly::stringPrintf("key_1_%d", i),
+                        folly::stringPrintf("%s_%d", longText.c_str(), i));
+    }
+    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine1->multiPut(std::move(data)));
+
+    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine1->flush());
+
+    for (int32_t i = 0; i < 10240; i++) {
+      checkValue(engine1.get(), 1, i);
+    }
+  }
+
+  // Although two rocksdb instances share the same cache, engine 2 must not find the value of keys
+  // in engine 1
+  {
+    for (int32_t i = 0; i < 10240; i++) {
+      std::string key = folly::stringPrintf("key_1_%d", i);
+      std::string value;
+      auto ret = engine2->get(key, &value);
+      EXPECT_EQ(nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND, ret);
+    }
+  }
+
+  // Write some data to engine 2. Both engine 1 and engine 2 can read the data out.
+  {
+    std::vector<KV> data;
+    for (int32_t i = 0; i < 100; i++) {
+      data.emplace_back(folly::stringPrintf("key_2_%d", i),
+                        folly::stringPrintf("%s_%d", longText.c_str(), i));
+    }
+    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine2->multiPut(std::move(data)));
+
+    EXPECT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, engine2->flush());
+
+    for (int32_t i = 0; i < 100; i++) {
+      checkValue(engine1.get(), 1, i);
+      checkValue(engine2.get(), 2, i);
+    }
+  }
+}
+
 TEST_P(RocksEngineTest, PrefixTest) {
   fs::TempDir rootPath("/tmp/rocksdb_engine_PrefixTest.XXXXXX");
   auto engine = std::make_unique<RocksEngine>(kSpaceId, kPartId, kDefaultVIdLen, rootPath.path());
diff --git a/src/meta/CMakeLists.txt b/src/meta/CMakeLists.txt
index 802207bc97e..2a3a6bba3dc 100644
--- a/src/meta/CMakeLists.txt
+++ b/src/meta/CMakeLists.txt
@@ -164,6 +164,7 @@ set(meta_test_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt
index ff8e9a120ff..555d7e2519d 100644
--- a/src/storage/test/CMakeLists.txt
+++ b/src/storage/test/CMakeLists.txt
@@ -46,6 +46,7 @@ set(storage_test_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
diff --git a/src/tools/CMakeLists.txt b/src/tools/CMakeLists.txt
index 8ee6a0e4c10..fa94fa1514d 100644
--- a/src/tools/CMakeLists.txt
+++ b/src/tools/CMakeLists.txt
@@ -48,6 +48,7 @@ set(tools_test_deps
     $
     $
     $
+    $<TARGET_OBJECTS:cache_obj>
     $
     $
     $
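
Note on the locking contract introduced above: the unlocked find/allocateItem/insertOrReplace primitives added to CacheLibLRU push synchronization up to the caller, which is why NVCache carries its own folly::SharedMutex. The sketch below illustrates that two-phase write path, modeled on NVCache::Insert. It is a minimal sketch, not code from this patch: writeThrough, the "NVPool" pool name, and the error handling are illustrative assumptions, and it assumes the usual ErrorOr helpers (nebula::ok/value/error) used elsewhere in the codebase.

// Sketch: caller-locked, two-phase write into a CacheLibLRU pool
// (allocateItem -> fill buffer -> insertOrReplace). Assumes a pool named
// "NVPool" was created earlier via addPool().
#include <cstring>
#include <string>

#include <folly/SharedMutex.h>

#include "common/base/CacheLibLRU.h"

nebula::cpp2::ErrorCode writeThrough(nebula::CacheLibLRU& cache,
                                     folly::SharedMutex& lock,
                                     const std::string& key,
                                     const std::string& payload) {
  // Phase 1: reserve space in the pool. No lock is taken yet; the handle
  // stays private to this thread until it is published.
  auto res = cache.allocateItem(key, payload.size(), "NVPool");
  if (!nebula::ok(res)) {
    return nebula::error(res);  // E_POOL_NOT_FOUND or E_CACHE_WRITE_FAILURE
  }
  auto handle = nebula::value(std::move(res));

  // Phase 2: fill and publish. CacheLib does not protect item memory, so
  // the copy and the insert are serialized against concurrent readers.
  {
    folly::SharedMutex::WriteHolder wHolder(lock);
    std::memcpy(handle->getMemory(), payload.data(), payload.size());
    cache.insertOrReplace(handle);
  }
  return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Readers would take the shared side of the same mutex, as CacheLibLRU::get does internally, so lookups can proceed in parallel while writes remain exclusive.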