# Consolidate file read to a single function (kube-burner#761)
## Type of change

- [x] Refactor
- [x] Bug fix

## Description
- Create a single `GetReader` function for all sources (see the sketch after this list)
- Adjust all reader operations to use the new function
- Adjust APIs where needed
- Remove the redundant `embedConfig` flag from Prometheus and Alert Manager
- Make the `EmbedFS` field a pointer so its presence can be checked with a simple nil test
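The consolidated helper lives in `pkg/util` and its body is not part of the hunks shown below, so the following is only a minimal sketch inferred from its call sites (`util.GetReader(configFile, nil, "")` for plain files, `util.GetReader(o.ObjectTemplate, configSpec.EmbedFS, configSpec.EmbedFSDir)` for embedded ones). The fallback-to-disk behavior and the error wording are assumptions, not the committed code:

```go
// Sketch only: inferred from the call sites in this commit, not the real pkg/util code.
package util

import (
	"bytes"
	"embed"
	"fmt"
	"io"
	"os"
	"path"
)

// GetReader returns a reader for location. When embedFS is non-nil, the file is
// first looked up inside the embedded filesystem under embedFSDir; callers that
// only deal with the local filesystem pass (nil, "").
func GetReader(location string, embedFS *embed.FS, embedFSDir string) (io.Reader, error) {
	if embedFS != nil {
		embeddedPath := path.Join(embedFSDir, location)
		if data, err := embedFS.ReadFile(embeddedPath); err == nil {
			return bytes.NewReader(data), nil
		}
		// Assumption: fall back to the plain path when the embedded copy is missing,
		// mirroring the fallback the old alert/metrics readers implemented by hand.
	}
	f, err := os.Open(location)
	if err != nil {
		return nil, fmt.Errorf("error opening %s: %w", location, err)
	}
	return f, nil
}
```

Passing a `*embed.FS` rather than a value is what makes the nil check possible, which is why `Spec.EmbedFS` becomes a pointer in `pkg/config/types.go`.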

## Related Tickets & Documents

When invoked via `kube-burner-ocp`, the patch jobType could not read from the embedded file system.
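To illustrate the code path that was broken, here is a hypothetical sketch of how a downstream wrapper such as `kube-burner-ocp` might bundle its configuration and hand it to kube-burner; the import path, directory layout, and names are assumptions rather than code from that repository:

```go
package main

import (
	"embed"

	"github.com/kube-burner/kube-burner/pkg/config"
)

// Hypothetical layout: job configs and templates shipped inside the wrapper binary.
//go:embed config/*
var ocpConfig embed.FS

func newSpec() config.Spec {
	return config.Spec{
		// With EmbedFS now a *embed.FS, leaving it nil means "local files only";
		// setting it routes every GetReader lookup through the embedded FS first.
		EmbedFS:    &ocpConfig,
		EmbedFSDir: "config",
	}
}
```

With the reads consolidated in `GetReader`, the patch jobType resolves its templates through the same embedded-filesystem path as the other job types.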

Signed-off-by: Ygal Blum <[email protected]>
ygalblum authored Dec 31, 2024
1 parent 95dbff5 commit 42b98e0
Showing 20 changed files with 77 additions and 134 deletions.
10 changes: 5 additions & 5 deletions cmd/kube-burner/kube-burner.go
@@ -92,13 +92,13 @@ func initCmd() *cobra.Command {
util.SetupFileLogging(uuid)
kubeClientProvider := config.NewKubeClientProvider(kubeConfig, kubeContext)
clientSet, _ = kubeClientProvider.DefaultClientSet()
configFileReader, err := util.GetReaderForPath(configFile)
configFileReader, err := util.GetReader(configFile, nil, "")
if err != nil {
log.Fatalf("Error reading configuration file %s: %s", configFile, err)
}
var userDataFileReader io.Reader
if userDataFile != "" {
userDataFileReader, err = util.GetReaderForPath(userDataFile)
userDataFileReader, err = util.GetReader(userDataFile, nil, "")
if err != nil {
log.Fatalf("Error reading user data file %s: %s", userDataFile, err)
}
@@ -213,7 +213,7 @@ func measureCmd() *cobra.Command {
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
util.SetupFileLogging(uuid)
f, err := util.GetReaderForPath(configFile)
f, err := util.GetReader(configFile, nil, "")
if err != nil {
log.Fatalf("Error reading configuration file %s: %s", configFile, err)
}
@@ -450,15 +450,15 @@ func alertCmd() *cobra.Command {
Token: token,
SkipTLSVerify: skipTLSVerify,
}
p, err := prometheus.NewPrometheusClient(configSpec, url, auth, prometheusStep, nil, false, indexer)
p, err := prometheus.NewPrometheusClient(configSpec, url, auth, prometheusStep, nil, indexer)
if err != nil {
log.Fatal(err)
}
job := prometheus.Job{
Start: time.Unix(start, 0),
End: time.Unix(end, 0),
}
if alertM, err = alerting.NewAlertManager(alertProfile, uuid, p, false, indexer, nil); err != nil {
if alertM, err = alerting.NewAlertManager(alertProfile, uuid, p, indexer, nil); err != nil {
log.Fatalf("Error creating alert manager: %s", err)
}
err = alertM.Evaluate(job)
23 changes: 4 additions & 19 deletions pkg/alerting/alert_manager.go
@@ -18,10 +18,8 @@ import (
"bytes"
"errors"
"fmt"
"io"
"math"
"os"
"path"
"strings"
"text/template"
"time"
@@ -86,35 +84,22 @@ type descriptionTemplate struct {
}

// NewAlertManager creates a new alert manager
func NewAlertManager(alertProfileCfg, uuid string, prometheusClient *prometheus.Prometheus, embedConfig bool, indexer *indexers.Indexer, metadata interface{}) (*AlertManager, error) {
func NewAlertManager(alertProfileCfg, uuid string, prometheusClient *prometheus.Prometheus, indexer *indexers.Indexer, metadata interface{}) (*AlertManager, error) {
log.Infof("🔔 Initializing alert manager for prometheus: %v", prometheusClient.Endpoint)
a := AlertManager{
prometheus: prometheusClient,
uuid: uuid,
indexer: indexer,
metadata: metadata,
}
if err := a.readProfile(alertProfileCfg, embedConfig); err != nil {
if err := a.readProfile(alertProfileCfg); err != nil {
return &a, err
}
return &a, nil
}

func (a *AlertManager) readProfile(alertProfileCfg string, embedConfig bool) error {
var f io.Reader
var err error
if embedConfig {
embeddedLocation := path.Join(path.Dir(a.prometheus.ConfigSpec.EmbedFSDir), alertProfileCfg)
f, err = util.ReadEmbedConfig(a.prometheus.ConfigSpec.EmbedFS, embeddedLocation)
if err != nil {
log.Infof("Embedded config doesn't contain alert profile %s. Falling back to original path", embeddedLocation)
f, err = util.GetReaderForPath(alertProfileCfg)
} else {
alertProfileCfg = embeddedLocation
}
} else {
f, err = util.GetReaderForPath(alertProfileCfg)
}
func (a *AlertManager) readProfile(alertProfileCfg string) error {
f, err := util.GetReader(alertProfileCfg, a.prometheus.ConfigSpec.EmbedFS, a.prometheus.ConfigSpec.EmbedFSDir)
if err != nil {
return fmt.Errorf("error reading alert profile %s: %s", alertProfileCfg, err)
}
12 changes: 1 addition & 11 deletions pkg/burner/create.go
@@ -16,12 +16,10 @@ package burner

import (
"context"
"embed"
"fmt"
"io"
"math"
"math/rand"
"path"
"strconv"
"sync"
"time"
@@ -39,22 +37,14 @@ import (
)

func (ex *Executor) setupCreateJob(configSpec config.Spec, mapper meta.RESTMapper) {
var err error
var f io.Reader
log.Debugf("Preparing create job: %s", ex.Name)
for _, o := range ex.Objects {
if o.Replicas < 1 {
log.Warnf("Object template %s has replicas %d < 1, skipping", o.ObjectTemplate, o.Replicas)
continue
}
log.Debugf("Rendering template: %s", o.ObjectTemplate)
e := embed.FS{}
if configSpec.EmbedFS == e {
f, err = util.GetReaderForPath(o.ObjectTemplate)
} else {
objectTemplate := path.Join(configSpec.EmbedFSDir, o.ObjectTemplate)
f, err = util.ReadEmbedConfig(configSpec.EmbedFS, objectTemplate)
}
f, err := util.GetReader(o.ObjectTemplate, configSpec.EmbedFS, configSpec.EmbedFSDir)
if err != nil {
log.Fatalf("Error reading template %s: %s", o.ObjectTemplate, err)
}
4 changes: 2 additions & 2 deletions pkg/burner/delete.go
@@ -28,7 +28,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
)

func (ex *Executor) setupDeleteJob(mapper meta.RESTMapper) {
func (ex *Executor) setupDeleteJob(configSpec config.Spec, mapper meta.RESTMapper) {
log.Debugf("Preparing delete job: %s", ex.Name)
ex.itemHandler = deleteHandler
if ex.WaitForDeletion {
@@ -40,7 +40,7 @@ func (ex *Executor) setupDeleteJob(mapper meta.RESTMapper) {
ex.ExecutionMode = config.ExecutionModeSequential
for _, o := range ex.Objects {
log.Debugf("Job %s: %s %s with selector %s", ex.Name, ex.JobType, o.Kind, labels.Set(o.LabelSelector))
ex.objects = append(ex.objects, newObject(o, mapper, APIVersionV1))
ex.objects = append(ex.objects, newObject(o, configSpec, mapper, APIVersionV1))
}
}

8 changes: 4 additions & 4 deletions pkg/burner/executor.go
@@ -70,13 +70,13 @@ func newExecutor(configSpec config.Spec, kubeClientProvider *config.KubeClientPr
case config.CreationJob:
ex.setupCreateJob(configSpec, mapper)
case config.DeletionJob:
ex.setupDeleteJob(mapper)
ex.setupDeleteJob(configSpec, mapper)
case config.PatchJob:
ex.setupPatchJob(mapper)
ex.setupPatchJob(configSpec, mapper)
case config.ReadJob:
ex.setupReadJob(mapper)
ex.setupReadJob(configSpec, mapper)
case config.KubeVirtJob:
ex.setupKubeVirtJob(mapper)
ex.setupKubeVirtJob(configSpec, mapper)
default:
log.Fatalf("Unknown jobType: %s", job.JobType)
}
4 changes: 2 additions & 2 deletions pkg/burner/kubevirt.go
@@ -51,7 +51,7 @@ var supportedOps = map[config.KubeVirtOpType]struct{}{
config.KubeVirtOpRemoveVolume: {},
}

func (ex *Executor) setupKubeVirtJob(mapper meta.RESTMapper) {
func (ex *Executor) setupKubeVirtJob(configSpec config.Spec, mapper meta.RESTMapper) {
var err error

if len(ex.ExecutionMode) == 0 {
@@ -76,7 +76,7 @@ func (ex *Executor) setupKubeVirtJob(mapper meta.RESTMapper) {
o.Kind = kubeVirtDefaultKind
}

ex.objects = append(ex.objects, newObject(o, mapper, kubeVirtAPIVersionV1))
ex.objects = append(ex.objects, newObject(o, configSpec, mapper, kubeVirtAPIVersionV1))
}
}

4 changes: 2 additions & 2 deletions pkg/burner/object.go
@@ -35,7 +35,7 @@ type object struct {
ready bool
}

func newObject(obj config.Object, mapper meta.RESTMapper, defaultAPIVersion string) object {
func newObject(obj config.Object, configSpec config.Spec, mapper meta.RESTMapper, defaultAPIVersion string) object {
if obj.APIVersion == "" {
obj.APIVersion = defaultAPIVersion
}
@@ -58,7 +58,7 @@ func newObject(obj config.Object, mapper meta.RESTMapper, defaultAPIVersion stri

if obj.ObjectTemplate != "" {
log.Debugf("Rendering template: %s", obj.ObjectTemplate)
f, err := util.GetReaderForPath(obj.ObjectTemplate)
f, err := util.GetReader(obj.ObjectTemplate, configSpec.EmbedFS, configSpec.EmbedFSDir)
if err != nil {
log.Fatalf("Error reading template %s: %s", obj.ObjectTemplate, err)
}
4 changes: 2 additions & 2 deletions pkg/burner/patch.go
@@ -29,7 +29,7 @@ import (
"k8s.io/apimachinery/pkg/types"
)

func (ex *Executor) setupPatchJob(mapper meta.RESTMapper) {
func (ex *Executor) setupPatchJob(configSpec config.Spec, mapper meta.RESTMapper) {
log.Debugf("Preparing patch job: %s", ex.Name)
ex.itemHandler = patchHandler
if len(ex.ExecutionMode) == 0 {
@@ -44,7 +44,7 @@ func (ex *Executor) setupPatchJob(mapper meta.RESTMapper) {
log.Fatalln("Empty Patch Type not allowed")
}
log.Infof("Job %s: %s %s with selector %s", ex.Name, ex.JobType, o.Kind, labels.Set(o.LabelSelector))
ex.objects = append(ex.objects, newObject(o, mapper, APIVersionV1))
ex.objects = append(ex.objects, newObject(o, configSpec, mapper, APIVersionV1))
}
}

4 changes: 2 additions & 2 deletions pkg/burner/read.go
@@ -26,14 +26,14 @@ import (
"k8s.io/apimachinery/pkg/labels"
)

func (ex *Executor) setupReadJob(mapper meta.RESTMapper) {
func (ex *Executor) setupReadJob(configSpec config.Spec, mapper meta.RESTMapper) {
log.Debugf("Preparing read job: %s", ex.Name)
ex.itemHandler = readHandler
ex.ExecutionMode = config.ExecutionModeSequential

for _, o := range ex.Objects {
log.Debugf("Job %s: %s %s with selector %s", ex.Name, ex.JobType, o.Kind, labels.Set(o.LabelSelector))
ex.objects = append(ex.objects, newObject(o, mapper, APIVersionV1))
ex.objects = append(ex.objects, newObject(o, configSpec, mapper, APIVersionV1))
}
log.Infof("Job %s: %d iterations", ex.Name, ex.JobIterations)
}
2 changes: 1 addition & 1 deletion pkg/config/types.go
@@ -61,7 +61,7 @@ type Spec struct {
// Jobs list of kube-burner jobs
Jobs []Job `yaml:"jobs"`
// EmbedFS embed filesystem instance
EmbedFS embed.FS
EmbedFS *embed.FS
// EmbedFSDir Directory in which the configuration files are in the embed filesystem
EmbedFSDir string
}
2 changes: 1 addition & 1 deletion pkg/measurements/factory.go
@@ -48,7 +48,7 @@ type measurement interface {
var factory measurementFactory
var measurementMap = make(map[string]measurement)
var globalCfg config.GlobalConfig
var embedFS embed.FS
var embedFS *embed.FS
var embedFSDir string

// NewMeasurementFactory initializes the measurement facture
13 changes: 1 addition & 12 deletions pkg/measurements/netpol_latency.go
@@ -17,12 +17,10 @@ package measurements
import (
"bytes"
"context"
"embed"
"encoding/json"
"fmt"
"io"
"net/http"
"path"
"strconv"
"sync"
"time"
@@ -439,16 +437,7 @@ func processResults(n *netpolLatency) {

// Read network policy object template
func readTemplate(o kconfig.Object) ([]byte, error) {
var err error
var f io.Reader

e := embed.FS{}
if embedFS == e {
f, err = kutil.GetReaderForPath(o.ObjectTemplate)
} else {
objectTemplate := path.Join(embedFSDir, o.ObjectTemplate)
f, err = kutil.ReadEmbedConfig(embedFS, objectTemplate)
}
f, err := kutil.GetReader(o.ObjectTemplate, embedFS, embedFSDir)
if err != nil {
log.Fatalf("Error reading template %s: %s", o.ObjectTemplate, err)
}
35 changes: 10 additions & 25 deletions pkg/prometheus/prometheus.go
@@ -17,9 +17,7 @@ package prometheus
import (
"bytes"
"fmt"
"io"
"math"
"path"
"text/template"
"time"

@@ -33,16 +31,15 @@ import (
)

// NewPrometheusClient creates a prometheus struct instance with the given parameters
func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step time.Duration, metadata map[string]interface{}, embedConfig bool, indexer *indexers.Indexer) (*Prometheus, error) {
func NewPrometheusClient(configSpec config.Spec, url string, auth Auth, step time.Duration, metadata map[string]interface{}, indexer *indexers.Indexer) (*Prometheus, error) {
var err error
p := Prometheus{
Step: step,
UUID: configSpec.GlobalConfig.UUID,
ConfigSpec: configSpec,
Endpoint: url,
embedConfig: embedConfig,
indexer: indexer,
metadata: metadata,
Step: step,
UUID: configSpec.GlobalConfig.UUID,
ConfigSpec: configSpec,
Endpoint: url,
indexer: indexer,
metadata: metadata,
}
log.Infof("👽 Initializing prometheus client with URL: %s", url)
p.Client, err = prometheus.NewClient(url, auth.Token, auth.Username, auth.Password, auth.SkipTLSVerify)
@@ -130,24 +127,12 @@ func (p *Prometheus) parseMatrix(metricName, query string, job Job, value model.

// ReadProfile reads, parses and validates metric profile configuration
func (p *Prometheus) ReadProfile(location string) error {
var f io.Reader
var err error
if p.embedConfig {
embeddedPath := path.Join(path.Dir(p.ConfigSpec.EmbedFSDir), location)
f, err = util.ReadEmbedConfig(p.ConfigSpec.EmbedFS, embeddedPath)
if err != nil {
log.Infof("Embedded config doesn't contain metrics profile %s. Falling back to original path", embeddedPath)
f, err = util.GetReaderForPath(location)
} else {
location = embeddedPath
}
} else {
f, err = util.GetReaderForPath(location)
}
p.profileName = location
f, err := util.GetReader(location, p.ConfigSpec.EmbedFS, p.ConfigSpec.EmbedFSDir)
if err != nil {
return fmt.Errorf("error reading metrics profile %s: %s", location, err)
}
p.profileName = location

yamlDec := yaml.NewDecoder(f)
yamlDec.KnownFields(true)
metricProfile := metricProfile{
1 change: 0 additions & 1 deletion pkg/prometheus/types.go
@@ -39,7 +39,6 @@ type Prometheus struct {
UUID string
ConfigSpec config.Spec
metadata map[string]interface{}
embedConfig bool
indexer *indexers.Indexer
}

4 changes: 2 additions & 2 deletions pkg/util/metrics/metrics.go
@@ -78,7 +78,7 @@ func ProcessMetricsScraperConfig(scraperConfig ScraperConfig) Scraper {
Token: metricsEndpoint.Token,
SkipTLSVerify: metricsEndpoint.SkipTLSVerify,
}
p, err := prometheus.NewPrometheusClient(*scraperConfig.ConfigSpec, metricsEndpoint.Endpoint, auth, metricsEndpoint.Step, scraperConfig.MetricsMetadata, scraperConfig.EmbedConfig, indexer)
p, err := prometheus.NewPrometheusClient(*scraperConfig.ConfigSpec, metricsEndpoint.Endpoint, auth, metricsEndpoint.Step, scraperConfig.MetricsMetadata, indexer)
if err != nil {
log.Fatal(err)
}
@@ -92,7 +92,7 @@ func ProcessMetricsScraperConfig(scraperConfig ScraperConfig) Scraper {
}
}
for _, alertProfile := range metricsEndpoint.Alerts {
if alertM, err = alerting.NewAlertManager(alertProfile, scraperConfig.ConfigSpec.GlobalConfig.UUID, p, scraperConfig.EmbedConfig, indexer, scraperConfig.MetricsMetadata); err != nil {
if alertM, err = alerting.NewAlertManager(alertProfile, scraperConfig.ConfigSpec.GlobalConfig.UUID, p, indexer, scraperConfig.MetricsMetadata); err != nil {
log.Fatalf("Error creating alert manager: %s", err)
}
alertMs = append(alertMs, alertM)
1 change: 0 additions & 1 deletion pkg/util/metrics/types.go
@@ -28,7 +28,6 @@ type ScraperConfig struct {
UserMetaData string
SummaryMetadata map[string]interface{}
MetricsMetadata map[string]interface{}
EmbedConfig bool
}

// ScraperResponse holds parsed data related to scraper and target indexer
2 changes: 1 addition & 1 deletion pkg/util/metrics/utils.go
@@ -27,7 +27,7 @@ import (
// Decodes metrics endpoint yaml file
func DecodeMetricsEndpoint(metricsEndpointPath string) []config.MetricsEndpoint {
var metricsEndpoints []config.MetricsEndpoint
f, err := util.GetReaderForPath(metricsEndpointPath)
f, err := util.GetReader(metricsEndpointPath, nil, "")
if err != nil {
log.Fatalf("Error reading metricsEndpoint %s: %s", metricsEndpointPath, err)
}