Skip to content

Commit

Permalink
feat(k8sprocessor): add ignored pod names as config parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
pmalek committed Dec 16, 2021
1 parent 3ef0f4e commit ec30001
Show file tree
Hide file tree
Showing 13 changed files with 176 additions and 19 deletions.
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newFakeClient(
rules kube.ExtractionRules,
filters kube.Filters,
associations []kube.Association,
exclude kube.Excludes,
_ kube.APIClientsetProvider,
_ kube.InformerProvider,
_ kube.OwnerProvider,
Expand Down
14 changes: 14 additions & 0 deletions pkg/processor/k8sprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type Config struct {
// Association section allows to define rules for tagging spans, metrics,
// and logs with Pod metadata.
Association []PodAssociationConfig `mapstructure:"pod_association"`

// Exclude section allows to define names of pod that should be
// ignored while tagging.
Exclude ExcludeConfig `mapstructure:"exclude"`
}

func (cfg *Config) Validate() error {
Expand Down Expand Up @@ -227,3 +231,13 @@ type PodAssociationConfig struct {

// DefaultDelimiter is default value for Delimiter for ExtractConfig
const DefaultDelimiter string = ", "

// ExcludeConfig represent a list of Pods to exclude
type ExcludeConfig struct {
Pods []ExcludePodConfig `mapstructure:"pods"`
}

// ExcludePodConfig represent a Pod name to ignore
type ExcludePodConfig struct {
Name string `mapstructure:"name"`
}
18 changes: 17 additions & 1 deletion pkg/processor/k8sprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,14 @@ func TestLoadConfig(t *testing.T) {
&Config{
ProcessorSettings: config.NewProcessorSettings(config.NewComponentID(typeStr)),
APIConfig: k8sconfig.APIConfig{AuthType: k8sconfig.AuthTypeServiceAccount},
Extract: ExtractConfig{Delimiter: ", "},
Exclude: ExcludeConfig{Pods: []ExcludePodConfig{
{Name: "jaeger-agent"},
{Name: "jaeger-collector"},
{Name: "otel-collector"},
{Name: "otel-agent"},
{Name: "collection-sumologic-otelcol"},
}},
Extract: ExtractConfig{Delimiter: ", "},
})

p1 := cfg.Processors[config.NewComponentIDWithName(typeStr, "2")]
Expand Down Expand Up @@ -119,5 +126,14 @@ func TestLoadConfig(t *testing.T) {
Name: "k8s.pod.uid",
},
},
Exclude: ExcludeConfig{
Pods: []ExcludePodConfig{
{Name: "jaeger-agent"},
{Name: "jaeger-collector"},
{Name: "otel-collector"},
{Name: "otel-agent"},
{Name: "collection-sumologic-otelcol"},
},
},
})
}
12 changes: 12 additions & 0 deletions pkg/processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ const (

var kubeClientProvider = kube.ClientProvider(nil)
var processorCapabilities = consumer.Capabilities{MutatesData: true}
var defaultExcludes = ExcludeConfig{
Pods: []ExcludePodConfig{
{Name: "jaeger-agent"},
{Name: "jaeger-collector"},
{Name: "otel-collector"},
{Name: "otel-agent"},
{Name: "collection-sumologic-otelcol"},
},
}

// NewFactory returns a new factory for the k8s processor.
func NewFactory() component.ProcessorFactory {
Expand All @@ -52,6 +61,7 @@ func createDefaultConfig() config.Processor {
Extract: ExtractConfig{
Delimiter: DefaultDelimiter,
},
Exclude: defaultExcludes,
}
}

Expand Down Expand Up @@ -200,5 +210,7 @@ func createProcessorOpts(cfg config.Processor) []Option {

opts = append(opts, WithDelimiter(oCfg.Extract.Delimiter))

opts = append(opts, WithExcludes(oCfg.Exclude))

return opts
}
3 changes: 3 additions & 0 deletions pkg/processor/k8sprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ require (
k8s.io/client-go v0.22.1
)

require github.com/kr/pretty v0.2.0

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v4.11.0+incompatible // indirect
Expand All @@ -30,6 +32,7 @@ require (
github.com/imdario/mergo v0.3.11 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/knadh/koanf v1.3.2 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/magiconair/properties v1.8.5 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/mapstructure v1.4.2 // indirect
Expand Down
9 changes: 6 additions & 3 deletions pkg/processor/k8sprocessor/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type WatchClient struct {
Rules ExtractionRules
Filters Filters
Associations []Association
Exclude Excludes
}

// New initializes a new k8s Client.
Expand All @@ -59,6 +60,7 @@ func New(
rules ExtractionRules,
filters Filters,
associations []Association,
exclude Excludes,
newClientSet APIClientsetProvider,
newInformer InformerProvider,
newOwnerProviderFunc OwnerProvider,
Expand All @@ -69,6 +71,7 @@ func New(
Rules: rules,
Filters: filters,
Associations: associations,
Exclude: exclude,
stopCh: make(chan struct{}),
delimiter: delimiter,
}
Expand Down Expand Up @@ -465,9 +468,9 @@ func (c *WatchClient) shouldIgnorePod(pod *api_v1.Pod) bool {
}
}

// Check well known names that should be ignored
for _, rexp := range podNameIgnorePatterns {
if rexp.MatchString(pod.Name) {
// Check if user requested the pod to be ignored through configuration
for _, excludedPod := range c.Exclude.Pods {
if excludedPod.Name.MatchString(pod.Name) {
return true
}
}
Expand Down
33 changes: 28 additions & 5 deletions pkg/processor/k8sprocessor/kube/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ func podAddAndUpdateTest(t *testing.T, c *WatchClient, handler func(obj interfac
}

func TestDefaultClientset(t *testing.T) {
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, nil, nil, nil, "")
c, err := New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, nil, nil, nil, "")
assert.Error(t, err)
assert.Equal(t, "invalid authType for kubernetes: ", err.Error())
assert.Nil(t, c)

c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, newFakeAPIClientset, nil, nil, "")
c, err = New(zap.NewNop(), k8sconfig.APIConfig{}, ExtractionRules{}, Filters{}, []Association{}, Excludes{}, newFakeAPIClientset, nil, nil, "")
assert.NoError(t, err)
assert.NotNil(t, c)
}
Expand All @@ -104,6 +104,7 @@ func TestBadFilters(t *testing.T) {
ExtractionRules{},
Filters{Fields: []FieldFilter{{Op: selection.Exists}}},
[]Association{},
Excludes{},
newFakeAPIClientset,
NewFakeInformer,
newFakeOwnerProvider,
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestConstructorErrors(t *testing.T) {
gotAPIConfig = c
return nil, fmt.Errorf("error creating k8s client")
}
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, clientProvider, NewFakeInformer, newFakeOwnerProvider, "")
c, err := New(zap.NewNop(), apiCfg, er, ff, []Association{}, Excludes{}, clientProvider, NewFakeInformer, newFakeOwnerProvider, "")
assert.Nil(t, c)
assert.Error(t, err)
assert.Equal(t, err.Error(), "error creating k8s client")
Expand Down Expand Up @@ -851,12 +852,28 @@ func TestPodIgnorePatterns(t *testing.T) {
Name: "jaeger-collector",
},
},
}, {
ignore: true,
pod: api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "jaeger-agent-b2zdv",
},
},
}, {
ignore: false,
pod: api_v1.Pod{
ObjectMeta: meta_v1.ObjectMeta{
Name: "test-pod-name",
},
},
},
}

c, _ := newTestClient(t)
for _, tc := range testCases {
assert.Equal(t, tc.ignore, c.shouldIgnorePod(&tc.pod))
assert.Equal(t, tc.ignore, c.shouldIgnorePod(&tc.pod),
"Should ignore %v, pod.Name: %q, pod annotations %#v", tc.ignore, tc.pod.Name, tc.pod.Annotations,
)
}
}

Expand Down Expand Up @@ -952,7 +969,13 @@ func Test_selectorsFromFilters(t *testing.T) {
func newTestClientWithRulesAndFilters(t *testing.T, e ExtractionRules, f Filters) (*WatchClient, *observer.ObservedLogs) {
observedLogger, logs := observer.New(zapcore.WarnLevel)
logger := zap.New(observedLogger)
c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, newFakeAPIClientset, NewFakeInformer, newFakeOwnerProvider, "_")
exclude := Excludes{
Pods: []ExcludePods{
{Name: regexp.MustCompile(`jaeger-agent`)},
{Name: regexp.MustCompile(`jaeger-collector`)},
},
}
c, err := New(logger, k8sconfig.APIConfig{}, e, f, []Association{}, exclude, newFakeAPIClientset, NewFakeInformer, newFakeOwnerProvider, "_")
require.NoError(t, err)
return c.(*WatchClient), logs
}
Expand Down
31 changes: 22 additions & 9 deletions pkg/processor/k8sprocessor/kube/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,6 @@ const (
type PodIdentifier string

var (
// TODO: move these to config with default values
podNameIgnorePatterns = []*regexp.Regexp{
regexp.MustCompile(`jaeger-agent`),
regexp.MustCompile(`jaeger-collector`),
regexp.MustCompile(`otel-collector`),
regexp.MustCompile(`otel-agent`),
regexp.MustCompile(`collection-sumologic-otelcol`),
}
defaultPodDeleteGracePeriod = time.Second * 120
watchSyncPeriod = time.Minute * 5
)
Expand All @@ -71,7 +63,18 @@ type Client interface {
}

// ClientProvider defines a func type that returns a new Client.
type ClientProvider func(*zap.Logger, k8sconfig.APIConfig, ExtractionRules, Filters, []Association, APIClientsetProvider, InformerProvider, OwnerProvider, string) (Client, error)
type ClientProvider func(
*zap.Logger,
k8sconfig.APIConfig,
ExtractionRules,
Filters,
[]Association,
Excludes,
APIClientsetProvider,
InformerProvider,
OwnerProvider,
string,
) (Client, error)

// APIClientsetProvider defines a func type that initializes and return a new kubernetes
// Clientset object.
Expand Down Expand Up @@ -218,3 +221,13 @@ type Association struct {
From string
Name string
}

// Excludes represent a list of Pods to ignore
type Excludes struct {
Pods []ExcludePods
}

// ExcludePods represent a Pod name to ignore
type ExcludePods struct {
Name *regexp.Regexp
}
19 changes: 19 additions & 0 deletions pkg/processor/k8sprocessor/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,3 +365,22 @@ func WithDelimiter(delimiter string) Option {
return nil
}
}

