Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
liuminjian committed Nov 6, 2023
2 parents 45cf27d + 48014b5 commit 5e1d609
Show file tree
Hide file tree
Showing 31 changed files with 730 additions and 292 deletions.
4 changes: 4 additions & 0 deletions conf/mds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ mds.etcd.retry.times=3
mds.etcd.dlock.timeoutMs=10000
# dlock lease timeout
mds.etcd.dlock.ttlSec=10
# etcd auth options
etcd.auth.enable=false
etcd.auth.username=
etcd.auth.password=

#
# segment分配量统计相关配置
Expand Down
5 changes: 5 additions & 0 deletions conf/snapshot_clone_server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ etcd.retry.times=3
etcd.dlock.timeoutMs=10000
# dlock lease timeout
etcd.dlock.ttlSec=10
# etcd auth options
etcd.auth.enable=false
etcd.auth.username=
etcd.auth.password=


#
# leader选举相关参数
Expand Down
4 changes: 4 additions & 0 deletions curvefs/conf/mds.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ etcd.dailtimeoutMs=5000
etcd.operation.timeoutMs=5000
# number of times a failed operation can be retried
etcd.retry.times=3
# etcd auth options
etcd.auth.enable=false
etcd.auth.username=
etcd.auth.password=

#
# leader election options
Expand Down
3 changes: 2 additions & 1 deletion curvefs/docker/debian11/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
FROM opencurvedocker/curve-base:debian11
COPY libmemcached.so libmemcached.so.11 libhashkit.so.2 /usr/lib/
COPY libmemcached.so libmemcached.so.11 libhashkit.so.2 libetcdclient.so /usr/lib/
COPY curvefs /curvefs
RUN mkdir -p /etc/curvefs /core /etc/curve && chmod a+x /entrypoint.sh \
&& cp /curvefs/tools/sbin/curvefs_tool /usr/bin \
&& cp /curvefs/etcd/sbin/etcdctl /usr/bin/ \
&& cp /curvefs/tools-v2/sbin/curve /usr/bin/
27 changes: 21 additions & 6 deletions curvefs/src/mds/mds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ MDS::MDS()
etcdClient_(),
leaderElection_(),
status_(),
etcdEndpoint_() {}
etcdEndpoint_(),
etcdUsername_(),
etcdPassword_() {}

MDS::~MDS() {}

Expand Down Expand Up @@ -355,28 +357,41 @@ void MDS::InitEtcdClient() {
<< ", etcd address: " << std::string(etcdConf.Endpoints, etcdConf.len)
<< ", etcdtimeout: " << etcdConf.DialTimeout
<< ", operation timeout: " << etcdTimeout
<< ", etcd retrytimes: " << etcdRetryTimes;
<< ", etcd retrytimes: " << etcdRetryTimes
<< ", etcd auth enable: " << etcdConf.authEnable;

LOG_IF(FATAL, !CheckEtcd()) << "Check etcd failed";

LOG(INFO) << "Init etcd client succeeded, etcd address: "
<< std::string(etcdConf.Endpoints, etcdConf.len)
<< ", etcdtimeout: " << etcdConf.DialTimeout
<< ", operation timeout: " << etcdTimeout
<< ", etcd retrytimes: " << etcdRetryTimes;
<< ", etcd retrytimes: " << etcdRetryTimes
<< ", etcd auth enable: " << etcdConf.authEnable;

etcdClientInited_ = true;
}

/**
 * Populate `etcdConf` from the MDS configuration.
 *
 * The endpoint, username and password strings are stored in members of MDS
 * because `etcdConf` only receives pointers into their buffers (plus explicit
 * lengths).
 *
 * @param etcdConf  output; endpoint, dial timeout and auth fields are written.
 *                  The username/password fields are only written when
 *                  `etcd.auth.enable` is true.
 */
