Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Memcache Support #2096

Merged
merged 1 commit into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ only?= "*"
tag?= "curvebs:unknown"
case?= "*"
os?= "debian9"
ci?=0

define help_msg
## list
Expand Down Expand Up @@ -59,7 +60,7 @@ list:
@bash util/build.sh --stor=$(stor) --list

build:
@bash util/build.sh --stor=$(stor) --only=$(only) --dep=$(dep) --release=$(release) --os=$(os)
@bash util/build.sh --stor=${stor} --only=$(only) --dep=$(dep) --release=$(release) --ci=$(ci) --os=$(os)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wthat's the 'ci' option used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image


dep:
@bash util/build.sh --stor=$(stor) --only="" --dep=1
Expand Down
6 changes: 6 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ new_local_repository(
path = "thirdparties/aws/aws-sdk-cpp",
)

new_local_repository(
name = "libmemcached",
build_file = "//:thirdparties/memcache/memcache.BUILD",
path = "thirdparties/memcache/libmemcached-1.1.2",
)


# C++ rules for Bazel.
http_archive(
Expand Down
11 changes: 9 additions & 2 deletions curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mdsOpt.rpcRetryOpt.normalRetryTimesBeforeTriggerWait=3
mdsOpt.rpcRetryOpt.waitSleepMs=1000
mdsOpt.rpcRetryOpt.addrs=127.0.0.1:6700,127.0.0.1:6701,127.0.0.1:6702 # __ANSIBLE_TEMPLATE__ {{ groups.mds | join_peer(hostvars, "mds_listen_port") }} __ANSIBLE_TEMPLATE__


#
# lease options
#
Expand Down Expand Up @@ -86,8 +87,14 @@ fuseClient.dCacheLruSize=1000000
fuseClient.enableICacheMetrics=true
fuseClient.enableDCacheMetrics=true
fuseClient.cto=true
# you should enable it when mount one filesystem to multi mount points,
# it guarantee the consistent of file after rename, otherwise you should

### kvcache opt
fuseClient.supportKVcache=false
fuseClient.setThreadPool=4
fuseClient.getThreadPool=4

# you shoudle enable it when mount one filesystem to multi mountpoints,
# it gurantee the consistent of file after rename, otherwise you should
# disable it for performance.
fuseClient.enableMultiMountPointRename=true
# splice will bring higher performance in some cases
Expand Down
3 changes: 3 additions & 0 deletions curvefs/docker/debian10/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
3 changes: 3 additions & 0 deletions curvefs/docker/debian11/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
3 changes: 3 additions & 0 deletions curvefs/docker/debian9/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,8 @@ COPY entrypoint.sh /
COPY curvefs/tools/sbin/curvefs_tool /usr/bin
COPY libaws-cpp-sdk-core.so /usr/lib/
COPY libaws-cpp-sdk-s3-crt.so /usr/lib/
COPY libmemcached.so /usr/lib/
COPY libmemcached.so.11 /usr/lib/
COPY libhashkit.so.2 /usr/lib
RUN chmod a+x /entrypoint.sh
ENTRYPOINT ["/entrypoint.sh"]
39 changes: 39 additions & 0 deletions curvefs/proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,41 @@ message ListTopologyResponse {
required ListMetaServerResponse metaservers = 5;
}

message MemcachedServer {
required string ip = 1;
required uint32 port = 2;
}

message RegistMemcacheClusterRequest {
repeated MemcachedServer servers = 1;
}

message RegistMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
}

message ListMemcacheClusterRequest {
}

message MemcacheCluster {
required uint32 clusterId = 1;
repeated MemcachedServer servers = 2;
}

message ListMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
repeated MemcacheCluster memClusters = 2;
}

message AllocOrGetMemcacheClusterRequest {
required uint32 fsId = 1;
}

message AllocOrGetMemcacheClusterResponse {
required TopoStatusCode statusCode = 1;
optional MemcacheCluster cluster = 2;
}

