diff --git a/DEVELOPMENT.md b/DEVELOPMENT.md
index 901a6ba5b..b6421858a 100644
--- a/DEVELOPMENT.md
+++ b/DEVELOPMENT.md
@@ -63,7 +63,13 @@ Kafka can then be enabled in the `FlowCollector` resource by setting `spec.kafka
 
 ## Linking with API changes in flowlogs-pipeline
 
-Add this at the bottom of `go.mod`:
+To link with merged (but unreleased) changes, update the FLP version by running (replacing "LONG_COMMIT_SHA"):
+
+```bash
+go get github.com/netobserv/flowlogs-pipeline@LONG_COMMIT_SHA
+```
+
+To link with unmerged changes, add this at the bottom of `go.mod`:
 
 ```
 replace github.com/netobserv/flowlogs-pipeline => ../flowlogs-pipeline
diff --git a/controllers/flowlogspipeline/flp_objects.go b/controllers/flowlogspipeline/flp_objects.go
index f2114638a..2a884ae2e 100644
--- a/controllers/flowlogspipeline/flp_objects.go
+++ b/controllers/flowlogspipeline/flp_objects.go
@@ -5,9 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"hash/fnv"
-	"io/ioutil"
-	"log"
-	"os"
 	"strconv"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
@@ -29,15 +26,14 @@ import (
 	"github.com/netobserv/network-observability-operator/pkg/helper"
 )
 
-const configMapName = "flowlogs-pipeline-config"
-const configVolume = "config-volume"
-const configPath = "/etc/flowlogs-pipeline"
-const configFile = "config.json"
-
-const kafkaCerts = "kafka-certs"
-const lokiCerts = "loki-certs"
-
 const (
+	configMapName    = "flowlogs-pipeline-config"
+	configVolume     = "config-volume"
+	configPath       = "/etc/flowlogs-pipeline"
+	configFile       = "config.json"
+	metricsConfigDir = "metrics_definitions"
+	kafkaCerts       = "kafka-certs"
+	lokiCerts        = "loki-certs"
 	healthServiceName     = "health"
 	prometheusServiceName = "prometheus"
 	healthTimeoutSeconds  = 5
@@ -242,72 +238,47 @@ func (b *builder) podTemplate(hostNetwork bool, configDigest string) corev1.PodT
 }
 
 //go:embed metrics_definitions
-var FlpMetricsConfig embed.FS
+var metricsConfigEmbed embed.FS
 
-var FlpMetricsConfigDir = "metrics_definitions"
-var generateStages = []string{"extract_aggregate", "encode_prom"}
-var tmpMetricsDefinitionsDir = "/tmp/tmp_metrics_definitions_dir"
-
-func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.PromMetricsItems) {
-	// copy metrics_definitions from embed to /tmp and pass to confgenerator
-	os.RemoveAll(tmpMetricsDefinitionsDir)
-	if err := os.Mkdir(tmpMetricsDefinitionsDir, os.ModePerm); err != nil {
-		log.Printf("failed to create tmpMetricsDefinitionsDir %s, %s\n", tmpMetricsDefinitionsDir, err)
-		return nil, nil
-	}
-	entries, err := FlpMetricsConfig.ReadDir(FlpMetricsConfigDir)
+func (b *builder) obtainMetricsConfiguration() ([]api.AggregateDefinition, api.PromMetricsItems, error) {
+	entries, err := metricsConfigEmbed.ReadDir(metricsConfigDir)
 	if err != nil {
-		log.Printf("failed to access metrics_definitions directory: %v\n", err)
-		return nil, nil
+		return nil, nil, fmt.Errorf("failed to access metrics_definitions directory: %w", err)
 	}
+
+	cg := confgen.NewConfGen(&confgen.Options{
+		GenerateStages: []string{"extract_aggregate", "encode_prom"},
+		SkipWithTags:   b.desired.IgnoreMetrics,
+	})
+
 	for _, entry := range entries {
 		fileName := entry.Name()
-		srcPath := FlpMetricsConfigDir + "/" + fileName
-		destPath := tmpMetricsDefinitionsDir + "/" + fileName
-		input, err := FlpMetricsConfig.ReadFile(srcPath)
+		srcPath := metricsConfigDir + "/" + fileName
+
+		input, err := metricsConfigEmbed.ReadFile(srcPath)
 		if err != nil {
-			fmt.Printf("error reading metrics file %s; %v\n", srcPath, err)
-			return nil, nil
+			return nil, nil, fmt.Errorf("error reading metrics file %s; %w", srcPath, err)
 		}
-
-		err = ioutil.WriteFile(destPath, input, 0644)
+		err = cg.ParseDefinition(fileName, input)
 		if err != nil {
-			fmt.Printf("Error creating %s; %v\n", destPath, err)
-			return nil, nil
+			return nil, nil, fmt.Errorf("error parsing metrics file %s; %w", srcPath, err)
 		}
 	}
 
-	// set confgen.Opt.SrcFolder, etc
-	confgen.Opt.SrcFolder = tmpMetricsDefinitionsDir
-	confgen.Opt.DestConfFile = "/dev/null"
-	confgen.Opt.SkipWithTags = b.desired.IgnoreMetrics
-	confgen.Opt.GenerateStages = generateStages
-	// run the confgenerator to produce the proper flp configuration for metrics from metrics_definitions
-	cg, _ := confgen.NewConfGen()
-	err = cg.Run()
-	if err != nil {
-		log.Printf("failed to run NewConfGen %s", err)
-		return nil, nil
+	stages := cg.GenerateTruncatedConfig()
+	if len(stages) != 2 {
+		return nil, nil, fmt.Errorf("error generating truncated config, 2 stages expected in %v", stages)
 	}
-
-	truncatedConfig := cg.GenerateTruncatedConfig(generateStages)
-
-	// obtain pointers to various parameters structures:
-	var aggregates []api.AggregateDefinition
-	var promMetrics api.PromMetricsItems
-
-	for _, p := range truncatedConfig.Parameters {
-		if p.Extract != nil && p.Extract.Aggregates != nil {
-			aggregates = p.Extract.Aggregates
-		}
-		if p.Encode != nil && p.Encode.Prom != nil {
-			promMetrics = p.Encode.Prom.Metrics
-		}
+	if stages[0].Extract == nil {
+		return nil, nil, fmt.Errorf("error generating truncated config, Extract expected in %v", stages)
+	}
+	if stages[1].Encode == nil || stages[1].Encode.Prom == nil {
+		return nil, nil, fmt.Errorf("error generating truncated config, Encode expected in %v", stages)
 	}
-	return aggregates, promMetrics
+	return stages[0].Extract.Aggregates, stages[1].Encode.Prom.Metrics, nil
 }
 
-func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) {
+func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) error {
 	lastStage := *stage
 	// Filter-out unused fields?
 	if b.desired.DropUnusedFields {
@@ -385,7 +356,11 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) {
 	}
 
 	// obtain extract_aggregate and encode_prometheus stages from metrics_definitions
-	aggregates, promMetrics := b.obtainMetricsConfiguration()
+	aggregates, promMetrics, err := b.obtainMetricsConfiguration()
+	if err != nil {
+		return err
+	}
+
 	// prometheus stage (encode) configuration
 	agg := enrichedStage.Aggregate("aggregate", aggregates)
 	agg.EncodePrometheus("prometheus", api.PromEncode{
@@ -393,6 +368,7 @@ func (b *builder) addTransformStages(stage *config.PipelineBuilderStage) {
 		Prefix:  "netobserv_",
 		Metrics: promMetrics,
 	})
+	return nil
 }
 
 func (b *builder) getKafkaTLS() *api.ClientTLS {
@@ -407,7 +383,7 @@ func (b *builder) getKafkaTLS() *api.ClientTLS {
 	return nil
 }
 
-func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
+func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam, error) {
 	var pipeline config.PipelineBuilderStage
 	if b.confKind == ConfKafkaTransformer {
 		pipeline = config.NewKafkaPipeline("kafka-read", api.IngestKafka{
@@ -437,15 +413,21 @@ func (b *builder) buildPipelineConfig() ([]config.Stage, []config.StageParam) {
 			TLS: b.getKafkaTLS(),
 		})
 	} else {
-		b.addTransformStages(&pipeline)
+		err := b.addTransformStages(&pipeline)
+		if err != nil {
+			return nil, nil, err
+		}
 	}
-	return pipeline.GetStages(), pipeline.GetStageParams()
+	return pipeline.GetStages(), pipeline.GetStageParams(), nil
 }
 
 // returns a configmap with a digest of its configuration contents, which will be used to
 // detect any configuration change
-func (b *builder) configMap() (*corev1.ConfigMap, string) {
-	stages, parameters := b.buildPipelineConfig()
+func (b *builder) configMap() (*corev1.ConfigMap, string, error) {
+	stages, parameters, err := b.buildPipelineConfig()
+	if err != nil {
+		return nil, "", err
+	}
 
 	config := map[string]interface{}{
 		"log-level": b.desired.LogLevel,
@@ -456,11 +438,11 @@
 		"parameters": parameters,
 	}
 
-	configStr := "{}"
 	bs, err := json.Marshal(config)
-	if err == nil {
-		configStr = string(bs)
+	if err != nil {
+		return nil, "", err
 	}
+	configStr := string(bs)
 
 	configMap := corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
@@ -475,7 +457,7 @@
 	hasher := fnv.New64a()
 	_, _ = hasher.Write([]byte(configStr))
 	digest := strconv.FormatUint(hasher.Sum64(), 36)
-	return &configMap, digest
+	return &configMap, digest, nil
 }
 
 func (b *builder) service(old *corev1.Service) *corev1.Service {
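Note (not part of the patch): the rework above replaces the tmp-dir-plus-global-`confgen.Opt` dance with an in-memory API. A minimal sketch of how that API is driven, where `"example.yaml"` and its contents are hypothetical stand-ins for the embedded `metrics_definitions` files:

```go
// Sketch only, assuming the post-patch confgen API shown above.
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/confgen"
)

func main() {
	cg := confgen.NewConfGen(&confgen.Options{
		GenerateStages: []string{"extract_aggregate", "encode_prom"},
		SkipWithTags:   []string{"subnet"}, // e.g. taken from spec.ignoreMetrics
	})
	// Every definition must start with the #flp_confgen header.
	def := []byte("#flp_confgen\ndescription:\n  example\n")
	if err := cg.ParseDefinition("example.yaml", def); err != nil {
		fmt.Println("parse:", err)
		return
	}
	// With two GenerateStages requested, two StageParams are expected back:
	// stages[0].Extract and stages[1].Encode.Prom, as validated above.
	stages := cg.GenerateTruncatedConfig()
	fmt.Printf("%d stage params\n", len(stages))
}
```

No filesystem writes are involved anymore, which is why the `io/ioutil`, `log`, and `os` imports could be dropped.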
diff --git a/controllers/flowlogspipeline/flp_reconciler.go b/controllers/flowlogspipeline/flp_reconciler.go
index 424d47eb3..505a53a93 100644
--- a/controllers/flowlogspipeline/flp_reconciler.go
+++ b/controllers/flowlogspipeline/flp_reconciler.go
@@ -173,7 +173,10 @@ func (r *singleDeploymentReconciler) Reconcile(ctx context.Context, desired *flo
 	}
 
 	builder := newBuilder(r.nobjMngr.Namespace, desired.Spec.Agent, desiredFLP, desiredLoki, desiredKafka, r.confKind, r.useOpenShiftSCC)
-	newCM, configDigest := builder.configMap()
+	newCM, configDigest, err := builder.configMap()
+	if err != nil {
+		return err
+	}
 	if !r.nobjMngr.Exists(r.owned.configMap) {
 		if err := r.CreateOwned(ctx, newCM); err != nil {
 			return err
diff --git a/controllers/flowlogspipeline/flp_test.go b/controllers/flowlogspipeline/flp_test.go
index 6c84e5afa..16089f667 100644
--- a/controllers/flowlogspipeline/flp_test.go
+++ b/controllers/flowlogspipeline/flp_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package flowlogspipeline
 
 import (
-	"embed"
 	"encoding/json"
 	"fmt"
 	"testing"
@@ -149,14 +148,16 @@ func TestDaemonSetNoChange(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest := b.configMap()
+	_, digest, err := b.configMap()
+	assert.NoError(err)
 	first := b.daemonSet(digest)
 
 	// Check no change
 	flp = getFLPConfig()
 	loki = getLokiConfig()
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 
 	assert.False(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
 }
@@ -170,13 +171,15 @@ func TestDaemonSetChanged(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest := b.configMap()
+	_, digest, err := b.configMap()
+	assert.NoError(err)
 	first := b.daemonSet(digest)
 
 	// Check probes enabled change
 	flp.EnableKubeProbes = true
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	second := b.daemonSet(digest)
 
 	assert.True(daemonSetNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -184,7 +187,8 @@ func TestDaemonSetChanged(t *testing.T) {
 	// Check log level change
 	flp.LogLevel = "info"
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	third := b.daemonSet(digest)
 
 	assert.True(daemonSetNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -195,7 +199,8 @@ func TestDaemonSetChanged(t *testing.T) {
 		corev1.ResourceMemory: resource.MustParse("500Gi"),
 	}
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	fourth := b.daemonSet(digest)
 
 	assert.True(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -206,7 +211,8 @@ func TestDaemonSetChanged(t *testing.T) {
 		corev1.ResourceMemory: resource.MustParse("512Mi"),
 	}
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 
 	assert.True(daemonSetNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
 	assert.False(daemonSetNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -221,14 +227,16 @@ func TestDeploymentNoChange(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest := b.configMap()
+	_, digest, err := b.configMap()
+	assert.NoError(err)
 	first := b.deployment(digest)
 
 	// Check no change
 	flp = getFLPConfig()
 	loki = getLokiConfig()
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 
 	assert.False(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
 }
@@ -242,13 +250,15 @@ func TestDeploymentChanged(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest := b.configMap()
+	_, digest, err := b.configMap()
+	assert.NoError(err)
 	first := b.deployment(digest)
 
 	// Check probes enabled change
 	flp.EnableKubeProbes = true
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	second := b.deployment(digest)
 
 	assert.True(deploymentNeedsUpdate(first, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -256,7 +266,8 @@ func TestDeploymentChanged(t *testing.T) {
 	// Check log level change
 	flp.LogLevel = "info"
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	third := b.deployment(digest)
 
 	assert.True(deploymentNeedsUpdate(second, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -267,7 +278,8 @@ func TestDeploymentChanged(t *testing.T) {
 		corev1.ResourceMemory: resource.MustParse("500Gi"),
 	}
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	fourth := b.deployment(digest)
 
 	assert.True(deploymentNeedsUpdate(third, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -278,7 +290,8 @@ func TestDeploymentChanged(t *testing.T) {
 		corev1.ResourceMemory: resource.MustParse("512Mi"),
 	}
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 	fifth := b.deployment(digest)
 
 	assert.True(deploymentNeedsUpdate(fourth, &flp, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
@@ -288,7 +301,8 @@ func TestDeploymentChanged(t *testing.T) {
 	flp2 := flp
 	flp2.Replicas = 5
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp2, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 
 	assert.False(deploymentNeedsUpdate(fifth, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
 }
@@ -302,14 +316,16 @@ func TestDeploymentChangedReplicasNoHPA(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	_, digest := b.configMap()
+	_, digest, err := b.configMap()
+	assert.NoError(err)
 	first := b.deployment(digest)
 
 	// Check replicas changed (need to copy flp, as Spec.Replicas stores a pointer)
 	flp2 := flp
 	flp2.Replicas = 5
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp2, &loki, &kafka, ConfSingle, true)
-	_, digest = b.configMap()
+	_, digest, err = b.configMap()
+	assert.NoError(err)
 
 	assert.True(deploymentNeedsUpdate(first, &flp2, digest, constants.FLPName+FlpConfSuffix[ConfSingle]))
 }
@@ -365,7 +381,8 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	cm, digest := b.configMap()
+	cm, digest, err := b.configMap()
+	assert.NoError(err)
 	assert.NotEmpty(t, digest)
 
 	assert.Equal("dev", cm.Labels["version"])
@@ -378,7 +395,7 @@ func TestConfigMapShouldDeserializeAsJSON(t *testing.T) {
 		LogLevel string `json:"log-level"`
 	}
 	var decoded cfg
-	err := json.Unmarshal([]byte(data), &decoded)
+	err = json.Unmarshal([]byte(data), &decoded)
 	assert.Nil(err)
 
 	assert.Equal("trace", decoded.LogLevel)
@@ -533,7 +550,8 @@ func TestPipelineConfig(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	stages, parameters := b.buildPipelineConfig()
+	stages, parameters, err := b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ := json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
@@ -541,14 +559,16 @@ func TestPipelineConfig(t *testing.T) {
 	// Kafka Ingester
 	kafka.Enable = true
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfKafkaIngester, true)
-	stages, parameters = b.buildPipelineConfig()
+	stages, parameters, err = b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ = json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"kafka-write","follows":"ipfix"}]`, string(jsonStages))
 
 	// Kafka Transformer
 	b = newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfKafkaTransformer, true)
-	stages, parameters = b.buildPipelineConfig()
+	stages, parameters, err = b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ = json.Marshal(stages)
 	assert.Equal(`[{"name":"kafka-read"},{"name":"enrich","follows":"kafka-read"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
@@ -565,7 +585,8 @@ func TestPipelineConfigDropUnused(t *testing.T) {
 	loki := getLokiConfig()
 	kafka := getKafkaConfig()
 	b := newBuilder(ns, flowsv1alpha1.AgentIPFIX, &flp, &loki, &kafka, ConfSingle, true)
-	stages, parameters := b.buildPipelineConfig()
+	stages, parameters, err := b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ := json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"filter","follows":"ipfix"},{"name":"enrich","follows":"filter"},{"name":"loki","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
@@ -582,24 +603,21 @@ func TestPipelineTraceStage(t *testing.T) {
 	flp := getFLPConfig()
 
 	b := newBuilder("namespace", flowsv1alpha1.AgentIPFIX, &flp, nil, nil, "", true)
-	stages, parameters := b.buildPipelineConfig()
+	stages, parameters, err := b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ := json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
 }
 
-//go:embed test_metrics_definitions
-var TestFlpMetricsConfig embed.FS
-
 func TestMergeMetricsConfigurationNoIgnore(t *testing.T) {
 	assert := assert.New(t)
 
 	flp := getFLPConfig()
 
-	FlpMetricsConfigDir = "test_metrics_definitions"
-	FlpMetricsConfig = TestFlpMetricsConfig
 	b := newBuilder("namespace", flowsv1alpha1.AgentIPFIX, &flp, nil, nil, "", true)
-	stages, parameters := b.buildPipelineConfig()
+	stages, parameters, err := b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ := json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
@@ -610,15 +628,16 @@ func TestMergeMetricsConfigurationNoIgnore(t *testing.T) {
 	assert.Equal("network_service_count", parameters[5].Encode.Prom.Metrics[2].Name)
 	assert.Equal("netobserv_", parameters[5].Encode.Prom.Prefix)
 }
+
 func TestMergeMetricsConfigurationWithIgnore(t *testing.T) {
 	assert := assert.New(t)
 
 	flp := getFLPConfig()
 
-	FlpMetricsConfigDir = "test_metrics_definitions"
 	flp.IgnoreMetrics = []string{"subnet"}
 	b := newBuilder("namespace", flowsv1alpha1.AgentIPFIX, &flp, nil, nil, "", true)
-	stages, parameters := b.buildPipelineConfig()
+	stages, parameters, err := b.buildPipelineConfig()
+	assert.NoError(err)
 	assert.True(validatePipelineConfig(stages, parameters))
 	jsonStages, _ := json.Marshal(stages)
 	assert.Equal(`[{"name":"ipfix"},{"name":"enrich","follows":"ipfix"},{"name":"loki","follows":"enrich"},{"name":"stdout","follows":"enrich"},{"name":"aggregate","follows":"enrich"},{"name":"prometheus","follows":"aggregate"}]`, string(jsonStages))
diff --git a/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_network_service_per_namespace.yaml b/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_network_service_per_namespace.yaml
deleted file mode 100644
index c50eb6d26..000000000
--- a/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_network_service_per_namespace.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-#flp_confgen
-description:
-  This metric observes the network bandwidth per network service per namespace
-details:
-  Sum bytes for all traffic per network service and namespace
-usage:
-  Evaluate network usage breakdown per network service and namespace
-tags:
-  - bandwidth
-  - graph
-  - rate
-  - network-service
-  - kubernetes
-extract:
-  aggregates:
-    - name: bandwidth_network_service_namespace
-      by:
-        - Service
-        - SrcK8S_Namespace
-      operation: sum
-      recordKey: Bytes
-encode:
-  type: prom
-  prom:
-    metrics:
-      - name: bandwidth_per_network_service_per_namespace
-        type: counter
-        filter: {key: name, value: bandwidth_network_service_namespace}
-        valuekey: recent_op_value
-        labels:
-          - by
-          - aggregate
diff --git a/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_src_subnet.yaml b/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_src_subnet.yaml
deleted file mode 100644
index 829fa1f8b..000000000
--- a/controllers/flowlogspipeline/test_metrics_definitions/bandwidth_per_src_subnet.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-#flp_confgen
-description:
-  This metric observes the network bandwidth per source subnet
-details:
-  Sum bytes for all traffic per source subnet
-usage:
-  Evaluate network usage breakdown per source subnet
-tags:
-  - bandwidth
-  - graph
-  - rate
-  - subnet
-extract:
-  aggregates:
-    - name: bandwidth_source_subnet
-      by:
-        - SrcSubnet
-      operation: sum
-      recordKey: Bytes
-encode:
-  type: prom
-  prom:
-    metrics:
-      - name: bandwidth_per_source_subnet
-        type: counter
-        filter: {key: name, value: bandwidth_source_subnet}
-        valueKey: recent_op_value
-        labels:
-          - by
-          - aggregate
diff --git a/controllers/flowlogspipeline/test_metrics_definitions/network_services_count.yaml b/controllers/flowlogspipeline/test_metrics_definitions/network_services_count.yaml
deleted file mode 100644
index 06cdc2370..000000000
--- a/controllers/flowlogspipeline/test_metrics_definitions/network_services_count.yaml
+++ /dev/null
@@ -1,30 +0,0 @@
-#flp_confgen
-description:
-  This metric observes network services rate (total)
-details:
-  Counts the number of connections per network service based on destination port number and protocol
-usage:
-  Evaluate network services
-tags:
-  - rate
-  - network-services
-  - destination-port
-  - destination-protocol
-extract:
-  type: aggregates
-  aggregates:
-    - name: dest_service_count
-      by:
-        - Service
-      operation: count
-encode:
-  type: prom
-  prom:
-    metrics:
-      - name: network_service_count
-        type: counter
-        filter: { key: name, value: dest_service_count }
-        valuekey: recent_count
-        labels:
-          - by
-          - aggregate
diff --git a/go.mod b/go.mod
index 95f0526ef..3eeb1c398 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,7 @@ go 1.16
 
 require (
 	github.com/mitchellh/mapstructure v1.4.3
-	github.com/netobserv/flowlogs-pipeline v0.1.3-rc1
+	github.com/netobserv/flowlogs-pipeline v0.1.3-rc1.0.20220729090746-bfbd89bd6d0d
 	github.com/onsi/ginkgo/v2 v2.1.3
 	github.com/onsi/gomega v1.19.0
 	github.com/openshift/api v0.0.0-20220112145620-704957ce4980
diff --git a/go.sum b/go.sum
index f0a7b722d..c3d9a127b 100644
--- a/go.sum
+++ b/go.sum
@@ -299,12 +299,15 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2
 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
+github.com/go-kit/kit v0.12.0 h1:e4o3o3IsBfAKQh5Qbbiqyfu97Ku7jrO/JbohvztANh4=
 github.com/go-kit/kit v0.12.0/go.mod h1:lHd+EkCZPIwYItmGDDRdhinkzX2A1sj+M9biaEaizzs=
 github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
+github.com/go-kit/log v0.2.0 h1:7i2K3eKTos3Vc0enKCfnVcgHh2olr/MyfboYq7cAcFw=
 github.com/go-kit/log v0.2.0/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
+github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
 github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
 github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
 github.com/go-logr/logr v0.2.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
@@ -459,6 +462,7 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
 github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@@ -576,6 +580,7 @@ github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA
 github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
+github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
 github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
@@ -682,6 +687,7 @@ github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0U
 github.com/lyft/protoc-gen-star v0.5.3/go.mod h1:V0xaHgaf5oCCqmcxYcWiDfTiKsZsRc87/1qhoTACD8w=
 github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
 github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
+github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
 github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
 github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
@@ -770,13 +776,10 @@ github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi
 github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
 github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
 github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
-github.com/netobserv/flowlogs-pipeline v0.1.2-0.20220616154151-f71171409f0b h1:K5dFjnRzvhCelbvc6SkEkAH7mICBGaL8T3sBGYTxJxM=
-github.com/netobserv/flowlogs-pipeline v0.1.2-0.20220616154151-f71171409f0b/go.mod h1:K6d9rkvz/peanr5oF0DL0E9vEdyIs1eb2aCRXSFM2Nk=
-github.com/netobserv/flowlogs-pipeline v0.1.3-rc0.0.20220720151725-3dbc423808e6 h1:CSAsmWsG+/kSqG+Nmh6v4OXMzImC3GSJaBNzfSNATAM=
-github.com/netobserv/flowlogs-pipeline v0.1.3-rc0.0.20220720151725-3dbc423808e6/go.mod h1:K6d9rkvz/peanr5oF0DL0E9vEdyIs1eb2aCRXSFM2Nk=
-github.com/netobserv/flowlogs-pipeline v0.1.3-rc1 h1:MSeEqU+1yq92q6rcoiZGCFyw270Ua4adqvmV6pfQLK0=
-github.com/netobserv/flowlogs-pipeline v0.1.3-rc1/go.mod h1:TiuGzBmOft4B9aumfHGTx7j2R3w06M/ZY0YU7pM9T68=
+github.com/netobserv/flowlogs-pipeline v0.1.3-rc1.0.20220729090746-bfbd89bd6d0d h1:fiRQ6WPC7CzHcJEg7COwi7C1wJBx/DYEu+KhyJnpblE=
+github.com/netobserv/flowlogs-pipeline v0.1.3-rc1.0.20220729090746-bfbd89bd6d0d/go.mod h1:TiuGzBmOft4B9aumfHGTx7j2R3w06M/ZY0YU7pM9T68=
 github.com/netobserv/gopipes v0.1.1/go.mod h1:eGoHZW1ON8Dx/zmDXUhsbVNqatPjtpdO0UZBmGZGmVI=
+github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9 h1:c2swm3EamzgjBq9idNbEs5bNz20FJo/HK6uxyigXekQ=
 github.com/netobserv/loki-client-go v0.0.0-20211018150932-cb17208397a9/go.mod h1:LHXpc5tjKvsfZn0pwLKrvlgEhZcCaw3Di9mUEZGAI4E=
 github.com/netobserv/netobserv-ebpf-agent v0.1.1-0.20220608092850-3fd4695b7cc2/go.mod h1:996FEHp8Xj+AKCkiN4eH3dl/yF2DzuYM0kchWZOrapM=
 github.com/netobserv/prometheus-common v0.31.2-0.20220720134304-43e74fd22881 h1:hx5bi6xBovRjmwUoVJBzhJ3EDo4K4ZUsqqKrJuQ2vMI=
@@ -840,6 +843,7 @@ github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtP
 github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
 github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
 github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
+github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
 github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
 github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
 github.com/performancecopilot/speed/v4 v4.0.0/go.mod h1:qxrSyuDGrTOWfV+uKRFhfxw6h/4HXRGUiZiufxo49BM=
@@ -911,6 +915,7 @@ github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O
 github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
 github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
+github.com/prometheus/prometheus v1.8.2-0.20201028100903-3245b3267b24 h1:V/4Cj2GytqdqK7OMEz6c4LNjey3SNyfw3pg5jPKtJvQ=
 github.com/prometheus/prometheus v1.8.2-0.20201028100903-3245b3267b24/go.mod h1:MDRkz271loM/PrYN+wUNEaTMDGSP760MQzB0yEjdgSQ=
 github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
@@ -960,9 +965,11 @@ github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasO
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
 github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
 github.com/spf13/afero v1.3.3/go.mod h1:5KUK8ByomD5Ti5Artl0RtHeI5pTF7MIDuXL3yY520V4=
+github.com/spf13/afero v1.6.0 h1:xoax2sJ2DT8S8xA2paPFjDCScCNeWsg75VG0DLRreiY=
 github.com/spf13/afero v1.6.0/go.mod h1:Ai8FlHk4v/PARR026UzYexafAt9roJ7LcLMAmO6Z93I=
 github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
+github.com/spf13/cast v1.4.1 h1:s0hze+J0196ZfEMTs80N7UlFt0BDuQ7Q+JDnHiMWKdA=
 github.com/spf13/cast v1.4.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
 github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
 github.com/spf13/cobra v1.1.1/go.mod h1:WnodtKOvamDL/PwE2M4iKs8aMDBZ5Q5klgD3qfVJQMI=
@@ -970,6 +977,7 @@ github.com/spf13/cobra v1.1.3/go.mod h1:pGADOWyqRD/YMrPZigI/zbliZ2wVD/23d+is3pSW
 github.com/spf13/cobra v1.2.1/go.mod h1:ExllRjgxM/piMAM+3tAZvg8fsklGAf3tPfi+i8t68Nk=
 github.com/spf13/cobra v1.3.0/go.mod h1:BrRVncBjOJa/eUcVVm9CE+oC6as8k+VYr4NY7WCi9V4=
 github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
+github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
 github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
 github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
@@ -979,6 +987,7 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
 github.com/spf13/viper v1.7.0/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
 github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns=
 github.com/spf13/viper v1.10.0/go.mod h1:SoyBPwAtKDzypXNDFKN5kzH7ppppbGZtls1UpIy5AsM=
+github.com/spf13/viper v1.10.1 h1:nuJZuYpG7gTj/XqiUwg8bA0cp1+M2mC3J4g5luUYBKk=
 github.com/spf13/viper v1.10.1/go.mod h1:IGlFPqhNAPKRxohIzWpI5QEy4kuI7tcl5WvR+8qy1rU=
 github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
 github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
@@ -999,6 +1008,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
+github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE=
@@ -1015,6 +1025,7 @@ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv
 github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE=
 github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU=
 github.com/vladimirvivien/gexe v0.1.1/go.mod h1:LHQL00w/7gDUKIak24n801ABp8C+ni6eBht9vGVst8w=
+github.com/vmware/go-ipfix v0.5.12 h1:mqQknlvnvDY25apPNy9c27ri3FMDFIhzvO68Kk5Qp58=
 github.com/vmware/go-ipfix v0.5.12/go.mod h1:yzbG1rv+yJ8GeMrRm+MDhOV3akygNZUHLhC1pDoD2AY=
 github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4=
 github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
@@ -1613,6 +1624,7 @@ google.golang.org/genproto v0.0.0-20211129164237-f09f9a12af12/go.mod h1:5CzLGKJ6
 google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
 google.golang.org/genproto v0.0.0-20211206160659-862468c7d6e0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
 google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
+google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac h1:qSNTkEN+L2mvWcLgJOR+8bdHX9rN/IdU3A1Ghpfb1Rg=
 google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
 google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
 google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
@@ -1648,6 +1660,7 @@ google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9K
 google.golang.org/grpc v1.40.1/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
 google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
 google.golang.org/grpc v1.43.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
+google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M=
 google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
@@ -1683,6 +1696,7 @@ gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
+gopkg.in/ini.v1 v1.66.2 h1:XfR1dOYubytKy4Shzc2LHrrGhU0lDCfDGG1yLPmpgsI=
 gopkg.in/ini.v1 v1.66.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
 gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/api.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/api.go
index 3ffbc39bf..4d83991a4 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/api.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/api.go
@@ -23,6 +23,7 @@ const (
 	FileChunksType = "file_chunks"
 	CollectorType  = "collector"
 	GRPCType       = "grpc"
+	FakeType       = "fake"
 	KafkaType      = "kafka"
 	StdoutType     = "stdout"
 	LokiType       = "loki"
@@ -31,6 +32,7 @@ const (
 	GenericType          = "generic"
 	NetworkType          = "network"
 	FilterType           = "filter"
+	ConnTrackType        = "conntrack"
 	NoneType             = "none"
 	ConnTrackingRuleType = "conn_tracking"
 	AddRegExIfRuleType   = "add_regex_if"
@@ -48,15 +50,16 @@ const (
 
 // Note: items beginning with doc: "## title" are top level items that get divided into sections inside api.md.
 type API struct {
-	PromEncode       PromEncode          `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
-	KafkaEncode      EncodeKafka         `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
-	IngestCollector  IngestCollector     `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
-	IngestKafka      IngestKafka         `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
-	IngestGRPCProto  IngestGRPCProto     `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
-	TransformGeneric TransformGeneric    `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
-	TransformFilter  TransformFilter     `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
-	TransformNetwork TransformNetwork    `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
-	WriteLoki        WriteLoki           `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
-	WriteStdout      WriteStdout         `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
-	ExtractAggregate AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
+	PromEncode         PromEncode          `yaml:"prom" doc:"## Prometheus encode API\nFollowing is the supported API format for prometheus encode:\n"`
+	KafkaEncode        EncodeKafka         `yaml:"kafka" doc:"## Kafka encode API\nFollowing is the supported API format for kafka encode:\n"`
+	IngestCollector    IngestCollector     `yaml:"collector" doc:"## Ingest collector API\nFollowing is the supported API format for the NetFlow / IPFIX collector:\n"`
+	IngestKafka        IngestKafka         `yaml:"kafka" doc:"## Ingest Kafka API\nFollowing is the supported API format for the kafka ingest:\n"`
+	IngestGRPCProto    IngestGRPCProto     `yaml:"grpc" doc:"## Ingest GRPC from Network Observability eBPF Agent\nFollowing is the supported API format for the Network Observability eBPF ingest:\n"`
+	TransformGeneric   TransformGeneric    `yaml:"generic" doc:"## Transform Generic API\nFollowing is the supported API format for generic transformations:\n"`
+	TransformFilter    TransformFilter     `yaml:"filter" doc:"## Transform Filter API\nFollowing is the supported API format for filter transformations:\n"`
+	TransformNetwork   TransformNetwork    `yaml:"network" doc:"## Transform Network API\nFollowing is the supported API format for network transformations:\n"`
+	WriteLoki          WriteLoki           `yaml:"loki" doc:"## Write Loki API\nFollowing is the supported API format for writing to loki:\n"`
+	WriteStdout        WriteStdout         `yaml:"stdout" doc:"## Write Standard Output\nFollowing is the supported API format for writing to standard output:\n"`
+	ExtractAggregate   AggregateDefinition `yaml:"aggregates" doc:"## Aggregate metrics API\nFollowing is the supported API format for specifying metrics aggregations:\n"`
+	ConnectionTracking ConnTrack           `yaml:"conntrack" doc:"## Connection tracking API\nFollowing is the supported API format for specifying connection tracking:\n"`
 }
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/conn_track.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/conn_track.go
index a2d93df56..e7326b594 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/conn_track.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/conn_track.go
@@ -17,8 +17,6 @@
 
 package api
 
-import "time"
-
 const (
 	HashIdFieldName     = "_HashId"
 	RecordTypeFieldName = "_RecordType"
@@ -29,7 +27,7 @@ type ConnTrack struct {
 	KeyDefinition        KeyDefinition `yaml:"keyDefinition,omitempty" doc:"fields that are used to identify the connection"`
 	OutputRecordTypes    []string      `yaml:"outputRecordTypes,omitempty" enum:"ConnTrackOutputRecordTypeEnum" doc:"output record types to emit"`
 	OutputFields         []OutputField `yaml:"outputFields,omitempty" doc:"list of output fields"`
-	EndConnectionTimeout time.Duration `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
+	EndConnectionTimeout Duration      `yaml:"endConnectionTimeout,omitempty" doc:"duration of time to wait from the last flow log to end a connection"`
 }
 
 type ConnTrackOutputRecordTypeEnum struct {
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/utils.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/utils.go
new file mode 100644
index 000000000..292609f4d
--- /dev/null
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/api/utils.go
@@ -0,0 +1,55 @@
+package api
+
+import (
+	"encoding/json"
+	"fmt"
+	"time"
+)
+
+// Duration is a wrapper of time.Duration that allows json marshaling.
+// https://stackoverflow.com/a/48051946/2749989
+type Duration struct {
+	time.Duration
+}
+
+func (d Duration) MarshalJSON() ([]byte, error) {
+	return json.Marshal(d.String())
+}
+
+func (d *Duration) UnmarshalJSON(b []byte) error {
+	var v interface{}
+	if err := json.Unmarshal(b, &v); err != nil {
+		return err
+	}
+	switch value := v.(type) {
+	case float64:
+		d.Duration = time.Duration(value)
+		return nil
+	case string:
+		var err error
+		d.Duration, err = time.ParseDuration(value)
+		if err != nil {
+			return err
+		}
+		return nil
+	default:
+		return fmt.Errorf("invalid duration %v", value)
+	}
+}
+
+func (d Duration) MarshalYAML() (interface{}, error) {
+	return d.String(), nil
+}
+
+func (d *Duration) UnmarshalYAML(unmarshal func(interface{}) error) error {
+	var durationStr string
+	err := unmarshal(&durationStr)
+	if err != nil {
+		return err
+	}
+	d.Duration, err = time.ParseDuration(durationStr)
+	if err != nil {
+		return err
+	}
+	return nil
+}
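Note (not part of the patch): the new `api.Duration` accepts both a JSON string (parsed with `time.ParseDuration`) and a bare number (interpreted as nanoseconds, since `json.Unmarshal` into `interface{}` yields `float64`). A small illustrative round-trip based on the methods above:

```go
// Illustrative only; behavior follows the MarshalJSON/UnmarshalJSON shown above.
package main

import (
	"encoding/json"
	"fmt"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
)

func main() {
	d := api.Duration{Duration: 30 * time.Second}
	b, _ := json.Marshal(d)
	fmt.Println(string(b)) // "30s" — a JSON string, not nanoseconds

	var out api.Duration
	_ = json.Unmarshal([]byte(`"5m"`), &out)         // string form
	fmt.Println(out.Duration)                        // 5m0s
	_ = json.Unmarshal([]byte(`300000000000`), &out) // numeric form: nanoseconds
	fmt.Println(out.Duration)                        // 5m0s
}
```

This is what lets `ConnTrack.EndConnectionTimeout` survive the JSON round-trip through the operator's ConfigMap.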
log.Debugf("cg.parseFile err: %v ", err) + log.Debugf("ioutil.ReadFile err: %v ", err) + continue + } + err = cg.ParseDefinition(definitionFile, b) + if err != nil { + log.Debugf("cg.parseDefinition err: %v ", err) continue } } - cg.Dedupe() + cg.dedupe() - if len(Opt.GenerateStages) != 0 { - config := cg.GenerateTruncatedConfig(Opt.GenerateStages) - err = cg.writeConfigFile(Opt.DestConfFile, config) + if len(cg.opts.GenerateStages) != 0 { + cfg := cg.GenerateTruncatedConfig() + err = cg.writeConfigFile(cg.opts.DestConfFile, cfg) if err != nil { log.Debugf("cg.GenerateTruncatedConfig err: %v ", err) return err @@ -98,20 +104,20 @@ func (cg *ConfGen) Run() error { return nil } else { config := cg.GenerateFlowlogs2PipelineConfig() - err = cg.writeConfigFile(Opt.DestConfFile, config) + err = cg.writeConfigFile(cg.opts.DestConfFile, config) if err != nil { log.Debugf("cg.GenerateFlowlogs2PipelineConfig err: %v ", err) return err } } - err = cg.generateDoc(Opt.DestDocFile) + err = cg.generateDoc(cg.opts.DestDocFile) if err != nil { log.Debugf("cg.generateDoc err: %v ", err) return err } - err = cg.generateGrafanaJsonnet(Opt.DestGrafanaJsonnetFolder) + err = cg.generateGrafanaJsonnet(cg.opts.DestGrafanaJsonnetFolder) if err != nil { log.Debugf("cg.generateGrafanaJsonnet err: %v ", err) return err @@ -120,62 +126,44 @@ func (cg *ConfGen) Run() error { return nil } -func (cg *ConfGen) checkHeader(fileName string) error { - // check header - f, err := os.OpenFile(fileName, os.O_RDONLY, 0644) - if err != nil { - log.Debugf("os.OpenFile error: %v ", err) - return err - } +func checkHeader(bytes []byte) error { header := make([]byte, len(definitionHeader)) - _, err = f.Read(header) - if err != nil || string(header) != definitionHeader { - log.Debugf("Wrong header file: %s ", fileName) + copy(header, bytes) + if string(header) != definitionHeader { return fmt.Errorf("wrong header") } - err = f.Close() - if err != nil { - log.Debugf("f.Close err: %v ", err) - return err - } - return nil } -func (cg *ConfGen) parseFile(fileName string) error { - +func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error { // check header - err := cg.checkHeader(fileName) + err := checkHeader(bytes) if err != nil { - log.Debugf("cg.checkHeader err: %v ", err) + log.Debugf("%s cg.checkHeader err: %v ", name, err) return err } // parse yaml var defFile DefFile - yamlFile, err := ioutil.ReadFile(fileName) + err = yaml.Unmarshal(bytes, &defFile) if err != nil { - log.Debugf("ioutil.ReadFile err: %v ", err) - return err - } - err = yaml.Unmarshal(yamlFile, &defFile) - if err != nil { - log.Debugf("yaml.Unmarshal err: %v ", err) + log.Debugf("%s yaml.Unmarshal err: %v ", name, err) return err } //skip if their skip tag match - for _, skipTag := range Opt.SkipWithTags { + for _, skipTag := range cg.opts.SkipWithTags { for _, tag := range defFile.Tags { if skipTag == tag { - return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag) + log.Infof("skipping definition %s due to skip tag %s", name, tag) + return nil } } } // parse definition definition := Definition{ - FileName: fileName, + FileName: name, Description: defFile.Description, Details: defFile.Details, Usage: defFile.Usage, @@ -215,7 +203,7 @@ func (cg *ConfGen) parseFile(fileName string) error { return nil } -func (*ConfGen) GetDefinitionFiles(rootPath string) []string { +func getDefinitionFiles(rootPath string) []string { var files []string @@ -235,11 +223,12 @@ func (*ConfGen) GetDefinitionFiles(rootPath string) []string { return files } 
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/confgen.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/confgen.go
index 7ee0d3dde..80eb60e89 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/confgen.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/confgen.go
@@ -29,7 +29,7 @@ import (
 	"gopkg.in/yaml.v2"
 )
 
-var (
+const (
 	definitionExt    = ".yaml"
 	definitionHeader = "#flp_confgen"
 	configFileName   = "config.yaml"
@@ -50,6 +50,7 @@ type Definition struct {
 type Definitions []Definition
 
 type ConfGen struct {
+	opts                 *Options
 	config               *Config
 	transformRules       api.NetworkTransformRules
 	aggregateDefinitions aggregate.Definitions
@@ -71,26 +72,31 @@ type DefFile struct {
 func (cg *ConfGen) Run() error {
 	var err error
-	cg.config, err = cg.ParseConfigFile(Opt.SrcFolder + "/" + configFileName)
+	cg.config, err = cg.ParseConfigFile(cg.opts.SrcFolder + "/" + configFileName)
 	if err != nil {
 		log.Debugf("cg.ParseConfigFile err: %v ", err)
 		return err
 	}
 
-	definitionFiles := cg.GetDefinitionFiles(Opt.SrcFolder)
+	definitionFiles := getDefinitionFiles(cg.opts.SrcFolder)
 	for _, definitionFile := range definitionFiles {
-		err := cg.parseFile(definitionFile)
+		b, err := ioutil.ReadFile(definitionFile)
 		if err != nil {
-			log.Debugf("cg.parseFile err: %v ", err)
+			log.Debugf("ioutil.ReadFile err: %v ", err)
+			continue
+		}
+		err = cg.ParseDefinition(definitionFile, b)
+		if err != nil {
+			log.Debugf("cg.ParseDefinition err: %v ", err)
 			continue
 		}
 	}
 
-	cg.Dedupe()
+	cg.dedupe()
 
-	if len(Opt.GenerateStages) != 0 {
-		config := cg.GenerateTruncatedConfig(Opt.GenerateStages)
-		err = cg.writeConfigFile(Opt.DestConfFile, config)
+	if len(cg.opts.GenerateStages) != 0 {
+		cfg := cg.GenerateTruncatedConfig()
+		err = cg.writeConfigFile(cg.opts.DestConfFile, cfg)
 		if err != nil {
 			log.Debugf("cg.GenerateTruncatedConfig err: %v ", err)
 			return err
@@ -98,20 +104,20 @@ func (cg *ConfGen) Run() error {
 		return nil
 	} else {
 		config := cg.GenerateFlowlogs2PipelineConfig()
-		err = cg.writeConfigFile(Opt.DestConfFile, config)
+		err = cg.writeConfigFile(cg.opts.DestConfFile, config)
 		if err != nil {
 			log.Debugf("cg.GenerateFlowlogs2PipelineConfig err: %v ", err)
 			return err
 		}
 	}
 
-	err = cg.generateDoc(Opt.DestDocFile)
+	err = cg.generateDoc(cg.opts.DestDocFile)
 	if err != nil {
 		log.Debugf("cg.generateDoc err: %v ", err)
 		return err
 	}
 
-	err = cg.generateGrafanaJsonnet(Opt.DestGrafanaJsonnetFolder)
+	err = cg.generateGrafanaJsonnet(cg.opts.DestGrafanaJsonnetFolder)
 	if err != nil {
 		log.Debugf("cg.generateGrafanaJsonnet err: %v ", err)
 		return err
@@ -120,62 +126,44 @@ func (cg *ConfGen) Run() error {
 	return nil
 }
 
-func (cg *ConfGen) checkHeader(fileName string) error {
-	// check header
-	f, err := os.OpenFile(fileName, os.O_RDONLY, 0644)
-	if err != nil {
-		log.Debugf("os.OpenFile error: %v ", err)
-		return err
-	}
+func checkHeader(bytes []byte) error {
 	header := make([]byte, len(definitionHeader))
-	_, err = f.Read(header)
-	if err != nil || string(header) != definitionHeader {
-		log.Debugf("Wrong header file: %s ", fileName)
+	copy(header, bytes)
+	if string(header) != definitionHeader {
 		return fmt.Errorf("wrong header")
 	}
-	err = f.Close()
-	if err != nil {
-		log.Debugf("f.Close err: %v ", err)
-		return err
-	}
-
 	return nil
 }
 
-func (cg *ConfGen) parseFile(fileName string) error {
-
+func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
 	// check header
-	err := cg.checkHeader(fileName)
+	err := checkHeader(bytes)
 	if err != nil {
-		log.Debugf("cg.checkHeader err: %v ", err)
+		log.Debugf("%s cg.checkHeader err: %v ", name, err)
 		return err
 	}
 
 	// parse yaml
 	var defFile DefFile
-	yamlFile, err := ioutil.ReadFile(fileName)
+	err = yaml.Unmarshal(bytes, &defFile)
 	if err != nil {
-		log.Debugf("ioutil.ReadFile err: %v ", err)
-		return err
-	}
-	err = yaml.Unmarshal(yamlFile, &defFile)
-	if err != nil {
-		log.Debugf("yaml.Unmarshal err: %v ", err)
+		log.Debugf("%s yaml.Unmarshal err: %v ", name, err)
 		return err
 	}
 
 	//skip if their skip tag match
-	for _, skipTag := range Opt.SkipWithTags {
+	for _, skipTag := range cg.opts.SkipWithTags {
 		for _, tag := range defFile.Tags {
 			if skipTag == tag {
-				return fmt.Errorf("skipping definition %s due to skip tag %s", fileName, tag)
+				log.Infof("skipping definition %s due to skip tag %s", name, tag)
+				return nil
 			}
 		}
 	}
 
 	// parse definition
 	definition := Definition{
-		FileName:    fileName,
+		FileName:    name,
 		Description: defFile.Description,
 		Details:     defFile.Details,
 		Usage:       defFile.Usage,
@@ -215,7 +203,7 @@ func (cg *ConfGen) ParseDefinition(name string, bytes []byte) error {
 	return nil
 }
 
-func (*ConfGen) GetDefinitionFiles(rootPath string) []string {
-
+func getDefinitionFiles(rootPath string) []string {
 	var files []string
 
@@ -235,11 +223,12 @@
 	return files
 }
 
-func NewConfGen() (*ConfGen, error) {
+func NewConfGen(opts *Options) *ConfGen {
 	return &ConfGen{
+		opts:                 opts,
 		transformRules:       api.NetworkTransformRules{},
 		aggregateDefinitions: aggregate.Definitions{},
 		definitions:          Definitions{},
 		visualizations:       Visualizations{},
-	}, nil
+	}
 }
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/config.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/config.go
index a4743e279..5d9d012c4 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/config.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/config.go
@@ -21,7 +21,7 @@ import (
 	"io/ioutil"
 	"os"
 
-	"github.com/netobserv/flowlogs-pipeline/pkg/api"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"
 	"gopkg.in/yaml.v2"
@@ -36,37 +36,16 @@ type Options struct {
 	GenerateStages []string
 }
 
-var (
-	Opt = Options{}
-)
-
-type ConfigIngest struct {
-	Collector api.IngestCollector `yaml:"collector"`
-}
-
-type ConfigTransform struct {
-	Generic api.TransformGeneric `yaml:"generic"`
-}
-
-type ConfigEncode struct {
-	Prom api.PromEncode `yaml:"prom"`
-}
-
-type ConfigWrite struct {
-	Loki api.WriteLoki `yaml:"loki"`
-	Type string        `yaml:"type"`
-}
-
 type ConfigVisualization struct {
 	Grafana ConfigVisualizationGrafana `yaml:"grafana"`
 }
 
 type Config struct {
 	Description   string              `yaml:"description"`
-	Ingest        ConfigIngest        `yaml:"ingest"`
-	Transform     ConfigTransform     `yaml:"transform"`
-	Write         ConfigWrite         `yaml:"write"`
-	Encode        ConfigEncode        `yaml:"encode"`
+	Ingest        config.Ingest       `yaml:"ingest"`
+	Transform     config.Transform    `yaml:"transform"`
+	Write         config.Write        `yaml:"write"`
+	Encode        config.Encode       `yaml:"encode"`
 	Visualization ConfigVisualization `yaml:"visualization"`
 }
 
@@ -75,7 +54,7 @@ func (cg *ConfGen) ParseConfigFile(fileName string) (*Config, error) {
 	// provide a minimal config for when config file is missing (as for Netobserv Openshift Operator)
 	var config Config
 	if _, err := os.Stat(fileName); errors.Is(err, os.ErrNotExist) {
-		if len(Opt.GenerateStages) == 0 {
+		if len(cg.opts.GenerateStages) == 0 {
 			log.Errorf("config file %s does not exist", fileName)
 			return nil, err
 		}
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/dedup.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/dedup.go
index 0255468eb..e09d19953 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/dedup.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/dedup.go
@@ -25,7 +25,7 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-func (cg *ConfGen) Dedupe() {
+func (cg *ConfGen) dedupe() {
 	cg.transformRules = dedupeNetworkTransformRules(cg.transformRules)
 	cg.aggregateDefinitions = dedupeAggregateDefinitions(cg.aggregateDefinitions)
 }
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/doc.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/doc.go
index a3b03fd51..dd4366e93 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/doc.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/doc.go
@@ -109,7 +109,7 @@ and the transformation to generate the exported metric.
 
-	`, Opt.SrcFolder)
+	`, cg.opts.SrcFolder)
 
 	data := fmt.Sprintf("%s\n%s\n", header, doc)
 	err := ioutil.WriteFile(fileName, []byte(data), 0664)
 	if err != nil {
diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/flowlogs2metrics_config.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/flowlogs2metrics_config.go
index 4f9fe3f1a..6501fb6ad 100644
--- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/flowlogs2metrics_config.go
+++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/confgen/flowlogs2metrics_config.go
@@ -22,163 +22,70 @@ import (
 	"io/ioutil"
 
 	"github.com/netobserv/flowlogs-pipeline/pkg/api"
-	config "github.com/netobserv/flowlogs-pipeline/pkg/config"
+	"github.com/netobserv/flowlogs-pipeline/pkg/config"
 	log "github.com/sirupsen/logrus"
 	"gopkg.in/yaml.v2"
 )
 
-func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() config.ConfigFileStruct {
-	configStruct := config.ConfigFileStruct{
-		LogLevel: "error",
-		Pipeline: []config.Stage{
-			{Name: "ingest_collector"},
-			{Name: "transform_generic",
-				Follows: "ingest_collector",
-			},
-			{Name: "transform_network",
-				Follows: "transform_generic",
-			},
-			{Name: "extract_aggregate",
-				Follows: "transform_network",
-			},
-			{Name: "encode_prom",
-				Follows: "extract_aggregate",
-			},
-			{Name: "write_loki",
-				Follows: "transform_network",
-			},
-		},
-		Parameters: []config.StageParam{
-			{Name: "ingest_collector",
-				Ingest: &config.Ingest{
-					Type: "collector",
-					Collector: &api.IngestCollector{
-						Port:       cg.config.Ingest.Collector.Port,
-						PortLegacy: cg.config.Ingest.Collector.PortLegacy,
-						HostName:   cg.config.Ingest.Collector.HostName,
-					},
-				},
-			},
-			{Name: "transform_generic",
-				Transform: &config.Transform{
-					Type: "generic",
-					Generic: &api.TransformGeneric{
-						Policy: "replace_keys",
-						Rules:  cg.config.Transform.Generic.Rules,
-					},
-				},
-			},
-			{Name: "transform_network",
-				Transform: &config.Transform{
-					Type: "network",
-					Network: &api.TransformNetwork{
-						Rules: cg.transformRules,
-					},
-				},
-			},
-			{Name: "extract_aggregate",
-				Extract: &config.Extract{
-					Type:       "aggregates",
-					Aggregates: cg.aggregateDefinitions,
-				},
-			},
-			{Name: "encode_prom",
-				Encode: &config.Encode{
-					Type: "prom",
-					Prom: &api.PromEncode{
-						Port:    cg.config.Encode.Prom.Port,
-						Prefix:  cg.config.Encode.Prom.Prefix,
-						Metrics: cg.promMetrics,
-					},
-				},
-			},
-			{Name: "write_loki",
-				Write: &config.Write{
-					Type: cg.config.Write.Type,
-					Loki: &cg.config.Write.Loki,
-				},
-			},
-		},
+func (cg *ConfGen) GenerateFlowlogs2PipelineConfig() *config.ConfigFileStruct {
+	pipeline, _ := config.NewPipeline("ingest_collector", &cg.config.Ingest)
+	next := pipeline
+	if cg.config.Transform.Generic != nil {
+		gen := *cg.config.Transform.Generic
+		if len(gen.Policy) == 0 {
+			gen.Policy = "replace_keys"
+		}
+		next = next.TransformGeneric("transform_generic", gen)
+	}
+	if len(cg.transformRules) > 0 {
+		next = next.TransformNetwork("transform_network", api.TransformNetwork{
+			Rules: cg.transformRules,
+		})
+	}
+	if len(cg.aggregateDefinitions) > 0 {
+		agg := next.Aggregate("extract_aggregate", cg.aggregateDefinitions)
+		agg.EncodePrometheus("encode_prom", api.PromEncode{
+			Port:    cg.config.Encode.Prom.Port,
+			Prefix:  cg.config.Encode.Prom.Prefix,
+			Metrics: cg.promMetrics,
+		})
+	}
+	if cg.config.Write.Loki != nil {
+		next.WriteLoki("write_loki", *cg.config.Write.Loki)
+	}
+	return &config.ConfigFileStruct{
+		LogLevel:   "error",
+		Pipeline:   pipeline.GetStages(),
+		Parameters: pipeline.GetStageParams(),
 	}
-	return configStruct
 }
 
-func (cg *ConfGen) GenerateTruncatedConfig(stages []string) config.ConfigFileStruct {
-	parameters := make([]config.StageParam, len(stages))
-	for i, stage := range stages {
+func (cg *ConfGen) GenerateTruncatedConfig() []config.StageParam {
+	parameters := make([]config.StageParam, len(cg.opts.GenerateStages))
+	for i, stage := range cg.opts.GenerateStages {
 		switch stage {
 		case "ingest":
-			parameters[i] = config.StageParam{
-				Name: "ingest_collector",
-				Ingest: &config.Ingest{
-					Type: "collector",
-					Collector: &api.IngestCollector{
-						Port:       cg.config.Ingest.Collector.Port,
-						PortLegacy: cg.config.Ingest.Collector.PortLegacy,
-						HostName:   cg.config.Ingest.Collector.HostName,
-					},
-				},
-			}
+			parameters[i] = config.NewCollectorParams("ingest_collector", *cg.config.Ingest.Collector)
 		case "transform_generic":
-			parameters[i] = config.StageParam{
-				Name: "transform_generic",
-				Transform: &config.Transform{
-					Type: "generic",
-					Generic: &api.TransformGeneric{
-						Policy: "replace_keys",
-						Rules:  cg.config.Transform.Generic.Rules,
-					},
-				},
-			}
+			parameters[i] = config.NewTransformGenericParams("transform_generic", *cg.config.Transform.Generic)
 		case "transform_network":
-			parameters[i] = config.StageParam{
-				Name: "transform_network",
-				Transform: &config.Transform{
-					Type: "network",
-					Network: &api.TransformNetwork{
-						Rules: cg.transformRules,
-					},
-				},
-			}
+			parameters[i] = config.NewTransformNetworkParams("transform_network", *cg.config.Transform.Network)
 		case "extract_aggregate":
-			parameters[i] = config.StageParam{
-				Name: "extract_aggregate",
-				Extract: &config.Extract{
-					Type:       "aggregates",
-					Aggregates: cg.aggregateDefinitions,
-				},
-			}
+			parameters[i] = config.NewAggregateParams("extract_aggregate", cg.aggregateDefinitions)
 		case "encode_prom":
-			parameters[i] = config.StageParam{
-				Name: "encode_prom",
-				Encode: &config.Encode{
-					Type: "prom",
-					Prom: &api.PromEncode{
-						Port:    cg.config.Encode.Prom.Port,
-						Prefix:  cg.config.Encode.Prom.Prefix,
-						Metrics: cg.promMetrics,
-					},
-				},
-			}
+			parameters[i] = config.NewEncodePrometheusParams("encode_prom", api.PromEncode{
+				Metrics: cg.promMetrics,
+			})
 		case "write_loki":
-			parameters[i] = config.StageParam{
-				Name: "write_loki",
-				Write: &config.Write{
-					Type: cg.config.Write.Type,
-					Loki: &cg.config.Write.Loki,
-				},
-			}
+			parameters[i] = config.NewWriteLokiParams("write_loki", *cg.config.Write.Loki)
 		}
 	}
 	log.Debugf("parameters = %v \n", parameters)
-	configStruct := config.ConfigFileStruct{
-		Parameters: parameters,
-	}
-	return configStruct
+	return parameters
 }
 
-func (cg *ConfGen) writeConfigFile(fileName string, config config.ConfigFileStruct) error {
-	configData, err := yaml.Marshal(&config)
+func (cg *ConfGen) writeConfigFile(fileName string, cfg interface{}) error {
+	configData, err := yaml.Marshal(cfg)
 	if err != nil {
 		return err
 	}
*ConfGen) generateGrafanaJsonnet(folderName string) error { return err } + err = os.MkdirAll(folderName, 0755) + if err != nil { + log.Debugf("os.MkdirAll err: %v ", err) + return err + } + // write to destination files for _, dashboard := range dashboards { output := []byte(jsonNetHeaderTemplate) output = append(output, dashboard.Header...) output = append(output, dashboard.Panels...) - fileName := folderName + "dashboard_" + dashboard.Name + ".jsonnet" + fileName := filepath.Join(folderName, "dashboard_"+dashboard.Name+".jsonnet") err = os.WriteFile(fileName, output, 0644) if err != nil { log.Debugf("os.WriteFile to file %s err: %v ", fileName, err) diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go index 42eb1dfd8..85f5ad12e 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/config.go @@ -24,28 +24,22 @@ import ( "github.com/sirupsen/logrus" ) -var ( - Opt = Options{} - PipeLine []Stage - Parameters []StageParam -) - type Options struct { PipeLine string Parameters string Health Health } -type Health struct { - Port string -} - type ConfigFileStruct struct { LogLevel string `yaml:"log-level,omitempty" json:"log-level,omitempty"` Pipeline []Stage `yaml:"pipeline,omitempty" json:"pipeline,omitempty"` Parameters []StageParam `yaml:"parameters,omitempty" json:"parameters,omitempty"` } +type Health struct { + Port string +} + type Stage struct { Name string `yaml:"name" json:"name"` Follows string `yaml:"follows,omitempty" json:"follows,omitempty"` @@ -85,6 +79,7 @@ type Transform struct { type Extract struct { Type string `yaml:"type" json:"type"` Aggregates []api.AggregateDefinition `yaml:"aggregates,omitempty" json:"aggregates,omitempty"` + ConnTrack *api.ConnTrack `yaml:"conntrack,omitempty" json:"conntrack,omitempty"` } type Encode struct { @@ -100,20 +95,22 @@ type Write struct { } // ParseConfig creates the internal unmarshalled representation from the Pipeline and Parameters json -func ParseConfig() error { - logrus.Debugf("config.Opt.PipeLine = %v ", Opt.PipeLine) - err := json.Unmarshal([]byte(Opt.PipeLine), &PipeLine) +func ParseConfig(opts Options) (ConfigFileStruct, error) { + out := ConfigFileStruct{} + + logrus.Debugf("opts.PipeLine = %v ", opts.PipeLine) + err := json.Unmarshal([]byte(opts.PipeLine), &out.Pipeline) if err != nil { logrus.Errorf("error when reading config file: %v", err) - return err + return out, err } - logrus.Debugf("stages = %v ", PipeLine) + logrus.Debugf("stages = %v ", out.Pipeline) - err = json.Unmarshal([]byte(Opt.Parameters), &Parameters) + err = json.Unmarshal([]byte(opts.Parameters), &out.Parameters) if err != nil { logrus.Errorf("error when reading config file: %v", err) - return err + return out, err } - logrus.Debugf("params = %v ", Parameters) - return nil + logrus.Debugf("params = %v ", out.Parameters) + return out, nil } diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/pipeline_builder.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/pipeline_builder.go index eebb8e97d..c39707136 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/pipeline_builder.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/pipeline_builder.go @@ -18,6 +18,8 @@ package config import ( + "errors" + "github.com/netobserv/flowlogs-pipeline/pkg/api" ) @@ -45,11 +47,25 @@ type PipelineBuilderStage struct { pipeline 
*pipeline } +// NewPipeline creates a new pipeline from an existing ingest +func NewPipeline(name string, ingest *Ingest) (PipelineBuilderStage, error) { + if ingest.Collector != nil { + return NewCollectorPipeline(name, *ingest.Collector), nil + } + if ingest.GRPC != nil { + return NewGRPCPipeline(name, *ingest.GRPC), nil + } + if ingest.Kafka != nil { + return NewKafkaPipeline(name, *ingest.Kafka), nil + } + return PipelineBuilderStage{}, errors.New("Missing ingest params") +} + // NewCollectorPipeline creates a new pipeline from an `IngestCollector` initial stage (listening for NetFlows / IPFIX) func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}}}, + config: []StageParam{NewCollectorParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -58,7 +74,7 @@ func NewCollectorPipeline(name string, ingest api.IngestCollector) PipelineBuild func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}}}, + config: []StageParam{NewGRPCParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -67,7 +83,7 @@ func NewGRPCPipeline(name string, ingest api.IngestGRPCProto) PipelineBuilderSta func NewKafkaPipeline(name string, ingest api.IngestKafka) PipelineBuilderStage { p := pipeline{ stages: []Stage{{Name: name}}, - config: []StageParam{{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}}}, + config: []StageParam{NewKafkaParams(name, ingest)}, } return PipelineBuilderStage{pipeline: &p, lastStage: name} } @@ -80,42 +96,47 @@ func (b *PipelineBuilderStage) next(name string, param StageParam) PipelineBuild // Aggregate chains the current stage with an aggregate stage and returns that new stage func (b *PipelineBuilderStage) Aggregate(name string, aggs []api.AggregateDefinition) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}}) + return b.next(name, NewAggregateParams(name, aggs)) } // TransformGeneric chains the current stage with a TransformGeneric stage and returns that new stage func (b *PipelineBuilderStage) TransformGeneric(name string, gen api.TransformGeneric) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}}) + return b.next(name, NewTransformGenericParams(name, gen)) } // TransformFilter chains the current stage with a TransformFilter stage and returns that new stage func (b *PipelineBuilderStage) TransformFilter(name string, filter api.TransformFilter) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}}) + return b.next(name, NewTransformFilterParams(name, filter)) } // TransformNetwork chains the current stage with a TransformNetwork stage and returns that new stage func (b *PipelineBuilderStage) TransformNetwork(name string, nw api.TransformNetwork) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}}) + return b.next(name, NewTransformNetworkParams(name, nw)) +} + +// ConnTrack chains the current stage with a ConnTrack stage and returns that new stage +func (b 
*PipelineBuilderStage) ConnTrack(name string, ct api.ConnTrack) PipelineBuilderStage { + return b.next(name, NewConnTrackParams(name, ct)) } // EncodePrometheus chains the current stage with a PromEncode stage (to expose metrics in Prometheus format) and returns that new stage func (b *PipelineBuilderStage) EncodePrometheus(name string, prom api.PromEncode) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}}) + return b.next(name, NewEncodePrometheusParams(name, prom)) } // EncodeKafka chains the current stage with an EncodeKafka stage (writing to a Kafka topic) and returns that new stage func (b *PipelineBuilderStage) EncodeKafka(name string, kafka api.EncodeKafka) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}}) + return b.next(name, NewEncodeKafkaParams(name, kafka)) } // WriteStdout chains the current stage with a WriteStdout stage and returns that new stage func (b *PipelineBuilderStage) WriteStdout(name string, stdout api.WriteStdout) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}}) + return b.next(name, NewWriteStdoutParams(name, stdout)) } // WriteLoki chains the current stage with a WriteLoki stage and returns that new stage func (b *PipelineBuilderStage) WriteLoki(name string, loki api.WriteLoki) PipelineBuilderStage { - return b.next(name, StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}}) + return b.next(name, NewWriteLokiParams(name, loki)) } // GetStages returns the current pipeline stages. It can be called from any of the stages, they share the same pipeline reference. diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/stage_params.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/stage_params.go new file mode 100644 index 000000000..99e0ea85b --- /dev/null +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/config/stage_params.go @@ -0,0 +1,70 @@ +/* + * Copyright (C) 2021 IBM, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package config + +import ( + "github.com/netobserv/flowlogs-pipeline/pkg/api" +) + +func NewCollectorParams(name string, ingest api.IngestCollector) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.CollectorType, Collector: &ingest}} +} + +func NewGRPCParams(name string, ingest api.IngestGRPCProto) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.GRPCType, GRPC: &ingest}} +} + +func NewKafkaParams(name string, ingest api.IngestKafka) StageParam { + return StageParam{Name: name, Ingest: &Ingest{Type: api.KafkaType, Kafka: &ingest}} +} + +func NewAggregateParams(name string, aggs []api.AggregateDefinition) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.AggregateType, Aggregates: aggs}} +} + +func NewTransformGenericParams(name string, gen api.TransformGeneric) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.GenericType, Generic: &gen}} +} + +func NewTransformFilterParams(name string, filter api.TransformFilter) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.FilterType, Filter: &filter}} +} + +func NewTransformNetworkParams(name string, nw api.TransformNetwork) StageParam { + return StageParam{Name: name, Transform: &Transform{Type: api.NetworkType, Network: &nw}} +} + +func NewConnTrackParams(name string, ct api.ConnTrack) StageParam { + return StageParam{Name: name, Extract: &Extract{Type: api.ConnTrackType, ConnTrack: &ct}} +} + +func NewEncodePrometheusParams(name string, prom api.PromEncode) StageParam { + return StageParam{Name: name, Encode: &Encode{Type: api.PromType, Prom: &prom}} +} + +func NewEncodeKafkaParams(name string, kafka api.EncodeKafka) StageParam { + return StageParam{Name: name, Encode: &Encode{Type: api.KafkaType, Kafka: &kafka}} +} + +func NewWriteStdoutParams(name string, stdout api.WriteStdout) StageParam { + return StageParam{Name: name, Write: &Write{Type: api.StdoutType, Stdout: &stdout}} +} + +func NewWriteLokiParams(name string, loki api.WriteLoki) StageParam { + return StageParam{Name: name, Write: &Write{Type: api.LokiType, Loki: &loki}} +} diff --git a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils/params_parse.go b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils/params_parse.go index c056384f4..0406db8ce 100644 --- a/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils/params_parse.go +++ b/vendor/github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils/params_parse.go @@ -16,43 +16,3 @@ */ package utils - -import ( - "encoding/json" - - "github.com/netobserv/flowlogs-pipeline/pkg/config" - log "github.com/sirupsen/logrus" -) - -// ParamString returns its corresponding (json) string from config.parameters for specified params structure -func ParamString(params config.StageParam, stage string, stageType string) string { - log.Debugf("entering paramString") - log.Debugf("params = %v, stage = %s, stageType = %s", params, stage, stageType) - - var configMap []map[string]interface{} - var err error - err = json.Unmarshal([]byte(config.Opt.Parameters), &configMap) - if err != nil { - return "" - } - log.Debugf("configMap = %v", configMap) - - var returnBytes []byte - for index := range config.Parameters { - paramsEntry := &config.Parameters[index] - if params.Name == paramsEntry.Name { - log.Debugf("paramsEntry = %v", paramsEntry) - log.Debugf("data[index][stage] = %v", configMap[index][stage]) - // convert back to string - subField := 
configMap[index][stage].(map[string]interface{}) - log.Debugf("subField = %v", subField) - returnBytes, err = json.Marshal(subField[stageType]) - if err != nil { - return "" - } - break - } - } - log.Debugf("returnBytes = %s", string(returnBytes)) - return string(returnBytes) -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0a99155e0..66cc4b9e2 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -94,7 +94,7 @@ github.com/modern-go/reflect2 github.com/munnerz/goautoneg # github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f github.com/mwitkow/go-conntrack -# github.com/netobserv/flowlogs-pipeline v0.1.3-rc1 +# github.com/netobserv/flowlogs-pipeline v0.1.3-rc1.0.20220729090746-bfbd89bd6d0d ## explicit github.com/netobserv/flowlogs-pipeline/pkg/api github.com/netobserv/flowlogs-pipeline/pkg/confgen
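
Usage notes, for reviewers (sketches, not part of the patch): the changes above replace package-level globals with injected options and introduce a fluent pipeline builder plus `New...Params` stage constructors. Below is a minimal sketch of the new builder API, using only functions visible in this patch; the stage names and the collector port are illustrative.

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/api"
	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	// Start from a collector ingest stage (the port value is illustrative).
	pipeline := config.NewCollectorPipeline("ingest_collector", api.IngestCollector{Port: 2055})

	// Each chained call appends a stage that follows the previous one
	// and returns the new stage.
	enriched := pipeline.TransformNetwork("transform_network", api.TransformNetwork{})

	// Stages can branch: aggregates feed Prometheus encoding, while Loki
	// is written directly from the enriched stage.
	agg := enriched.Aggregate("extract_aggregate", nil)
	agg.EncodePrometheus("encode_prom", api.PromEncode{})
	enriched.WriteLoki("write_loki", api.WriteLoki{})

	// All stages share the same underlying pipeline reference, so the full
	// topology and parameters can be read back from any of them.
	fmt.Printf("stages: %v\n", pipeline.GetStages())
	fmt.Printf("params: %v\n", pipeline.GetStageParams())
}
```

`NewPipeline` is the generic entry point that dispatches to `NewCollectorPipeline`, `NewGRPCPipeline` or `NewKafkaPipeline` depending on which ingest field is set, and returns an error when none is.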
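
Similarly, `confgen` no longer reads the removed `confgen.Opt` global; a `ConfGen` now carries its own options. A minimal sketch, assuming no metric definitions have been loaded yet (definition parsing is omitted here), in which case the generated stage params simply carry empty definitions:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/confgen"
)

func main() {
	// Options are injected at construction time instead of read from a global.
	cg := confgen.NewConfGen(&confgen.Options{
		GenerateStages: []string{"extract_aggregate", "encode_prom"},
	})

	// Metric definitions would normally be parsed into cg before this call.
	// GenerateTruncatedConfig emits one StageParam per requested stage, in order.
	params := cg.GenerateTruncatedConfig()
	fmt.Printf("%d stage params generated\n", len(params))
}
```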
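
Finally, `config.ParseConfig` now takes its `Options` as an argument and returns the unmarshalled `ConfigFileStruct` instead of filling the removed `PipeLine`/`Parameters` package globals. A sketch with hypothetical minimal JSON input:

```go
package main

import (
	"fmt"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

func main() {
	// Hypothetical two-stage topology with matching per-stage parameters.
	opts := config.Options{
		PipeLine:   `[{"name":"ingest_collector"},{"name":"write_loki","follows":"ingest_collector"}]`,
		Parameters: `[{"name":"ingest_collector"},{"name":"write_loki"}]`,
	}

	cfg, err := config.ParseConfig(opts)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d stages, %d params\n", len(cfg.Pipeline), len(cfg.Parameters))
}
```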