Skip to content

Commit

Permalink
incremental improvement in catalogmetadata cache/client performance
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Jun 24, 2024
1 parent 6c61a78 commit 82303d7
Show file tree
Hide file tree
Showing 11 changed files with 502 additions and 356 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ test-ext-dev-e2e: $(OPERATOR_SDK) $(KUSTOMIZE) $(KIND) #HELP Run extension creat
ENVTEST_VERSION := $(shell go list -m k8s.io/client-go | cut -d" " -f2 | sed 's/^v0\.\([[:digit:]]\{1,\}\)\.[[:digit:]]\{1,\}$$/1.\1.x/')
UNIT_TEST_DIRS := $(shell go list ./... | grep -v /test/)
test-unit: $(SETUP_ENVTEST) #HELP Run the unit tests
eval $$($(SETUP_ENVTEST) use -p env $(ENVTEST_VERSION) $(SETUP_ENVTEST_BIN_DIR_OVERRIDE)) && go test -count=1 -short $(UNIT_TEST_DIRS) -coverprofile cover.out
eval $$($(SETUP_ENVTEST) use -p env $(ENVTEST_VERSION) $(SETUP_ENVTEST_BIN_DIR_OVERRIDE)) && CGO_ENABLED=1 go test -count=1 -race -short $(UNIT_TEST_DIRS) -coverprofile cover.out

image-registry: ## Setup in-cluster image registry
./test/tools/image-registry.sh $(E2E_REGISTRY_NAMESPACE) $(E2E_REGISTRY_NAME)
Expand Down
58 changes: 36 additions & 22 deletions internal/catalogmetadata/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package cache
import (
"context"
"fmt"
"io"
"io/fs"
"net/http"
"os"
"path/filepath"
"sync"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
"github.com/operator-framework/operator-registry/alpha/declcfg"

"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
)
Expand Down Expand Up @@ -66,7 +67,7 @@ type filesystemCache struct {
// resources that have been successfully reconciled, unpacked, and are being served.
// These requirements help ensure that we can rely on status conditions to determine
// when to issue a request to update the cached Catalog contents.
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (io.ReadCloser, error) {
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) {
if catalog == nil {
return nil, fmt.Errorf("error: provided catalog must be non-nil")
}
Expand All @@ -80,25 +81,23 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
}

cacheDir := filepath.Join(fsc.cachePath, catalog.Name)
cacheFilePath := filepath.Join(cacheDir, "data.json")

fsc.mutex.RLock()
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if catalog.Status.ResolvedSource.Image.ResolvedRef == data.ResolvedRef {
fsc.mutex.RUnlock()
return os.Open(cacheFilePath)
return os.DirFS(cacheDir), nil
}
}
fsc.mutex.RUnlock()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
if err != nil {
return nil, fmt.Errorf("error forming request: %s", err)
return nil, fmt.Errorf("error forming request: %v", err)
}

resp, err := fsc.client.Do(req)
if err != nil {
return nil, fmt.Errorf("error performing request: %s", err)
return nil, fmt.Errorf("error performing request: %v", err)
}
defer resp.Body.Close()

Expand All @@ -117,34 +116,49 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
// the cached contents
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
if data.ResolvedRef == catalog.Status.ResolvedSource.Image.Ref {
return os.Open(cacheFilePath)
return os.DirFS(cacheDir), nil
}
}

if err = os.MkdirAll(cacheDir, os.ModePerm); err != nil {
return nil, fmt.Errorf("error creating cache directory for Catalog %q: %s", catalog.Name, err)
}

file, err := os.Create(cacheFilePath)
tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalog.Name))
if err != nil {
return nil, fmt.Errorf("error creating cache file for Catalog %q: %s", catalog.Name, err)
return nil, fmt.Errorf("error creating temporary directory to unpack catalog metadata: %v", err)
}

if _, err := io.Copy(file, resp.Body); err != nil {
return nil, fmt.Errorf("error writing contents to cache file for Catalog %q: %s", catalog.Name, err)
if err := declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
if err != nil {
return fmt.Errorf("error parsing catalog contents: %v", err)
}
pkgName := meta.Package
if meta.Schema == declcfg.SchemaPackage {
pkgName = meta.Name
}
metaName := meta.Name
if meta.Name == "" {
metaName = meta.Schema
}
metaPath := filepath.Join(tmpDir, pkgName, meta.Schema, metaName+".json")
if err := os.MkdirAll(filepath.Dir(metaPath), os.ModePerm); err != nil {
return fmt.Errorf("error creating directory for catalog metadata: %v", err)
}
if err := os.WriteFile(metaPath, meta.Blob, os.ModePerm); err != nil {
return fmt.Errorf("error writing catalog metadata to file: %v", err)
}
return nil
}); err != nil {
return nil, err
}

if err = file.Sync(); err != nil {
return nil, fmt.Errorf("error syncing contents to cache file for Catalog %q: %s", catalog.Name, err)
if err := os.RemoveAll(cacheDir); err != nil {
return nil, fmt.Errorf("error removing old cache directory: %v", err)
}

if _, err = file.Seek(0, 0); err != nil {
return nil, fmt.Errorf("error resetting offset for cache file reader for Catalog %q: %s", catalog.Name, err)
if err := os.Rename(tmpDir, cacheDir); err != nil {
return nil, fmt.Errorf("error moving temporary directory to cache directory: %v", err)
}

fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
ResolvedRef: catalog.Status.ResolvedSource.Image.ResolvedRef,
}

return file, nil
return os.DirFS(cacheDir), nil
}
Loading

0 comments on commit 82303d7

Please sign in to comment.