Skip to content

Commit

Permalink
Fixing filter spans to match new API
Browse files Browse the repository at this point in the history
  • Loading branch information
boostchicken committed Jun 11, 2022
1 parent 1d0456d commit 660bc0e
Show file tree
Hide file tree
Showing 7 changed files with 442 additions and 38 deletions.
8 changes: 4 additions & 4 deletions internal/coreinternal/processor/filterspan/filterspan.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ func (mp *propertiesMatcher) MatchSpan(span ptrace.Span, resource pcommon.Resour
// If a set of properties was not in the mp, all spans are considered to match on that property
if mp.serviceFilters != nil {
// Check resource and spans for service.name
serviceName, found := serviceNameForResource(resource)
if !found {
serviceName, _ = serviceNameForSpan(span)
serviceName := serviceNameForResource(resource)
if serviceName == "<nil-service-name>" {
serviceName = serviceNameForSpan(span)
}
if !mp.serviceFilters.Matches(serviceName) {
return false
Expand All @@ -139,7 +139,7 @@ func serviceNameForResource(resource pcommon.Resource) string {
}

// serviceNameForSpan gets the service name for a span
func serviceNameForSpan(span pdata.Span) (string, bool) {
func serviceNameForSpan(span ptrace.Span) string {
service, found := span.Attributes().Get(conventions.AttributeServiceName)
if !found {
return "<nil-service-name>"
Expand Down
3 changes: 2 additions & 1 deletion processor/filterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package filterprocessor

import (
"path"
"path/filepath"
"testing"

Expand Down Expand Up @@ -278,7 +279,7 @@ func TestLoadingSpans(t *testing.T) {
require.NoError(t, err)
factory := NewFactory()
factories.Processors[typeStr] = factory
cfg, err := configtest.LoadConfigAndValidate(path.Join(".", "testdata", "config_traces.yaml"), factories)
cfg, err := servicetest.LoadConfigAndValidate(path.Join(".", "testdata", "config_traces.yaml"), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

Expand Down
1 change: 1 addition & 0 deletions processor/filterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"fmt"
"path/filepath"
"strings"
"testing"

"github.com/stretchr/testify/assert"
Expand Down
23 changes: 12 additions & 11 deletions processor/filterprocessor/filter_processor_traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ package filterprocessor // import "github.com/open-telemetry/opentelemetry-colle

import (
"context"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"go.opentelemetry.io/collector/model/pdata"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.uber.org/zap"

Expand All @@ -41,7 +42,7 @@ func newFilterSpansProcessor(logger *zap.Logger, cfg *Config) (*filterSpanProces
return nil, err
}

includeMatchTypem, excludeMatchType := "[None]"
includeMatchType, excludeMatchType := "[None]", "[None]"
if cfg.Spans.Include != nil {
includeMatchType = string(cfg.Spans.Include.MatchType)
}
Expand Down Expand Up @@ -85,31 +86,31 @@ func createSpanMatcher(cfg *Config) (filterspan.Matcher, filterspan.Matcher, err
}

// processTraces filters the given spans of a traces based off the filterSpanProcessor's filters.
func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt pdata.Traces) (pdata.Traces, error) {
func (fsp *filterSpanProcessor) processTraces(_ context.Context, pdt ptrace.Traces) (ptrace.Traces, error) {
for i := 0; i < pdt.ResourceSpans().Len(); i++ {
resSpan := pdt.ResourceSpans().At(i)
for x := 0; x < resSpan.InstrumentationLibrarySpans().Len(); x++ {
ils := resSpan.InstrumentationLibrarySpans().At(x)
ils.Spans().RemoveIf(func(span pdata.Span) bool {
return fsp.shouldRemoveSpan(span, resSpan.Resource(), ils.InstrumentationLibrary())
for x := 0; x < resSpan.ScopeSpans().Len(); x++ {
ils := resSpan.ScopeSpans().At(x)
ils.Spans().RemoveIf(func(span ptrace.Span) bool {
return fsp.shouldRemoveSpan(span, resSpan.Resource(), ils.Scope())
})
}
// Remove empty elements, that way if we delete everything we can tell
// the pipeline to stop processing completely (ErrSkipProcessingData)
resSpan.InstrumentationLibrarySpans().RemoveIf(func(ilsSpans pdata.InstrumentationLibrarySpans) bool {
resSpan.ScopeSpans().RemoveIf(func(ilsSpans ptrace.ScopeSpans) bool {
return ilsSpans.Spans().Len() == 0
})
}
pdt.ResourceSpans().RemoveIf(func(res pdata.ResourceSpans) bool {
return res.InstrumentationLibrarySpans().Len() == 0
pdt.ResourceSpans().RemoveIf(func(res ptrace.ResourceSpans) bool {
return res.ScopeSpans().Len() == 0
})
if pdt.ResourceSpans().Len() == 0 {
return pdt, processorhelper.ErrSkipProcessingData
}
return pdt, nil
}

func (fsp *filterSpanProcessor) shouldRemoveSpan(span pdata.Span, resource pdata.Resource, library pdata.InstrumentationLibrary) bool {
func (fsp *filterSpanProcessor) shouldRemoveSpan(span ptrace.Span, resource pcommon.Resource, library pcommon.InstrumentationScope) bool {
if fsp.include != nil {
if !fsp.include.MatchSpan(span, resource, library) {
return true
Expand Down
45 changes: 23 additions & 22 deletions processor/filterprocessor/filter_processor_traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ package filterprocessor

import (
"context"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/model/pdata"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterconfig"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/processor/filterset"
Expand All @@ -34,16 +34,16 @@ type testTrace struct {
spanName string
libraryName string
libraryVersion string
resourceAttributes map[string]pdata.AttributeValue
tags map[string]pdata.AttributeValue
resourceAttributes map[string]interface{}
tags map[string]interface{}
}

// All the data we need to define a test
type traceTest struct {
name string
inc *filterconfig.MatchProperties
exc *filterconfig.MatchProperties
inTraces pdata.Traces
inTraces ptrace.Traces
allTracesFiltered bool
spanCountExpected int // The number of spans that should be left after all filtering
}
Expand All @@ -54,11 +54,11 @@ var (
spanName: "test!",
libraryName: "otel",
libraryVersion: "11",
resourceAttributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("test_service"),
resourceAttributes: map[string]interface{}{
"service.name": "test_service",
},
tags: map[string]pdata.AttributeValue{
"db.type": pdata.NewAttributeValueString("redis"),
tags: map[string]interface{}{
"db.type": "redis",
},
},
}
Expand All @@ -68,24 +68,24 @@ var (
spanName: "test!",
libraryName: "otel",
libraryVersion: "11",
resourceAttributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("keep"),
resourceAttributes: map[string]interface{}{
"service.name": "keep",
},
},
{
spanName: "test!",
libraryName: "otel",
libraryVersion: "11",
resourceAttributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("dont_keep"),
resourceAttributes: map[string]interface{}{
"service.name": "dont_keep",
},
},
{
spanName: "test!",
libraryName: "otel",
libraryVersion: "11",
resourceAttributes: map[string]pdata.AttributeValue{
"service.name": pdata.NewAttributeValueString("keep"),
resourceAttributes: map[string]interface{}{
"service.name": "keep",
},
},
}
Expand All @@ -96,6 +96,7 @@ var (
}

redisMatchProperties = &filterconfig.MatchProperties{
Config: filterset.Config{MatchType: filterset.Strict},
Attributes: []filterconfig.Attribute{
{Key: "db.type", Value: "redis"},
},
Expand Down Expand Up @@ -164,17 +165,17 @@ func TestFilterTraceProcessor(t *testing.T) {
})
}
}
func generateTraces(traces []testTrace) pdata.Traces {
td := pdata.NewTraces()
func generateTraces(traces []testTrace) ptrace.Traces {
td := ptrace.NewTraces()

for _, trace := range traces {
rs := td.ResourceSpans().AppendEmpty()
pdata.NewAttributeMapFromMap(trace.resourceAttributes).CopyTo(rs.Resource().Attributes())
ils := rs.InstrumentationLibrarySpans().AppendEmpty()
ils.InstrumentationLibrary().SetName(trace.libraryName)
ils.InstrumentationLibrary().SetVersion(trace.libraryVersion)
pcommon.NewMapFromRaw(trace.resourceAttributes).CopyTo(rs.Resource().Attributes())
ils := rs.ScopeSpans().AppendEmpty()
ils.Scope().SetName(trace.libraryName)
ils.Scope().SetVersion(trace.libraryVersion)
span := ils.Spans().AppendEmpty()
pdata.NewAttributeMapFromMap(trace.tags).CopyTo(span.Attributes())
pcommon.NewMapFromRaw(trace.tags).CopyTo(span.Attributes())
span.SetName(trace.spanName)
}
return td
Expand Down
1 change: 1 addition & 0 deletions processor/filterprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.53.0
github.com/stretchr/testify v1.7.2
go.opentelemetry.io/collector v0.53.0
go.opentelemetry.io/collector/model v0.50.0
go.opentelemetry.io/collector/pdata v0.53.0
go.uber.org/zap v1.21.0
)
Expand Down
Loading

0 comments on commit 660bc0e

Please sign in to comment.