Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve storage transport documentation, primarily about locking #2291

Merged
merged 4 commits into from
Feb 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 59 additions & 48 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,23 @@ type storageImageDestination struct {
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
metadata storageImageMetadata // Metadata contents being built

// A storage destination may be used concurrently. Accesses are
// serialized via a mutex. Please refer to the individual comments
// below for details.
lock sync.Mutex
// Mapping from layer (by index) to the associated ID in the storage.
// It's protected *implicitly* since `commitLayer()`, at any given
// time, can only be executed by *one* goroutine. Please refer to
// `queueOrCommit()` for further details on how the single-caller
// guarantee is implemented.
indexToStorageID map[int]string
// All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls
// (but not necessarily during the final Commit) protected by `lock` which is made
// *explicit* in the code.

// A storage destination may be used concurrently, due to HasThreadSafePutBlob.
lock sync.Mutex // Protects lockProtected
lockProtected storageImageDestinationLockProtected
}

// storageImageDestinationLockProtected contains storageImageDestination data which might be
// accessed concurrently, due to HasThreadSafePutBlob.
// _During the concurrent TryReusingBlob/PutBlob/* calls_ (but not necessarily during the final Commit)
// uses must hold storageImageDestination.lock.
type storageImageDestinationLockProtected struct {
uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
Expand Down Expand Up @@ -117,20 +121,22 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
HasThreadSafePutBlob: true,
}),

imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
imageRef: imageRef,
directory: directory,
signatureses: make(map[digest.Digest][]byte),
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
indexToStorageID: make(map[int]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
indexToStorageID: make(map[int]string),
lockProtected: storageImageDestinationLockProtected{
uncompressedOrTocDigest: make(map[digest.Digest]digest.Digest),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
},
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
Expand All @@ -144,10 +150,11 @@ func (s *storageImageDestination) Reference() types.ImageReference {

// Close cleans up the temporary directory and additional layer store handlers.
func (s *storageImageDestination) Close() error {
for _, al := range s.blobAdditionalLayer {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.
for _, al := range s.lockProtected.blobAdditionalLayer {
al.Release()
}
for _, v := range s.diffOutputs {
for _, v := range s.lockProtected.diffOutputs {
if v.Target != "" {
_ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target)
}
Expand Down Expand Up @@ -229,9 +236,9 @@ func (s *storageImageDestination) putBlobToPendingFile(stream io.Reader, blobinf

// Record information about the blob.
s.lock.Lock()
s.uncompressedOrTocDigest[blobDigest] = diffID.Digest()
s.fileSizes[blobDigest] = counter.Count
s.filenames[blobDigest] = filename
s.lockProtected.uncompressedOrTocDigest[blobDigest] = diffID.Digest()
s.lockProtected.fileSizes[blobDigest] = counter.Count
s.lockProtected.filenames[blobDigest] = filename
s.lock.Unlock()
// This is safe because we have just computed diffID, and blobDigest was either computed
// by us, or validated by the caller (usually copy.digestingReader).
Expand Down Expand Up @@ -291,10 +298,10 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
blobDigest := srcInfo.Digest

s.lock.Lock()
s.uncompressedOrTocDigest[blobDigest] = blobDigest
s.fileSizes[blobDigest] = 0
s.filenames[blobDigest] = ""
s.diffOutputs[blobDigest] = out
s.lockProtected.uncompressedOrTocDigest[blobDigest] = blobDigest
s.lockProtected.fileSizes[blobDigest] = 0
s.lockProtected.filenames[blobDigest] = ""
s.lockProtected.diffOutputs[blobDigest] = out
s.lock.Unlock()

return private.UploadedBlob{
Expand Down Expand Up @@ -337,8 +344,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
return false, private.ReusedBlob{}, fmt.Errorf(`looking for compressed layers with digest %q and labels: %w`, digest, err)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[digest] = aLayer
s.lockProtected.uncompressedOrTocDigest[digest] = aLayer.UncompressedDigest()
s.lockProtected.blobAdditionalLayer[digest] = aLayer
return true, private.ReusedBlob{
Digest: digest,
Size: aLayer.CompressedSize(),
Expand All @@ -354,7 +361,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}

// Check if we've already cached it in a file.
if size, ok := s.fileSizes[digest]; ok {
if size, ok := s.lockProtected.fileSizes[digest]; ok {
return true, private.ReusedBlob{
Digest: digest,
Size: size,
Expand All @@ -368,7 +375,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Save this for completeness.
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].UncompressedSize,
Expand All @@ -382,7 +389,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: layers[0].CompressedSize,
Expand All @@ -400,7 +407,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
if size != -1 {
s.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: digest,
Size: size,
Expand All @@ -409,7 +416,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
if !options.CanSubstitute {
return false, private.ReusedBlob{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blob with digest %s", digest)
}
s.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest
s.lockProtected.uncompressedOrTocDigest[uncompressedDigest] = layers[0].UncompressedDigest
return true, private.ReusedBlob{
Digest: uncompressedDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -430,7 +437,7 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
}
if len(layers) > 0 {
// Save this for completeness.
s.uncompressedOrTocDigest[digest] = layers[0].TOCDigest
s.lockProtected.uncompressedOrTocDigest[digest] = layers[0].TOCDigest
return true, private.ReusedBlob{
Digest: layers[0].TOCDigest,
Size: layers[0].UncompressedSize,
Expand All @@ -446,6 +453,8 @@ func (s *storageImageDestination) tryReusingBlobAsPending(digest digest.Digest,
// that since we don't have a recommendation, a random ID should be used if one needs
// to be allocated.
func (s *storageImageDestination) computeID(m manifest.Manifest) string {
// This is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

// Build the diffID list. We need the decompressed sums that we've been calculating to
// fill in the DiffIDs. It's expected (but not enforced by us) that the number of
// diffIDs corresponds to the number of non-EmptyLayer entries in the history.
Expand All @@ -459,7 +468,7 @@ func (s *storageImageDestination) computeID(m manifest.Manifest) string {
continue
}
blobSum := m.FSLayers[i].BlobSum
diffID, ok := s.uncompressedOrTocDigest[blobSum]
diffID, ok := s.lockProtected.uncompressedOrTocDigest[blobSum]
if !ok {
logrus.Infof("error looking up diffID for layer %q", blobSum.String())
return ""
Expand Down Expand Up @@ -493,7 +502,7 @@ func (s *storageImageDestination) getConfigBlob(info types.BlobInfo) ([]byte, er
return nil, fmt.Errorf("invalid digest supplied when reading blob: %w", err)
}
// Assume it's a file, since we're only calling this from a place that expects to read files.
if filename, ok := s.filenames[info.Digest]; ok {
if filename, ok := s.lockProtected.filenames[info.Digest]; ok {
contents, err2 := os.ReadFile(filename)
if err2 != nil {
return nil, fmt.Errorf(`reading blob from file %q: %w`, filename, err2)
Expand Down Expand Up @@ -527,17 +536,17 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)
// caller is the "worker" routine committing layers. All other routines
// can continue pulling and queuing in layers.
s.lock.Lock()
s.indexToAddedLayerInfo[index] = info
s.lockProtected.indexToAddedLayerInfo[index] = info

// We're still waiting for at least one previous/parent layer to be
// committed, so there's nothing to do.
if index != s.currentIndex {
if index != s.lockProtected.currentIndex {
s.lock.Unlock()
return nil
}

for {
info, ok := s.indexToAddedLayerInfo[index]
info, ok := s.lockProtected.indexToAddedLayerInfo[index]
if !ok {
break
}
Expand All @@ -552,7 +561,7 @@ func (s *storageImageDestination) queueOrCommit(index int, info addedLayerInfo)

// Set the index at the very end to make sure that only one routine
// enters stage 2).
s.currentIndex = index
s.lockProtected.currentIndex = index
s.lock.Unlock()
return nil
}
Expand All @@ -562,10 +571,10 @@ func (s *storageImageDestination) getDiffIDOrTOCDigest(uncompressedDigest digest
s.lock.Lock()
defer s.lock.Unlock()

if d, found := s.diffOutputs[uncompressedDigest]; found {
if d, found := s.lockProtected.diffOutputs[uncompressedDigest]; found {
return d.TOCDigest, found
}
d, found := s.uncompressedOrTocDigest[uncompressedDigest]
d, found := s.lockProtected.uncompressedOrTocDigest[uncompressedDigest]
return d, found
}

Expand Down Expand Up @@ -642,7 +651,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}

s.lock.Lock()
diffOutput, ok := s.diffOutputs[info.digest]
diffOutput, ok := s.lockProtected.diffOutputs[info.digest]
s.lock.Unlock()
if ok {
if s.manifest == nil {
Expand Down Expand Up @@ -692,7 +701,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[info.digest]
al, ok := s.lockProtected.blobAdditionalLayer[info.digest]
s.lock.Unlock()
if ok {
layer, err := al.PutAs(id, parentLayer, nil)
Expand All @@ -706,7 +715,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Check if we previously cached a file with that blob's contents. If we didn't,
// then we need to read the desired contents from a layer.
s.lock.Lock()
filename, ok := s.filenames[info.digest]
filename, ok := s.lockProtected.filenames[info.digest]
s.lock.Unlock()
if !ok {
// Try to find the layer with contents matching that blobsum.
Expand Down Expand Up @@ -754,7 +763,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Make sure that we can find this file later, should we need the layer's
// contents again.
s.lock.Lock()
s.filenames[info.digest] = filename
s.lockProtected.filenames[info.digest] = filename
s.lock.Unlock()
}
// Read the cached blob and use it as a diff.
Expand Down Expand Up @@ -785,6 +794,8 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// - Uploaded data MAY be visible to others before Commit() is called
// - Uploaded data MAY be removed or MAY remain around if Close() is called without Commit() (i.e. rollback is allowed but not guaranteed)
func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel types.UnparsedImage) error {
// This function is outside of the scope of HasThreadSafePutBlob, so we don’t need to hold s.lock.

if len(s.manifest) == 0 {
return errors.New("Internal error: storageImageDestination.Commit() called without PutManifest()")
}
Expand Down Expand Up @@ -850,14 +861,14 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
// Set up to save the non-layer blobs as data items. Since we only share layers, they should all be in files, so
// we just need to screen out the ones that are actually layers to get the list of non-layers.
dataBlobs := set.New[digest.Digest]()
for blob := range s.filenames {
for blob := range s.lockProtected.filenames {
dataBlobs.Add(blob)
}
for _, layerBlob := range layerBlobs {
dataBlobs.Delete(layerBlob.Digest)
}
for _, blob := range dataBlobs.Values() {
v, err := os.ReadFile(s.filenames[blob])
v, err := os.ReadFile(s.lockProtected.filenames[blob])
if err != nil {
return fmt.Errorf("copying non-layer blob %q to image: %w", blob, err)
}
Expand Down
25 changes: 14 additions & 11 deletions storage/storage_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,6 @@ import (
"github.com/sirupsen/logrus"
)

// getBlobMutexProtected is a struct to hold the state of the getBlobMutex mutex.
type getBlobMutexProtected struct {
// digestToLayerID is a lookup map from the layer digest (either the uncompressed digest or the TOC digest) to the
// layer ID in the store.
digestToLayerID map[digest.Digest]string

// layerPosition stores where we are in reading a blob's layers
layerPosition map[digest.Digest]int
}

type storageImageSource struct {
impl.Compat
impl.PropertyMethodsInitialize
Expand All @@ -49,10 +39,23 @@ type storageImageSource struct {
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
metadata storageImageMetadata
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions
getBlobMutexProtected getBlobMutexProtected
}

// getBlobMutexProtected contains storageImageSource data protected by getBlobMutex.
type getBlobMutexProtected struct {
// digestToLayerID is a lookup map from a possibly-untrusted uncompressed layer digest (as returned by LayerInfosForCopy) to the
// layer ID in the store.
digestToLayerID map[digest.Digest]string

// layerPosition stores where we are in reading a blob's layers
layerPosition map[digest.Digest]int
}

// expectedLayerDiffIDFlag is a per-layer flag containing an UNTRUSTED uncompressed digest of the layer.
// It is set when pulling a layer by TOC; later, this value is used with digestToLayerID
// to allow identifying the layer — and the consumer is expected to verify the blob returned by GetBlob against the digest.
const expectedLayerDiffIDFlag = "expected-layer-diffid"

// newImageSource sets up an image for reading.
Expand Down