Skip to content

Commit

Permalink
chunked: use mmap to load cache files
Browse files Browse the repository at this point in the history
Reduce the memory usage of the process by not loading any cache file
for the layers entirely into memory.

The memory-mapped files can be shared among multiple instances of
Podman, and they do not need to be fully loaded into memory.

Signed-off-by: Giuseppe Scrivano <[email protected]>
  • Loading branch information
giuseppe committed Mar 5, 2024
1 parent bb59307 commit 7460370
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 73 deletions.
201 changes: 129 additions & 72 deletions pkg/chunked/cache_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
jsoniter "github.com/json-iterator/go"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
"golang.org/x/sys/unix"
)

const (
Expand All @@ -41,6 +42,9 @@ type layer struct {
id string
metadata *metadata
target string
// mmapBuffer is nil when the metadata is fully loaded in memory.
// Otherwise it points to a mmap'ed buffer that is referenced by metadata.vdata.
mmapBuffer []byte
}

type layersCache struct {
Expand All @@ -61,9 +65,15 @@ func (c *layersCache) release() {
defer cacheMutex.Unlock()

c.refs--
if c.refs == 0 {
cache = nil
if c.refs != 0 {
return
}
for _, l := range c.layers {
if l.mmapBuffer != nil {
unix.Munmap(l.mmapBuffer)
}
}
cache = nil
}

func getLayersCacheRef(store storage.Store) *layersCache {
Expand Down Expand Up @@ -91,83 +101,130 @@ func getLayersCache(store storage.Store) (*layersCache, error) {
return c, nil
}

func (c *layersCache) load() error {
c.mutex.Lock()
defer c.mutex.Unlock()
func (c *layersCache) loadCacheFile(layerID, cacheKey string) ([]byte, error) {
path, err := c.store.LayerBigDataFilePath(layerID, cacheKey)
if path == "" || err != nil {
return nil, err
}

allLayers, err := c.store.Layers()
file, err := os.OpenFile(path, os.O_RDONLY, 0600)
if err != nil {
return nil, err
}
defer file.Close()

st, err := file.Stat()
if err != nil {
return nil, err
}

size := st.Size()
if size == 0 {
return nil, nil
}

buf, err := unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED)
if err != nil {
return nil, err
}
// best effort advise to the kernel.
_ = unix.Madvise(buf, unix.MADV_RANDOM)

return buf, err
}

// loadLayerCache tries to load the existing cache file for layerID and to
// register its metadata with the layers cache.
//
// It returns true only when the layer was added to the cache.  When
// loadCacheFile reports os.ErrNotExist, or returns no buffer, there is
// nothing to load and (false, nil) is returned.  Any other failure is
// returned to the caller; if the buffer was already obtained it is unmapped
// first, since the cache takes ownership of it only on success.
func (c *layersCache) loadLayerCache(layerID string) (bool, error) {
	buf, err := c.loadCacheFile(layerID, cacheKey)
	switch {
	case err != nil && !errors.Is(err, os.ErrNotExist):
		return false, err
	case err != nil || buf == nil:
		// there is no existing cache to load
		return false, nil
	}

	md, err := readMetadataFromCache(buf)
	if err == nil {
		err = c.addLayer(layerID, md, buf)
	}
	if err != nil {
		// the mmap'ed data is not owned by the cache manager on errors
		unix.Munmap(buf)
		return false, err
	}
	return true, nil
}

func (c *layersCache) createCacheFileFromTOC(layerID string) error {
clFile, err := c.store.LayerBigData(layerID, chunkedLayerDataKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
existingLayers := make(map[string]string)
for _, r := range c.layers {
existingLayers[r.id] = r.target
if clFile == nil {
return nil
}
cl, err := io.ReadAll(clFile)
if err != nil {
return fmt.Errorf("open manifest file: %w", err)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary

currentLayers := make(map[string]string)
for _, r := range allLayers {
currentLayers[r.ID] = r.ID
if _, found := existingLayers[r.ID]; found {
continue
}
var lcd chunkedLayerData
if err := json.Unmarshal(cl, &lcd); err != nil {
return err
}
manifestReader, err := c.store.LayerBigData(layerID, bigDataKey)
if err != nil {
return err
}
defer manifestReader.Close()

bigData, err := c.store.LayerBigData(r.ID, cacheKey)
// if the cache already exists, read and use it
if err == nil {
defer bigData.Close()
metadata, err := readMetadataFromCache(bigData)
if err == nil {
c.addLayer(r.ID, metadata)
continue
}
logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err)
} else if !errors.Is(err, os.ErrNotExist) {
return err
}
manifest, err := io.ReadAll(manifestReader)
if err != nil {
return fmt.Errorf("read manifest file: %w", err)
}

var lcd chunkedLayerData
metadata, err := writeCache(manifest, lcd.Format, layerID, c.store)
if err != nil {
return err
}
return c.addLayer(layerID, metadata, nil)
}

clFile, err := c.store.LayerBigData(r.ID, chunkedLayerDataKey)
if err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
if clFile != nil {
cl, err := io.ReadAll(clFile)
if err != nil {
return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
}
json := jsoniter.ConfigCompatibleWithStandardLibrary
if err := json.Unmarshal(cl, &lcd); err != nil {
return err
}
}
func (c *layersCache) load() error {
c.mutex.Lock()
defer c.mutex.Unlock()

// otherwise create it from the layer TOC.
manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey)
if err != nil {
existingLayers := make(map[string]struct{})
for _, r := range c.layers {
// ignore the layer if it was fully loaded in memory.
// In this way it can be reloaded using mmap.
if r.mmapBuffer != nil {
existingLayers[r.id] = struct{}{}
}
}
allLayers, err := c.store.Layers()
if err != nil {
return err
}
for _, r := range allLayers {
if _, found := existingLayers[r.ID]; found {
continue
}
defer manifestReader.Close()

manifest, err := io.ReadAll(manifestReader)
// try to read the existing cache file
loaded, err := c.loadLayerCache(r.ID)
if err != nil {
return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
logrus.Warningf("Error loading cache file for layer %q: %v", r.ID, err)
}

metadata, err := writeCache(manifest, lcd.Format, r.ID, c.store)
if err == nil {
c.addLayer(r.ID, metadata)
if loaded {
continue
}
}

var newLayers []layer
for _, l := range c.layers {
if _, found := currentLayers[l.id]; found {
newLayers = append(newLayers, l)
// the cache file is either not present or broken. Try to generate it.
if err := c.createCacheFileFromTOC(r.ID); err != nil {
logrus.Warningf("Error creating cache file for layer %q: %v", r.ID, err)
}
}
c.layers = newLayers

return nil
}

Expand Down Expand Up @@ -272,7 +329,6 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin
if _, err := vdata.Write(location); err != nil {
return nil, err
}

digestLen = len(k.Digest)
}
if k.ChunkDigest != "" {
Expand Down Expand Up @@ -377,7 +433,9 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id strin
}, nil
}

func readMetadataFromCache(bigData io.Reader) (*metadata, error) {
func readMetadataFromCache(bigDataBuffer []byte) (*metadata, error) {
bigData := bytes.NewReader(bigDataBuffer)

var version, tagLen, digestLen, tagsLen, vdataLen uint64
if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil {
return nil, err
Expand All @@ -403,10 +461,8 @@ func readMetadataFromCache(bigData io.Reader) (*metadata, error) {
return nil, err
}

vdata := make([]byte, vdataLen)
if _, err := bigData.Read(vdata); err != nil {
return nil, err
}
// retrieve the unread part of the buffer.
vdata := bigDataBuffer[len(bigDataBuffer)-bigData.Len():]

return &metadata{
tagLen: int(tagLen),
Expand Down Expand Up @@ -455,16 +511,17 @@ func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]
return r, nil
}

func (c *layersCache) addLayer(id string, metadata *metadata) error {
func (c *layersCache) addLayer(id string, metadata *metadata, mmapBuffer []byte) error {
target, err := c.store.DifferTarget(id)
if err != nil {
return fmt.Errorf("get checkout directory layer %q: %w", id, err)
}

l := layer{
id: id,
metadata: metadata,
target: target,
id: id,
metadata: metadata,
target: target,
mmapBuffer: mmapBuffer,
}
c.layers = append(c.layers, l)
return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/chunked/cache_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func TestReadCache(t *testing.T) {
t.Errorf("got error from writeCache: %v", err)
}

cacheRead, err := readMetadataFromCache(dest.buf)
cacheRead, err := readMetadataFromCache(dest.buf.Bytes())
if err != nil {
t.Errorf("got error from readMetadataFromCache: %v", err)
}
Expand Down

0 comments on commit 7460370

Please sign in to comment.