Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in file cache #379

Merged
merged 7 commits into from
Dec 3, 2024
59 changes: 35 additions & 24 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,12 +524,20 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O
j++
} else {
// Case 3: Item is in both local cache and cloud
if !attr.IsDir() && !fc.fileLocks.Locked(attr.Path) {
info, err := dirent.Info()

if !attr.IsDir() {
flock := fc.fileLocks.Get(attr.Path)
flock.Lock()
// use os.Stat instead of entry.Info() to be sure we get good info (with flock locked)
info, err := os.Stat(filepath.Join(localPath, dirent.Name())) // Grab local cache attributes
flock.Unlock()
if err == nil {
attr.Mtime = info.ModTime()
attr.Size = info.Size()
// attr is a pointer returned by NextComponent;
// modifying it in place could corrupt cached directory listings.
// To update its properties, copy the struct first (a shallow copy: any map or slice fields remain shared).
newAttr := *attr
newAttr.Mtime = info.ModTime()
newAttr.Size = info.Size()
attrs[i] = &newAttr
}
}
i++
Expand All @@ -541,14 +549,19 @@ func (fc *FileCache) StreamDir(options internal.StreamDirOptions) ([]*internal.O
if token == "" {
for _, entry := range dirents {
entryPath := common.JoinUnixFilepath(options.Name, entry.Name())
if !entry.IsDir() && !fc.fileLocks.Locked(entryPath) {
if !entry.IsDir() {
// This adds overhead to StreamDir for now.
// Because the listing is paginated, we cannot tell from the listing alone whether this item exists
// both in the local cache and in the container, so we call GetAttr on the next component to find out.
// Only if the entry does not exist in storage do we return a local-only item here.
_, err := fc.NextComponent().GetAttr(internal.GetAttrOptions{Name: entryPath})
if err != nil && (err == syscall.ENOENT || os.IsNotExist(err)) {
info, err := entry.Info() // Grab local cache attributes
// get the lock on the file, to allow any pending operation to complete
flock := fc.fileLocks.Get(entryPath)
flock.Lock()
// use os.Stat instead of entry.Info() to be sure we get good info (with flock locked)
info, err := os.Stat(filepath.Join(localPath, entry.Name())) // Grab local cache attributes
flock.Unlock()
// If local file is not locked then only use its attributes otherwise rely on container attributes
if err == nil {
// Case 2 (file only in local cache): create new attributes and add them to the storage attributes
Expand Down Expand Up @@ -1058,7 +1071,9 @@ func (fc *FileCache) ReadInBuffer(options internal.ReadInBufferOptions) (int, er

// Read and write operations are very frequent so updating cache policy for every read is a costly operation
// Update cache policy every 1K operations (includes both read and write) instead
options.Handle.Lock()
options.Handle.OptCnt++
options.Handle.Unlock()
if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 {
localPath := filepath.Join(fc.tmpPath, options.Handle.Path)
fc.policy.CacheValid(localPath)
Expand Down Expand Up @@ -1105,7 +1120,9 @@ func (fc *FileCache) WriteFile(options internal.WriteFileOptions) (int, error) {

// Read and write operations are very frequent so updating cache policy for every read is a costly operation
// Update cache policy every 1K operations (includes both read and write) instead
options.Handle.Lock()
options.Handle.OptCnt++
options.Handle.Unlock()
if (options.Handle.OptCnt % defaultCacheUpdateCount) == 0 {
localPath := filepath.Join(fc.tmpPath, options.Handle.Path)
fc.policy.CacheValid(localPath)
Expand Down Expand Up @@ -1316,24 +1333,18 @@ func (fc *FileCache) GetAttr(options internal.GetAttrOptions) (*internal.ObjAttr
// All directory operations are guaranteed to be synced with storage so they cannot be in a case 2 or 3 state.
if err == nil && !info.IsDir() {
if exists { // Case 3 (file in cloud storage and in local cache) so update the relevant attributes
// Return from local cache only if file is not under download or deletion
// If file is under download then taking size or mod time from it will be incorrect.
if !fc.fileLocks.Locked(options.Name) {
jfantinhardesty marked this conversation as resolved.
Show resolved Hide resolved
log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name)
attrs.Size = info.Size()
attrs.Mtime = info.ModTime()
} else {
log.Debug("FileCache::GetAttr : %s is locked, use storage attributes", options.Name)
}
log.Debug("FileCache::GetAttr : updating %s from local cache", options.Name)
// attrs is a pointer returned by NextComponent;
// modifying it in place could corrupt cached directory listings.
// To update its properties, copy the struct first (a shallow copy: any map or slice fields remain shared).
newAttr := *attrs
newAttr.Mtime = info.ModTime()
newAttr.Size = info.Size()
attrs = &newAttr
} else { // Case 2 (file only in local cache) so create a new attributes and add them to the storage attributes
if !strings.Contains(localPath, fc.tmpPath) {
// Here if the path is going out of the temp directory then return ENOENT
exists = false
} else {
log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name)
exists = true
attrs = newObjAttr(options.Name, info)
}
log.Debug("FileCache::GetAttr : serving %s attr from local cache", options.Name)
exists = true
attrs = newObjAttr(options.Name, info)
}
}
jfantinhardesty marked this conversation as resolved.
Show resolved Hide resolved

Expand Down
7 changes: 1 addition & 6 deletions component/file_cache/file_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,12 +1211,7 @@ func (suite *fileCacheTestSuite) TestGetAttrCase3() {
suite.assert.NoError(err)
suite.assert.NotNil(attr)
suite.assert.EqualValues(file, attr.Path)
// this check is flaky in our CI pipeline on Linux, so skip it
if runtime.GOOS != "windows" {
fmt.Println("Skipping TestGetAttrCase3 attr.Size check on Linux because it's flaky.")
} else {
suite.assert.EqualValues(1024, attr.Size)
}
suite.assert.EqualValues(1024, attr.Size)
}

func (suite *fileCacheTestSuite) TestGetAttrCase4() {
Expand Down
36 changes: 25 additions & 11 deletions component/file_cache/lru_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
)

type lruNode struct {
sync.RWMutex
next *lruNode
prev *lruNode
usage int
Expand Down Expand Up @@ -176,6 +177,8 @@ func (p *lruPolicy) IsCached(name string) bool {
val, found := p.nodeMap.Load(name)
if found {
node := val.(*lruNode)
node.RLock()
defer node.RUnlock()
log.Debug("lruPolicy::IsCached : %s, deleted:%t", name, node.deleted)
if !node.deleted {
return true
Expand Down Expand Up @@ -221,18 +224,21 @@ func (p *lruPolicy) cacheValidate(name string) {
})
node := val.(*lruNode)

// protect node data
node.Lock()
node.deleted = false
node.usage++
node.Unlock()

// protect the LRU
p.Lock()
defer p.Unlock()

node.deleted = false

// put node at head of linked list
if node == p.head {
return
}
p.moveToHead(node)

node.usage++
}

// For all other timer based activities we check the stuff here
Expand Down Expand Up @@ -295,7 +301,9 @@ func (p *lruPolicy) removeNode(name string) {
defer p.Unlock()

node = val.(*lruNode)
node.Lock()
node.deleted = true
node.Unlock()

if node == p.head {
p.head = node.next
Expand Down Expand Up @@ -368,7 +376,9 @@ func (p *lruPolicy) deleteExpiredNodes() {

for ; node != nil && count < p.maxEviction; node = node.next {
delItems = append(delItems, node)
node.Lock()
node.deleted = true
node.Unlock()
count++
}

Expand All @@ -385,7 +395,10 @@ func (p *lruPolicy) deleteExpiredNodes() {
log.Debug("lruPolicy::deleteExpiredNodes : List generated %d items", count)

for _, item := range delItems {
if item.deleted {
item.RLock()
restored := !item.deleted
item.RUnlock()
if !restored {
p.removeNode(item.name)
p.deleteItem(item.name)
}
Expand All @@ -408,15 +421,16 @@ func (p *lruPolicy) deleteItem(name string) {
}

flock := p.fileLocks.Get(azPath)
if p.fileLocks.Locked(azPath) {
log.Warn("lruPolicy::DeleteItem : File in under download %s", azPath)
p.CacheValid(name)
return
}

flock.Lock()
defer flock.Unlock()

// check if the file has been marked valid again after removeNode was called
_, found := p.nodeMap.Load(name)
if found {
log.Warn("lruPolicy::DeleteItem : File marked valid %s", azPath)
return
}

// Check if there are any open handles to this file or not
if flock.Count() > 0 {
log.Warn("lruPolicy::DeleteItem : File in use %s", name)
Expand Down
Loading