service TopologyService {
rpc RegistMetaServer(MetaServerRegistRequest) returns (MetaServerRegistResponse);
rpc ListMetaServer(ListMetaServerRequest) returns (ListMetaServerResponse);
Expand Down Expand Up @@ -493,4 +528,8 @@ service TopologyService {
rpc StatMetadataUsage(StatMetadataUsageRequest) returns (StatMetadataUsageResponse);

rpc ListTopology(ListTopologyRequest) returns (ListTopologyResponse);

rpc RegistMemcacheCluster(RegistMemcacheClusterRequest) returns (RegistMemcacheClusterResponse);
rpc ListMemcacheCluster(ListMemcacheClusterRequest) returns (ListMemcacheClusterResponse);
rpc AllocOrGetMemcacheCluster(AllocOrGetMemcacheClusterRequest) returns (AllocOrGetMemcacheClusterResponse);
}
1 change: 1 addition & 0 deletions curvefs/src/client/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ cc_library(
"//curvefs/src/client/rpcclient",
"//curvefs/src/common:curvefs_common",
"//curvefs/src/client/lease:curvefs_lease",
"//curvefs/src/client/kvclient:memcached_client_lib",
"//external:brpc",
"//external:gflags",
"//external:glog",
Expand Down
13 changes: 13 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace common {
DEFINE_bool(enableCto, true, "acheieve cto consistency");
DEFINE_bool(useFakeS3, false,
"Use fake s3 to inject more metadata for testing metaserver");
DEFINE_bool(supportKVcache, false, "use kvcache to speed up sharing");

void InitMdsOption(Configuration *conf, MdsOption *mdsOpt) {
conf->GetValueFatalIfFail("mdsOpt.mdsMaxRetryMS", &mdsOpt->mdsMaxRetryMS);
Expand Down Expand Up @@ -232,6 +233,16 @@ void InitRefreshDataOpt(Configuration *conf,
&opt->refreshDataIntervalSec);
}

void InitKVClientManagerOpt(Configuration *conf,
KVClientManagerOpt *config) {
conf->GetValueFatalIfFail("fuseClient.supportKVcache",
&FLAGS_supportKVcache);
conf->GetValueFatalIfFail("fuseClient.setThreadPool",
&config->setThreadPooln);
conf->GetValueFatalIfFail("fuseClient.getThreadPool",
&config->getThreadPooln);
}

void SetBrpcOpt(Configuration *conf) {
curve::common::GflagsLoadValueFromConfIfCmdNotSet dummy;
dummy.Load(conf, "defer_close_second", "rpc.defer.close.second",
Expand All @@ -251,6 +262,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
InitVolumeOption(conf, &clientOption->volumeOpt);
InitLeaseOpt(conf, &clientOption->leaseOpt);
InitRefreshDataOpt(conf, &clientOption->refreshDataOption);
InitKVClientManagerOpt(conf, &clientOption->kvClientManagerOpt);

conf->GetValueFatalIfFail("fuseClient.attrTimeOut",
&clientOption->attrTimeOut);
Expand Down Expand Up @@ -280,6 +292,7 @@ void InitFuseClientOption(Configuration *conf, FuseClientOption *clientOption) {
&clientOption->disableXattr);
conf->GetValueFatalIfFail("fuseClient.cto", &FLAGS_enableCto);


LOG_IF(WARNING, conf->GetBoolValue("fuseClient.enableSplice",
&clientOption->enableFuseSplice))
<< "Not found `fuseClient.enableSplice` in conf, use default value `"
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ struct SpaceAllocServerOption {
uint64_t rpcTimeoutMs;
};

struct KVClientManagerOpt {
int setThreadPooln = 4;
int getThreadPooln = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getThreadPooln where does this param use?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a reserved parameter, libmemcached, the current overall reading process is synchronous. Also waiting for pr #2080

};

struct DiskCacheOption {
DiskCacheType diskCacheType;
// cache disk dir
Expand Down Expand Up @@ -182,6 +187,7 @@ struct FuseClientOption {
VolumeOption volumeOpt;
LeaseOpt leaseOpt;
RefreshDataOption refreshDataOption;
KVClientManagerOpt kvClientManagerOpt;

double attrTimeOut;
double entryTimeOut;
Expand Down
51 changes: 48 additions & 3 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,45 @@
* Author: xuchaojie
*/

#include "curvefs/src/client/fuse_s3_client.h"

#include <memory>
#include <vector>

#include "curvefs/src/client/fuse_s3_client.h"
#include "curvefs/src/client/kvclient/memcache_client.h"

namespace curvefs {
namespace client {
namespace common {

DECLARE_bool(enableCto);
DECLARE_bool(supportKVcache);

} // namespace common
} // namespace client
} // namespace curvefs

namespace curvefs {
namespace client {

using curvefs::client::common::FLAGS_supportKVcache;
using curvefs::client::common::FLAGS_enableCto;
using curvefs::mds::topology::MemcacheCluster;
using curvefs::mds::topology::MemcachedServer;

CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
FuseClientOption opt(option);
// init kvcache
if (FLAGS_supportKVcache && !InitKVCache(option.kvClientManagerOpt)) {
return CURVEFS_ERROR::INTERNAL;
}

CURVEFS_ERROR ret = FuseClient::Init(opt);
if (ret != CURVEFS_ERROR::OK) {
return ret;
}

// set fsS3Option
// set fs S3Option
const auto& s3Info = fsInfo_->detail().s3info();
::curve::common::S3InfoOption fsS3Option;
::curvefs::client::common::S3Info2FsS3Option(s3Info, &fsS3Option);
Expand Down Expand Up @@ -80,6 +96,35 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
return ret;
}


bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) {
// get kvcache cluster
MemcacheCluster kvcachecluster;
if (!mdsClient_->AllocOrGetMemcacheCluster(fsInfo_->fsid(),
&kvcachecluster)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but AllocOrGetMemcacheCluster fail";
return false;
}

// init kvcache client
auto memcacheClient = std::make_shared<MemCachedClient>();
if (!memcacheClient->Init(kvcachecluster)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init memcache client fail";
return false;
}

g_kvClientManager = new KVClientManager();
if (!g_kvClientManager->Init(opt, memcacheClient)) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init kvClientManager fail";
return false;
}

return true;
}

void FuseS3Client::GetWarmUpFileList(const WarmUpFileContext_t&warmUpFile,
std::vector<std::string>& warmUpFilelist) {
struct fuse_file_info fi{};
Expand Down Expand Up @@ -536,7 +581,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpFlush(fuse_req_t req, fuse_ino_t ino,
CURVEFS_ERROR ret = CURVEFS_ERROR::OK;

// if enableCto, flush all write cache both in memory cache and disk cache
if (curvefs::client::common::FLAGS_enableCto) {
if (FLAGS_enableCto) {
ret = s3Adaptor_->FlushAllCache(ino);
if (ret != CURVEFS_ERROR::OK) {
LOG(ERROR) << "FuseOpFlush, flush all cache fail, ret = " << ret
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ class FuseS3Client : public FuseClient {
struct fuse_file_info *fi) override;

private:
bool InitKVCache(const KVClientManagerOpt &opt);

CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

void FlushData() override;
Expand Down
42 changes: 22 additions & 20 deletions curvefs/src/client/kvclient/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@
# limitations under the License.
#

# load("//:copts.bzl", "CURVE_DEFAULT_COPTS")
load("//:copts.bzl", "CURVE_DEFAULT_COPTS")



# cc_library(
# name = "memcache_client_lib",
# srcs = glob(["*.cpp"]),
# hdrs = glob(["*.h"]),
# copts = CURVE_DEFAULT_COPTS,
# linkopts = [
# "-lmemcached",
# ],
# visibility = ["//visibility:public"],
# deps = [
# "@com_google_absl//absl/strings",
# "//external:glog",
# "//external:bthread",
# "//src/common:curve_common",
# "//src/common:curve_s3_adapter",
# ],
# )
cc_library(
name = "memcached_client_lib",
srcs = glob(["*.cpp"]),
hdrs = glob(["*.h"]),
copts = CURVE_DEFAULT_COPTS,
linkopts = [
"-L/usr/local/lib",
],
visibility = ["//visibility:public"],
deps = [
"//curvefs/proto:curvefs_topology_cc_proto",
"//curvefs/src/client/common:common",
"//curvefs/src/client/metric:client_metric",
"@com_google_absl//absl/strings",
"//external:glog",
"//external:bthread",
"//src/common:curve_common",
"//src/common:curve_s3_adapter",
"@libmemcached",
],
)
27 changes: 10 additions & 17 deletions curvefs/src/client/kvclient/kvclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,25 @@ namespace client {
* Single client to kv interface.
*/

class KvClient {
class KVClient {
public:
KvClient() = default;
~KvClient() = default;
virtual bool Init() = 0;
KVClient() = default;
~KVClient() = default;

virtual void UnInit() = 0;
virtual void Init() {}

virtual void UnInit() {}

/**
* @param: errorlog: if error occurred, the errorlog will take
* the error info and log.
* @return: success return true, else return false;
*/
virtual bool Set(const std::string& key,
const char* value,
const int value_len,
std::string* errorlog) = 0;
virtual bool Set(const std::string &key, const char *value,
const uint64_t value_len, std::string *errorlog) = 0;

/**
* @param: errorlog: if return is false the errorlog
* will take the info.
* @return: success return the value and true.
*/
virtual bool Get(const std::string& key,
std::string* value,
std::string* errorlog) = 0;
virtual bool Get(const std::string &key, char *value, uint64_t offset,
uint64_t length, std::string *errorlog) = 0;
};

} // namespace client
Expand Down
Loading