Skip to content

Commit

Permalink
Use refactored confgen for metrics (netobserv#142)
Browse files Browse the repository at this point in the history
- Simplified interface with confgen: just call "ParseDefinition" and
  "GenerateTruncatedConfig"
- Propagate errors to caller
  • Loading branch information
jotak authored Aug 3, 2022
1 parent 54e3d69 commit f1b1989
Show file tree
Hide file tree
Showing 24 changed files with 446 additions and 517 deletions.
8 changes: 7 additions & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ Kafka can then be enabled in the `FlowCollector` resource by setting `spec.kafka

## Linking with API changes in flowlogs-pipeline

Add this at the bottom of `go.mod`:
To link with merged changes (but unreleased), update the FLP version by running (replacing "LONG_COMMIT_SHA"):

```bash
go get github.com/netobserv/flowlogs-pipeline@LONG_COMMIT_SHA
```

To link with unmerged changes, add this at the bottom of `go.mod`:

```
replace github.com/netobserv/flowlogs-pipeline => ../flowlogs-pipeline
Expand Down
126 changes: 54 additions & 72 deletions controllers/flowlogspipeline/flp_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ import (
"encoding/json"
"fmt"
"hash/fnv"
"io/ioutil"
"log"
"os"
"strconv"

"github.com/netobserv/flowlogs-pipeline/pkg/api"
Expand All @@ -29,15 +26,14 @@ import (
"github.com/netobserv/network-observability-operator/pkg/helper"
)

const configMapName = "flowlogs-pipeline-config"
const configVolume = "config-volume"
const configPath = "/etc/flowlogs-pipeline"
const configFile = "config.json"

const kafkaCerts = "kafka-certs"
const lokiCerts = "loki-certs"

const (
configMapName = "flowlogs-pipeline-config"
configVolume = "config-volume"
configPath = "/etc/flowlogs-pipeline"
configFile = "config.json"
metricsConfigDir = "metrics_definitions"
kafkaCerts = "kafka-certs"
lokiCerts = "loki-certs"
healthServiceName = "health"
prometheusServiceName = "prometheus"
healthTimeoutSeconds = 5
Expand Down Expand Up @@ -242,72 +238,47 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
}

//go:embed metrics_definitions
var FlpMetricsConfig embed.FS
var metricsConfigEmbed embed.FS

var FlpMetricsConfigDir = "metrics_definitions"
var generateStages = []string{"extract_aggregate", "encode_prom"}
var tmpMetricsDefinitionsDir = "/tmp/tmp_metrics_definitions_dir"

func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.PromMetricsItems) {
// copy metrics_definitions from embed to /tmp and pass to confgenerator
os.RemoveAll(tmpMetricsDefinitionsDir)
if err := os.Mkdir(tmpMetricsDefinitionsDir, os.ModePerm); err != nil {
log.Printf("failed to create tmpMetricsDefinitionsDir %s, %s\n", tmpMetricsDefinitionsDir, err)
return nil, nil
}
entries, err := FlpMetricsConfig.ReadDir(FlpMetricsConfigDir)
func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.PromMetricsItems, error) {
entries, err := metricsConfigEmbed.ReadDir(metricsConfigDir)
if err != nil {
log.Printf("failed to access metrics_definitions directory: %v\n", err)
return nil, nil
return nil, nil, fmt.Errorf("failed to access metrics_definitions directory: %w", err)
}

cg := confgen.NewConfGen(&confgen.Options{
GenerateStages: []string{"extract_aggregate", "encode_prom"},
SkipWithTags: b.desired.IgnoreMetrics,
})

for _, entry := range entries {
fileName := entry.Name()
srcPath := FlpMetricsConfigDir + "/" + fileName
destPath := tmpMetricsDefinitionsDir + "/" + fileName
input, err := FlpMetricsConfig.ReadFile(srcPath)
srcPath := metricsConfigDir + "/" + fileName

input, err := metricsConfigEmbed.ReadFile(srcPath)
if err != nil {
fmt.Printf("error reading metrics file %s; %v\n", srcPath, err)
return nil, nil
return nil, nil, fmt.Errorf("error reading metrics file %s; %w", srcPath, err)
}

err = ioutil.WriteFile(destPath, input, 0644)
err = cg.ParseDefinition(fileName, input)
if err != nil {
fmt.Printf("Error creating %s; %v\n", destPath, err)
return nil, nil
return nil, nil, fmt.Errorf("error parsing metrics file %s; %w", srcPath, err)
}
}
// set confgen.Opt.SrcFolder, etc
confgen.Opt.SrcFolder = tmpMetricsDefinitionsDir
confgen.Opt.DestConfFile = "/dev/null"
confgen.Opt.SkipWithTags = b.desired.IgnoreMetrics
confgen.Opt.GenerateStages = generateStages

// run the confgenerator to produce the proper flp configuration for metrics from metrics_definitions
cg, _ := confgen.NewConfGen()
err = cg.Run()
if err != nil {
log.Printf("failed to run NewConfGen %s", err)
return nil, nil
stages := cg.GenerateTruncatedConfig()
if len(stages) != 2 {
return nil, nil, fmt.Errorf("error generating truncated config, 2 stages expected in %v", stages)
}

truncatedConfig := cg.GenerateTruncatedConfig(generateStages)

// obtain pointers to various parameters structures:
var aggregates []api.AggregateDefinition
var promMetrics api.PromMetricsItems

for _, p := range truncatedConfig.Parameters {
if p.Extract != nil && p.Extract.Aggregates != nil {
aggregates = p.Extract.Aggregates
}
if p.Encode != nil && p.Encode.Prom != nil {
promMetrics = p.Encode.Prom.Metrics
}
if stages[0].Extract == nil {
return nil, nil, fmt.Errorf("error generating truncated config, Extract expected in %v", stages)
}
if stages[1].Encode == nil || stages[1].Encode.Prom == nil {
return nil, nil, fmt.Errorf("error generating truncated config, Encode expected in %v", stages)
}
return aggregates, promMetrics
return stages[0].Extract.Aggregates, stages[1].Encode.Prom.Metrics, nil
}

func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) {
func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
lastStage := *stage
// Filter-out unused fields?
if b.desired.DropUnusedFields {
Expand Down Expand Up @@ -385,14 +356,19 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) {
}

// obtain extract_aggregate and encode_prometheus stages from metrics_definitions
aggregates, promMetrics := b.obtainMetricsConfiguration()
aggregates, promMetrics, err := b.obtainMetricsConfiguration()
if err != nil {
return err
}

// prometheus stage (encode) configuration
agg := enrichedStage.Aggregate("aggregate", aggregates)
agg.EncodePrometheus("prometheus", api.PromEncode{
Port: int(b.desired.PrometheusPort),
Prefix: "netobserv_",
Metrics: promMetrics,
})
return nil
}

func (b *builder) getKafkaTLS() *api.ClientTLS {
Expand All @@ -407,7 +383,7 @@ func (b *builder) getKafkaTLS() *api.ClientTLS {
return nil
}

func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) {
var pipeline config.PipelineBuilderStage
if b.confKind == ConfKafkaTransformer {
pipeline = config.NewKafkaPipeline("kafka-read", api.IngestKafka{
Expand Down Expand Up @@ -437,15 +413,21 @@ func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
TLS: b.getKafkaTLS(),
})
} else {
b.addTransformStages(&pipeline)
err := b.addTransformStages(&pipeline)
if err != nil {
return nil, nil, err
}
}
return pipeline.GetStages(), pipeline.GetStageParams()
return pipeline.GetStages(), pipeline.GetStageParams(), nil
}

// returns a configmap with a digest of its configuration contents, which will be used to
// detect any configuration change
func (b *builder) configMap() (*corev1.ConfigMap, string) {
stages, parameters := b.buildPipelineConfig()
func (b *builder) configMap() (*corev1.ConfigMap, string, error) {
stages, parameters, err := b.buildPipelineConfig()
if err != nil {
return nil, "", err
}

config := map[string]interface{}{
"log-level": b.desired.LogLevel,
Expand All @@ -456,11 +438,11 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
"parameters": parameters,
}

configStr := "{}"
bs, err := json.Marshal(config)
if err == nil {
configStr = string(bs)
if err != nil {
return nil, "", err
}
configStr := string(bs)

configMap := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand All @@ -475,7 +457,7 @@ func (b *builder) configMap() (*corev1.ConfigMap, string) {
hasher := fnv.New64a()
_, _ = hasher.Write([]byte(configStr))
digest := strconv.FormatUint(hasher.Sum64(), 36)
return &configMap, digest
return &configMap, digest, nil
}

func (b *builder) service(old *corev1.Service) *corev1.Service {
Expand Down
5 changes: 4 additions & 1 deletion controllers/flowlogspipeline/flp_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,10 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
}

builder := newBuilder(r.nobjMngr.Namespace, desired.Spec.Agent, desiredFLP, desiredLoki, desiredKafka, r.confKind, r.useOpenShiftSCC)
newCM, configDigest := builder.configMap()
newCM, configDigest, err := builder.configMap()
if err != nil {
return err
}
if !r.nobjMngr.Exists(r.owned.configMap) {
if err := r.CreateOwned(ctx, newCM); err != nil {
return err
Expand Down
Loading

0 comments on commit f1b1989

Please sign in to comment.