Skip to content

Commit

Permalink
curvefs/client: fix concurrency problems of FileCacheManager::F…
Browse files Browse the repository at this point in the history
…lush

NOTE:
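Once cond.Signal() has been executed, FileCacheManager::Flush can return and the FileCacheManager may be erased
while the callback is still running. The callback then still has to execute the destructor of writeLockGuard (which releases rwLock_) on an object that no longer exists, resulting in a process core dump.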
  • Loading branch information
ilixiaocui committed May 25, 2022
1 parent f79f1d7 commit d0e9dfb
Showing 1 changed file with 37 additions and 49 deletions.
86 changes: 37 additions & 49 deletions curvefs/src/client/s3/client_s3_cache_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -984,74 +984,62 @@ CURVEFS_ERROR FileCacheManager::Flush(bool force, bool toS3) {
curve::common::CountDownEvent cond(1);
FlushChunkCacheCallBack cb =
[&](const std::shared_ptr<FlushChunkCacheContext> &context) {
WriteLockGuard writeLockGuard(rwLock_);
if (ret != CURVEFS_ERROR::OK) {
return;
}
ret = context->retCode;
if (context->retCode != CURVEFS_ERROR::OK) {
LOG(ERROR) << "fileCacheManager Flush error, ret:" << ret
<< ", inode: " << context->inode
<< ", chunkIndex: "
<< context->chunkCacheManptr->GetIndex();
<< ", inode: " << context->inode << ", chunkIndex: "
<< context->chunkCacheManptr->GetIndex();
cond.Signal();
return;
}

{
auto iter1 = chunkCacheMap_.find(
context->chunkCacheManptr->GetIndex());
if (iter1 == chunkCacheMap_.end()) {
VLOG(9) << "Flush, chunk cache for inodeid: " << inode_
<< " is removed";
if (pendingReq.fetch_sub(1,
std::memory_order_seq_cst) == 1) {
VLOG(9) << "pendingReq is over";
cond.Signal();
}
return;
}
VLOG(6) << "ChunkCacheManagerPtr count:"
WriteLockGuard writeLockGuard(rwLock_);
auto iter1 =
chunkCacheMap_.find(context->chunkCacheManptr->GetIndex());
if (iter1 != chunkCacheMap_.end() && iter1->second->IsEmpty() &&
(iter1->second.use_count() <= 3)) {
// tmp, chunkCacheMap_ and context->chunkCacheManptr each hold
// this ChunkCacheManagerPtr, so the use count is 3; a count
// greater than 3 means some other thread also holds this
// ChunkCacheManagerPtr
VLOG(6)
<< "ChunkCacheManagerPtr count:"
<< context->chunkCacheManptr.use_count()
<< ",inode:" << inode_
<< ", index:" << context->chunkCacheManptr->GetIndex();
// tmp、chunkCacheMap_ and context->chunkCacheManptr has this
// ChunkCacheManagerPtr, so count is 3 if count more than 3,
// this mean someone thread has this ChunkCacheManagerPtr
if (iter1->second->IsEmpty() &&
(iter1->second.use_count() <= 3)) {
VLOG(9) << "erase iter: " << iter1->first
<< ", inode: " << context->inode;
<< ", index:" << context->chunkCacheManptr->GetIndex()
<< "erase iter: " << iter1->first;
chunkCacheMap_.erase(iter1);
g_s3MultiManagerMetric->chunkManagerNum << -1;
}
}

if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) {
VLOG(9) << "pendingReq is over";
cond.Signal();
}
};
std::vector<std::shared_ptr<FlushChunkCacheContext>> flushTasks;
auto iter = tmp.begin();
VLOG(6) << "flush size is: " << tmp.size() << ",inodeId:" << inode_;
for (; iter != tmp.end(); iter++) {
auto context = std::make_shared<FlushChunkCacheContext>();
context->inode = inode_;
context->cb = cb;
context->force = force;
context->chunkCacheManptr = iter->second;
flushTasks.emplace_back(context);
}
pendingReq.fetch_add(flushTasks.size(), std::memory_order_seq_cst);
if (pendingReq.load(std::memory_order_seq_cst)) {
VLOG(6) << "wait for pendingReq";
for (auto iter = flushTasks.begin();
iter != flushTasks.end(); ++iter) {
s3ClientAdaptor_->Enqueue(*iter);
}
cond.Wait();
std::vector<std::shared_ptr<FlushChunkCacheContext>> flushTasks;
auto iter = tmp.begin();
VLOG(6) << "flush size is: " << tmp.size() << ",inodeId:" << inode_;
for (; iter != tmp.end(); iter++) {
auto context = std::make_shared<FlushChunkCacheContext>();
context->inode = inode_;
context->cb = cb;
context->force = force;
context->chunkCacheManptr = iter->second;
flushTasks.emplace_back(context);
}
pendingReq.fetch_add(flushTasks.size(), std::memory_order_seq_cst);
if (pendingReq.load(std::memory_order_seq_cst)) {
VLOG(6) << "wait for pendingReq";
for (auto iter = flushTasks.begin(); iter != flushTasks.end(); ++iter) {
s3ClientAdaptor_->Enqueue(*iter);
}
VLOG(6) << "file cache flush over";
return ret;
cond.Wait();
}
VLOG(6) << "file cache flush over";
return ret;
}

void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos,
Expand Down

0 comments on commit d0e9dfb

Please sign in to comment.