From 4c7794150f49979ae673f83db5ba24c2b04b3dca Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 00:45:28 -0700 Subject: [PATCH 1/6] Prevent attribute data corruption --- component/file_cache/file_cache.go | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 35e1aa3be..253332b77 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -528,8 +528,13 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O info, err := dirent.Info() if err == nil { - attr.Mtime = info.ModTime() - attr.Size = info.Size() + // attr is a pointer returned by NextComponent + // modifying attr could corrupt cached directory listings + // to update properties, we need to copy the struct first (shallow copy; only Mtime and Size change) + newAttr := *attr + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs[i] = &newAttr } } i++ @@ -1320,8 +1325,13 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // If file is under download then taking size or mod time from it will be incorrect. 
if !fc.fileLocks.Locked(options.Name) { log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - attrs.Size = info.Size() - attrs.Mtime = info.ModTime() + // attrs is a pointer returned by NextComponent + // modifying attrs could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attrs + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs = &newAttr } else { log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) } From 60321b57b74349fa2f8385ccef03d5a489e2068e Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 01:32:20 -0700 Subject: [PATCH 2/6] Fix remaining race conditions detected in file cache --- component/file_cache/file_cache.go | 4 ++++ component/file_cache/lru_policy.go | 36 +++++++++++++++++++++--------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 253332b77..ef797b8fd 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1063,7 +1063,9 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + options.Handle.Unlock() if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1110,7 +1112,9 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) { // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + 
options.Handle.Unlock() if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index a30cc79db..56d5de7a8 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -36,6 +36,7 @@ import ( ) type lruNode struct { + sync.RWMutex next *lruNode prev *lruNode usage int @@ -176,6 +177,8 @@ func (p *lruPolicy) IsCached(name string) bool { val, found := p.nodeMap.Load(name) if found { node := val.(*lruNode) + node.RLock() + defer node.RUnlock() log.Debug("lruPolicy::IsCached : %s, deleted:%t", name, node.deleted) if !node.deleted { return true @@ -221,18 +224,21 @@ func (p *lruPolicy) cacheValidate(name string) { }) node := val.(*lruNode) + // protect node data + node.Lock() + node.deleted = false + node.usage++ + node.Unlock() + + // protect the LRU p.Lock() defer p.Unlock() - node.deleted = false - // put node at head of linked list if node == p.head { return } p.moveToHead(node) - - node.usage++ } // For all other timer based activities we check the stuff here @@ -295,7 +301,9 @@ func (p *lruPolicy) removeNode(name string) { defer p.Unlock() node = val.(*lruNode) + node.Lock() node.deleted = true + node.Unlock() if node == p.head { p.head = node.next @@ -368,7 +376,9 @@ func (p *lruPolicy) deleteExpiredNodes() { for ; node != nil && count < p.maxEviction; node = node.next { delItems = append(delItems, node) + node.Lock() node.deleted = true + node.Unlock() count++ } @@ -385,7 +395,10 @@ func (p *lruPolicy) deleteExpiredNodes() { log.Debug("lruPolicy::deleteExpiredNodes : List generated %d items", count) for _, item := range delItems { - if item.deleted { + item.RLock() + restored := !item.deleted + item.RUnlock() + if !restored { p.removeNode(item.name) p.deleteItem(item.name) } @@ -408,15 +421,16 @@ func (p *lruPolicy) deleteItem(name string) { } flock := 
p.fileLocks.Get(azPath) - if p.fileLocks.Locked(azPath) { - log.Warn("lruPolicy::DeleteItem : File in under download %s", azPath) - p.CacheValid(name) - return - } - flock.Lock() defer flock.Unlock() + // check if the file has been marked valid again after removeNode was called + _, found := p.nodeMap.Load(name) + if found { + log.Warn("lruPolicy::DeleteItem : File marked valid %s", azPath) + return + } + // Check if there are any open handles to this file or not if flock.Count() > 0 { log.Warn("lruPolicy::DeleteItem : File in use %s", name) From ff1536612e561c9492a3e0bee2392c36a2263b60 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 02:04:12 -0700 Subject: [PATCH 3/6] Lock file to protect calls to stat. --- component/file_cache/file_cache.go | 55 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index ef797b8fd..4be55cc76 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -524,9 +524,12 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O j++ } else { // Case 3: Item is in both local cache and cloud - if !attr.IsDir() && !fc.fileLocks.Locked(attr.Path) { - info, err := dirent.Info() - + if !attr.IsDir() { + flock := fc.fileLocks.Get(attr.Path) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, dirent.Name())) // Grab local cache attributes + flock.Unlock() if err == nil { // attr is a pointer returned by NextComponent // modifying attr could corrupt cached directory listings @@ -546,14 +549,19 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O if token == "" { for _, entry := range dirents { entryPath := common.JoinUnixFilepath(options.Name, entry.Name()) - if !entry.IsDir() && !fc.fileLocks.Locked(entryPath) { + if 
!entry.IsDir() { // This is an overhead for streamdir for now // As list is paginated we have no way to know whether this particular item exists both in local cache // and container or not. So we rely on getAttr to tell if entry was cached then it exists in cloud storage too // If entry does not exists on storage then only return a local item here. _, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: entryPath}) if err != nil && (err == syscall.ENOENT || os.IsNotExist(err)) { - info, err := entry.Info() // Grab local cache attributes + // get the lock on the file, to allow any pending operation to complete + flock := fc.fileLocks.Get(entryPath) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, entry.Name())) // Grab local cache attributes + flock.Unlock() // If local file is not locked then only use its attributes otherwise rely on container attributes if err == nil { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes @@ -1321,33 +1329,26 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) + flock := fc.fileLocks.Get(options.Name) + // TODO: should we use a RWMutex and use RLock for stat calls? + flock.Lock() info, err := os.Stat(localPath) + flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes - // Return from local cache only if file is not under download or deletion - // If file is under download then taking size or mod time from it will be incorrect. 
- if !fc.fileLocks.Locked(options.Name) { - log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - // attrs is a pointer returned by NextComponent - // modifying attrs could corrupt cached directory listings - // to update properties, we need to make a deep copy first - newAttr := *attrs - newAttr.Mtime = info.ModTime() - newAttr.Size = info.Size() - attrs = &newAttr - } else { - log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) - } + log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) + // attrs is a pointer returned by NextComponent + // modifying attrs could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attrs + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs = &newAttr } else { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes - if !strings.Contains(localPath, fc.tmpPath) { - // Here if the path is going out of the temp directory then return ENOENT - exists = false - } else { - log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) - exists = true - attrs = newObjAttr(options.Name, info) - } + log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) + exists = true + attrs = newObjAttr(options.Name, info) } } From bcb288d3a954339d5104a273f79f8c1cd1e4fd32 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 09:38:59 -0700 Subject: [PATCH 4/6] Reinstate test which was flaky due to a race condition --- component/file_cache/file_cache_test.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index d96d0fd70..4571ca64f 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -1211,12 +1211,7 @@ func (suite *fileCacheTestSuite) 
TestGetAttrCase3() { suite.assert.NoError(err) suite.assert.NotNil(attr) suite.assert.EqualValues(file, attr.Path) - // this check is flaky in our CI pipeline on Linux, so skip it - if runtime.GOOS != "windows" { - fmt.Println("Skipping TestGetAttrCase3 attr.Size check on Linux because it's flaky.") - } else { - suite.assert.EqualValues(1024, attr.Size) - } + suite.assert.EqualValues(1024, attr.Size) } func (suite *fileCacheTestSuite) TestGetAttrCase4() { From 2ea9b61a43fe047223410d1d5cc7a3eae83c5bd3 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Wed, 27 Nov 2024 10:23:15 -0700 Subject: [PATCH 5/6] Remove protection from stat call, which is not within the scope of this PR. --- component/file_cache/file_cache.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 4be55cc76..08b05ef44 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1329,11 +1329,7 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) - flock := fc.fileLocks.Get(options.Name) - // TODO: should we use a RWMutex and use RLock for stat calls? - flock.Lock() info, err := os.Stat(localPath) - flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. 
if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes From 87524e47529c0b68d83e879e17fd59b4e6851f27 Mon Sep 17 00:00:00 2001 From: Michael Habinsky Date: Mon, 2 Dec 2024 16:54:10 -0700 Subject: [PATCH 6/6] Protect os.Stat in GetAttr to replace .Locked() --- component/file_cache/file_cache.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 08b05ef44..3256b5b8a 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -1329,7 +1329,13 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) + // If the file is being downloaded or deleted, the size and mod time will be incorrect + // wait for download or deletion to complete before getting local file info + flock := fc.fileLocks.Get(options.Name) + flock.Lock() + // TODO: Do we need to call NextComponent().GetAttr in this same critical section to avoid a data race with the cloud? info, err := os.Stat(localPath) + flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes