From 3b1dd0e9700082fe7bc690a745263c06361417dd Mon Sep 17 00:00:00 2001 From: Xiaoxuan Wang <103478229+wangxiaoxuan273@users.noreply.github.com> Date: Thu, 11 Jan 2024 13:04:52 +0800 Subject: [PATCH] feat: Public GC function of `oci.Store` (#656) Part of #472 Signed-off-by: Xiaoxuan Wang --- content/oci/oci.go | 93 +++++++++++++++ content/oci/oci_test.go | 206 ++++++++++++++++++++++++++++++++++ internal/graph/memory.go | 10 ++ internal/graph/memory_test.go | 78 +++++++++++++ 4 files changed, 387 insertions(+) diff --git a/content/oci/oci.go b/content/oci/oci.go index 5c584f86d..ccefc0d9f 100644 --- a/content/oci/oci.go +++ b/content/oci/oci.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "sync" @@ -454,6 +455,77 @@ func (s *Store) writeIndexFile() error { return os.WriteFile(s.indexPath, indexJSON, 0666) } +// reloadIndex reloads the index and updates metadata by creating a new store. +func (s *Store) reloadIndex(ctx context.Context) error { + newStore, err := NewWithContext(ctx, s.root) + if err != nil { + return err + } + s.index = newStore.index + s.storage = newStore.storage + s.tagResolver = newStore.tagResolver + s.graph = newStore.graph + return nil +} + +// GC removes garbage from Store. Unsaved index will be lost. To prevent unexpected +// loss, call SaveIndex() before GC or set AutoSaveIndex to true. +// The garbage to be cleaned are: +// - unreferenced (dangling) blobs in Store which have no predecessors +// - garbage blobs in the storage whose metadata is not stored in Store +func (s *Store) GC(ctx context.Context) error { + s.sync.Lock() + defer s.sync.Unlock() + + // get reachable nodes by reloading the index + err := s.reloadIndex(ctx) + if err != nil { + return fmt.Errorf("unable to reload index: %w", err) + } + reachableNodes := s.graph.DigestSet() + + // clean up garbage blobs in the storage + rootpath := filepath.Join(s.root, ocispec.ImageBlobsDir) + algDirs, err := os.ReadDir(rootpath) + if err != nil { + return err + } + for _, algDir := range algDirs { + if !algDir.IsDir() { + continue + } + alg := algDir.Name() + // skip unsupported directories + if !isKnownAlgorithm(alg) { + continue + } + algPath := path.Join(rootpath, alg) + digestEntries, err := os.ReadDir(algPath) + if err != nil { + return err + } + for _, digestEntry := range digestEntries { + if err := isContextDone(ctx); err != nil { + return err + } + dgst := digestEntry.Name() + blobDigest := digest.NewDigestFromEncoded(digest.Algorithm(alg), dgst) + if err := blobDigest.Validate(); err != nil { + // skip irrelevant content + continue + } + if !reachableNodes.Contains(blobDigest) { + // remove the blob from storage if it does not exist in Store + err = os.Remove(path.Join(algPath, dgst)) + if err != nil { + return err + } + } + } + } + return nil +} + // unsafeStore is used to bypass lock restrictions in Delete. type unsafeStore struct { *Store @@ -467,6 +539,17 @@ func (s *unsafeStore) Predecessors(ctx context.Context, node ocispec.Descriptor) return s.graph.Predecessors(ctx, node) } +// isContextDone returns an error if the context is done. +// Reference: https://pkg.go.dev/context#Context +func isContextDone(ctx context.Context) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + return nil + } +} + // validateReference validates ref. func validateReference(ref string) error { if ref == "" { @@ -476,3 +559,13 @@ func validateReference(ref string) error { // TODO: may enforce more strict validation if needed. return nil } + +// isKnownAlgorithm checks is a string is a supported hash algorithm +func isKnownAlgorithm(alg string) bool { + switch digest.Algorithm(alg) { + case digest.SHA256, digest.SHA512, digest.SHA384: + return true + default: + return false + } +} diff --git a/content/oci/oci_test.go b/content/oci/oci_test.go index 0bdfb4a49..42702a6dd 100644 --- a/content/oci/oci_test.go +++ b/content/oci/oci_test.go @@ -24,6 +24,7 @@ import ( "fmt" "io" "os" + "path" "path/filepath" "reflect" "strconv" @@ -2844,6 +2845,199 @@ func TestStore_UntagErrorPath(t *testing.T) { } } +func TestStore_GC(t *testing.T) { + tempDir := t.TempDir() + s, err := New(tempDir) + if err != nil { + t.Fatal("New() error =", err) + } + ctx := context.Background() + + // generate test content + var blobs [][]byte + var descs []ocispec.Descriptor + appendBlob := func(mediaType string, blob []byte) { + blobs = append(blobs, blob) + descs = append(descs, ocispec.Descriptor{ + MediaType: mediaType, + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + }) + } + generateManifest := func(config ocispec.Descriptor, subject *ocispec.Descriptor, layers ...ocispec.Descriptor) { + manifest := ocispec.Manifest{ + Config: config, + Subject: subject, + Layers: layers, + } + manifestJSON, err := json.Marshal(manifest) + if err != nil { + t.Fatal(err) + } + appendBlob(ocispec.MediaTypeImageManifest, manifestJSON) + } + generateImageIndex := func(manifests ...ocispec.Descriptor) { + index := ocispec.Index{ + Manifests: manifests, + } + indexJSON, err := json.Marshal(index) + if err != nil { + t.Fatal(err) + } + appendBlob(ocispec.MediaTypeImageIndex, indexJSON) + } + generateArtifactManifest := func(blobs ...ocispec.Descriptor) { + var manifest spec.Artifact + manifest.Blobs = append(manifest.Blobs, blobs...) + manifestJSON, err := json.Marshal(manifest) + if err != nil { + t.Fatal(err) + } + appendBlob(spec.MediaTypeArtifactManifest, manifestJSON) + } + + appendBlob(ocispec.MediaTypeImageConfig, []byte("config")) // Blob 0 + appendBlob(ocispec.MediaTypeImageLayer, []byte("blob")) // Blob 1 + appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer")) // Blob 2, dangling layer + generateManifest(descs[0], nil, descs[1]) // Blob 3, valid manifest + generateManifest(descs[0], &descs[3], descs[1]) // Blob 4, referrer of a valid manifest, not in index.json, should be cleaned with current implementation + appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 2")) // Blob 5, dangling layer + generateArtifactManifest(descs[4]) // blob 6, dangling artifact + generateManifest(descs[0], &descs[5], descs[1]) // Blob 7, referrer of a dangling manifest + appendBlob(ocispec.MediaTypeImageLayer, []byte("dangling layer 3")) // Blob 8, dangling layer + generateArtifactManifest(descs[6]) // blob 9, dangling artifact + generateImageIndex(descs[7], descs[5]) // blob 10, dangling image index + appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 1")) // Blob 11, garbage layer 1 + generateManifest(descs[0], nil, descs[4]) // Blob 12, garbage manifest 1 + appendBlob(ocispec.MediaTypeImageConfig, []byte("garbage config")) // Blob 13, garbage config + appendBlob(ocispec.MediaTypeImageLayer, []byte("garbage layer 2")) // Blob 14, garbage layer 2 + generateManifest(descs[6], nil, descs[7]) // Blob 15, garbage manifest 2 + generateManifest(descs[0], &descs[13], descs[1]) // Blob 16, referrer of a garbage manifest + + // push blobs 0 - blobs 10 into s + for i := 0; i <= 10; i++ { + err := s.Push(ctx, descs[i], bytes.NewReader(blobs[i])) + if err != nil { + t.Errorf("failed to push test content to src: %d: %v", i, err) + } + } + + // remove blobs 4 - blobs 10 from index.json + for i := 4; i <= 10; i++ { + s.tagResolver.Untag(string(descs[i].Digest)) + } + s.SaveIndex() + + // push blobs 11 - blobs 16 into s.storage, making them garbage as their metadata + // doesn't exist in s + for i := 11; i < len(blobs); i++ { + err := s.storage.Push(ctx, descs[i], bytes.NewReader(blobs[i])) + if err != nil { + t.Errorf("failed to push test content to src: %d: %v", i, err) + } + } + + // confirm that all the blobs are in the storage + for i := 11; i < len(blobs); i++ { + exists, err := s.Exists(ctx, descs[i]) + if err != nil { + t.Fatal(err) + } + if !exists { + t.Fatalf("descs[%d] should exist", i) + } + } + + // perform GC + if err = s.GC(ctx); err != nil { + t.Fatal(err) + } + + // verify existence + wantExistence := []bool{true, true, false, true, false, false, false, false, false, false, false, false, false, false, false, false, false} + for i, wantValue := range wantExistence { + exists, err := s.Exists(ctx, descs[i]) + if err != nil { + t.Fatal(err) + } + if exists != wantValue { + t.Fatalf("want existence %d to be %v, got %v", i, wantValue, exists) + } + } +} + +func TestStore_GCErrorPath(t *testing.T) { + tempDir := t.TempDir() + s, err := New(tempDir) + if err != nil { + t.Fatal("New() error =", err) + } + ctx := context.Background() + + // generate test content + var blobs [][]byte + var descs []ocispec.Descriptor + appendBlob := func(mediaType string, blob []byte) { + blobs = append(blobs, blob) + descs = append(descs, ocispec.Descriptor{ + MediaType: mediaType, + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + }) + } + appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob")) // Blob 0 + + // push the valid blob + err = s.Push(ctx, descs[0], bytes.NewReader(blobs[0])) + if err != nil { + t.Error("failed to push test content to src") + } + + // write random contents + algPath := path.Join(tempDir, "blobs") + dgstPath := path.Join(algPath, "sha256") + if err := os.WriteFile(path.Join(algPath, "other"), []byte("random"), 0444); err != nil { + t.Fatal("error calling WriteFile(), error =", err) + } + if err := os.WriteFile(path.Join(dgstPath, "other2"), []byte("random2"), 0444); err != nil { + t.Fatal("error calling WriteFile(), error =", err) + } + + // perform GC + if err = s.GC(ctx); err != nil { + t.Fatal(err) + } + + appendBlob(ocispec.MediaTypeImageLayer, []byte("valid blob 2")) // Blob 1 + + // push the valid blob + err = s.Push(ctx, descs[1], bytes.NewReader(blobs[1])) + if err != nil { + t.Error("failed to push test content to src") + } + + // unknown algorithm + if err := os.Mkdir(path.Join(algPath, "sha666"), 0777); err != nil { + t.Fatal(err) + } + if err = s.GC(ctx); err != nil { + t.Fatal("this error should be silently ignored") + } + + // os.Remove() error + badDigest := digest.FromBytes([]byte("bad digest")).Encoded() + badPath := path.Join(algPath, "sha256", badDigest) + if err := os.Mkdir(badPath, 0777); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(path.Join(badPath, "whatever"), []byte("extra content"), 0444); err != nil { + t.Fatal("error calling WriteFile(), error =", err) + } + if err = s.GC(ctx); err == nil { + t.Fatal("expect an error when os.Remove()") + } +} + func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descriptor) bool { if len(actual) != len(expected) { return false @@ -2863,3 +3057,15 @@ func equalDescriptorSet(actual []ocispec.Descriptor, expected []ocispec.Descript } return true } + +func Test_isContextDone(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + if err := isContextDone(ctx); err != nil { + t.Errorf("expect error = %v, got %v", nil, err) + } + cancel() + if err := isContextDone(ctx); err != context.Canceled { + t.Errorf("expect error = %v, got %v", context.Canceled, err) + } +} diff --git a/internal/graph/memory.go b/internal/graph/memory.go index b93df83e4..aa7355527 100644 --- a/internal/graph/memory.go +++ b/internal/graph/memory.go @@ -20,6 +20,7 @@ import ( "errors" "sync" + "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "oras.land/oras-go/v2/content" "oras.land/oras-go/v2/errdef" @@ -147,6 +148,15 @@ func (m *Memory) Remove(node ocispec.Descriptor) []ocispec.Descriptor { return danglings } +// DigestSet returns the set of node digest in memory. +func (m *Memory) DigestSet() set.Set[digest.Digest] { + s := set.New[digest.Digest]() + for desc := range m.nodes { + s.Add(desc.Digest) + } + return s +} + // index indexes predecessors for each direct successor of the given node. func (m *Memory) index(ctx context.Context, fetcher content.Fetcher, node ocispec.Descriptor) ([]ocispec.Descriptor, error) { successors, err := content.Successors(ctx, fetcher, node) diff --git a/internal/graph/memory_test.go b/internal/graph/memory_test.go index 9b5bab33a..89ef4446a 100644 --- a/internal/graph/memory_test.go +++ b/internal/graph/memory_test.go @@ -610,3 +610,81 @@ func TestMemory_IndexAllAndPredecessors(t *testing.T) { t.Errorf("incorrect predecessor result") } } + +func TestMemory_DigestSet(t *testing.T) { + testFetcher := cas.NewMemory() + testMemory := NewMemory() + ctx := context.Background() + + // generate test content + var blobs [][]byte + var descriptors []ocispec.Descriptor + appendBlob := func(mediaType string, blob []byte) ocispec.Descriptor { + blobs = append(blobs, blob) + descriptors = append(descriptors, ocispec.Descriptor{ + MediaType: mediaType, + Digest: digest.FromBytes(blob), + Size: int64(len(blob)), + }) + return descriptors[len(descriptors)-1] + } + generateManifest := func(layers ...ocispec.Descriptor) ocispec.Descriptor { + manifest := ocispec.Manifest{ + Config: ocispec.Descriptor{MediaType: "test config"}, + Layers: layers, + } + manifestJSON, err := json.Marshal(manifest) + if err != nil { + t.Fatal(err) + } + return appendBlob(ocispec.MediaTypeImageManifest, manifestJSON) + } + generateIndex := func(manifests ...ocispec.Descriptor) ocispec.Descriptor { + index := ocispec.Index{ + Manifests: manifests, + } + indexJSON, err := json.Marshal(index) + if err != nil { + t.Fatal(err) + } + return appendBlob(ocispec.MediaTypeImageIndex, indexJSON) + } + descE := appendBlob("layer node E", []byte("Node E is a layer")) // blobs[0], layer "E" + descF := appendBlob("layer node F", []byte("Node F is a layer")) // blobs[1], layer "F" + descB := generateManifest(descriptors[0:1]...) // blobs[2], manifest "B" + descC := generateManifest(descriptors[0:2]...) // blobs[3], manifest "C" + descD := generateManifest(descriptors[1:2]...) // blobs[4], manifest "D" + descA := generateIndex(descriptors[2:5]...) // blobs[5], index "A" + + // prepare the content in the fetcher, so that it can be used to test IndexAll + testContents := []ocispec.Descriptor{descE, descF, descB, descC, descD, descA} + for i := 0; i < len(blobs); i++ { + testFetcher.Push(ctx, testContents[i], bytes.NewReader(blobs[i])) + } + + // make sure that testFetcher works + rc, err := testFetcher.Fetch(ctx, descA) + if err != nil { + t.Errorf("testFetcher.Fetch() error = %v", err) + } + got, err := io.ReadAll(rc) + if err != nil { + t.Errorf("testFetcher.Fetch().Read() error = %v", err) + } + err = rc.Close() + if err != nil { + t.Errorf("testFetcher.Fetch().Close() error = %v", err) + } + if !bytes.Equal(got, blobs[5]) { + t.Errorf("testFetcher.Fetch() = %v, want %v", got, blobs[4]) + } + + // index node A into testMemory using IndexAll + testMemory.IndexAll(ctx, testFetcher, descA) + digestSet := testMemory.DigestSet() + for i := 0; i < len(blobs); i++ { + if exists := digestSet.Contains(descriptors[i].Digest); exists != true { + t.Errorf("digest of blob[%d] should exist in digestSet", i) + } + } +}