Skip to content

Commit

Permalink
K8s processor refactoring (#1038)
Browse files Browse the repository at this point in the history
* K8s Processor: Use metrics internal data model

* K8s Processor: refactor tests

* Uses processhelper factory methods to initialize processor.
* Switches to internal metrics model
* Uses common tests for both metrics and logs, reducing boilerplate

* K8s Processor: minor code cleanup

* clear the flow of associating IP and fetching attributes
* remove unnecessary error from procesResource method (it never returns an error)
* make the intent clear when multiple extractors are present and first one finds IP (re PR comment)

* K8s Processor: move ip extractors into a separate file
  • Loading branch information
pmm-sumo authored Sep 17, 2020
1 parent d8f7bdd commit bff19a2
Show file tree
Hide file tree
Showing 5 changed files with 657 additions and 613 deletions.
75 changes: 72 additions & 3 deletions processor/k8sprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
)

var kubeClientProvider = kube.ClientProvider(nil)
var processorCapabilities = component.ProcessorCapabilities{MutatesConsumedData: true}

// NewFactory returns a new factory for the k8s processor.
func NewFactory() component.ProcessorFactory {
Expand All @@ -53,21 +54,89 @@ func createDefaultConfig() configmodels.Processor {
}

func createTraceProcessor(
_ context.Context,
ctx context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextTraceConsumer consumer.TraceConsumer,
) (component.TraceProcessor, error) {
return newTraceProcessor(params.Logger, nextTraceConsumer, kubeClientProvider, createProcessorOpts(cfg)...)
return createTraceProcessorWithOptions(ctx, params, cfg, nextTraceConsumer)
}

func createMetricsProcessor(
ctx context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextMetricsConsumer consumer.MetricsConsumer,
) (component.MetricsProcessor, error) {
return createMetricsProcessorWithOptions(ctx, params, cfg, nextMetricsConsumer)
}

func createTraceProcessorWithOptions(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextTraceConsumer consumer.TraceConsumer,
options ...Option,
) (component.TraceProcessor, error) {
kp, err := createKubernetesProcessor(params, cfg, options...)
if err != nil {
return nil, err
}

return processorhelper.NewTraceProcessor(
cfg,
nextTraceConsumer,
kp,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(kp.Start),
processorhelper.WithShutdown(kp.Shutdown))
}

func createMetricsProcessorWithOptions(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextMetricsConsumer consumer.MetricsConsumer,
options ...Option,
) (component.MetricsProcessor, error) {
return newMetricsProcessor(params.Logger, nextMetricsConsumer, kubeClientProvider, createProcessorOpts(cfg)...)
kp, err := createKubernetesProcessor(params, cfg, options...)
if err != nil {
return nil, err
}

return processorhelper.NewMetricsProcessor(
cfg,
nextMetricsConsumer,
kp,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(kp.Start),
processorhelper.WithShutdown(kp.Shutdown))
}

func createKubernetesProcessor(
params component.ProcessorCreateParams,
cfg configmodels.Processor,
options ...Option,
) (*kubernetesprocessor, error) {
kp := &kubernetesprocessor{logger: params.Logger}

allOptions := append(createProcessorOpts(cfg), options...)

for _, opt := range allOptions {
if err := opt(kp); err != nil {
return nil, err
}
}

// This might have been set by an option already
if kp.kc == nil {
err := kp.initKubeClient(kp.logger, kubeClientProvider)
if err != nil {
return nil, err
}
}

return kp, nil
}

func createProcessorOpts(cfg configmodels.Processor) []Option {
Expand Down
15 changes: 11 additions & 4 deletions processor/k8sprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.uber.org/zap"
)

Expand All @@ -33,26 +34,32 @@ func TestCreateDefaultConfig(t *testing.T) {

func TestCreateProcessor(t *testing.T) {
factory := NewFactory()

realClient := kubeClientProvider
kubeClientProvider = newFakeClient

cfg := factory.CreateDefaultConfig()
params := component.ProcessorCreateParams{Logger: zap.NewNop()}

tp, err := factory.CreateTraceProcessor(context.Background(), params, cfg, nil)
tp, err := factory.CreateTraceProcessor(context.Background(), params, cfg, exportertest.NewNopTraceExporter())
assert.NotNil(t, tp)
assert.NoError(t, err)

mp, err := factory.CreateMetricsProcessor(context.Background(), params, cfg, nil)
mp, err := factory.CreateMetricsProcessor(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.NotNil(t, mp)
assert.NoError(t, err)

oCfg := cfg.(*Config)
oCfg.Passthrough = true

tp, err = factory.CreateTraceProcessor(context.Background(), params, cfg, nil)
tp, err = factory.CreateTraceProcessor(context.Background(), params, cfg, exportertest.NewNopTraceExporter())
assert.NotNil(t, tp)
assert.NoError(t, err)

mp, err = factory.CreateMetricsProcessor(context.Background(), params, cfg, nil)
mp, err = factory.CreateMetricsProcessor(context.Background(), params, cfg, exportertest.NewNopMetricsExporter())
assert.NotNil(t, mp)
assert.NoError(t, err)

// Switch it back so other tests run afterwards will not fail on unexpected state
kubeClientProvider = realClient
}
58 changes: 58 additions & 0 deletions processor/k8sprocessor/ip_extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2020 OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8sprocessor

import (
"net"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/translator/conventions"
)

type ipExtractor func(attrs pdata.AttributeMap) string

// k8sIPFromAttributes checks if the application, a collector/agent or a prior
// processor has already annotated the batch with IP and if so, uses it
func k8sIPFromAttributes() ipExtractor {
return func(attrs pdata.AttributeMap) string {
ip := stringAttributeFromMap(attrs, k8sIPLabelName)
if ip == "" {
ip = stringAttributeFromMap(attrs, clientIPLabelName)
}
return ip
}
}

// k8sIPFromHostnameAttributes leverages the observation that most of the metric receivers
// uses "host.hostname" resource label to identify metrics origin. In k8s environment,
// it's set to a pod IP address. If the value doesn't represent an IP address, we skip it.
func k8sIPFromHostnameAttributes() ipExtractor {
return func(attrs pdata.AttributeMap) string {
hostname := stringAttributeFromMap(attrs, conventions.AttributeHostHostname)
if net.ParseIP(hostname) != nil {
return hostname
}
return ""
}
}

func stringAttributeFromMap(attrs pdata.AttributeMap, key string) string {
if val, ok := attrs.Get(key); ok {
if val.Type() == pdata.AttributeValueSTRING {
return val.StringVal()
}
}
return ""
}
Loading

0 comments on commit bff19a2

Please sign in to comment.