diff --git a/pkg/chunked/cache_linux.go b/pkg/chunked/cache_linux.go index a7604535f2..cedf6322b8 100644 --- a/pkg/chunked/cache_linux.go +++ b/pkg/chunked/cache_linux.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "os" + "runtime" "sort" "strconv" "strings" @@ -21,6 +22,7 @@ import ( jsoniter "github.com/json-iterator/go" digest "github.com/opencontainers/go-digest" "github.com/sirupsen/logrus" + "golang.org/x/sys/unix" ) const ( @@ -41,10 +43,19 @@ type layer struct { id string cacheFile *cacheFile target string + // mmapBuffer is nil when the cache file is fully loaded in memory. + // Otherwise it points to a mmap'ed buffer that is referenced by cacheFile.vdata. + mmapBuffer []byte + + // reloadWithMmap is set when the current process generates the cache file, + // and cacheFile reuses the memory buffer used by the generation function. + // Next time the layer cache is used, attempt to reload the file using + // mmap. + reloadWithMmap bool } type layersCache struct { - layers []layer + layers []*layer refs int store storage.Store mutex sync.RWMutex @@ -56,14 +67,29 @@ var ( cache *layersCache ) +func (c *layer) release() { + runtime.SetFinalizer(c, nil) + if c.mmapBuffer != nil { + unix.Munmap(c.mmapBuffer) + } +} + +func layerFinalizer(c *layer) { + c.release() +} + func (c *layersCache) release() { cacheMutex.Lock() defer cacheMutex.Unlock() c.refs-- - if c.refs == 0 { - cache = nil + if c.refs != 0 { + return } + for _, l := range c.layers { + l.release() + } + cache = nil } func getLayersCacheRef(store storage.Store) *layersCache { @@ -91,83 +117,152 @@ func getLayersCache(store storage.Store) (*layersCache, error) { return c, nil } +// loadLayerBigData attempts to load the specified cacheKey from a file and mmap its content. +// If the cache is not backed by a file, then it loads the entire content in memory. +// Returns the cache content, and if mmap'ed, the mmap buffer to Munmap. +func (c *layersCache) loadLayerBigData(layerID, cacheKey string) ([]byte, []byte, error) { + inputFile, err := c.store.LayerBigData(layerID, cacheKey) + if err != nil { + return nil, nil, err + } + defer inputFile.Close() + + // if the cache is backed by a file, attempt to mmap it. + if osFile, ok := inputFile.(*os.File); ok { + st, err := osFile.Stat() + if err != nil { + logrus.Warningf("Error stat'ing cache file for layer %q: %v", layerID, err) + goto fallback + } + size := st.Size() + if size == 0 { + logrus.Warningf("Cache file size is zero for layer %q: %v", layerID, err) + goto fallback + } + buf, err := unix.Mmap(int(osFile.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED) + if err != nil { + logrus.Warningf("Error mmap'ing cache file for layer %q: %v", layerID, err) + goto fallback + } + // best effort advise to the kernel. + _ = unix.Madvise(buf, unix.MADV_RANDOM) + + return buf, buf, nil + } +fallback: + buf, err := io.ReadAll(inputFile) + return buf, nil, err +} + +func (c *layersCache) loadLayerCache(layerID string) (_ *layer, errRet error) { + buffer, mmapBuffer, err := c.loadLayerBigData(layerID, cacheKey) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + // there is no existing cache to load + if err != nil || buffer == nil { + return nil, nil + } + defer func() { + if errRet != nil && mmapBuffer != nil { + unix.Munmap(mmapBuffer) + } + }() + cacheFile, err := readCacheFileFromMemory(buffer) + if err != nil { + return nil, err + } + return c.createLayer(layerID, cacheFile, mmapBuffer) +} + +func (c *layersCache) createCacheFileFromTOC(layerID string) (*layer, error) { + clFile, err := c.store.LayerBigData(layerID, chunkedLayerDataKey) + if err != nil && !errors.Is(err, os.ErrNotExist) { + return nil, err + } + var lcd chunkedLayerData + if clFile != nil { + cl, err := io.ReadAll(clFile) + if err != nil { + return nil, fmt.Errorf("open manifest file: %w", err) + } + json := jsoniter.ConfigCompatibleWithStandardLibrary + + if err := json.Unmarshal(cl, &lcd); err != nil { + return nil, err + } + } + manifestReader, err := c.store.LayerBigData(layerID, bigDataKey) + if err != nil { + return nil, err + } + defer manifestReader.Close() + + manifest, err := io.ReadAll(manifestReader) + if err != nil { + return nil, fmt.Errorf("read manifest file: %w", err) + } + + cacheFile, err := writeCache(manifest, lcd.Format, layerID, c.store) + if err != nil { + return nil, err + } + l, err := c.createLayer(layerID, cacheFile, nil) + if err != nil { + return nil, err + } + l.reloadWithMmap = true + return l, nil +} + func (c *layersCache) load() error { c.mutex.Lock() defer c.mutex.Unlock() + loadedLayers := make(map[string]*layer) + for i, r := range c.layers { + loadedLayers[r.id] = c.layers[i] + } allLayers, err := c.store.Layers() if err != nil { return err } - existingLayers := make(map[string]string) - for _, r := range c.layers { - existingLayers[r.id] = r.target - } - currentLayers := make(map[string]string) + var newLayers []*layer for _, r := range allLayers { - currentLayers[r.ID] = r.ID - if _, found := existingLayers[r.ID]; found { - continue - } - - bigData, err := c.store.LayerBigData(r.ID, cacheKey) - // if the cache already exists, read and use it - if err == nil { - defer bigData.Close() - cacheFile, err := readCacheFileFromReader(bigData) - if err == nil { - c.addLayer(r.ID, cacheFile) + // The layer is present in the store and it is already loaded. Attempt to + // re-use it if mmap'ed. + if l, found := loadedLayers[r.ID]; found { + // If the layer is not marked for re-load, move it to newLayers. + if !l.reloadWithMmap { + delete(loadedLayers, r.ID) + newLayers = append(newLayers, l) continue } - logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err) - } else if !errors.Is(err, os.ErrNotExist) { - return err } - - var lcd chunkedLayerData - - clFile, err := c.store.LayerBigData(r.ID, chunkedLayerDataKey) - if err != nil && !errors.Is(err, os.ErrNotExist) { - return err - } - if clFile != nil { - cl, err := io.ReadAll(clFile) - if err != nil { - return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err) - } - json := jsoniter.ConfigCompatibleWithStandardLibrary - if err := json.Unmarshal(cl, &lcd); err != nil { - return err - } - } - - // otherwise create it from the layer TOC. - manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey) + // try to read the existing cache file. + l, err := c.loadLayerCache(r.ID) if err != nil { + logrus.Warningf("Error loading cache file for layer %q: %v", r.ID, err) + } + if l != nil { + newLayers = append(newLayers, l) continue } - defer manifestReader.Close() - - manifest, err := io.ReadAll(manifestReader) + // the cache file is either not present or broken. Try to generate it from the TOC. + l, err = c.createCacheFileFromTOC(r.ID) if err != nil { - return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err) - } - - cacheFile, err := writeCache(manifest, lcd.Format, r.ID, c.store) - if err == nil { - c.addLayer(r.ID, cacheFile) + logrus.Warningf("Error creating cache file for layer %q: %v", r.ID, err) } - } - - var newLayers []layer - for _, l := range c.layers { - if _, found := currentLayers[l.id]; found { + if l != nil { newLayers = append(newLayers, l) } } + // The layers that are still in loadedLayers are either stale or fully loaded in memory. Clean them up. + for _, l := range loadedLayers { + l.release() + } c.layers = newLayers - return nil } @@ -237,7 +332,7 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin digestLen := 0 var tagsBuffer bytes.Buffer - toc, err := prepareMetadata(manifest, format) + toc, err := prepareCacheFile(manifest, format) if err != nil { return nil, err } @@ -272,7 +367,6 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin if _, err := vdata.Write(location); err != nil { return nil, err } - digestLen = len(k.Digest) } if k.ChunkDigest != "" { @@ -377,7 +471,9 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin }, nil } -func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) { +func readCacheFileFromMemory(bigDataBuffer []byte) (*cacheFile, error) { + bigData := bytes.NewReader(bigDataBuffer) + var version, tagLen, digestLen, tagsLen, vdataLen uint64 if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil { return nil, err @@ -403,10 +499,8 @@ func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) { return nil, err } - vdata := make([]byte, vdataLen) - if _, err := bigData.Read(vdata); err != nil { - return nil, err - } + // retrieve the unread part of the buffer. + vdata := bigDataBuffer[len(bigDataBuffer)-bigData.Len():] return &cacheFile{ tagLen: int(tagLen), @@ -416,7 +510,7 @@ func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) { }, nil } -func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]*fileMetadata, error) { +func prepareCacheFile(manifest []byte, format graphdriver.DifferOutputFormat) ([]*fileMetadata, error) { toc, err := unmarshalToc(manifest) if err != nil { // ignore errors here. They might be caused by a different manifest format. @@ -462,19 +556,21 @@ func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([] return r, nil } -func (c *layersCache) addLayer(id string, cacheFile *cacheFile) error { +func (c *layersCache) createLayer(id string, cacheFile *cacheFile, mmapBuffer []byte) (*layer, error) { target, err := c.store.DifferTarget(id) if err != nil { - return fmt.Errorf("get checkout directory layer %q: %w", id, err) + return nil, fmt.Errorf("get checkout directory layer %q: %w", id, err) } - - l := layer{ - id: id, - cacheFile: cacheFile, - target: target, + l := &layer{ + id: id, + cacheFile: cacheFile, + target: target, + mmapBuffer: mmapBuffer, } - c.layers = append(c.layers, l) - return nil + if mmapBuffer != nil { + runtime.SetFinalizer(l, layerFinalizer) + } + return l, nil } func byteSliceAsString(b []byte) string { diff --git a/pkg/chunked/cache_linux_test.go b/pkg/chunked/cache_linux_test.go index 36744a21d4..190ddb2f39 100644 --- a/pkg/chunked/cache_linux_test.go +++ b/pkg/chunked/cache_linux_test.go @@ -61,26 +61,26 @@ const jsonTOC = ` ` func TestPrepareMetadata(t *testing.T) { - toc, err := prepareMetadata([]byte(jsonTOC), graphdriver.DifferOutputFormatDir) + toc, err := prepareCacheFile([]byte(jsonTOC), graphdriver.DifferOutputFormatDir) if err != nil { - t.Errorf("got error from prepareMetadata: %v", err) + t.Errorf("got error from prepareCacheFile: %v", err) } if len(toc) != 2 { - t.Error("prepareMetadata returns the wrong length") + t.Error("prepareCacheFile returns the wrong length") } } func TestPrepareMetadataFlat(t *testing.T) { - toc, err := prepareMetadata([]byte(jsonTOC), graphdriver.DifferOutputFormatFlat) + toc, err := prepareCacheFile([]byte(jsonTOC), graphdriver.DifferOutputFormatFlat) if err != nil { - t.Errorf("got error from prepareMetadata: %v", err) + t.Errorf("got error from prepareCacheFile: %v", err) } for _, e := range toc { if len(strings.Split(e.Name, "/")) != 2 { - t.Error("prepareMetadata returns the wrong number of path elements for flat directories") + t.Error("prepareCacheFile returns the wrong number of path elements for flat directories") } if len(filepath.Dir(e.Name)) != 2 { - t.Error("prepareMetadata returns the wrong path for flat directories") + t.Error("prepareCacheFile returns the wrong path for flat directories") } } } @@ -104,9 +104,9 @@ func (b *bigDataToBuffer) SetLayerBigData(id, key string, data io.Reader) error } func TestWriteCache(t *testing.T) { - toc, err := prepareMetadata([]byte(jsonTOC), graphdriver.DifferOutputFormatDir) + toc, err := prepareCacheFile([]byte(jsonTOC), graphdriver.DifferOutputFormatDir) if err != nil { - t.Errorf("got error from prepareMetadata: %v", err) + t.Errorf("got error from prepareCacheFile: %v", err) } dest := bigDataToBuffer{ @@ -182,7 +182,7 @@ func TestReadCache(t *testing.T) { t.Errorf("got error from writeCache: %v", err) } - cacheRead, err := readCacheFileFromReader(dest.buf) + cacheRead, err := readCacheFileFromMemory(dest.buf.Bytes()) if err != nil { t.Errorf("got error from readMetadataFromCache: %v", err) }