diff --git a/component/file_cache/file_cache.go b/component/file_cache/file_cache.go index 4846cc96a..9884a97d1 100644 --- a/component/file_cache/file_cache.go +++ b/component/file_cache/file_cache.go @@ -525,12 +525,20 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O j++ } else { // Case 3: Item is in both local cache and cloud - if !attr.IsDir() && !fc.fileLocks.Locked(attr.Path) { - info, err := dirent.Info() - + if !attr.IsDir() { + flock := fc.fileLocks.Get(attr.Path) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, dirent.Name())) // Grab local cache attributes + flock.Unlock() if err == nil { - attr.Mtime = info.ModTime() - attr.Size = info.Size() + // attr is a pointer returned by NextComponent + // modifying attr could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attr + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs[i] = &newAttr } } i++ @@ -542,14 +550,19 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O if token == "" { for _, entry := range dirents { entryPath := common.JoinUnixFilepath(options.Name, entry.Name()) - if !entry.IsDir() && !fc.fileLocks.Locked(entryPath) { + if !entry.IsDir() { // This is an overhead for streamdir for now // As list is paginated we have no way to know whether this particular item exists both in local cache // and container or not. So we rely on getAttr to tell if entry was cached then it exists in cloud storage too // If entry does not exists on storage then only return a local item here. _, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: entryPath}) if err != nil && (err == syscall.ENOENT || os.IsNotExist(err)) { - info, err := entry.Info() // Grab local cache attributes + // get the lock on the file, to allow any pending operation to complete + flock := fc.fileLocks.Get(entryPath) + flock.Lock() + // use os.Stat instead of entry.Info() to be sure we get good info (with flock locked) + info, err := os.Stat(filepath.Join(localPath, entry.Name())) // Grab local cache attributes + flock.Unlock() // If local file is not locked then only use its attributes otherwise rely on container attributes if err == nil { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes @@ -1059,7 +1072,9 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + options.Handle.Unlock() if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1106,7 +1121,9 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) { // Read and write operations are very frequent so updating cache policy for every read is a costly operation // Update cache policy every 1K operations (includes both read and write) instead + options.Handle.Lock() options.Handle.OptCnt++ + options.Handle.Unlock() if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 { localPath := filepath.Join(fc.tmpPath, options.Handle.Path) fc.policy.CacheValid(localPath) @@ -1313,28 +1330,28 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr // To cover cases 2 and 3, grab the attributes from the local cache localPath := filepath.Join(fc.tmpPath, options.Name) + // If the file is being downloaded or deleted, the size and mod time will be incorrect + // wait for download or deletion to complete before getting local file info + flock := fc.fileLocks.Get(options.Name) + flock.Lock() + // TODO: Do we need to call NextComponent().GetAttr in this same critical section to avoid a data race with the cloud? info, err := os.Stat(localPath) + flock.Unlock() // All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state. if err == nil && !info.IsDir() { if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes - // Return from local cache only if file is not under download or deletion - // If file is under download then taking size or mod time from it will be incorrect. - if !fc.fileLocks.Locked(options.Name) { - log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) - attrs.Size = info.Size() - attrs.Mtime = info.ModTime() - } else { - log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name) - } + log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name) + // attrs is a pointer returned by NextComponent + // modifying attrs could corrupt cached directory listings + // to update properties, we need to make a deep copy first + newAttr := *attrs + newAttr.Mtime = info.ModTime() + newAttr.Size = info.Size() + attrs = &newAttr } else { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes - if !strings.Contains(localPath, fc.tmpPath) { - // Here if the path is going out of the temp directory then return ENOENT - exists = false - } else { - log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) - exists = true - attrs = newObjAttr(options.Name, info) - } + log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name) + exists = true + attrs = newObjAttr(options.Name, info) } } diff --git a/component/file_cache/file_cache_test.go b/component/file_cache/file_cache_test.go index da56a6a27..3e033eda7 100644 --- a/component/file_cache/file_cache_test.go +++ b/component/file_cache/file_cache_test.go @@ -1209,12 +1209,7 @@ func (suite *fileCacheTestSuite) TestGetAttrCase3() { suite.assert.NoError(err) suite.assert.NotNil(attr) suite.assert.EqualValues(file, attr.Path) - // this check is flaky in our CI pipeline on Linux, so skip it - if runtime.GOOS != "windows" { - fmt.Println("Skipping TestGetAttrCase3 attr.Size check on Linux because it's flaky.") - } else { - suite.assert.EqualValues(1024, attr.Size) - } + suite.assert.EqualValues(1024, attr.Size) } func (suite *fileCacheTestSuite) TestGetAttrCase4() { diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index 6c0782775..8fbc984c7 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -36,6 +36,7 @@ import ( ) type lruNode struct { + sync.RWMutex next *lruNode prev *lruNode usage int @@ -174,6 +175,8 @@ func (p *lruPolicy) IsCached(name string) bool { val, found := p.nodeMap.Load(name) if found { node := val.(*lruNode) + node.RLock() + defer node.RUnlock() log.Debug("lruPolicy::IsCached : %s, deleted:%t", name, node.deleted) if !node.deleted { return true @@ -219,19 +222,22 @@ func (p *lruPolicy) cacheValidate(name string) { }) node := val.(*lruNode) + // protect node data + node.Lock() + node.deleted = false + node.usage++ + node.Unlock() + + // protect the LRU p.Lock() defer p.Unlock() - node.deleted = false - // put node at head of linked list if node == p.head { return } p.extractNode(node) p.setHead(node) - - node.usage++ } // For all other timer based activities we check the stuff here @@ -294,7 +300,9 @@ func (p *lruPolicy) removeNode(name string) { defer p.Unlock() node = val.(*lruNode) + node.Lock() node.deleted = true + node.Unlock() p.extractNode(node) } @@ -360,7 +368,9 @@ func (p *lruPolicy) deleteExpiredNodes() { for ; node != nil && count < p.maxEviction; node = node.next { delItems = append(delItems, node) + node.Lock() node.deleted = true + node.Unlock() count++ } @@ -377,7 +387,10 @@ func (p *lruPolicy) deleteExpiredNodes() { log.Debug("lruPolicy::deleteExpiredNodes : List generated %d items", count) for _, item := range delItems { - if item.deleted { + item.RLock() + restored := !item.deleted + item.RUnlock() + if !restored { p.removeNode(item.name) p.deleteItem(item.name) } @@ -400,15 +413,16 @@ func (p *lruPolicy) deleteItem(name string) { } flock := p.fileLocks.Get(azPath) - if p.fileLocks.Locked(azPath) { - log.Warn("lruPolicy::DeleteItem : File in under download %s", azPath) - p.CacheValid(name) - return - } - flock.Lock() defer flock.Unlock() + // check if the file has been marked valid again after removeNode was called + _, found := p.nodeMap.Load(name) + if found { + log.Warn("lruPolicy::DeleteItem : File marked valid %s", azPath) + return + } + // Check if there are any open handles to this file or not if flock.Count() > 0 { log.Warn("lruPolicy::DeleteItem : File in use %s", name)