diff --git a/pkg/processor/k8sprocessor/client_test.go b/pkg/processor/k8sprocessor/client_test.go index 6d2c12ae37..2099fe2d77 100644 --- a/pkg/processor/k8sprocessor/client_test.go +++ b/pkg/processor/k8sprocessor/client_test.go @@ -47,6 +47,7 @@ func newFakeClient( rules kube.ExtractionRules, filters kube.Filters, associations []kube.Association, + exclude kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.OwnerProvider, diff --git a/pkg/processor/k8sprocessor/config.go b/pkg/processor/k8sprocessor/config.go index e6c3cbf478..5e82c9e46d 100644 --- a/pkg/processor/k8sprocessor/config.go +++ b/pkg/processor/k8sprocessor/config.go @@ -47,6 +47,10 @@ type Config struct { // Association section allows to define rules for tagging spans, metrics, // and logs with Pod metadata. Association []PodAssociationConfig `mapstructure:"pod_association"` + + // Exclude section allows to define names of pod that should be + // ignored while tagging. + Exclude ExcludeConfig `mapstructure:"exclude"` } func (cfg *Config) Validate() error { @@ -227,3 +231,13 @@ type PodAssociationConfig struct { // DefaultDelimiter is default value for Delimiter for ExtractConfig const DefaultDelimiter string = ", " + +// ExcludeConfig represent a list of Pods to exclude +type ExcludeConfig struct { + Pods []ExcludePodConfig `mapstructure:"pods"` +} + +// ExcludePodConfig represent a Pod name to ignore +type ExcludePodConfig struct { + Name string `mapstructure:"name"` +} diff --git a/pkg/processor/k8sprocessor/config_test.go b/pkg/processor/k8sprocessor/config_test.go index a86316465c..ce210c6181 100644 --- a/pkg/processor/k8sprocessor/config_test.go +++ b/pkg/processor/k8sprocessor/config_test.go @@ -48,7 +48,14 @@ func TestLoadConfig(t *testing.T) { &Config{ ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)), APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount}, - Extract: ExtractConfig{Delimiter: ", "}, + Exclude: ExcludeConfig{Pods: []ExcludePodConfig{ + {Name: "jaeger-agent"}, + {Name: "jaeger-collector"}, + {Name: "otel-collector"}, + {Name: "otel-agent"}, + {Name: "collection-sumologic-otelcol"}, + }}, + Extract: ExtractConfig{Delimiter: ", "}, }) p1 := cfg.Processors[config.NewComponentIDWithName(typeStr, "2")] @@ -119,5 +126,14 @@ func TestLoadConfig(t *testing.T) { Name: "k8s.pod.uid", }, }, + Exclude: ExcludeConfig{ + Pods: []ExcludePodConfig{ + {Name: "jaeger-agent"}, + {Name: "jaeger-collector"}, + {Name: "otel-collector"}, + {Name: "otel-agent"}, + {Name: "collection-sumologic-otelcol"}, + }, + }, }) } diff --git a/pkg/processor/k8sprocessor/factory.go b/pkg/processor/k8sprocessor/factory.go index 4c4df49c47..67a1e14e73 100644 --- a/pkg/processor/k8sprocessor/factory.go +++ b/pkg/processor/k8sprocessor/factory.go @@ -33,6 +33,15 @@ const ( var kubeClientProvider = kube.ClientProvider(nil) var processorCapabilities = consumer.Capabilities{MutatesData: true} +var defaultExcludes = ExcludeConfig{ + Pods: []ExcludePodConfig{ + {Name: "jaeger-agent"}, + {Name: "jaeger-collector"}, + {Name: "otel-collector"}, + {Name: "otel-agent"}, + {Name: "collection-sumologic-otelcol"}, + }, +} // NewFactory returns a new factory for the k8s processor. func NewFactory() component.ProcessorFactory { @@ -52,6 +61,7 @@ func createDefaultConfig() config.Processor { Extract: ExtractConfig{ Delimiter: DefaultDelimiter, }, + Exclude: defaultExcludes, } } @@ -200,5 +210,7 @@ func createProcessorOpts(cfg config.Processor) []Option { opts = append(opts, WithDelimiter(oCfg.Extract.Delimiter)) + opts = append(opts, WithExcludes(oCfg.Exclude)) + return opts } diff --git a/pkg/processor/k8sprocessor/go.mod b/pkg/processor/k8sprocessor/go.mod index 75678ae865..e3f8521ad2 100644 --- a/pkg/processor/k8sprocessor/go.mod +++ b/pkg/processor/k8sprocessor/go.mod @@ -16,6 +16,8 @@ require ( k8s.io/client-go v0.22.1 ) +require github.com/kr/pretty v0.2.0 + require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/evanphx/json-patch v4.11.0+incompatible // indirect @@ -30,6 +32,7 @@ require ( github.com/imdario/mergo v0.3.11 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/knadh/koanf v1.3.2 // indirect + github.com/kr/text v0.2.0 // indirect github.com/magiconair/properties v1.8.5 // indirect github.com/mitchellh/copystructure v1.2.0 // indirect github.com/mitchellh/mapstructure v1.4.2 // indirect diff --git a/pkg/processor/k8sprocessor/kube/client.go b/pkg/processor/k8sprocessor/kube/client.go index 2a25dd4882..795ab372cc 100644 --- a/pkg/processor/k8sprocessor/kube/client.go +++ b/pkg/processor/k8sprocessor/kube/client.go @@ -50,6 +50,7 @@ type WatchClient struct { Rules ExtractionRules Filters Filters Associations []Association + Exclude Excludes } // New initializes a new k8s Client. @@ -59,6 +60,7 @@ func New( rules ExtractionRules, filters Filters, associations []Association, + exclude Excludes, newClientSet APIClientsetProvider, newInformer InformerProvider, newOwnerProviderFunc OwnerProvider, @@ -69,6 +71,7 @@ func New( Rules: rules, Filters: filters, Associations: associations, + Exclude: exclude, stopCh: make(chan struct{}), delimiter: delimiter, } @@ -465,9 +468,9 @@ func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool { } } - // Check well known names that should be ignored - for _, rexp := range podNameIgnorePatterns { - if rexp.MatchString(pod.Name) { + // Check if user requested the pod to be ignored through configuration + for _, excludedPod := range c.Exclude.Pods { + if excludedPod.Name.MatchString(pod.Name) { return true } } diff --git a/pkg/processor/k8sprocessor/kube/client_test.go b/pkg/processor/k8sprocessor/kube/client_test.go index 618f46953f..80c447e1d5 100644 --- a/pkg/processor/k8sprocessor/kube/client_test.go +++ b/pkg/processor/k8sprocessor/kube/client_test.go @@ -87,12 +87,12 @@ func podAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj interfac } func TestDefaultClientset(t *testing.T) { - c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, nil, nil, nil, "") + c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, "") assert.Error(t, err) assert.Equal(t, "invalid authType for kubernetes: ", err.Error()) assert.Nil(t, c) - c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, newFakeAPIClientset, nil, nil, "") + c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, "") assert.NoError(t, err) assert.NotNil(t, c) } @@ -104,6 +104,7 @@ func TestBadFilters(t *testing.T) { ExtractionRules{}, Filters{Fields: []FieldFilter{{Op: selection.Exists}}}, []Association{}, + Excludes{}, newFakeAPIClientset, NewFakeInformer, newFakeOwnerProvider, @@ -143,7 +144,7 @@ func TestConstructorErrors(t *testing.T) { gotAPIConfig = c return nil, fmt.Errorf("error creating k8s client") } - c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, clientProvider, NewFakeInformer, newFakeOwnerProvider, "") + c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, newFakeOwnerProvider, "") assert.Nil(t, c) assert.Error(t, err) assert.Equal(t, err.Error(), "error creating k8s client") @@ -851,12 +852,28 @@ func TestPodIgnorePatterns(t *testing.T) { Name: "jaeger-collector", }, }, + }, { + ignore: true, + pod: api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "jaeger-agent-b2zdv", + }, + }, + }, { + ignore: false, + pod: api_v1.Pod{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "test-pod-name", + }, + }, }, } c, _ := newTestClient(t) for _, tc := range testCases { - assert.Equal(t, tc.ignore, c.shouldIgnorePod(&tc.pod)) + assert.Equal(t, tc.ignore, c.shouldIgnorePod(&tc.pod), + "Should ignore %v, pod.Name: %q, pod annotations %#v", tc.ignore, tc.pod.Name, tc.pod.Annotations, + ) } } @@ -952,7 +969,13 @@ func Test_selectorsFromFilters(t *testing.T) { func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters) (*WatchClient, *observer.ObservedLogs) { observedLogger, logs := observer.New(zapcore.WarnLevel) logger := zap.New(observedLogger) - c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, newFakeAPIClientset, NewFakeInformer, newFakeOwnerProvider, "_") + exclude := Excludes{ + Pods: []ExcludePods{ + {Name: regexp.MustCompile(`jaeger-agent`)}, + {Name: regexp.MustCompile(`jaeger-collector`)}, + }, + } + c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, exclude, newFakeAPIClientset, NewFakeInformer, newFakeOwnerProvider, "_") require.NoError(t, err) return c.(*WatchClient), logs } diff --git a/pkg/processor/k8sprocessor/kube/kube.go b/pkg/processor/k8sprocessor/kube/kube.go index 542fb4d59a..8e4dab381e 100644 --- a/pkg/processor/k8sprocessor/kube/kube.go +++ b/pkg/processor/k8sprocessor/kube/kube.go @@ -51,14 +51,6 @@ const ( type PodIdentifier string var ( - // TODO: move these to config with default values - podNameIgnorePatterns = []*regexp.Regexp{ - regexp.MustCompile(`jaeger-agent`), - regexp.MustCompile(`jaeger-collector`), - regexp.MustCompile(`otel-collector`), - regexp.MustCompile(`otel-agent`), - regexp.MustCompile(`collection-sumologic-otelcol`), - } defaultPodDeleteGracePeriod = time.Second * 120 watchSyncPeriod = time.Minute * 5 ) @@ -71,7 +63,18 @@ type Client interface { } // ClientProvider defines a func type that returns a new Client. -type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, APIClientsetProvider, InformerProvider, OwnerProvider, string) (Client, error) +type ClientProvider func( + *zap.Logger, + k8sconfig.APIConfig, + ExtractionRules, + Filters, + []Association, + Excludes, + APIClientsetProvider, + InformerProvider, + OwnerProvider, + string, +) (Client, error) // APIClientsetProvider defines a func type that initializes and return a new kubernetes // Clientset object. @@ -218,3 +221,13 @@ type Association struct { From string Name string } + +// Excludes represent a list of Pods to ignore +type Excludes struct { + Pods []ExcludePods +} + +// ExcludePods represent a Pod name to ignore +type ExcludePods struct { + Name *regexp.Regexp +} diff --git a/pkg/processor/k8sprocessor/options.go b/pkg/processor/k8sprocessor/options.go index eb75536699..14e4d812b9 100644 --- a/pkg/processor/k8sprocessor/options.go +++ b/pkg/processor/k8sprocessor/options.go @@ -365,3 +365,22 @@ func WithDelimiter(delimiter string) Option { return nil } } + +// WithExcludes allows specifying pods to exclude +func WithExcludes(podExclude ExcludeConfig) Option { + return func(p *kubernetesprocessor) error { + ignoredNames := kube.Excludes{} + names := podExclude.Pods + + if len(names) == 0 { + names = defaultExcludes.Pods + } + for _, name := range names { + ignoredNames.Pods = append(ignoredNames.Pods, kube.ExcludePods{ + Name: regexp.MustCompile(name.Name)}, + ) + } + p.podIgnore = ignoredNames + return nil + } +} diff --git a/pkg/processor/k8sprocessor/options_test.go b/pkg/processor/k8sprocessor/options_test.go index 2fdc916769..0fdb78b8f5 100644 --- a/pkg/processor/k8sprocessor/options_test.go +++ b/pkg/processor/k8sprocessor/options_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "k8s.io/apimachinery/pkg/selection" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" @@ -663,3 +664,48 @@ func TestWithExtractPodAssociation(t *testing.T) { }) } } + +func TestWithExcludes(t *testing.T) { + tests := []struct { + name string + args ExcludeConfig + want kube.Excludes + }{ + { + "default", + ExcludeConfig{}, + kube.Excludes{ + Pods: []kube.ExcludePods{ + {Name: regexp.MustCompile(`jaeger-agent`)}, + {Name: regexp.MustCompile(`jaeger-collector`)}, + {Name: regexp.MustCompile(`otel-collector`)}, + {Name: regexp.MustCompile(`otel-agent`)}, + {Name: regexp.MustCompile(`collection-sumologic-otelcol`)}, + }, + }, + }, + { + "configured", + ExcludeConfig{ + Pods: []ExcludePodConfig{ + {Name: "ignore_pod1"}, + {Name: "ignore_pod2"}, + }, + }, + kube.Excludes{ + Pods: []kube.ExcludePods{ + {Name: regexp.MustCompile(`ignore_pod1`)}, + {Name: regexp.MustCompile(`ignore_pod2`)}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &kubernetesprocessor{} + option := WithExcludes(tt.args) + require.NoError(t, option(p)) + assert.Equal(t, tt.want, p.podIgnore) + }) + } +} diff --git a/pkg/processor/k8sprocessor/processor.go b/pkg/processor/k8sprocessor/processor.go index 728e336b83..921d1cbc88 100644 --- a/pkg/processor/k8sprocessor/processor.go +++ b/pkg/processor/k8sprocessor/processor.go @@ -38,6 +38,7 @@ type kubernetesprocessor struct { rules kube.ExtractionRules filters kube.Filters podAssociations []kube.Association + podIgnore kube.Excludes delimiter string } @@ -46,7 +47,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub kubeClient = kube.New } if !kp.passthroughMode { - kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, nil, nil, nil, kp.delimiter) + kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, kp.delimiter) if err != nil { return err } diff --git a/pkg/processor/k8sprocessor/processor_test.go b/pkg/processor/k8sprocessor/processor_test.go index 4fcc3a2baa..eff69b6d88 100644 --- a/pkg/processor/k8sprocessor/processor_test.go +++ b/pkg/processor/k8sprocessor/processor_test.go @@ -230,6 +230,7 @@ func TestProcessorBadClientProvider(t *testing.T) { _ kube.ExtractionRules, _ kube.Filters, _ []kube.Association, + _ kube.Excludes, _ kube.APIClientsetProvider, _ kube.InformerProvider, _ kube.OwnerProvider, diff --git a/pkg/processor/k8sprocessor/testdata/config.yaml b/pkg/processor/k8sprocessor/testdata/config.yaml index 5d8ecd71cd..a0b9196d31 100644 --- a/pkg/processor/k8sprocessor/testdata/config.yaml +++ b/pkg/processor/k8sprocessor/testdata/config.yaml @@ -72,6 +72,11 @@ processors: - from: resource_attribute name: k8s.pod.uid + exclude: + pods: + - name: jaeger-agent + - name: jaeger-collector + exporters: nop: