diff --git a/storage/storage_dest.go b/storage/storage_dest.go index 88e492b743..9fa7f4a86a 100644 --- a/storage/storage_dest.go +++ b/storage/storage_dest.go @@ -64,19 +64,23 @@ type storageImageDestination struct { signatureses map[digest.Digest][]byte // Instance signature contents, temporary metadata storageImageMetadata // Metadata contents being built - // A storage destination may be used concurrently. Accesses are - // serialized via a mutex. Please refer to the individual comments - // below for details. - lock sync.Mutex // Mapping from layer (by index) to the associated ID in the storage. // It's protected *implicitly* since `commitLayer()`, at any given // time, can only be executed by *one* goroutine. Please refer to // `queueOrCommit()` for further details on how the single-caller // guarantee is implemented. indexToStorageID map[int]string - // All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls - // (but not necessarily during the final Commit) protected by `lock` which is made - // *explicit* in the code. + + // A storage destination may be used concurrently, due to HasThreadSafePutBlob. + lock sync.Mutex // Protects lockProtected + lockProtected storageImageDestinationLockProtected +} + +// storageImageDestinationLockProtected contains storageImageDestination data which might be +// accessed concurrently, due to HasThreadSafePutBlob. +// _During the concurrent TryReusingBlob/PutBlob/* calls_ (but not necessarily during the final Commit) +// users must hold storageImageDestination.lock. +type storageImageDestinationLockProtected struct { uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs. 
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them @@ -117,20 +121,22 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (* HasThreadSafePutBlob: true, }), - imageRef: imageRef, - directory: directory, - signatureses: make(map[digest.Digest][]byte), - uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest), - blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), - fileSizes: make(map[digest.Digest]int64), - filenames: make(map[digest.Digest]string), + imageRef: imageRef, + directory: directory, + signatureses: make(map[digest.Digest][]byte), metadata: storageImageMetadata{ SignatureSizes: []int{}, SignaturesSizes: make(map[digest.Digest][]int), }, - indexToStorageID: make(map[int]string), - indexToAddedLayerInfo: make(map[int]addedLayerInfo), - diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), + indexToStorageID: make(map[int]string), + lockProtected: storageImageDestinationLockProtected{ + uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest), + fileSizes: make(map[digest.Digest]int64), + filenames: make(map[digest.Digest]string), + indexToAddedLayerInfo: make(map[int]addedLayerInfo), + blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer), + diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput), + }, } dest.Compat = impl.AddCompat(dest) return dest, nil @@ -144,10 +150,11 @@ func (s *storageImageDestination) Reference() types.ImageReference { // Close cleans up the temporary directory and additional layer store handlers. func (s *storageImageDestination) Close() error { - for _, al := range s.blobAdditionalLayer { + // This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. 
+ for _, al := range s.lockProtected.blobAdditionalLayer { al.Release() } - for _, v := range s.diffOutputs { + for _, v := range s.lockProtected.diffOutputs { if v.Target != "" { _ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target) } @@ -229,9 +236,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf // Record information about the blob. s.lock.Lock() - s.uncompressedOrTocDigest[blobDigest] = diffID.Digest() - s.fileSizes[blobDigest] = counter.Count - s.filenames[blobDigest] = filename + s.lockProtected.uncompressedOrTocDigest[blobDigest] = diffID.Digest() + s.lockProtected.fileSizes[blobDigest] = counter.Count + s.lockProtected.filenames[blobDigest] = filename s.lock.Unlock() // This is safe because we have just computed diffID, and blobDigest was either computed // by us, or validated by the caller (usually copy.digestingReader). @@ -291,10 +298,10 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces blobDigest := srcInfo.Digest s.lock.Lock() - s.uncompressedOrTocDigest[blobDigest] = blobDigest - s.fileSizes[blobDigest] = 0 - s.filenames[blobDigest] = "" - s.diffOutputs[blobDigest] = out + s.lockProtected.uncompressedOrTocDigest[blobDigest] = blobDigest + s.lockProtected.fileSizes[blobDigest] = 0 + s.lockProtected.filenames[blobDigest] = "" + s.lockProtected.diffOutputs[blobDigest] = out s.lock.Unlock() return private.UploadedBlob{ @@ -337,8 +344,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err) } else if err == nil { // Record the uncompressed value so that we can use it to calculate layer IDs. 
- s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest() - s.blobAdditionalLayer[digest] = aLayer + s.lockProtected.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest() + s.lockProtected.blobAdditionalLayer[digest] = aLayer return true, private.ReusedBlob{ Digest: digest, Size: aLayer.CompressedSize(), @@ -354,7 +361,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } // Check if we've already cached it in a file. - if size, ok := s.fileSizes[digest]; ok { + if size, ok := s.lockProtected.fileSizes[digest]; ok { return true, private.ReusedBlob{ Digest: digest, Size: size, @@ -368,7 +375,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { // Save this for completeness. - s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: layers[0].UncompressedSize, @@ -382,7 +389,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { // Record the uncompressed value so that we can use it to calculate layer IDs. 
- s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: layers[0].CompressedSize, @@ -400,7 +407,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { if size != -1 { - s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest + s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: digest, Size: size, @@ -409,7 +416,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, if !options.CanSubstitute { return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest) } - s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest + s.lockProtected.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest return true, private.ReusedBlob{ Digest: uncompressedDigest, Size: layers[0].UncompressedSize, @@ -430,7 +437,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, } if len(layers) > 0 { // Save this for completeness. - s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest + s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].TOCDigest return true, private.ReusedBlob{ Digest: layers[0].TOCDigest, Size: layers[0].UncompressedSize, @@ -446,6 +453,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest, // that since we don't have a recommendation, a random ID should be used if one needs // to be allocated. func (s *storageImageDestination) computeID(m manifest.Manifest) string { + // This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. + // Build the diffID list. We need the decompressed sums that we've been calculating to // fill in the DiffIDs. 
It's expected (but not enforced by us) that the number of // diffIDs corresponds to the number of non-EmptyLayer entries in the history. @@ -459,7 +468,7 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string { continue } blobSum := m.FSLayers[i].BlobSum - diffID, ok := s.uncompressedOrTocDigest[blobSum] + diffID, ok := s.lockProtected.uncompressedOrTocDigest[blobSum] if !ok { logrus.Infof("error looking up diffID for layer %q", blobSum.String()) return "" @@ -493,7 +502,7 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er return nil, fmt.Errorf("invalid digest supplied when reading blob: %w", err) } // Assume it's a file, since we're only calling this from a place that expects to read files. - if filename, ok := s.filenames[info.Digest]; ok { + if filename, ok := s.lockProtected.filenames[info.Digest]; ok { contents, err2 := os.ReadFile(filename) if err2 != nil { return nil, fmt.Errorf(`reading blob from file %q: %w`, filename, err2) @@ -527,17 +536,17 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) // caller is the "worker" routine committing layers. All other routines // can continue pulling and queuing in layers. s.lock.Lock() - s.indexToAddedLayerInfo[index] = info + s.lockProtected.indexToAddedLayerInfo[index] = info // We're still waiting for at least one previous/parent layer to be // committed, so there's nothing to do. - if index != s.currentIndex { + if index != s.lockProtected.currentIndex { s.lock.Unlock() return nil } for { - info, ok := s.indexToAddedLayerInfo[index] + info, ok := s.lockProtected.indexToAddedLayerInfo[index] if !ok { break } @@ -552,7 +561,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo) // Set the index at the very end to make sure that only one routine // enters stage 2). 
- s.currentIndex = index + s.lockProtected.currentIndex = index s.lock.Unlock() return nil } @@ -562,10 +571,10 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest s.lock.Lock() defer s.lock.Unlock() - if d, found := s.diffOutputs[uncompressedDigest]; found { + if d, found := s.lockProtected.diffOutputs[uncompressedDigest]; found { return d.TOCDigest, found } - d, found := s.uncompressedOrTocDigest[uncompressedDigest] + d, found := s.lockProtected.uncompressedOrTocDigest[uncompressedDigest] return d, found } @@ -642,7 +651,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } s.lock.Lock() - diffOutput, ok := s.diffOutputs[info.digest] + diffOutput, ok := s.lockProtected.diffOutputs[info.digest] s.lock.Unlock() if ok { if s.manifest == nil { @@ -692,7 +701,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si } s.lock.Lock() - al, ok := s.blobAdditionalLayer[info.digest] + al, ok := s.lockProtected.blobAdditionalLayer[info.digest] s.lock.Unlock() if ok { layer, err := al.PutAs(id, parentLayer, nil) @@ -706,7 +715,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Check if we previously cached a file with that blob's contents. If we didn't, // then we need to read the desired contents from a layer. s.lock.Lock() - filename, ok := s.filenames[info.digest] + filename, ok := s.lockProtected.filenames[info.digest] s.lock.Unlock() if !ok { // Try to find the layer with contents matching that blobsum. @@ -754,7 +763,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // Make sure that we can find this file later, should we need the layer's // contents again. s.lock.Lock() - s.filenames[info.digest] = filename + s.lockProtected.filenames[info.digest] = filename s.lock.Unlock() } // Read the cached blob and use it as a diff. 
@@ -785,6 +794,8 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si // - Uploaded data MAY be visible to others before Commit() is called // - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed) func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error { + // This function is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock. + if len(s.manifest) == 0 { return errors.New("Internal error: storageImageDestination.Commit() called without PutManifest()") } @@ -850,14 +861,14 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t // Set up to save the non-layer blobs as data items. Since we only share layers, they should all be in files, so // we just need to screen out the ones that are actually layers to get the list of non-layers. dataBlobs := set.New[digest.Digest]() - for blob := range s.filenames { + for blob := range s.lockProtected.filenames { dataBlobs.Add(blob) } for _, layerBlob := range layerBlobs { dataBlobs.Delete(layerBlob.Digest) } for _, blob := range dataBlobs.Values() { - v, err := os.ReadFile(s.filenames[blob]) + v, err := os.ReadFile(s.lockProtected.filenames[blob]) if err != nil { return fmt.Errorf("copying non-layer blob %q to image: %w", blob, err) } diff --git a/storage/storage_src.go b/storage/storage_src.go index 36f11ef301..1b8fa07b5b 100644 --- a/storage/storage_src.go +++ b/storage/storage_src.go @@ -29,16 +29,6 @@ import ( "github.com/sirupsen/logrus" ) -// getBlobMutexProtected is a struct to hold the state of the getBlobMutex mutex. -type getBlobMutexProtected struct { - // digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the - // layer ID in the store. 
- digestToLayerID map[digest.Digest]string - - // layerPosition stores where we are in reading a blob's layers - layerPosition map[digest.Digest]int -} - type storageImageSource struct { impl.Compat impl.PropertyMethodsInitialize @@ -49,10 +39,23 @@ type storageImageSource struct { systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files metadata storageImageMetadata cachedManifest []byte // A cached copy of the manifest, if already known, or nil - getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID) + getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions getBlobMutexProtected getBlobMutexProtected } +// getBlobMutexProtected contains storageImageSource data protected by getBlobMutex. +type getBlobMutexProtected struct { + // digestToLayerID is a lookup map from a possibly-untrusted uncompressed layer digest (as returned by LayerInfosForCopy) to the + // layer ID in the store. + digestToLayerID map[digest.Digest]string + + // layerPosition stores where we are in reading a blob's layers + layerPosition map[digest.Digest]int +} + +// expectedLayerDiffIDFlag is a per-layer flag containing an UNTRUSTED uncompressed digest of the layer. +// It is set when pulling a layer by TOC; later, this value is used with digestToLayerID +// to allow identifying the layer — and the consumer is expected to verify the blob returned by GetBlob against the digest. const expectedLayerDiffIDFlag = "expected-layer-diffid" // newImageSource sets up an image for reading.