From a9f1980ebbe85053232f8c512e9a906e5d568aa9 Mon Sep 17 00:00:00 2001 From: Erik Sipsma Date: Fri, 9 Jul 2021 00:09:35 +0000 Subject: [PATCH] Refactor cache metadata interface. There are a few goals with this refactor: 1. Remove external access to fields that no longer make sense and/or won't make sense soon due to other potential changes. For example, there can now be multiple blobs associated with a ref (for different compression types), so the fact that you could access the "Blob" field from the Info method on Ref incorrectly implied there was just a single blob for the ref. This is on top of the fact that there is no need for external access to blob digests. 2. Centralize use of cache metadata inside the cache package. Previously, many parts of the code outside the cache package could obtain the bolt storage item for any ref and read/write it directly. This made it hard to understand what fields are used and when. Now, the Metadata method has been removed from the Ref interface and replaced with getters+setters for metadata fields we want to expose outside the package, which makes it much easier to track and understand. Similar changes have been made to the metadata search interface. 3. Use a consistent getter+setter interface for metadata, replacing the mix of interfaces like Metadata(), Size(), Info() and other inconsistencies. Signed-off-by: Erik Sipsma --- cache/blobs.go | 32 +- cache/contenthash/checksum.go | 66 +-- cache/contenthash/checksum_test.go | 46 +- cache/manager.go | 262 +++++---- cache/manager_test.go | 121 ++--- cache/metadata.go | 736 ++++++++++++-------------- cache/metadata/metadata.go | 20 +- cache/migrate_v2.go | 68 +-- cache/refs.go | 176 +++--- cache/remote.go | 10 +- exporter/containerimage/writer.go | 4 +- frontend/gateway/container.go | 2 +- solver/llbsolver/file/refmanager.go | 3 +- solver/llbsolver/mounts/mount.go | 51 +- solver/llbsolver/mounts/mount_test.go | 37 +- solver/llbsolver/ops/exec.go | 5 +- solver/llbsolver/ops/file.go | 7 +- source/containerimage/pull.go | 6 +- source/git/gitsource.go | 80 +-- source/git/gitsource_test.go | 7 +- source/http/httpsource.go | 169 +++--- source/http/httpsource_test.go | 11 +- source/local/local.go | 68 +-- worker/base/worker.go | 57 +- worker/cacheresult.go | 6 +- worker/containerd/containerd.go | 9 +- worker/runc/runc.go | 10 +- worker/worker.go | 2 - 28 files changed, 991 insertions(+), 1080 deletions(-) diff --git a/cache/blobs.go b/cache/blobs.go index 2b69ec24a7f6..5c3c123d38fe 100644 --- a/cache/blobs.go +++ b/cache/blobs.go @@ -58,7 +58,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool eg.Go(func() error { _, err := g.Do(ctx, fmt.Sprintf("%s-%t", sr.ID(), createIfNeeded), func(ctx context.Context) (interface{}, error) { - if getBlob(sr.md) != "" { + if sr.getBlob() != "" { return nil, nil } if !createIfNeeded { @@ -187,7 +187,7 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression sr.mu.Lock() defer sr.mu.Unlock() - if getBlob(sr.md) != "" { + if sr.getBlob() != "" { return nil } @@ -202,11 +202,11 @@ func (sr *immutableRef) setBlob(ctx context.Context, compressionType compression return err } - queueDiffID(sr.md, diffID.String()) - queueBlob(sr.md, desc.Digest.String()) - queueMediaType(sr.md, desc.MediaType) - queueBlobSize(sr.md, desc.Size) - if err := sr.md.Commit(); err != nil { + sr.queueDiffID(diffID) + sr.queueBlob(desc.Digest) + sr.queueMediaType(desc.MediaType) + sr.queueBlobSize(desc.Size) + if err := sr.commitMetadata(); err != nil { return err } @@ -224,33 +224,33 @@ func (sr *immutableRef) setChains(ctx context.Context) error { sr.mu.Lock() defer sr.mu.Unlock() - if getChainID(sr.md) != "" { + if sr.getChainID() != "" { return nil } var chainIDs []digest.Digest var blobChainIDs []digest.Digest if sr.parent != nil { - chainIDs = append(chainIDs, digest.Digest(getChainID(sr.parent.md))) - blobChainIDs = append(blobChainIDs, digest.Digest(getBlobChainID(sr.parent.md))) + chainIDs = append(chainIDs, digest.Digest(sr.parent.getChainID())) + blobChainIDs = append(blobChainIDs, digest.Digest(sr.parent.getBlobChainID())) } - diffID := digest.Digest(getDiffID(sr.md)) + diffID := digest.Digest(sr.getDiffID()) chainIDs = append(chainIDs, diffID) - blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(getBlob(sr.md)), diffID})) + blobChainIDs = append(blobChainIDs, imagespecidentity.ChainID([]digest.Digest{digest.Digest(sr.getBlob()), diffID})) chainID := imagespecidentity.ChainID(chainIDs) blobChainID := imagespecidentity.ChainID(blobChainIDs) - queueChainID(sr.md, chainID.String()) - queueBlobChainID(sr.md, blobChainID.String()) - if err := sr.md.Commit(); err != nil { + sr.queueChainID(chainID) + sr.queueBlobChainID(blobChainID) + if err := sr.commitMetadata(); err != nil { return err } return nil } func isTypeWindows(sr *immutableRef) bool { - if GetLayerType(sr) == "windows" { + if sr.GetLayerType() == "windows" { return true } if parent := sr.parent; parent != nil { diff --git a/cache/contenthash/checksum.go b/cache/contenthash/checksum.go index 4813c274a91f..77c9649dd139 100644 --- a/cache/contenthash/checksum.go +++ b/cache/contenthash/checksum.go @@ -12,11 +12,9 @@ import ( "sync" "github.com/docker/docker/pkg/fileutils" - "github.com/docker/docker/pkg/idtools" iradix "github.com/hashicorp/go-immutable-radix" "github.com/hashicorp/golang-lru/simplelru" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/locker" @@ -31,8 +29,6 @@ var errNotFound = errors.Errorf("not found") var defaultManager *cacheManager var defaultManagerOnce sync.Once -const keyContentHash = "buildkit.contenthash.v0" - func getDefaultManager() *cacheManager { defaultManagerOnce.Do(func() { lru, _ := simplelru.NewLRU(20, nil) // error is impossible on positive size @@ -58,15 +54,15 @@ func Checksum(ctx context.Context, ref cache.ImmutableRef, path string, opts Che return getDefaultManager().Checksum(ctx, ref, path, opts, s) } -func GetCacheContext(ctx context.Context, md *metadata.StorageItem, idmap *idtools.IdentityMapping) (CacheContext, error) { - return getDefaultManager().GetCacheContext(ctx, md, idmap) +func GetCacheContext(ctx context.Context, md cache.RefMetadata) (CacheContext, error) { + return getDefaultManager().GetCacheContext(ctx, md) } -func SetCacheContext(ctx context.Context, md *metadata.StorageItem, cc CacheContext) error { +func SetCacheContext(ctx context.Context, md cache.RefMetadata, cc CacheContext) error { return getDefaultManager().SetCacheContext(ctx, md, cc) } -func ClearCacheContext(md *metadata.StorageItem) { +func ClearCacheContext(md cache.RefMetadata) { getDefaultManager().clearCacheContext(md.ID()) } @@ -99,14 +95,14 @@ func (cm *cacheManager) Checksum(ctx context.Context, ref cache.ImmutableRef, p } return "", errors.Errorf("%s: no such file or directory", p) } - cc, err := cm.GetCacheContext(ctx, ensureOriginMetadata(ref.Metadata()), ref.IdentityMapping()) + cc, err := cm.GetCacheContext(ctx, ensureOriginMetadata(ref)) if err != nil { return "", nil } return cc.Checksum(ctx, ref, p, opts, s) } -func (cm *cacheManager) GetCacheContext(ctx context.Context, md *metadata.StorageItem, idmap *idtools.IdentityMapping) (CacheContext, error) { +func (cm *cacheManager) GetCacheContext(ctx context.Context, md cache.RefMetadata) (CacheContext, error) { cm.locker.Lock(md.ID()) cm.lruMu.Lock() v, ok := cm.lru.Get(md.ID()) @@ -116,7 +112,7 @@ func (cm *cacheManager) GetCacheContext(ctx context.Context, md *metadata.Storag v.(*cacheContext).linkMap = map[string][][]byte{} return v.(*cacheContext), nil } - cc, err := newCacheContext(md, idmap) + cc, err := newCacheContext(md) if err != nil { cm.locker.Unlock(md.ID()) return nil, err @@ -128,14 +124,14 @@ func (cm *cacheManager) GetCacheContext(ctx context.Context, md *metadata.Storag return cc, nil } -func (cm *cacheManager) SetCacheContext(ctx context.Context, md *metadata.StorageItem, cci CacheContext) error { +func (cm *cacheManager) SetCacheContext(ctx context.Context, md cache.RefMetadata, cci CacheContext) error { cc, ok := cci.(*cacheContext) if !ok { return errors.Errorf("invalid cachecontext: %T", cc) } if md.ID() != cc.md.ID() { cc = &cacheContext{ - md: md, + md: cacheMetadata{md}, tree: cci.(*cacheContext).tree, dirtyMap: map[string]struct{}{}, linkMap: map[string][][]byte{}, @@ -159,7 +155,7 @@ func (cm *cacheManager) clearCacheContext(id string) { type cacheContext struct { mu sync.RWMutex - md *metadata.StorageItem + md cacheMetadata tree *iradix.Tree dirty bool // needs to be persisted to disk @@ -168,7 +164,20 @@ type cacheContext struct { node *iradix.Node dirtyMap map[string]struct{} linkMap map[string][][]byte - idmap *idtools.IdentityMapping +} + +type cacheMetadata struct { + cache.RefMetadata +} + +const keyContentHash = "buildkit.contenthash.v0" + +func (md cacheMetadata) GetContentHash() ([]byte, error) { + return md.GetExternal(keyContentHash) +} + +func (md cacheMetadata) SetContentHash(dt []byte) error { + return md.SetExternal(keyContentHash, dt) } type mount struct { @@ -209,13 +218,12 @@ func (m *mount) clean() error { return nil } -func newCacheContext(md *metadata.StorageItem, idmap *idtools.IdentityMapping) (*cacheContext, error) { +func newCacheContext(md cache.RefMetadata) (*cacheContext, error) { cc := &cacheContext{ - md: md, + md: cacheMetadata{md}, tree: iradix.New(), dirtyMap: map[string]struct{}{}, linkMap: map[string][][]byte{}, - idmap: idmap, } if err := cc.load(); err != nil { return nil, err @@ -224,7 +232,7 @@ func newCacheContext(md *metadata.StorageItem, idmap *idtools.IdentityMapping) ( } func (cc *cacheContext) load() error { - dt, err := cc.md.GetExternal(keyContentHash) + dt, err := cc.md.GetContentHash() if err != nil { return nil } @@ -265,7 +273,7 @@ func (cc *cacheContext) save() error { return err } - return cc.md.SetExternal(keyContentHash, dt) + return cc.md.SetContentHash(dt) } func keyPath(p string) string { @@ -1016,20 +1024,12 @@ func addParentToMap(d string, m map[string]struct{}) { addParentToMap(d, m) } -func ensureOriginMetadata(md *metadata.StorageItem) *metadata.StorageItem { - v := md.Get("cache.equalMutable") // TODO: const - if v == nil { - return md - } - var mutable string - if err := v.Unmarshal(&mutable); err != nil { - return md - } - si, ok := md.Storage().Get(mutable) - if ok { - return si +func ensureOriginMetadata(md cache.RefMetadata) cache.RefMetadata { + em, ok := md.GetEqualMutable() + if !ok { + em = md } - return md + return em } var pool32K = sync.Pool{ diff --git a/cache/contenthash/checksum_test.go b/cache/contenthash/checksum_test.go index 2ecd68edaf31..c7d6fc8afd42 100644 --- a/cache/contenthash/checksum_test.go +++ b/cache/contenthash/checksum_test.go @@ -59,7 +59,7 @@ func TestChecksumSymlinkNoParentScan(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) dgst, err := cc.Checksum(context.TODO(), ref, "aa/ln/bb/cc/dd", ChecksumOpts{FollowLinks: true}, nil) @@ -87,7 +87,7 @@ func TestChecksumHardlinks(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) dgst, err := cc.Checksum(context.TODO(), ref, "abc/foo", ChecksumOpts{}, nil) @@ -105,7 +105,7 @@ func TestChecksumHardlinks(t *testing.T) { // validate same results with handleChange ref2 := createRef(t, cm, nil) - cc2, err := newCacheContext(ref2.Metadata(), nil) + cc2, err := newCacheContext(ref2) require.NoError(t, err) err = emit(cc2.HandleChange, changeStream(ch)) @@ -176,7 +176,7 @@ func TestChecksumWildcardOrFilter(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) dgst, err := cc.Checksum(context.TODO(), ref, "f*o", ChecksumOpts{Wildcard: true}, nil) @@ -220,7 +220,7 @@ func TestChecksumWildcardWithBadMountable(t *testing.T) { ref := createRef(t, cm, nil) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) _, err = cc.Checksum(context.TODO(), newBadMountable(), "*", ChecksumOpts{Wildcard: true}, nil) @@ -249,7 +249,7 @@ func TestSymlinksNoFollow(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) expectedSym := digest.Digest("sha256:a2ba571981f48ec34eb79c9a3ab091b6491e825c2f7e9914ea86e8e958be7fae") @@ -311,7 +311,7 @@ func TestChecksumBasicFile(t *testing.T) { // for the digest values, the actual values are not important in development // phase but consistency is - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) _, err = cc.Checksum(context.TODO(), ref, "nosuch", ChecksumOpts{FollowLinks: true}, nil) @@ -372,7 +372,7 @@ func TestChecksumBasicFile(t *testing.T) { ref = createRef(t, cm, ch) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) dgst, err = cc.Checksum(context.TODO(), ref, "/", ChecksumOpts{FollowLinks: true}, nil) @@ -391,7 +391,7 @@ func TestChecksumBasicFile(t *testing.T) { ref = createRef(t, cm, ch) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) dgst, err = cc.Checksum(context.TODO(), ref, "/", ChecksumOpts{FollowLinks: true}, nil) @@ -417,7 +417,7 @@ func TestChecksumBasicFile(t *testing.T) { ref = createRef(t, cm, ch) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) dgst, err = cc.Checksum(context.TODO(), ref, "abc/aa/foo", ChecksumOpts{FollowLinks: true}, nil) @@ -466,7 +466,7 @@ func testChecksumIncludeExclude(t *testing.T, wildcard bool) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) opts := func(opts ChecksumOpts) ChecksumOpts { @@ -525,7 +525,7 @@ func testChecksumIncludeExclude(t *testing.T, wildcard bool) { ref = createRef(t, cm, ch) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) dgstFoo2, err := cc.Checksum(context.TODO(), ref, "", opts(ChecksumOpts{IncludePatterns: []string{"foo"}}), nil) @@ -599,7 +599,7 @@ func TestChecksumIncludeDoubleStar(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) dgst, err := cc.Checksum(context.TODO(), ref, "prefix/a", ChecksumOpts{IncludePatterns: []string{"**/foo/**"}}, nil) @@ -623,7 +623,7 @@ func TestChecksumIncludeDoubleStar(t *testing.T) { ref = createRef(t, cm, ch) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) dgst, err = cc.Checksum(context.TODO(), ref, "prefix/a", ChecksumOpts{IncludePatterns: []string{"**/foo/**", "**/report"}}, nil) @@ -671,7 +671,7 @@ func TestChecksumIncludeSymlink(t *testing.T) { ref := createRef(t, cm, ch) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) dgst, err := cc.Checksum(context.TODO(), ref, "data/d1", ChecksumOpts{IncludePatterns: []string{"**/foo"}}, nil) @@ -715,7 +715,7 @@ func TestHandleChange(t *testing.T) { // for the digest values, the actual values are not important in development // phase but consistency is - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) err = emit(cc.HandleChange, changeStream(ch)) @@ -791,7 +791,7 @@ func TestHandleRecursiveDir(t *testing.T) { ref := createRef(t, cm, nil) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) err = emit(cc.HandleChange, changeStream(ch)) @@ -838,7 +838,7 @@ func TestChecksumUnorderedFiles(t *testing.T) { ref := createRef(t, cm, nil) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) err = emit(cc.HandleChange, changeStream(ch)) @@ -858,7 +858,7 @@ func TestChecksumUnorderedFiles(t *testing.T) { ref = createRef(t, cm, nil) - cc, err = newCacheContext(ref.Metadata(), nil) + cc, err = newCacheContext(ref) require.NoError(t, err) err = emit(cc.HandleChange, changeStream(ch)) @@ -1045,7 +1045,7 @@ func TestSymlinkInPathHandleChange(t *testing.T) { ref := createRef(t, cm, nil) - cc, err := newCacheContext(ref.Metadata(), nil) + cc, err := newCacheContext(ref) require.NoError(t, err) err = emit(cc.HandleChange, changeStream(ch)) @@ -1165,9 +1165,6 @@ func createRef(t *testing.T, cm cache.Manager, files []string) cache.ImmutableRe } func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snapshotter snapshots.Snapshotter) (cache.Manager, func()) { - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - require.NoError(t, err) - store, err := local.NewStore(tmpdir) require.NoError(t, err) @@ -1178,6 +1175,9 @@ func setupCacheManager(t *testing.T, tmpdir string, snapshotterName string, snap snapshotterName: snapshotter, }) + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + require.NoError(t, err) + cm, err := cache.NewManager(cache.ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter(snapshotterName, containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter(snapshotterName)), nil), MetadataStore: md, diff --git a/cache/manager.go b/cache/manager.go index b815b0ceaf73..e63879975fb7 100644 --- a/cache/manager.go +++ b/cache/manager.go @@ -35,23 +35,24 @@ var ( type ManagerOpt struct { Snapshotter snapshot.Snapshotter - MetadataStore *metadata.Store ContentStore content.Store LeaseManager leases.Manager PruneRefChecker ExternalRefCheckerFunc GarbageCollect func(ctx context.Context) (gc.Stats, error) Applier diff.Applier Differ diff.Comparer + MetadataStore *metadata.Store } type Accessor interface { + MetadataStore + GetByBlob(ctx context.Context, desc ocispecs.Descriptor, parent ImmutableRef, opts ...RefOption) (ImmutableRef, error) Get(ctx context.Context, id string, opts ...RefOption) (ImmutableRef, error) New(ctx context.Context, parent ImmutableRef, s session.Group, opts ...RefOption) (MutableRef, error) GetMutable(ctx context.Context, id string, opts ...RefOption) (MutableRef, error) // Rebase? IdentityMapping() *idtools.IdentityMapping - Metadata(string) *metadata.StorageItem } type Controller interface { @@ -75,7 +76,6 @@ type cacheManager struct { records map[string]*cacheRecord mu sync.Mutex ManagerOpt - md *metadata.Store muPrune sync.Mutex // make sure parallel prune is not allowed so there will not be inconsistent results unlazyG flightcontrol.Group @@ -84,7 +84,6 @@ type cacheManager struct { func NewManager(opt ManagerOpt) (Manager, error) { cm := &cacheManager{ ManagerOpt: opt, - md: opt.MetadataStore, records: make(map[string]*cacheRecord), } @@ -117,19 +116,21 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, var p *immutableRef var parentID string if parent != nil { - pInfo := parent.Info() - if pInfo.ChainID == "" || pInfo.BlobChainID == "" { - return nil, errors.Errorf("failed to get ref by blob on non-addressable parent") - } - chainID = imagespecidentity.ChainID([]digest.Digest{pInfo.ChainID, chainID}) - blobChainID = imagespecidentity.ChainID([]digest.Digest{pInfo.BlobChainID, blobChainID}) - p2, err := cm.Get(ctx, parent.ID(), NoUpdateLastUsed, descHandlers) if err != nil { return nil, err } p = p2.(*immutableRef) + + if p.getChainID() == "" || p.getBlobChainID() == "" { + p.Release(context.TODO()) + return nil, errors.Errorf("failed to get ref by blob on non-addressable parent") + } + chainID = imagespecidentity.ChainID([]digest.Digest{p.getChainID(), chainID}) + blobChainID = imagespecidentity.ChainID([]digest.Digest{p.getBlobChainID(), blobChainID}) + if err := p.finalizeLocked(ctx); err != nil { + p.Release(context.TODO()) return nil, err } parentID = p.ID() @@ -145,7 +146,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, cm.mu.Lock() defer cm.mu.Unlock() - sis, err := cm.MetadataStore.Search("blobchainid:" + blobChainID.String()) + sis, err := cm.searchBlobchain(ctx, blobChainID) if err != nil { return nil, err } @@ -161,23 +162,23 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, if p != nil { releaseParent = true } - if err := setImageRefMetadata(ref, opts...); err != nil { + if err := setImageRefMetadata(ref.cacheMetadata, opts...); err != nil { return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", ref.ID()) } return ref, nil } - sis, err = cm.MetadataStore.Search("chainid:" + chainID.String()) + sis, err = cm.searchChain(ctx, chainID) if err != nil { return nil, err } - var link ImmutableRef + var link *immutableRef for _, si := range sis { ref, err := cm.get(ctx, si.ID(), opts...) // if the error was NotFound or NeedsRemoteProvider, we can't re-use the snapshot from the blob so just skip it if err != nil && !IsNotFound(err) && !errors.As(err, &NeedsRemoteProvidersError{}) { - return nil, errors.Wrapf(err, "failed to get record %s by chainid", sis[0].ID()) + return nil, errors.Wrapf(err, "failed to get record %s by chainid", si.ID()) } if ref != nil { link = ref @@ -189,8 +190,8 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, snapshotID := chainID.String() blobOnly := true if link != nil { - snapshotID = getSnapshotID(link.Metadata()) - blobOnly = getBlobOnly(link.Metadata()) + snapshotID = link.getSnapshotID() + blobOnly = link.getBlobOnly() go link.Release(context.TODO()) } @@ -231,35 +232,35 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, } } - md, _ := cm.md.Get(id) + md, _ := cm.getMetadata(id) rec := &cacheRecord{ - mu: &sync.Mutex{}, - cm: cm, - refs: make(map[ref]struct{}), - parent: p, - md: md, + mu: &sync.Mutex{}, + cm: cm, + refs: make(map[ref]struct{}), + parent: p, + cacheMetadata: md, } - if err := initializeMetadata(rec, parentID, opts...); err != nil { + if err := initializeMetadata(rec.cacheMetadata, parentID, opts...); err != nil { return nil, err } - if err := setImageRefMetadata(rec, opts...); err != nil { + if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil { return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID()) } - queueDiffID(rec.md, diffID.String()) - queueBlob(rec.md, desc.Digest.String()) - queueChainID(rec.md, chainID.String()) - queueBlobChainID(rec.md, blobChainID.String()) - queueSnapshotID(rec.md, snapshotID) - queueBlobOnly(rec.md, blobOnly) - queueMediaType(rec.md, desc.MediaType) - queueBlobSize(rec.md, desc.Size) - queueCommitted(rec.md) + rec.queueDiffID(diffID) + rec.queueBlob(desc.Digest) + rec.queueChainID(chainID) + rec.queueBlobChainID(blobChainID) + rec.queueSnapshotID(snapshotID) + rec.queueBlobOnly(blobOnly) + rec.queueMediaType(desc.MediaType) + rec.queueBlobSize(desc.Size) + rec.queueCommitted(true) - if err := rec.md.Commit(); err != nil { + if err := rec.commitMetadata(); err != nil { return nil, err } @@ -271,7 +272,7 @@ func (cm *cacheManager) GetByBlob(ctx context.Context, desc ocispecs.Descriptor, // init loads all snapshots from metadata state and tries to load the records // from the snapshotter. If snaphot can't be found, metadata is deleted as well. func (cm *cacheManager) init(ctx context.Context) error { - items, err := cm.md.All() + items, err := cm.MetadataStore.All() if err != nil { return err } @@ -279,7 +280,7 @@ func (cm *cacheManager) init(ctx context.Context) error { for _, si := range items { if _, err := cm.getRecord(ctx, si.ID()); err != nil { logrus.Debugf("could not load snapshot %s: %+v", si.ID(), err) - cm.md.Clear(si.ID()) + cm.MetadataStore.Clear(si.ID()) cm.LeaseManager.Delete(ctx, leases.Lease{ID: si.ID()}) } } @@ -295,7 +296,7 @@ func (cm *cacheManager) IdentityMapping() *idtools.IdentityMapping { // method should be called after Close. func (cm *cacheManager) Close() error { // TODO: allocate internal context and cancel it here - return cm.md.Close() + return cm.MetadataStore.Close() } // Get returns an immutable snapshot reference for ID @@ -305,16 +306,6 @@ func (cm *cacheManager) Get(ctx context.Context, id string, opts ...RefOption) ( return cm.get(ctx, id, opts...) } -func (cm *cacheManager) Metadata(id string) *metadata.StorageItem { - cm.mu.Lock() - defer cm.mu.Unlock() - r, ok := cm.records[id] - if !ok { - return nil - } - return r.Metadata() -} - // get requires manager lock to be taken func (cm *cacheManager) get(ctx context.Context, id string, opts ...RefOption) (*immutableRef, error) { rec, err := cm.getRecord(ctx, id, opts...) @@ -352,7 +343,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt missing := NeedsRemoteProvidersError(nil) dhs := descHandlersOf(opts...) for { - blob := digest.Digest(getBlob(rec.md)) + blob := rec.getBlob() if isLazy, err := rec.isLazy(ctx); err != nil { return err } else if isLazy && dhs[blob] == nil { @@ -380,16 +371,16 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt return rec, nil } - md, ok := cm.md.Get(id) + md, ok := cm.getMetadata(id) if !ok { return nil, errors.Wrap(errNotFound, id) } - if mutableID := getEqualMutable(md); mutableID != "" { + if mutableID := md.getEqualMutable(); mutableID != "" { mutable, err := cm.getRecord(ctx, mutableID) if err != nil { // check loading mutable deleted record from disk if IsNotFound(err) { - cm.md.Clear(id) + cm.MetadataStore.Clear(id) } return nil, err } @@ -400,12 +391,12 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt dhs = mutable.parent.descHandlers } rec := &cacheRecord{ - mu: &sync.Mutex{}, - cm: cm, - refs: make(map[ref]struct{}), - parent: mutable.parentRef(false, dhs), - md: md, - equalMutable: &mutableRef{cacheRecord: mutable}, + mu: &sync.Mutex{}, + cm: cm, + refs: make(map[ref]struct{}), + parent: mutable.parentRef(false, dhs), + cacheMetadata: md, + equalMutable: &mutableRef{cacheRecord: mutable}, } mutable.equalImmutable = &immutableRef{cacheRecord: rec} cm.records[id] = rec @@ -413,7 +404,7 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt } var parent *immutableRef - if parentID := getParent(md); parentID != "" { + if parentID := md.getParent(); parentID != "" { var err error parent, err = cm.get(ctx, parentID, append(opts, NoUpdateLastUsed)...) if err != nil { @@ -429,27 +420,27 @@ func (cm *cacheManager) getRecord(ctx context.Context, id string, opts ...RefOpt } rec := &cacheRecord{ - mu: &sync.Mutex{}, - mutable: !getCommitted(md), - cm: cm, - refs: make(map[ref]struct{}), - parent: parent, - md: md, + mu: &sync.Mutex{}, + mutable: !md.getCommitted(), + cm: cm, + refs: make(map[ref]struct{}), + parent: parent, + cacheMetadata: md, } // the record was deleted but we crashed before data on disk was removed - if getDeleted(md) { + if md.getDeleted() { if err := rec.remove(ctx, true); err != nil { return nil, err } return nil, errors.Wrapf(errNotFound, "failed to get deleted record %s", id) } - if err := initializeMetadata(rec, getParent(md), opts...); err != nil { + if err := initializeMetadata(rec.cacheMetadata, md.getParent(), opts...); err != nil { return nil, err } - if err := setImageRefMetadata(rec, opts...); err != nil { + if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil { return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID()) } @@ -482,7 +473,7 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Gr if err := parent.Extract(ctx, sess); err != nil { return nil, err } - parentSnapshotID = getSnapshotID(parent.md) + parentSnapshotID = parent.getSnapshotID() parentID = parent.ID() } @@ -533,28 +524,29 @@ func (cm *cacheManager) New(ctx context.Context, s ImmutableRef, sess session.Gr return nil, errors.Wrapf(err, "failed to prepare %s", id) } - md, _ := cm.md.Get(id) + cm.mu.Lock() + defer cm.mu.Unlock() + + md, _ := cm.getMetadata(id) rec := &cacheRecord{ - mu: &sync.Mutex{}, - mutable: true, - cm: cm, - refs: make(map[ref]struct{}), - parent: parent, - md: md, + mu: &sync.Mutex{}, + mutable: true, + cm: cm, + refs: make(map[ref]struct{}), + parent: parent, + cacheMetadata: md, } - if err := initializeMetadata(rec, parentID, opts...); err != nil { + opts = append(opts, withSnapshotID(id)) + if err := initializeMetadata(rec.cacheMetadata, parentID, opts...); err != nil { return nil, err } - if err := setImageRefMetadata(rec, opts...); err != nil { + if err := setImageRefMetadata(rec.cacheMetadata, opts...); err != nil { return nil, errors.Wrapf(err, "failed to append image ref metadata to ref %s", rec.ID()) } - cm.mu.Lock() - defer cm.mu.Unlock() - cm.records[id] = rec // TODO: save to db // parent refs are possibly lazy so keep it hold the description handlers. @@ -690,7 +682,7 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt } if len(cr.refs) == 0 { - recordType := GetRecordType(cr) + recordType := cr.GetRecordType() if recordType == "" { recordType = client.UsageRecordTypeRegular } @@ -714,7 +706,7 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt Shared: shared, } - usageCount, lastUsedAt := getLastUsed(cr.md) + usageCount, lastUsedAt := cr.getLastUsed() c.LastUsedAt = lastUsedAt c.UsageCount = usageCount @@ -735,7 +727,12 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt cr.dead = true // mark metadata as deleted in case we crash before cleanup finished - if err := setDeleted(cr.md); err != nil { + if err := cr.queueDeleted(); err != nil { + cr.mu.Unlock() + cm.mu.Unlock() + return err + } + if err := cr.commitMetadata(); err != nil { cr.mu.Unlock() cm.mu.Unlock() return err @@ -756,7 +753,10 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt // only remove single record at a time if i == 0 { cr.dead = true - err = setDeleted(cr.md) + err = cr.queueDeleted() + if err == nil { + err = cr.commitMetadata() + } } cr.mu.Unlock() } @@ -774,14 +774,14 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt // calculate sizes here so that lock does not need to be held for slow process for _, cr := range toDelete { - size := getSize(cr.md) + size := cr.getSize() if size == sizeUnknown && cr.equalImmutable != nil { - size = getSize(cr.equalImmutable.md) // benefit from DiskUsage calc + size = cr.equalImmutable.getSize() // benefit from DiskUsage calc } if size == sizeUnknown { // calling size will warm cache for next call - if _, err := cr.Size(ctx); err != nil { + if _, err := cr.size(ctx); err != nil { return err } } @@ -792,15 +792,15 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt for _, cr := range toDelete { cr.mu.Lock() - usageCount, lastUsedAt := getLastUsed(cr.md) + usageCount, lastUsedAt := cr.getLastUsed() c := client.UsageInfo{ ID: cr.ID(), Mutable: cr.mutable, InUse: len(cr.refs) > 0, - Size: getSize(cr.md), - CreatedAt: GetCreatedAt(cr.md), - Description: GetDescription(cr.md), + Size: cr.getSize(), + CreatedAt: cr.GetCreatedAt(), + Description: cr.GetDescription(), LastUsedAt: lastUsedAt, UsageCount: usageCount, } @@ -809,7 +809,7 @@ func (cm *cacheManager) prune(ctx context.Context, ch chan client.UsageInfo, opt c.Parent = cr.parent.ID() } if c.Size == sizeUnknown && cr.equalImmutable != nil { - c.Size = getSize(cr.equalImmutable.md) // benefit from DiskUsage calc + c.Size = cr.equalImmutable.getSize() // benefit from DiskUsage calc } opt.totalSize -= c.Size @@ -905,17 +905,17 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) continue } - usageCount, lastUsedAt := getLastUsed(cr.md) + usageCount, lastUsedAt := cr.getLastUsed() c := &cacheUsageInfo{ refs: len(cr.refs), mutable: cr.mutable, - size: getSize(cr.md), - createdAt: GetCreatedAt(cr.md), + size: cr.getSize(), + createdAt: cr.GetCreatedAt(), usageCount: usageCount, lastUsedAt: lastUsedAt, - description: GetDescription(cr.md), + description: cr.GetDescription(), doubleRef: cr.equalImmutable != nil, - recordType: GetRecordType(cr), + recordType: cr.GetRecordType(), parentChain: cr.parentChain(), } if c.recordType == "" { @@ -980,12 +980,14 @@ func (cm *cacheManager) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) if d.Size == sizeUnknown { func(d *client.UsageInfo) { eg.Go(func() error { - ref, err := cm.Get(ctx, d.ID, NoUpdateLastUsed) + cm.mu.Lock() + ref, err := cm.get(ctx, d.ID, NoUpdateLastUsed) + cm.mu.Unlock() if err != nil { d.Size = 0 return nil } - s, err := ref.Size(ctx) + s, err := ref.size(ctx) if err != nil { return err } @@ -1016,41 +1018,33 @@ const ( cachePolicyRetain ) -type withMetadata interface { - Metadata() *metadata.StorageItem -} - type noUpdateLastUsed struct{} var NoUpdateLastUsed noUpdateLastUsed -func HasCachePolicyRetain(m withMetadata) bool { - return getCachePolicy(m.Metadata()) == cachePolicyRetain +func CachePolicyRetain(m *cacheMetadata) error { + return m.SetCachePolicyRetain() } -func CachePolicyRetain(m withMetadata) error { - return queueCachePolicy(m.Metadata(), cachePolicyRetain) -} - -func CachePolicyDefault(m withMetadata) error { - return queueCachePolicy(m.Metadata(), cachePolicyDefault) +func CachePolicyDefault(m *cacheMetadata) error { + return m.SetCachePolicyDefault() } func WithDescription(descr string) RefOption { - return func(m withMetadata) error { - return queueDescription(m.Metadata(), descr) + return func(m *cacheMetadata) error { + return m.queueDescription(descr) } } func WithRecordType(t client.UsageRecordType) RefOption { - return func(m withMetadata) error { - return queueRecordType(m.Metadata(), t) + return func(m *cacheMetadata) error { + return m.queueRecordType(t) } } func WithCreationTime(tm time.Time) RefOption { - return func(m withMetadata) error { - return queueCreatedAt(m.Metadata(), tm) + return func(m *cacheMetadata) error { + return m.queueCreatedAt(tm) } } @@ -1058,17 +1052,16 @@ func WithCreationTime(tm time.Time) RefOption { // initializeMetadata while still being a RefOption, so wrapping it in a // different type ensures initializeMetadata won't catch it too and duplicate // setting the metadata. -type imageRefOption func(m withMetadata) error +type imageRefOption func(m *cacheMetadata) error // WithImageRef appends the given imageRef to the cache ref's metadata func WithImageRef(imageRef string) RefOption { - return imageRefOption(func(m withMetadata) error { - return appendImageRef(m.Metadata(), imageRef) + return imageRefOption(func(m *cacheMetadata) error { + return m.appendImageRef(imageRef) }) } -func setImageRefMetadata(m withMetadata, opts ...RefOption) error { - md := m.Metadata() +func setImageRefMetadata(m *cacheMetadata, opts ...RefOption) error { for _, opt := range opts { if fn, ok := opt.(imageRefOption); ok { if err := fn(m); err != nil { @@ -1076,32 +1069,37 @@ func setImageRefMetadata(m withMetadata, opts ...RefOption) error { } } } - return md.Commit() + return m.commitMetadata() +} + +func withSnapshotID(id string) RefOption { + return imageRefOption(func(m *cacheMetadata) error { + return m.queueSnapshotID(id) + }) } -func initializeMetadata(m withMetadata, parent string, opts ...RefOption) error { - md := m.Metadata() - if tm := GetCreatedAt(md); !tm.IsZero() { +func initializeMetadata(m *cacheMetadata, parent string, opts ...RefOption) error { + if tm := m.GetCreatedAt(); !tm.IsZero() { return nil } - if err := queueParent(md, parent); err != nil { + if err := m.queueParent(parent); err != nil { return err } - if err := queueCreatedAt(md, time.Now()); err != nil { + if err := m.queueCreatedAt(time.Now()); err != nil { return err } for _, opt := range opts { - if fn, ok := opt.(func(withMetadata) error); ok { + if fn, ok := opt.(func(*cacheMetadata) error); ok { if err := fn(m); err != nil { return err } } } - return md.Commit() + return m.commitMetadata() } func adaptUsageInfo(info *client.UsageInfo) filters.Adaptor { diff --git a/cache/manager_test.go b/cache/manager_test.go index 871e8d96c16b..f71c3c026cc9 100644 --- a/cache/manager_test.go +++ b/cache/manager_test.go @@ -83,7 +83,7 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() return err } defer func() { - if err != nil { + if err != nil && cleanup != nil { cleanup() } }() @@ -104,11 +104,6 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() opt.snapshotter = snapshotter } - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - if err != nil { - return nil, nil, err - } - store, err := local.NewStore(tmpdir) if err != nil { return nil, nil, err @@ -132,6 +127,11 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() store = containerdsnapshot.NewContentStore(mdb.ContentStore(), ns) lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), ns) + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + if err != nil { + return nil, nil, err + } + cm, err := NewManager(ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil), MetadataStore: md, @@ -363,7 +363,7 @@ func TestSnapshotExtract(t *testing.T) { snap, err := cm.GetByBlob(ctx, desc, nil) require.NoError(t, err) - require.Equal(t, false, snap.Info().Extracted) + require.Equal(t, false, !snap.(*immutableRef).getBlobOnly()) b2, desc2, err := mapToBlob(map[string]string{"foo": "bar123"}, true) require.NoError(t, err) @@ -374,11 +374,11 @@ func TestSnapshotExtract(t *testing.T) { snap2, err := cm.GetByBlob(ctx, desc2, snap) require.NoError(t, err) - size, err := snap2.Size(ctx) + size, err := snap2.(*immutableRef).size(ctx) require.NoError(t, err) require.Equal(t, int64(len(b2)), size) - require.Equal(t, false, snap2.Info().Extracted) + require.Equal(t, false, !snap2.(*immutableRef).getBlobOnly()) dirs, err := ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) require.NoError(t, err) @@ -389,8 +389,8 @@ func TestSnapshotExtract(t *testing.T) { err = snap2.Extract(ctx, nil) require.NoError(t, err) - require.Equal(t, true, snap.Info().Extracted) - require.Equal(t, true, snap2.Info().Extracted) + require.Equal(t, true, !snap.(*immutableRef).getBlobOnly()) + require.Equal(t, true, !snap2.(*immutableRef).getBlobOnly()) dirs, err = ioutil.ReadDir(filepath.Join(tmpdir, "snapshots/snapshots")) require.NoError(t, err) @@ -534,9 +534,9 @@ func TestExtractOnMutable(t *testing.T) { err = snap.Release(context.TODO()) require.NoError(t, err) - require.Equal(t, false, snap2.Info().Extracted) + require.Equal(t, false, !snap2.(*immutableRef).getBlobOnly()) - size, err := snap2.Size(ctx) + size, err := snap2.(*immutableRef).size(ctx) require.NoError(t, err) require.Equal(t, int64(len(b2)), size) @@ -549,8 +549,8 @@ func TestExtractOnMutable(t *testing.T) { err = snap2.Extract(ctx, nil) require.NoError(t, err) - require.Equal(t, true, snap.Info().Extracted) - require.Equal(t, true, snap2.Info().Extracted) + require.Equal(t, true, !snap.(*immutableRef).getBlobOnly()) + require.Equal(t, true, !snap2.(*immutableRef).getBlobOnly()) buf := pruneResultBuffer() err = cm.Prune(ctx, buf.C, client.PruneInfo{}) @@ -617,12 +617,12 @@ func TestSetBlob(t *testing.T) { snap, err := active.Commit(ctx) require.NoError(t, err) - info := snap.Info() - require.Equal(t, "", string(info.DiffID)) - require.Equal(t, "", string(info.Blob)) - require.Equal(t, "", string(info.ChainID)) - require.Equal(t, "", string(info.BlobChainID)) - require.Equal(t, info.Extracted, true) + snapRef := snap.(*immutableRef) + require.Equal(t, "", string(snapRef.getDiffID())) + require.Equal(t, "", string(snapRef.getBlob())) + require.Equal(t, "", string(snapRef.getChainID())) + require.Equal(t, "", string(snapRef.getBlobChainID())) + require.Equal(t, !snapRef.getBlobOnly(), true) ctx, clean, err := leaseutil.WithLease(ctx, co.lm) require.NoError(t, err) @@ -651,14 +651,14 @@ func TestSetBlob(t *testing.T) { err = snap.(*immutableRef).setChains(ctx) require.NoError(t, err) - info = snap.Info() - require.Equal(t, desc.Annotations["containerd.io/uncompressed"], string(info.DiffID)) - require.Equal(t, desc.Digest, info.Blob) - require.Equal(t, desc.MediaType, info.MediaType) - require.Equal(t, info.DiffID, info.ChainID) - require.Equal(t, digest.FromBytes([]byte(desc.Digest+" "+info.DiffID)), info.BlobChainID) - require.Equal(t, snap.ID(), info.SnapshotID) - require.Equal(t, info.Extracted, true) + snapRef = snap.(*immutableRef) + require.Equal(t, desc.Annotations["containerd.io/uncompressed"], string(snapRef.getDiffID())) + require.Equal(t, desc.Digest, snapRef.getBlob()) + require.Equal(t, desc.MediaType, snapRef.getMediaType()) + require.Equal(t, snapRef.getDiffID(), snapRef.getChainID()) + require.Equal(t, digest.FromBytes([]byte(desc.Digest+" "+snapRef.getDiffID())), snapRef.getBlobChainID()) + require.Equal(t, snap.ID(), snapRef.getSnapshotID()) + require.Equal(t, !snapRef.getBlobOnly(), true) active, err = cm.New(ctx, snap, nil) require.NoError(t, err) @@ -681,14 +681,14 @@ func TestSetBlob(t *testing.T) { err = snap2.(*immutableRef).setChains(ctx) require.NoError(t, err) - info2 := snap2.Info() - require.Equal(t, desc2.Annotations["containerd.io/uncompressed"], string(info2.DiffID)) - require.Equal(t, desc2.Digest, info2.Blob) - require.Equal(t, desc2.MediaType, info2.MediaType) - require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info2.DiffID)), info2.ChainID) - require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc2.Digest+" "+info2.DiffID)))), info2.BlobChainID) - require.Equal(t, snap2.ID(), info2.SnapshotID) - require.Equal(t, info2.Extracted, true) + snapRef2 := snap2.(*immutableRef) + require.Equal(t, desc2.Annotations["containerd.io/uncompressed"], string(snapRef2.getDiffID())) + require.Equal(t, desc2.Digest, snapRef2.getBlob()) + require.Equal(t, desc2.MediaType, snapRef2.getMediaType()) + require.Equal(t, digest.FromBytes([]byte(snapRef.getChainID()+" "+snapRef2.getDiffID())), snapRef2.getChainID()) + require.Equal(t, digest.FromBytes([]byte(snapRef.getBlobChainID()+" "+digest.FromBytes([]byte(desc2.Digest+" "+snapRef2.getDiffID())))), snapRef2.getBlobChainID()) + require.Equal(t, snap2.ID(), snapRef2.getSnapshotID()) + require.Equal(t, !snapRef2.getBlobOnly(), true) b3, desc3, err := mapToBlob(map[string]string{"foo3": "bar3"}, true) require.NoError(t, err) @@ -699,14 +699,14 @@ func TestSetBlob(t *testing.T) { snap3, err := cm.GetByBlob(ctx, desc3, snap) require.NoError(t, err) - info3 := snap3.Info() - require.Equal(t, desc3.Annotations["containerd.io/uncompressed"], string(info3.DiffID)) - require.Equal(t, desc3.Digest, info3.Blob) - require.Equal(t, desc3.MediaType, info3.MediaType) - require.Equal(t, digest.FromBytes([]byte(info.ChainID+" "+info3.DiffID)), info3.ChainID) - require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc3.Digest+" "+info3.DiffID)))), info3.BlobChainID) - require.Equal(t, string(info3.ChainID), info3.SnapshotID) - require.Equal(t, info3.Extracted, false) + snapRef3 := snap3.(*immutableRef) + require.Equal(t, desc3.Annotations["containerd.io/uncompressed"], string(snapRef3.getDiffID())) + require.Equal(t, desc3.Digest, snapRef3.getBlob()) + require.Equal(t, desc3.MediaType, snapRef3.getMediaType()) + require.Equal(t, digest.FromBytes([]byte(snapRef.getChainID()+" "+snapRef3.getDiffID())), snapRef3.getChainID()) + require.Equal(t, digest.FromBytes([]byte(snapRef.getBlobChainID()+" "+digest.FromBytes([]byte(desc3.Digest+" "+snapRef3.getDiffID())))), snapRef3.getBlobChainID()) + require.Equal(t, string(snapRef3.getChainID()), snapRef3.getSnapshotID()) + require.Equal(t, !snapRef3.getBlobOnly(), false) // snap4 is same as snap2 snap4, err := cm.GetByBlob(ctx, desc2, snap) @@ -718,7 +718,7 @@ func TestSetBlob(t *testing.T) { b5, desc5, err := mapToBlob(map[string]string{"foo5": "bar5"}, true) require.NoError(t, err) - desc5.Annotations["containerd.io/uncompressed"] = info2.DiffID.String() + desc5.Annotations["containerd.io/uncompressed"] = snapRef2.getDiffID().String() err = content.WriteBlob(ctx, co.cs, "ref5", bytes.NewBuffer(b5), desc5) require.NoError(t, err) @@ -726,14 +726,15 @@ func TestSetBlob(t *testing.T) { snap5, err := cm.GetByBlob(ctx, desc5, snap) require.NoError(t, err) + snapRef5 := snap5.(*immutableRef) require.NotEqual(t, snap2.ID(), snap5.ID()) - require.Equal(t, snap2.Info().SnapshotID, snap5.Info().SnapshotID) - require.Equal(t, info2.DiffID, snap5.Info().DiffID) - require.Equal(t, desc5.Digest, snap5.Info().Blob) + require.Equal(t, snapRef2.getSnapshotID(), snapRef5.getSnapshotID()) + require.Equal(t, snapRef2.getDiffID(), snapRef5.getDiffID()) + require.Equal(t, desc5.Digest, snapRef5.getBlob()) - require.Equal(t, snap2.Info().ChainID, snap5.Info().ChainID) - require.NotEqual(t, snap2.Info().BlobChainID, snap5.Info().BlobChainID) - require.Equal(t, digest.FromBytes([]byte(info.BlobChainID+" "+digest.FromBytes([]byte(desc5.Digest+" "+info2.DiffID)))), snap5.Info().BlobChainID) + require.Equal(t, snapRef2.getChainID(), snapRef5.getChainID()) + require.NotEqual(t, snapRef2.getBlobChainID(), snapRef5.getBlobChainID()) + require.Equal(t, digest.FromBytes([]byte(snapRef.getBlobChainID()+" "+digest.FromBytes([]byte(desc5.Digest+" "+snapRef2.getDiffID())))), snapRef5.getBlobChainID()) // snap6 is a child of snap3 b6, desc6, err := mapToBlob(map[string]string{"foo6": "bar6"}, true) @@ -745,13 +746,13 @@ func TestSetBlob(t *testing.T) { snap6, err := cm.GetByBlob(ctx, desc6, snap3) require.NoError(t, err) - info6 := snap6.Info() - require.Equal(t, desc6.Annotations["containerd.io/uncompressed"], string(info6.DiffID)) - require.Equal(t, desc6.Digest, info6.Blob) - require.Equal(t, digest.FromBytes([]byte(snap3.Info().ChainID+" "+info6.DiffID)), info6.ChainID) - require.Equal(t, digest.FromBytes([]byte(info3.BlobChainID+" "+digest.FromBytes([]byte(info6.Blob+" "+info6.DiffID)))), info6.BlobChainID) - require.Equal(t, string(info6.ChainID), info6.SnapshotID) - require.Equal(t, info6.Extracted, false) + snapRef6 := snap6.(*immutableRef) + require.Equal(t, desc6.Annotations["containerd.io/uncompressed"], string(snapRef6.getDiffID())) + require.Equal(t, desc6.Digest, snapRef6.getBlob()) + require.Equal(t, digest.FromBytes([]byte(snapRef3.getChainID()+" "+snapRef6.getDiffID())), snapRef6.getChainID()) + require.Equal(t, digest.FromBytes([]byte(snapRef3.getBlobChainID()+" "+digest.FromBytes([]byte(snapRef6.getBlob()+" "+snapRef6.getDiffID())))), snapRef6.getBlobChainID()) + require.Equal(t, string(snapRef6.getChainID()), snapRef6.getSnapshotID()) + require.Equal(t, !snapRef6.getBlobOnly(), false) _, err = cm.GetByBlob(ctx, ocispecs.Descriptor{ Digest: digest.FromBytes([]byte("notexist")), @@ -1185,7 +1186,7 @@ func TestGetRemote(t *testing.T) { r := refChain[i] if compressionType == compression.EStargz { - if digest.Digest(getBlob(r.md)) == desc.Digest { + if digest.Digest(r.getBlob()) == desc.Digest { esgzRefsMu.Lock() esgzRefs[desc.Digest] = struct{}{} esgzRefsMu.Unlock() diff --git a/cache/metadata.go b/cache/metadata.go index a5c7df7f4cf0..84b634ee04de 100644 --- a/cache/metadata.go +++ b/cache/metadata.go @@ -1,10 +1,13 @@ package cache import ( + "context" "time" "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" + "github.com/moby/buildkit/util/bklog" + digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" bolt "go.etcd.io/bbolt" ) @@ -29,446 +32,323 @@ const keySnapshot = "cache.snapshot" const keyBlobOnly = "cache.blobonly" const keyMediaType = "cache.mediatype" const keyImageRefs = "cache.imageRefs" +const keyDeleted = "cache.deleted" +const keyBlobSize = "cache.blobsize" // the packed blob size as specified in the oci descriptor -// BlobSize is the packed blob size as specified in the oci descriptor -const keyBlobSize = "cache.blobsize" +// Indexes +const blobchainIndex = "blobchainid:" +const chainIndex = "chainid:" -const keyDeleted = "cache.deleted" +type MetadataStore interface { + Search(context.Context, string) ([]RefMetadata, error) +} -func queueDiffID(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) - if err != nil { - return errors.Wrap(err, "failed to create diffID value") - } - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyDiffID, v) - }) - return nil +type RefMetadata interface { + ID() string + + GetDescription() string + SetDescription(string) error + + GetCreatedAt() time.Time + SetCreatedAt(time.Time) error + + HasCachePolicyDefault() bool + SetCachePolicyDefault() error + HasCachePolicyRetain() bool + SetCachePolicyRetain() error + + GetLayerType() string + SetLayerType(string) error + + GetRecordType() client.UsageRecordType + SetRecordType(client.UsageRecordType) error + + GetEqualMutable() (RefMetadata, bool) + + // generic getters/setters for external packages + GetString(string) string + SetString(key, val, index string) error + + GetExternal(string) ([]byte, error) + SetExternal(string, []byte) error + + ClearValueAndIndex(string, string) error } -func getMediaType(si *metadata.StorageItem) string { - v := si.Get(keyMediaType) - if v == nil { - return si.ID() - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (cm *cacheManager) Search(ctx context.Context, idx string) ([]RefMetadata, error) { + cm.mu.Lock() + defer cm.mu.Unlock() + return cm.search(ctx, idx) } -func queueMediaType(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) +// callers must hold cm.mu lock +func (cm *cacheManager) search(ctx context.Context, idx string) ([]RefMetadata, error) { + sis, err := cm.MetadataStore.Search(idx) if err != nil { - return errors.Wrap(err, "failed to create mediaType value") + return nil, err + } + var mds []RefMetadata + for _, si := range sis { + // calling getMetadata ensures we return the same storage item object that's cached in memory + md, ok := cm.getMetadata(si.ID()) + if !ok { + bklog.G(ctx).Warnf("missing metadata for storage item %q during search for %q", si.ID(), idx) + continue + } + if md.getDeleted() { + continue + } + mds = append(mds, md) } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyMediaType, v) - }) - return nil + return mds, nil } -func getSnapshotID(si *metadata.StorageItem) string { - v := si.Get(keySnapshot) - if v == nil { - return si.ID() +// callers must hold cm.mu lock +func (cm *cacheManager) getMetadata(id string) (*cacheMetadata, bool) { + if rec, ok := cm.records[id]; ok { + return rec.cacheMetadata, true } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str + si, ok := cm.MetadataStore.Get(id) + md := &cacheMetadata{si} + return md, ok } -func queueSnapshotID(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) - if err != nil { - return errors.Wrap(err, "failed to create chainID value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keySnapshot, v) - }) - return nil +// callers must hold cm.mu lock +func (cm *cacheManager) searchBlobchain(ctx context.Context, id digest.Digest) ([]RefMetadata, error) { + return cm.search(ctx, blobchainIndex+id.String()) } -func getDiffID(si *metadata.StorageItem) string { - v := si.Get(keyDiffID) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +// callers must hold cm.mu lock +func (cm *cacheManager) searchChain(ctx context.Context, id digest.Digest) ([]RefMetadata, error) { + return cm.search(ctx, chainIndex+id.String()) } -func queueChainID(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) - if err != nil { - return errors.Wrap(err, "failed to create chainID value") - } - v.Index = "chainid:" + str - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyChainID, v) - }) - return nil +type cacheMetadata struct { + si *metadata.StorageItem } -func getBlobChainID(si *metadata.StorageItem) string { - v := si.Get(keyBlobChainID) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (md *cacheMetadata) ID() string { + return md.si.ID() } -func queueBlobChainID(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) - if err != nil { - return errors.Wrap(err, "failed to create chainID value") - } - v.Index = "blobchainid:" + str - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyBlobChainID, v) - }) - return nil +func (md *cacheMetadata) commitMetadata() error { + return md.si.Commit() } -func getChainID(si *metadata.StorageItem) string { - v := si.Get(keyChainID) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (md *cacheMetadata) GetDescription() string { + return md.GetString(keyDescription) } -func queueBlob(si *metadata.StorageItem, str string) error { - if str == "" { - return nil - } - v, err := metadata.NewValue(str) - if err != nil { - return errors.Wrap(err, "failed to create blob value") - } - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyBlob, v) - }) - return nil +func (md *cacheMetadata) SetDescription(descr string) error { + return md.setValue(keyDescription, descr, "") } -func getBlob(si *metadata.StorageItem) string { - v := si.Get(keyBlob) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (md *cacheMetadata) queueDescription(descr string) error { + return md.queueValue(keyDescription, descr, "") } -func queueBlobOnly(si *metadata.StorageItem, b bool) error { - v, err := metadata.NewValue(b) - if err != nil { - return errors.Wrap(err, "failed to create blobonly value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyBlobOnly, v) - }) - return nil +func (md *cacheMetadata) queueCommitted(b bool) error { + return md.queueValue(keyCommitted, b, "") } -func getBlobOnly(si *metadata.StorageItem) bool { - v := si.Get(keyBlobOnly) - if v == nil { - return false - } - var blobOnly bool - if err := v.Unmarshal(&blobOnly); err != nil { - return false - } - return blobOnly +func (md *cacheMetadata) getCommitted() bool { + return md.getBool(keyCommitted) } -func setDeleted(si *metadata.StorageItem) error { - v, err := metadata.NewValue(true) - if err != nil { - return errors.Wrap(err, "failed to create deleted value") - } - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyDeleted, v) - }) - return nil +func (md *cacheMetadata) GetLayerType() string { + return md.GetString(keyLayerType) } -func getDeleted(si *metadata.StorageItem) bool { - v := si.Get(keyDeleted) - if v == nil { - return false - } - var deleted bool - if err := v.Unmarshal(&deleted); err != nil { - return false - } - return deleted +func (md *cacheMetadata) SetLayerType(value string) error { + return md.setValue(keyLayerType, value, "") } -func queueCommitted(si *metadata.StorageItem) error { - v, err := metadata.NewValue(true) - if err != nil { - return errors.Wrap(err, "failed to create committed value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyCommitted, v) - }) - return nil +func (md *cacheMetadata) GetRecordType() client.UsageRecordType { + return client.UsageRecordType(md.GetString(keyRecordType)) } -func getCommitted(si *metadata.StorageItem) bool { - v := si.Get(keyCommitted) - if v == nil { - return false - } - var committed bool - if err := v.Unmarshal(&committed); err != nil { - return false - } - return committed +func (md *cacheMetadata) SetRecordType(value client.UsageRecordType) error { + return md.setValue(keyRecordType, value, "") } -func queueParent(si *metadata.StorageItem, parent string) error { - if parent == "" { - return nil - } - v, err := metadata.NewValue(parent) - if err != nil { - return errors.Wrap(err, "failed to create parent value") - } - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, keyParent, v) - }) - return nil +func (md *cacheMetadata) queueRecordType(value client.UsageRecordType) error { + return md.queueValue(keyRecordType, value, "") } -func getParent(si *metadata.StorageItem) string { - v := si.Get(keyParent) - if v == nil { - return "" - } - var parent string - if err := v.Unmarshal(&parent); err != nil { - return "" - } - return parent +func (md *cacheMetadata) SetCreatedAt(tm time.Time) error { + return md.setTime(keyCreatedAt, tm, "") } -func setSize(si *metadata.StorageItem, s int64) error { - v, err := metadata.NewValue(s) - if err != nil { - return errors.Wrap(err, "failed to create size value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keySize, v) - }) - return nil +func (md *cacheMetadata) queueCreatedAt(tm time.Time) error { + return md.queueTime(keyCreatedAt, tm, "") } -func getSize(si *metadata.StorageItem) int64 { - v := si.Get(keySize) - if v == nil { - return sizeUnknown - } - var size int64 - if err := v.Unmarshal(&size); err != nil { - return sizeUnknown - } - return size +func (md *cacheMetadata) GetCreatedAt() time.Time { + return md.getTime(keyCreatedAt) } -func appendImageRef(si *metadata.StorageItem, s string) error { - return si.GetAndSetValue(keyImageRefs, func(v *metadata.Value) (*metadata.Value, error) { - var imageRefs []string - if v != nil { - if err := v.Unmarshal(&imageRefs); err != nil { - return nil, err - } - } - for _, existing := range imageRefs { - if existing == s { - return nil, metadata.ErrSkipSetValue - } - } - imageRefs = append(imageRefs, s) - v, err := metadata.NewValue(imageRefs) - if err != nil { - return nil, errors.Wrap(err, "failed to create imageRefs value") - } - return v, nil - }) +func (md *cacheMetadata) HasCachePolicyDefault() bool { + return md.getCachePolicy() == cachePolicyDefault } -func getImageRefs(si *metadata.StorageItem) []string { - v := si.Get(keyImageRefs) - if v == nil { - return nil - } - var refs []string - if err := v.Unmarshal(&refs); err != nil { - return nil - } - return refs +func (md *cacheMetadata) SetCachePolicyDefault() error { + return md.setCachePolicy(cachePolicyDefault) } -func queueBlobSize(si *metadata.StorageItem, s int64) error { - v, err := metadata.NewValue(s) - if err != nil { - return errors.Wrap(err, "failed to create blobsize value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyBlobSize, v) - }) - return nil +func (md *cacheMetadata) HasCachePolicyRetain() bool { + return md.getCachePolicy() == cachePolicyRetain } -func getBlobSize(si *metadata.StorageItem) int64 { - v := si.Get(keyBlobSize) - if v == nil { - return sizeUnknown - } - var size int64 - if err := v.Unmarshal(&size); err != nil { - return sizeUnknown - } - return size +func (md *cacheMetadata) SetCachePolicyRetain() error { + return md.setCachePolicy(cachePolicyRetain) } -func getEqualMutable(si *metadata.StorageItem) string { - v := si.Get(keyEqualMutable) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (md *cacheMetadata) GetExternal(s string) ([]byte, error) { + return md.si.GetExternal(s) } -func setEqualMutable(si *metadata.StorageItem, s string) error { - v, err := metadata.NewValue(s) - if err != nil { - return errors.Wrapf(err, "failed to create %s meta value", keyEqualMutable) +func (md *cacheMetadata) SetExternal(s string, dt []byte) error { + return md.si.SetExternal(s, dt) +} + +func (md *cacheMetadata) GetEqualMutable() (RefMetadata, bool) { + emSi, ok := md.si.Storage().Get(md.getEqualMutable()) + if !ok { + return nil, false } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyEqualMutable, v) - }) - return nil + return &cacheMetadata{emSi}, true } -func clearEqualMutable(si *metadata.StorageItem) error { - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyEqualMutable, nil) - }) - return nil +func (md *cacheMetadata) getEqualMutable() string { + return md.GetString(keyEqualMutable) } -func queueCachePolicy(si *metadata.StorageItem, p cachePolicy) error { - v, err := metadata.NewValue(p) - if err != nil { - return errors.Wrap(err, "failed to create cachePolicy value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyCachePolicy, v) +func (md *cacheMetadata) setEqualMutable(s string) error { + return md.queueValue(keyEqualMutable, s, "") +} + +func (md *cacheMetadata) clearEqualMutable() error { + md.si.Queue(func(b *bolt.Bucket) error { + return md.si.SetValue(b, keyEqualMutable, nil) }) return nil } -func getCachePolicy(si *metadata.StorageItem) cachePolicy { - v := si.Get(keyCachePolicy) - if v == nil { - return cachePolicyDefault - } - var p cachePolicy - if err := v.Unmarshal(&p); err != nil { - return cachePolicyDefault - } - return p +func (md *cacheMetadata) queueDiffID(str digest.Digest) error { + return md.queueValue(keyDiffID, str, "") } -func queueDescription(si *metadata.StorageItem, descr string) error { - v, err := metadata.NewValue(descr) - if err != nil { - return errors.Wrap(err, "failed to create description value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyDescription, v) - }) - return nil +func (md *cacheMetadata) getMediaType() string { + return md.GetString(keyMediaType) } -func GetDescription(si *metadata.StorageItem) string { - v := si.Get(keyDescription) - if v == nil { - return "" - } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" - } - return str +func (md *cacheMetadata) queueMediaType(str string) error { + return md.queueValue(keyMediaType, str, "") } -func queueCreatedAt(si *metadata.StorageItem, tm time.Time) error { - v, err := metadata.NewValue(tm.UnixNano()) - if err != nil { - return errors.Wrap(err, "failed to create createdAt value") +func (md *cacheMetadata) getSnapshotID() string { + return md.GetString(keySnapshot) +} + +func (md *cacheMetadata) queueSnapshotID(str string) error { + return md.queueValue(keySnapshot, str, "") +} + +func (md *cacheMetadata) getDiffID() digest.Digest { + return digest.Digest(md.GetString(keyDiffID)) +} + +func (md *cacheMetadata) queueChainID(str digest.Digest) error { + return md.queueValue(keyChainID, str, chainIndex+str.String()) +} + +func (md *cacheMetadata) getBlobChainID() digest.Digest { + return digest.Digest(md.GetString(keyBlobChainID)) +} + +func (md *cacheMetadata) queueBlobChainID(str digest.Digest) error { + return md.queueValue(keyBlobChainID, str, blobchainIndex+str.String()) +} + +func (md *cacheMetadata) getChainID() digest.Digest { + return digest.Digest(md.GetString(keyChainID)) +} + +func (md *cacheMetadata) queueBlob(str digest.Digest) error { + return md.queueValue(keyBlob, str, "") +} + +func (md *cacheMetadata) getBlob() digest.Digest { + return digest.Digest(md.GetString(keyBlob)) +} + +func (md *cacheMetadata) queueBlobOnly(b bool) error { + return md.queueValue(keyBlobOnly, b, "") +} + +func (md *cacheMetadata) getBlobOnly() bool { + return md.getBool(keyBlobOnly) +} + +func (md *cacheMetadata) queueDeleted() error { + return md.queueValue(keyDeleted, true, "") +} + +func (md *cacheMetadata) getDeleted() bool { + return md.getBool(keyDeleted) +} + +func (md *cacheMetadata) queueParent(parent string) error { + return md.queueValue(keyParent, parent, "") +} + +func (md *cacheMetadata) getParent() string { + return md.GetString(keyParent) +} + +func (md *cacheMetadata) queueSize(s int64) error { + return md.queueValue(keySize, s, "") +} + +func (md *cacheMetadata) getSize() int64 { + if size, ok := md.getInt64(keySize); ok { + return size } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyCreatedAt, v) - }) - return nil + return sizeUnknown } -func GetCreatedAt(si *metadata.StorageItem) time.Time { - v := si.Get(keyCreatedAt) - if v == nil { - return time.Time{} +func (md *cacheMetadata) appendImageRef(s string) error { + return md.appendStringSlice(keyImageRefs, s) +} + +func (md *cacheMetadata) getImageRefs() []string { + return md.getStringSlice(keyImageRefs) +} + +func (md *cacheMetadata) queueBlobSize(s int64) error { + return md.queueValue(keyBlobSize, s, "") +} + +func (md *cacheMetadata) getBlobSize() int64 { + if size, ok := md.getInt64(keyBlobSize); ok { + return size } - var tm int64 - if err := v.Unmarshal(&tm); err != nil { - return time.Time{} + return sizeUnknown +} + +func (md *cacheMetadata) setCachePolicy(p cachePolicy) error { + return md.setValue(keyCachePolicy, p, "") +} + +func (md *cacheMetadata) getCachePolicy() cachePolicy { + if i, ok := md.getInt64(keyCachePolicy); ok { + return cachePolicy(i) } - return time.Unix(tm/1e9, tm%1e9) + return cachePolicyDefault } -func getLastUsed(si *metadata.StorageItem) (int, *time.Time) { - v := si.Get(keyUsageCount) +func (md *cacheMetadata) getLastUsed() (int, *time.Time) { + v := md.si.Get(keyUsageCount) if v == nil { return 0, nil } @@ -476,7 +356,7 @@ func getLastUsed(si *metadata.StorageItem) (int, *time.Time) { if err := v.Unmarshal(&usageCount); err != nil { return 0, nil } - v = si.Get(keyLastUsedAt) + v = md.si.Get(keyLastUsedAt) if v == nil { return usageCount, nil } @@ -488,8 +368,8 @@ func getLastUsed(si *metadata.StorageItem) (int, *time.Time) { return usageCount, &tm } -func updateLastUsed(si *metadata.StorageItem) error { - count, _ := getLastUsed(si) +func (md *cacheMetadata) updateLastUsed() error { + count, _ := md.getLastUsed() count++ v, err := metadata.NewValue(count) @@ -500,27 +380,57 @@ func updateLastUsed(si *metadata.StorageItem) error { if err != nil { return errors.Wrap(err, "failed to create lastUsedAt value") } - return si.Update(func(b *bolt.Bucket) error { - if err := si.SetValue(b, keyUsageCount, v); err != nil { + return md.si.Update(func(b *bolt.Bucket) error { + if err := md.si.SetValue(b, keyUsageCount, v); err != nil { return err } - return si.SetValue(b, keyLastUsedAt, v2) + return md.si.SetValue(b, keyLastUsedAt, v2) + }) +} + +func (md *cacheMetadata) queueValue(key string, value interface{}, index string) error { + v, err := metadata.NewValue(value) + if err != nil { + return errors.Wrap(err, "failed to create value") + } + v.Index = index + md.si.Queue(func(b *bolt.Bucket) error { + return md.si.SetValue(b, key, v) }) + return nil +} + +func (md *cacheMetadata) SetString(key, value string, index string) error { + return md.setValue(key, value, index) } -func SetLayerType(m withMetadata, value string) error { +func (md *cacheMetadata) setValue(key string, value interface{}, index string) error { v, err := metadata.NewValue(value) if err != nil { - return errors.Wrap(err, "failed to create layertype value") + return errors.Wrap(err, "failed to create value") } - m.Metadata().Queue(func(b *bolt.Bucket) error { - return m.Metadata().SetValue(b, keyLayerType, v) + v.Index = index + return md.si.Update(func(b *bolt.Bucket) error { + return md.si.SetValue(b, key, v) }) - return m.Metadata().Commit() } -func GetLayerType(m withMetadata) string { - v := m.Metadata().Get(keyLayerType) +func (md *cacheMetadata) ClearValueAndIndex(key string, index string) error { + currentVal := md.GetString(key) + return md.si.Update(func(b *bolt.Bucket) error { + if err := md.si.SetValue(b, key, nil); err != nil { + return err + } + if currentVal != "" { + // force clearing index, see #1836 https://github.com/moby/buildkit/pull/1836 + return md.si.ClearIndex(b.Tx(), index+currentVal) + } + return nil + }) +} + +func (md *cacheMetadata) GetString(key string) string { + v := md.si.Get(key) if v == nil { return "" } @@ -531,32 +441,80 @@ func GetLayerType(m withMetadata) string { return str } -func GetRecordType(m withMetadata) client.UsageRecordType { - v := m.Metadata().Get(keyRecordType) +func (md *cacheMetadata) setTime(key string, value time.Time, index string) error { + return md.setValue(key, value.UnixNano(), index) +} + +func (md *cacheMetadata) queueTime(key string, value time.Time, index string) error { + return md.queueValue(key, value.UnixNano(), index) +} + +func (md *cacheMetadata) getTime(key string) time.Time { + v := md.si.Get(key) if v == nil { - return "" + return time.Time{} } - var str string - if err := v.Unmarshal(&str); err != nil { - return "" + var tm int64 + if err := v.Unmarshal(&tm); err != nil { + return time.Time{} } - return client.UsageRecordType(str) + return time.Unix(tm/1e9, tm%1e9) } -func SetRecordType(m withMetadata, value client.UsageRecordType) error { - if err := queueRecordType(m.Metadata(), value); err != nil { - return err +func (md *cacheMetadata) getBool(key string) bool { + v := md.si.Get(key) + if v == nil { + return false + } + var b bool + if err := v.Unmarshal(&b); err != nil { + return false } - return m.Metadata().Commit() + return b } -func queueRecordType(si *metadata.StorageItem, value client.UsageRecordType) error { - v, err := metadata.NewValue(value) - if err != nil { - return errors.Wrap(err, "failed to create recordtype value") +func (md *cacheMetadata) getInt64(key string) (int64, bool) { + v := md.si.Get(key) + if v == nil { + return 0, false + } + var i int64 + if err := v.Unmarshal(&i); err != nil { + return 0, false } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyRecordType, v) + return i, true +} + +func (md *cacheMetadata) appendStringSlice(key string, value string) error { + return md.si.GetAndSetValue(key, func(v *metadata.Value) (*metadata.Value, error) { + var slice []string + if v != nil { + if err := v.Unmarshal(&slice); err != nil { + return nil, err + } + } + for _, existing := range slice { + if existing == value { + return nil, metadata.ErrSkipSetValue + } + } + slice = append(slice, value) + v, err := metadata.NewValue(slice) + if err != nil { + return nil, err + } + return v, nil }) - return nil +} + +func (md *cacheMetadata) getStringSlice(key string) []string { + v := md.si.Get(key) + if v == nil { + return nil + } + var s []string + if err := v.Unmarshal(&s); err != nil { + return nil + } + return s } diff --git a/cache/metadata/metadata.go b/cache/metadata/metadata.go index ff76a0be5d61..75bdf4427111 100644 --- a/cache/metadata/metadata.go +++ b/cache/metadata/metadata.go @@ -345,15 +345,25 @@ func (s *StorageItem) SetValue(b *bolt.Bucket, key string, v *Value) error { return s.setValue(b, key, v) } +func (s *StorageItem) ClearIndex(tx *bolt.Tx, index string) error { + s.vmu.Lock() + defer s.vmu.Unlock() + return s.clearIndex(tx, index) +} + +func (s *StorageItem) clearIndex(tx *bolt.Tx, index string) error { + b, err := tx.CreateBucketIfNotExists([]byte(indexBucket)) + if err != nil { + return errors.WithStack(err) + } + return b.Delete([]byte(indexKey(index, s.ID()))) +} + func (s *StorageItem) setValue(b *bolt.Bucket, key string, v *Value) error { if v == nil { if old, ok := s.values[key]; ok { if old.Index != "" { - b, err := b.Tx().CreateBucketIfNotExists([]byte(indexBucket)) - if err != nil { - return errors.WithStack(err) - } - b.Delete([]byte(indexKey(old.Index, s.ID()))) // ignore error + s.clearIndex(b.Tx(), old.Index) // ignore error } } if err := b.Put([]byte(key), nil); err != nil { diff --git a/cache/migrate_v2.go b/cache/migrate_v2.go index f75c9f98962b..5bb90bb3641e 100644 --- a/cache/migrate_v2.go +++ b/cache/migrate_v2.go @@ -19,16 +19,17 @@ import ( ) func migrateChainID(si *metadata.StorageItem, all map[string]*metadata.StorageItem) (digest.Digest, digest.Digest, error) { - diffID := digest.Digest(getDiffID(si)) + md := &cacheMetadata{si} + diffID := md.getDiffID() if diffID == "" { return "", "", nil } - blobID := digest.Digest(getBlob(si)) + blobID := md.getBlob() if blobID == "" { return "", "", nil } - chainID := digest.Digest(getChainID(si)) - blobChainID := digest.Digest(getBlobChainID(si)) + chainID := md.getChainID() + blobChainID := md.getBlobChainID() if chainID != "" && blobChainID != "" { return chainID, blobChainID, nil @@ -37,7 +38,7 @@ func migrateChainID(si *metadata.StorageItem, all map[string]*metadata.StorageIt chainID = diffID blobChainID = digest.FromBytes([]byte(blobID + " " + diffID)) - parent := getParent(si) + parent := md.getParent() if parent != "" { pChainID, pBlobChainID, err := migrateChainID(all[parent], all) if err != nil { @@ -47,10 +48,10 @@ func migrateChainID(si *metadata.StorageItem, all map[string]*metadata.StorageIt blobChainID = digest.FromBytes([]byte(pBlobChainID + " " + blobChainID)) } - queueChainID(si, chainID.String()) - queueBlobChainID(si, blobChainID.String()) + md.queueChainID(chainID) + md.queueBlobChainID(blobChainID) - return chainID, blobChainID, si.Commit() + return chainID, blobChainID, md.commitMetadata() } func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapshot.Snapshotter, lm leases.Manager) error { @@ -105,31 +106,34 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho // add committed, parent, snapshot for id, item := range byID { - em := getEqualMutable(item) + md := &cacheMetadata{item} + em := md.getEqualMutable() if em == "" { info, err := s.Stat(ctx, id) if err != nil { return err } if info.Kind == snapshots.KindCommitted { - queueCommitted(item) + md.queueCommitted(true) } if info.Parent != "" { - queueParent(item, info.Parent) + md.queueParent(info.Parent) } } else { - queueCommitted(item) + md.queueCommitted(true) } - queueSnapshotID(item, id) - item.Commit() + md.queueSnapshotID(id) + md.commitMetadata() } for _, item := range byID { - em := getEqualMutable(item) + md := &cacheMetadata{item} + em := md.getEqualMutable() if em != "" { - if getParent(item) == "" { - queueParent(item, getParent(byID[em])) - item.Commit() + if md.getParent() == "" { + emMd := &cacheMetadata{byID[em]} + md.queueParent(emMd.getParent()) + md.commitMetadata() } } } @@ -151,10 +155,11 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho if _, err := cs.Info(ctx, digest.Digest(blob.Blobsum)); err != nil { continue } - queueDiffID(item, blob.DiffID) - queueBlob(item, blob.Blobsum) - queueMediaType(item, images.MediaTypeDockerSchema2LayerGzip) - if err := item.Commit(); err != nil { + md := &cacheMetadata{item} + md.queueDiffID(digest.Digest(blob.DiffID)) + md.queueBlob(digest.Digest(blob.Blobsum)) + md.queueMediaType(images.MediaTypeDockerSchema2LayerGzip) + if err := md.commitMetadata(); err != nil { return err } @@ -171,6 +176,7 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho // add new leases for _, item := range byID { + md := &cacheMetadata{item} l, err := lm.Create(ctx, func(l *leases.Lease) error { l.ID = item.ID() l.Labels = map[string]string{ @@ -187,15 +193,15 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho } if err := lm.AddResource(ctx, l, leases.Resource{ - ID: getSnapshotID(item), + ID: md.getSnapshotID(), Type: "snapshots/" + s.Name(), }); err != nil { return errors.Wrapf(err, "failed to add snapshot %s to lease", item.ID()) } - if blobID := getBlob(item); blobID != "" { + if blobID := md.getBlob(); blobID != "" { if err := lm.AddResource(ctx, l, leases.Resource{ - ID: blobID, + ID: string(blobID), Type: "content", }); err != nil { return errors.Wrapf(err, "failed to add blob %s to lease", item.ID()) @@ -205,17 +211,18 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho // remove old root labels for _, item := range byID { - em := getEqualMutable(item) + md := &cacheMetadata{item} + em := md.getEqualMutable() if em == "" { if _, err := s.Update(ctx, snapshots.Info{ - Name: getSnapshotID(item), + Name: md.getSnapshotID(), }, "labels.containerd.io/gc.root"); err != nil { if !errors.Is(err, errdefs.ErrNotFound) { return err } } - if blob := getBlob(item); blob != "" { + if blob := md.getBlob(); blob != "" { if _, err := cs.Update(ctx, content.Info{ Digest: digest.Digest(blob), }, "labels.containerd.io/gc.root"); err != nil { @@ -252,8 +259,9 @@ func MigrateV2(ctx context.Context, from, to string, cs content.Store, s snapsho } for _, item := range byID { - bklog.G(ctx).Infof("migrated %s parent:%q snapshot:%v committed:%v blob:%v diffid:%v chainID:%v blobChainID:%v", - item.ID(), getParent(item), getSnapshotID(item), getCommitted(item), getBlob(item), getDiffID(item), getChainID(item), getBlobChainID(item)) + md := &cacheMetadata{item} + bklog.G(ctx).Infof("migrated %s parent:%q snapshot:%v blob:%v diffid:%v chainID:%v blobChainID:%v", + item.ID(), md.getParent(), md.getSnapshotID(), md.getBlob(), md.getDiffID(), md.getChainID(), md.getBlobChainID()) } return nil diff --git a/cache/refs.go b/cache/refs.go index 54ad214d6a42..801e249507cb 100644 --- a/cache/refs.go +++ b/cache/refs.go @@ -13,7 +13,6 @@ import ( "github.com/containerd/containerd/mount" "github.com/containerd/containerd/snapshots" "github.com/docker/docker/pkg/idtools" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" @@ -32,10 +31,8 @@ import ( // Ref is a reference to cacheable objects. type Ref interface { Mountable - ID() string + RefMetadata Release(context.Context) error - Size(ctx context.Context) (int64, error) - Metadata() *metadata.StorageItem IdentityMapping() *idtools.IdentityMapping } @@ -44,21 +41,10 @@ type ImmutableRef interface { Parent() ImmutableRef Clone() ImmutableRef - Info() RefInfo Extract(ctx context.Context, s session.Group) error // +progress GetRemote(ctx context.Context, createIfNeeded bool, compressionType compression.Type, forceCompression bool, s session.Group) (*solver.Remote, error) } -type RefInfo struct { - SnapshotID string - ChainID digest.Digest - BlobChainID digest.Digest - DiffID digest.Digest - Blob digest.Digest - MediaType string - Extracted bool -} - type MutableRef interface { Ref Commit(context.Context) (ImmutableRef, error) @@ -69,7 +55,7 @@ type Mountable interface { } type ref interface { - updateLastUsed() bool + shouldUpdateLastUsed() bool } type cacheRecord struct { @@ -79,7 +65,7 @@ type cacheRecord struct { mutable bool refs map[ref]struct{} parent *immutableRef - md *metadata.StorageItem + *cacheMetadata // dead means record is marked as deleted dead bool @@ -122,7 +108,7 @@ func (cr *cacheRecord) parentChain() []digest.Digest { if cr.parentChainCache != nil { return cr.parentChainCache } - blob := getBlob(cr.md) + blob := cr.getBlob() if blob == "" { return nil } @@ -144,15 +130,15 @@ func (cr *cacheRecord) isDead() bool { } func (cr *cacheRecord) isLazy(ctx context.Context) (bool, error) { - if !getBlobOnly(cr.md) { + if !cr.getBlobOnly() { return false, nil } - dgst := getBlob(cr.md) + dgst := cr.getBlob() // special case for moby where there is no compressed blob (empty digest) if dgst == "" { return false, nil } - _, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst)) + _, err := cr.cm.ContentStore.Info(ctx, dgst) if errors.Is(err, errdefs.ErrNotFound) { return true, nil } else if err != nil { @@ -160,7 +146,7 @@ func (cr *cacheRecord) isLazy(ctx context.Context) (bool, error) { } // If the snapshot is a remote snapshot, this layer is lazy. - if info, err := cr.cm.Snapshotter.Stat(ctx, getSnapshotID(cr.md)); err == nil { + if info, err := cr.cm.Snapshotter.Stat(ctx, cr.getSnapshotID()); err == nil { if _, ok := info.Labels["containerd.io/snapshot/remote"]; ok { return true, nil } @@ -173,22 +159,22 @@ func (cr *cacheRecord) IdentityMapping() *idtools.IdentityMapping { return cr.cm.IdentityMapping() } -func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { +func (cr *cacheRecord) size(ctx context.Context) (int64, error) { // this expects that usage() is implemented lazily s, err := cr.sizeG.Do(ctx, cr.ID(), func(ctx context.Context) (interface{}, error) { cr.mu.Lock() - s := getSize(cr.md) + s := cr.getSize() if s != sizeUnknown { cr.mu.Unlock() return s, nil } - driverID := getSnapshotID(cr.md) + driverID := cr.getSnapshotID() if cr.equalMutable != nil { - driverID = getSnapshotID(cr.equalMutable.md) + driverID = cr.equalMutable.getSnapshotID() } cr.mu.Unlock() var usage snapshots.Usage - if !getBlobOnly(cr.md) { + if !cr.getBlobOnly() { var err error usage, err = cr.cm.ManagerOpt.Snapshotter.Usage(ctx, driverID) if err != nil { @@ -203,7 +189,7 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { } } } - if dgst := getBlob(cr.md); dgst != "" { + if dgst := cr.getBlob(); dgst != "" { info, err := cr.cm.ContentStore.Info(ctx, digest.Digest(dgst)) if err == nil { usage.Size += info.Size @@ -224,8 +210,8 @@ func (cr *cacheRecord) Size(ctx context.Context) (int64, error) { } } cr.mu.Lock() - setSize(cr.md, usage.Size) - if err := cr.md.Commit(); err != nil { + cr.queueSize(usage.Size) + if err := cr.commitMetadata(); err != nil { cr.mu.Unlock() return s, err } @@ -251,7 +237,7 @@ func (cr *cacheRecord) parentRef(hidden bool, descHandlers DescHandlers) *immuta // must be called holding cacheRecord mu func (cr *cacheRecord) mount(ctx context.Context, readonly bool) (snapshot.Mountable, error) { if cr.mutable { - m, err := cr.cm.Snapshotter.Mounts(ctx, getSnapshotID(cr.md)) + m, err := cr.cm.Snapshotter.Mounts(ctx, cr.getSnapshotID()) if err != nil { return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) } @@ -262,7 +248,7 @@ func (cr *cacheRecord) mount(ctx context.Context, readonly bool) (snapshot.Mount } if cr.equalMutable != nil && readonly { - m, err := cr.cm.Snapshotter.Mounts(ctx, getSnapshotID(cr.equalMutable.md)) + m, err := cr.cm.Snapshotter.Mounts(ctx, cr.equalMutable.getSnapshotID()) if err != nil { return nil, errors.Wrapf(err, "failed to mount %s", cr.equalMutable.ID()) } @@ -285,7 +271,7 @@ func (cr *cacheRecord) mount(ctx context.Context, readonly bool) (snapshot.Mount return nil, err } ctx = leases.WithLease(ctx, l.ID) - m, err := cr.cm.Snapshotter.View(ctx, view, getSnapshotID(cr.md)) + m, err := cr.cm.Snapshotter.View(ctx, view, cr.getSnapshotID()) if err != nil { cr.cm.LeaseManager.Delete(context.TODO(), leases.Lease{ID: l.ID}) return nil, errors.Wrapf(err, "failed to mount %s", cr.ID()) @@ -312,16 +298,12 @@ func (cr *cacheRecord) remove(ctx context.Context, removeSnapshot bool) error { return errors.Wrapf(err, "failed to remove %s", cr.ID()) } } - if err := cr.cm.md.Clear(cr.ID()); err != nil { + if err := cr.cm.MetadataStore.Clear(cr.ID()); err != nil { return err } return nil } -func (cr *cacheRecord) ID() string { - return cr.md.ID() -} - type immutableRef struct { *cacheRecord triggerLastUsed bool @@ -348,23 +330,11 @@ func (sr *immutableRef) Parent() ImmutableRef { return nil } -func (sr *immutableRef) Info() RefInfo { - return RefInfo{ - ChainID: digest.Digest(getChainID(sr.md)), - DiffID: digest.Digest(getDiffID(sr.md)), - Blob: digest.Digest(getBlob(sr.md)), - MediaType: getMediaType(sr.md), - BlobChainID: digest.Digest(getBlobChainID(sr.md)), - SnapshotID: getSnapshotID(sr.md), - Extracted: !getBlobOnly(sr.md), - } -} - func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs.Descriptor, error) { desc := ocispecs.Descriptor{ - Digest: digest.Digest(getBlob(sr.md)), - Size: getBlobSize(sr.md), - MediaType: getMediaType(sr.md), + Digest: sr.getBlob(), + Size: sr.getBlobSize(), + MediaType: sr.getMediaType(), Annotations: make(map[string]string), } @@ -379,12 +349,12 @@ func (sr *immutableRef) ociDesc(ctx context.Context, dhs DescHandlers) (ocispecs } } - diffID := getDiffID(sr.md) + diffID := sr.getDiffID() if diffID != "" { - desc.Annotations["containerd.io/uncompressed"] = diffID + desc.Annotations["containerd.io/uncompressed"] = string(diffID) } - createdAt := GetCreatedAt(sr.md) + createdAt := sr.GetCreatedAt() if !createdAt.IsZero() { createdAt, err := createdAt.MarshalText() if err != nil { @@ -408,7 +378,7 @@ func compressionVariantDigestLabel(compressionType compression.Type) string { func (sr *immutableRef) getCompressionBlob(ctx context.Context, compressionType compression.Type) (ocispecs.Descriptor, error) { cs := sr.cm.ContentStore - info, err := cs.Info(ctx, digest.Digest(getBlob(sr.md))) + info, err := cs.Info(ctx, sr.getBlob()) if err != nil { return ocispecs.Descriptor{}, err } @@ -431,7 +401,7 @@ func (sr *immutableRef) addCompressionBlob(ctx context.Context, desc ocispecs.De }); err != nil { return err } - info, err := cs.Info(ctx, digest.Digest(getBlob(sr.md))) + info, err := cs.Info(ctx, sr.getBlob()) if err != nil { return err } @@ -548,7 +518,7 @@ func (sr *immutableRef) Mount(ctx context.Context, readonly bool, s session.Grou } func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr error) { - if !getBlobOnly(sr.md) { + if !sr.getBlobOnly() { return } @@ -558,7 +528,7 @@ func (sr *immutableRef) Extract(ctx context.Context, s session.Group) (rerr erro } defer done(ctx) - if GetLayerType(sr) == "windows" { + if sr.GetLayerType() == "windows" { ctx = winlayers.UseWindowsLayerMode(ctx) } @@ -581,7 +551,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, dhs := sr.descHandlers for _, r := range sr.parentRefChain() { r := r - info, err := r.cm.Snapshotter.Stat(ctx, getSnapshotID(r.md)) + info, err := r.cm.Snapshotter.Stat(ctx, r.getSnapshotID()) if err != nil && !errdefs.IsNotFound(err) { return err } else if errdefs.IsNotFound(err) { @@ -589,7 +559,7 @@ func (sr *immutableRef) withRemoteSnapshotLabelsStargzMode(ctx context.Context, } else if _, ok := info.Labels["containerd.io/snapshot/remote"]; !ok { continue // This isn't a remote snapshot; skip } - dh := dhs[digest.Digest(getBlob(r.md))] + dh := dhs[digest.Digest(r.getBlob())] if dh == nil { continue // no info passed; skip } @@ -623,12 +593,12 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s dhs := sr.descHandlers for _, r := range sr.parentRefChain() { r := r - snapshotID := getSnapshotID(r.md) + snapshotID := r.getSnapshotID() if _, err := r.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { continue } - dh := dhs[digest.Digest(getBlob(r.md))] + dh := dhs[digest.Digest(r.getBlob())] if dh == nil { // We cannot prepare remote snapshots without descHandler. return nil, nil @@ -646,7 +616,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s // Prepare remote snapshots var ( - key = fmt.Sprintf("tmp-%s %s", identity.NewID(), r.Info().ChainID) + key = fmt.Sprintf("tmp-%s %s", identity.NewID(), r.getChainID()) opts = []snapshots.Opt{ snapshots.WithLabels(defaultLabels), snapshots.WithLabels(tmpLabels), @@ -654,7 +624,7 @@ func (sr *immutableRef) prepareRemoteSnapshotsStargzMode(ctx context.Context, s ) parentID := "" if r.parent != nil { - parentID = getSnapshotID(r.parent.md) + parentID = r.parent.getSnapshotID() } if err := r.cm.Snapshotter.Prepare(ctx, key, parentID, opts...); err != nil { if errdefs.IsAlreadyExists(err) { @@ -707,7 +677,7 @@ func makeTmpLabelsStargzMode(labels map[string]string, s session.Group) (fields func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session.Group) error { _, err := sr.sizeG.Do(ctx, sr.ID()+"-extract", func(ctx context.Context) (_ interface{}, rerr error) { - snapshotID := getSnapshotID(sr.md) + snapshotID := sr.getSnapshotID() if _, err := sr.cm.Snapshotter.Stat(ctx, snapshotID); err == nil { return nil, nil } @@ -724,7 +694,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session if err := sr.parent.extract(egctx, dhs, s); err != nil { return err } - parentID = getSnapshotID(sr.parent.md) + parentID = sr.parent.getSnapshotID() return nil }) } @@ -756,7 +726,7 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session defer statusDone() } - key := fmt.Sprintf("extract-%s %s", identity.NewID(), sr.Info().ChainID) + key := fmt.Sprintf("extract-%s %s", identity.NewID(), sr.getChainID()) err = sr.cm.Snapshotter.Prepare(ctx, key, parentID) if err != nil { @@ -780,14 +750,14 @@ func (sr *immutableRef) extract(ctx context.Context, dhs DescHandlers, s session if err := unmount(); err != nil { return nil, err } - if err := sr.cm.Snapshotter.Commit(ctx, getSnapshotID(sr.md), key); err != nil { + if err := sr.cm.Snapshotter.Commit(ctx, sr.getSnapshotID(), key); err != nil { if !errors.Is(err, errdefs.ErrAlreadyExists) { return nil, err } } - queueBlobOnly(sr.md, false) - setSize(sr.md, sizeUnknown) - if err := sr.md.Commit(); err != nil { + sr.queueBlobOnly(false) + sr.queueSize(sizeUnknown) + if err := sr.commitMetadata(); err != nil { return nil, err } return nil, nil @@ -805,7 +775,7 @@ func (sr *immutableRef) Release(ctx context.Context) error { return sr.release(ctx) } -func (sr *immutableRef) updateLastUsed() bool { +func (sr *immutableRef) shouldUpdateLastUsed() bool { return sr.triggerLastUsed } @@ -814,7 +784,7 @@ func (sr *immutableRef) updateLastUsedNow() bool { return false } for r := range sr.refs { - if r.updateLastUsed() { + if r.shouldUpdateLastUsed() { return false } } @@ -825,7 +795,7 @@ func (sr *immutableRef) release(ctx context.Context) error { delete(sr.refs, sr) if sr.updateLastUsedNow() { - updateLastUsed(sr.md) + sr.updateLastUsed() if sr.equalMutable != nil { sr.equalMutable.triggerLastUsed = true } @@ -854,10 +824,6 @@ func (sr *immutableRef) finalizeLocked(ctx context.Context) error { return sr.finalize(ctx) } -func (cr *cacheRecord) Metadata() *metadata.StorageItem { - return cr.md -} - // caller must hold cacheRecord.mu func (cr *cacheRecord) finalize(ctx context.Context) error { mutable := cr.equalMutable @@ -901,11 +867,12 @@ func (cr *cacheRecord) finalize(ctx context.Context) error { }() cr.equalMutable = nil - clearEqualMutable(cr.md) - return cr.md.Commit() + cr.clearEqualMutable() + cr.queueSnapshotID(cr.ID()) + return cr.commitMetadata() } -func (sr *mutableRef) updateLastUsed() bool { +func (sr *mutableRef) shouldUpdateLastUsed() bool { return sr.triggerLastUsed } @@ -915,18 +882,18 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) { } id := identity.NewID() - md, _ := sr.cm.md.Get(id) + md, _ := sr.cm.getMetadata(id) rec := &cacheRecord{ - mu: sr.mu, - cm: sr.cm, - parent: sr.parentRef(false, sr.descHandlers), - equalMutable: sr, - refs: make(map[ref]struct{}), - md: md, + mu: sr.mu, + cm: sr.cm, + parent: sr.parentRef(false, sr.descHandlers), + equalMutable: sr, + refs: make(map[ref]struct{}), + cacheMetadata: md, } - if descr := GetDescription(sr.md); descr != "" { - if err := queueDescription(md, descr); err != nil { + if descr := sr.GetDescription(); descr != "" { + if err := md.queueDescription(descr); err != nil { return nil, err } } @@ -935,20 +902,21 @@ func (sr *mutableRef) commit(ctx context.Context) (*immutableRef, error) { if rec.parent != nil { parentID = rec.parent.ID() } - if err := initializeMetadata(rec, parentID); err != nil { + if err := initializeMetadata(rec.cacheMetadata, parentID); err != nil { return nil, err } sr.cm.records[id] = rec - if err := sr.md.Commit(); err != nil { + if err := sr.commitMetadata(); err != nil { return nil, err } - queueCommitted(md) - setSize(md, sizeUnknown) - setEqualMutable(md, sr.ID()) - if err := md.Commit(); err != nil { + md.queueCommitted(true) + md.queueSize(sizeUnknown) + md.queueSnapshotID(sr.getSnapshotID()) + md.setEqualMutable(sr.ID()) + if err := md.commitMetadata(); err != nil { return nil, err } @@ -999,11 +967,11 @@ func (sr *mutableRef) Release(ctx context.Context) error { func (sr *mutableRef) release(ctx context.Context) error { delete(sr.refs, sr) - if getCachePolicy(sr.md) != cachePolicyRetain { + if !sr.HasCachePolicyRetain() { if sr.equalImmutable != nil { - if getCachePolicy(sr.equalImmutable.md) == cachePolicyRetain { - if sr.updateLastUsed() { - updateLastUsed(sr.md) + if sr.equalImmutable.HasCachePolicyRetain() { + if sr.shouldUpdateLastUsed() { + sr.updateLastUsed() sr.triggerLastUsed = false } return nil @@ -1014,8 +982,8 @@ func (sr *mutableRef) release(ctx context.Context) error { } return sr.remove(ctx, true) } - if sr.updateLastUsed() { - updateLastUsed(sr.md) + if sr.shouldUpdateLastUsed() { + sr.updateLastUsed() sr.triggerLastUsed = false } return nil diff --git a/cache/remote.go b/cache/remote.go index d33c3a1f0914..e1a63452b20f 100644 --- a/cache/remote.go +++ b/cache/remote.go @@ -70,7 +70,7 @@ func (sr *immutableRef) GetRemote(ctx context.Context, createIfNeeded bool, comp if err != nil { return nil, err } else if isLazy { - imageRefs := getImageRefs(ref.md) + imageRefs := ref.getImageRefs() for _, imageRef := range imageRefs { refspec, err := reference.Parse(imageRef) if err != nil { @@ -214,13 +214,11 @@ func (p lazyRefProvider) Unlazy(ctx context.Context) error { return nil, err } - if imageRefs := getImageRefs(p.ref.md); len(imageRefs) > 0 { + if imageRefs := p.ref.getImageRefs(); len(imageRefs) > 0 { // just use the first image ref, it's arbitrary imageRef := imageRefs[0] - if GetDescription(p.ref.md) == "" { - queueDescription(p.ref.md, "pulled from "+imageRef) - err := p.ref.md.Commit() - if err != nil { + if p.ref.GetDescription() == "" { + if err := p.ref.SetDescription("pulled from " + imageRef); err != nil { return nil, err } } diff --git a/exporter/containerimage/writer.go b/exporter/containerimage/writer.go index e54ba36dfb7f..2d92fadc6de7 100644 --- a/exporter/containerimage/writer.go +++ b/exporter/containerimage/writer.go @@ -504,10 +504,10 @@ func getRefMetadata(ref cache.ImmutableRef, limit int) []refMetadata { if ref == nil { return append(getRefMetadata(nil, limit-1), meta) } - if descr := cache.GetDescription(ref.Metadata()); descr != "" { + if descr := ref.GetDescription(); descr != "" { meta.description = descr } - createdAt := cache.GetCreatedAt(ref.Metadata()) + createdAt := ref.GetCreatedAt() meta.createdAt = &createdAt p := ref.Parent() if p != nil { diff --git a/frontend/gateway/container.go b/frontend/gateway/container.go index 1951e26eb9b6..ec418f93d7e6 100644 --- a/frontend/gateway/container.go +++ b/frontend/gateway/container.go @@ -78,7 +78,7 @@ func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g s } name := fmt.Sprintf("container %s", req.ContainerID) - mm := mounts.NewMountManager(name, w.CacheManager(), sm, w.MetadataStore()) + mm := mounts.NewMountManager(name, w.CacheManager(), sm) p, err := PrepareMounts(ctx, mm, w.CacheManager(), g, "", mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) { cm := w.CacheManager() if m.Input != opspb.Empty { diff --git a/solver/llbsolver/file/refmanager.go b/solver/llbsolver/file/refmanager.go index 600d41beac0b..e1c58c1e5453 100644 --- a/solver/llbsolver/file/refmanager.go +++ b/solver/llbsolver/file/refmanager.go @@ -39,8 +39,7 @@ func (rm *RefManager) Prepare(ctx context.Context, ref fileoptypes.Ref, readonly } defer func() { if rerr != nil { - cache.CachePolicyDefault(mr) - if err := mr.Metadata().Commit(); err != nil { + if err := mr.SetCachePolicyDefault(); err != nil { bklog.G(ctx).Errorf("failed to reset FileOp mutable ref cachepolicy: %v", err) } mr.Release(context.TODO()) diff --git a/solver/llbsolver/mounts/mount.go b/solver/llbsolver/mounts/mount.go index 7731a2ccc99b..d0dfef9fa74c 100644 --- a/solver/llbsolver/mounts/mount.go +++ b/solver/llbsolver/mounts/mount.go @@ -15,7 +15,6 @@ import ( "github.com/containerd/containerd/pkg/userns" "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" @@ -26,16 +25,14 @@ import ( "github.com/moby/buildkit/util/grpcerrors" "github.com/moby/locker" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" "google.golang.org/grpc/codes" ) -func NewMountManager(name string, cm cache.Manager, sm *session.Manager, md *metadata.Store) *MountManager { +func NewMountManager(name string, cm cache.Manager, sm *session.Manager) *MountManager { return &MountManager{ cm: cm, sm: sm, cacheMounts: map[string]*cacheRefShare{}, - md: md, managerName: name, } } @@ -45,7 +42,6 @@ type MountManager struct { sm *session.Manager cacheMountsMu sync.Mutex cacheMounts map[string]*cacheRefShare - md *metadata.Store managerName string } @@ -54,7 +50,6 @@ func (mm *MountManager) getRefCacheDir(ctx context.Context, ref cache.ImmutableR locker: &mm.cacheMountsMu, cacheMounts: mm.cacheMounts, cm: mm.cm, - md: mm.md, globalCacheRefs: sharedCacheRefs, name: fmt.Sprintf("cached mount %s from %s", m.Dest, mm.managerName), session: s, @@ -66,14 +61,13 @@ type cacheRefGetter struct { locker sync.Locker cacheMounts map[string]*cacheRefShare cm cache.Manager - md *metadata.Store globalCacheRefs *cacheRefs name string session session.Group } func (g *cacheRefGetter) getRefCacheDir(ctx context.Context, ref cache.ImmutableRef, id string, sharing pb.CacheSharingOpt) (mref cache.MutableRef, err error) { - key := "cache-dir:" + id + key := id if ref != nil { key += ":" + ref.ID() } @@ -114,7 +108,7 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, cacheRefsLocker.Lock(key) defer cacheRefsLocker.Unlock(key) for { - sis, err := g.md.Search(key) + sis, err := SearchCacheDir(ctx, g.cm, key) if err != nil { return nil, err } @@ -145,16 +139,8 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string, return nil, err } - si, _ := g.md.Get(mRef.ID()) - v, err := metadata.NewValue(key) - if err != nil { - mRef.Release(context.TODO()) - return nil, err - } - v.Index = key - if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, key, v) - }); err != nil { + md := CacheRefMetadata{mRef} + if err := md.setCacheDirIndex(key); err != nil { mRef.Release(context.TODO()) return nil, err } @@ -518,3 +504,30 @@ func (r *cacheRef) Release(ctx context.Context) error { } return nil } + +const keyCacheDir = "cache-dir" +const cacheDirIndex = keyCacheDir + ":" + +func SearchCacheDir(ctx context.Context, store cache.MetadataStore, id string) ([]CacheRefMetadata, error) { + var results []CacheRefMetadata + mds, err := store.Search(ctx, cacheDirIndex+id) + if err != nil { + return nil, err + } + for _, md := range mds { + results = append(results, CacheRefMetadata{md}) + } + return results, nil +} + +type CacheRefMetadata struct { + cache.RefMetadata +} + +func (md CacheRefMetadata) setCacheDirIndex(id string) error { + return md.SetString(keyCacheDir, id, cacheDirIndex+id) +} + +func (md CacheRefMetadata) ClearCacheDirIndex() error { + return md.ClearValueAndIndex(keyCacheDir, cacheDirIndex) +} diff --git a/solver/llbsolver/mounts/mount_test.go b/solver/llbsolver/mounts/mount_test.go index 796281e7b813..2d78ce458b44 100644 --- a/solver/llbsolver/mounts/mount_test.go +++ b/solver/llbsolver/mounts/mount_test.go @@ -39,7 +39,6 @@ type cmOut struct { manager cache.Manager lm leases.Manager cs content.Store - md *metadata.Store } func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() error, err error) { @@ -89,11 +88,6 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() opt.snapshotter = snapshotter } - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - if err != nil { - return nil, nil, err - } - store, err := local.NewStore(tmpdir) if err != nil { return nil, nil, err @@ -116,6 +110,11 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() lm := ctdmetadata.NewLeaseManager(mdb) + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + if err != nil { + return nil, nil, err + } + cm, err := cache.NewManager(cache.ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter(opt.snapshotterName, containerdsnapshot.NSSnapshotter(ns, mdb.Snapshotter(opt.snapshotterName)), nil), MetadataStore: md, @@ -131,16 +130,14 @@ func newCacheManager(ctx context.Context, opt cmOpt) (co *cmOut, cleanup func() manager: cm, lm: lm, cs: mdb.ContentStore(), - md: md, }, cleanup, nil } -func newRefGetter(m cache.Manager, md *metadata.Store, shared *cacheRefs) *cacheRefGetter { +func newRefGetter(m cache.Manager, shared *cacheRefs) *cacheRefGetter { return &cacheRefGetter{ locker: &sync.Mutex{}, cacheMounts: map[string]*cacheRefShare{}, cm: m, - md: md, globalCacheRefs: shared, } } @@ -164,10 +161,10 @@ func TestCacheMountPrivateRefs(t *testing.T) { defer cleanup() - g1 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g2 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g3 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g4 := newRefGetter(co.manager, co.md, sharedCacheRefs) + g1 := newRefGetter(co.manager, sharedCacheRefs) + g2 := newRefGetter(co.manager, sharedCacheRefs) + g3 := newRefGetter(co.manager, sharedCacheRefs) + g4 := newRefGetter(co.manager, sharedCacheRefs) ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_PRIVATE) require.NoError(t, err) @@ -231,9 +228,9 @@ func TestCacheMountSharedRefs(t *testing.T) { defer cleanup() - g1 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g2 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g3 := newRefGetter(co.manager, co.md, sharedCacheRefs) + g1 := newRefGetter(co.manager, sharedCacheRefs) + g2 := newRefGetter(co.manager, sharedCacheRefs) + g3 := newRefGetter(co.manager, sharedCacheRefs) ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED) require.NoError(t, err) @@ -281,8 +278,8 @@ func TestCacheMountLockedRefs(t *testing.T) { defer cleanup() - g1 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g2 := newRefGetter(co.manager, co.md, sharedCacheRefs) + g1 := newRefGetter(co.manager, sharedCacheRefs) + g2 := newRefGetter(co.manager, sharedCacheRefs) ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_LOCKED) require.NoError(t, err) @@ -346,8 +343,8 @@ func TestCacheMountSharedRefsDeadlock(t *testing.T) { var sharedCacheRefs = &cacheRefs{} - g1 := newRefGetter(co.manager, co.md, sharedCacheRefs) - g2 := newRefGetter(co.manager, co.md, sharedCacheRefs) + g1 := newRefGetter(co.manager, sharedCacheRefs) + g2 := newRefGetter(co.manager, sharedCacheRefs) ref, err := g1.getRefCacheDir(ctx, nil, "foo", pb.CacheSharingOpt_SHARED) require.NoError(t, err) diff --git a/solver/llbsolver/ops/exec.go b/solver/llbsolver/ops/exec.go index 121621321491..3ec606523e0b 100644 --- a/solver/llbsolver/ops/exec.go +++ b/solver/llbsolver/ops/exec.go @@ -12,7 +12,6 @@ import ( "github.com/containerd/containerd/platforms" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/executor" "github.com/moby/buildkit/frontend/gateway" "github.com/moby/buildkit/session" @@ -44,14 +43,14 @@ type execOp struct { parallelism *semaphore.Weighted } -func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, parallelism *semaphore.Weighted, sm *session.Manager, md *metadata.Store, exec executor.Executor, w worker.Worker) (solver.Op, error) { +func NewExecOp(v solver.Vertex, op *pb.Op_Exec, platform *pb.Platform, cm cache.Manager, parallelism *semaphore.Weighted, sm *session.Manager, exec executor.Executor, w worker.Worker) (solver.Op, error) { if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil { return nil, err } name := fmt.Sprintf("exec %s", strings.Join(op.Exec.Meta.Args, " ")) return &execOp{ op: op.Exec, - mm: mounts.NewMountManager(name, cm, sm, md), + mm: mounts.NewMountManager(name, cm, sm), cm: cm, exec: exec, numInputs: len(v.Inputs()), diff --git a/solver/llbsolver/ops/file.go b/solver/llbsolver/ops/file.go index 540ce7f21b17..27deec5ca431 100644 --- a/solver/llbsolver/ops/file.go +++ b/solver/llbsolver/ops/file.go @@ -11,7 +11,6 @@ import ( "sync" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/solver/llbsolver" @@ -31,20 +30,20 @@ const fileCacheType = "buildkit.file.v0" type fileOp struct { op *pb.FileOp - md *metadata.Store + md cache.MetadataStore w worker.Worker solver *FileOpSolver numInputs int parallelism *semaphore.Weighted } -func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, parallelism *semaphore.Weighted, md *metadata.Store, w worker.Worker) (solver.Op, error) { +func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, parallelism *semaphore.Weighted, w worker.Worker) (solver.Op, error) { if err := llbsolver.ValidateOp(&pb.Op{Op: op}); err != nil { return nil, err } return &fileOp{ op: op.File, - md: md, + md: cm, numInputs: len(v.Inputs()), w: w, solver: NewFileOpSolver(w, &file.Backend{}, file.NewRefManager(cm)), diff --git a/source/containerimage/pull.go b/source/containerimage/pull.go index ad7310fc0206..5773c8ae4805 100644 --- a/source/containerimage/pull.go +++ b/source/containerimage/pull.go @@ -345,8 +345,8 @@ func (p *puller) Snapshot(ctx context.Context, g session.Group) (ir cache.Immuta } } - if p.id.RecordType != "" && cache.GetRecordType(current) == "" { - if err := cache.SetRecordType(current, p.id.RecordType); err != nil { + if p.id.RecordType != "" && current.GetRecordType() == "" { + if err := current.SetRecordType(p.id.RecordType); err != nil { return nil, err } } @@ -361,7 +361,7 @@ func markRefLayerTypeWindows(ref cache.ImmutableRef) error { return err } } - return cache.SetLayerType(ref, "windows") + return ref.SetLayerType("windows") } // cacheKeyFromConfig returns a stable digest from image config. If image config diff --git a/source/git/gitsource.go b/source/git/gitsource.go index 23a9a3a0ac3d..7a81e8e82aa2 100644 --- a/source/git/gitsource.go +++ b/source/git/gitsource.go @@ -17,10 +17,7 @@ import ( "strconv" "strings" - "github.com/moby/buildkit/util/bklog" - "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" @@ -29,10 +26,10 @@ import ( "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/source" + "github.com/moby/buildkit/util/bklog" "github.com/moby/buildkit/util/progress/logs" "github.com/moby/locker" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) @@ -42,11 +39,9 @@ var defaultBranch = regexp.MustCompile(`refs/heads/(\S+)`) type Opt struct { CacheAccessor cache.Accessor - MetadataStore *metadata.Store } type gitSource struct { - md *metadata.Store cache cache.Accessor locker *locker.Locker } @@ -61,7 +56,6 @@ func Supported() error { func NewSource(opt Opt) (source.Source, error) { gs := &gitSource{ - md: opt.MetadataStore, cache: opt.CacheAccessor, locker: locker.New(), } @@ -74,9 +68,7 @@ func (gs *gitSource) ID() string { // needs to be called with repo lock func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []string, g session.Group) (target string, release func(), retErr error) { - remoteKey := "git-remote::" + remote - - sis, err := gs.md.Search(remoteKey) + sis, err := searchGitRemote(ctx, gs.cache, remote) if err != nil { return "", nil, errors.Wrapf(err, "failed to search metadata for %s", redactCredentials(remote)) } @@ -140,17 +132,9 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []stri return "", nil, errors.Wrapf(err, "failed add origin repo at %s", dir) } - // same new remote metadata - si, _ := gs.md.Get(remoteRef.ID()) - v, err := metadata.NewValue(remoteKey) - if err != nil { - return "", nil, err - } - v.Index = remoteKey - - if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, "git-remote", v) - }); err != nil { + // save new remote metadata + md := cacheRefMetadata{remoteRef} + if err := md.setGitRemote(remote); err != nil { return "", nil, err } } @@ -387,11 +371,11 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out gs.getAuthToken(ctx, g) - snapshotKey := "git-snapshot::" + cacheKey + ":" + gs.src.Subdir + snapshotKey := cacheKey + ":" + gs.src.Subdir gs.locker.Lock(snapshotKey) defer gs.locker.Unlock(snapshotKey) - sis, err := gs.md.Search(snapshotKey) + sis, err := searchGitSnapshot(ctx, gs.cache, snapshotKey) if err != nil { return nil, errors.Wrapf(err, "failed to search metadata for %s", snapshotKey) } @@ -601,19 +585,10 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out } }() - si, _ := gs.md.Get(snap.ID()) - v, err := metadata.NewValue(snapshotKey) - if err != nil { - return nil, err - } - v.Index = snapshotKey - - if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, "git-snapshot", v) - }); err != nil { + md := cacheRefMetadata{snap} + if err := md.setGitSnapshot(snapshotKey); err != nil { return nil, err } - return snap, nil } @@ -709,3 +684,40 @@ func getDefaultBranch(ctx context.Context, gitDir, workDir, sshAuthSock, knownHo } return ss[0][1], nil } + +const keyGitRemote = "git-remote" +const gitRemoteIndex = keyGitRemote + "::" +const keyGitSnapshot = "git-snapshot" +const gitSnapshotIndex = keyGitSnapshot + "::" + +func search(ctx context.Context, store cache.MetadataStore, key string, idx string) ([]cacheRefMetadata, error) { + var results []cacheRefMetadata + mds, err := store.Search(ctx, idx+key) + if err != nil { + return nil, err + } + for _, md := range mds { + results = append(results, cacheRefMetadata{md}) + } + return results, nil +} + +func searchGitRemote(ctx context.Context, store cache.MetadataStore, remote string) ([]cacheRefMetadata, error) { + return search(ctx, store, remote, gitRemoteIndex) +} + +func searchGitSnapshot(ctx context.Context, store cache.MetadataStore, key string) ([]cacheRefMetadata, error) { + return search(ctx, store, key, gitSnapshotIndex) +} + +type cacheRefMetadata struct { + cache.RefMetadata +} + +func (md cacheRefMetadata) setGitSnapshot(key string) error { + return md.SetString(keyGitSnapshot, key, gitSnapshotIndex+key) +} + +func (md cacheRefMetadata) setGitRemote(key string) error { + return md.SetString(keyGitRemote, key, gitRemoteIndex+key) +} diff --git a/source/git/gitsource_test.go b/source/git/gitsource_test.go index 62c7b82f2459..116eac540381 100644 --- a/source/git/gitsource_test.go +++ b/source/git/gitsource_test.go @@ -428,9 +428,6 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source { snapshotter, err := native.NewSnapshotter(filepath.Join(tmpdir, "snapshots")) assert.NoError(t, err) - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - assert.NoError(t, err) - store, err := local.NewStore(tmpdir) require.NoError(t, err) @@ -441,6 +438,9 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source { "native": snapshotter, }) + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + require.NoError(t, err) + cm, err := cache.NewManager(cache.ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), MetadataStore: md, @@ -452,7 +452,6 @@ func setupGitSource(t *testing.T, tmpdir string) source.Source { gs, err := NewSource(Opt{ CacheAccessor: cm, - MetadataStore: md, }) require.NoError(t, err) diff --git a/source/http/httpsource.go b/source/http/httpsource.go index dd9c12763bfe..a2d4157ba269 100644 --- a/source/http/httpsource.go +++ b/source/http/httpsource.go @@ -17,7 +17,6 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/session" "github.com/moby/buildkit/snapshot" "github.com/moby/buildkit/solver" @@ -26,17 +25,14 @@ import ( "github.com/moby/locker" digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" ) type Opt struct { CacheAccessor cache.Accessor - MetadataStore *metadata.Store Transport http.RoundTripper } type httpSource struct { - md *metadata.Store cache cache.Accessor locker *locker.Locker transport http.RoundTripper @@ -48,7 +44,6 @@ func NewSource(opt Opt) (source.Source, error) { transport = tracing.DefaultTransport } hs := &httpSource{ - md: opt.MetadataStore, cache: opt.CacheAccessor, locker: locker.New(), transport: transport, @@ -135,7 +130,7 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde } // look up metadata(previously stored headers) for that URL - sis, err := hs.md.Search(uh.String()) + mds, err := searchHTTPURLDigest(ctx, hs.cache, uh) if err != nil { return "", nil, false, errors.Wrapf(err, "failed to search metadata for %s", uh) } @@ -145,19 +140,19 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde return "", nil, false, err } req = req.WithContext(ctx) - m := map[string]*metadata.StorageItem{} + m := map[string]cacheRefMetadata{} // If we request a single ETag in 'If-None-Match', some servers omit the // unambiguous ETag in their response. // See: https://github.com/moby/buildkit/issues/905 var onlyETag string - if len(sis) > 0 { - for _, si := range sis { + if len(mds) > 0 { + for _, md := range mds { // if metaDigest := getMetaDigest(si); metaDigest == hs.formatCacheKey("") { - if etag := getETag(si); etag != "" { - if dgst := getChecksum(si); dgst != "" { - m[etag] = si + if etag := md.getETag(); etag != "" { + if dgst := md.getHTTPChecksum(); dgst != "" { + m[etag] = md } } // } @@ -192,12 +187,12 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde if respETag == "" && onlyETag != "" && resp.StatusCode == http.StatusNotModified { respETag = onlyETag } - si, ok := m[respETag] + md, ok := m[respETag] if ok { - hs.refID = si.ID() - dgst := getChecksum(si) + hs.refID = md.ID() + dgst := md.getHTTPChecksum() if dgst != "" { - modTime := getModTime(si) + modTime := md.getHTTPModTime() resp.Body.Close() return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), nil, true, nil } @@ -224,16 +219,16 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, inde // to .save() resp.Header.Set("ETag", onlyETag) } - si, ok := m[respETag] + md, ok := m[respETag] if !ok { return "", nil, false, errors.Errorf("invalid not-modified ETag: %v", respETag) } - hs.refID = si.ID() - dgst := getChecksum(si) + hs.refID = md.ID() + dgst := md.getHTTPChecksum() if dgst == "" { return "", nil, false, errors.Errorf("invalid metadata change") } - modTime := getModTime(si) + modTime := md.getHTTPModTime() resp.Body.Close() return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, resp), dgst, modTime).String(), nil, true, nil } @@ -348,24 +343,28 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response, s se return nil, "", err } newRef = nil + md := cacheRefMetadata{ref} hs.refID = ref.ID() dgst = digest.NewDigest(digest.SHA256, h) if respETag := resp.Header.Get("ETag"); respETag != "" { - setETag(ref.Metadata(), respETag) + if err := md.setETag(respETag); err != nil { + return nil, "", err + } uh, err := hs.urlHash() if err != nil { return nil, "", err } - setChecksum(ref.Metadata(), uh.String(), dgst) - if err := ref.Metadata().Commit(); err != nil { + if err := md.setHTTPChecksum(uh, dgst); err != nil { return nil, "", err } } if modTime := resp.Header.Get("Last-Modified"); modTime != "" { - setModTime(ref.Metadata(), modTime) + if err := md.setHTTPModTime(modTime); err != nil { + return nil, "", err + } } return ref, dgst, nil @@ -404,84 +403,6 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context, g session.Group) (cac return ref, nil } -const keyETag = "etag" -const keyChecksum = "http.checksum" -const keyModTime = "http.modtime" - -func setETag(si *metadata.StorageItem, s string) error { - v, err := metadata.NewValue(s) - if err != nil { - return errors.Wrap(err, "failed to create etag value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyETag, v) - }) - return nil -} - -func getETag(si *metadata.StorageItem) string { - v := si.Get(keyETag) - if v == nil { - return "" - } - var etag string - if err := v.Unmarshal(&etag); err != nil { - return "" - } - return etag -} - -func setModTime(si *metadata.StorageItem, s string) error { - v, err := metadata.NewValue(s) - if err != nil { - return errors.Wrap(err, "failed to create modtime value") - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyModTime, v) - }) - return nil -} - -func getModTime(si *metadata.StorageItem) string { - v := si.Get(keyModTime) - if v == nil { - return "" - } - var modTime string - if err := v.Unmarshal(&modTime); err != nil { - return "" - } - return modTime -} - -func setChecksum(si *metadata.StorageItem, url string, d digest.Digest) error { - v, err := metadata.NewValue(d) - if err != nil { - return errors.Wrap(err, "failed to create checksum value") - } - v.Index = url - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, keyChecksum, v) - }) - return nil -} - -func getChecksum(si *metadata.StorageItem) digest.Digest { - v := si.Get(keyChecksum) - if v == nil { - return "" - } - var dgstStr string - if err := v.Unmarshal(&dgstStr); err != nil { - return "" - } - dgst, err := digest.Parse(dgstStr) - if err != nil { - return "" - } - return dgst -} - func getFileName(urlStr, manualFilename string, resp *http.Response) string { if manualFilename != "" { return manualFilename @@ -505,3 +426,47 @@ func getFileName(urlStr, manualFilename string, resp *http.Response) string { } return "download" } + +func searchHTTPURLDigest(ctx context.Context, store cache.MetadataStore, dgst digest.Digest) ([]cacheRefMetadata, error) { + var results []cacheRefMetadata + mds, err := store.Search(ctx, string(dgst)) + if err != nil { + return nil, err + } + for _, md := range mds { + results = append(results, cacheRefMetadata{md}) + } + return results, nil +} + +type cacheRefMetadata struct { + cache.RefMetadata +} + +const keyHTTPChecksum = "http.checksum" +const keyETag = "etag" +const keyModTime = "http.modtime" + +func (md cacheRefMetadata) getHTTPChecksum() digest.Digest { + return digest.Digest(md.GetString(keyHTTPChecksum)) +} + +func (md cacheRefMetadata) setHTTPChecksum(urlDgst digest.Digest, d digest.Digest) error { + return md.SetString(keyHTTPChecksum, d.String(), urlDgst.String()) +} + +func (md cacheRefMetadata) getETag() string { + return md.GetString(keyETag) +} + +func (md cacheRefMetadata) setETag(s string) error { + return md.SetString(keyETag, s, "") +} + +func (md cacheRefMetadata) getHTTPModTime() string { + return md.GetString(keyModTime) +} + +func (md cacheRefMetadata) setHTTPModTime(s string) error { + return md.SetString(keyModTime, s, "") +} diff --git a/source/http/httpsource_test.go b/source/http/httpsource_test.go index 5049248536ca..898968d34e40 100644 --- a/source/http/httpsource_test.go +++ b/source/http/httpsource_test.go @@ -328,11 +328,6 @@ func newHTTPSource(tmpdir string) (source.Source, error) { return nil, err } - md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) - if err != nil { - return nil, err - } - store, err := local.NewStore(tmpdir) if err != nil { return nil, err @@ -347,6 +342,11 @@ func newHTTPSource(tmpdir string) (source.Source, error) { "native": snapshotter, }) + md, err := metadata.NewStore(filepath.Join(tmpdir, "metadata.db")) + if err != nil { + return nil, err + } + cm, err := cache.NewManager(cache.ManagerOpt{ Snapshotter: snapshot.FromContainerdSnapshotter("native", containerdsnapshot.NSSnapshotter("buildkit", mdb.Snapshotter("native")), nil), MetadataStore: md, @@ -360,6 +360,5 @@ func newHTTPSource(tmpdir string) (source.Source, error) { return NewSource(Opt{ CacheAccessor: cm, - MetadataStore: md, }) } diff --git a/source/local/local.go b/source/local/local.go index b97174ca34df..1eb3ab9a3c28 100644 --- a/source/local/local.go +++ b/source/local/local.go @@ -11,7 +11,6 @@ import ( "github.com/docker/docker/pkg/idtools" "github.com/moby/buildkit/cache" "github.com/moby/buildkit/cache/contenthash" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/session" "github.com/moby/buildkit/session/filesync" @@ -23,30 +22,24 @@ import ( "github.com/pkg/errors" "github.com/tonistiigi/fsutil" fstypes "github.com/tonistiigi/fsutil/types" - bolt "go.etcd.io/bbolt" "golang.org/x/time/rate" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -const keySharedKey = "local.sharedKey" - type Opt struct { CacheAccessor cache.Accessor - MetadataStore *metadata.Store } func NewSource(opt Opt) (source.Source, error) { ls := &localSource{ cm: opt.CacheAccessor, - md: opt.MetadataStore, } return ls, nil } type localSource struct { cm cache.Accessor - md *metadata.Store } func (ls *localSource) ID() string { @@ -111,10 +104,10 @@ func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (ca } func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, caller session.Caller) (out cache.ImmutableRef, retErr error) { - sharedKey := keySharedKey + ":" + ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid) + sharedKey := ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid) var mutable cache.MutableRef - sis, err := ls.md.Search(sharedKey) + sis, err := searchSharedKey(ctx, ls.cm, sharedKey) if err != nil { return nil, err } @@ -138,11 +131,10 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, cal defer func() { if retErr != nil && mutable != nil { // on error remove the record as checksum update is in undefined state - cache.CachePolicyDefault(mutable) - if err := mutable.Metadata().Commit(); err != nil { + if err := mutable.SetCachePolicyDefault(); err != nil { bklog.G(ctx).Errorf("failed to reset mutable cachepolicy: %v", err) } - contenthash.ClearCacheContext(mutable.Metadata()) + contenthash.ClearCacheContext(mutable) go mutable.Release(context.TODO()) } }() @@ -165,7 +157,7 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, cal } }() - cc, err := contenthash.GetCacheContext(ctx, mutable.Metadata(), mount.IdentityMapping()) + cc, err := contenthash.GetCacheContext(ctx, mutable) if err != nil { return nil, err } @@ -209,29 +201,14 @@ func (ls *localSourceHandler) snapshot(ctx context.Context, s session.Group, cal } lm = nil - if err := contenthash.SetCacheContext(ctx, mutable.Metadata(), cc); err != nil { + if err := contenthash.SetCacheContext(ctx, mutable, cc); err != nil { return nil, err } // skip storing snapshot by the shared key if it already exists - skipStoreSharedKey := false - si, _ := ls.md.Get(mutable.ID()) - if v := si.Get(keySharedKey); v != nil { - var str string - if err := v.Unmarshal(&str); err != nil { - return nil, err - } - skipStoreSharedKey = str == sharedKey - } - if !skipStoreSharedKey { - v, err := metadata.NewValue(sharedKey) - if err != nil { - return nil, err - } - v.Index = sharedKey - if err := si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, sharedKey, v) - }); err != nil { + md := cacheRefMetadata{mutable} + if md.getSharedKey() != sharedKey { + if err := md.setSharedKey(sharedKey); err != nil { return nil, err } bklog.G(ctx).Debugf("saved %s as %s", mutable.ID(), sharedKey) @@ -282,3 +259,30 @@ func (cu *cacheUpdater) MarkSupported(bool) { func (cu *cacheUpdater) ContentHasher() fsutil.ContentHasher { return contenthash.NewFromStat } + +const keySharedKey = "local.sharedKey" +const sharedKeyIndex = keySharedKey + ":" + +func searchSharedKey(ctx context.Context, store cache.MetadataStore, k string) ([]cacheRefMetadata, error) { + var results []cacheRefMetadata + mds, err := store.Search(ctx, sharedKeyIndex+k) + if err != nil { + return nil, err + } + for _, md := range mds { + results = append(results, cacheRefMetadata{md}) + } + return results, nil +} + +type cacheRefMetadata struct { + cache.RefMetadata +} + +func (md cacheRefMetadata) getSharedKey() string { + return md.GetString(keySharedKey) +} + +func (md cacheRefMetadata) setSharedKey(key string) error { + return md.SetString(keySharedKey, key, sharedKeyIndex+key) +} diff --git a/worker/base/worker.go b/worker/base/worker.go index e60dace602da..d20fd4b36813 100644 --- a/worker/base/worker.go +++ b/worker/base/worker.go @@ -6,7 +6,6 @@ import ( "io/ioutil" "os" "path/filepath" - "strings" "time" "github.com/containerd/containerd/content" @@ -49,7 +48,6 @@ import ( digest "github.com/opencontainers/go-digest" ocispecs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" - bolt "go.etcd.io/bbolt" "golang.org/x/sync/semaphore" ) @@ -64,7 +62,6 @@ type WorkerOpt struct { Labels map[string]string Platforms []ocispecs.Platform GCPolicy []client.PruneInfo - MetadataStore *metadata.Store Executor executor.Executor Snapshotter snapshot.Snapshotter ContentStore content.Store @@ -76,6 +73,7 @@ type WorkerOpt struct { LeaseManager leases.Manager GarbageCollect func(context.Context) (gc.Stats, error) ParallelismSem *semaphore.Weighted + MetadataStore *metadata.Store } // Worker is a local worker instance with dedicated snapshotter, cache, and so on. @@ -97,13 +95,13 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { cm, err := cache.NewManager(cache.ManagerOpt{ Snapshotter: opt.Snapshotter, - MetadataStore: opt.MetadataStore, PruneRefChecker: imageRefChecker, Applier: opt.Applier, GarbageCollect: opt.GarbageCollect, LeaseManager: opt.LeaseManager, ContentStore: opt.ContentStore, Differ: opt.Differ, + MetadataStore: opt.MetadataStore, }) if err != nil { return nil, err @@ -132,7 +130,6 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { if err := git.Supported(); err == nil { gs, err := git.NewSource(git.Opt{ CacheAccessor: cm, - MetadataStore: opt.MetadataStore, }) if err != nil { return nil, err @@ -144,7 +141,6 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { hs, err := http.NewSource(http.Opt{ CacheAccessor: cm, - MetadataStore: opt.MetadataStore, }) if err != nil { return nil, err @@ -154,7 +150,6 @@ func NewWorker(ctx context.Context, opt WorkerOpt) (*Worker, error) { ss, err := local.NewSource(local.Opt{ CacheAccessor: cm, - MetadataStore: opt.MetadataStore, }) if err != nil { return nil, err @@ -255,19 +250,15 @@ func (w *Worker) CacheManager() cache.Manager { return w.CacheMgr } -func (w *Worker) MetadataStore() *metadata.Store { - return w.WorkerOpt.MetadataStore -} - func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error) { if baseOp, ok := v.Sys().(*pb.Op); ok { switch op := baseOp.Op.(type) { case *pb.Op_Source: return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, w.ParallelismSem, sm, w) case *pb.Op_Exec: - return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, w.ParallelismSem, sm, w.WorkerOpt.MetadataStore, w.WorkerOpt.Executor, w) + return ops.NewExecOp(v, op, baseOp.Platform, w.CacheMgr, w.ParallelismSem, sm, w.WorkerOpt.Executor, w) case *pb.Op_File: - return ops.NewFileOp(v, op, w.CacheMgr, w.ParallelismSem, w.WorkerOpt.MetadataStore, w) + return ops.NewFileOp(v, op, w.CacheMgr, w.ParallelismSem, w) case *pb.Op_Build: return ops.NewBuildOp(v, op, s, w) default: @@ -283,38 +274,20 @@ func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error { defer mu.Unlock() for _, id := range ids { - id = "cache-dir:" + id - sis, err := w.WorkerOpt.MetadataStore.Search(id) + mds, err := mounts.SearchCacheDir(ctx, w.CacheMgr, id) if err != nil { return err } - for _, si := range sis { - for _, k := range si.Indexes() { - if k == id || strings.HasPrefix(k, id+":") { - siOrig := si - if siCached := w.CacheMgr.Metadata(si.ID()); siCached != nil { - si = siCached - } - if si.Get(k) == nil { - si.Update(func(b *bolt.Bucket) error { - return si.SetValue(b, k, siOrig.Get(k)) - }) - } - if err := cache.CachePolicyDefault(si); err != nil { - return err - } - si.Queue(func(b *bolt.Bucket) error { - return si.SetValue(b, k, nil) - }) - if err := si.Commit(); err != nil { - return err - } - // if ref is unused try to clean it up right away by releasing it - if mref, err := w.CacheMgr.GetMutable(ctx, si.ID()); err == nil { - go mref.Release(context.TODO()) - } - break - } + for _, md := range mds { + if err := md.SetCachePolicyDefault(); err != nil { + return err + } + if err := md.ClearCacheDirIndex(); err != nil { + return err + } + // if ref is unused try to clean it up right away by releasing it + if mref, err := w.CacheMgr.GetMutable(ctx, md.ID()); err == nil { + go mref.Release(context.TODO()) } } } diff --git a/worker/cacheresult.go b/worker/cacheresult.go index 54b445507ff2..bdb757358089 100644 --- a/worker/cacheresult.go +++ b/worker/cacheresult.go @@ -5,7 +5,6 @@ import ( "strings" "time" - "github.com/moby/buildkit/cache" "github.com/moby/buildkit/session" "github.com/moby/buildkit/solver" "github.com/moby/buildkit/util/compression" @@ -28,11 +27,10 @@ func (s *cacheResultStorage) Save(res solver.Result, createdAt time.Time) (solve return solver.CacheResult{}, errors.Errorf("invalid result: %T", res.Sys()) } if ref.ImmutableRef != nil { - if !cache.HasCachePolicyRetain(ref.ImmutableRef) { - if err := cache.CachePolicyRetain(ref.ImmutableRef); err != nil { + if !ref.ImmutableRef.HasCachePolicyRetain() { + if err := ref.ImmutableRef.SetCachePolicyRetain(); err != nil { return solver.CacheResult{}, err } - ref.ImmutableRef.Metadata().Commit() } } return solver.CacheResult{ID: ref.ID(), CreatedAt: createdAt}, nil diff --git a/worker/containerd/containerd.go b/worker/containerd/containerd.go index 2a3a3415a296..58a3e1a79091 100644 --- a/worker/containerd/containerd.go +++ b/worker/containerd/containerd.go @@ -102,7 +102,14 @@ func newContainerd(root string, client *containerd.Client, snapshotterName, ns s snap := containerdsnapshot.NewSnapshotter(snapshotterName, client.SnapshotService(snapshotterName), ns, nil) - if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), cs, snap, lm); err != nil { + if err := cache.MigrateV2( + context.TODO(), + filepath.Join(root, "metadata.db"), + filepath.Join(root, "metadata_v2.db"), + cs, + snap, + lm, + ); err != nil { return base.WorkerOpt{}, err } diff --git a/worker/runc/runc.go b/worker/runc/runc.go index 4af74efdffe5..8611ed26c180 100644 --- a/worker/runc/runc.go +++ b/worker/runc/runc.go @@ -103,7 +103,15 @@ func NewWorkerOpt(root string, snFactory SnapshotterFactory, rootless bool, proc } snap := containerdsnapshot.NewSnapshotter(snFactory.Name, mdb.Snapshotter(snFactory.Name), "buildkit", idmap) lm := leaseutil.WithNamespace(ctdmetadata.NewLeaseManager(mdb), "buildkit") - if err := cache.MigrateV2(context.TODO(), filepath.Join(root, "metadata.db"), filepath.Join(root, "metadata_v2.db"), c, snap, lm); err != nil { + + if err := cache.MigrateV2( + context.TODO(), + filepath.Join(root, "metadata.db"), + filepath.Join(root, "metadata_v2.db"), + c, + snap, + lm, + ); err != nil { return opt, err } diff --git a/worker/worker.go b/worker/worker.go index a33d21b5a42f..4ec4070d79c0 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -5,7 +5,6 @@ import ( "github.com/containerd/containerd/content" "github.com/moby/buildkit/cache" - "github.com/moby/buildkit/cache/metadata" "github.com/moby/buildkit/client" "github.com/moby/buildkit/client/llb" "github.com/moby/buildkit/executor" @@ -36,7 +35,6 @@ type Worker interface { ContentStore() content.Store Executor() executor.Executor CacheManager() cache.Manager - MetadataStore() *metadata.Store } type Infos interface {