Skip to content

Commit

Permalink
Merge pull request #2291 from mtrmac/random-cleanups2
Browse files Browse the repository at this point in the history
Improve storage transport documentation, primarily about locking
  • Loading branch information
mtrmac authored Feb 13, 2024
2 parents a83dbea + 29287a3 commit cf0ca66
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 59 deletions.
107 changes: 59 additions & 48 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,23 @@ type storageImageDestination struct {
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
metadata storageImageMetadata // Metadata contents being built

// A storage destination may be used concurrently. Accesses are
// serialized via a mutex. Please refer to the individual comments
// below for details.
lock sync.Mutex
// Mapping from layer (by index) to the associated ID in the storage.
// It's protected *implicitly* since `commitLayer()`, at any given
// time, can only be executed by *one* goroutine. Please refer to
// `queueOrCommit()` for further details on how the single-caller
// guarantee is implemented.
indexToStorageID map[int]string
// All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls
// (but not necessarily during the final Commit) protected by `lock` which is made
// *explicit* in the code.

// A storage destination may be used concurrently, due to HasThreadSafePutBlob.
lock sync.Mutex // Protects lockProtected
lockProtected storageImageDestinationLockProtected
}

// storageImageDestinationLockProtected contains storageImageDestination data which might be
// accessed concurrently, due to HasThreadSafePutBlob.
// _During the concurrent TryReusingBlob/PutBlob/* calls_ (but not necessarily during the final Commit)
// users must hold storageImageDestination.lock.
type storageImageDestinationLockProtected struct {
uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
Expand Down Expand Up @@ -117,20 +121,22 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
HasThreadSafePutBlob: true,
}),

imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
indexToStorageID: make(map[int]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
indexToStorageID: make(map[int]string),
lockProtected: storageImageDestinationLockProtected{
uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
},
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
Expand All @@ -144,10 +150,11 @@ func (s *storageImageDestination) Reference() types.ImageReference {

// Close cleans up the temporary directory and additional layer store handlers.
func (s *storageImageDestination) Close() error {
for _, al := range s.blobAdditionalLayer {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.
for _, al := range s.lockProtected.blobAdditionalLayer {
al.Release()
}
for _, v := range s.diffOutputs {
for _, v := range s.lockProtected.diffOutputs {
if v.Target != "" {
_ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target)
}
Expand Down Expand Up @@ -229,9 +236,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf

// Record information about the blob.
s.lock.Lock()
s.uncompressedOrTocDigest[blobDigest] = diffID.Digest()
s.fileSizes[blobDigest] = counter.Count
s.filenames[blobDigest] = filename
s.lockProtected.uncompressedOrTocDigest[blobDigest] = diffID.Digest()
s.lockProtected.fileSizes[blobDigest] = counter.Count
s.lockProtected.filenames[blobDigest] = filename
s.lock.Unlock()
// This is safe because we have just computed diffID, and blobDigest was either computed
// by us, or validated by the caller (usually copy.digestingReader).
Expand Down Expand Up @@ -291,10 +298,10 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
blobDigest := srcInfo.Digest

s.lock.Lock()
s.uncompressedOrTocDigest[blobDigest] = blobDigest
s.fileSizes[blobDigest] = 0
s.filenames[blobDigest] = ""
s.diffOutputs[blobDigest] = out
s.lockProtected.uncompressedOrTocDigest[blobDigest] = blobDigest
s.lockProtected.fileSizes[blobDigest] = 0
s.lockProtected.filenames[blobDigest] = ""
s.lockProtected.diffOutputs[blobDigest] = out
s.lock.Unlock()

return private.UploadedBlob{
Expand Down Expand Up @@ -337,8 +344,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[digest] = aLayer
s.lockProtected.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest()
s.lockProtected.blobAdditionalLayer[digest] = aLayer
return true, private.ReusedBlob{
Digest: digest,
Size: aLayer.CompressedSize(),
Expand All @@ -354,7 +361,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}

// Check if we've already cached it in a file.
if size, ok := s.fileSizes[digest]; ok {
if size, ok := s.lockProtected.fileSizes[digest]; ok {
return true, private.ReusedBlob{
Digest: digest,
Size: size,
Expand All @@ -368,7 +375,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Save this for completeness.
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].UncompressedSize,
Expand All @@ -382,7 +389,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].CompressedSize,
Expand All @@ -400,7 +407,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
if size != -1 {
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: size,
Expand All @@ -409,7 +416,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
if !options.CanSubstitute {
return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest)
}
s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: uncompressedDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -430,7 +437,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Save this for completeness.
s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].TOCDigest
return true, private.ReusedBlob{
Digest: layers[0].TOCDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -446,6 +453,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
// that since we don't have a recommendation, a random ID should be used if one needs
// to be allocated.
func (s *storageImageDestination) computeID(m manifest.Manifest) string {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

// Build the diffID list. We need the decompressed sums that we've been calculating to
// fill in the DiffIDs. It's expected (but not enforced by us) that the number of
// diffIDs corresponds to the number of non-EmptyLayer entries in the history.
Expand All @@ -459,7 +468,7 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string {
continue
}
blobSum := m.FSLayers[i].BlobSum
diffID, ok := s.uncompressedOrTocDigest[blobSum]
diffID, ok := s.lockProtected.uncompressedOrTocDigest[blobSum]
if !ok {
logrus.Infof("error looking up diffID for layer %q", blobSum.String())
return ""
Expand Down Expand Up @@ -493,7 +502,7 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er
return nil, fmt.Errorf("invalid digest supplied when reading blob: %w", err)
}
// Assume it's a file, since we're only calling this from a place that expects to read files.
if filename, ok := s.filenames[info.Digest]; ok {
if filename, ok := s.lockProtected.filenames[info.Digest]; ok {
contents, err2 := os.ReadFile(filename)
if err2 != nil {
return nil, fmt.Errorf(`reading blob from file %q: %w`, filename, err2)
Expand Down Expand Up @@ -527,17 +536,17 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
// caller is the "worker" routine committing layers. All other routines
// can continue pulling and queuing in layers.
s.lock.Lock()
s.indexToAddedLayerInfo[index] = info
s.lockProtected.indexToAddedLayerInfo[index] = info

// We're still waiting for at least one previous/parent layer to be
// committed, so there's nothing to do.
if index != s.currentIndex {
if index != s.lockProtected.currentIndex {
s.lock.Unlock()
return nil
}

for {
info, ok := s.indexToAddedLayerInfo[index]
info, ok := s.lockProtected.indexToAddedLayerInfo[index]
if !ok {
break
}
Expand All @@ -552,7 +561,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)

// Set the index at the very end to make sure that only one routine
// enters stage 2).
s.currentIndex = index
s.lockProtected.currentIndex = index
s.lock.Unlock()
return nil
}
Expand All @@ -562,10 +571,10 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest
s.lock.Lock()
defer s.lock.Unlock()

if d, found := s.diffOutputs[uncompressedDigest]; found {
if d, found := s.lockProtected.diffOutputs[uncompressedDigest]; found {
return d.TOCDigest, found
}
d, found := s.uncompressedOrTocDigest[uncompressedDigest]
d, found := s.lockProtected.uncompressedOrTocDigest[uncompressedDigest]
return d, found
}

Expand Down Expand Up @@ -642,7 +651,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}

s.lock.Lock()
diffOutput, ok := s.diffOutputs[info.digest]
diffOutput, ok := s.lockProtected.diffOutputs[info.digest]
s.lock.Unlock()
if ok {
if s.manifest == nil {
Expand Down Expand Up @@ -692,7 +701,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[info.digest]
al, ok := s.lockProtected.blobAdditionalLayer[info.digest]
s.lock.Unlock()
if ok {
layer, err := al.PutAs(id, parentLayer, nil)
Expand All @@ -706,7 +715,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Check if we previously cached a file with that blob's contents. If we didn't,
// then we need to read the desired contents from a layer.
s.lock.Lock()
filename, ok := s.filenames[info.digest]
filename, ok := s.lockProtected.filenames[info.digest]
s.lock.Unlock()
if !ok {
// Try to find the layer with contents matching that blobsum.
Expand Down Expand Up @@ -754,7 +763,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Make sure that we can find this file later, should we need the layer's
// contents again.
s.lock.Lock()
s.filenames[info.digest] = filename
s.lockProtected.filenames[info.digest] = filename
s.lock.Unlock()
}
// Read the cached blob and use it as a diff.
Expand Down Expand Up @@ -785,6 +794,8 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// - Uploaded data MAY be visible to others before Commit() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed)
func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error {
// This function is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

if len(s.manifest) == 0 {
return errors.New("Internal error: storageImageDestination.Commit() called without PutManifest()")
}
Expand Down Expand Up @@ -850,14 +861,14 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
// Set up to save the non-layer blobs as data items. Since we only share layers, they should all be in files, so
// we just need to screen out the ones that are actually layers to get the list of non-layers.
dataBlobs := set.New[digest.Digest]()
for blob := range s.filenames {
for blob := range s.lockProtected.filenames {
dataBlobs.Add(blob)
}
for _, layerBlob := range layerBlobs {
dataBlobs.Delete(layerBlob.Digest)
}
for _, blob := range dataBlobs.Values() {
v, err := os.ReadFile(s.filenames[blob])
v, err := os.ReadFile(s.lockProtected.filenames[blob])
if err != nil {
return fmt.Errorf("copying non-layer blob %q to image: %w", blob, err)
}
Expand Down
25 changes: 14 additions & 11 deletions storage/storage_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ import (
"github.com/sirupsen/logrus"
)

// getBlobMutexProtected is a struct to hold the state of the getBlobMutex mutex.
type getBlobMutexProtected struct {
// digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the
// layer ID in the store.
digestToLayerID map[digest.Digest]string

// layerPosition stores where we are in reading a blob's layers
layerPosition map[digest.Digest]int
}

type storageImageSource struct {
impl.Compat
impl.PropertyMethodsInitialize
Expand All @@ -49,10 +39,23 @@ type storageImageSource struct {
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
metadata storageImageMetadata
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions
getBlobMutexProtected getBlobMutexProtected
}

// getBlobMutexProtected contains storageImageSource data protected by getBlobMutex.
// Users must hold storageImageSource.getBlobMutex while reading or writing these fields
// during concurrent GetBlob calls.
type getBlobMutexProtected struct {
	// digestToLayerID is a lookup map from a possibly-untrusted uncompressed layer digest (as returned by LayerInfosForCopy) to the
	// layer ID in the store.
	digestToLayerID map[digest.Digest]string

	// layerPosition stores where we are in reading a blob's layers
	// (presumably the index of the next layer to serve for a given digest,
	// when several layers share the same digest — TODO confirm against GetBlob).
	layerPosition map[digest.Digest]int
}

// expectedLayerDiffIDFlag is a per-layer flag containing an UNTRUSTED uncompressed digest of the layer.
// It is set when pulling a layer by TOC; later, this value is used with digestToLayerID
// to allow identifying the layer — and the consumer is expected to verify the blob returned by GetBlob against the digest.
const expectedLayerDiffIDFlag = "expected-layer-diffid"

// newImageSource sets up an image for reading.
Expand Down

0 comments on commit cf0ca66

Please sign in to comment.