Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shiyaoliang qos for fuse #2069

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ CURVEFS_ERROR FuseS3Client::Init(const FuseClientOption &option) {
bgFetchStop_.store(false, std::memory_order_release);
bgFetchThread_ = Thread(&FuseS3Client::BackGroundFetch, this);
GetTaskFetchPool();
InitQosParam();
return ret;
}

Expand Down Expand Up @@ -131,6 +132,17 @@ void FuseS3Client::BackGroundFetch() {
return;
}

void FuseS3Client::InitQosParam() {
ReadWriteThrottleParams params;
params.iopsWrite = ThrottleParams(FLAGS_avgFlushIops, 0, 0);
params.bpsWrite = ThrottleParams(FLAGS_avgFlushBytes, FLAGS_burstFlushBytes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

configurable is better

FLAGS_burstSecs);
params.iopsRead = ThrottleParams(FLAGS_avgReadFileIops, 0, 0);
params.bpsRead = ThrottleParams(FLAGS_avgReadFileBytes, 0, 0);

fuseS3Throttle_.UpdateThrottleParams(params);
}

void FuseS3Client::fetchDataEnqueue(fuse_ino_t ino) {
VLOG(9) << "fetchDataEnqueue start: " << ino;
auto task = [this, ino]() {
Expand Down Expand Up @@ -351,6 +363,8 @@ CURVEFS_ERROR FuseS3Client::FuseOpWrite(fuse_req_t req, fuse_ino_t ino,
is_aligned(size, DirectIOAlignment)))
return CURVEFS_ERROR::INVALIDPARAM;
}

fuseS3Throttle_.Add(false, length);
uint64_t start = butil::cpuwide_time_us();
int wRet = s3Adaptor_->Write(ino, off, size, buf);
if (wRet < 0) {
Expand Down Expand Up @@ -440,6 +454,7 @@ CURVEFS_ERROR FuseS3Client::FuseOpRead(fuse_req_t req, fuse_ino_t ino,
len = size;
}

fuseS3Throttle_.Add(true, length);
// Read do not change inode. so we do not get lock here.
int rRet = s3Adaptor_->Read(ino, off, len, buffer);
if (rRet < 0) {
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/fuse_s3_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@
#include "curvefs/src/client/fuse_client.h"
#include "curvefs/src/client/s3/client_s3_cache_manager.h"
#include "src/common/s3_adapter.h"
#include "src/common/throttle.h"

namespace curvefs {
namespace client {

using curve::common::GetObjectAsyncContext;
using curve::common::GetObjectAsyncCallBack;
using curve::common::ReadWriteThrottleParams;
using curve::common::Throttle;
using curve::common::ThrottleParams;

class FuseS3Client : public FuseClient {
public:
Expand Down Expand Up @@ -114,13 +118,15 @@ class FuseS3Client : public FuseClient {
void WarmUpAllObjs(const std::list<
std::pair<std::string, uint64_t>> &prefetchObjs);

void InitQosParam();
private:
// s3 adaptor
std::shared_ptr<S3ClientAdaptor> s3Adaptor_;

Thread bgFetchThread_;
std::atomic<bool> bgFetchStop_;
std::mutex fetchMtx_;
Throttle fuseS3Throttle_;
};


Expand Down
14 changes: 14 additions & 0 deletions curvefs/src/client/fuse_volume_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ CURVEFS_ERROR FuseVolumeClient::Init(const FuseClientOption &option) {
return CURVEFS_ERROR::INTERNAL;
}

InitQosParam();
return ret;
}

Expand All @@ -76,6 +77,17 @@ void FuseVolumeClient::UnInit() {
FuseClient::UnInit();
}

void FuseVolumeClient::InitQosParam() {
ReadWriteThrottleParams params;
params.iopsWrite = ThrottleParams(FLAGS_avgFlushIops, 0, 0);
params.bpsWrite = ThrottleParams(FLAGS_avgFlushBytes, FLAGS_burstFlushBytes,
FLAGS_burstSecs);
params.iopsRead = ThrottleParams(FLAGS_avgReadFileIops, 0, 0);
params.bpsRead = ThrottleParams(FLAGS_avgReadFileBytes, 0, 0);

fuseVolumeThrottle_.UpdateThrottleParams(params);
}

CURVEFS_ERROR FuseVolumeClient::FuseOpInit(void *userdata,
struct fuse_conn_info *conn) {
auto ret = FuseClient::FuseOpInit(userdata, conn);
Expand Down Expand Up @@ -145,6 +157,7 @@ CURVEFS_ERROR FuseVolumeClient::FuseOpWrite(fuse_req_t req,
}
}

fuseVolumeThrottle_.Add(false, length);
butil::Timer timer;
timer.start();

Expand Down Expand Up @@ -202,6 +215,7 @@ CURVEFS_ERROR FuseVolumeClient::FuseOpRead(fuse_req_t req,
}
}

fuseVolumeThrottle_.Add(true, length);
butil::Timer timer;
timer.start();

Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/fuse_volume_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "curvefs/src/client/volume/volume_storage.h"
#include "curvefs/src/volume/block_device_client.h"
#include "curvefs/src/volume/space_manager.h"
#include "src/common/throttle.h"

namespace curvefs {
namespace client {
Expand All @@ -38,6 +39,9 @@ using common::VolumeOption;
using ::curvefs::volume::BlockDeviceClient;
using ::curvefs::volume::BlockDeviceClientImpl;
using ::curvefs::volume::SpaceManager;
using curve::common::ReadWriteThrottleParams;
using curve::common::Throttle;
using curve::common::ThrottleParams;

class FuseVolumeClient : public FuseClient {
public:
Expand Down Expand Up @@ -100,13 +104,15 @@ class FuseVolumeClient : public FuseClient {
CURVEFS_ERROR Truncate(InodeWrapper *inode, uint64_t length) override;

void FlushData() override;
void InitQosParam();

private:
std::shared_ptr<BlockDeviceClient> blockDeviceClient_;
std::unique_ptr<SpaceManager> spaceManager_;
std::unique_ptr<VolumeStorage> storage_;

VolumeOption volOpts_;
Throttle fuseVolumeThrottle_;
};

} // namespace client
Expand Down