chunked: use mmap to load cache files
Reduce the process's memory usage by no longer loading the layers'
cache files entirely into memory.

The memory-mapped files can be shared among multiple Podman instances
and do not need to be fully loaded into memory.

Signed-off-by: Giuseppe Scrivano <[email protected]>
giuseppe committed Mar 20, 2024
1 parent f7e661f commit 2a4e4b3
Showing 2 changed files with 181 additions and 85 deletions.
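
As background for the diff below: the commit replaces reading each layer's cache file fully into memory with a read-only MAP_SHARED mapping, so the pages sit in the kernel page cache, can be shared by every process that maps the same file, and are only faulted in when accessed. The following is a minimal, self-contained sketch of that pattern; the loadCache helper is a hypothetical, simplified stand-in for the loadLayerBigData function added in the diff, with most of its error handling and logging trimmed.

package main

import (
	"fmt"
	"io"
	"os"

	"golang.org/x/sys/unix"
)

// loadCache returns the file contents and, when the file was mmap'ed, the
// buffer that the caller must eventually release with unix.Munmap.
func loadCache(path string) (data []byte, mapped []byte, err error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, nil, err
	}
	defer f.Close()

	if st, err := f.Stat(); err == nil && st.Size() > 0 {
		buf, mmapErr := unix.Mmap(int(f.Fd()), 0, int(st.Size()), unix.PROT_READ, unix.MAP_SHARED)
		if mmapErr == nil {
			// The pages are backed by the file: they are shared with other
			// processes mapping it and faulted in only when accessed.
			_ = unix.Madvise(buf, unix.MADV_RANDOM) // best-effort hint
			return buf, buf, nil
		}
	}

	// Fallback: read the whole file into process memory.
	buf, err := io.ReadAll(f)
	return buf, nil, err
}

func main() {
	if len(os.Args) != 2 {
		fmt.Fprintln(os.Stderr, "usage: loadcache <cache-file>")
		os.Exit(1)
	}
	data, mapped, err := loadCache(os.Args[1])
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	fmt.Printf("loaded %d bytes (mmap: %v)\n", len(data), mapped != nil)
	if mapped != nil {
		_ = unix.Munmap(mapped)
	}
}
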
246 changes: 171 additions & 75 deletions pkg/chunked/cache_linux.go
@@ -7,6 +7,7 @@ import (
"fmt"
"io"
"os"
"runtime"
"sort"
"strconv"
"strings"
@@ -21,6 +22,7 @@ import (
jsoniter "github.com/json-iterator/go"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

const (
@@ -41,10 +43,19 @@ type layer struct {
id string
cacheFile *cacheFile
target string
// mmapBuffer is nil when the cache file is fully loaded in memory.
// Otherwise it points to a mmap'ed buffer that is referenced by cacheFile.vdata.
mmapBuffer []byte

// reloadWithMmap is set when the current process generates the cache file,
// and cacheFile reuses the memory buffer used by the generation function.
// Next time the layer cache is used, attempt to reload the file using
// mmap.
reloadWithMmap bool
}

type layersCache struct {
layers []layer
layers []*layer
refs int
store storage.Store
mutex sync.RWMutex
@@ -56,14 +67,29 @@ var (
cache *layersCache
)

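// release drops the layer's finalizer and unmaps its cache file, if it was mmap'ed.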
func (c *layer) release() {
runtime.SetFinalizer(c, nil)
if c.mmapBuffer != nil {
unix.Munmap(c.mmapBuffer)
}
}

func layerFinalizer(c *layer) {
c.release()
}

func (c *layersCache) release() {
cacheMutex.Lock()
defer cacheMutex.Unlock()

c.refs--
if c.refs == 0 {
cache = nil
if c.refs != 0 {
return
}
for _, l := range c.layers {
l.release()
}
cache = nil
}

func getLayersCacheRef(store storage.Store) *layersCache {
Expand Down Expand Up @@ -91,83 +117,152 @@ func getLayersCache(store storage.Store) (*layersCache, error) {
return c, nil
}

// loadLayerBigData attempts to load the specified cacheKey from a file and mmap its content.
// If the cache is not backed by a file, then it loads the entire content in memory.
// Returns the cache content, and if mmap'ed, the mmap buffer to Munmap.
func (c *layersCache) loadLayerBigData(layerID, cacheKey string) ([]byte, []byte, error) {
inputFile, err := c.store.LayerBigData(layerID, cacheKey)
if err != nil {
return nil, nil, err
}
defer inputFile.Close()

// if the cache is backed by a file, attempt to mmap it.
if osFile, ok := inputFile.(*os.File); ok {
st, err := osFile.Stat()
if err != nil {
logrus.Warningf("Error stat'ing cache file for layer %q: %v", layerID, err)
goto fallback
}
size := st.Size()
if size == 0 {
logrus.Warningf("Cache file size is zero for layer %q: %v", layerID, err)
goto fallback
}
buf, err := unix.Mmap(int(osFile.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED)
if err != nil {
logrus.Warningf("Error mmap'ing cache file for layer %q: %v", layerID, err)
goto fallback
}
// best-effort advice to the kernel.
_ = unix.Madvise(buf, unix.MADV_RANDOM)

return buf, buf, nil
}
fallback:
buf, err := io.ReadAll(inputFile)
return buf, nil, err
}

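// loadLayerCache tries to load and parse the cache file for the specified layer.
// It returns nil, nil when no cache file exists for the layer.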
func (c *layersCache) loadLayerCache(layerID string) (_ *layer, errRet error) {
buffer, mmapBuffer, err := c.loadLayerBigData(layerID, cacheKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}
// there is no existing cache to load
if err != nil || buffer == nil {
return nil, nil
}
defer func() {
if errRet != nil && mmapBuffer != nil {
unix.Munmap(mmapBuffer)
}
}()
cacheFile, err := readCacheFileFromMemory(buffer)
if err != nil {
return nil, err
}
return c.createLayer(layerID, cacheFile, mmapBuffer)
}

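// createCacheFileFromTOC generates the cache file for the specified layer from its TOC
// and marks the returned layer to be reloaded through mmap on the next use.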
func (c *layersCache) createCacheFileFromTOC(layerID string) (*layer, error) {
clFile, err := c.store.LayerBigData(layerID, chunkedLayerDataKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return nil, err
}
var lcd chunkedLayerData
if clFile != nil {
cl, err := io.ReadAll(clFile)
if err != nil {
return nil, fmt.Errorf("open manifest file: %w", err)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary

if err := json.Unmarshal(cl, &lcd); err != nil {
return nil, err
}
}
manifestReader, err := c.store.LayerBigData(layerID, bigDataKey)
if err != nil {
return nil, err
}
defer manifestReader.Close()

manifest, err := io.ReadAll(manifestReader)
if err != nil {
return nil, fmt.Errorf("read manifest file: %w", err)
}

cacheFile, err := writeCache(manifest, lcd.Format, layerID, c.store)
if err != nil {
return nil, err
}
l, err := c.createLayer(layerID, cacheFile, nil)
if err != nil {
return nil, err
}
l.reloadWithMmap = true
return l, nil
}

func (c *layersCache) load() error {
c.mutex.Lock()
defer c.mutex.Unlock()

loadedLayers := make(map[string]*layer)
for i, r := range c.layers {
loadedLayers[r.id] = c.layers[i]
}
allLayers, err := c.store.Layers()
if err != nil {
return err
}
existingLayers := make(map[string]string)
for _, r := range c.layers {
existingLayers[r.id] = r.target
}

currentLayers := make(map[string]string)
var newLayers []*layer
for _, r := range allLayers {
currentLayers[r.ID] = r.ID
if _, found := existingLayers[r.ID]; found {
continue
}

bigData, err := c.store.LayerBigData(r.ID, cacheKey)
// if the cache already exists, read and use it
if err == nil {
defer bigData.Close()
cacheFile, err := readCacheFileFromReader(bigData)
if err == nil {
c.addLayer(r.ID, cacheFile)
// The layer is present in the store and it is already loaded. Attempt to
// re-use it if mmap'ed.
if l, found := loadedLayers[r.ID]; found {
// If the layer is not marked for re-load, move it to newLayers.
if !l.reloadWithMmap {
delete(loadedLayers, r.ID)
newLayers = append(newLayers, l)
continue
}
logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err)
} else if !errors.Is(err, os.ErrNotExist) {
return err
}

var lcd chunkedLayerData

clFile, err := c.store.LayerBigData(r.ID, chunkedLayerDataKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
if clFile != nil {
cl, err := io.ReadAll(clFile)
if err != nil {
return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(cl, &lcd); err != nil {
return err
}
}

// otherwise create it from the layer TOC.
manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey)
// try to read the existing cache file.
l, err := c.loadLayerCache(r.ID)
if err != nil {
logrus.Warningf("Error loading cache file for layer %q: %v", r.ID, err)
}
if l != nil {
newLayers = append(newLayers, l)
continue
}
defer manifestReader.Close()

manifest, err := io.ReadAll(manifestReader)
// the cache file is either not present or broken. Try to generate it from the TOC.
l, err = c.createCacheFileFromTOC(r.ID)
if err != nil {
return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
}

cacheFile, err := writeCache(manifest, lcd.Format, r.ID, c.store)
if err == nil {
c.addLayer(r.ID, cacheFile)
logrus.Warningf("Error creating cache file for layer %q: %v", r.ID, err)
}
}

var newLayers []layer
for _, l := range c.layers {
if _, found := currentLayers[l.id]; found {
if l != nil {
newLayers = append(newLayers, l)
}
}
// The layers that are still in loadedLayers are either stale or fully loaded in memory. Clean them up.
for _, l := range loadedLayers {
l.release()
}
c.layers = newLayers

return nil
}

@@ -237,7 +332,7 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin
digestLen := 0
var tagsBuffer bytes.Buffer

toc, err := prepareMetadata(manifest, format)
toc, err := prepareCacheFile(manifest, format)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,7 +367,6 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin
if _, err := vdata.Write(location); err != nil {
return nil, err
}

digestLen = len(k.Digest)
}
if k.ChunkDigest != "" {
@@ -377,7 +471,9 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin
}, nil
}

func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) {
func readCacheFileFromMemory(bigDataBuffer []byte) (*cacheFile, error) {
bigData := bytes.NewReader(bigDataBuffer)

var version, tagLen, digestLen, tagsLen, vdataLen uint64
if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil {
return nil, err
@@ -403,10 +499,8 @@ func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) {
return nil, err
}

vdata := make([]byte, vdataLen)
if _, err := bigData.Read(vdata); err != nil {
return nil, err
}
// retrieve the unread part of the buffer.
vdata := bigDataBuffer[len(bigDataBuffer)-bigData.Len():]

return &cacheFile{
tagLen: int(tagLen),
@@ -416,7 +510,7 @@ func readCacheFileFromReader(bigData io.Reader) (*cacheFile, error) {
}, nil
}

func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]*fileMetadata, error) {
func prepareCacheFile(manifest []byte, format graphdriver.DifferOutputFormat) ([]*fileMetadata, error) {
toc, err := unmarshalToc(manifest)
if err != nil {
// ignore errors here. They might be caused by a different manifest format.
@@ -462,19 +556,21 @@ func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]
return r, nil
}

func (c *layersCache) addLayer(id string, cacheFile *cacheFile) error {
func (c *layersCache) createLayer(id string, cacheFile *cacheFile, mmapBuffer []byte) (*layer, error) {
target, err := c.store.DifferTarget(id)
if err != nil {
return fmt.Errorf("get checkout directory layer %q: %w", id, err)
return nil, fmt.Errorf("get checkout directory layer %q: %w", id, err)
}

l := layer{
id: id,
cacheFile: cacheFile,
target: target,
l := &layer{
id: id,
cacheFile: cacheFile,
target: target,
mmapBuffer: mmapBuffer,
}
c.layers = append(c.layers, l)
return nil
if mmapBuffer != nil {
runtime.SetFinalizer(l, layerFinalizer)
}
return l, nil
}

func byteSliceAsString(b []byte) string {