From d0e9dfb6fb1d7af1fdef8b8c0fc123493fd279aa Mon Sep 17 00:00:00 2001 From: lixiaocui1 Date: Wed, 25 May 2022 09:47:39 +0800 Subject: [PATCH] curvefs/client: fix solve concurrency problems of FileCacheManager::Flush NOTE: After cond.Signal() is executed, FileCacheManager::Flush can exit, FileCacheManager may be erased, and the callback function needs to execute the destructor of writeLockGuard, resulting in process core dump --- .../src/client/s3/client_s3_cache_manager.cpp | 86 ++++++++----------- 1 file changed, 37 insertions(+), 49 deletions(-) diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 776be770df..8d858b1a59 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -984,74 +984,62 @@ CURVEFS_ERROR FileCacheManager::Flush(bool force, bool toS3) { curve::common::CountDownEvent cond(1); FlushChunkCacheCallBack cb = [&](const std::shared_ptr &context) { - WriteLockGuard writeLockGuard(rwLock_); - if (ret != CURVEFS_ERROR::OK) { - return; - } ret = context->retCode; if (context->retCode != CURVEFS_ERROR::OK) { LOG(ERROR) << "fileCacheManager Flush error, ret:" << ret - << ", inode: " << context->inode - << ", chunkIndex: " - << context->chunkCacheManptr->GetIndex(); + << ", inode: " << context->inode << ", chunkIndex: " + << context->chunkCacheManptr->GetIndex(); cond.Signal(); return; } + { - auto iter1 = chunkCacheMap_.find( - context->chunkCacheManptr->GetIndex()); - if (iter1 == chunkCacheMap_.end()) { - VLOG(9) << "Flush, chunk cache for inodeid: " << inode_ - << " is removed"; - if (pendingReq.fetch_sub(1, - std::memory_order_seq_cst) == 1) { - VLOG(9) << "pendingReq is over"; - cond.Signal(); - } - return; - } - VLOG(6) << "ChunkCacheManagerPtr count:" + WriteLockGuard writeLockGuard(rwLock_); + auto iter1 = + chunkCacheMap_.find(context->chunkCacheManptr->GetIndex()); + if (iter1 != chunkCacheMap_.end() && iter1->second->IsEmpty() && + (iter1->second.use_count() <= 3)) { + // tmp态chunkCacheMap_ and context->chunkCacheManptr has + // this ChunkCacheManagerPtr, so count is 3 if count more + // than 3, this mean someone thread has this + // ChunkCacheManagerPtr + VLOG(6) + << "ChunkCacheManagerPtr count:" << context->chunkCacheManptr.use_count() << ",inode:" << inode_ - << ", index:" << context->chunkCacheManptr->GetIndex(); - // tmp态chunkCacheMap_ and context->chunkCacheManptr has this - // ChunkCacheManagerPtr, so count is 3 if count more than 3, - // this mean someone thread has this ChunkCacheManagerPtr - if (iter1->second->IsEmpty() && - (iter1->second.use_count() <= 3)) { - VLOG(9) << "erase iter: " << iter1->first - << ", inode: " << context->inode; + << ", index:" << context->chunkCacheManptr->GetIndex() + << "erase iter: " << iter1->first; chunkCacheMap_.erase(iter1); g_s3MultiManagerMetric->chunkManagerNum << -1; } } + if (pendingReq.fetch_sub(1, std::memory_order_seq_cst) == 1) { VLOG(9) << "pendingReq is over"; cond.Signal(); } }; - std::vector> flushTasks; - auto iter = tmp.begin(); - VLOG(6) << "flush size is: " << tmp.size() << ",inodeId:" << inode_; - for (; iter != tmp.end(); iter++) { - auto context = std::make_shared(); - context->inode = inode_; - context->cb = cb; - context->force = force; - context->chunkCacheManptr = iter->second; - flushTasks.emplace_back(context); - } - pendingReq.fetch_add(flushTasks.size(), std::memory_order_seq_cst); - if (pendingReq.load(std::memory_order_seq_cst)) { - VLOG(6) << "wait for pendingReq"; - for (auto iter = flushTasks.begin(); - iter != flushTasks.end(); ++iter) { - s3ClientAdaptor_->Enqueue(*iter); - } - cond.Wait(); + std::vector> flushTasks; + auto iter = tmp.begin(); + VLOG(6) << "flush size is: " << tmp.size() << ",inodeId:" << inode_; + for (; iter != tmp.end(); iter++) { + auto context = std::make_shared(); + context->inode = inode_; + context->cb = cb; + context->force = force; + context->chunkCacheManptr = iter->second; + flushTasks.emplace_back(context); + } + pendingReq.fetch_add(flushTasks.size(), std::memory_order_seq_cst); + if (pendingReq.load(std::memory_order_seq_cst)) { + VLOG(6) << "wait for pendingReq"; + for (auto iter = flushTasks.begin(); iter != flushTasks.end(); ++iter) { + s3ClientAdaptor_->Enqueue(*iter); } - VLOG(6) << "file cache flush over"; - return ret; + cond.Wait(); + } + VLOG(6) << "file cache flush over"; + return ret; } void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos,