Skip to content

Commit

Permalink
Add span attribute key mapping support (#9) (#536)
Browse files Browse the repository at this point in the history
* Add span attribute key mapping support (#9)

Many libraries and services didn't implement consistent naming for their span attribute keys, this feature provides the capability of making the attribute keys consistent for different libraries and services without the need to change their code.

* PR Feedback

* PR feedback

* type on test

Fix nil return and avoid lookup if overwritting

Better syntax to avoid lookup when overwritting

Support same replacement for multiple keys

* Simplify handling of attribute related config
  • Loading branch information
Paulo Janotti authored May 9, 2019
1 parent 5228f66 commit 5fb7397
Show file tree
Hide file tree
Showing 9 changed files with 684 additions and 12 deletions.
18 changes: 15 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,13 @@ agent/client health information/inventory metadata to downstream exporters.

### <a name="global-attributes"></a> Global Attributes

The collector also takes some global configurations that modify its behavior for all receivers / exporters. One of the configurations
available is to add Attributes or Tags to all spans passing through this collector. These additional attributes can be configured to either overwrite
attributes if they already exists on the span, or respect the original values. An example of this is provided below.
The collector also takes some global configurations that modify its behavior for all receivers / exporters.

1. Add Attributes to all spans passing through this collector. These additional attributes can be configured to either overwrite existing keys if they already exist on the span, or respect the original values.
2. The key of each attribute can also be mapped to different strings using the `key-mapping` configuration. The key matching is case sensitive.

An example using these configurations of this is provided below.

```yaml
global:
attributes:
Expand All @@ -290,6 +294,14 @@ global:
some_int: 1234
some_float: 3.14159
some_bool: false
key-mapping:
# key-mapping is used to replace the attribute key with different keys
- key: servertracer.http.responsecode
replacement: http.status_code
- key: servertracer.http.responsephrase
replacement: http.message
overwrite: true # replace attribute key even if the replacement string is already a key on the span attributes
keep: true # keep the attribute with the original key
```

### <a name="tail-sampling"></a>Intelligent Sampling
Expand Down
7 changes: 5 additions & 2 deletions cmd/occollector/app/builder/processor_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"time"

"github.com/spf13/viper"

"github.com/census-instrumentation/opencensus-service/processor/attributekeyprocessor"
)

// SenderType indicates the type of sender
Expand Down Expand Up @@ -126,8 +128,9 @@ type QueuedSpanProcessorCfg struct {
// AttributesCfg holds configuration for attributes that can be added to all spans
// going through a processor.
type AttributesCfg struct {
Overwrite bool `mapstructure:"overwrite"`
Values map[string]interface{} `mapstructure:"values"`
Overwrite bool `mapstructure:"overwrite"`
Values map[string]interface{} `mapstructure:"values"`
KeyReplacements []attributekeyprocessor.KeyReplacement `mapstructure:"key-mapping,omitempty"`
}

// GlobalProcessorCfg holds global configuration values that apply to all processors
Expand Down
89 changes: 89 additions & 0 deletions cmd/occollector/app/builder/processor_builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"testing"

"github.com/google/go-cmp/cmp"

"github.com/census-instrumentation/opencensus-service/processor/attributekeyprocessor"
)

func TestGlobalProcessorCfg_InitFromViper(t *testing.T) {
tests := []struct {
name string
file string
want *AttributesCfg
}{
{
name: "key_mapping",
file: "./testdata/global_attributes_key_mapping.yaml",
want: &AttributesCfg{
KeyReplacements: []attributekeyprocessor.KeyReplacement{
{
Key: "servertracer.http.responsecode",
NewKey: "http.status_code",
},
{
Key: "servertracer.http.responsephrase",
NewKey: "http.message",
Overwrite: true,
KeepOriginal: true,
},
},
},
},
{
name: "all_settings",
file: "./testdata/global_attributes_all.yaml",
want: &AttributesCfg{
Overwrite: true,
Values: map[string]interface{}{"some_string": "hello world"},
KeyReplacements: []attributekeyprocessor.KeyReplacement{
{
Key: "servertracer.http.responsecode",
NewKey: "http.status_code",
},
{
Key: "servertracer.http.responsephrase",
NewKey: "http.message",
Overwrite: true,
KeepOriginal: true,
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
v, err := loadViperFromFile(tt.file)
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

cfg := NewDefaultMultiSpanProcessorCfg().InitFromViper(v)

got := cfg.Global.Attributes
if got == nil {
t.Fatalf("got nil, want non-nil")
}

if diff := cmp.Diff(got, tt.want); diff != "" {
t.Errorf("Mismatched global configuration\n-Got +Want:\n\t%s", diff)
}
})
}
}
12 changes: 12 additions & 0 deletions cmd/occollector/app/builder/testdata/global_attributes_all.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
global:
attributes:
overwrite: true
values:
some_string: "hello world"
key-mapping:
- key: servertracer.http.responsecode
replacement: http.status_code
- key: servertracer.http.responsephrase
replacement: http.message
overwrite: true
keep: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
global:
attributes:
key-mapping:
- key: servertracer.http.responsecode
replacement: http.status_code
- key: servertracer.http.responsephrase
replacement: http.message
overwrite: true
keep: true
22 changes: 15 additions & 7 deletions cmd/occollector/app/collector/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/census-instrumentation/opencensus-service/internal/collector/sampling"
"github.com/census-instrumentation/opencensus-service/internal/config"
"github.com/census-instrumentation/opencensus-service/processor/addattributesprocessor"
"github.com/census-instrumentation/opencensus-service/processor/attributekeyprocessor"
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
)

Expand Down Expand Up @@ -310,18 +311,25 @@ func startProcessor(v *viper.Viper, logger *zap.Logger) (consumer.TraceConsumer,
}

// Wraps processors in a single one to be connected to all enabled receivers.
tp := multiconsumer.NewTraceProcessor(traceConsumers)
if multiProcessorCfg.Global != nil && multiProcessorCfg.Global.Attributes != nil {
logger.Info(
"Found global attributes config",
zap.Bool("overwrite", multiProcessorCfg.Global.Attributes.Overwrite),
zap.Any("values", multiProcessorCfg.Global.Attributes.Values),
zap.Any("key-mapping", multiProcessorCfg.Global.Attributes.KeyReplacements),
)
tp, _ := addattributesprocessor.NewTraceProcessor(
multiconsumer.NewTraceProcessor(traceConsumers),
addattributesprocessor.WithAttributes(multiProcessorCfg.Global.Attributes.Values),
addattributesprocessor.WithOverwrite(multiProcessorCfg.Global.Attributes.Overwrite),
)
return tp, closeFns

if len(multiProcessorCfg.Global.Attributes.Values) > 0 {
tp, _ = addattributesprocessor.NewTraceProcessor(
tp,
addattributesprocessor.WithAttributes(multiProcessorCfg.Global.Attributes.Values),
addattributesprocessor.WithOverwrite(multiProcessorCfg.Global.Attributes.Overwrite),
)
}
if len(multiProcessorCfg.Global.Attributes.KeyReplacements) > 0 {
tp, _ = attributekeyprocessor.NewTraceProcessor(tp, multiProcessorCfg.Global.Attributes.KeyReplacements...)
}
}
return multiconsumer.NewTraceProcessor(traceConsumers), closeFns
return tp, closeFns
}
106 changes: 106 additions & 0 deletions cmd/occollector/app/collector/processors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2019, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package collector

import (
"reflect"
"testing"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/census-instrumentation/opencensus-service/processor/addattributesprocessor"
"github.com/census-instrumentation/opencensus-service/processor/attributekeyprocessor"
"github.com/census-instrumentation/opencensus-service/processor/multiconsumer"
"github.com/census-instrumentation/opencensus-service/processor/processortest"
)

func Test_startProcessor(t *testing.T) {
tests := []struct {
name string
setupViperCfg func() *viper.Viper
wantExamplar func(t *testing.T) interface{}
}{
{
name: "incomplete_global_attrib_config",
setupViperCfg: func() *viper.Viper {
v := viper.New()
v.Set("logging-exporter", true)
v.Set("global.attributes.overwrite", true)
return v
},
wantExamplar: func(t *testing.T) interface{} {
return multiconsumer.NewTraceProcessor(nil)
},
},
{
name: "global_attrib_config_values",
setupViperCfg: func() *viper.Viper {
v := viper.New()
v.Set("logging-exporter", true)
v.Set("global.attributes.values", map[string]interface{}{"foo": "bar"})
return v
},
wantExamplar: func(t *testing.T) interface{} {
nopProcessor := processortest.NewNopTraceProcessor(nil)
addAttributesProcessor, err := addattributesprocessor.NewTraceProcessor(nopProcessor)
if err != nil {
t.Fatalf("addattributesprocessor.NewTraceProcessor() = %v", err)
}
return addAttributesProcessor
},
},
{
name: "global_attrib_config_key_mapping",
setupViperCfg: func() *viper.Viper {
v := viper.New()
v.Set("logging-exporter", true)
v.Set("global.attributes.key-mapping",
[]map[string]interface{}{
{
"key": "foo",
"replacement": "bar",
},
})
return v
},
wantExamplar: func(t *testing.T) interface{} {
nopProcessor := processortest.NewNopTraceProcessor(nil)
attributeKeyProcessor, err := attributekeyprocessor.NewTraceProcessor(nopProcessor)
if err != nil {
t.Fatalf("attributekeyprocessor.NewTraceProcessor() = %v", err)
}
return attributeKeyProcessor
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
consumer, closeFns := startProcessor(tt.setupViperCfg(), zap.NewNop())
if consumer == nil {
t.Errorf("startProcessor() got nil consumer")
}
consumerExamplar := tt.wantExamplar(t)
if reflect.TypeOf(consumer) != reflect.TypeOf(consumerExamplar) {
t.Errorf("startProcessor() got consumer type %q want %q",
reflect.TypeOf(consumer),
reflect.TypeOf(consumerExamplar))
}
for _, closeFn := range closeFns {
closeFn()
}
})
}
}
Loading

0 comments on commit 5fb7397

Please sign in to comment.