Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix the GC job index data race #20830

Merged
merged 1 commit into from
Aug 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 22 additions & 24 deletions src/jobservice/job/impl/gc/garbage_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,18 +318,16 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return errGcStop
}

atomic.AddInt64(&index, 1)
index := atomic.LoadInt64(&index)

localIndex := atomic.AddInt64(&index, 1)
// set the status firstly, if the blob is updated by any HEAD/PUT request, it should be fail and skip.
blob.Status = blobModels.StatusDeleting
count, err := gc.blobMgr.UpdateBlobStatus(ctx.SystemContext(), blob)
if err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, index, total, blob.Digest, blob.Status)
gc.logger.Errorf("[%s][%d/%d] failed to mark gc candidate deleting, skip: %s, %s", uid, localIndex, total, blob.Digest, blob.Status)
continue
}
if count == 0 {
gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, index, total, blob.ID, blob.Digest)
gc.logger.Warningf("[%s][%d/%d] no blob found to mark gc candidate deleting, ID:%d, digest:%s", uid, localIndex, total, blob.ID, blob.Digest)
continue
}

Expand All @@ -339,7 +337,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
for _, art := range gc.trashedArts[blob.Digest] {
// Harbor cannot know the existing tags in the backend from its database, so let the v2 DELETE manifest to remove all of them.
gc.logger.Infof("[%s][%d/%d] delete the manifest with registry v2 API: %s, %s, %s",
uid, index, total, art.RepositoryName, blob.ContentType, blob.Digest)
uid, localIndex, total, art.RepositoryName, blob.ContentType, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := v2DeleteManifest(art.RepositoryName, blob.Digest)
Expand All @@ -350,13 +348,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec v2DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete manifest with v2 API, %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, index, total, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after v2DeleteManifest() error out: %s, %v", uid, localIndex, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
Expand All @@ -367,7 +365,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
continue
}
// for manifest, it has to delete the revisions folder of each repository
gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, index, total, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete manifest from storage: %s", uid, localIndex, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := gc.registryCtlClient.DeleteManifest(art.RepositoryName, blob.Digest)
Expand All @@ -378,13 +376,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteManifest, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to remove manifest from storage: %s, %s, errMsg=%v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, index, total, art.RepositoryName, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteManifest() error out: %s, %s, %v", uid, localIndex, total, art.RepositoryName, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
Expand All @@ -395,19 +393,19 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
continue
}

gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
gc.logger.Infof("[%s][%d/%d] delete artifact blob record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest)
if err := ignoreNotFound(func() error {
return gc.blobMgr.CleanupAssociationsForArtifact(ctx.SystemContext(), art.Digest)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, index, total, art.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.blobMgr.CleanupAssociationsForArtifact(): %v, errMsg=%v", uid, localIndex, total, art.Digest, err)
return err
}

gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, index, total, art.ID, art.RepositoryName, art.Digest)
gc.logger.Infof("[%s][%d/%d] delete artifact trash record from database: %d, %s, %s", uid, localIndex, total, art.ID, art.RepositoryName, art.Digest)
if err := ignoreNotFound(func() error {
return gc.artrashMgr.Delete(ctx.SystemContext(), art.ID)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, index, total, art.ID, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.artrashMgr.Delete(): %v, errMsg=%v", uid, localIndex, total, art.ID, err)
return err
}
}
Expand All @@ -421,7 +419,7 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
// delete all the blobs, which include config, layer and manifest
// for the foreign layer, as it's not stored in the storage, no need to call the delete api and count size, but still have to delete the DB record.
if !blob.IsForeignLayer() {
gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, index, total, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete blob from storage: %s", uid, localIndex, total, blob.Digest)
if err := retry.Retry(func() error {
return ignoreNotFound(func() error {
err := gc.registryCtlClient.DeleteBlob(blob.Digest)
Expand All @@ -432,13 +430,13 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
return err
})
}, retry.Callback(func(err error, sleep time.Duration) {
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, index, total, err, sleep)
gc.logger.Infof("[%s][%d/%d] failed to exec DeleteBlob, error: %v, will retry again after: %s", uid, localIndex, total, err, sleep)
})); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from storage: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, index, total, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.registryCtlClient.DeleteBlob() error out: %s, %v", uid, localIndex, total, blob.Digest, err)
return err
}
// if the system is set to read-only mode, return directly
Expand All @@ -450,15 +448,15 @@ func (gc *GarbageCollector) sweep(ctx job.Context) error {
atomic.AddInt64(&sweepSize, blob.Size)
}

gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, index, total, blob.ID, blob.Digest)
gc.logger.Infof("[%s][%d/%d] delete blob record from database: %d, %s", uid, localIndex, total, blob.ID, blob.Digest)
if err := ignoreNotFound(func() error {
return gc.blobMgr.Delete(ctx.SystemContext(), blob.ID)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, index, total, blob.Digest, blob.Status, err)
gc.logger.Errorf("[%s][%d/%d] failed to delete blob from database: %s, %s, errMsg=%v", uid, localIndex, total, blob.Digest, blob.Status, err)
if err := ignoreNotFound(func() error {
return gc.markDeleteFailed(ctx, blob)
}); err != nil {
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, index, total, blob.ID, blob.Digest, err)
gc.logger.Errorf("[%s][%d/%d] failed to call gc.markDeleteFailed() after gc.blobMgr.Delete() error out, %d, %s %v", uid, localIndex, total, blob.ID, blob.Digest, err)
return err
}
return err
Expand Down
Loading