Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/cache: add preferred pogreb database cache impl #1278

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions alpha/action/render.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sort"
Expand All @@ -17,7 +16,6 @@ import (
"github.com/h2non/filetype"
"github.com/h2non/filetype/matchers"
"github.com/operator-framework/api/pkg/operators/v1alpha1"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/operator-framework/operator-registry/alpha/declcfg"
Expand All @@ -26,6 +24,7 @@ import (
"github.com/operator-framework/operator-registry/pkg/image"
"github.com/operator-framework/operator-registry/pkg/image/containerdregistry"
"github.com/operator-framework/operator-registry/pkg/lib/bundle"
"github.com/operator-framework/operator-registry/pkg/lib/log"
"github.com/operator-framework/operator-registry/pkg/registry"
"github.com/operator-framework/operator-registry/pkg/sqlite"
)
Expand Down Expand Up @@ -61,12 +60,6 @@ type Render struct {
skipSqliteDeprecationLog bool
}

func nullLogger() *logrus.Entry {
logger := logrus.New()
logger.SetOutput(io.Discard)
return logrus.NewEntry(logger)
}

func (r Render) Run(ctx context.Context) (*declcfg.DeclarativeConfig, error) {
if r.skipSqliteDeprecationLog {
// exhaust once with a no-op function.
Expand Down Expand Up @@ -119,7 +112,7 @@ func (r Render) createRegistry() (*containerdregistry.Registry, error) {
// The containerd registry impl is somewhat verbose, even on the happy path,
// so discard all logger logs. Any important failures will be returned from
// registry methods and eventually logged as fatal errors.
containerdregistry.WithLog(nullLogger()),
containerdregistry.WithLog(log.Null()),
)
if err != nil {
return nil, err
Expand Down
263 changes: 123 additions & 140 deletions alpha/declcfg/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@ import (
"sync"

"github.com/joelanford/ignore"
"github.com/operator-framework/api/pkg/operators"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"

"github.com/operator-framework/api/pkg/operators"

"github.com/operator-framework/operator-registry/alpha/property"
)

Expand All @@ -25,22 +26,42 @@ const (

type WalkMetasFSFunc func(path string, meta *Meta, err error) error

func WalkMetasFS(root fs.FS, walkFn WalkMetasFSFunc) error {
return walkFiles(root, func(root fs.FS, path string, err error) error {
if err != nil {
return walkFn(path, nil, err)
}
// WalkMetasFS walks the filesystem rooted at root and calls walkFn for each individual meta object found in the root.
// By default, WalkMetasFS is not thread-safe because it invokes walkFn concurrently. In order to make it thread-safe,
// use the WithConcurrency(1) to avoid concurrent invocations of walkFn.
func WalkMetasFS(ctx context.Context, root fs.FS, walkFn WalkMetasFSFunc, opts ...LoadOption) error {
if root == nil {
return fmt.Errorf("no declarative config filesystem provided")
}

f, err := root.Open(path)
if err != nil {
return walkFn(path, nil, err)
}
defer f.Close()
options := LoadOptions{
concurrency: runtime.NumCPU(),
}
for _, opt := range opts {
opt(&options)
}

return WalkMetasReader(f, func(meta *Meta, err error) error {
return walkFn(path, meta, err)
})
pathChan := make(chan string, options.concurrency)

// Create an errgroup to manage goroutines. The context is closed when any
// goroutine returns an error. Goroutines should check the context
// to see if they should return early (in the case of another goroutine
// returning an error).
eg, ctx := errgroup.WithContext(ctx)

// Walk the FS and send paths to a channel for parsing.
eg.Go(func() error {
return sendPaths(ctx, root, pathChan)
})

// Parse paths concurrently. The waitgroup ensures that all paths are parsed
// before the cfgChan is closed.
for i := 0; i < options.concurrency; i++ {
eg.Go(func() error {
return parseMetaPaths(ctx, root, pathChan, walkFn, options)
})
}
return eg.Wait()
}

type WalkMetasReaderFunc func(meta *Meta, err error) error
Expand Down Expand Up @@ -126,59 +147,16 @@ func WithConcurrency(concurrency int) LoadOption {
// If LoadFS encounters an error loading or parsing any file, the error will be
// immediately returned.
func LoadFS(ctx context.Context, root fs.FS, opts ...LoadOption) (*DeclarativeConfig, error) {
if root == nil {
return nil, fmt.Errorf("no declarative config filesystem provided")
}

options := LoadOptions{
concurrency: runtime.NumCPU(),
}
for _, opt := range opts {
opt(&options)
}

var (
fcfg = &DeclarativeConfig{}
pathChan = make(chan string, options.concurrency)
cfgChan = make(chan *DeclarativeConfig, options.concurrency)
)

// Create an errgroup to manage goroutines. The context is closed when any
// goroutine returns an error. Goroutines should check the context
// to see if they should return early (in the case of another goroutine
// returning an error).
eg, ctx := errgroup.WithContext(ctx)

// Walk the FS and send paths to a channel for parsing.
eg.Go(func() error {
return sendPaths(ctx, root, pathChan)
})

// Parse paths concurrently. The waitgroup ensures that all paths are parsed
// before the cfgChan is closed.
var wg sync.WaitGroup
for i := 0; i < options.concurrency; i++ {
wg.Add(1)
eg.Go(func() error {
defer wg.Done()
return parsePaths(ctx, root, pathChan, cfgChan)
})
}

// Merge parsed configs into a single config.
eg.Go(func() error {
return mergeCfgs(ctx, cfgChan, fcfg)
})

// Wait for all path parsing goroutines to finish before closing cfgChan.
wg.Wait()
close(cfgChan)

// Wait for all goroutines to finish.
if err := eg.Wait(); err != nil {
builder := fbcBuilder{}
if err := WalkMetasFS(ctx, root, func(path string, meta *Meta, err error) error {
if err != nil {
return err
}
return builder.addMeta(meta)
}, opts...); err != nil {
return nil, err
}
return fcfg, nil
return &builder.cfg, nil
}

func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
Expand All @@ -196,7 +174,7 @@ func sendPaths(ctx context.Context, root fs.FS, pathChan chan<- string) error {
})
}

func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan chan<- *DeclarativeConfig) error {
func parseMetaPaths(ctx context.Context, root fs.FS, pathChan <-chan string, walkFn WalkMetasFSFunc, options LoadOptions) error {
for {
select {
case <-ctx.Done(): // don't block on receiving from pathChan
Expand All @@ -205,51 +183,35 @@ func parsePaths(ctx context.Context, root fs.FS, pathChan <-chan string, cfgChan
if !ok {
return nil
}
cfg, err := LoadFile(root, path)
file, err := root.Open(path)
if err != nil {
return err
}
select {
case cfgChan <- cfg:
case <-ctx.Done(): // don't block on sending to cfgChan
return ctx.Err()
if err := WalkMetasReader(file, func(meta *Meta, err error) error {
return walkFn(path, meta, err)
}); err != nil {
return err
}
}
}
}

func mergeCfgs(ctx context.Context, cfgChan <-chan *DeclarativeConfig, fcfg *DeclarativeConfig) error {
for {
select {
case <-ctx.Done(): // don't block on receiving from cfgChan
return ctx.Err()
case cfg, ok := <-cfgChan:
if !ok {
return nil
}
fcfg.Merge(cfg)
func readBundleObjects(b *Bundle) error {
var obj property.BundleObject
for i, props := range b.Properties {
if props.Type != property.TypeBundleObject {
continue
}
}
}

func readBundleObjects(bundles []Bundle) error {
for bi, b := range bundles {
var obj property.BundleObject
for i, props := range b.Properties {
if props.Type != property.TypeBundleObject {
continue
}
if err := json.Unmarshal(props.Value, &obj); err != nil {
return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
}
objJson, err := yaml.ToJSON(obj.Data)
if err != nil {
return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
}
bundles[bi].Objects = append(bundles[bi].Objects, string(objJson))
if err := json.Unmarshal(props.Value, &obj); err != nil {
return fmt.Errorf("package %q, bundle %q: parse property at index %d as bundle object: %v", b.Package, b.Name, i, err)
}
bundles[bi].CsvJSON = extractCSV(bundles[bi].Objects)
objJson, err := yaml.ToJSON(obj.Data)
if err != nil {
return fmt.Errorf("package %q, bundle %q: convert bundle object property at index %d to JSON: %v", b.Package, b.Name, i, err)
}
b.Objects = append(b.Objects, string(objJson))
}
b.CsvJSON = extractCSV(b.Objects)
return nil
}

Expand All @@ -268,52 +230,16 @@ func extractCSV(objs []string) string {

// LoadReader reads yaml or json from the passed in io.Reader and unmarshals it into a DeclarativeConfig struct.
func LoadReader(r io.Reader) (*DeclarativeConfig, error) {
cfg := &DeclarativeConfig{}

if err := WalkMetasReader(r, func(in *Meta, err error) error {
builder := fbcBuilder{}
if err := WalkMetasReader(r, func(meta *Meta, err error) error {
if err != nil {
return err
}
switch in.Schema {
case SchemaPackage:
var p Package
if err := json.Unmarshal(in.Blob, &p); err != nil {
return fmt.Errorf("parse package: %v", err)
}
cfg.Packages = append(cfg.Packages, p)
case SchemaChannel:
var c Channel
if err := json.Unmarshal(in.Blob, &c); err != nil {
return fmt.Errorf("parse channel: %v", err)
}
cfg.Channels = append(cfg.Channels, c)
case SchemaBundle:
var b Bundle
if err := json.Unmarshal(in.Blob, &b); err != nil {
return fmt.Errorf("parse bundle: %v", err)
}
cfg.Bundles = append(cfg.Bundles, b)
case SchemaDeprecation:
var d Deprecation
if err := json.Unmarshal(in.Blob, &d); err != nil {
return fmt.Errorf("parse deprecation: %w", err)
}
cfg.Deprecations = append(cfg.Deprecations, d)
case "":
return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
default:
cfg.Others = append(cfg.Others, *in)
}
return nil
return builder.addMeta(meta)
}); err != nil {
return nil, err
}

if err := readBundleObjects(cfg.Bundles); err != nil {
return nil, fmt.Errorf("read bundle objects: %v", err)
}

return cfg, nil
return &builder.cfg, nil
}

// LoadFile will unmarshall declarative config components from a single filename provided in 'path'
Expand All @@ -332,3 +258,60 @@ func LoadFile(root fs.FS, path string) (*DeclarativeConfig, error) {

return cfg, nil
}

type fbcBuilder struct {
cfg DeclarativeConfig

packagesMu sync.Mutex
channelsMu sync.Mutex
bundlesMu sync.Mutex
deprecationsMu sync.Mutex
othersMu sync.Mutex
}

func (c *fbcBuilder) addMeta(in *Meta) error {
switch in.Schema {
case SchemaPackage:
var p Package
if err := json.Unmarshal(in.Blob, &p); err != nil {
return fmt.Errorf("parse package: %v", err)
}
c.packagesMu.Lock()
c.cfg.Packages = append(c.cfg.Packages, p)
c.packagesMu.Unlock()
case SchemaChannel:
var ch Channel
if err := json.Unmarshal(in.Blob, &ch); err != nil {
return fmt.Errorf("parse channel: %v", err)
}
c.channelsMu.Lock()
c.cfg.Channels = append(c.cfg.Channels, ch)
c.channelsMu.Unlock()
case SchemaBundle:
var b Bundle
if err := json.Unmarshal(in.Blob, &b); err != nil {
return fmt.Errorf("parse bundle: %v", err)
}
if err := readBundleObjects(&b); err != nil {
return fmt.Errorf("read bundle objects: %v", err)
}
c.bundlesMu.Lock()
c.cfg.Bundles = append(c.cfg.Bundles, b)
c.bundlesMu.Unlock()
case SchemaDeprecation:
var d Deprecation
if err := json.Unmarshal(in.Blob, &d); err != nil {
return fmt.Errorf("parse deprecation: %w", err)
}
c.deprecationsMu.Lock()
c.cfg.Deprecations = append(c.cfg.Deprecations, d)
c.deprecationsMu.Unlock()
case "":
return fmt.Errorf("object '%s' is missing root schema field", string(in.Blob))
default:
c.othersMu.Lock()
c.cfg.Others = append(c.cfg.Others, *in)
c.othersMu.Unlock()
}
return nil
}
Loading
Loading