Skip to content

Commit

Permalink
Propagate compression options to the inline cache export
Browse files Browse the repository at this point in the history
Co-authored-by: Tonis Tiigi <[email protected]>
Signed-off-by: Kohei Tokunaga <[email protected]>
  • Loading branch information
ktock and tonistiigi committed Oct 22, 2021
1 parent b2ff444 commit f9e0346
Show file tree
Hide file tree
Showing 21 changed files with 488 additions and 59 deletions.
121 changes: 118 additions & 3 deletions cache/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -35,6 +36,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
containerdsnapshot "github.com/moby/buildkit/snapshot/containerd"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/compression"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/leaseutil"
Expand Down Expand Up @@ -1125,7 +1127,9 @@ func TestConversion(t *testing.T) {
require.NoError(t, eg.Wait())
}

func TestGetRemote(t *testing.T) {
type idxToVariants []map[compression.Type]ocispecs.Descriptor

func TestGetRemotes(t *testing.T) {
t.Parallel()
// windows fails when lazy blob is being extracted with "invalid windows mount type: 'bind'"
if runtime.GOOS != "linux" {
Expand Down Expand Up @@ -1251,15 +1255,24 @@ func TestGetRemote(t *testing.T) {

checkNumBlobs(ctx, t, co.cs, 1)

// Call GetRemote on all the refs
variantsMap := make(map[string]idxToVariants)
var variantsMapMu sync.Mutex

// Call GetRemotes on all the refs
eg, egctx := errgroup.WithContext(ctx)
for _, ir := range refs {
ir := ir.(*immutableRef)
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{
Type: compressionType,
Force: true,
}
eg.Go(func() error {
remote, err := ir.GetRemote(egctx, true, compressionType, true, nil)
remotes, err := ir.GetRemotes(egctx, true, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(remotes))
remote := remotes[0]
refChain := ir.parentRefChain()
for i, desc := range remote.Descriptors {
switch compressionType {
Expand All @@ -1278,6 +1291,21 @@ func TestGetRemote(t *testing.T) {
require.Contains(t, expectedContent, dgst, "for %v", compressionType)
checkDescriptor(ctx, t, co.cs, desc, compressionType)

variantsMapMu.Lock()
if len(variantsMap[ir.ID()]) == 0 {
variantsMap[ir.ID()] = make(idxToVariants, len(remote.Descriptors))
}
variantsMapMu.Unlock()

require.Equal(t, len(remote.Descriptors), len(variantsMap[ir.ID()]))

variantsMapMu.Lock()
if variantsMap[ir.ID()][i] == nil {
variantsMap[ir.ID()][i] = make(map[compression.Type]ocispecs.Descriptor)
}
variantsMap[ir.ID()][i][compressionType] = desc
variantsMapMu.Unlock()

r := refChain[i]
isLazy, err := r.isLazy(egctx)
require.NoError(t, err)
Expand Down Expand Up @@ -1318,6 +1346,93 @@ func TestGetRemote(t *testing.T) {
})
require.NoError(t, err)
require.Equal(t, map[digest.Digest]struct{}{}, expectedContent)

// Check if "all" option returns all available blobs
for _, ir := range refs {
ir := ir.(*immutableRef)
variantsMapMu.Lock()
variants, ok := variantsMap[ir.ID()]
variantsMapMu.Unlock()
require.True(t, ok, ir.ID())
for _, compressionType := range []compression.Type{compression.Uncompressed, compression.Gzip, compression.EStargz, compression.Zstd} {
compressionType := compressionType
compressionopt := solver.CompressionOpt{Type: compressionType}
eg.Go(func() error {
remotes, err := ir.GetRemotes(egctx, false, compressionopt, true, nil)
require.NoError(t, err)
require.True(t, len(remotes) > 0, "for %s : %d", compressionType, len(remotes))
gotMain, gotVariants := remotes[0], remotes[1:]

// Check the main blob is compatible with all == false
mainOnly, err := ir.GetRemotes(egctx, false, compressionopt, false, nil)
require.NoError(t, err)
require.Equal(t, 1, len(mainOnly))
mainRemote := mainOnly[0]
require.Equal(t, len(mainRemote.Descriptors), len(gotMain.Descriptors))
for i := 0; i < len(mainRemote.Descriptors); i++ {
require.Equal(t, mainRemote.Descriptors[i].Digest, gotMain.Descriptors[i].Digest)
}

// Check all variants are covered
checkVariantsCoverage(egctx, t, variants, len(remotes[0].Descriptors)-1, gotVariants, &compressionType)
return nil
})
}
}
require.NoError(t, eg.Wait())
}

func checkVariantsCoverage(ctx context.Context, t *testing.T, variants idxToVariants, idx int, remotes []*solver.Remote, expectCompression *compression.Type) {
if idx < 0 {
for _, r := range remotes {
require.Equal(t, len(r.Descriptors), 0)
}
return
}

// check the contents of the topmost blob of each remote
got := make(map[digest.Digest][]*solver.Remote)
for _, r := range remotes {
require.Equal(t, len(r.Descriptors)-1, idx, "idx = %d", idx)

// record this variant
topmost, lower := r.Descriptors[idx], r.Descriptors[:idx]
got[topmost.Digest] = append(got[topmost.Digest], &solver.Remote{Descriptors: lower, Provider: r.Provider})

// check the contents
r, err := r.Provider.ReaderAt(ctx, topmost)
require.NoError(t, err)
dgstr := digest.Canonical.Digester()
_, err = io.Copy(dgstr.Hash(), io.NewSectionReader(r, 0, topmost.Size))
require.NoError(t, err)
require.NoError(t, r.Close())
require.Equal(t, dgstr.Digest(), topmost.Digest)
}

// check the lowers as well
eg, egctx := errgroup.WithContext(ctx)
for _, lowers := range got {
lowers := lowers
eg.Go(func() error {
checkVariantsCoverage(egctx, t, variants, idx-1, lowers, nil) // expect all compression variants
return nil
})
}
require.NoError(t, eg.Wait())

// check the coverage of the variants
targets := variants[idx]
if expectCompression != nil {
c, ok := variants[idx][*expectCompression]
require.True(t, ok, "idx = %d, compression = %q, variants = %+v, got = %+v", idx, *expectCompression, variants[idx], got)
targets = map[compression.Type]ocispecs.Descriptor{*expectCompression: c}
}
for c, d := range targets {
_, ok := got[d.Digest]
require.True(t, ok, "idx = %d, compression = %q, want = %+v, got = %+v", idx, c, d, got)
delete(got, d.Digest)
}
require.Equal(t, 0, len(got))
}

func checkInfo(ctx context.Context, t *testing.T, cs content.Store, info content.Info) {
Expand Down
26 changes: 23 additions & 3 deletions cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type ImmutableRef interface {
Clone() ImmutableRef

Extract(ctx context.Context, s session.Group) error // +progress
GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error)
GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error)
}

type MutableRef interface {
Expand Down Expand Up @@ -376,9 +376,29 @@ func compressionVariantDigestLabel(compressionType compression.Type) string {
return compressionVariantDigestLabelPrefix + compressionType.String()
}

func getCompressionVariants(ctx context.Context, cs content.Store, dgst digest.Digest) (res []compression.Type, _ error) {
info, err := cs.Info(ctx, dgst)
if errors.Is(err, errdefs.ErrNotFound) {
return nil, nil
} else if err != nil {
return nil, err
}
for k := range info.Labels {
if strings.HasPrefix(k, compressionVariantDigestLabelPrefix) {
if t := compression.Parse(strings.TrimPrefix(k, compressionVariantDigestLabelPrefix)); t != compression.UnknownCompression {
res = append(res, t)
}
}
}
return
}

func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) {
cs := sr.cm.ContentStore
info, err := cs.Info(ctx, sr.getBlob())
return getCompressionVariantBlob(ctx, sr.cm.ContentStore, sr.getBlob(), compressionType)
}

func getCompressionVariantBlob(ctx context.Context, cs content.Store, dgst digest.Digest, compressionType compression.Type) (ocispecs.Descriptor, error) {
info, err := cs.Info(ctx, dgst)
if err != nil {
return ocispecs.Descriptor{}, err
}
Expand Down
114 changes: 110 additions & 4 deletions cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,122 @@ type Unlazier interface {
Unlazy(ctx context.Context) error
}

// GetRemote gets a *solver.Remote from content store for this ref (potentially pulling lazily).
// Note: Use WorkerRef.GetRemote instead as moby integration requires custom GetRemote implementation.
func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) {
// GetRemotes gets []*solver.Remote from content store for this ref (potentially pulling lazily).
// Compressionopt can be used to specify the compression type of blobs. If Force is true, the compression
// type is applied to all blobs in the chain. If Force is false, it's applied only to the newly created
// layers. If all is true, all available chains that has the specified compression type of topmost blob are
// appended to the result.
// Note: Use WorkerRef.GetRemotes instead as moby integration requires custom GetRemotes implementation.
func (sr *immutableRef) GetRemotes(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, all bool, s session.Group) ([]*solver.Remote, error) {
ctx, done, err := leaseutil.WithLease(ctx, sr.cm.LeaseManager, leaseutil.MakeTemporary)
if err != nil {
return nil, err
}
defer done(ctx)

err = sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
// fast path if compression variants aren't required
// NOTE: compressionopt is applied only to *newly created layers* if Force != true.
remote, err := sr.getRemote(ctx, createIfNeeded, compressionopt, s)
if err != nil {
return nil, err
}
if !all || compressionopt.Force || len(remote.Descriptors) == 0 {
return []*solver.Remote{remote}, nil // early return if compression variants aren't required
}

// Search all available remotes that has the topmost blob with the specified
// compression with all combination of copmressions
res := []*solver.Remote{remote}
topmost, parentChain := remote.Descriptors[len(remote.Descriptors)-1], remote.Descriptors[:len(remote.Descriptors)-1]
vDesc, err := getCompressionVariantBlob(ctx, sr.cm.ContentStore, topmost.Digest, compressionopt.Type)
if err != nil {
return res, nil // compression variant doesn't exist. return the main blob only.
}

var variants []*solver.Remote
if len(parentChain) == 0 {
variants = append(variants, &solver.Remote{
Descriptors: []ocispecs.Descriptor{vDesc},
Provider: sr.cm.ContentStore,
})
} else {
// get parents with all combination of all available compressions.
parents, err := getAvailableBlobs(ctx, sr.cm.ContentStore, &solver.Remote{
Descriptors: parentChain,
Provider: remote.Provider,
})
if err != nil {
return nil, err
}
variants = appendRemote(parents, vDesc, sr.cm.ContentStore)
}

// Return the main remote and all its compression variants.
// NOTE: Because compressionopt is applied only to *newly created layers* in the main remote (i.e. res[0]),
// it's possible that the main remote doesn't contain any blobs of the compressionopt.Type.
// The topmost blob of the variants (res[1:]) is guaranteed to be the compressionopt.Type.
res = append(res, variants...)
return res, nil
}

func appendRemote(parents []*solver.Remote, desc ocispecs.Descriptor, p content.Provider) (res []*solver.Remote) {
for _, pRemote := range parents {
provider := contentutil.NewMultiProvider(pRemote.Provider)
provider.Add(desc.Digest, p)
res = append(res, &solver.Remote{
Descriptors: append(pRemote.Descriptors, desc),
Provider: provider,
})
}
return
}

func getAvailableBlobs(ctx context.Context, cs content.Store, chain *solver.Remote) ([]*solver.Remote, error) {
if len(chain.Descriptors) == 0 {
return nil, nil
}
target, parentChain := chain.Descriptors[len(chain.Descriptors)-1], chain.Descriptors[:len(chain.Descriptors)-1]
parents, err := getAvailableBlobs(ctx, cs, &solver.Remote{
Descriptors: parentChain,
Provider: chain.Provider,
})
if err != nil {
return nil, err
}
compressions, err := getCompressionVariants(ctx, cs, target.Digest)
if err != nil {
return nil, err
}
var res []*solver.Remote
for _, c := range compressions {
desc, err := getCompressionVariantBlob(ctx, cs, target.Digest, c)
if err != nil {
return nil, err
}
if len(parents) == 0 { // bottommost ref
res = append(res, &solver.Remote{
Descriptors: []ocispecs.Descriptor{desc},
Provider: cs,
})
continue
}
res = append(res, appendRemote(parents, desc, cs)...)
}
if len(res) == 0 {
// no available compression blobs for this blob. return the original blob.
if len(parents) == 0 { // bottommost ref
return []*solver.Remote{chain}, nil
}
return appendRemote(parents, target, chain.Provider), nil
}
return res, nil
}

func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, compressionopt solver.CompressionOpt, s session.Group) (*solver.Remote, error) {
compressionType := compressionopt.Type
forceCompression := compressionopt.Force

err := sr.computeBlobChain(ctx, createIfNeeded, compressionType, forceCompression, s)
if err != nil {
return nil, err
}
Expand Down
20 changes: 18 additions & 2 deletions cache/remotecache/v1/cachestorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,25 @@ func (cs *cacheResultStorage) Load(ctx context.Context, res solver.CacheResult)
return worker.NewWorkerRefResult(ref, cs.w), nil
}

func (cs *cacheResultStorage) LoadRemote(ctx context.Context, res solver.CacheResult, _ session.Group) (*solver.Remote, error) {
func (cs *cacheResultStorage) LoadRemotes(ctx context.Context, res solver.CacheResult, compressionopts *solver.CompressionOpt, _ session.Group) ([]*solver.Remote, error) {
if r := cs.byResultID(res.ID); r != nil && r.result != nil {
return r.result, nil
if compressionopts == nil {
return []*solver.Remote{r.result}, nil
}
// Any of blobs in the remote must meet the specified compression option.
match := false
for _, desc := range r.result.Descriptors {
m := compressionopts.Type.IsMediaType(desc.MediaType)
match = match || m
if compressionopts.Force && !m {
match = false
break
}
}
if match {
return []*solver.Remote{r.result}, nil
}
return nil, nil // return nil as it's best effort.
}
return nil, errors.WithStack(solver.ErrNotFound)
}
Expand Down
Loading

0 comments on commit f9e0346

Please sign in to comment.