Skip to content

Commit

Permalink
Merge pull request #2287 from mtrmac/random-cleanups
Browse files Browse the repository at this point in the history
Random storage-related cleanups
  • Loading branch information
rhatdan authored Feb 8, 2024
2 parents 93b4b55 + e0e8045 commit 848cac5
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 57 deletions.
2 changes: 1 addition & 1 deletion oci/layout/oci_src_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestGetBlobForRemoteLayers(t *testing.T) {
imageSource := createImageSource(t, &types.SystemContext{})
defer imageSource.Close()
layerInfo := types.BlobInfo{
Digest: digest.FromBytes([]byte("Hello world")),
Digest: digest.FromString("Hello world"),
Size: -1,
URLs: []string{
"brokenurl",
Expand Down
80 changes: 42 additions & 38 deletions storage/storage_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,14 @@ type storageImageDestination struct {
stubs.ImplementsPutBlobPartial
stubs.AlwaysSupportsSignatures

imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID atomic.Int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
manifestDigest digest.Digest // Valid if len(manifest) != 0
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
imageRef storageReference
directory string // Temporary directory where we store blobs until Commit() time
nextTempFileID atomic.Int32 // A counter that we use for computing filenames to assign to blobs
manifest []byte // Manifest contents, temporary
manifestDigest digest.Digest // Valid if len(manifest) != 0
signatures []byte // Signature contents, temporary
signatureses map[digest.Digest][]byte // Instance signature contents, temporary
metadata storageImageMetadata // Metadata contents being built

// A storage destination may be used concurrently. Accesses are
// serialized via a mutex. Please refer to the individual comments
Expand All @@ -74,8 +73,9 @@ type storageImageDestination struct {
// time, can only be executed by *one* goroutine. Please refer to
// `queueOrCommit()` for further details on how the single-caller
// guarantee is implemented.
indexToStorageID map[int]*string
// All accesses to below data are protected by `lock` which is made
indexToStorageID map[int]string
// All accesses to below data are, during the concurrent TryReusingBlob/PutBlob/* calls
// (but not necessarily during the final Commit) protected by `lock` which is made
// *explicit* in the code.
uncompressedOrTocDigest map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs or TOC IDs.
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
Expand Down Expand Up @@ -124,11 +124,13 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
blobAdditionalLayer: make(map[digest.Digest]storage.AdditionalLayer),
fileSizes: make(map[digest.Digest]int64),
filenames: make(map[digest.Digest]string),
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
indexToStorageID: make(map[int]string),
indexToAddedLayerInfo: make(map[int]addedLayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
}
dest.Compat = impl.AddCompat(dest)
return dest, nil
Expand Down Expand Up @@ -586,14 +588,18 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
// Start with an empty string or the previous layer ID. Note that
// `s.indexToStorageID` can only be accessed by *one* goroutine at any
// given time. Hence, we don't need to lock accesses.
var lastLayer string
if prev := s.indexToStorageID[index-1]; prev != nil {
lastLayer = *prev
var parentLayer string
if index != 0 {
prev, ok := s.indexToStorageID[index-1]
if !ok {
return false, fmt.Errorf("Internal error: commitLayer called with previous layer %d not committed yet", index-1)
}
parentLayer = prev
}

// Carry over the previous ID for empty non-base layers.
if info.emptyLayer {
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = parentLayer
return false, nil
}

Expand Down Expand Up @@ -626,13 +632,12 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
}
}
id := diffIDOrTOCDigest.Hex()
if lastLayer != "" {
id = digest.Canonical.FromBytes([]byte(lastLayer + "+" + diffIDOrTOCDigest.Hex())).Hex()
if parentLayer != "" {
id = digest.Canonical.FromString(parentLayer + "+" + diffIDOrTOCDigest.Hex()).Hex()
}
if layer, err2 := s.imageRef.transport.store.Layer(id); layer != nil && err2 == nil {
// There's already a layer that should have the right contents, just reuse it.
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -664,7 +669,7 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
return false, fmt.Errorf("index %d out of range for configOCI.RootFS.DiffIDs", index)
}

layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
layer, err := s.imageRef.transport.store.CreateLayer(id, parentLayer, nil, "", false, nil)
if err != nil {
return false, err
}
Expand All @@ -682,20 +687,19 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
return false, err
}

s.indexToStorageID[index] = &layer.ID
s.indexToStorageID[index] = layer.ID
return false, nil
}

s.lock.Lock()
al, ok := s.blobAdditionalLayer[info.digest]
s.lock.Unlock()
if ok {
layer, err := al.PutAs(id, lastLayer, nil)
layer, err := al.PutAs(id, parentLayer, nil)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return false, fmt.Errorf("failed to put layer from digest and labels: %w", err)
}
lastLayer = layer.ID
s.indexToStorageID[index] = &lastLayer
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -761,15 +765,15 @@ func (s *storageImageDestination) commitLayer(index int, info addedLayerInfo, si
defer file.Close()
// Build the new layer using the diff, regardless of where it came from.
// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
layer, _, err := s.imageRef.transport.store.PutLayer(id, lastLayer, nil, "", false, &storage.LayerOptions{
layer, _, err := s.imageRef.transport.store.PutLayer(id, parentLayer, nil, "", false, &storage.LayerOptions{
OriginalDigest: info.digest,
UncompressedDigest: diffIDOrTOCDigest,
}, file)
if err != nil && !errors.Is(err, storage.ErrDuplicateID) {
return false, fmt.Errorf("adding layer with blob %q: %w", info.digest, err)
}

s.indexToStorageID[index] = &layer.ID
s.indexToStorageID[index] = layer.ID
return false, nil
}

Expand Down Expand Up @@ -827,12 +831,12 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
}
}
var lastLayer string
if len(layerBlobs) > 0 { // Can happen when using caches
prev := s.indexToStorageID[len(layerBlobs)-1]
if prev == nil {
if len(layerBlobs) > 0 { // Zero-layer images rarely make sense, but it is technically possible, and may happen for non-image artifacts.
prev, ok := s.indexToStorageID[len(layerBlobs)-1]
if !ok {
return fmt.Errorf("Internal error: storageImageDestination.Commit(): previous layer %d hasn't been committed (lastLayer == nil)", len(layerBlobs)-1)
}
lastLayer = *prev
lastLayer = prev
}

// If one of those blobs was a configuration blob, then we can try to dig out the date when the image
Expand Down Expand Up @@ -906,7 +910,7 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
}

// Set up to save our metadata.
metadata, err := json.Marshal(s)
metadata, err := json.Marshal(s.metadata)
if err != nil {
return fmt.Errorf("encoding metadata for image: %w", err)
}
Expand Down Expand Up @@ -1011,15 +1015,15 @@ func (s *storageImageDestination) PutSignaturesWithFormat(ctx context.Context, s
}
if instanceDigest == nil {
s.signatures = sigblob
s.SignatureSizes = sizes
s.metadata.SignatureSizes = sizes
if len(s.manifest) > 0 {
manifestDigest := s.manifestDigest
instanceDigest = &manifestDigest
}
}
if instanceDigest != nil {
s.signatureses[*instanceDigest] = sigblob
s.SignaturesSizes[*instanceDigest] = sizes
s.metadata.SignaturesSizes[*instanceDigest] = sizes
}
return nil
}
16 changes: 11 additions & 5 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,6 @@ var (
ErrNoSuchImage = storage.ErrNotAnImage
)

type storageImageCloser struct {
types.ImageCloser
size int64
}

// manifestBigDataKey returns a key suitable for recording a manifest with the specified digest using storage.Store.ImageBigData and related functions.
// If a specific manifest digest is explicitly requested by the user, the key returned by this function should be used preferably;
// for compatibility, if a manifest is not available under this key, check also storage.ImageDigestBigDataKey
Expand All @@ -36,6 +31,17 @@ func signatureBigDataKey(digest digest.Digest) string {
return "signature-" + digest.Encoded()
}

// storageImageMetadata is stored, as JSON, in storage.Image.Metadata
type storageImageMetadata struct {
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // Sizes of each manifest's signature slice
}

type storageImageCloser struct {
types.ImageCloser
size int64
}

// Size() returns the previously-computed size of the image, with no error.
func (s *storageImageCloser) Size() (int64, error) {
return s.size, nil
Expand Down
27 changes: 14 additions & 13 deletions storage/storage_src.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@ type storageImageSource struct {
imageRef storageReference
image *storage.Image
systemContext *types.SystemContext // SystemContext used in GetBlob() to create temporary files
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
metadata storageImageMetadata
cachedManifest []byte // A cached copy of the manifest, if already known, or nil
getBlobMutex sync.Mutex // Mutex to sync state for parallel GetBlob executions (it guards layerPosition and digestToLayerID)
getBlobMutexProtected getBlobMutexProtected
SignatureSizes []int `json:"signature-sizes,omitempty"` // List of sizes of each signature slice
SignaturesSizes map[digest.Digest][]int `json:"signatures-sizes,omitempty"` // List of sizes of each signature slice
}

const expectedLayerDiffIDFlag = "expected-layer-diffid"
Expand All @@ -71,19 +70,21 @@ func newImageSource(sys *types.SystemContext, imageRef storageReference) (*stora
}),
NoGetBlobAtInitialize: stubs.NoGetBlobAt(imageRef),

imageRef: imageRef,
systemContext: sys,
image: img,
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
imageRef: imageRef,
systemContext: sys,
image: img,
metadata: storageImageMetadata{
SignatureSizes: []int{},
SignaturesSizes: make(map[digest.Digest][]int),
},
getBlobMutexProtected: getBlobMutexProtected{
digestToLayerID: make(map[digest.Digest]string),
layerPosition: make(map[digest.Digest]int),
},
}
image.Compat = impl.AddCompat(image)
if img.Metadata != "" {
if err := json.Unmarshal([]byte(img.Metadata), image); err != nil {
if err := json.Unmarshal([]byte(img.Metadata), &image.metadata); err != nil {
return nil, fmt.Errorf("decoding metadata for source image: %w", err)
}
}
Expand Down Expand Up @@ -375,11 +376,11 @@ func buildLayerInfosForCopy(manifestInfos []manifest.LayerInfo, physicalInfos []
func (s *storageImageSource) GetSignaturesWithFormat(ctx context.Context, instanceDigest *digest.Digest) ([]signature.Signature, error) {
var offset int
signatureBlobs := []byte{}
signatureSizes := s.SignatureSizes
signatureSizes := s.metadata.SignatureSizes
key := "signatures"
instance := "default instance"
if instanceDigest != nil {
signatureSizes = s.SignaturesSizes[*instanceDigest]
signatureSizes = s.metadata.SignaturesSizes[*instanceDigest]
key = signatureBigDataKey(*instanceDigest)
instance = instanceDigest.Encoded()
}
Expand Down Expand Up @@ -425,7 +426,7 @@ func (s *storageImageSource) getSize() (int64, error) {
sum += bigSize
}
// Add the signature sizes.
for _, sigSize := range s.SignatureSizes {
for _, sigSize := range s.metadata.SignatureSizes {
sum += int64(sigSize)
}
// Walk the layer list.
Expand Down

0 comments on commit 848cac5

Please sign in to comment.