From 81efaa0bca07ea7f20404820209b36d8f1376e38 Mon Sep 17 00:00:00 2001 From: ilixiaocui Date: Wed, 30 Nov 2022 11:42:16 +0800 Subject: [PATCH] curvefs: support client to memcached cluster Signed-off-by: ilixiaocui --- WORKSPACE | 6 + curvefs/conf/client.conf | 11 +- curvefs/docker/debian10/Dockerfile | 3 + curvefs/docker/debian11/Dockerfile | 3 + curvefs/docker/debian9/Dockerfile | 3 + curvefs/proto/topology.proto | 39 + curvefs/src/client/BUILD | 1 + curvefs/src/client/common/config.cpp | 13 + curvefs/src/client/common/config.h | 6 + curvefs/src/client/fuse_client.cpp | 1 + curvefs/src/client/fuse_s3_client.cpp | 64 +- curvefs/src/client/fuse_s3_client.h | 2 + curvefs/src/client/kvclient/BUILD | 42 +- curvefs/src/client/kvclient/kvclient.h | 27 +- .../src/client/kvclient/kvclient_manager.cpp | 97 ++ .../src/client/kvclient/kvclient_manager.h | 139 +-- curvefs/src/client/kvclient/memcache_client.h | 82 +- curvefs/src/client/metric/client_metric.h | 9 + curvefs/src/client/rpcclient/mds_client.cpp | 6 + curvefs/src/client/rpcclient/mds_client.h | 9 + curvefs/src/client/s3/client_s3_adaptor.cpp | 4 +- .../src/client/s3/client_s3_cache_manager.cpp | 877 ++++++++++-------- .../src/client/s3/client_s3_cache_manager.h | 111 ++- curvefs/test/client/BUILD | 50 +- curvefs/test/client/client_memcache_test.cpp | 227 +++-- .../client/client_s3_adaptor_Integration.cpp | 111 ++- .../test/client/client_s3_adaptor_test.cpp | 5 +- .../test/client/file_cache_manager_test.cpp | 13 + curvefs/test/client/mock_kvclient.h | 46 + .../test/client/rpcclient/mock_mds_client.h | 8 +- curvefs/test/client/volume/BUILD.bazel | 1 + thirdparties/memcache/Makefile | 12 + thirdparties/memcache/memcache.BUILD | 19 + ut.sh | 38 +- util/build.sh | 2 + util/image.sh | 2 +- 36 files changed, 1295 insertions(+), 794 deletions(-) create mode 100644 curvefs/src/client/kvclient/kvclient_manager.cpp create mode 100644 curvefs/test/client/mock_kvclient.h create mode 100644 thirdparties/memcache/Makefile create mode 100644 thirdparties/memcache/memcache.BUILD diff --git a/WORKSPACE b/WORKSPACE index f231086494..0294f57ca3 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -179,6 +179,12 @@ new_local_repository( path = "thirdparties/aws/aws-sdk-cpp", ) +new_local_repository( + name = "libmemcached", + build_file = "//:thirdparties/memcache/memcache.BUILD", + path = "thirdparties/memcache/libmemcached-1.1.2", +) + # C++ rules for Bazel. http_archive( diff --git a/curvefs/conf/client.conf b/curvefs/conf/client.conf index 14231de5bb..059d294a9e 100644 --- a/curvefs/conf/client.conf +++ b/curvefs/conf/client.conf @@ -17,6 +17,7 @@ mdsOpt.rpcRetryOpt.normalRetryTimesBeforeTriggerWait=3 mdsOpt.rpcRetryOpt.waitSleepMs=1000 mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__ + # # lease options # @@ -86,8 +87,14 @@ fuseClient.dCacheLruSize=1000000 fuseClient.enableICacheMetrics=true fuseClient.enableDCacheMetrics=true fuseClient.cto=true -# you should enable it when mount one filesystem to multi mount points, -# it guarantee the consistent of file after rename, otherwise you should + +### kvcache opt +fuseClient.supportKVcache=false +fuseClient.setThreadPool=4 +fuseClient.getThreadPool=4 + +# you shoudle enable it when mount one filesystem to multi mountpoints, +# it gurantee the consistent of file after rename, otherwise you should # disable it for performance. fuseClient.enableMultiMountPointRename=true # splice will bring higher performance in some cases diff --git a/curvefs/docker/debian10/Dockerfile b/curvefs/docker/debian10/Dockerfile index 45ac7bcac4..36d0e0d4b9 100644 --- a/curvefs/docker/debian10/Dockerfile +++ b/curvefs/docker/debian10/Dockerfile @@ -6,5 +6,8 @@ COPY entrypoint.sh / COPY curvefs/tools/sbin/curvefs_tool /usr/bin COPY libaws-cpp-sdk-core.so /usr/lib/ COPY libaws-cpp-sdk-s3-crt.so /usr/lib/ +COPY libmemcached.so /usr/lib/ +COPY libmemcached.so.11 /usr/lib/ +COPY libhashkit.so.2 /usr/lib RUN chmod a+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/curvefs/docker/debian11/Dockerfile b/curvefs/docker/debian11/Dockerfile index f878fa603b..81ce7034c9 100644 --- a/curvefs/docker/debian11/Dockerfile +++ b/curvefs/docker/debian11/Dockerfile @@ -6,5 +6,8 @@ COPY entrypoint.sh / COPY curvefs/tools/sbin/curvefs_tool /usr/bin COPY libaws-cpp-sdk-core.so /usr/lib/ COPY libaws-cpp-sdk-s3-crt.so /usr/lib/ +COPY libmemcached.so /usr/lib/ +COPY libmemcached.so.11 /usr/lib/ +COPY libhashkit.so.2 /usr/lib RUN chmod a+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/curvefs/docker/debian9/Dockerfile b/curvefs/docker/debian9/Dockerfile index 339ecd6cc1..692f973686 100644 --- a/curvefs/docker/debian9/Dockerfile +++ b/curvefs/docker/debian9/Dockerfile @@ -6,5 +6,8 @@ COPY entrypoint.sh / COPY curvefs/tools/sbin/curvefs_tool /usr/bin COPY libaws-cpp-sdk-core.so /usr/lib/ COPY libaws-cpp-sdk-s3-crt.so /usr/lib/ +COPY libmemcached.so /usr/lib/ +COPY libmemcached.so.11 /usr/lib/ +COPY libhashkit.so.2 /usr/lib RUN chmod a+x /entrypoint.sh ENTRYPOINT ["/entrypoint.sh"] diff --git a/curvefs/proto/topology.proto b/curvefs/proto/topology.proto index c935222415..d91dae5b8f 100644 --- a/curvefs/proto/topology.proto +++ b/curvefs/proto/topology.proto @@ -458,6 +458,41 @@ message ListTopologyResponse { required ListMetaServerResponse metaservers = 5; } +message MemcachedServer { + required string ip = 1; + required uint32 port = 2; +} + +message RegistMemcacheClusterRequest { + repeated MemcachedServer servers = 1; +} + +message RegistMemcacheClusterResponse { + required TopoStatusCode statusCode = 1; +} + +message ListMemcacheClusterRequest { +} + +message MemcacheCluster { + required uint32 clusterId = 1; + repeated MemcachedServer servers = 2; +} + +message ListMemcacheClusterResponse { + required TopoStatusCode statusCode = 1; + repeated MemcacheCluster memClusters = 2; +} + +message AllocOrGetMemcacheClusterRequest { + required uint32 fsId = 1; +} + +message AllocOrGetMemcacheClusterResponse { + required TopoStatusCode statusCode = 1; + optional MemcacheCluster cluster = 2; +} + service TopologyService { rpc RegistMetaServer(MetaServerRegistRequest) returns (MetaServerRegistResponse); rpc ListMetaServer(ListMetaServerRequest) returns (ListMetaServerResponse); @@ -493,4 +528,8 @@ service TopologyService { rpc StatMetadataUsage(StatMetadataUsageRequest) returns (StatMetadataUsageResponse); rpc ListTopology(ListTopologyRequest) returns (ListTopologyResponse); + + rpc RegistMemcacheCluster(RegistMemcacheClusterRequest) returns (RegistMemcacheClusterResponse); + rpc ListMemcacheCluster(ListMemcacheClusterRequest) returns (ListMemcacheClusterResponse); + rpc AllocOrGetMemcacheCluster(AllocOrGetMemcacheClusterRequest) returns (AllocOrGetMemcacheClusterResponse); } diff --git a/curvefs/src/client/BUILD b/curvefs/src/client/BUILD index 57f14c4713..ca0964ef63 100644 --- a/curvefs/src/client/BUILD +++ b/curvefs/src/client/BUILD @@ -59,6 +59,7 @@ cc_library( "//curvefs/src/client/rpcclient", "//curvefs/src/common:curvefs_common", "//curvefs/src/client/lease:curvefs_lease", + "//curvefs/src/client/kvclient:memcached_client_lib", "//external:brpc", "//external:gflags", "//external:glog", diff --git a/curvefs/src/client/common/config.cpp b/curvefs/src/client/common/config.cpp index 37f05ed65f..29440226fd 100644 --- a/curvefs/src/client/common/config.cpp +++ b/curvefs/src/client/common/config.cpp @@ -49,6 +49,7 @@ namespace common { DEFINE_bool(enableCto, true, "acheieve cto consistency"); DEFINE_bool(useFakeS3, false, "Use fake s3 to inject more metadata for testing metaserver"); +DEFINE_bool(supportKVcache, false, "use kvcache to speed up sharing"); void InitMdsOption(Configuration *conf, MdsOption *mdsOpt) { conf->GetValueFatalIfFail("mdsOpt.mdsMaxRetryMS", &mdsOpt->mdsMaxRetryMS); @@ -232,6 +233,16 @@ void InitRefreshDataOpt(Configuration *conf, &opt->refreshDataIntervalSec); } +void InitKVClientManagerOpt(Configuration *conf, + KVClientManagerOpt *config) { + conf->GetValueFatalIfFail("fuseClient.supportKVcache", + &FLAGS_supportKVcache); + conf->GetValueFatalIfFail("fuseClient.setThreadPool", + &config->setThreadPooln); + conf->GetValueFatalIfFail("fuseClient.getThreadPool", + &config->getThreadPooln); +} + void SetBrpcOpt(Configuration *conf) { curve::common::GflagsLoadValueFromConfIfCmdNotSet dummy; dummy.Load(conf, "defer_close_second", "rpc.defer.close.second", @@ -251,6 +262,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) { InitVolumeOption(conf, &clientOption->volumeOpt); InitLeaseOpt(conf, &clientOption->leaseOpt); InitRefreshDataOpt(conf, &clientOption->refreshDataOption); + InitKVClientManagerOpt(conf, &clientOption->kvClientManagerOpt); conf->GetValueFatalIfFail("fuseClient.attrTimeOut", &clientOption->attrTimeOut); @@ -280,6 +292,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) { &clientOption->disableXattr); conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto); + LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice", &clientOption->enableFuseSplice)) << "Not found `fuseClient.enableSplice` in conf, use default value `" diff --git a/curvefs/src/client/common/config.h b/curvefs/src/client/common/config.h index 8a41931b73..3641d6a41f 100644 --- a/curvefs/src/client/common/config.h +++ b/curvefs/src/client/common/config.h @@ -80,6 +80,11 @@ struct SpaceAllocServerOption { uint64_t rpcTimeoutMs; }; +struct KVClientManagerOpt { + int setThreadPooln = 4; + int getThreadPooln = 4; +}; + struct DiskCacheOption { DiskCacheType diskCacheType; // cache disk dir @@ -182,6 +187,7 @@ struct FuseClientOption { VolumeOption volumeOpt; LeaseOpt leaseOpt; RefreshDataOption refreshDataOption; + KVClientManagerOpt kvClientManagerOpt; double attrTimeOut; double entryTimeOut; diff --git a/curvefs/src/client/fuse_client.cpp b/curvefs/src/client/fuse_client.cpp index 1f90f15676..93843ed76a 100644 --- a/curvefs/src/client/fuse_client.cpp +++ b/curvefs/src/client/fuse_client.cpp @@ -67,6 +67,7 @@ namespace curvefs { namespace client { namespace common { DECLARE_bool(enableCto); + } // namespace common } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/fuse_s3_client.cpp b/curvefs/src/client/fuse_s3_client.cpp index 0862fe297f..da04692e60 100644 --- a/curvefs/src/client/fuse_s3_client.cpp +++ b/curvefs/src/client/fuse_s3_client.cpp @@ -20,14 +20,20 @@ * Author: xuchaojie */ -#include "curvefs/src/client/fuse_s3_client.h" #include #include + +#include "curvefs/src/client/fuse_s3_client.h" +#include "curvefs/src/client/kvclient/memcache_client.h" + namespace curvefs { namespace client { namespace common { + DECLARE_bool(enableCto); +DECLARE_bool(supportKVcache); + } // namespace common } // namespace client } // namespace curvefs @@ -35,6 +41,11 @@ DECLARE_bool(enableCto); namespace curvefs { namespace client { +using curvefs::client::common::FLAGS_supportKVcache; +using curvefs::client::common::FLAGS_enableCto; +using curvefs::mds::topology::MemcacheCluster; +using curvefs::mds::topology::MemcachedServer; + CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { FuseClientOption opt(option); CURVEFS_ERROR ret = FuseClient::Init(opt); @@ -42,6 +53,11 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { return ret; } + // init kvcache + if (FLAGS_supportKVcache && !InitKVCache(option.kvClientManagerOpt)) { + return CURVEFS_ERROR::INTERNAL; + } + // set fsS3Option const auto& s3Info = fsInfo_->detail().s3info(); ::curve::common::S3InfoOption fsS3Option; @@ -80,6 +96,50 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) { return ret; } + +bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) { + // get kvcache cluster + MemcacheCluster kvcachecluster; + if (!mdsClient_->AllocOrGetMemcacheCluster(fsInfo_->fsid(), + &kvcachecluster)) { + LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache + << ", but AllocOrGetMemcacheCluster fail"; + return false; + } + + // std::string ip[3] = {"10.182.2.46", "10.182.2.47", "10.182.2.48"}; + // for (int i = 0; i < 2; i++) { + // for (int j = 0; j < 5; j++) { + // int port = 7001 + j; + // MemcachedServer *server = kvcachecluster.add_servers(); + // server->set_ip(ip[i]); + // server->set_port(port); + // } + // } + // kvcachecluster.set_clusterid(1); + + // init kvcache client + auto memcacheClient = std::make_shared(); + if (!memcacheClient->Init(kvcachecluster)) { + LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache + << ", but init memcache client fail"; + return false; + } + + // init kvcacheclient manager + // KVClientManagerOpt config; + // config.kvclient = std::move(memcacheClient); + // config.threadPooln = 4; + g_kvClientManager = new KVClientManager(); + if (!g_kvClientManager->Init(opt, memcacheClient)) { + LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache + << ", but init kvClientManager fail"; + return false; + } + + return true; +} + void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile, std::vector& warmUpFilelist) { struct fuse_file_info fi{}; @@ -536,7 +596,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino, CURVEFS_ERROR ret = CURVEFS_ERROR::OK; // if enableCto, flush all write cache both in memory cache and disk cache - if (curvefs::client::common::FLAGS_enableCto) { + if (FLAGS_enableCto) { ret = s3Adaptor_->FlushAllCache(ino); if (ret != CURVEFS_ERROR::OK) { LOG(ERROR) << "FuseOpFlush, flush all cache fail, ret = " << ret diff --git a/curvefs/src/client/fuse_s3_client.h b/curvefs/src/client/fuse_s3_client.h index ec2ce3b055..03a2252539 100644 --- a/curvefs/src/client/fuse_s3_client.h +++ b/curvefs/src/client/fuse_s3_client.h @@ -94,6 +94,8 @@ class FuseS3Client : public FuseClient { struct fuse_file_info *fi) override; private: + bool InitKVCache(const KVClientManagerOpt &opt); + CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override; void FlushData() override; diff --git a/curvefs/src/client/kvclient/BUILD b/curvefs/src/client/kvclient/BUILD index c24cb36762..202b6b5743 100644 --- a/curvefs/src/client/kvclient/BUILD +++ b/curvefs/src/client/kvclient/BUILD @@ -14,24 +14,26 @@ # limitations under the License. # -# load("//:copts.bzl", "CURVE_DEFAULT_COPTS") +load("//:copts.bzl", "CURVE_DEFAULT_COPTS") - - -# cc_library( -# name = "memcache_client_lib", -# srcs = glob(["*.cpp"]), -# hdrs = glob(["*.h"]), -# copts = CURVE_DEFAULT_COPTS, -# linkopts = [ -# "-lmemcached", -# ], -# visibility = ["//visibility:public"], -# deps = [ -# "@com_google_absl//absl/strings", -# "//external:glog", -# "//external:bthread", -# "//src/common:curve_common", -# "//src/common:curve_s3_adapter", -# ], -# ) +cc_library( + name = "memcached_client_lib", + srcs = glob(["*.cpp"]), + hdrs = glob(["*.h"]), + copts = CURVE_DEFAULT_COPTS, + linkopts = [ + "-L/usr/local/lib", + ], + visibility = ["//visibility:public"], + deps = [ + "//curvefs/proto:curvefs_topology_cc_proto", + "//curvefs/src/client/common:common", + "//curvefs/src/client/metric:client_metric", + "@com_google_absl//absl/strings", + "//external:glog", + "//external:bthread", + "//src/common:curve_common", + "//src/common:curve_s3_adapter", + "@libmemcached", + ], +) diff --git a/curvefs/src/client/kvclient/kvclient.h b/curvefs/src/client/kvclient/kvclient.h index 1aa0661cb7..9d7b96d857 100644 --- a/curvefs/src/client/kvclient/kvclient.h +++ b/curvefs/src/client/kvclient/kvclient.h @@ -32,32 +32,25 @@ namespace client { * Single client to kv interface. */ -class KvClient { +class KVClient { public: - KvClient() = default; - ~KvClient() = default; - virtual bool Init() = 0; + KVClient() = default; + ~KVClient() = default; - virtual void UnInit() = 0; + virtual void Init() {} + + virtual void UnInit() {} /** * @param: errorlog: if error occurred, the errorlog will take * the error info and log. * @return: success return true, else return false; */ - virtual bool Set(const std::string& key, - const char* value, - const int value_len, - std::string* errorlog) = 0; + virtual bool Set(const std::string &key, const char *value, + const uint64_t value_len, std::string *errorlog) = 0; - /** - * @param: errorlog: if return is false the errorlog - * will take the info. - * @return: success return the value and true. - */ - virtual bool Get(const std::string& key, - std::string* value, - std::string* errorlog) = 0; + virtual bool Get(const std::string &key, char *value, uint64_t offset, + uint64_t length, std::string *errorlog) = 0; }; } // namespace client diff --git a/curvefs/src/client/kvclient/kvclient_manager.cpp b/curvefs/src/client/kvclient/kvclient_manager.cpp new file mode 100644 index 0000000000..e0894baca2 --- /dev/null +++ b/curvefs/src/client/kvclient/kvclient_manager.cpp @@ -0,0 +1,97 @@ +/* + * Copyright (c) 2022 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: 2022-10-18 + * Author: lixiaocui + */ + +#include "curvefs/src/client/kvclient/kvclient_manager.h" +#include "src/client/client_metric.h" +#include "src/common/concurrent/count_down_event.h" + +using curve::client::LatencyGuard; +using curve::common::CountDownEvent; +using curvefs::client::metric::KVClientMetric; + +namespace curvefs { +namespace client { + +KVClientManager *g_kvClientManager = nullptr; +KVClientMetric *g_kvClientMetric = nullptr; + +#define ONRETURN(TYPE, RES, KEY, ERRORLOG) \ + if (RES) { \ + g_kvClientMetric->kvClient##TYPE.qps.count << 1; \ + VLOG(9) << "Set key = " << KEY << " OK"; \ + } else { \ + g_kvClientMetric->kvClient##TYPE.eps.count << 1; \ + LOG(ERROR) << "Set key = " << KEY << " error = " << ERRORLOG; \ + } + +bool KVClientManager::Init(const KVClientManagerOpt &config, + const std::shared_ptr &kvclient) { + client_ = kvclient; + g_kvClientMetric = new KVClientMetric(); + return threadPool_.Start(config.setThreadPooln) == 0; +} + +void KVClientManager::Uninit() { + client_->UnInit(); + threadPool_.Stop(); + delete g_kvClientMetric; +} + +void KVClientManager::Set(std::shared_ptr task) { + threadPool_.Enqueue([task, this]() { + LatencyGuard guard(&g_kvClientMetric->kvClientSet.latency); + + std::string error_log; + auto res = + client_->Set(task->key, task->value, task->length, &error_log); + ONRETURN(Set, res, task->key, error_log); + + if (task->done) { + task->done(task); + } + }); +} + +bool KVClientManager::Get(std::shared_ptr task) { + LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency); + + std::string error_log; + assert(nullptr != task->value); + auto res = client_->Get(task->key, task->value, task->offset, task->length, + &error_log); + ONRETURN(Get, res, task->key, error_log); + return res; +} + + +bool KVClientManager::Get(const std::string &key, char *value, uint64_t offset, + uint64_t length) { + LatencyGuard guard(&g_kvClientMetric->kvClientGet.latency); + + std::string error_log; + auto res = client_->Get(key, value, offset, length, &error_log); + ONRETURN(Get, res, key, error_log); + return res; +} + +} // namespace client +} // namespace curvefs diff --git a/curvefs/src/client/kvclient/kvclient_manager.h b/curvefs/src/client/kvclient/kvclient_manager.h index 3ae93f7fe9..0a00835022 100644 --- a/curvefs/src/client/kvclient/kvclient_manager.h +++ b/curvefs/src/client/kvclient/kvclient_manager.h @@ -32,127 +32,72 @@ #include "absl/strings/string_view.h" #include "curvefs/src/client/kvclient/kvclient.h" +#include "curvefs/src/client/common/config.h" +#include "curvefs/src/client/metric/client_metric.h" #include "src/common/concurrent/thread_pool.h" #include "src/common/s3_adapter.h" -namespace curvefs { +using curvefs::client::metric::KVClientMetric; +namespace curvefs { namespace client { -using ::curve::common::TaskThreadPool; - -struct KvClientManagerConfig { - std::unique_ptr kvclient; - int threadPooln; -}; - -struct SetKvCacheTask; - -using SetKvCacheCallBack = - std::function)>; - -struct SetKvCacheTask { - const std::string& key; - const char* value; - const size_t len; - SetKvCacheCallBack cb; - SetKvCacheTask(const std::string& k, const char* val, const size_t length) - : key(k), value(val), len(length) {} +class KVClientManager; +class SetKVCacheTask; +using curve::common::TaskThreadPool; +using curvefs::client::common::KVClientManagerOpt; + +extern KVClientManager *g_kvClientManager; +extern KVClientMetric *g_kvClientMetric; + +typedef std::function &)> + SetKVCacheDone; + +struct SetKVCacheTask { + std::string key; + const char *value; + uint64_t length; + SetKVCacheDone done; + SetKVCacheTask() = default; + SetKVCacheTask(const std::string &k, const char *val, const uint64_t len) + : key(k), value(val), length(len) {} }; struct GetKvCacheContext { - const std::string& key; - std::string* value; - GetKvCacheContext(const std::string& k, std::string* v) - : key(k), value(v) {} + const std::string &key; + char *value; + uint64_t offset; + uint64_t length; + GetKvCacheContext(const std::string &k, char *v, uint64_t off, uint64_t len) + : key(k), value(v), offset(off), length(len) {} }; - -class KvClientManager { +class KVClientManager { public: - KvClientManager() = default; - ~KvClientManager() { Uninit(); } + KVClientManager() = default; + ~KVClientManager() { Uninit(); } - bool Init(KvClientManagerConfig* config) { - client_ = std::move(config->kvclient); - return threadPool_.Start(config->threadPooln) == 0; - } - - /** - * close the connection with kv - */ - void Uninit() { - client_->UnInit(); - threadPool_.Stop(); - } + bool Init(const KVClientManagerOpt &config, + const std::shared_ptr &kvclient); /** * It will get a db client and set the key value asynchronusly. * The set task will push threadpool, you'd better * don't get the key immediately. */ - void Set(const std::string& key, - const char* value, - const size_t value_len) { - threadPool_.Enqueue([=]() { - std::string error_log; - auto res = client_->Set(key, value, value_len, &error_log); - if (!res) { - auto val_view = absl::string_view(value, - value_len); - LOG(ERROR) << "Set key = " << key << " value = " << val_view - << " " << "vallen = " << value_len << - " " << error_log; - } - }); - } - - void Enqueue(std::shared_ptr task) { - threadPool_.Enqueue([task, this](){ - std::string error_log; - auto res = client_->Set(task->key, task->value, task->len, - &error_log); - if (!res) { - auto val_view = absl::string_view(task->value, - task->len); - LOG(ERROR) << "Set key = " << task->key << " value = " - << val_view << " " << "vallen = " << - task->len << " " << error_log; - return; - } - if (task->cb) { - task->cb(task); - } - }); - } - - bool Get(std::shared_ptr task) { - std::string error_log; - assert(nullptr != task->value); - auto res = client_->Get(task->key, task->value, &error_log); - if (!res) { - VLOG(9) << "Get Key = " << task->key << " " << error_log; - } - return res; - } + void Set(std::shared_ptr task); - /** - * get value by key. - * the value must be empty. - */ - bool Get(const std::string& key, std::string* value) { - std::string error_log; - auto res = client_->Get(key, value, &error_log); - if (!res) { - VLOG(9) << "Get Key = " << key << " " << error_log; - } - return res; - } + bool Get(const std::string &key, char *value, uint64_t offset, + uint64_t length); + + bool Get(std::shared_ptr task); + private: + void Uninit(); private: TaskThreadPool threadPool_; - std::unique_ptr client_; + std::shared_ptr client_; }; } // namespace client diff --git a/curvefs/src/client/kvclient/memcache_client.h b/curvefs/src/client/kvclient/memcache_client.h index ceec1c11aa..0aaa57d04f 100644 --- a/curvefs/src/client/kvclient/memcache_client.h +++ b/curvefs/src/client/kvclient/memcache_client.h @@ -30,16 +30,19 @@ #include #include "curvefs/src/client/kvclient/kvclient.h" +#include "curvefs/proto/topology.pb.h" namespace curvefs { namespace client { +using curvefs::mds::topology::MemcacheCluster; + /** * only the threadpool will operate the kvclient, * for threadsafe and fast, we can make every thread has a client. */ -extern thread_local memcached_st* tcli; +extern thread_local memcached_st *tcli; /** * MemCachedClient is a client to memcached cluster. You'd better @@ -56,7 +59,7 @@ extern thread_local memcached_st* tcli; * uint64_t data; * client->SetClientAttr(MEMCACHED_BEHAVIOR_NO_BLOCK, data); * client->SetClientAttr(MEMCACHED_BEHAVIOR_TCP_NODELAY, data); - * KvClientManager manager; + * KVClientManager manager; * config.kvclient = std::move(client_); * config.threadPooln = n; * manager.Init(&conf); @@ -72,17 +75,31 @@ extern thread_local memcached_st* tcli; * manager.Unint(); */ -class MemCachedClient : public KvClient { +class MemCachedClient : public KVClient { public: MemCachedClient() : server_(nullptr) { client_ = memcached_create(nullptr); } - explicit MemCachedClient(memcached_st* cli) : client_(cli) {} + explicit MemCachedClient(memcached_st *cli) : client_(cli) {} ~MemCachedClient() { UnInit(); } - bool Init() override { + bool Init(const MemcacheCluster &kvcachecluster) { client_ = memcached(nullptr, 0); - return client_ != nullptr; + + for (int i = 0; i < kvcachecluster.servers_size(); i++) { + if (!AddServer(kvcachecluster.servers(i).ip(), + kvcachecluster.servers(i).port())) { + return false; + } + } + + memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_DISTRIBUTION, + MEMCACHED_DISTRIBUTION_CONSISTENT); + memcached_behavior_set(client_, MEMCACHED_BEHAVIOR_RETRY_TIMEOUT, 5); + memcached_behavior_set(client_, + MEMCACHED_BEHAVIOR_REMOVE_FAILED_SERVERS, 1); + + return PushServer(); } void UnInit() override { @@ -92,15 +109,12 @@ class MemCachedClient : public KvClient { } } - bool Set(const std::string& key, - const char* value, - const int value_len, - std::string* errorlog) override { + bool Set(const std::string &key, const char *value, + const uint64_t value_len, std::string *errorlog) override { if (nullptr == tcli) { tcli = memcached_clone(nullptr, client_); } - auto res = memcached_set(tcli, key.c_str(), - key.length(), value, + auto res = memcached_set(tcli, key.c_str(), key.length(), value, value_len, 0, 0); if (MEMCACHED_SUCCESS == res) { return true; @@ -109,9 +123,8 @@ class MemCachedClient : public KvClient { return false; } - bool Get(const std::string& key, - std::string* value, - std::string* errorlog) override { + bool Get(const std::string &key, char *value, uint64_t offset, + uint64_t length, std::string *errorlog) override { if (nullptr == tcli) { // multi thread use a memcached_st* client is unsafe. // should clone it or use memcached_st_pool. @@ -120,15 +133,14 @@ class MemCachedClient : public KvClient { uint32_t flags = 0; size_t value_length = 0; memcached_return_t ue; - char* res = memcached_get(tcli, key.c_str(), key.length(), + char *res = memcached_get(tcli, key.c_str(), key.length(), &value_length, &flags, &ue); - if (res != nullptr && value->empty()) { - value->reserve(value_length + 1); - value->assign(res, res + value_length + 1); - value->resize(value_length); + if (res != nullptr && value) { + memcpy(value, res + offset, length); free(res); return true; } + *errorlog = ResError(ue); return false; } @@ -138,35 +150,11 @@ class MemCachedClient : public KvClient { return memcached_strerror(nullptr, res); } - /** - * for memcached client, you can set some attribute to it. - * @param: flag: the memcached offical doc indicates some attribute - * is not keep maintain. here only give some commonly used. - * more details here - * http://docs.libmemcached.org/memcached_behavior.html#memcached_behavior_set - * MEMCACHED_BEHAVIOR_NO_BLOCK: Causes libmemcached(3) to use asychronous - * IO. This is the fastest transport available for storage functions. - * MEMCACHED_BEHAVIOR_TCP_NODELAY: Turns on the no-delay feature for - * connecting sockets (may be faster in some environments). - */ - bool SetClientAttr(memcached_behavior_t flag, uint64_t data) { - auto res = memcached_behavior_set(client_, flag, data); - if (MEMCACHED_SUCCESS == res) { - return true; - } - LOG(ERROR) << "client setattr " << ResError(res); - return false; - } - - uint64_t GetClientAttr(memcached_behavior_t flag) { - return memcached_behavior_get(client_, flag); - } - /** * @brief: add a remote memcache server to client, * this means just add, you must use push after all server add. */ - bool AddServer(const std::string& hostname, const uint32_t port) { + bool AddServer(const std::string &hostname, const uint32_t port) { memcached_return_t res; server_ = memcached_server_list_append(server_, hostname.c_str(), port, &res); @@ -198,8 +186,8 @@ class MemCachedClient : public KvClient { } private: - memcached_server_st* server_; - memcached_st* client_; + memcached_server_st *server_; + memcached_st *client_; }; } // namespace client diff --git a/curvefs/src/client/metric/client_metric.h b/curvefs/src/client/metric/client_metric.h index 721807b8a3..9e4cddd8e7 100644 --- a/curvefs/src/client/metric/client_metric.h +++ b/curvefs/src/client/metric/client_metric.h @@ -275,6 +275,15 @@ struct DiskCacheMetric { diskUsedBytes(prefix, fsName + "_diskcache_usedbytes", 0) {} }; +struct KVClientMetric { + const std::string prefix = "curvefs_kvclient"; + InterfaceMetric kvClientSet; + InterfaceMetric kvClientGet; + + KVClientMetric() + : kvClientGet(prefix, "get"), kvClientSet(prefix, "set") {} +}; + struct S3ChunkInfoMetric { const std::string prefix = "inode_s3_chunk_info"; diff --git a/curvefs/src/client/rpcclient/mds_client.cpp b/curvefs/src/client/rpcclient/mds_client.cpp index 8efd7629ef..39f31ad0fd 100644 --- a/curvefs/src/client/rpcclient/mds_client.cpp +++ b/curvefs/src/client/rpcclient/mds_client.cpp @@ -606,6 +606,12 @@ FSStatusCode MdsClientImpl::CommitTxWithLock( return CommitTx(request); } + +bool MdsClientImpl::AllocOrGetMemcacheCluster( + uint32_t fsId, curvefs::mds::topology::MemcacheCluster *cluster) { + return false; +} + FSStatusCode MdsClientImpl::ReturnError(int retcode) { // rpc error convert to FSStatusCode::RPC_ERROR if (retcode < 0) { diff --git a/curvefs/src/client/rpcclient/mds_client.h b/curvefs/src/client/rpcclient/mds_client.h index 731d8337e3..6f5fa1b5d3 100644 --- a/curvefs/src/client/rpcclient/mds_client.h +++ b/curvefs/src/client/rpcclient/mds_client.h @@ -131,6 +131,10 @@ class MdsClient { const std::string& uuid, uint64_t sequence) = 0; + // get memcache cluster config + virtual bool AllocOrGetMemcacheCluster( + uint32_t fsId, curvefs::mds::topology::MemcacheCluster *cluster) = 0; + // allocate block group virtual SpaceErrCode AllocateVolumeBlockGroup( uint32_t fsId, @@ -215,6 +219,11 @@ class MdsClientImpl : public MdsClient { const std::string& uuid, uint64_t sequence) override; + // get memcache cluster config + bool + AllocOrGetMemcacheCluster(uint32_t fsId, + curvefs::mds::topology::MemcacheCluster *cluster); + // allocate block group SpaceErrCode AllocateVolumeBlockGroup( uint32_t fsId, diff --git a/curvefs/src/client/s3/client_s3_adaptor.cpp b/curvefs/src/client/s3/client_s3_adaptor.cpp index 5294108991..c39e79c5f5 100644 --- a/curvefs/src/client/s3/client_s3_adaptor.cpp +++ b/curvefs/src/client/s3/client_s3_adaptor.cpp @@ -372,7 +372,9 @@ CURVEFS_ERROR S3ClientAdaptorImpl::FlushAllCache(uint64_t inodeId) { } // force flush data in diskcache to s3 - if (HasDiskCache()) { + if (!g_kvClientManager && HasDiskCache()) { + VLOG(6) << "FlushAllCache, wait inodeId:" << inodeId + << "related chunk upload to s3"; if (ClearDiskCache(inodeId) < 0) { return CURVEFS_ERROR::INTERNAL; } diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 7653e0ac2a..db847d4058 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -28,6 +28,7 @@ #include "curvefs/src/client/s3/client_s3_adaptor.h" #include "curvefs/src/common/s3util.h" #include "curvefs/src/client/metric/client_metric.h" +#include "curvefs/src/client/kvclient/kvclient_manager.h" namespace curvefs { namespace client { @@ -94,8 +95,8 @@ FsCacheManager::FindOrCreateFileCacheManager(uint64_t fsId, uint64_t inodeId) { return it->second; } - FileCacheManagerPtr fileCacheManager = - std::make_shared(fsId, inodeId, s3ClientAdaptor_); + FileCacheManagerPtr fileCacheManager = std::make_shared( + fsId, inodeId, s3ClientAdaptor_); auto ret = fileCacheManagerMap_.emplace(inodeId, fileCacheManager); g_s3MultiManagerMetric->fileManagerNum << 1; assert(ret.second); @@ -300,315 +301,345 @@ FileCacheManager::FindOrCreateChunkCacheManager(uint64_t index) { return chunkCacheManager; } -int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length, - char *dataBuf) { - uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize(); - uint64_t index = offset / chunkSize; - uint64_t chunkPos = offset % chunkSize; - uint64_t readLen = 0; - int ret = 0; - uint64_t readOffset = 0; - std::vector totalRequests; +void FileCacheManager::GetChunkLoc(uint64_t offset, uint64_t *index, + uint64_t *chunkPos, uint64_t *chunkSize) { + *chunkSize = s3ClientAdaptor_->GetChunkSize(); + *index = offset / *chunkSize; + *chunkPos = offset % *chunkSize; +} + +void FileCacheManager::GetBlockLoc(uint64_t offset, uint64_t *chunkIndex, + uint64_t *chunkPos, uint64_t *blockIndex, + uint64_t *blockPos) { + uint64_t chunkSize = 0; + uint64_t blockSize = s3ClientAdaptor_->GetBlockSize(); + GetChunkLoc(offset, chunkIndex, chunkPos, &chunkSize); + + *blockIndex = offset % chunkSize / blockSize; + *blockPos = offset % chunkSize % blockSize; +} + +void FileCacheManager::ReadFromMemCache( + uint64_t offset, uint64_t length, char *dataBuf, uint64_t *actualReadLen, + std::vector *memCacheMissRequest) { - // Find offset~len in the write and read cache, - // and The parts that are not in the cache are placed in the totalRequests + uint64_t index = 0, chunkPos = 0, chunkSize = 0; + GetChunkLoc(offset, &index, &chunkPos, &chunkSize); + + uint64_t currentReadLen = 0; + uint64_t dataBufferOffset = 0; while (length > 0) { - std::vector requests; - if (chunkPos + length > chunkSize) { - readLen = chunkSize - chunkPos; - } else { - readLen = length; - } + // |--------------------------------| + // 0 chunksize + // chunkPos length + chunkPos + // |-------------------------| + // |--------------| + // currentReadLen + currentReadLen = + chunkPos + length > chunkSize ? chunkSize - chunkPos : length; + + + // 1. read from local cache + // 2. generate cache miss request ChunkCacheManagerPtr chunkCacheManager = - FindOrCreateChunkCacheManager(index); - chunkCacheManager->ReadChunk(index, chunkPos, readLen, dataBuf, - readOffset, &requests); - totalRequests.insert(totalRequests.end(), requests.begin(), - requests.end()); - length -= readLen; - index++; - readOffset += readLen; - chunkPos = (chunkPos + readLen) % chunkSize; - } + FindOrCreateChunkCacheManager(index); + std::vector tmpMissRequests; + chunkCacheManager->ReadChunk(index, chunkPos, currentReadLen, dataBuf, + dataBufferOffset, &tmpMissRequests); + memCacheMissRequest->insert(memCacheMissRequest->end(), + tmpMissRequests.begin(), + tmpMissRequests.end()); - if (totalRequests.empty()) { - VLOG(3) << "read cache is all the hits."; - return readOffset; + length -= currentReadLen; // left length + index++; // next index + dataBufferOffset += currentReadLen; // next data buffer offset + chunkPos = (chunkPos + currentReadLen) % chunkSize; // next chunkPos } - { - uint32_t maxIntervalMs = - s3ClientAdaptor_->GetMaxReadRetryIntervalMs(); // hardcode, fixme - uint32_t retryIntervalMs = s3ClientAdaptor_->GetReadRetryIntervalMs(); - uint32_t retry = 0; - std::shared_ptr inodeWrapper; - auto inodeManager = s3ClientAdaptor_->GetInodeCacheManager(); - CURVEFS_ERROR r = inodeManager->GetInode(inodeId, inodeWrapper); - if (r != CURVEFS_ERROR::OK) { - LOG(WARNING) << "get inode fail, ret:" << ret; - return -1; - } - std::vector responses; - while (1) { - std::vector totalS3Requests; - auto iter = totalRequests.begin(); - uint64_t fileLen; - { - ::curve::common::UniqueLock lgGuard = - inodeWrapper->GetUniqueLock(); - const Inode* inode = inodeWrapper->GetInodeLocked(); - const auto* s3chunkinfo = inodeWrapper->GetChunkInfoMap(); - VLOG(9) << "FileCacheManager::Read Inode: " - << inode->DebugString(); - fileLen = inode->length(); - for (; iter != totalRequests.end(); iter++) { - VLOG(6) << "ReadRequest index:" << iter->index - << ",chunkPos:" << iter->chunkPos - << ",len:" << iter->len - << ",bufOffset:" << iter->bufOffset; - auto s3InfoListIter = - s3chunkinfo->find(iter->index); - if (s3InfoListIter == s3chunkinfo->end()) { - VLOG(6) - << "s3infolist is not found.index:" << iter->index; - memset(dataBuf + iter->bufOffset, 0, iter->len); - continue; - } - std::vector s3Requests; - GenerateS3Request(*iter, s3InfoListIter->second, dataBuf, - &s3Requests, inode->fsid(), - inode->inodeid()); - totalS3Requests.insert(totalS3Requests.end(), - s3Requests.begin(), - s3Requests.end()); - } - } - if (totalS3Requests.empty()) { - VLOG(6) << "s3 has not data to read."; - return readOffset; - } + *actualReadLen = dataBufferOffset; - uint32_t i; - for (i = 0; i < totalS3Requests.size(); i++) { - S3ReadRequest& tmp_req = totalS3Requests[i]; - VLOG(9) << "S3ReadRequest chunkid:" << tmp_req.chunkId - << ",offset:" << tmp_req.offset - << ",len:" << tmp_req.len - << ",objectOffset:" << tmp_req.objectOffset - << ",readOffset:" << tmp_req.readOffset - << ",compaction:" << tmp_req.compaction - << ",fsid:" << tmp_req.fsId - << ",inodeId:" << tmp_req.inodeId; - } + VLOG_IF(3, memCacheMissRequest->empty()) + << "greate! memory cache all hit."; +} - ret = ReadFromS3(totalS3Requests, &responses, dataBuf, fileLen); - if (ret < 0) { - retry++; - responses.clear(); - if (ret != -2) { - LOG(ERROR) << "read from s3 failed. ret:" << ret; - return ret; - } else { - // ret -2 refs s3obj not exist, first It may be that - // the metaserver compaction update inode is not - // synchronized to the client;so clear inodecache && get - // again;If it returns -2 multiple times, it may be that the - // data of a client has not been flushed back to s3, and you - // only need to keep retrying. - if (1 == retry) { - LOG(INFO) - << "inode cache maybe stale, try to get latest"; - curve::common::UniqueLock lgGuard = - inodeWrapper->GetUniqueLock(); - auto r = inodeWrapper->RefreshS3ChunkInfo(); - if (r != CURVEFS_ERROR::OK) { - LOG(WARNING) << "refresh inode fail, ret:" << ret; - return -1; - } - } - if (retry * retryIntervalMs < maxIntervalMs) { - LOG(WARNING) << "read retry num:" << retry; - bthread_usleep(retryIntervalMs * retry * 1000); - } else { - LOG(WARNING) << "retry is reach max interval ms:" - << maxIntervalMs; - bthread_usleep(maxIntervalMs * 1000); - } - } +int FileCacheManager::GenerateKVReuqest( + const std::shared_ptr &inodeWrapper, + const std::vector &readRequest, char *dataBuf, + std::vector *kvRequest) { + + ::curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + const Inode *inode = inodeWrapper->GetInodeLocked(); + const auto *s3chunkinfo = inodeWrapper->GetChunkInfoMap(); + VLOG(9) << "process inode: " << inode->DebugString(); + for_each( + readRequest.begin(), readRequest.end(), [&](const ReadRequest &req) { + VLOG(6) << req.DebugString(); + auto infoIter = s3chunkinfo->find(req.index); + if (infoIter == s3chunkinfo->end()) { + VLOG(6) << "inode = " << inode->inodeid() + << " s3chunkinfo do not find index = " << req.index; + memset(dataBuf + req.bufOffset, 0, req.len); + return; } else { - break; + std::vector tmpKVRequests; + GenerateS3Request(req, infoIter->second, dataBuf, + &tmpKVRequests, inode->fsid(), + inode->inodeid()); + kvRequest->insert(kvRequest->end(), tmpKVRequests.begin(), + tmpKVRequests.end()); } + }); + + VLOG(9) << "process inode: " << S3ReadRequestVecDebugString(*kvRequest) + << " ok"; + + return 0; +} + +int FileCacheManager::HandleReadS3NotExist( + int ret, uint32_t retry, + const std::shared_ptr &inodeWrapper) { + uint32_t maxIntervalMs = + s3ClientAdaptor_->GetMaxReadRetryIntervalMs(); // hardcode, fixme + uint32_t retryIntervalMs = s3ClientAdaptor_->GetReadRetryIntervalMs(); + + if (retry == 1) { + curve::common::UniqueLock lgGuard = inodeWrapper->GetUniqueLock(); + if (CURVEFS_ERROR::OK != inodeWrapper->RefreshS3ChunkInfo()) { + LOG(ERROR) << "refresh inode = " << inodeWrapper->GetInodeId() + << " fail"; + return -1; } + } else if (retry * retryIntervalMs < maxIntervalMs) { + LOG(WARNING) << "read inode = " << inodeWrapper->GetInodeId() + << " retry = " << retry; + bthread_usleep(retryIntervalMs * retry * 1000); + } else { + LOG(WARNING) << "read inode = " << inodeWrapper->GetInodeId() + << " retry = " << retry + << ", reach max interval = " << maxIntervalMs << " ms"; + bthread_usleep(maxIntervalMs * 1000); } - return readOffset; + return 0; } -int FileCacheManager::ReadFromS3(const std::vector &requests, - std::vector *responses, - char* dataBuf, uint64_t fileLen) { - uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize(); - uint64_t blockSize = s3ClientAdaptor_->GetBlockSize(); - std::vector::const_iterator iter = requests.begin(); - std::atomic pendingReq(0); - curve::common::CountDownEvent cond(1); - bool async = false; - // first is chunkIndex, second is vector chunkPos - std::map> dataCacheMap; - GetObjectAsyncCallBack cb = - [&](const S3Adapter *adapter, - const std::shared_ptr &context) { - if (context->retCode == 0) { - pendingReq.fetch_sub(1, std::memory_order_relaxed); - cond.Signal(); - return; +int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length, + char *dataBuf) { + // 1. read from memory cache + uint64_t actualReadLen = 0; + std::vector memCacheMissRequest; + ReadFromMemCache(offset, length, dataBuf, &actualReadLen, + &memCacheMissRequest); + if (memCacheMissRequest.empty()) { + return actualReadLen; + } + + + // 2. read from s3 (localcache -> s3) + std::shared_ptr inodeWrapper; + auto inodeManager = s3ClientAdaptor_->GetInodeCacheManager(); + if (CURVEFS_ERROR::OK != inodeManager->GetInode(inodeId, inodeWrapper)) { + LOG(ERROR) << "get inode = " << inodeId << " fail"; + return -1; + } + + uint32_t retry = 0; + do { + // generate kv request + std::vector kvRequest; + GenerateKVReuqest(inodeWrapper, memCacheMissRequest, dataBuf, + &kvRequest); + + // read from s3 + int ret = ReadKVRequest(kvRequest, dataBuf, inodeWrapper->GetLength()); + if (ret >= 0) { + // read from s3 ok + break; + } else if (ret == -2) { + // TODO(@anybody): ret should replace the current number with a + // meaningful error code + // read from s3 not exist + // 1. may be the metaserver compaction update inode is not + // synchronized to the client. clear inodecache && get agin + // 2. if it returns -2 multiple times, it may be the data of a + // client has not been flushed back to s3, and need keep + // retrying. + if (0 != HandleReadS3NotExist(ret, retry++, inodeWrapper)) { + return -1; } + } else { + LOG(INFO) << "read inode = " << inodeId + << " from s3 failed, ret = " << ret; + return ret; + } + } while (1); - LOG(WARNING) << "Get Object failed, key: " << context->key - << ", offset: " << context->offset; - s3ClientAdaptor_->GetS3Client()->DownloadAsync(context); - }; + return actualReadLen; +} - for (; iter != requests.end(); iter++) { - uint64_t blockIndex = iter->offset % chunkSize / blockSize; - uint64_t blockPos = iter->offset % chunkSize % blockSize; - uint64_t chunkIndex = iter->offset / chunkSize; - uint64_t chunkPos = iter->offset % chunkSize; - uint64_t len = iter->len; - uint64_t n = 0; - uint64_t readOffset = 0; - uint64_t objectOffset = iter->objectOffset; - - std::vector &dataCacheVec = dataCacheMap[chunkIndex]; - dataCacheVec.push_back(chunkPos); - S3ReadResponse response(dataBuf + iter->readOffset, len); - VLOG(6) << "HandleReadRequest blockPos:" << blockPos << ",len:" << len - << ",blockIndex:" << blockIndex - << ",objectOffset:" << objectOffset << ",chunkid" - << iter->chunkId << ",fsid" << iter->fsId - << ",inodeId:" << iter->inodeId; - // prefetch read - // TODO(huyao): The read-ahead trigger logic needs to be refactored and - // supplemented with unit tests - if (s3ClientAdaptor_->HasDiskCache()) { - uint64_t blockIndexTmp = blockIndex; - // the counts of blocks that need prefetch - uint32_t prefetchBlocks = s3ClientAdaptor_->GetPrefetchBlocks(); - std::vector> prefetchObjs; - for (int count = 0; count < prefetchBlocks; count++) { - std::string name = curvefs::common::s3util::GenObjName( - iter->chunkId, blockIndexTmp, iter->compaction, iter->fsId, - iter->inodeId); - uint64_t readLen = (blockIndexTmp + 1) * blockSize; - if (readLen > fileLen) { - VLOG(6) << "end, redLen :" << readLen - << ", fileLen: " << fileLen << ", blockIndexTmp " - << blockIndexTmp; - readLen = fileLen - blockIndexTmp * blockSize; - prefetchObjs.push_back(std::make_pair(name, readLen)); - break; - } else { - prefetchObjs.push_back(std::make_pair(name, blockSize)); - } +bool FileCacheManager::ReadKVRequestFromLocalCache(const std::string &name, + char *databuf, + uint64_t offset, + uint64_t len) { + uint64_t start = butil::cpuwide_time_us(); - blockIndexTmp++; - if (blockIndexTmp >= chunkSize / blockSize) { - break; - } - } - // prefetch object from s3 - PrefetchS3Objs(prefetchObjs); + bool mayCached = s3ClientAdaptor_->HasDiskCache() && + s3ClientAdaptor_->GetDiskCacheManager()->IsCached(name); + if (!mayCached) { + return false; + } + + if (0 > s3ClientAdaptor_->GetDiskCacheManager()->Read(name, databuf, offset, + len)) { + LOG(WARNING) << "object " << name << " not cached in disk"; + return false; + } + + if (s3ClientAdaptor_->s3Metric_.get()) { + s3ClientAdaptor_->CollectMetrics( + &s3ClientAdaptor_->s3Metric_->adaptorReadS3, len, start); + } + return true; +} + +bool FileCacheManager::ReadKVRequestFromRemoteCache(const std::string &name, + char *databuf, + uint64_t offset, + uint64_t length) { + if (!g_kvClientManager) { + return false; + } + + return g_kvClientManager->Get(name, databuf, offset, length); +} + +bool FileCacheManager::ReadKVRequestFromS3(const std::string &name, + char *databuf, uint64_t offset, + uint64_t length, int *ret) { + uint64_t start = butil::cpuwide_time_us(); + *ret = s3ClientAdaptor_->GetS3Client()->Download(name, databuf, offset, + length); + if (*ret < 0) { + LOG(ERROR) << "object " << name << " read from s3 fail, ret = " << *ret; + return false; + } else if (s3ClientAdaptor_->s3Metric_.get()) { + s3ClientAdaptor_->CollectMetrics( + &s3ClientAdaptor_->s3Metric_->adaptorReadS3, length, start); + } + + return true; +} + +int FileCacheManager::ReadKVRequest( + const std::vector &kvRequests, char *dataBuf, + uint64_t fileLen) { + + for (auto req = kvRequests.begin(); req != kvRequests.end(); req++) { + VLOG(6) << "read from kv request " << req->DebugString(); + + uint64_t chunkIndex = 0, chunkPos = 0, blockIndex = 0, blockPos = 0; + uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize(); + uint64_t blockSize = s3ClientAdaptor_->GetBlockSize(); + GetBlockLoc(req->offset, &chunkIndex, &chunkPos, &blockIndex, + &blockPos); + + // prefetch + if (s3ClientAdaptor_->HasDiskCache()) { + PrefetchForBlock(*req, fileLen, blockSize, chunkSize, blockIndex); } - while (len > 0) { - if (blockPos + len > blockSize) { - n = blockSize - blockPos; - } else { - n = len; - } + + // read request + // |--------------------------------|----------------------------------| + // 0 blockSize 2*blockSize + // blockPos length + blockPos + // |-------------------------| + // |--------------| + // currentReadLen + uint64_t length = req->len; + uint64_t currentReadLen = 0; + uint64_t readBufOffset = 0; + uint64_t objectOffset = req->objectOffset; + + while (length > 0) { + int ret = 0; + currentReadLen = + length + blockPos > blockSize ? blockSize - blockPos : length; assert(blockPos >= objectOffset); std::string name = curvefs::common::s3util::GenObjName( - iter->chunkId, blockIndex, iter->compaction, iter->fsId, - iter->inodeId); - uint64_t start = butil::cpuwide_time_us(); - if (async) { - VLOG(9) << "async read s3"; - auto context = std::make_shared(); - context->key = name; - context->buf = response.GetDataBuf() + readOffset; - context->offset = blockPos - objectOffset; - context->len = n; - context->cb = cb; - pendingReq.fetch_add(1, std::memory_order_relaxed); - s3ClientAdaptor_->GetS3Client()->DownloadAsync(context); + req->chunkId, blockIndex, req->compaction, req->fsId, + req->inodeId); + char *currentBuf = dataBuf + req->readOffset + readBufOffset; + + // read from localcache -> remotecache -> s3 + if (ReadKVRequestFromLocalCache(name, currentBuf, + blockPos - objectOffset, + currentReadLen)) { + VLOG(9) << "read " << name << " from local cache ok"; + } else if (ReadKVRequestFromRemoteCache(name, currentBuf, + blockPos - objectOffset, + currentReadLen)) { + VLOG(9) << "read " << name << " from remote cache ok"; + } else if (ReadKVRequestFromS3(name, currentBuf, + blockPos - objectOffset, + currentReadLen, &ret)) { + VLOG(9) << "read " << name << " from s3 ok"; } else { - VLOG(9) << "sync read s3"; - int ret = 0; - if (s3ClientAdaptor_->HasDiskCache() && - s3ClientAdaptor_->GetDiskCacheManager()->IsCached(name)) { - VLOG(9) << "cached in disk: " << name; - ret = s3ClientAdaptor_->GetDiskCacheManager()->Read( - name, response.GetDataBuf() + readOffset, - blockPos - objectOffset, n); - if (s3ClientAdaptor_->s3Metric_.get() != nullptr) { - s3ClientAdaptor_->CollectMetrics( - &s3ClientAdaptor_->s3Metric_->adaptorReadDiskCache, - n, start); - } - } else { - VLOG(9) << "not cached in disk: " << name; - ret = s3ClientAdaptor_->GetS3Client()->Download( - name, response.GetDataBuf() + readOffset, - blockPos - objectOffset, n); - if (s3ClientAdaptor_->s3Metric_.get() != nullptr) { - s3ClientAdaptor_->CollectMetrics( - &s3ClientAdaptor_->s3Metric_->adaptorReadS3, n, - start); - } - } - if (ret < 0) { - LOG(ERROR) << "get obj failed, name is: " << name - << ", offset is: " << blockPos - << ", objoffset is: " << objectOffset - << ", len: " << n << ", ret is: " << ret; - return ret; - } + LOG(ERROR) << "read " << name << " fail"; + return ret; } - len -= n; - readOffset += n; - blockIndex++; - blockPos = (blockPos + n) % blockSize; - objectOffset = 0; + // update param + { + length -= currentReadLen; // Remaining read data length + readBufOffset += currentReadLen; // next read offset + blockIndex++; + blockPos = (blockPos + currentReadLen) % blockSize; + objectOffset = 0; + } } - VLOG(6) << "readOffset:" << iter->readOffset - << ",response len:" << response.GetBufLen() - << ",bufLen:" << readOffset; - responses->emplace_back(std::move(response)); - } - while (pendingReq.load(std::memory_order_acquire)) { - cond.Wait(); - } - uint32_t i = 0; - for (auto &dataCacheMapIter : dataCacheMap) { - ChunkCacheManagerPtr chunkCacheManager = - FindOrCreateChunkCacheManager(dataCacheMapIter.first); - std::vector &DataCacheVec = dataCacheMapIter.second; - WriteLockGuard writeLockGuard(chunkCacheManager->rwLockChunk_); - for (auto &chunkPos : DataCacheVec) { + // add data to memory read cache + if (!curvefs::client::common::FLAGS_enableCto) { + auto chunkCacheManager = FindOrCreateChunkCacheManager(chunkIndex); + WriteLockGuard writeLockGuard(chunkCacheManager->rwLockChunk_); DataCachePtr dataCache = std::make_shared( - s3ClientAdaptor_, chunkCacheManager, chunkPos, - (*responses)[i].GetBufLen(), (*responses)[i].GetDataBuf()); - if (!curvefs::client::common::FLAGS_enableCto) { - chunkCacheManager->AddReadDataCache(dataCache); - } - i++; + s3ClientAdaptor_, chunkCacheManager, chunkPos, req->len, + dataBuf + req->readOffset); + chunkCacheManager->AddReadDataCache(dataCache); } } return 0; } +void FileCacheManager::PrefetchForBlock(const S3ReadRequest &req, + uint64_t fileLen, uint64_t blockSize, + uint64_t chunkSize, + uint64_t startBlockIndex) { + uint32_t prefetchBlocks = s3ClientAdaptor_->GetPrefetchBlocks(); + std::vector> prefetchObjs; + + uint64_t blockIndex = startBlockIndex; + for (uint32_t i = 0; i < prefetchBlocks; i++) { + std::string name = curvefs::common::s3util::GenObjName( + req.chunkId, blockIndex, req.compaction, req.fsId, req.inodeId); + uint64_t maxReadLen = (blockIndex + 1) * blockSize; + uint64_t needReadLen = maxReadLen > fileLen + ? fileLen - blockIndex * blockSize + : blockSize; + + prefetchObjs.push_back(std::make_pair(name, needReadLen)); + + blockIndex++; + if (maxReadLen > fileLen || blockIndex >= chunkSize / blockSize) { + break; + } + } + + PrefetchS3Objs(prefetchObjs); +} + class AsyncPrefetchCallback { public: AsyncPrefetchCallback(uint64_t inode, S3ClientAdaptorImpl *s3Client) @@ -1047,10 +1078,14 @@ void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos, std::vector *requests) { ReadLockGuard readLockGuard(rwLockChunk_); std::vector cacheMissWriteRequests, cacheMissFlushDataRequest; - ReadByWriteCache(chunkPos, readLen, dataBuf, - dataBufOffset, &cacheMissWriteRequests); + // read by write cache + ReadByWriteCache(chunkPos, readLen, dataBuf, dataBufOffset, + &cacheMissWriteRequests); + + // read by flushing data cache flushingDataCacheMtx_.lock(); if (!IsFlushDataEmpty()) { + // read by flushing data cache for (auto request : cacheMissWriteRequests) { std::vector tmpRequests; ReadByFlushData(request.chunkPos, request.len, @@ -1061,6 +1096,8 @@ void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos, tmpRequests.end()); } flushingDataCacheMtx_.unlock(); + + // read by read cache for (auto request : cacheMissFlushDataRequest) { std::vector tmpRequests; ReadByReadCache(request.chunkPos, request.len, @@ -1072,6 +1109,8 @@ void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos, return; } flushingDataCacheMtx_.unlock(); + + // read by read cache for (auto request : cacheMissWriteRequests) { std::vector tmpRequests; ReadByReadCache(request.chunkPos, request.len, @@ -1080,6 +1119,7 @@ void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos, requests->insert(requests->end(), tmpRequests.begin(), tmpRequests.end()); } + return; } @@ -1683,7 +1723,8 @@ void ChunkCacheManager::AddWriteDataCacheForTest(DataCachePtr dataCache) { DataCache::DataCache(S3ClientAdaptorImpl *s3ClientAdaptor, ChunkCacheManagerPtr chunkCacheManager, uint64_t chunkPos, uint64_t len, const char *data) - : s3ClientAdaptor_(s3ClientAdaptor), chunkCacheManager_(chunkCacheManager), + : s3ClientAdaptor_(std::move(s3ClientAdaptor)), + chunkCacheManager_(chunkCacheManager), status_(DataCacheStatus::Dirty), inReadCache_(false) { uint64_t blockSize = s3ClientAdaptor->GetBlockSize(); uint32_t pageSize = s3ClientAdaptor->GetPageSize(); @@ -2179,166 +2220,188 @@ void DataCache::CopyDataCacheToBuf(uint64_t offset, uint64_t len, char *data) { } CURVEFS_ERROR DataCache::Flush(uint64_t inodeId, bool toS3) { - uint64_t blockSize = s3ClientAdaptor_->GetBlockSize(); + VLOG(9) << "DataCache Flush. chunkPos=" << chunkPos_ << ", len=" << len_ + << ", chunkIndex=" << chunkCacheManager_->GetIndex() + << ", inodeId=" << inodeId; + + // generate flush task + std::vector> s3Tasks; + std::vector> kvCacheTasks; + char *data = new (std::nothrow) char[len_]; + if (!data) { + LOG(ERROR) << "new data failed."; + return CURVEFS_ERROR::INTERNAL; + } + CopyDataCacheToBuf(0, len_, data); + uint64_t writeOffset = 0; + uint64_t chunkId = 0; + CURVEFS_ERROR ret = PrepareFlushTasks( + inodeId, data, &s3Tasks, &kvCacheTasks, &chunkId, &writeOffset); + if (CURVEFS_ERROR::OK != ret) { + return ret; + } + + FlushTaskExecute(GetCachePolicy(toS3), s3Tasks, kvCacheTasks); + delete[] data; + + // inode ship to flush + std::shared_ptr inodeWrapper; + ret = s3ClientAdaptor_->GetInodeCacheManager()->GetInode( + inodeId, inodeWrapper); + if (ret != CURVEFS_ERROR::OK) { + LOG(WARNING) << "get inode fail, ret:" << ret; + status_.store(DataCacheStatus::Dirty, std::memory_order_release); + return ret; + } + + S3ChunkInfo info; + uint64_t chunkIndex = chunkCacheManager_->GetIndex(); uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize(); - uint32_t flushIntervalSec = s3ClientAdaptor_->GetFlushInterval(); + int64_t offset = chunkIndex * chunkSize + chunkPos_; + PrepareS3ChunkInfo(chunkId, offset, writeOffset, &info); + inodeWrapper->AppendS3ChunkInfo(chunkIndex, info); + s3ClientAdaptor_->GetInodeCacheManager()->ShipToFlush(inodeWrapper); + + return CURVEFS_ERROR::OK; +} + +CURVEFS_ERROR DataCache::PrepareFlushTasks(uint64_t inodeId, + char *data, std::vector> *s3Tasks, + std::vector> *kvCacheTasks, + uint64_t *chunkId, uint64_t *writeOffset) { + // allocate chunkid uint32_t fsId = s3ClientAdaptor_->GetFsId(); - uint64_t blockPos; - uint64_t blockIndex; - uint64_t chunkIndex = chunkCacheManager_->GetIndex(); - uint64_t offset; - uint64_t tmpLen; - uint64_t n = 0; - std::string objectName; - uint32_t writeOffset = 0; - uint64_t chunkId; - uint64_t now = ::curve::common::TimeUtility::GetTimeofDaySec(); - char *data = nullptr; - curve::common::CountDownEvent cond(1); - std::atomic pendingReq(0); - FSStatusCode ret; - enum class cachePoily { - NCache, - RCache, - WRCache, - } cachePoily = cachePoily::NCache; - - VLOG(9) << "DataCache::Flush : now:" << now << ",createTime:" << createTime_ - << ",flushIntervalSec:" << flushIntervalSec - << ",chunkPos:" << chunkPos_ << ", len:" << len_ - << ", inodeId:" << inodeId << ",chunkIndex:" << chunkIndex; - - tmpLen = len_; - blockPos = chunkPos_ % blockSize; - blockIndex = chunkPos_ / blockSize; - offset = chunkIndex * chunkSize + chunkPos_; - - ret = s3ClientAdaptor_->AllocS3ChunkId(fsId, 1, &chunkId); + FSStatusCode ret = s3ClientAdaptor_->AllocS3ChunkId(fsId, 1, chunkId); if (ret != FSStatusCode::OK) { LOG(ERROR) << "alloc s3 chunkid fail. ret:" << ret; return CURVEFS_ERROR::INTERNAL; } - data = new (std::nothrow) char[len_]; - if (!data) { - LOG(ERROR) << "new data failed."; - return CURVEFS_ERROR::INTERNAL; + + + // generate flush task + uint64_t blockSize = s3ClientAdaptor_->GetBlockSize(); + uint64_t chunkSize = s3ClientAdaptor_->GetChunkSize(); + uint64_t tmpLen = len_; + uint64_t blockPos = chunkPos_ % blockSize; + uint64_t blockIndex = chunkPos_ / blockSize; + uint64_t n = 0; + while (tmpLen > 0) { + if (blockPos + tmpLen > blockSize) { + n = blockSize - blockPos; + } else { + n = tmpLen; + } + + // generate flush to disk or s3 task + std::string objectName = curvefs::common::s3util::GenObjName( + *chunkId, blockIndex, 0, fsId, inodeId); + int ret = 0; + uint64_t start = butil::cpuwide_time_us(); + auto context = std::make_shared(); + context->key = objectName; + context->buffer = data + (*writeOffset); + context->bufferSize = n; + context->startTime = butil::cpuwide_time_us(); + s3Tasks->emplace_back(context); + + + // generate flush to kvcache task + if (g_kvClientManager) { + auto task = std::make_shared(); + task->key = objectName; + task->value = data + (*writeOffset); + task->length = n; + kvCacheTasks->emplace_back(task); + } + + tmpLen -= n; + blockIndex++; + (*writeOffset) += n; + blockPos = (blockPos + n) % blockSize; } - CopyDataCacheToBuf(0, len_, data); - status_.store(DataCacheStatus::Flush, std::memory_order_release); - VLOG(9) << "start datacache flush, chunkId:" << chunkId - << ", inodeId:" << inodeId - << ",Len:" << tmpLen << ",blockPos:" << blockPos - << ",blockIndex:" << blockIndex; + return CURVEFS_ERROR::OK; +} - const bool mayCache = s3ClientAdaptor_->HasDiskCache() && - !s3ClientAdaptor_->GetDiskCacheManager() - ->IsDiskCacheFull() && !toS3; +CachePoily DataCache::GetCachePolicy(bool toS3) { + const bool mayCache = + s3ClientAdaptor_->HasDiskCache() && + !s3ClientAdaptor_->GetDiskCacheManager()->IsDiskCacheFull() && !toS3; if (s3ClientAdaptor_->IsReadCache() && mayCache) { - cachePoily = cachePoily::RCache; + return CachePoily::RCache; } else if (s3ClientAdaptor_->IsReadWriteCache() && mayCache) { - cachePoily = cachePoily::WRCache; + return CachePoily::WRCache; } else { - cachePoily = cachePoily::NCache; + return CachePoily::NCache; } +} - PutObjectAsyncCallBack cb = - [&, cachePoily] - (const std::shared_ptr &context) { +void DataCache::FlushTaskExecute( + CachePoily cachePoily, + const std::vector> &s3Tasks, + const std::vector> &kvCacheTasks) { + // callback + std::atomic s3PendingTaskCal(s3Tasks.size()); + std::atomic kvPendingTaskCal(kvCacheTasks.size()); + CountDownEvent s3TaskEnvent(s3PendingTaskCal); + CountDownEvent kvTaskEnvent(kvPendingTaskCal); + + PutObjectAsyncCallBack s3cb = + [&](const std::shared_ptr &context) { if (context->retCode == 0) { if (s3ClientAdaptor_->s3Metric_.get() != nullptr) { s3ClientAdaptor_->CollectMetrics( &s3ClientAdaptor_->s3Metric_->adaptorWriteS3, context->bufferSize, context->startTime); } + + if (CachePoily::RCache == cachePoily) { + VLOG(9) << "write to read cache, name = " << context->key; + s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(context, + true); + } + // Don't move the if sentence to the front // it will cause core dumped because s3Metric_ // will be destructed before being accessed - if (pendingReq.fetch_sub( - 1, std::memory_order_seq_cst) == 1) { - VLOG(9) << "pendingReq is over, " << context->key; - cond.Signal(); - } - VLOG(9) << "PutObjectAsyncCallBack: " << context->key - << " pendingReq is: " << pendingReq; - if (cachePoily::RCache == cachePoily) { - VLOG(9) << "Write to read cache, name: " << context->key; - s3ClientAdaptor_->GetDiskCacheManager() - ->Enqueue(context, true); - } + s3TaskEnvent.Signal(); return; } + LOG(WARNING) << "Put object failed, key: " << context->key; s3ClientAdaptor_->GetS3Client()->UploadAsync(context); }; - std::vector> uploadTasks; - - while (tmpLen > 0) { - if (blockPos + tmpLen > blockSize) { - n = blockSize - blockPos; - } else { - n = tmpLen; - } - - objectName = curvefs::common::s3util::GenObjName( - chunkId, blockIndex, 0, fsId, inodeId); - int ret = 0; - uint64_t start = butil::cpuwide_time_us(); - auto context = std::make_shared(); - context->key = objectName; - context->buffer = data + writeOffset; - context->bufferSize = n; - context->cb = cb; - context->startTime = butil::cpuwide_time_us(); - uploadTasks.emplace_back(context); - tmpLen -= n; - blockIndex++; - writeOffset += n; - blockPos = (blockPos + n) % blockSize; + SetKVCacheDone kvdone = [&](const std::shared_ptr &task) { + kvTaskEnvent.Signal(); + return; + }; + + // s3task execute + if (s3PendingTaskCal.load()) { + std::for_each( + s3Tasks.begin(), s3Tasks.end(), + [&](const std::shared_ptr &context) { + context->cb = s3cb; + if (CachePoily::WRCache == cachePoily) { + s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(context); + } else { + s3ClientAdaptor_->GetS3Client()->UploadAsync(context); + } + }); } - - pendingReq.fetch_add(uploadTasks.size(), std::memory_order_seq_cst); - VLOG(9) << "DataCache::Flush data cache flush pendingReq init: " - << pendingReq.load(std::memory_order_seq_cst); - assert(pendingReq.load(std::memory_order_seq_cst) != 0); - if (pendingReq.load(std::memory_order_seq_cst)) { - VLOG(9) << "wait for pendingReq"; - for (auto iter = uploadTasks.begin(); iter != uploadTasks.end(); - ++iter) { - VLOG(9) << "upload start: " << (*iter)->key - << " len : " << (*iter)->bufferSize; - if (cachePoily::WRCache == cachePoily) { - s3ClientAdaptor_->GetDiskCacheManager()->Enqueue(*iter); - } else { - s3ClientAdaptor_->GetS3Client()->UploadAsync(*iter); - } - } - cond.Wait(); + // kvtask execute + if (g_kvClientManager && kvPendingTaskCal.load()) { + std::for_each(kvCacheTasks.begin(), kvCacheTasks.end(), + [&](const std::shared_ptr &task) { + task->done = kvdone; + g_kvClientManager->Set(task); + }); + kvTaskEnvent.Wait(); } - delete[] data; - VLOG(9) << "update inode start, chunkId:" << chunkId - << ",offset:" << offset << ",len:" << writeOffset - << ",inodeId:" << inodeId << ",chunkIndex:" << chunkIndex; - { - std::shared_ptr inodeWrapper; - CURVEFS_ERROR ret = - s3ClientAdaptor_->GetInodeCacheManager()->GetInode( - inodeId, inodeWrapper); - if (ret != CURVEFS_ERROR::OK) { - LOG(WARNING) << "get inode fail, ret:" << ret; - status_.store(DataCacheStatus::Dirty, std::memory_order_release); - return ret; - } - S3ChunkInfo info; - PrepareS3ChunkInfo(chunkId, offset, writeOffset, &info); - inodeWrapper->AppendS3ChunkInfo(chunkIndex, info); - s3ClientAdaptor_->GetInodeCacheManager()->ShipToFlush(inodeWrapper); - } - VLOG(9) << "data flush end, inodeId: " << inodeId; - return CURVEFS_ERROR::OK; + s3TaskEnvent.Wait(); } void DataCache::PrepareS3ChunkInfo(uint64_t chunkId, uint64_t offset, diff --git a/curvefs/src/client/s3/client_s3_cache_manager.h b/curvefs/src/client/s3/client_s3_cache_manager.h index 379ca1bb92..7f17ad44a9 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.h +++ b/curvefs/src/client/s3/client_s3_cache_manager.h @@ -40,6 +40,9 @@ #include "src/common/concurrent/concurrent.h" #include "src/common/timeutility.h" #include "curvefs/src/client/metric/client_metric.h" +#include "curvefs/src/client/kvclient/kvclient_manager.h" +#include "curvefs/src/client/kvclient/kvclient.h" +#include "curvefs/src/client/inode_wrapper.h" using curve::common::ReadLockGuard; using curve::common::RWLock; @@ -66,11 +69,25 @@ using curvefs::metaserver::S3ChunkInfo; using curvefs::metaserver::S3ChunkInfoList; enum CacheType { Write = 1, Read = 2 }; + +enum class CachePoily { + NCache, + RCache, + WRCache, +}; + struct ReadRequest { uint64_t index; uint64_t chunkPos; uint64_t len; uint64_t bufOffset; + + std::string DebugString() const { + std::ostringstream os; + os << "ReadRequest ( index = " << index << ", chunkPos = " << chunkPos + << ", len = " << len << ", bufOffset = " << bufOffset << " )"; + return os.str(); + } }; struct S3ReadRequest { @@ -82,8 +99,26 @@ struct S3ReadRequest { uint64_t fsId; uint64_t inodeId; uint64_t compaction; + + std::string DebugString() const { + std::ostringstream os; + os << "S3ReadRequest ( chunkId = " << chunkId << ", offset = " << offset + << ", len = " << len << ", objectOffset = " << objectOffset + << ", readOffset = " << readOffset << ", fsId = " << fsId + << ", inodeId = " << inodeId << ", compaction = " << compaction + << " )"; + return os.str(); + } }; +inline std::string +S3ReadRequestVecDebugString(const std::vector &reqs) { + std::ostringstream os; + for_each(reqs.begin(), reqs.end(), + [&](const S3ReadRequest &req) { os << req.DebugString() << " "; }); + return os.str(); +} + struct ObjectChunkInfo { S3ChunkInfo s3ChunkInfo; uint64_t objectOffset; // s3 object's begin in the block @@ -176,6 +211,19 @@ class DataCache : public std::enable_shared_from_this { const char *data); void AddDataBefore(uint64_t len, const char *data); + CURVEFS_ERROR PrepareFlushTasks( + uint64_t inodeId, char *data, + std::vector> *s3Tasks, + std::vector> *kvCacheTasks, + uint64_t *chunkId, uint64_t *writeOffset); + + void FlushTaskExecute( + CachePoily cachePoily, + const std::vector> &s3Tasks, + const std::vector> &kvCacheTasks); + + CachePoily GetCachePolicy(bool toS3); + private: S3ClientAdaptorImpl *s3ClientAdaptor_; ChunkCacheManagerPtr chunkCacheManager_; @@ -215,8 +263,8 @@ class ChunkCacheManager char *dataBuf, uint64_t dataBufOffset, std::vector *requests); virtual void WriteNewDataCache(S3ClientAdaptorImpl *s3ClientAdaptor, - uint32_t chunkPos, uint32_t len, - const char *data); + uint32_t chunkPos, uint32_t len, + const char *data); virtual void AddReadDataCache(DataCachePtr dataCache); virtual DataCachePtr FindWriteableDataCache(uint64_t pos, uint64_t len, @@ -282,15 +330,24 @@ class FileCacheManager { S3ClientAdaptorImpl *s3ClientAdaptor) : fsId_(fsid), inode_(inode), s3ClientAdaptor_(s3ClientAdaptor) {} FileCacheManager() {} + ChunkCacheManagerPtr FindOrCreateChunkCacheManager(uint64_t index); + void ReleaseCache(); + virtual void TruncateCache(uint64_t offset, uint64_t fileSize); + virtual CURVEFS_ERROR Flush(bool force, bool toS3 = false); + virtual int Write(uint64_t offset, uint64_t length, const char *dataBuf); + virtual int Read(uint64_t inodeId, uint64_t offset, uint64_t length, char *dataBuf); + bool IsEmpty() { return chunkCacheMap_.empty(); } + uint64_t GetInodeId() const { return inode_; } + void SetChunkCacheManagerForTest(uint64_t index, ChunkCacheManagerPtr chunkCacheManager) { WriteLockGuard writeLockGuard(rwLock_); @@ -307,21 +364,65 @@ class FileCacheManager { const S3ChunkInfoList &s3ChunkInfoList, char *dataBuf, std::vector *requests, uint64_t fsId, uint64_t inodeId); - int ReadFromS3(const std::vector &requests, - std::vector *responses, - char* dataBuf, uint64_t fileLen); + void PrefetchS3Objs( const std::vector> &prefetchObjs); + void HandleReadRequest(const ReadRequest &request, const S3ChunkInfo &s3ChunkInfo, std::vector *addReadRequests, std::vector *deletingReq, std::vector *requests, char *dataBuf, uint64_t fsId, uint64_t inodeId); + int HandleReadRequest(const std::vector &requests, std::vector *responses, uint64_t fileLen); + // GetChunkLoc: get chunk info according to offset + void GetChunkLoc(uint64_t offset, uint64_t *index, uint64_t *chunkPos, + uint64_t *chunkSize); + + // GetBlockLoc: get block info according to offset + void GetBlockLoc(uint64_t offset, uint64_t *chunkIndex, uint64_t *chunkPos, + uint64_t *blockIndex, uint64_t *blockPos); + + // read data from memory read/write cache + void ReadFromMemCache(uint64_t offset, uint64_t length, char *dataBuf, + uint64_t *actualReadLen, + std::vector *memCacheMissRequest); + + // miss read from memory read/write cache, need read from + // kv(localdisk/remote cache/s3) + int GenerateKVReuqest(const std::shared_ptr &inodeWrapper, + const std::vector &readRequest, + char *dataBuf, std::vector *kvRequest); + + // read kv request, need + int ReadKVRequest(const std::vector &kvRequests, + char *dataBuf, uint64_t fileLen); + + // read kv request from local disk cache + bool ReadKVRequestFromLocalCache(const std::string &name, char *databuf, + uint64_t offset, uint64_t len); + + // read kv reuqest from remote cache like memcached + bool ReadKVRequestFromRemoteCache(const std::string &name, char *databuf, + uint64_t offset, uint64_t length); + + // read kv request from s3 + bool ReadKVRequestFromS3(const std::string &name, char *databuf, + uint64_t offset, uint64_t length, int *ret); + + // read retry policy when read from s3 occur not exist error + int HandleReadS3NotExist(int ret, uint32_t retry, + const std::shared_ptr &inodeWrapper); + + // prefetch for block + void PrefetchForBlock(const S3ReadRequest &req, uint64_t fileLen, + uint64_t blockSize, uint64_t chunkSize, + uint64_t startBlockIndex); + private: friend class AsyncPrefetchCallback; diff --git a/curvefs/test/client/BUILD b/curvefs/test/client/BUILD index 1bc8997fca..de7e3ef453 100644 --- a/curvefs/test/client/BUILD +++ b/curvefs/test/client/BUILD @@ -93,7 +93,8 @@ cc_test( "file_cache_manager_test.cpp", "chunk_cache_manager_test.cpp", "data_cache_test.cpp", - "client_s3_adaptor_Integration.cpp",], + "client_s3_adaptor_Integration.cpp", + ], ), copts = CURVE_TEST_COPTS + ["-I/usr/local/include/fuse3"], deps = [ @@ -113,21 +114,6 @@ cc_test( visibility = ["//visibility:public"], ) -cc_test( - name = "curvefs_fs_cache_manager_test", - srcs = [ - "fs_cache_manager_test.cpp", - ] + glob([ - "*.h", - ]), - deps = [ - "//curvefs/src/client:fuse_client_lib", - "//external:gtest", - "@com_google_googletest//:gtest_main", - ], - copts = CURVE_TEST_COPTS, -) - cc_library( name = "mock", hdrs = glob([ @@ -141,19 +127,19 @@ cc_library( visibility = ["//visibility:public"], ) -# cc_test( -# name = "curvefs_client_memcache_test", -# srcs = [ -# "client_memcache_test.cpp", -# ], -# deps = [ -# "//curvefs/src/client/kvclient:memcache_client_lib", -# "@com_google_absl//absl/strings", -# "@com_google_googletest//:gtest_main", -# "//external:gtest", -# "//external:glog", -# ], -# linkopts = ["-lmemcached"], -# copts = CURVE_TEST_COPTS, -# visibility = ["//visibility:public"], -# ) +cc_test( + name = "curvefs_client_memcache_test", + srcs = [ + "client_memcache_test.cpp", + ], + deps = [ + "//curvefs/src/client/kvclient:memcached_client_lib", + "@com_google_absl//absl/strings", + "@com_google_googletest//:gtest_main", + "//external:gtest", + "//external:glog", + ], + linkopts = ["-lmemcached"], + copts = CURVE_TEST_COPTS, + visibility = ["//visibility:public"], +) diff --git a/curvefs/test/client/client_memcache_test.cpp b/curvefs/test/client/client_memcache_test.cpp index 93b0079a48..08899fb979 100644 --- a/curvefs/test/client/client_memcache_test.cpp +++ b/curvefs/test/client/client_memcache_test.cpp @@ -20,133 +20,114 @@ * Author: fansehep (YangFan) */ -// #include -// #include +#include +#include -// #include -// #include -// #include -// #include +#include +#include +#include +#include -// #include "absl/strings/str_cat.h" -// #include "curvefs/src/client/kvclient/kvclient_manager.h" -// #include "curvefs/src/client/kvclient/memcache_client.h" +#include "absl/strings/str_cat.h" +#include "curvefs/src/client/kvclient/kvclient_manager.h" +#include "curvefs/src/client/kvclient/memcache_client.h" +#include "src/common/concurrent/count_down_event.h" -// using ::curvefs::client::GetKvCacheContext; -// using ::curvefs::client::KvClientManager; -// using ::curvefs::client::KvClientManagerConfig; -// using ::curvefs::client::MemCachedClient; -// using ::curvefs::client::SetKvCacheTask; +using curve::common::CountDownEvent; -// class MemCachedTest : public ::testing::Test { -// public: -// MemCachedTest() = default; -// void SetUp() { -// auto hostname = "127.0.0.1"; -// auto port = 18080; -// memcached_pid = ::fork(); -// if (0 > memcached_pid) { -// ASSERT_FALSE(true); -// } else if (0 == memcached_pid) { -// std::string memcached_config = -// "memcached -p " + std::to_string(port); -// ASSERT_EQ(0, execl("/bin/sh", "sh", "-c", -// memcached_config.c_str(), nullptr)); -// exit(0); -// } -// // wait memcached server start -// std::this_thread::sleep_for(std::chrono::seconds(4)); -// std::unique_ptr client(new MemCachedClient()); -// ASSERT_EQ(true, client->AddServer(hostname, port)); -// ASSERT_EQ(true, client->PushServer()); -// KvClientManagerConfig conf; -// conf.kvclient = std::move(client); -// conf.threadPooln = 2; -// ASSERT_EQ(true, manager_.Init(&conf)); -// } +namespace curvefs { +namespace client { -// void TearDown() { -// manager_.Uninit(); -// auto str = absl::StrCat("kill -9 ", memcached_pid); -// ::system(str.c_str()); -// std::this_thread::sleep_for(std::chrono::seconds(3)); -// } -// pid_t memcached_pid; -// KvClientManager manager_; -// }; +class MemCachedTest : public ::testing::Test { + public: + MemCachedTest() = default; + void SetUp() { + auto hostname = "127.0.0.1"; + auto port = 18080; + memcached_pid = ::fork(); + if (0 > memcached_pid) { + ASSERT_FALSE(true); + } else if (0 == memcached_pid) { + std::string memcached_config = + "memcached -u root -p " + std::to_string(port); + ASSERT_EQ(0, execl("/bin/sh", "sh", "-c", memcached_config.c_str(), + nullptr)); + exit(0); + } -// TEST_F(MemCachedTest, MultiThreadGetSetTest) { -// std::vector workers; -// int i; -// std::vector> kvstr = { -// {"123", "1231"}, -// {"456", "4561"}, -// {"789", "7891"}, -// {"234", "2341"}, -// {"890", "8901"} -// }; -// i = 0; -// for (; i < 5; i++) { -// workers.emplace_back([&, i]() { -// manager_.Set(kvstr[i].first, kvstr[i].second.c_str(), -// kvstr[i].second.length()); -// }); -// } -// for (auto& iter : workers) { -// iter.join(); -// } -// std::this_thread::sleep_for(std::chrono::seconds(3)); -// i = 0; -// for (; i < 5; i++) { -// workers.emplace_back([&, i]() { -// std::string res; -// ASSERT_EQ(true, manager_.Get(kvstr[i].first, &res)); -// ASSERT_EQ(res, kvstr[i].second); -// }); -// } -// for (auto& iter : workers) { -// if (iter.joinable()) { -// iter.join(); -// } -// } -// } + std::shared_ptr client(new MemCachedClient()); + ASSERT_EQ(true, client->AddServer(hostname, port)); + ASSERT_EQ(true, client->PushServer()); + KVClientManagerOpt opt; + opt.setThreadPooln = 2; + opt.getThreadPooln = 2; + ASSERT_EQ(true, manager_.Init(opt, client)); -// TEST_F(MemCachedTest, MultiThreadTask) { -// std::vector workers; -// int i; -// std::vector> kvstr = { -// {"123", "1231"}, -// {"456", "4561"}, -// {"789", "7891"}, -// {"234", "2341"}, -// {"890", "8901"} -// }; -// i = 0; -// for (; i < 5; i++) { -// workers.emplace_back([&, i]() { -// auto task = std::make_shared( -// kvstr[i].first, kvstr[i].second.c_str(), -// kvstr[i].second.length()); -// manager_.Enqueue(task); -// }); -// } -// for (auto& iter : workers) { -// iter.join(); -// } -// std::this_thread::sleep_for(std::chrono::seconds(3)); -// i = 0; -// for (; i < 5; i++) { -// workers.emplace_back([&, i]() { -// std::string result; -// auto context = -// std::make_shared(kvstr[i].first, &result); -// ASSERT_EQ(true, manager_.Get(context)); -// ASSERT_EQ(result, kvstr[i].second); -// }); -// } -// for (auto& iter : workers) { -// if (iter.joinable()) { -// iter.join(); -// } -// } -// } + // wait memcached server start + std::string errorlog; + int retry = 0; + do { + if (client->Set("1", "2", 1, &errorlog) || retry > 100) { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + retry++; + } while (1); + } + + void TearDown() { + auto str = absl::StrCat("kill -9 ", memcached_pid); + ::system(str.c_str()); + std::this_thread::sleep_for(std::chrono::seconds(3)); + } + pid_t memcached_pid; + KVClientManager manager_; +}; + + +TEST_F(MemCachedTest, MultiThreadTask) { + // prepare data + std::vector workers; + std::vector> kvstr = { + {"123", "1231"}, + {"456", "4561"}, + {"789", "7891"}, + {"234", "2341"}, + {"890", "8901"} + }; + + // set + CountDownEvent taskEnvent(5); + for (int i = 0; i < 5; i++) { + workers.emplace_back([&, i]() { + auto task = std::make_shared( + kvstr[i].first, kvstr[i].second.c_str(), + kvstr[i].second.length()); + task->done = + [&taskEnvent](const std::shared_ptr &task) { + taskEnvent.Signal(); + }; + manager_.Set(task); + }); + } + taskEnvent.Wait(); + ASSERT_EQ(5, g_kvClientMetric->kvClientSet.latency.count()); + + // get + for (int i = 0; i < 5; i++) { + workers.emplace_back([&, i]() { + char *result = new char[4]; + auto context = std::make_shared(kvstr[i].first, + result, 0, 4); + ASSERT_EQ(true, manager_.Get(context)); + ASSERT_EQ(0, memcmp(result, kvstr[i].second.c_str(), 4)); + }); + } + for (auto &iter : workers) { + if (iter.joinable()) { + iter.join(); + } + } +} +} // namespace client +} // namespace curvefs diff --git a/curvefs/test/client/client_s3_adaptor_Integration.cpp b/curvefs/test/client/client_s3_adaptor_Integration.cpp index 40eb693682..f26157da9c 100644 --- a/curvefs/test/client/client_s3_adaptor_Integration.cpp +++ b/curvefs/test/client/client_s3_adaptor_Integration.cpp @@ -26,11 +26,13 @@ #include "curvefs/src/client/inode_wrapper.h" #include "curvefs/src/client/s3/client_s3_adaptor.h" +#include "curvefs/src/client/kvclient/kvclient_manager.h" +#include "src/common/curve_define.h" #include "curvefs/test/client/mock_client_s3.h" #include "curvefs/test/client/mock_inode_cache_manager.h" #include "curvefs/test/client/mock_metaserver_service.h" +#include "curvefs/test/client/mock_kvclient.h" #include "curvefs/test/client/rpcclient/mock_mds_client.h" -#include "src/common/curve_define.h" namespace curvefs { namespace client { @@ -49,6 +51,8 @@ using ::testing::Invoke; using ::testing::Return; using ::testing::SetArgPointee; using ::testing::SetArgReferee; +using ::testing::SetArrayArgument; +using ::testing::AtLeast; using ::testing::WithArg; using rpcclient::MockMdsClient; @@ -113,7 +117,6 @@ std::shared_ptr InitInodeForIntegration() { return std::make_shared(std::move(inode), nullptr); } - } // namespace class ClientS3IntegrationTest : public testing::Test { @@ -153,12 +156,21 @@ class ClientS3IntegrationTest : public testing::Test { server_.Join(); } + void InitKVClientManager() { + g_kvClientManager = new KVClientManager(); + + KVClientManagerOpt opt; + std::shared_ptr mockKVClient(&mockKVClient_); + g_kvClientManager->Init(opt, mockKVClient); + } + protected: S3ClientAdaptorImpl *s3ClientAdaptor_; MockMetaServerService mockMetaServerService_; MockS3Client mockS3Client_; MockInodeCacheManager mockInodeManager_; MockMdsClient mockMdsClient_; + MockKVClient mockKVClient_; std::string addr_ = "127.0.0.1:5630"; brpc::Server server_; Aws::SDKOptions awsOptions_; @@ -2821,6 +2833,101 @@ TEST_F(ClientS3IntegrationTest, test_fssync_overlap_write) { gObjectDataMaps.clear(); } +TEST_F(ClientS3IntegrationTest, test_write_read_remotekvcache) { + InitKVClientManager(); + + curvefs::client::common::FLAGS_enableCto = true; + + uint64_t offset_0 = 0, offset_4M = (4 << 20), chunkId = 10; + uint64_t inodeId = inode->GetInodeId(); + uint64_t len = 128 * 1024; // 128K + char *buf = new char[len]; + memset(buf, 'b', len); + + // write data and prepare for flush + { + EXPECT_CALL(mockS3Client_, UploadAsync(_)) + .Times(2) + .WillRepeatedly(Invoke( + [&](const std::shared_ptr &context) { + context->retCode = 0; + context->cb(context); + })); + EXPECT_CALL(mockInodeManager_, GetInode(_, _)) + .Times(2) + .WillRepeatedly( + DoAll(SetArgReferee<1>(inode), Return(CURVEFS_ERROR::OK))); + + int ret = + s3ClientAdaptor_->Write(inodeId, offset_0, len, buf); + ASSERT_EQ(len, ret); + ret = s3ClientAdaptor_->Write(inodeId, offset_4M, len, buf); + ASSERT_EQ(len, ret); + } + + // flush data + { + EXPECT_CALL(mockMdsClient_, AllocS3ChunkId(_, _, _)) + .Times(2) + .WillOnce( + DoAll(SetArgPointee<2>(chunkId), Return(FSStatusCode::OK))) + .WillOnce( + DoAll(SetArgPointee<2>(chunkId + 1), Return(FSStatusCode::OK))); + EXPECT_CALL(mockKVClient_, Set(_, _, _, _)) + .Times(2) + .WillOnce(Return(true)) + .WillOnce(Return(false)); + EXPECT_CALL(mockInodeManager_, ShipToFlush(_)).Times(2); + + CURVEFS_ERROR res = s3ClientAdaptor_->Flush(inodeId); + ASSERT_EQ(CURVEFS_ERROR::OK, res); + } + + // read (offset_0, len) + { + char *readBuf = new char[len]; + memset(readBuf, 0, len); + EXPECT_CALL(mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inode), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(mockKVClient_, Get(_, _, 0, len, _)) + .WillOnce(DoAll(SetArrayArgument<1>(buf, buf + len), Return(true))); + int readLen = s3ClientAdaptor_->Read(inodeId, offset_0, len, readBuf); + EXPECT_EQ(readLen, len); + ASSERT_EQ(0, memcmp(buf, readBuf, len)); + } + + // read (offset_4M, len) + { + char *readBuf = new char[len]; + memset(readBuf, 0, len); + EXPECT_CALL(mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inode), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(mockKVClient_, Get(_, _, 0, len, _)) + .WillOnce(DoAll(SetArrayArgument<1>(buf, buf + len), Return(true))); + int readLen = s3ClientAdaptor_->Read(inodeId, offset_4M, len, readBuf); + EXPECT_EQ(readLen, len); + ASSERT_EQ(0, memcmp(buf, readBuf, len)); + } + + // read (offset_0, len), remote cache fail + { + char *readBuf = new char[len]; + memset(readBuf, 0, len); + EXPECT_CALL(mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inode), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(mockKVClient_, Get(_, _, 0, len, _)) + .WillOnce(Return(false)); + EXPECT_CALL(mockS3Client_, Download(_, _, _, _)) + .WillOnce(DoAll(SetArrayArgument<1>(buf, buf + len), Return(true))); + int readLen = s3ClientAdaptor_->Read(inodeId, offset_0, len, readBuf); + EXPECT_EQ(readLen, len); + ASSERT_EQ(0, memcmp(buf, readBuf, len)); + } +} + } // namespace client } // namespace curvefs diff --git a/curvefs/test/client/client_s3_adaptor_test.cpp b/curvefs/test/client/client_s3_adaptor_test.cpp index 566421335f..ec402f2eee 100644 --- a/curvefs/test/client/client_s3_adaptor_test.cpp +++ b/curvefs/test/client/client_s3_adaptor_test.cpp @@ -47,6 +47,8 @@ using ::testing::WithArg; using rpcclient::MockMdsClient; +extern KVClientManager *g_kvClientManager; + class ClientS3AdaptorTest : public testing::Test { protected: ClientS3AdaptorTest() {} @@ -73,6 +75,7 @@ class ClientS3AdaptorTest : public testing::Test { s3ClientAdaptor_->Init(option, mockS3Client_, mockInodeManager_, mockMdsClient_, mockFsCacheManager_, mockDiskcacheManagerImpl_); + g_kvClientManager = nullptr; } void TearDown() override { @@ -262,7 +265,7 @@ TEST_F(ClientS3AdaptorTest, FlushAllCache_with_no_cache) { } TEST_F(ClientS3AdaptorTest, FlushAllCache_with_cache) { - s3ClientAdaptor_->SetDiskCache(DiskCacheType::ReadWrite); + s3ClientAdaptor_->SetDiskCache(DiskCacheType::ReadWrite); LOG(INFO) << "############ case1: clear write cache fail"; auto filecache = std::make_shared(); diff --git a/curvefs/test/client/file_cache_manager_test.cpp b/curvefs/test/client/file_cache_manager_test.cpp index 972f2672ea..8815ea6d2e 100644 --- a/curvefs/test/client/file_cache_manager_test.cpp +++ b/curvefs/test/client/file_cache_manager_test.cpp @@ -29,6 +29,15 @@ #include "curvefs/test/client/mock_inode_cache_manager.h" #include "curvefs/test/client/mock_client_s3.h" +namespace curvefs { +namespace client { +namespace common { +DECLARE_bool(enableCto); +DECLARE_bool(supportKVcache); +} // namespace common +} // namespace client +} // namespace curvefs + namespace curvefs { namespace client { using ::testing::_; @@ -39,6 +48,8 @@ using ::testing::SetArgPointee; using ::testing::SetArgReferee; using ::testing::WithArg; +extern KVClientManager *g_kvClientManager; + class FileCacheManagerTest : public testing::Test { protected: FileCacheManagerTest() {} @@ -68,6 +79,8 @@ class FileCacheManagerTest : public testing::Test { fileCacheManager_ = std::make_shared(fsId, inodeId, s3ClientAdaptor_); mockChunkCacheManager_ = std::make_shared(); + curvefs::client::common::FLAGS_enableCto = false; + g_kvClientManager = nullptr; } void TearDown() override { diff --git a/curvefs/test/client/mock_kvclient.h b/curvefs/test/client/mock_kvclient.h new file mode 100644 index 0000000000..2a1e6c9594 --- /dev/null +++ b/curvefs/test/client/mock_kvclient.h @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2021 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: Fri Nov 25 2022 + * Author: lixiaocui + */ + +#ifndef CURVEFS_TEST_CLIENT_MOCK_KVCLIENT_H_ +#define CURVEFS_TEST_CLIENT_MOCK_KVCLIENT_H_ + +#include +#include +#include "curvefs/src/client/kvclient/kvclient.h" + +namespace curvefs { +namespace client { +class MockKVClient : public KVClient { + public: + MockKVClient() : KVClient() {} + ~MockKVClient() = default; + + MOCK_METHOD4(Set, bool(const std::string &, const char *, const uint64_t, + std::string *)); + MOCK_METHOD5(Get, bool(const std::string &, char *, uint64_t, uint64_t, + std::string *)); +}; + +} // namespace client +} // namespace curvefs + +#endif // CURVEFS_TEST_CLIENT_MOCK_KVCLIENT_H_ diff --git a/curvefs/test/client/rpcclient/mock_mds_client.h b/curvefs/test/client/rpcclient/mock_mds_client.h index 1eeda7b8c3..591dc8a58b 100644 --- a/curvefs/test/client/rpcclient/mock_mds_client.h +++ b/curvefs/test/client/rpcclient/mock_mds_client.h @@ -124,9 +124,11 @@ class MockMdsClient : public MdsClient { MOCK_METHOD3( ReleaseVolumeBlockGroup, - SpaceErrCode(uint32_t, - const std::string&, - const std::vector&)); + SpaceErrCode(uint32_t, const std::string &, + const std::vector &)); + + MOCK_METHOD2(AllocOrGetMemcacheCluster, + bool(uint32_t, curvefs::mds::topology::MemcacheCluster *)); }; } // namespace rpcclient } // namespace client diff --git a/curvefs/test/client/volume/BUILD.bazel b/curvefs/test/client/volume/BUILD.bazel index 9d4b821cfc..6df927d6f5 100644 --- a/curvefs/test/client/volume/BUILD.bazel +++ b/curvefs/test/client/volume/BUILD.bazel @@ -25,6 +25,7 @@ cc_test( copts = CURVE_TEST_COPTS, deps = [ "//curvefs/src/client:fuse_client_lib", + "//curvefs/src/client/kvclient:memcached_client_lib", "//curvefs/test/volume/mock", "@com_google_absl//absl/strings", "@com_google_googletest//:gtest", diff --git a/thirdparties/memcache/Makefile b/thirdparties/memcache/Makefile new file mode 100644 index 0000000000..c79cd4da0b --- /dev/null +++ b/thirdparties/memcache/Makefile @@ -0,0 +1,12 @@ + +.PHONY: download build clean + +build: clean download + @cd libmemcached-1.1.2 && mkdir build-libmemcached && cd build-libmemcached && cmake .. && make libmemcached + @cp libmemcached-1.1.2/build-libmemcached/include/libmemcached-1.0/configure.h libmemcached-1.1.2/include/libmemcached-1.0 + +download: clean + @wget https://curve-build.nos-eastchina1.126.net/memcache/libmemcached-1.1.2.tar.gz && tar -zxf libmemcached-1.1.2.tar.gz + +clean: + @rm -fr libmemcached* diff --git a/thirdparties/memcache/memcache.BUILD b/thirdparties/memcache/memcache.BUILD new file mode 100644 index 0000000000..31472aa120 --- /dev/null +++ b/thirdparties/memcache/memcache.BUILD @@ -0,0 +1,19 @@ +# Description: +# Memcache C++ SDK + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "libmemcached", + srcs = glob([ + "build-libmemcached/src/libmemcached/libmemcached.so", + "build-libmemcached/src/libmemcached/libmemcached.so.11", + ]), + hdrs = glob([ + "include/libmemcached-1.0/**/*.h", + "include/libmemcached-1.0/*.h", + ]), + includes = [ + "include/", + ], +) \ No newline at end of file diff --git a/ut.sh b/ut.sh index 0c5bcb3c76..e4e20d7812 100644 --- a/ut.sh +++ b/ut.sh @@ -15,32 +15,6 @@ ps -ef | grep mds | grep -v grep | grep -v gcc | awk '{print $2}' | sudo xargs k ps -ef | grep etcd | grep -v grep | grep -v gcc | awk '{print $2}' | sudo xargs kill -9 || true ps -ef | grep test | grep -v grep | grep -v gcc | awk '{print $2}' | sudo xargs kill -9 || true -################################################################ __ROCKSDB__ -g_build_opts=() -kernel_version=`uname -r | awk -F . '{print $1 * 1000 + $2}'` -if [ $kernel_version -gt 5001 ]; then - g_build_opts+=("--define IO_URING_SUPPORT=1") -fi -g_rocksdb_root="${PWD}/thirdparties/rocksdb" -(cd ${g_rocksdb_root} && make build && make install prefix=${g_rocksdb_root}) -################################################################ __ROCKSDB__ - -g_aws_sdk_root="${PWD}/thirdparties/aws/" -(cd ${g_aws_sdk_root} && make) - -if [ -f /home/nbs/etcdclient/libetcdclient.h ] && [ -f /home/nbs/etcdclient/libetcdclient.so ] -then - cp /home/nbs/etcdclient/libetcdclient.h ${WORKSPACE}thirdparties/etcdclient - cp /home/nbs/etcdclient/libetcdclient.so ${WORKSPACE}thirdparties/etcdclient -else - cd ${WORKSPACE}thirdparties/etcdclient && make all - #sudo cp /curve/curve_multijob/thirdparties/etcdclient/libetcdclient.so /usr/lib/ - #make clean -fi - -export LD_LIBRARY_PATH=${g_aws_sdk_root}/aws-sdk-cpp/build/aws-cpp-sdk-core:${g_aws_sdk_root}/aws-sdk-cpp/build/aws-cpp-sdk-s3-crt:${LD_LIBRARY_PATH} -export LD_LIBRARY_PATH=${WORKSPACE}thirdparties/etcdclient:${LD_LIBRARY_PATH} - cd ${WORKSPACE} bash replace-curve-repo.sh mkdir runlog storage @@ -86,17 +60,13 @@ fi set -e -################################################################ __GCC VERSION__ -if [ `gcc -dumpversion | awk -F'.' '{print $1}'` -gt 6 ]; then - g_build_opts+=("--config=gcc7-later") -fi - -bazel build ... -c dbg --collect_code_coverage --copt -DHAVE_ZLIB=1 --define=with_glog=true --define=libunwind=true --copt -DGFLAGS_NS=google --copt -Wno-error=format-security --copt -DUSE_BTHREAD_MUTEX ${g_build_opts[@]} - #test_bin_dirs="bazel-bin/test/ bazel-bin/nebd/test/ bazel-bin/curvefs/test/" if [ $1 == "curvebs" ];then +make build stor=bs dep=1 only=test/* +make build stor=bs only=nebd/test/* test_bin_dirs="bazel-bin/test/ bazel-bin/nebd/test/" elif [ $1 == "curvefs" ];then +make build stor=fs dep=1 only=curvefs/test/* test_bin_dirs="bazel-bin/curvefs/test/" fi echo $test_bin_dirs @@ -105,7 +75,7 @@ echo $test_bin_dirs for i in 0 1 2 3; do mkdir -p $i/{copysets,recycler}; done # run all unittests background -for i in `find ${test_bin_dirs} -type f -executable -exec file -i '{}' \; | grep -E 'x-executable|x-sharedlib' | grep "charset=binary" | grep -v ".so"|grep test | grep -Ev 'snapshot-server|snapshot_dummy_server|client-test|server-test|multi|topology_dummy|curve_client_workflow|curve_fake_mds|curvefs_client_memcache_test' | awk -F":" '{print $1}' | sed -n '1,40p' ` ;do sudo $i 2>&1 | tee $i.log & done +for i in `find ${test_bin_dirs} -type f -executable -exec file -i '{}' \; | grep -E 'x-executable|x-sharedlib' | grep "charset=binary" | grep -v ".so"|grep test | grep -Ev 'snapshot-server|snapshot_dummy_server|client-test|server-test|multi|topology_dummy|curve_client_workflow|curve_fake_mds' | awk -F":" '{print $1}' | sed -n '1,40p' ` ;do sudo $i 2>&1 | tee $i.log & done if [ $1 == "curvebs" ];then sleep 360 fi diff --git a/util/build.sh b/util/build.sh index bea4bcac20..95d4db04dc 100644 --- a/util/build.sh +++ b/util/build.sh @@ -213,6 +213,8 @@ build_requirements() { fi g_rocksdb_root="${PWD}/thirdparties/rocksdb" (cd ${g_rocksdb_root} && make build from_source=${g_build_rocksdb} && make install prefix=${g_rocksdb_root}) + g_memcache_root="${PWD}/thirdparties/memcache" + (cd ${g_memcache_root} && make build) fi g_aws_sdk_root="thirdparties/aws" (cd ${g_aws_sdk_root} && make) diff --git a/util/image.sh b/util/image.sh index 87099ec06c..e4b30909b7 100644 --- a/util/image.sh +++ b/util/image.sh @@ -75,8 +75,8 @@ install_pkg $1 $prefix install_pkg $1 $prefix etcd install_pkg $1 $prefix monitor copy_file ./thirdparties/aws/aws-sdk-cpp/build/aws-cpp-sdk-core/libaws-cpp-sdk-core.so $docker_prefix -copy_file ./thirdparties/aws/aws-sdk-cpp/build/aws-cpp-sdk-core/libaws-cpp-sdk-core.so $docker_prefix copy_file ./thirdparties/aws/aws-sdk-cpp/build/aws-cpp-sdk-s3-crt/libaws-cpp-sdk-s3-crt.so $docker_prefix +copy_file ./thirdparties/memcache/libmemcached-1.1.2/build-libmemcached/src/libmemcached/libmemcached.so $docker_prefix if [ "$1" == "bs" ]; then paths=`ls conf/* nebd/etc/nebd/*`