void MDS::InitEtcdConf(EtcdConf* etcdConf) {
    conf_->GetValueFatalIfFail("etcd.endpoint", &etcdEndpoint_);
    // Set exactly once: the pointer/length pair refers to etcdEndpoint_.
    etcdConf->len = etcdEndpoint_.size();
    etcdConf->Endpoints = &etcdEndpoint_[0];
    conf_->GetValueFatalIfFail("etcd.dailtimeoutMs", &etcdConf->DialTimeout);

    // etcd auth config. The return value of GetBoolValue is intentionally
    // ignored so that a missing key keeps the `false` default
    // (NOTE(review): assumes GetBoolValue leaves the output untouched on
    // failure - confirm).
    bool authEnable = false;
    conf_->GetBoolValue("etcd.auth.enable", &authEnable);
    etcdConf->authEnable = authEnable ? 1 : 0;
    if (authEnable) {
        // Credentials are mandatory once auth is switched on.
        conf_->GetValueFatalIfFail("etcd.auth.username", &etcdUsername_);
        etcdConf->username = &etcdUsername_[0];
        etcdConf->usernameLen = etcdUsername_.size();
        conf_->GetValueFatalIfFail("etcd.auth.password", &etcdPassword_);
        etcdConf->password = &etcdPassword_[0];
        etcdConf->passwordLen = etcdPassword_.size();
    }

    LOG(INFO) << "etcd.endpoint: " << etcdEndpoint_;
    LOG(INFO) << "etcd.dailtimeoutMs: " << etcdConf->DialTimeout;
}

