From 48267008699d8f8cb8cfd69bf73ba07ded80e6c2 Mon Sep 17 00:00:00 2001 From: Joel Takvorian Date: Thu, 22 Sep 2022 13:45:24 +0200 Subject: [PATCH] Fix crash on startup "failed to initialize pipeline opening "": open : no such file or directory" Previous PR removed the default file names for service mapping --- cmd/flowlogs-pipeline/main_test.go | 27 +++++++++++++++++++++ pkg/api/transform_network.go | 12 +++++++++ pkg/pipeline/transform/transform_network.go | 9 ++++--- 3 files changed, 44 insertions(+), 4 deletions(-) diff --git a/cmd/flowlogs-pipeline/main_test.go b/cmd/flowlogs-pipeline/main_test.go index b405c6a41..5eba1bb22 100644 --- a/cmd/flowlogs-pipeline/main_test.go +++ b/cmd/flowlogs-pipeline/main_test.go @@ -18,9 +18,14 @@ package main import ( + "encoding/json" "os" "os/exec" "testing" + + "github.com/netobserv/flowlogs-pipeline/pkg/config" + "github.com/netobserv/flowlogs-pipeline/pkg/pipeline" + "github.com/stretchr/testify/require" ) func TestTheMain(t *testing.T) { @@ -36,3 +41,25 @@ func TestTheMain(t *testing.T) { } t.Fatalf("process ran with err %v, want exit status 1", err) } + +func TestPipelineConfigSetup(t *testing.T) { + js := `{ + "PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]", + "Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstAddr\",\"output\":\"DstK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstPort\",\"output\":\"Service\",\"parameters\":\"Proto\",\"type\":\"add_service\"},{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"parameters\":\"/16\",\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"port\":9102,\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]", + "Health": { + "Port": "8080" + }, + "Profile": { + "Port": 0 + } +}` + var opts config.Options + err := json.Unmarshal([]byte(js), &opts) + require.NoError(t, err) + cfg, err := config.ParseConfig(opts) + require.NoError(t, err) + require.NotNil(t, cfg) + mainPipeline, err := pipeline.NewPipeline(&cfg) + require.NoError(t, err) + require.NotNil(t, mainPipeline) +} diff --git a/pkg/api/transform_network.go b/pkg/api/transform_network.go index b4422bf7e..0c97d8262 100644 --- a/pkg/api/transform_network.go +++ b/pkg/api/transform_network.go @@ -24,6 +24,18 @@ type TransformNetwork struct { ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"` } +func (tn *TransformNetwork) GetServiceFiles() (string, string) { + p := tn.ProtocolsFile + if p == "" { + p = "/etc/protocols" + } + s := tn.ServicesFile + if s == "" { + s = "/etc/services" + } + return p, s +} + type TransformNetworkOperationEnum struct { AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"` AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"` diff --git a/pkg/pipeline/transform/transform_network.go b/pkg/pipeline/transform/transform_network.go index 867daaeac..8751dc4d4 100644 --- a/pkg/pipeline/transform/transform_network.go +++ b/pkg/pipeline/transform/transform_network.go @@ -189,15 +189,16 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) { var servicesDB *netdb.ServiceNames if needToInitNetworkServices { + pFilename, sFilename := jsonNetworkTransform.GetServiceFiles() var err error - protos, err := os.Open(jsonNetworkTransform.ProtocolsFile) + protos, err := os.Open(pFilename) if err != nil { - return nil, fmt.Errorf("opening %q: %w", jsonNetworkTransform.ProtocolsFile, err) + return nil, fmt.Errorf("opening %q: %w", pFilename, err) } defer protos.Close() - services, err := os.Open(jsonNetworkTransform.ServicesFile) + services, err := os.Open(sFilename) if err != nil { - return nil, fmt.Errorf("opening %q: %w", jsonNetworkTransform.ServicesFile, err) + return nil, fmt.Errorf("opening %q: %w", sFilename, err) } defer services.Close() servicesDB, err = netdb.LoadServicesDB(protos, services)