Skip to content

Commit

Permalink
curvefs: support client to memcached cluster
Browse files Browse the repository at this point in the history
Signed-off-by: ilixiaocui <[email protected]>
  • Loading branch information
ilixiaocui committed Dec 8, 2022
1 parent 4213163 commit ba71cc1
Show file tree
Hide file tree
Showing 38 changed files with 3,070 additions and 2,471 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ only?= "*"
tag?= "curvebs:unknown"
case?= "*"
os?= "debian9"
ci?=0

define help_msg
## list
Expand Down Expand Up @@ -59,7 +60,7 @@ list:
@bash util/build.sh --stor=$(stor) --list

build:
@bash util/build.sh --stor=$(stor) --only=$(only) --dep=$(dep) --release=$(release) --os=$(os)
@bash util/build.sh --stor=${stor} --only=$(only) --dep=$(dep) --release=$(release) --ci=$(ci) --os=$(os)

dep:
@bash util/build.sh --stor=$(stor) --only="" --dep=1
Expand Down
6 changes: 6 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ new_local_repository(
path = "thirdparties/aws/aws-sdk-cpp",
)

new_local_repository(
name = "libmemcached",
build_file = "//:thirdparties/memcache/memcache.BUILD",
path = "thirdparties/memcache/libmemcached-1.1.2",
)


# C++ rules for Bazel.
http_archive(
Expand Down
11 changes: 9 additions & 2 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mdsOpt.rpcRetryOpt.normalRetryTimesBeforeTriggerWait=3
mdsOpt.rpcRetryOpt.waitSleepMs=1000
mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__


#
# lease options
#
Expand Down Expand Up @@ -86,8 +87,14 @@ fuseClient.dCacheLruSize=1000000
fuseClient.enableICacheMetrics=true
fuseClient.enableDCacheMetrics=true
fuseClient.cto=true
# you should enable it when mount one filesystem to multi mount points,
# it guarantee the consistent of file after rename, otherwise you should

### kvcache opt
fuseClient.supportKVcache=false
fuseClient.setThreadPool=4
fuseClient.getThreadPool=4

# you shoudle enable it when mount one filesystem to multi mountpoints,
# it gurantee the consistent of file after rename, otherwise you should
# disable it for performance.
fuseClient.enableMultiMountPointRename=true
# splice will bring higher performance in some cases
Expand Down
3 changes: 3 additions & 0 deletions curvefs/docker/debian10/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
3 changes: 3 additions & 0 deletions curvefs/docker/debian11/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
3 changes: 3 additions & 0 deletions curvefs/docker/debian9/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
39 changes: 39 additions & 0 deletions curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,41 @@ message ListTopologyResponse {
required ListMetaServerResponse metaservers = 5;
}

message MemcachedServer {
required string ip = 1;
required uint32 port = 2;
}

message RegistMemcacheClusterRequest {
repeated MemcachedServer servers = 1;
}

message RegistMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
}

message ListMemcacheClusterRequest {
}

message MemcacheCluster {
required uint32 clusterId = 1;
repeated MemcachedServer servers = 2;
}

message ListMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
repeated MemcacheCluster memClusters = 2;
}

message AllocOrGetMemcacheClusterRequest {
required uint32 fsId = 1;
}

message AllocOrGetMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
optional MemcacheCluster cluster = 2;
}