bool MDS::CheckEtcd() {
Expand Down
2 changes: 2 additions & 0 deletions curvefs/src/mds/mds.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ class MDS {
bvar::Status<std::string> status_;

std::string etcdEndpoint_;
std::string etcdUsername_;
std::string etcdPassword_;
};

} // namespace mds
Expand Down
7 changes: 5 additions & 2 deletions curvefs/src/metaserver/metaserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,11 @@ void Metaserver::InitLocalFileSystem() {
/**
 * Load S3 adaptor options from configuration.
 *
 * `s3.batchsize` is mandatory (FATAL when absent). `s3.enableBatchDelete` is
 * optional: when it cannot be read, a warning is logged together with the
 * value currently held in `s3Opt->enableBatchDelete`.
 *
 * @param conf   configuration to read from
 * @param s3Opt  output options
 */
void InitS3Option(const std::shared_ptr<Configuration>& conf,
                  S3ClientAdaptorOption* s3Opt) {
    LOG_IF(FATAL, !conf->GetUInt64Value("s3.batchsize", &s3Opt->batchSize));

    // Read the optional key once. A FATAL check here would abort before the
    // fallback warning below could ever be reached.
    const bool found =
        conf->GetBoolValue("s3.enableBatchDelete", &s3Opt->enableBatchDelete);
    LOG_IF(WARNING, !found)
        << "config no s3.enableBatchDelete info, using default value "
        << s3Opt->enableBatchDelete;
}

void Metaserver::InitPartitionOption(std::shared_ptr<S3ClientAdaptor> s3Adaptor,
Expand Down
97 changes: 56 additions & 41 deletions curvefs/src/metaserver/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <memory>
#include <string>
#include <utility>
#include <future>

#include "curvefs/proto/metaserver.pb.h"
#include "curvefs/src/metaserver/copyset/copyset_node_manager.h"
Expand Down Expand Up @@ -537,54 +538,68 @@ MetaStatusCode Partition::GetAllBlockGroup(
}

/**
 * Register this partition's S3 compaction task with S3CompactManager.
 *
 * The registration is performed exactly once, on a separate thread, and this
 * function blocks until it has finished.
 */
void Partition::StartS3Compact() {
    // Register the s3 compaction task in a separate thread: the caller may
    // hold a pthread wrlock when calling this function, and creating
    // `S3Compact` acquires a bthread rwlock, which may cause a thread switch
    // and thus a deadlock. Registering directly on the calling thread would
    // defeat this, so it must not be done here.
    // FIXME(wuhanqing): handle it in a more elegant way
    auto handle = std::async(std::launch::async, [this]() {
        S3CompactManager::GetInstance().Register(
            S3Compact{inodeManager_, partitionInfo_});
    });

    // Capturing `this` is safe because we wait for completion before
    // returning.
    handle.wait();
}

// Ask S3CompactManager to cancel the compaction task identified by this
// partition's id.
void Partition::CancelS3Compact() {
S3CompactManager::GetInstance().Cancel(partitionInfo_.partitionid());
}

/**
 * Register a volume-space deallocation task for this partition.
 *
 * Nothing is registered when the fs info cannot be fetched, when the fs has
 * no volume, or when the copyset node cannot be found. The work runs exactly
 * once, on a separate thread, and this function blocks until it has finished.
 */
void Partition::StartVolumeDeallocate() {
    // Run on a separate thread for the same reason as `StartS3Compact`: the
    // caller may hold a pthread wrlock, so the work must not run on the
    // calling thread.
    // FIXME(wuhanqing): same as `StartS3Compact`
    auto handle = std::async(std::launch::async, [this]() {
        FsInfo fsInfo;
        bool ok = FsInfoManager::GetInstance().GetFsInfo(partitionInfo_.fsid(),
                                                         &fsInfo);
        if (!ok) {
            LOG(ERROR) << "Partition start volume deallocate fail, get fsinfo "
                          "fail. fsid="
                       << partitionInfo_.fsid();
            return;
        }

        // Only volume-backed file systems have space to deallocate.
        if (!fsInfo.detail().has_volume()) {
            LOG(INFO) << "Partition not belong to volume, do not need start "
                         "deallocate. partitionInfo="
                      << partitionInfo_.DebugString();
            return;
        }

        VolumeDeallocateCalOption calOpt;
        calOpt.kvStorage = kvStorage_;
        calOpt.inodeStorage = inodeStorage_;
        calOpt.nameGen = nameGen_;
        auto copysetNode =
            copyset::CopysetNodeManager::GetInstance().GetSharedCopysetNode(
                partitionInfo_.poolid(), partitionInfo_.copysetid());
        if (copysetNode == nullptr) {
            LOG(ERROR) << "Partition get copyset node failed. poolid="
                       << partitionInfo_.poolid()
                       << ", copysetid=" << partitionInfo_.copysetid();
            return;
        }

        InodeVolumeSpaceDeallocate task(
            partitionInfo_.fsid(), partitionInfo_.partitionid(), copysetNode);
        task.Init(calOpt);

        VolumeDeallocateManager::GetInstance().Register(std::move(task));

        VLOG(3) << "Partition start volume deallocate success. partitionInfo="
                << partitionInfo_.DebugString();
    });

    // Capturing `this` is safe because we wait for completion before
    // returning.
    handle.wait();
}

void Partition::CancelVolumeDeallocate() {
Expand Down
2 changes: 2 additions & 0 deletions docker/debian11/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ RUN cd /curve-tgt/curve-sdk && \
make install-programs && \
rm -rf /curve-tgt
COPY curvebs /curvebs
COPY libetcdclient.so /usr/lib/
RUN mkdir -p /etc/curve /etc/nebd /curve/init.d/ && \
chmod a+x /entrypoint.sh && \
cp /curvebs/nbd/sbin/curve-nbd /usr/bin/ && \
cp /curvebs/tools/sbin/curve_ops_tool /usr/bin/ && \
cp /curvebs/etcd/sbin/etcdctl /usr/bin/ && \
cp /curvebs/tools-v2/sbin/curve /usr/bin/
1 change: 0 additions & 1 deletion src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ int ChunkServer::Run(int argc, char** argv) {

// 打印参数
conf.PrintConfig();
conf.ExposeMetric("chunkserver_config");
curve::common::ExposeCurveVersion();

// ============================初始化各模块==========================//
Expand Down
58 changes: 53 additions & 5 deletions src/common/concurrent/rw_lock.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@
#ifndef SRC_COMMON_CONCURRENT_RW_LOCK_H_
#define SRC_COMMON_CONCURRENT_RW_LOCK_H_

#include <pthread.h>
#include <assert.h>
#include <glog/logging.h>
#include <bthread/bthread.h>
#include <glog/logging.h>
#include <pthread.h>
#include <sys/types.h> // gettid

#include "include/curve_compiler_specific.h"
#include "src/common/uncopyable.h"

// Because bthread and pthread are mixed in some code paths, acquiring a
// bthread lock (mutex/rwlock) while holding the write side of a pthread rwlock
// may switch the running bthread coroutine. The write lock is then released
// from a different pthread than the one that acquired it, and that release
// does not take effect (the write lock is still held), which causes a
// deadlock.

// Compile-time switch for checking the pthread rwlock owner tid between
// wrlock and unlock. The check is enabled unless
// ENABLE_CHECK_PTHREAD_WRLOCK_TID is defined to a value other than 1.
#if !defined(ENABLE_CHECK_PTHREAD_WRLOCK_TID) || \
    (ENABLE_CHECK_PTHREAD_WRLOCK_TID == 1)
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 1
#else
#define CURVE_CHECK_PTHREAD_WRLOCK_TID 0
#endif

namespace curve {
namespace common {

Expand All @@ -51,10 +69,21 @@ class PthreadRWLockBase : public RWLockBase {
// Acquire the write lock via pthread_rwlock_wrlock; CHECK-fails with the
// error code and its strerror text if the call does not return 0.
void WRLock() override {
int ret = pthread_rwlock_wrlock(&rwlock_);
CHECK(0 == ret) << "wlock failed: " << ret << ", " << strerror(ret);
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
// Record the tid of the thread that now holds the write lock.
tid_ = gettid();
#endif
}

// Try to acquire the write lock without blocking.
//
// Returns 0 on success, otherwise the error code reported by
// pthread_rwlock_trywrlock. On success (and when the tid check is compiled
// in) the calling thread's tid is recorded, exactly as WRLock() does, so the
// write lock is tracked the same way regardless of how it was obtained.
int TryWRLock() override {
    int ret = pthread_rwlock_trywrlock(&rwlock_);
    if (CURVE_UNLIKELY(ret != 0)) {
        // Lock not acquired: nothing to record.
        return ret;
    }

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
    tid_ = gettid();
#endif
    return 0;
}

void RDLock() override {
Expand All @@ -67,6 +96,19 @@ class PthreadRWLockBase : public RWLockBase {
}

// Release the lock via pthread_rwlock_unlock. When the tid check is compiled
// in and a tid is recorded (tid_ != 0), first verify that the calling thread
// is the recorded one, then clear the record.
void Unlock() override {
#if CURVE_CHECK_PTHREAD_WRLOCK_TID
// tid_ == 0 means no tid is recorded, so there is nothing to verify.
if (tid_ != 0) {
const pid_t current = gettid();
// If CHECK here is triggered, please look at the comments at the
// beginning of the file.
// In the meantime, the simplest solution might be to use
// `BthreadRWLock` locks everywhere.
CHECK(tid_ == current)
<< ", tid has changed, previous tid: " << tid_
<< ", current tid: " << current;
// Clear the record before the lock is actually released below.
tid_ = 0;
}
#endif
pthread_rwlock_unlock(&rwlock_);
}

Expand All @@ -76,8 +118,14 @@ class PthreadRWLockBase : public RWLockBase {

pthread_rwlock_t rwlock_;
pthread_rwlockattr_t rwlockAttr_;

#if CURVE_CHECK_PTHREAD_WRLOCK_TID
pid_t tid_ = 0;
#endif
};

#undef CURVE_CHECK_PTHREAD_WRLOCK_TID

class RWLock : public PthreadRWLockBase {
public:
RWLock() {
Expand Down Expand Up @@ -122,7 +170,7 @@ class BthreadRWLock : public RWLockBase {
}

// Non-blocking write-lock attempt is not implemented for this lock type:
// logs a warning and always returns EINVAL without touching the lock.
int TryWRLock() override {
// not supported yet
LOG(WARNING) << "TryWRLock not support yet";
return EINVAL;
}

Expand All @@ -132,7 +180,7 @@ class BthreadRWLock : public RWLockBase {
}

// Non-blocking read-lock attempt is not implemented for this lock type:
// logs a warning and always returns EINVAL without touching the lock.
int TryRDLock() override {
// not supported yet
LOG(WARNING) << "TryRDLock not support yet";
return EINVAL;
}

Expand Down
Loading

0 comments on commit 5e1d609

Please sign in to comment.