chunked: use mmap to load cache files
Reduce the memory usage of the process by no longer loading the layers'
cache files entirely in memory.

The memory-mapped files can be shared among multiple Podman instances,
and they do not need to be fully loaded in memory.

Signed-off-by: Giuseppe Scrivano <[email protected]>
giuseppe committed Mar 6, 2024
1 parent bb59307 commit ec1aba0
Showing 2 changed files with 173 additions and 100 deletions.
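As background for the diff below, here is a minimal standalone sketch (not part of this commit) of the mapping strategy the new tryLoadCacheFromFile adopts: open the cache file read-only, mmap it with MAP_SHARED so the pages live in the page cache and can be shared across processes, and hint MADV_RANDOM, presumably because lookups binary-search the tags rather than scan sequentially. The path in main is hypothetical.

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

// mmapFile maps path read-only. The returned buffer aliases the page
// cache and must be released with unix.Munmap when no longer needed.
func mmapFile(path string) ([]byte, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	// closing the fd does not invalidate the mapping
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		return nil, err
	}
	if st.Size() == 0 {
		return nil, nil // nothing to map
	}

	buf, err := unix.Mmap(int(f.Fd()), 0, int(st.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		return nil, err
	}
	// best-effort hint; lookups jump around rather than read sequentially
	_ = unix.Madvise(buf, unix.MADV_RANDOM)
	return buf, nil
}

func main() {
	buf, err := mmapFile("/tmp/example.cache") // hypothetical path
	if err != nil {
		fmt.Println("mmap failed:", err)
		return
	}
	if buf != nil {
		fmt.Println("mapped", len(buf), "bytes")
		_ = unix.Munmap(buf)
	}
}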
253 changes: 163 additions & 90 deletions pkg/chunked/cache_linux.go
@@ -21,6 +21,7 @@ import (
 	jsoniter "github.com/json-iterator/go"
 	digest "github.com/opencontainers/go-digest"
 	"github.com/sirupsen/logrus"
+	"golang.org/x/sys/unix"
 )
 
 const (
@@ -30,17 +31,20 @@ const (
 	digestSha256Empty = "sha256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
 )
 
-type metadata struct {
+type cacheFile struct {
 	tagLen    int
 	digestLen int
 	tags      []byte
 	vdata     []byte
 }
 
 type layer struct {
-	id       string
-	metadata *metadata
-	target   string
+	id        string
+	cacheFile *cacheFile
+	target    string
+	// mmapBuffer is nil when the cache file is fully loaded in memory.
+	// Otherwise it points to a mmap'ed buffer that is referenced by cacheFile.vdata.
+	mmapBuffer []byte
 }
 
 type layersCache struct {
@@ -61,9 +65,15 @@ func (c *layersCache) release() {
 	defer cacheMutex.Unlock()
 
 	c.refs--
-	if c.refs == 0 {
-		cache = nil
+	if c.refs != 0 {
+		return
 	}
+	for _, l := range c.layers {
+		if l.mmapBuffer != nil {
+			unix.Munmap(l.mmapBuffer)
+		}
+	}
+	cache = nil
 }
 
 func getLayersCacheRef(store storage.Store) *layersCache {
@@ -91,83 +101,146 @@ func getLayersCache(store storage.Store) (*layersCache, error) {
 	return c, nil
 }
 
-func (c *layersCache) load() error {
-	c.mutex.Lock()
-	defer c.mutex.Unlock()
-
-	allLayers, err := c.store.Layers()
-	if err != nil {
-		return err
-	}
-	existingLayers := make(map[string]string)
-	for _, r := range c.layers {
-		existingLayers[r.id] = r.target
-	}
-
-	currentLayers := make(map[string]string)
-	for _, r := range allLayers {
-		currentLayers[r.ID] = r.ID
-		if _, found := existingLayers[r.ID]; found {
-			continue
-		}
-
-		bigData, err := c.store.LayerBigData(r.ID, cacheKey)
-		// if the cache already exists, read and use it
-		if err == nil {
-			defer bigData.Close()
-			metadata, err := readMetadataFromCache(bigData)
-			if err == nil {
-				c.addLayer(r.ID, metadata)
-				continue
-			}
-			logrus.Warningf("Error reading cache file for layer %q: %v", r.ID, err)
-		} else if !errors.Is(err, os.ErrNotExist) {
-			return err
-		}
-
-		var lcd chunkedLayerData
-
-		clFile, err := c.store.LayerBigData(r.ID, chunkedLayerDataKey)
-		if err != nil && !errors.Is(err, os.ErrNotExist) {
-			return err
-		}
-		if clFile != nil {
-			cl, err := io.ReadAll(clFile)
-			if err != nil {
-				return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
-			}
-			json := jsoniter.ConfigCompatibleWithStandardLibrary
-			if err := json.Unmarshal(cl, &lcd); err != nil {
-				return err
-			}
-		}
-
-		// otherwise create it from the layer TOC.
-		manifestReader, err := c.store.LayerBigData(r.ID, bigDataKey)
-		if err != nil {
-			return err
-		}
-		defer manifestReader.Close()
-
-		manifest, err := io.ReadAll(manifestReader)
-		if err != nil {
-			return fmt.Errorf("open manifest file for layer %q: %w", r.ID, err)
-		}
-
-		metadata, err := writeCache(manifest, lcd.Format, r.ID, c.store)
-		if err == nil {
-			c.addLayer(r.ID, metadata)
-		}
-	}
-
-	var newLayers []layer
-	for _, l := range c.layers {
-		if _, found := currentLayers[l.id]; found {
-			newLayers = append(newLayers, l)
-		}
-	}
-	c.layers = newLayers
+// tryLoadCacheFromFile attempts to load the specified cacheKey from a file and mmap its content.
+// If the cache is not backed by a file, then it loads the entire content in memory.
+// Returns the cache content, and if mmap'ed, the mmap buffer to Munmap.
+func (c *layersCache) tryLoadCacheFromFile(layerID, cacheKey string) ([]byte, []byte, error) {
+	path, err := c.store.LayerBigDataFilePath(layerID, cacheKey)
+	if err != nil {
+		return nil, nil, err
+	}
+	// the cache is backed by a file, attempt to mmap it.
+	if path != "" {
+		file, err := os.OpenFile(path, os.O_RDONLY, 0o600)
+		if err != nil {
+			return nil, nil, err
+		}
+		defer file.Close()
+
+		st, err := file.Stat()
+		if err != nil {
+			return nil, nil, err
+		}
+
+		size := st.Size()
+		if size == 0 {
+			return nil, nil, nil
+		}
+
+		buf, err := unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ, unix.MAP_SHARED)
+		if err != nil {
+			return nil, nil, err
+		}
+		// best effort advise to the kernel.
+		_ = unix.Madvise(buf, unix.MADV_RANDOM)
+
+		return buf, buf, err
+	}
+	input, err := c.store.LayerBigData(layerID, cacheKey)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer input.Close()
+	buf, err := io.ReadAll(input)
+	return buf, nil, err
+}
+
+func (c *layersCache) loadLayerCache(layerID string) (bool, error) {
+	buffer, mmapBuffer, err := c.tryLoadCacheFromFile(layerID, cacheKey)
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		return false, err
+	}
+	// there is no existing cache to load
+	if err != nil || buffer == nil {
+		return false, nil
+	}
+
+	cacheFile, err := readCacheFileFromMemory(buffer)
+	if err != nil {
+		if mmapBuffer != nil {
+			unix.Munmap(mmapBuffer)
+		}
+		return false, err
+	}
+	if err := c.addLayer(layerID, cacheFile, mmapBuffer); err != nil {
+		// the mmap'ed data is not owned by the cache manager on errors
+		if mmapBuffer != nil {
+			unix.Munmap(mmapBuffer)
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (c *layersCache) createCacheFileFromTOC(layerID string) error {
+	clFile, err := c.store.LayerBigData(layerID, chunkedLayerDataKey)
+	if err != nil && !errors.Is(err, os.ErrNotExist) {
+		return err
+	}
+	if clFile == nil {
+		return nil
+	}
+	cl, err := io.ReadAll(clFile)
+	if err != nil {
+		return fmt.Errorf("open manifest file: %w", err)
+	}
+	json := jsoniter.ConfigCompatibleWithStandardLibrary
+
+	var lcd chunkedLayerData
+	if err := json.Unmarshal(cl, &lcd); err != nil {
+		return err
+	}
+	manifestReader, err := c.store.LayerBigData(layerID, bigDataKey)
+	if err != nil {
+		return err
+	}
+	defer manifestReader.Close()
+
+	manifest, err := io.ReadAll(manifestReader)
+	if err != nil {
+		return fmt.Errorf("read manifest file: %w", err)
+	}
+
+	cacheFile, err := writeCache(manifest, lcd.Format, layerID, c.store)
+	if err != nil {
+		return err
+	}
+	return c.addLayer(layerID, cacheFile, nil)
+}
+
+func (c *layersCache) load() error {
+	c.mutex.Lock()
+	defer c.mutex.Unlock()
+
+	existingLayers := make(map[string]struct{})
+	for _, r := range c.layers {
+		// ignore the layer if it was fully loaded in memory.
+		// In this way it can be reloaded using mmap.
+		if r.mmapBuffer != nil {
+			existingLayers[r.id] = struct{}{}
+		}
+	}
+	allLayers, err := c.store.Layers()
+	if err != nil {
+		return err
+	}
+	for _, r := range allLayers {
+		if _, found := existingLayers[r.ID]; found {
+			continue
+		}
+
+		// try to read the existing cache file
+		loaded, err := c.loadLayerCache(r.ID)
+		if err != nil {
+			logrus.Warningf("Error loading cache file for layer %q: %v", r.ID, err)
+		}
+		if loaded {
+			continue
+		}
+
+		// the cache file is either not present or broken. Try to generate it.
+		if err := c.createCacheFileFromTOC(r.ID); err != nil {
+			logrus.Warningf("Error creating cache file for layer %q: %v", r.ID, err)
+		}
+	}
 
 	return nil
 }

@@ -214,7 +287,7 @@ func generateFileLocation(path string, offset, len uint64) []byte {
 
 // generateTag generates a tag in the form $DIGEST$OFFSET@LEN.
 // the [OFFSET; LEN] points to the variable length data where the file locations
-// are stored. $DIGEST has length digestLen stored in the metadata file header.
+// are stored. $DIGEST has length digestLen stored in the cache file header.
 func generateTag(digest string, offset, len uint64) string {
 	return fmt.Sprintf("%s%.20d@%.20d", digest, offset, len)
 }
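A quick illustration (not part of the diff) of why tags are fixed width: %.20d zero-pads the offset and length to 20 digits, so every tag for a given digest algorithm has the same length, which is what lets findTag further down address record i as i*tagLen and binary-search the tag array. The shortened digest here is hypothetical.

package main

import "fmt"

func main() {
	// real tags use the full digest string, so all records share one width
	tag := fmt.Sprintf("%s%.20d@%.20d", "sha256:e3b0", 123, 45)
	fmt.Println(tag)      // sha256:e3b000000000000000000123@00000000000000000045
	fmt.Println(len(tag)) // len(digest) + 20 + 1 + 20
}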
@@ -231,13 +304,13 @@ type setBigData interface {
 // - digest(file.payload))
 // - digest(digest(file.payload) + file.UID + file.GID + file.mode + file.xattrs)
 // - digest(i) for each i in chunks(file payload)
-func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id string, dest setBigData) (*metadata, error) {
+func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id string, dest setBigData) (*cacheFile, error) {
 	var vdata bytes.Buffer
 	tagLen := 0
 	digestLen := 0
 	var tagsBuffer bytes.Buffer
 
-	toc, err := prepareMetadata(manifest, format)
+	toc, err := prepareCacheFile(manifest, format)
 	if err != nil {
 		return nil, err
 	}
@@ -272,7 +345,6 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id string, dest setBigData) (*metadata, error) {
 			if _, err := vdata.Write(location); err != nil {
 				return nil, err
 			}
-
 			digestLen = len(k.Digest)
 		}
 		if k.ChunkDigest != "" {
@@ -369,15 +441,17 @@ func writeCache(manifest []byte, format graphdriver.DifferOutputFormat, id string, dest setBigData) (*metadata, error) {
 
 	logrus.Debugf("Written lookaside cache for layer %q with length %v", id, counter.Count)
 
-	return &metadata{
+	return &cacheFile{
 		digestLen: digestLen,
 		tagLen:    tagLen,
 		tags:      tagsBuffer.Bytes(),
 		vdata:     vdata.Bytes(),
 	}, nil
 }
 
-func readMetadataFromCache(bigData io.Reader) (*metadata, error) {
+func readCacheFileFromMemory(bigDataBuffer []byte) (*cacheFile, error) {
+	bigData := bytes.NewReader(bigDataBuffer)
+
 	var version, tagLen, digestLen, tagsLen, vdataLen uint64
 	if err := binary.Read(bigData, binary.LittleEndian, &version); err != nil {
 		return nil, err
@@ -403,20 +477,18 @@ func readMetadataFromCache(bigData io.Reader) (*metadata, error) {
 		return nil, err
 	}
 
-	vdata := make([]byte, vdataLen)
-	if _, err := bigData.Read(vdata); err != nil {
-		return nil, err
-	}
+	// retrieve the unread part of the buffer.
+	vdata := bigDataBuffer[len(bigDataBuffer)-bigData.Len():]
 
-	return &metadata{
+	return &cacheFile{
 		tagLen:    int(tagLen),
 		digestLen: int(digestLen),
 		tags:      tags,
 		vdata:     vdata,
 	}, nil
 }
 
-func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]*internal.FileMetadata, error) {
+func prepareCacheFile(manifest []byte, format graphdriver.DifferOutputFormat) ([]*internal.FileMetadata, error) {
 	toc, err := unmarshalToc(manifest)
 	if err != nil {
 		// ignore errors here. They might be caused by a different manifest format.
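The readCacheFileFromMemory change above is what keeps vdata zero-copy: after the binary.Read calls consume the header and tags, bytes.Reader.Len() reports how many bytes remain unread, so slicing the original (possibly mmap'ed) buffer at len(buf)-r.Len() yields the tail without allocating or copying. A small self-contained demonstration of the pattern, with made-up header and tag sizes:

package main

import (
	"bytes"
	"fmt"
	"io"
)

func main() {
	// stands in for the mmap'ed cache file: header, tags, then vdata
	backing := []byte("HDRtagsVDATA")
	r := bytes.NewReader(backing)

	header := make([]byte, 3)
	if _, err := io.ReadFull(r, header); err != nil {
		panic(err)
	}
	tags := make([]byte, 4)
	if _, err := io.ReadFull(r, tags); err != nil {
		panic(err)
	}

	// r.Len() is the count of unread bytes, so this slice aliases the
	// backing buffer instead of copying the tail.
	vdata := backing[len(backing)-r.Len():]
	fmt.Printf("%s\n", vdata) // VDATA
}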
@@ -455,16 +527,17 @@ func prepareMetadata(manifest []byte, format graphdriver.DifferOutputFormat) ([]*internal.FileMetadata, error) {
 	return r, nil
 }
 
-func (c *layersCache) addLayer(id string, metadata *metadata) error {
+func (c *layersCache) addLayer(id string, cacheFile *cacheFile, mmapBuffer []byte) error {
 	target, err := c.store.DifferTarget(id)
 	if err != nil {
 		return fmt.Errorf("get checkout directory layer %q: %w", id, err)
 	}
 
 	l := layer{
-		id:       id,
-		metadata: metadata,
-		target:   target,
+		id:         id,
+		cacheFile:  cacheFile,
+		target:     target,
+		mmapBuffer: mmapBuffer,
 	}
 	c.layers = append(c.layers, l)
 	return nil
@@ -474,22 +547,22 @@ func byteSliceAsString(b []byte) string {
 	return *(*string)(unsafe.Pointer(&b))
 }
 
-func findTag(digest string, metadata *metadata) (string, uint64, uint64) {
-	if len(digest) != metadata.digestLen {
+func findTag(digest string, cacheFile *cacheFile) (string, uint64, uint64) {
+	if len(digest) != cacheFile.digestLen {
 		return "", 0, 0
 	}
 
-	nElements := len(metadata.tags) / metadata.tagLen
+	nElements := len(cacheFile.tags) / cacheFile.tagLen
 
 	i := sort.Search(nElements, func(i int) bool {
-		d := byteSliceAsString(metadata.tags[i*metadata.tagLen : i*metadata.tagLen+metadata.digestLen])
+		d := byteSliceAsString(cacheFile.tags[i*cacheFile.tagLen : i*cacheFile.tagLen+cacheFile.digestLen])
 		return strings.Compare(d, digest) >= 0
 	})
 	if i < nElements {
-		d := string(metadata.tags[i*metadata.tagLen : i*metadata.tagLen+len(digest)])
+		d := string(cacheFile.tags[i*cacheFile.tagLen : i*cacheFile.tagLen+len(digest)])
 		if digest == d {
-			startOff := i*metadata.tagLen + metadata.digestLen
-			parts := strings.Split(string(metadata.tags[startOff:(i+1)*metadata.tagLen]), "@")
+			startOff := i*cacheFile.tagLen + cacheFile.digestLen
+			parts := strings.Split(string(cacheFile.tags[startOff:(i+1)*cacheFile.tagLen]), "@")
 
 			off, _ := strconv.ParseInt(parts[0], 10, 64)
 
Expand All @@ -509,9 +582,9 @@ func (c *layersCache) findDigestInternal(digest string) (string, string, int64,
defer c.mutex.RUnlock()

for _, layer := range c.layers {
digest, off, tagLen := findTag(digest, layer.metadata)
digest, off, tagLen := findTag(digest, layer.cacheFile)
if digest != "" {
position := string(layer.metadata.vdata[off : off+tagLen])
position := string(layer.cacheFile.vdata[off : off+tagLen])
parts := strings.SplitN(position, ":", 3)
if len(parts) != 3 {
continue
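Finally, a compact sketch of the lookup those fixed-width records enable, simplified from findTag above; the 4-byte digests and 9-byte records are made up for the demo:

package main

import (
	"fmt"
	"sort"
	"strings"
)

// find mirrors findTag: tags is a sorted array of fixed-width records,
// each tagLen bytes, beginning with a digestLen-byte digest.
func find(tags []byte, tagLen, digestLen int, digest string) (string, bool) {
	n := len(tags) / tagLen
	i := sort.Search(n, func(i int) bool {
		d := string(tags[i*tagLen : i*tagLen+digestLen])
		return strings.Compare(d, digest) >= 0
	})
	if i < n && string(tags[i*tagLen:i*tagLen+digestLen]) == digest {
		// the rest of the record holds the payload (OFFSET@LEN in the real format)
		return string(tags[i*tagLen+digestLen : (i+1)*tagLen]), true
	}
	return "", false
}

func main() {
	// two 9-byte records: a 4-byte digest followed by a 5-byte payload
	tags := []byte("aaaa00@02bbbb03@04")
	if payload, ok := find(tags, 9, 4, "bbbb"); ok {
		fmt.Println(payload) // 03@04
	}
}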