Skip to content

Commit

Permalink
Resource Processor: Logs Support (#1650) (#1729)
Browse files Browse the repository at this point in the history
* Resource Processor: Logs Support (#1650)

* Resource Processor: incorrect config testcase
  • Loading branch information
pmm-sumo authored Sep 3, 2020
1 parent d119b89 commit 315e96e
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 2 deletions.
2 changes: 1 addition & 1 deletion processor/resourceprocessor/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Resource Processor

Supported pipeline types: metrics, traces
Supported pipeline types: metrics, traces, logs

The resource processor can be used to apply changes on resource attributes.
Please refer to [config.go](./config.go) for the config spec.
Expand Down
19 changes: 18 additions & 1 deletion processor/resourceprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func NewFactory() component.ProcessorFactory {
typeStr,
createDefaultConfig,
processorhelper.WithTraces(createTraceProcessor),
processorhelper.WithMetrics(createMetricsProcessor))
processorhelper.WithMetrics(createMetricsProcessor),
processorhelper.WithLogs(createLogsProcessor))
}

// Note: This isn't a valid configuration because the processor would do no work.
Expand Down Expand Up @@ -85,6 +86,22 @@ func createMetricsProcessor(
processorhelper.WithCapabilities(processorCapabilities))
}

func createLogsProcessor(
_ context.Context,
params component.ProcessorCreateParams,
cfg configmodels.Processor,
nextConsumer consumer.LogsConsumer) (component.LogsProcessor, error) {
attrProc, err := createAttrProcessor(cfg.(*Config), params.Logger)
if err != nil {
return nil, err
}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
&resourceProcessor{attrProc: attrProc},
processorhelper.WithCapabilities(processorCapabilities))
}

func createAttrProcessor(cfg *Config, logger *zap.Logger) (*processorhelper.AttrProc, error) {
handleDeprecatedFields(cfg, logger)
if len(cfg.AttributesActions) == 0 {
Expand Down
14 changes: 14 additions & 0 deletions processor/resourceprocessor/resource_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,17 @@ func (rp *resourceProcessor) ProcessMetrics(_ context.Context, md pdata.Metrics)
}
return md, nil
}

// ProcessLogs implements the LProcessor interface
func (rp *resourceProcessor) ProcessLogs(_ context.Context, ld pdata.Logs) (pdata.Logs, error) {
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
resource := rls.At(i).Resource()
if resource.IsNil() {
resource.InitEmpty()
}
attrs := resource.Attributes()
rp.attrProc.Process(attrs)
}
return ld, nil
}
65 changes: 65 additions & 0 deletions processor/resourceprocessor/resource_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,48 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) {
err = rmp.ConsumeMetrics(context.Background(), sourceMetricData)
require.NoError(t, err)
assert.EqualValues(t, wantMetricData, tmn.md)

// Test logs consumer
tln := &testLogsConsumer{}
rlp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, tt.config, tln)
require.NoError(t, err)
assert.Equal(t, true, rtp.GetCapabilities().MutatesConsumedData)

sourceLogData := generateLogData(tt.sourceAttributes)
wantLogData := generateLogData(tt.wantAttributes)
err = rlp.ConsumeLogs(context.Background(), sourceLogData)
require.NoError(t, err)
assert.EqualValues(t, wantLogData, tln.ld)
})
}
}

func TestResourceProcessorError(t *testing.T) {
ttn := &testTraceConsumer{}

badCfg := &Config{
ProcessorSettings: processorSettings,
AttributesActions: nil,
}

factory := NewFactory()
rtp, err := factory.CreateTraceProcessor(context.Background(), component.ProcessorCreateParams{}, ttn, badCfg)
require.Error(t, err)
require.Nil(t, rtp)

// Test metrics consumer
tmn := &testMetricsConsumer{}
rmp, err := factory.CreateMetricsProcessor(context.Background(), component.ProcessorCreateParams{}, tmn, badCfg)
require.Error(t, err)
require.Nil(t, rmp)

// Test logs consumer
tln := &testLogsConsumer{}
rlp, err := factory.CreateLogsProcessor(context.Background(), component.ProcessorCreateParams{}, badCfg, tln)
require.Error(t, err)
require.Nil(t, rlp)
}

func generateTraceData(attributes map[string]string) pdata.Traces {
td := testdata.GenerateTraceDataOneSpanNoResource()
if attributes == nil {
Expand Down Expand Up @@ -158,6 +196,20 @@ func generateMetricData(attributes map[string]string) pdata.Metrics {
return md
}

func generateLogData(attributes map[string]string) pdata.Logs {
ld := testdata.GenerateLogDataOneLogNoResource()
if attributes == nil {
return ld
}
resource := ld.ResourceLogs().At(0).Resource()
resource.InitEmpty()
for k, v := range attributes {
resource.Attributes().InsertString(k, v)
}
resource.Attributes().Sort()
return ld
}

type testTraceConsumer struct {
td pdata.Traces
}
Expand All @@ -184,6 +236,19 @@ func (tmn *testMetricsConsumer) ConsumeMetrics(_ context.Context, md pdata.Metri
return nil
}

type testLogsConsumer struct {
ld pdata.Logs
}

func (tln *testLogsConsumer) ConsumeLogs(_ context.Context, ld pdata.Logs) error {
// sort attributes to be able to compare traces
for i := 0; i < ld.ResourceLogs().Len(); i++ {
sortResourceAttributes(ld.ResourceLogs().At(i).Resource())
}
tln.ld = ld
return nil
}

func sortResourceAttributes(resource pdata.Resource) {
if resource.IsNil() {
return
Expand Down
4 changes: 4 additions & 0 deletions processor/resourceprocessor/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ exporters:

service:
pipelines:
logs:
receivers: [examplereceiver]
processors: [resource]
exporters: [exampleexporter]
metrics:
receivers: [examplereceiver]
processors: [resource]
Expand Down

0 comments on commit 315e96e

Please sign in to comment.