From 2eca31ddd520a558b063880b5db0bb3ece878a3b Mon Sep 17 00:00:00 2001 From: Joe Lanford Date: Mon, 24 Jun 2024 15:13:09 -0400 Subject: [PATCH] incremental improvement in catalogmetadata cache/client performance (#965) Signed-off-by: Joe Lanford --- Makefile | 2 +- internal/catalogmetadata/cache/cache.go | 58 ++- internal/catalogmetadata/cache/cache_test.go | 404 +++++++++++------- internal/catalogmetadata/client/client.go | 75 +++- .../catalogmetadata/client/client_test.go | 290 +++++++------ .../clusterextension_controller.go | 10 +- .../clusterextension_controller_test.go | 2 +- ...terextension_registryv1_validation_test.go | 2 +- internal/controllers/common_controller.go | 2 +- internal/controllers/suite_test.go | 2 +- .../testutil}/fake_catalog_client.go | 11 +- 11 files changed, 502 insertions(+), 356 deletions(-) rename {test/util => internal/testutil}/fake_catalog_client.go (64%) diff --git a/Makefile b/Makefile index e46cc379a..fb69629f5 100644 --- a/Makefile +++ b/Makefile @@ -135,7 +135,7 @@ test-ext-dev-e2e: $(OPERATOR_SDK) $(KUSTOMIZE) $(KIND) #HELP Run extension creat ENVTEST_VERSION := $(shell go list -m k8s.io/client-go | cut -d" " -f2 | sed 's/^v0\.\([[:digit:]]\{1,\}\)\.[[:digit:]]\{1,\}$$/1.\1.x/') UNIT_TEST_DIRS := $(shell go list ./... | grep -v /test/) test-unit: $(SETUP_ENVTEST) #HELP Run the unit tests - eval $$($(SETUP_ENVTEST) use -p env $(ENVTEST_VERSION) $(SETUP_ENVTEST_BIN_DIR_OVERRIDE)) && go test -count=1 -short $(UNIT_TEST_DIRS) -coverprofile cover.out + eval $$($(SETUP_ENVTEST) use -p env $(ENVTEST_VERSION) $(SETUP_ENVTEST_BIN_DIR_OVERRIDE)) && CGO_ENABLED=1 go test -count=1 -race -short $(UNIT_TEST_DIRS) -coverprofile cover.out image-registry: ## Setup in-cluster image registry ./test/tools/image-registry.sh $(E2E_REGISTRY_NAMESPACE) $(E2E_REGISTRY_NAME) diff --git a/internal/catalogmetadata/cache/cache.go b/internal/catalogmetadata/cache/cache.go index 85c1a81ac..528c187a7 100644 --- a/internal/catalogmetadata/cache/cache.go +++ b/internal/catalogmetadata/cache/cache.go @@ -3,13 +3,14 @@ package cache import ( "context" "fmt" - "io" + "io/fs" "net/http" "os" "path/filepath" "sync" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" ) @@ -66,7 +67,7 @@ type filesystemCache struct { // resources that have been successfully reconciled, unpacked, and are being served. // These requirements help ensure that we can rely on status conditions to determine // when to issue a request to update the cached Catalog contents. -func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (io.ReadCloser, error) { +func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) { if catalog == nil { return nil, fmt.Errorf("error: provided catalog must be non-nil") } @@ -80,25 +81,23 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c } cacheDir := filepath.Join(fsc.cachePath, catalog.Name) - cacheFilePath := filepath.Join(cacheDir, "data.json") - fsc.mutex.RLock() if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { if catalog.Status.ResolvedSource.Image.ResolvedRef == data.ResolvedRef { fsc.mutex.RUnlock() - return os.Open(cacheFilePath) + return os.DirFS(cacheDir), nil } } fsc.mutex.RUnlock() req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil) if err != nil { - return nil, fmt.Errorf("error forming request: %s", err) + return nil, fmt.Errorf("error forming request: %v", err) } resp, err := fsc.client.Do(req) if err != nil { - return nil, fmt.Errorf("error performing request: %s", err) + return nil, fmt.Errorf("error performing request: %v", err) } defer resp.Body.Close() @@ -117,34 +116,49 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c // the cached contents if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref { - return os.Open(cacheFilePath) + return os.DirFS(cacheDir), nil } } - if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil { - return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err) - } - - file, err := os.Create(cacheFilePath) + tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalog.Name)) if err != nil { - return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err) + return nil, fmt.Errorf("error creating temporary directory to unpack catalog metadata: %v", err) } - if _, err := io.Copy(file, resp.Body); err != nil { - return nil, fmt.Errorf("error writing contents to cache file for Catalog %q: %s", catalog.Name, err) + if err := declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { + if err != nil { + return fmt.Errorf("error parsing catalog contents: %v", err) + } + pkgName := meta.Package + if meta.Schema == declcfg.SchemaPackage { + pkgName = meta.Name + } + metaName := meta.Name + if meta.Name == "" { + metaName = meta.Schema + } + metaPath := filepath.Join(tmpDir, pkgName, meta.Schema, metaName+".json") + if err := os.MkdirAll(filepath.Dir(metaPath), os.ModePerm); err != nil { + return fmt.Errorf("error creating directory for catalog metadata: %v", err) + } + if err := os.WriteFile(metaPath, meta.Blob, os.ModePerm); err != nil { + return fmt.Errorf("error writing catalog metadata to file: %v", err) + } + return nil + }); err != nil { + return nil, err } - if err = file.Sync(); err != nil { - return nil, fmt.Errorf("error syncing contents to cache file for Catalog %q: %s", catalog.Name, err) + if err := os.RemoveAll(cacheDir); err != nil { + return nil, fmt.Errorf("error removing old cache directory: %v", err) } - - if _, err = file.Seek(0, 0); err != nil { - return nil, fmt.Errorf("error resetting offset for cache file reader for Catalog %q: %s", catalog.Name, err) + if err := os.Rename(tmpDir, cacheDir); err != nil { + return nil, fmt.Errorf("error moving temporary directory to cache directory: %v", err) } fsc.cacheDataByCatalogName[catalog.Name] = cacheData{ ResolvedRef: catalog.Status.ResolvedSource.Image.ResolvedRef, } - return file, nil + return os.DirFS(cacheDir), nil } diff --git a/internal/catalogmetadata/cache/cache_test.go b/internal/catalogmetadata/cache/cache_test.go index f6719522c..4b24d90cb 100644 --- a/internal/catalogmetadata/cache/cache_test.go +++ b/internal/catalogmetadata/cache/cache_test.go @@ -3,18 +3,24 @@ package cache_test import ( "bytes" "context" + "encoding/json" "errors" + "fmt" "io" + "io/fs" + "maps" "net/http" - "os" "path/filepath" - "strings" "testing" + "testing/fstest" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" + "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" ) @@ -50,203 +56,196 @@ const ( }` ) -func TestCache(t *testing.T) { - t.Run("FetchCatalogContents", func(t *testing.T) { - type test struct { - name string - catalog *catalogd.ClusterCatalog - contents []byte - wantErr bool - tripper *MockTripper - testCaching bool - shouldHitCache bool - } - for _, tt := range []test{ - { - name: "valid non-cached fetch", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, +var defaultFS = fstest.MapFS{ + "fake1/olm.package/fake1.json": &fstest.MapFile{Data: []byte(package1)}, + "fake1/olm.bundle/fake1.v1.0.0.json": &fstest.MapFile{Data: []byte(bundle1)}, + "fake1/olm.channel/stable.json": &fstest.MapFile{Data: []byte(stableChannel)}, +} + +func TestFilesystemCache(t *testing.T) { + type test struct { + name string + catalog *catalogd.ClusterCatalog + contents fstest.MapFS + wantErr bool + tripper *MockTripper + testCaching bool + shouldHitCache bool + } + for _, tt := range []test{ + { + name: "valid non-cached fetch", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", }, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{}, }, - { - name: "valid cached fetch", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, + contents: defaultFS, + tripper: &MockTripper{}, + }, + { + name: "valid cached fetch", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", }, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{}, - testCaching: true, - shouldHitCache: true, }, - { - name: "cached update fetch with changes", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, + contents: defaultFS, + tripper: &MockTripper{}, + testCaching: true, + shouldHitCache: true, + }, + { + name: "cached update fetch with changes", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", }, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{}, - testCaching: true, - shouldHitCache: false, }, - { - name: "fetch error", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, + contents: defaultFS, + tripper: &MockTripper{}, + testCaching: true, + shouldHitCache: false, + }, + { + name: "fetch error", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", }, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{shouldError: true}, - wantErr: true, }, - { - name: "fetch internal server error response", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, + contents: defaultFS, + tripper: &MockTripper{shouldError: true}, + wantErr: true, + }, + { + name: "fetch internal server error response", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Type: catalogd.SourceTypeImage, + Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", }, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{serverError: true}, - wantErr: true, }, - { - name: "nil catalog", - catalog: nil, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{serverError: true}, - wantErr: true, - }, - { - name: "nil catalog.status.resolvedSource", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: nil, - }, + contents: defaultFS, + tripper: &MockTripper{serverError: true}, + wantErr: true, + }, + { + name: "nil catalog", + catalog: nil, + contents: defaultFS, + tripper: &MockTripper{serverError: true}, + wantErr: true, + }, + { + name: "nil catalog.status.resolvedSource", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: nil, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{serverError: true}, - wantErr: true, }, - { - name: "nil catalog.status.resolvedSource.image", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Image: nil, - }, + contents: defaultFS, + tripper: &MockTripper{serverError: true}, + wantErr: true, + }, + { + name: "nil catalog.status.resolvedSource.image", + catalog: &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-catalog", + }, + Status: catalogd.ClusterCatalogStatus{ + ResolvedSource: &catalogd.ResolvedCatalogSource{ + Image: nil, }, }, - contents: []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), - tripper: &MockTripper{serverError: true}, - wantErr: true, }, - } { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - cacheDir := t.TempDir() - tt.tripper.content = tt.contents - httpClient := http.DefaultClient - httpClient.Transport = tt.tripper - c := cache.NewFilesystemCache(cacheDir, httpClient) + contents: defaultFS, + tripper: &MockTripper{serverError: true}, + wantErr: true, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + cacheDir := t.TempDir() + tt.tripper.content = make(fstest.MapFS) + maps.Copy(tt.tripper.content, tt.contents) + httpClient := http.DefaultClient + httpClient.Transport = tt.tripper + c := cache.NewFilesystemCache(cacheDir, httpClient) - rc, err := c.FetchCatalogContents(ctx, tt.catalog) - if !tt.wantErr { - assert.NoError(t, err) - filePath := filepath.Join(cacheDir, tt.catalog.Name, "data.json") - assert.FileExists(t, filePath) - fileContents, err := os.ReadFile(filePath) - assert.NoError(t, err) - assert.Equal(t, tt.contents, fileContents) + actualFS, err := c.FetchCatalogContents(ctx, tt.catalog) + if !tt.wantErr { + assert.NoError(t, err) + assert.NoError(t, equalFilesystems(tt.contents, actualFS)) + } else { + assert.Error(t, err) + } - data, err := io.ReadAll(rc) - assert.NoError(t, err) - assert.Equal(t, tt.contents, data) - defer rc.Close() - } else { - assert.Error(t, err) + if tt.testCaching { + if !tt.shouldHitCache { + tt.catalog.Status.ResolvedSource.Image.ResolvedRef = "fake/catalog@sha256:shafake" } - - if tt.testCaching { - if !tt.shouldHitCache { - tt.catalog.Status.ResolvedSource.Image.ResolvedRef = "fake/catalog@sha256:shafake" - } - tt.tripper.content = append(tt.tripper.content, []byte(`{"schema": "olm.package", "name": "foobar"}`)...) - rc, err := c.FetchCatalogContents(ctx, tt.catalog) - assert.NoError(t, err) - defer rc.Close() - data, err := io.ReadAll(rc) - assert.NoError(t, err) - if !tt.shouldHitCache { - assert.Equal(t, tt.tripper.content, data) - assert.NotEqual(t, tt.contents, data) - } else { - assert.Equal(t, tt.contents, data) - } + tt.tripper.content["foobar/olm.package/foobar.json"] = &fstest.MapFile{Data: []byte(`{"schema": "olm.package", "name": "foobar"}`)} + actualFS, err := c.FetchCatalogContents(ctx, tt.catalog) + assert.NoError(t, err) + if !tt.shouldHitCache { + assert.NoError(t, equalFilesystems(tt.tripper.content, actualFS)) + assert.ErrorContains(t, equalFilesystems(tt.contents, actualFS), "foobar/olm.package/foobar.json") + } else { + assert.NoError(t, equalFilesystems(tt.contents, actualFS)) } - }) - } - }) + } + }) + } } var _ http.RoundTripper = &MockTripper{} type MockTripper struct { - content []byte + content fstest.MapFS shouldError bool serverError bool } @@ -263,8 +262,81 @@ func (mt *MockTripper) RoundTrip(_ *http.Request) (*http.Response, error) { }, nil } + pr, pw := io.Pipe() + + go func() { + _ = pw.CloseWithError(declcfg.WalkMetasFS(context.Background(), mt.content, func(_ string, meta *declcfg.Meta, err error) error { + if err != nil { + return err + } + _, err = pw.Write(meta.Blob) + return err + })) + }() + return &http.Response{ StatusCode: http.StatusOK, - Body: io.NopCloser(bytes.NewReader(mt.content)), + Body: pr, }, nil } + +func equalFilesystems(expected, actual fs.FS) error { + normalizeJSON := func(data []byte) []byte { + var v interface{} + if err := json.Unmarshal(data, &v); err != nil { + return data + } + norm, err := json.Marshal(v) + if err != nil { + return data + } + return norm + } + compare := func(expected, actual fs.FS, path string) error { + expectedData, expectedErr := fs.ReadFile(expected, path) + actualData, actualErr := fs.ReadFile(actual, path) + + switch { + case expectedErr == nil && actualErr != nil: + return fmt.Errorf("path %q: read error in actual FS: %v", path, actualErr) + case expectedErr != nil && actualErr == nil: + return fmt.Errorf("path %q: read error in expected FS: %v", path, expectedErr) + case expectedErr != nil && actualErr != nil && expectedErr.Error() != actualErr.Error(): + return fmt.Errorf("path %q: different read errors: expected: %v, actual: %v", path, expectedErr, actualErr) + } + + if filepath.Ext(path) == ".json" { + expectedData = normalizeJSON(expectedData) + actualData = normalizeJSON(actualData) + } + + if !bytes.Equal(expectedData, actualData) { + return fmt.Errorf("path %q: file contents do not match: %s", path, cmp.Diff(string(expectedData), string(actualData))) + } + return nil + } + + paths := sets.New[string]() + for _, fsys := range []fs.FS{expected, actual} { + if err := fs.WalkDir(fsys, ".", func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + return nil + } + paths.Insert(path) + return nil + }); err != nil { + return err + } + } + + var cmpErrs []error + for _, path := range sets.List(paths) { + if err := compare(expected, actual, path); err != nil { + cmpErrs = append(cmpErrs, err) + } + } + return errors.Join(cmpErrs...) +} diff --git a/internal/catalogmetadata/client/client.go b/internal/catalogmetadata/client/client.go index 147a50842..a3e7339ca 100644 --- a/internal/catalogmetadata/client/client.go +++ b/internal/catalogmetadata/client/client.go @@ -1,10 +1,14 @@ package client import ( + "cmp" "context" "encoding/json" + "errors" "fmt" - "io" + "io/fs" + "slices" + "sync" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -23,7 +27,7 @@ type Fetcher interface { // server for the catalog provided. It returns an io.ReadCloser // containing the FBC contents that the caller is expected to close. // returns an error if any occur. - FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (io.ReadCloser, error) + FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) } func New(cl client.Client, fetcher Fetcher) *Client { @@ -43,66 +47,93 @@ type Client struct { fetcher Fetcher } -func (c *Client) Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) { +func (c *Client) Bundles(ctx context.Context, packageName string) ([]*catalogmetadata.Bundle, error) { var allBundles []*catalogmetadata.Bundle var catalogList catalogd.ClusterCatalogList if err := c.cl.List(ctx, &catalogList); err != nil { return nil, err } + + var errs []error for _, catalog := range catalogList.Items { - // if the catalog has not been successfully unpacked, skip it + // if the catalog has not been successfully unpacked, report an error. This ensures that our + // reconciles are deterministic and wait for all desired catalogs to be ready. if !meta.IsStatusConditionPresentAndEqual(catalog.Status.Conditions, catalogd.TypeUnpacked, metav1.ConditionTrue) { + errs = append(errs, fmt.Errorf("catalog %q is not unpacked", catalog.Name)) continue } channels := []*catalogmetadata.Channel{} bundles := []*catalogmetadata.Bundle{} deprecations := []*catalogmetadata.Deprecation{} - rc, err := c.fetcher.FetchCatalogContents(ctx, catalog.DeepCopy()) + catalogFS, err := c.fetcher.FetchCatalogContents(ctx, catalog.DeepCopy()) + if err != nil { + errs = append(errs, fmt.Errorf("error fetching catalog %q contents: %v", catalog.Name, err)) + continue + } + + packageFS, err := fs.Sub(catalogFS, packageName) if err != nil { - return nil, fmt.Errorf("error fetching catalog contents: %s", err) + errs = append(errs, fmt.Errorf("error reading package subdirectory %q from catalog %q filesystem: %v", packageName, catalog.Name, err)) + continue } - defer rc.Close() - err = declcfg.WalkMetasReader(rc, func(meta *declcfg.Meta, err error) error { + var ( + channelsMu sync.Mutex + bundlesMu sync.Mutex + deprecationsMu sync.Mutex + ) + + if err := declcfg.WalkMetasFS(ctx, packageFS, func(_ string, meta *declcfg.Meta, err error) error { if err != nil { - return fmt.Errorf("error was provided to the WalkMetasReaderFunc: %s", err) + return fmt.Errorf("error parsing package metadata: %v", err) } switch meta.Schema { case declcfg.SchemaChannel: var content catalogmetadata.Channel if err := json.Unmarshal(meta.Blob, &content); err != nil { - return fmt.Errorf("error unmarshalling channel from catalog metadata: %s", err) + return fmt.Errorf("error unmarshalling channel from catalog metadata: %v", err) } + channelsMu.Lock() + defer channelsMu.Unlock() channels = append(channels, &content) case declcfg.SchemaBundle: var content catalogmetadata.Bundle if err := json.Unmarshal(meta.Blob, &content); err != nil { - return fmt.Errorf("error unmarshalling bundle from catalog metadata: %s", err) + return fmt.Errorf("error unmarshalling bundle from catalog metadata: %v", err) } + bundlesMu.Lock() + defer bundlesMu.Unlock() bundles = append(bundles, &content) case declcfg.SchemaDeprecation: var content catalogmetadata.Deprecation if err := json.Unmarshal(meta.Blob, &content); err != nil { - return fmt.Errorf("error unmarshalling deprecation from catalog metadata: %s", err) + return fmt.Errorf("error unmarshalling deprecation from catalog metadata: %v", err) } + deprecationsMu.Lock() + defer deprecationsMu.Unlock() deprecations = append(deprecations, &content) } - return nil - }) - if err != nil { - return nil, fmt.Errorf("error processing response: %s", err) + }); err != nil { + if !errors.Is(err, fs.ErrNotExist) { + errs = append(errs, fmt.Errorf("error reading package %q from catalog %q: %v", packageName, catalog.Name, err)) + } + continue } bundles, err = PopulateExtraFields(catalog.Name, channels, bundles, deprecations) if err != nil { - return nil, err + errs = append(errs, err) + continue } allBundles = append(allBundles, bundles...) } + if len(errs) > 0 { + return nil, errors.Join(errs...) + } return allBundles, nil } @@ -128,6 +159,16 @@ func PopulateExtraFields(catalogName string, channels []*catalogmetadata.Channel } } + // We sort the channels here because the order that channels appear in this list is non-deterministic. + // They are non-deterministic because they are originally read from the cache in a concurrent manner that + // provides no ordering guarantees. + // + // This sort isn't strictly necessary for correctness, but it makes the output consistent and easier to + // reason about. + for _, bundle := range bundles { + slices.SortFunc(bundle.InChannels, func(a, b *catalogmetadata.Channel) int { return cmp.Compare(a.Name, b.Name) }) + } + // According to https://docs.google.com/document/d/1EzefSzoGZL2ipBt-eCQwqqNwlpOIt7wuwjG6_8ZCi5s/edit?usp=sharing // the olm.deprecations FBC object is only valid when either 0 or 1 instances exist // for any given package diff --git a/internal/catalogmetadata/client/client_test.go b/internal/catalogmetadata/client/client_test.go index 7d75f4f6e..d2e6dfabb 100644 --- a/internal/catalogmetadata/client/client_test.go +++ b/internal/catalogmetadata/client/client_test.go @@ -1,13 +1,12 @@ package client_test import ( - "bytes" "context" "encoding/json" "errors" - "io" - "strings" + "io/fs" "testing" + "testing/fstest" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -24,30 +23,30 @@ import ( ) func TestClient(t *testing.T) { - t.Run("Bundles", func(t *testing.T) { - for _, tt := range []struct { - name string - fakeCatalog func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) - wantErr string - fetcher *MockFetcher - }{ - { - name: "valid catalog", - fakeCatalog: defaultFakeCatalog, - fetcher: &MockFetcher{}, - }, - { - name: "cache error", - fakeCatalog: defaultFakeCatalog, - fetcher: &MockFetcher{shouldError: true}, - wantErr: "error fetching catalog contents: mock cache error", - }, - { - name: "channel has a ref to a missing bundle", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, _, catalogContentMap := defaultFakeCatalog() + for _, tt := range []struct { + name string + fakeCatalog func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) + wantErr string + fetcher *MockFetcher + }{ + { + name: "valid catalog", + fakeCatalog: defaultFakeCatalog, + fetcher: &MockFetcher{}, + }, + { + name: "cache error", + fakeCatalog: defaultFakeCatalog, + fetcher: &MockFetcher{shouldError: true}, + wantErr: `error fetching catalog "catalog-1" contents: mock cache error +error fetching catalog "catalog-2" contents: mock cache error`, + }, + { + name: "channel has a ref to a missing bundle", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, _, catalogContentMap := defaultFakeCatalog() - catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], []byte(`{ + catalogContentMap["catalog-1"]["fake1/olm.channel/bad-channel-entry.json"] = &fstest.MapFile{Data: []byte(`{ "schema": "olm.channel", "name": "channel-with-missing-bundle", "package": "fake1", @@ -56,129 +55,130 @@ func TestClient(t *testing.T) { "name": "fake1.v9.9.9" } ] - }`)...) + }`)} - return objs, nil, catalogContentMap - }, - wantErr: `bundle "fake1.v9.9.9" not found in catalog "catalog-1" (package "fake1", channel "channel-with-missing-bundle")`, - fetcher: &MockFetcher{}, + return objs, nil, catalogContentMap }, - { - name: "invalid meta", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, _, catalogContentMap := defaultFakeCatalog() + wantErr: `bundle "fake1.v9.9.9" not found in catalog "catalog-1" (package "fake1", channel "channel-with-missing-bundle")`, + fetcher: &MockFetcher{}, + }, + { + name: "invalid meta", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, _, catalogContentMap := defaultFakeCatalog() - catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], []byte(`{"schema": "olm.bundle", "name":123123123}`)...) + catalogContentMap["catalog-1"]["fake1/olm.bundle/invalid-meta.json"] = &fstest.MapFile{Data: []byte(`{"schema": "olm.bundle", "package":"fake1", "name":123123123}`)} - return objs, nil, catalogContentMap - }, - wantErr: `error processing response: error was provided to the WalkMetasReaderFunc: expected value for key "name" to be a string, got %!t(float64=1.23123123e+08): 1.23123123e+08`, - fetcher: &MockFetcher{}, + return objs, nil, catalogContentMap }, - { - name: "invalid bundle", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, _, catalogContentMap := defaultFakeCatalog() + wantErr: `expected value for key "name" to be a string, got %!t(float64=1.23123123e+08): 1.23123123e+08`, + fetcher: &MockFetcher{}, + }, + { + name: "invalid bundle", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, _, catalogContentMap := defaultFakeCatalog() - catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], - []byte(`{"schema": "olm.bundle", "name":"foo", "package":"bar", "image":123123123}`)...) + catalogContentMap["catalog-1"]["fake1/olm.bundle/invalid-bundle.json"] = &fstest.MapFile{Data: []byte(`{"schema": "olm.bundle", "name":"foo", "package":"fake1", "image":123123123}`)} - return objs, nil, catalogContentMap - }, - wantErr: "error processing response: error unmarshalling bundle from catalog metadata: json: cannot unmarshal number into Go struct field Bundle.image of type string", - fetcher: &MockFetcher{}, + return objs, nil, catalogContentMap }, - { - name: "invalid channel", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, _, catalogContentMap := defaultFakeCatalog() + wantErr: "error unmarshalling bundle from catalog metadata: json: cannot unmarshal number into Go struct field Bundle.image of type string", + fetcher: &MockFetcher{}, + }, + { + name: "invalid channel", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, _, catalogContentMap := defaultFakeCatalog() - catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], - []byte(`{"schema": "olm.channel", "name":"foo", "package":"bar", "entries":[{"name":123123123}]}`)...) + catalogContentMap["catalog-1"]["fake1/olm.channel/invalid-channel.json"] = &fstest.MapFile{ + Data: []byte(`{"schema": "olm.channel", "name":"foo", "package":"fake1", "entries":[{"name":123123123}]}`), + } - return objs, nil, catalogContentMap - }, - wantErr: "error processing response: error unmarshalling channel from catalog metadata: json: cannot unmarshal number into Go struct field ChannelEntry.entries.name of type string", - fetcher: &MockFetcher{}, + return objs, nil, catalogContentMap }, - { - name: "skip catalog missing Unpacked status condition", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, bundles, catalogContentMap := defaultFakeCatalog() - objs = append(objs, &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "foobar", - }, - }) - catalogContentMap["foobar"] = catalogContentMap["catalog-1"] + wantErr: "error unmarshalling channel from catalog metadata: json: cannot unmarshal number into Go struct field ChannelEntry.entries.name of type string", + fetcher: &MockFetcher{}, + }, + { + name: "error on catalog missing Unpacked status condition", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, bundles, catalogContentMap := defaultFakeCatalog() + objs = append(objs, &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foobar", + }, + }) + catalogContentMap["foobar"] = catalogContentMap["catalog-1"] - return objs, bundles, catalogContentMap - }, - fetcher: &MockFetcher{}, + return objs, bundles, catalogContentMap }, - { - name: "deprecated at the package, channel, and bundle level", - fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { - objs, bundles, catalogContentMap := defaultFakeCatalog() - - catalogContentMap["catalog-1"] = append(catalogContentMap["catalog-1"], - []byte(`{"schema": "olm.deprecations", "package":"fake1", "entries":[{"message": "fake1 is deprecated", "reference": {"schema": "olm.package"}}, {"message":"channel stable is deprecated", "reference": {"schema": "olm.channel", "name": "stable"}}, {"message": "bundle fake1.v1.0.0 is deprecated", "reference":{"schema":"olm.bundle", "name":"fake1.v1.0.0"}}]}`)...) - - for i := range bundles { - if bundles[i].Package == "fake1" && bundles[i].CatalogName == "catalog-1" && bundles[i].Name == "fake1.v1.0.0" { - bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ - Reference: declcfg.PackageScopedReference{ - Schema: "olm.package", - }, - Message: "fake1 is deprecated", - }) - - bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ - Reference: declcfg.PackageScopedReference{ - Schema: "olm.channel", - Name: "stable", - }, - Message: "channel stable is deprecated", - }) - - bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ - Reference: declcfg.PackageScopedReference{ - Schema: "olm.bundle", - Name: "fake1.v1.0.0", - }, - Message: "bundle fake1.v1.0.0 is deprecated", - }) - } + wantErr: `catalog "foobar" is not unpacked`, + fetcher: &MockFetcher{}, + }, + { + name: "deprecated at the package, channel, and bundle level", + fakeCatalog: func() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { + objs, bundles, catalogContentMap := defaultFakeCatalog() + + catalogContentMap["catalog-1"]["fake1/olm.deprecations/olm.deprecations.json"] = &fstest.MapFile{ + Data: []byte(`{"schema": "olm.deprecations", "package":"fake1", "entries":[{"message": "fake1 is deprecated", "reference": {"schema": "olm.package"}}, {"message":"channel stable is deprecated", "reference": {"schema": "olm.channel", "name": "stable"}}, {"message": "bundle fake1.v1.0.0 is deprecated", "reference":{"schema":"olm.bundle", "name":"fake1.v1.0.0"}}]}`), + } + + for i := range bundles { + if bundles[i].Package == "fake1" && bundles[i].CatalogName == "catalog-1" && bundles[i].Name == "fake1.v1.0.0" { + bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ + Reference: declcfg.PackageScopedReference{ + Schema: "olm.package", + }, + Message: "fake1 is deprecated", + }) + + bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ + Reference: declcfg.PackageScopedReference{ + Schema: "olm.channel", + Name: "stable", + }, + Message: "channel stable is deprecated", + }) + + bundles[i].Deprecations = append(bundles[i].Deprecations, declcfg.DeprecationEntry{ + Reference: declcfg.PackageScopedReference{ + Schema: "olm.bundle", + Name: "fake1.v1.0.0", + }, + Message: "bundle fake1.v1.0.0 is deprecated", + }) } + } - return objs, bundles, catalogContentMap - }, - fetcher: &MockFetcher{}, + return objs, bundles, catalogContentMap }, - } { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - objs, expectedBundles, catalogContentMap := tt.fakeCatalog() - tt.fetcher.contentMap = catalogContentMap - - fakeCatalogClient := catalogClient.New( - fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build(), - tt.fetcher, - ) - - bundles, err := fakeCatalogClient.Bundles(ctx) - if tt.wantErr == "" { - assert.NoError(t, err) - assert.Equal(t, expectedBundles, bundles) - } else { - assert.EqualError(t, err, tt.wantErr) - } - }) - } - }) + fetcher: &MockFetcher{}, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + objs, expectedBundles, catalogContentMap := tt.fakeCatalog() + tt.fetcher.contentMap = catalogContentMap + + fakeCatalogClient := catalogClient.New( + fake.NewClientBuilder().WithScheme(scheme.Scheme).WithObjects(objs...).Build(), + tt.fetcher, + ) + + bundles, err := fakeCatalogClient.Bundles(ctx, "fake1") + if tt.wantErr == "" { + assert.NoError(t, err) + assert.Equal(t, expectedBundles, bundles) + } else { + assert.ErrorContains(t, err, tt.wantErr) + } + }) + } } -func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[string][]byte) { +func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[string]fstest.MapFS) { package1 := `{ "schema": "olm.package", "name": "fake1" @@ -269,7 +269,7 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[strin { Channel: declcfg.Channel{ Schema: declcfg.SchemaChannel, - Name: "stable", + Name: "beta", Package: "fake1", Entries: []declcfg.ChannelEntry{ { @@ -281,7 +281,7 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[strin { Channel: declcfg.Channel{ Schema: declcfg.SchemaChannel, - Name: "beta", + Name: "stable", Package: "fake1", Entries: []declcfg.ChannelEntry{ { @@ -323,9 +323,21 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[strin }, } - catalogContents := map[string][]byte{ - "catalog-1": []byte(strings.Join([]string{package1, bundle1, stableChannel, betaChannel}, "\n")), - "catalog-2": []byte(strings.Join([]string{package1, bundle1, stableChannel}, "\n")), + catalog1FS := fstest.MapFS{ + "fake1/olm.package/fake1.json": &fstest.MapFile{Data: []byte(package1)}, + "fake1/olm.bundle/fake1.v1.0.0.json": &fstest.MapFile{Data: []byte(bundle1)}, + "fake1/olm.channel/stable.json": &fstest.MapFile{Data: []byte(stableChannel)}, + "fake1/olm.channel/beta.json": &fstest.MapFile{Data: []byte(betaChannel)}, + } + catalog2FS := fstest.MapFS{ + "fake1/olm.package/fake1.json": &fstest.MapFile{Data: []byte(package1)}, + "fake1/olm.bundle/fake1.v1.0.0.json": &fstest.MapFile{Data: []byte(bundle1)}, + "fake1/olm.channel/stable.json": &fstest.MapFile{Data: []byte(stableChannel)}, + } + + catalogContents := map[string]fstest.MapFS{ + "catalog-1": catalog1FS, + "catalog-2": catalog2FS, } return objs, expectedBundles, catalogContents @@ -334,15 +346,15 @@ func defaultFakeCatalog() ([]client.Object, []*catalogmetadata.Bundle, map[strin var _ catalogClient.Fetcher = &MockFetcher{} type MockFetcher struct { - contentMap map[string][]byte + contentMap map[string]fstest.MapFS shouldError bool } -func (mc *MockFetcher) FetchCatalogContents(_ context.Context, catalog *catalogd.ClusterCatalog) (io.ReadCloser, error) { +func (mc *MockFetcher) FetchCatalogContents(_ context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) { if mc.shouldError { return nil, errors.New("mock cache error") } data := mc.contentMap[catalog.Name] - return io.NopCloser(bytes.NewReader(data)), nil + return data, nil } diff --git a/internal/controllers/clusterextension_controller.go b/internal/controllers/clusterextension_controller.go index c9d20e10f..f5707bb9d 100644 --- a/internal/controllers/clusterextension_controller.go +++ b/internal/controllers/clusterextension_controller.go @@ -376,15 +376,15 @@ func (r *ClusterExtensionReconciler) reconcile(ctx context.Context, ext *ocv1alp // resolve returns a Bundle from the catalog that needs to get installed on the cluster. func (r *ClusterExtensionReconciler) resolve(ctx context.Context, ext ocv1alpha1.ClusterExtension) (*catalogmetadata.Bundle, error) { - allBundles, err := r.BundleProvider.Bundles(ctx) - if err != nil { - return nil, fmt.Errorf("error fetching bundles: %w", err) - } - packageName := ext.Spec.PackageName channelName := ext.Spec.Channel versionRange := ext.Spec.Version + allBundles, err := r.BundleProvider.Bundles(ctx, packageName) + if err != nil { + return nil, fmt.Errorf("error fetching bundles: %w", err) + } + installedBundle, err := r.InstalledBundleGetter.GetInstalledBundle(ctx, &ext) if err != nil { return nil, err diff --git a/internal/controllers/clusterextension_controller_test.go b/internal/controllers/clusterextension_controller_test.go index 68c23fe5a..09608afbb 100644 --- a/internal/controllers/clusterextension_controller_test.go +++ b/internal/controllers/clusterextension_controller_test.go @@ -28,8 +28,8 @@ import ( "github.com/operator-framework/operator-controller/internal/catalogmetadata" "github.com/operator-framework/operator-controller/internal/conditionsets" "github.com/operator-framework/operator-controller/internal/controllers" + "github.com/operator-framework/operator-controller/internal/testutil" "github.com/operator-framework/operator-controller/pkg/features" - testutil "github.com/operator-framework/operator-controller/test/util" ) // Describe: ClusterExtension Controller Test diff --git a/internal/controllers/clusterextension_registryv1_validation_test.go b/internal/controllers/clusterextension_registryv1_validation_test.go index 3518a8b56..83b0157f1 100644 --- a/internal/controllers/clusterextension_registryv1_validation_test.go +++ b/internal/controllers/clusterextension_registryv1_validation_test.go @@ -23,7 +23,7 @@ import ( ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" "github.com/operator-framework/operator-controller/internal/catalogmetadata" "github.com/operator-framework/operator-controller/internal/controllers" - testutil "github.com/operator-framework/operator-controller/test/util" + "github.com/operator-framework/operator-controller/internal/testutil" ) func TestClusterExtensionRegistryV1DisallowDependencies(t *testing.T) { diff --git a/internal/controllers/common_controller.go b/internal/controllers/common_controller.go index 31c0d0685..c2770539a 100644 --- a/internal/controllers/common_controller.go +++ b/internal/controllers/common_controller.go @@ -29,7 +29,7 @@ import ( // BundleProvider provides the way to retrieve a list of Bundles from a source, // generally from a catalog client of some kind. type BundleProvider interface { - Bundles(ctx context.Context) ([]*catalogmetadata.Bundle, error) + Bundles(ctx context.Context, packageName string) ([]*catalogmetadata.Bundle, error) } // setResolvedStatusConditionSuccess sets the resolved status condition to success. diff --git a/internal/controllers/suite_test.go b/internal/controllers/suite_test.go index 7a049106f..e7c51aa15 100644 --- a/internal/controllers/suite_test.go +++ b/internal/controllers/suite_test.go @@ -41,8 +41,8 @@ import ( ocv1alpha1 "github.com/operator-framework/operator-controller/api/v1alpha1" "github.com/operator-framework/operator-controller/internal/controllers" + "github.com/operator-framework/operator-controller/internal/testutil" "github.com/operator-framework/operator-controller/pkg/scheme" - testutil "github.com/operator-framework/operator-controller/test/util" ) // MockUnpacker is a mock of Unpacker interface diff --git a/test/util/fake_catalog_client.go b/internal/testutil/fake_catalog_client.go similarity index 64% rename from test/util/fake_catalog_client.go rename to internal/testutil/fake_catalog_client.go index 20c3c8b29..7e855cb34 100644 --- a/test/util/fake_catalog_client.go +++ b/internal/testutil/fake_catalog_client.go @@ -23,9 +23,16 @@ func NewFakeCatalogClientWithError(e error) FakeCatalogClient { } } -func (c *FakeCatalogClient) Bundles(_ context.Context) ([]*catalogmetadata.Bundle, error) { +func (c *FakeCatalogClient) Bundles(_ context.Context, packageName string) ([]*catalogmetadata.Bundle, error) { if c.err != nil { return nil, c.err } - return c.bundles, nil + + var out []*catalogmetadata.Bundle + for _, b := range c.bundles { + if b.Package == packageName { + out = append(out, b) + } + } + return out, nil }