Skip to content

Commit

Permalink
Fix crash on startup
Browse files Browse the repository at this point in the history
"failed to initialize pipeline opening "": open : no such file or directory"
Previous PR removed the default file names for service mapping
  • Loading branch information
jotak committed Sep 22, 2022
1 parent c022003 commit 4826700
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
27 changes: 27 additions & 0 deletions cmd/flowlogs-pipeline/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@
package main

import (
"encoding/json"
"os"
"os/exec"
"testing"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline"
"github.com/stretchr/testify/require"
)

func TestTheMain(t *testing.T) {
Expand All @@ -36,3 +41,25 @@ func TestTheMain(t *testing.T) {
}
t.Fatalf("process ran with err %v, want exit status 1", err)
}

func TestPipelineConfigSetup(t *testing.T) {
js := `{
"PipeLine": "[{\"name\":\"grpc\"},{\"follows\":\"grpc\",\"name\":\"enrich\"},{\"follows\":\"enrich\",\"name\":\"loki\"},{\"follows\":\"enrich\",\"name\":\"prometheus\"}]",
"Parameters": "[{\"ingest\":{\"grpc\":{\"port\":2055},\"type\":\"grpc\"},\"name\":\"grpc\"},{\"name\":\"enrich\",\"transform\":{\"network\":{\"rules\":[{\"input\":\"SrcAddr\",\"output\":\"SrcK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstAddr\",\"output\":\"DstK8S\",\"type\":\"add_kubernetes\"},{\"input\":\"DstPort\",\"output\":\"Service\",\"parameters\":\"Proto\",\"type\":\"add_service\"},{\"input\":\"SrcAddr\",\"output\":\"SrcSubnet\",\"parameters\":\"/16\",\"type\":\"add_subnet\"}]},\"type\":\"network\"}},{\"name\":\"loki\",\"write\":{\"loki\":{\"batchSize\":102400,\"batchWait\":\"1s\",\"clientConfig\":{\"follow_redirects\":false,\"proxy_url\":null,\"tls_config\":{\"insecure_skip_verify\":false}},\"labels\":[\"SrcK8S_Namespace\",\"SrcK8S_OwnerName\",\"DstK8S_Namespace\",\"DstK8S_OwnerName\",\"FlowDirection\"],\"maxBackoff\":\"5m0s\",\"maxRetries\":10,\"minBackoff\":\"1s\",\"staticLabels\":{\"app\":\"netobserv-flowcollector\"},\"tenantID\":\"netobserv\",\"timeout\":\"10s\",\"timestampLabel\":\"TimeFlowEndMs\",\"timestampScale\":\"1ms\",\"url\":\"http://loki.netobserv.svc:3100/\"},\"type\":\"loki\"}},{\"encode\":{\"prom\":{\"metrics\":[{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\",\"SrcK8S_Namespace\"],\"name\":\"bandwidth_per_network_service_per_namespace\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"SrcSubnet\"],\"name\":\"bandwidth_per_source_subnet\",\"type\":\"counter\",\"valueKey\":\"Bytes\"},{\"buckets\":null,\"filter\":{\"key\":\"\",\"value\":\"\"},\"labels\":[\"Service\"],\"name\":\"network_service_total\",\"type\":\"counter\",\"valueKey\":\"\"}],\"port\":9102,\"prefix\":\"netobserv_\"},\"type\":\"prom\"},\"name\":\"prometheus\"}]",
"Health": {
"Port": "8080"
},
"Profile": {
"Port": 0
}
}`
var opts config.Options
err := json.Unmarshal([]byte(js), &opts)
require.NoError(t, err)
cfg, err := config.ParseConfig(opts)
require.NoError(t, err)
require.NotNil(t, cfg)
mainPipeline, err := pipeline.NewPipeline(&cfg)
require.NoError(t, err)
require.NotNil(t, mainPipeline)
}
12 changes: 12 additions & 0 deletions pkg/api/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,18 @@ type TransformNetwork struct {
ProtocolsFile string `yaml:"protocolsFile,omitempty" json:"protocolsFile,omitempty" doc:"path to protocols file (optional, default: /etc/protocols)"`
}

func (tn *TransformNetwork) GetServiceFiles() (string, string) {
p := tn.ProtocolsFile
if p == "" {
p = "/etc/protocols"
}
s := tn.ServicesFile
if s == "" {
s = "/etc/services"
}
return p, s
}

type TransformNetworkOperationEnum struct {
AddRegExIf string `yaml:"add_regex_if" json:"add_regex_if" doc:"add output field if input field satisfies regex pattern from parameters field"`
AddIf string `yaml:"add_if" json:"add_if" doc:"add output field if input field satisfies criteria from parameters field"`
Expand Down
9 changes: 5 additions & 4 deletions pkg/pipeline/transform/transform_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,15 +189,16 @@ func NewTransformNetwork(params config.StageParam) (Transformer, error) {

var servicesDB *netdb.ServiceNames
if needToInitNetworkServices {
pFilename, sFilename := jsonNetworkTransform.GetServiceFiles()
var err error
protos, err := os.Open(jsonNetworkTransform.ProtocolsFile)
protos, err := os.Open(pFilename)
if err != nil {
return nil, fmt.Errorf("opening %q: %w", jsonNetworkTransform.ProtocolsFile, err)
return nil, fmt.Errorf("opening %q: %w", pFilename, err)
}
defer protos.Close()
services, err := os.Open(jsonNetworkTransform.ServicesFile)
services, err := os.Open(sFilename)
if err != nil {
return nil, fmt.Errorf("opening %q: %w", jsonNetworkTransform.ServicesFile, err)
return nil, fmt.Errorf("opening %q: %w", sFilename, err)
}
defer services.Close()
servicesDB, err = netdb.LoadServicesDB(protos, services)
Expand Down

0 comments on commit 4826700

Please sign in to comment.