service TopologyService {
rpc RegistMetaServer(MetaServerRegistRequest) returns (MetaServerRegistResponse);
rpc ListMetaServer(ListMetaServerRequest) returns (ListMetaServerResponse);
Expand Down Expand Up @@ -493,4 +528,8 @@ service TopologyService {
rpc StatMetadataUsage(StatMetadataUsageRequest) returns (StatMetadataUsageResponse);

rpc ListTopology(ListTopologyRequest) returns (ListTopologyResponse);

rpc RegistMemcacheCluster(RegistMemcacheClusterRequest) returns (RegistMemcacheClusterResponse);
rpc ListMemcacheCluster(ListMemcacheClusterRequest) returns (ListMemcacheClusterResponse);
rpc AllocOrGetMemcacheCluster(AllocOrGetMemcacheClusterRequest) returns (AllocOrGetMemcacheClusterResponse);
}
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cc_library(
"//curvefs/src/client/rpcclient",
"//curvefs/src/common:curvefs_common",
"//curvefs/src/client/lease:curvefs_lease",
"//curvefs/src/client/kvclient:memcached_client_lib",
"//external:brpc",
"//external:gflags",
"//external:glog",
Expand Down
13 changes: 13 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace common {
DEFINE_bool(enableCto, true, "acheieve cto consistency");
DEFINE_bool(useFakeS3, false,
"Use fake s3 to inject more metadata for testing metaserver");
DEFINE_bool(supportKVcache, false, "use kvcache to speed up sharing");

void InitMdsOption(Configuration *conf, MdsOption *mdsOpt) {
conf->GetValueFatalIfFail("mdsOpt.mdsMaxRetryMS", &mdsOpt->mdsMaxRetryMS);
Expand Down Expand Up @@ -232,6 +233,16 @@ void InitRefreshDataOpt(Configuration *conf,
&opt->refreshDataIntervalSec);
}

void InitKVClientManagerOpt(Configuration *conf,
KVClientManagerOpt *config) {
conf->GetValueFatalIfFail("fuseClient.supportKVcache",
&FLAGS_supportKVcache);
conf->GetValueFatalIfFail("fuseClient.setThreadPool",
&config->setThreadPooln);
conf->GetValueFatalIfFail("fuseClient.getThreadPool",
&config->getThreadPooln);
}

void SetBrpcOpt(Configuration *conf) {
curve::common::GflagsLoadValueFromConfIfCmdNotSet dummy;
dummy.Load(conf, "defer_close_second", "rpc.defer.close.second",
Expand All @@ -251,6 +262,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
InitVolumeOption(conf, &clientOption->volumeOpt);
InitLeaseOpt(conf, &clientOption->leaseOpt);
InitRefreshDataOpt(conf, &clientOption->refreshDataOption);
InitKVClientManagerOpt(conf, &clientOption->kvClientManagerOpt);

conf->GetValueFatalIfFail("fuseClient.attrTimeOut",
&clientOption->attrTimeOut);
Expand Down Expand Up @@ -280,6 +292,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
&clientOption->disableXattr);
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);


LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
&clientOption->enableFuseSplice))
<< "Not found `fuseClient.enableSplice` in conf, use default value `"
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ struct SpaceAllocServerOption {
uint64_t rpcTimeoutMs;
};

struct KVClientManagerOpt {
int setThreadPooln = 4;
int getThreadPooln = 4;
};

struct DiskCacheOption {
DiskCacheType diskCacheType;
// cache disk dir
Expand Down Expand Up @@ -182,6 +187,7 @@ struct FuseClientOption {
VolumeOption volumeOpt;
LeaseOpt leaseOpt;
RefreshDataOption refreshDataOption;
KVClientManagerOpt kvClientManagerOpt;

double attrTimeOut;
double entryTimeOut;
Expand Down
51 changes: 48 additions & 3 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,45 @@
* Author: xuchaojie
*/

#include "curvefs/src/client/fuse_s3_client.h"

#include <memory>
#include <vector>

#include "curvefs/src/client/fuse_s3_client.h"
#include "curvefs/src/client/kvclient/memcache_client.h"

namespace curvefs {
namespace client {
namespace common {

DECLARE_bool(enableCto);
DECLARE_bool(supportKVcache);

} // namespace common
} // namespace client
} // namespace curvefs

namespace curvefs {
namespace client {

using curvefs::client::common::FLAGS_supportKVcache;
using curvefs::client::common::FLAGS_enableCto;
using curvefs::mds::topology::MemcacheCluster;
using curvefs::mds::topology::MemcachedServer;

CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
FuseClientOption opt(option);
// init kvcache
if (FLAGS_supportKVcache && !InitKVCache(option.kvClientManagerOpt)) {
return CURVEFS_ERROR::INTERNAL;
}

CURVEFS_ERROR ret = FuseClient::Init(opt);
if (ret != CURVEFS_ERROR::OK) {
return ret;
}

// set fsS3Option
// set fs S3Option
const auto& s3Info = fsInfo_->detail().s3info();
::curve::common::S3InfoOption fsS3Option;
::curvefs::client::common::S3Info2FsS3Option(s3Info, &fsS3Option);
Expand Down Expand Up @@ -80,6 +96,35 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
return ret;
}


bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) {
// get kvcache cluster
MemcacheCluster kvcachecluster;
if (!mdsClient_->AllocOrGetMemcacheCluster(fsInfo_->fsid(),
&kvcachecluster)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but AllocOrGetMemcacheCluster fail";
return false;
}

// init kvcache client
auto memcacheClient = std::make_shared<MemCachedClient>();
if (!memcacheClient->Init(kvcachecluster)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init memcache client fail";
return false;
}

g_kvClientManager = new KVClientManager();
if (!g_kvClientManager->Init(opt, memcacheClient)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init kvClientManager fail";
return false;
}

return true;
}

void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile,
std::vector<std::string>& warmUpFilelist) {
struct fuse_file_info fi{};
Expand Down Expand Up @@ -536,7 +581,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;

// if enableCto, flush all write cache both in memory cache and disk cache
if (curvefs::client::common::FLAGS_enableCto) {
if (FLAGS_enableCto) {
ret = s3Adaptor_->FlushAllCache(ino);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "FuseOpFlush, flush all cache fail, ret = " << ret
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class FuseS3Client : public FuseClient {
struct fuse_file_info *fi) override;

private:
bool InitKVCache(const KVClientManagerOpt &opt);

CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

void FlushData() override;
Expand Down
42 changes: 22 additions & 20 deletions curvefs/src/client/kvclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
# limitations under the License.
#

# load("//:copts.bzl", "CURVE_DEFAULT_COPTS")
load("//:copts.bzl", "CURVE_DEFAULT_COPTS")



# cc_library(
# name = "memcache_client_lib",
# srcs = glob(["*.cpp"]),
# hdrs = glob(["*.h"]),
# copts = CURVE_DEFAULT_COPTS,
# linkopts = [
# "-lmemcached",
# ],
# visibility = ["//visibility:public"],
# deps = [
# "@com_google_absl//absl/strings",
# "//external:glog",
# "//external:bthread",
# "//src/common:curve_common",
# "//src/common:curve_s3_adapter",
# ],
# )
cc_library(
name = "memcached_client_lib",
srcs = glob(["*.cpp"]),
hdrs = glob(["*.h"]),
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-L/usr/local/lib",
],
visibility = ["//visibility:public"],
deps = [
"//curvefs/proto:curvefs_topology_cc_proto",
"//curvefs/src/client/common:common",
"//curvefs/src/client/metric:client_metric",
"@com_google_absl//absl/strings",
"//external:glog",
"//external:bthread",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
"@libmemcached",
],
)
27 changes: 10 additions & 17 deletions curvefs/src/client/kvclient/kvclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,25 @@ namespace client {
* Single client to kv interface.
*/

class KvClient {
class KVClient {
public:
KvClient() = default;
~KvClient() = default;
virtual bool Init() = 0;
KVClient() = default;
~KVClient() = default;

virtual void UnInit() = 0;
virtual void Init() {}

virtual void UnInit() {}

/**
* @param: errorlog: if error occurred, the errorlog will take
* the error info and log.
* @return: success return true, else return false;
*/
virtual bool Set(const std::string& key,
const char* value,
const int value_len,
std::string* errorlog) = 0;
virtual bool Set(const std::string &key, const char *value,
const uint64_t value_len, std::string *errorlog) = 0;

/**
* @param: errorlog: if return is false the errorlog
* will take the info.
* @return: success return the value and true.
*/
virtual bool Get(const std::string& key,
std::string* value,
std::string* errorlog) = 0;
virtual bool Get(const std::string &key, char *value, uint64_t offset,
uint64_t length, std::string *errorlog) = 0;
};

} // namespace client
Expand Down
Loading

0 comments on commit ba71cc1

Please sign in to comment.