// WithExcludes allows specifying pods to exclude
func WithExcludes(podExclude ExcludeConfig) Option {
return func(p *kubernetesprocessor) error {
ignoredNames := kube.Excludes{}
names := podExclude.Pods

if len(names) == 0 {
names = defaultExcludes.Pods
}
for _, name := range names {
ignoredNames.Pods = append(ignoredNames.Pods, kube.ExcludePods{
Name: regexp.MustCompile(name.Name)},
)
}
p.podIgnore = ignoredNames
return nil
}
}
46 changes: 46 additions & 0 deletions pkg/processor/k8sprocessor/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/selection"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
Expand Down Expand Up @@ -663,3 +664,48 @@ func TestWithExtractPodAssociation(t *testing.T) {
})
}
}

func TestWithExcludes(t *testing.T) {
tests := []struct {
name string
args ExcludeConfig
want kube.Excludes
}{
{
"default",
ExcludeConfig{},
kube.Excludes{
Pods: []kube.ExcludePods{
{Name: regexp.MustCompile(`jaeger-agent`)},
{Name: regexp.MustCompile(`jaeger-collector`)},
{Name: regexp.MustCompile(`otel-collector`)},
{Name: regexp.MustCompile(`otel-agent`)},
{Name: regexp.MustCompile(`collection-sumologic-otelcol`)},
},
},
},
{
"configured",
ExcludeConfig{
Pods: []ExcludePodConfig{
{Name: "ignore_pod1"},
{Name: "ignore_pod2"},
},
},
kube.Excludes{
Pods: []kube.ExcludePods{
{Name: regexp.MustCompile(`ignore_pod1`)},
{Name: regexp.MustCompile(`ignore_pod2`)},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &kubernetesprocessor{}
option := WithExcludes(tt.args)
require.NoError(t, option(p))
assert.Equal(t, tt.want, p.podIgnore)
})
}
}
3 changes: 2 additions & 1 deletion pkg/processor/k8sprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type kubernetesprocessor struct {
rules kube.ExtractionRules
filters kube.Filters
podAssociations []kube.Association
podIgnore kube.Excludes
delimiter string
}

Expand All @@ -46,7 +47,7 @@ func (kp *kubernetesprocessor) initKubeClient(logger *zap.Logger, kubeClient kub
kubeClient = kube.New
}
if !kp.passthroughMode {
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, nil, nil, nil, kp.delimiter)
kc, err := kubeClient(logger, kp.apiConfig, kp.rules, kp.filters, kp.podAssociations, kp.podIgnore, nil, nil, nil, kp.delimiter)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/processor/k8sprocessor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func TestProcessorBadClientProvider(t *testing.T) {
_ kube.ExtractionRules,
_ kube.Filters,
_ []kube.Association,
_ kube.Excludes,
_ kube.APIClientsetProvider,
_ kube.InformerProvider,
_ kube.OwnerProvider,
Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/k8sprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ processors:
- from: resource_attribute
name: k8s.pod.uid

exclude:
pods:
- name: jaeger-agent
- name: jaeger-collector

exporters:
nop:

Expand Down

0 comments on commit ec30001

Please sign in to comment.