Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: otel prometheus duplicated attributes #62

Merged
merged 5 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion stats/internal/otel/prometheus/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (c *collector) Collect(ch chan<- prometheus.Metric) {
scopeKeys = append(scopeKeys, "job")
scopeValues = append(scopeValues, attr.Value.AsString())
}
scopeKeys = append(scopeKeys, strings.ReplaceAll(string(attr.Key), ".", "_"))
scopeKeys = append(scopeKeys, strings.Map(sanitizeRune, string(attr.Key)))
scopeValues = append(scopeValues, attr.Value.AsString())
}

Expand Down Expand Up @@ -279,6 +279,8 @@ func (c *collector) createInfoMetric(name, description string, res *resource.Res
return prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(1), values...)
}

// BEWARE that we are already sanitizing tag keys in the OTel adapter (see the sanitizeTagKey function),
// but we still need this function to sanitize metrics coming from the internal OpenTelemetry client.
func sanitizeRune(r rune) rune {
if unicode.IsLetter(r) || unicode.IsDigit(r) || r == ':' || r == '_' {
return r
Expand Down
37 changes: 25 additions & 12 deletions stats/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ const (

// otelStats is an OTel-specific adapter that follows the Stats contract
type otelStats struct {
config statsConfig
otelConfig otelStatsConfig
config statsConfig
otelConfig otelStatsConfig
resourceAttrs map[string]struct{}

meter metric.Meter
counters map[string]instrument.Int64Counter
Expand Down Expand Up @@ -59,11 +60,16 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error

// Starting OpenTelemetry setup
var attrs []attribute.KeyValue
s.resourceAttrs = make(map[string]struct{})
if s.config.instanceName != "" {
attrs = append(attrs, attribute.String("instanceName", s.config.instanceName))
sanitized := sanitizeTagKey("instanceName")
attrs = append(attrs, attribute.String(sanitized, s.config.instanceName))
s.resourceAttrs[sanitized] = struct{}{}
}
if s.config.namespaceIdentifier != "" {
attrs = append(attrs, attribute.String("namespace", s.config.namespaceIdentifier))
sanitized := sanitizeTagKey("namespace")
attrs = append(attrs, attribute.String(sanitized, s.config.namespaceIdentifier))
s.resourceAttrs[sanitized] = struct{}{}
}
res, err := otel.NewResource(s.config.serviceName, s.config.serviceVersion, attrs...)
if err != nil {
Expand Down Expand Up @@ -238,30 +244,37 @@ func (s *otelStats) getMeasurement(name, statType string, tags Tags) Measurement
}

// Clean up tags based on deployment type. No need to send workspace id tag for free tier customers.
newTags := make(Tags)
for k, v := range tags {
if strings.Trim(k, " ") == "" {
s.logger.Warnf("removing empty tag key with value %s for measurement %s", v, name)
delete(tags, k)
s.logger.Warnf("removing empty tag key with value %q for measurement %q", v, name)
continue
}
if _, ok := s.config.excludedTags[k]; ok {
delete(tags, k)
continue
}
Comment on lines 253 to 255
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this too?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, better safe than sorry since we might not be the only ones using this lib and I'm not planning to check everywhere if removing it might break an exclusion 🤷‍♂️

}
if tags == nil {
tags = make(Tags)
sanitizedKey := sanitizeTagKey(k)
if _, ok := s.config.excludedTags[sanitizedKey]; ok {
continue
}
if _, ok := s.resourceAttrs[sanitizedKey]; ok {
s.logger.Warnf("removing tag %q for measurement %q since it is a resource attribute", k, name)
continue
}
newTags[sanitizedKey] = v
}

om := &otelMeasurement{
genericMeasurement: genericMeasurement{statType: statType},
attributes: tags.otelAttributes(),
attributes: newTags.otelAttributes(),
}

switch statType {
case CountType:
instr := buildOTelInstrument(s.meter, name, s.counters, &s.countersMu)
return &otelCounter{counter: instr, otelMeasurement: om}
case GaugeType:
return s.getGauge(s.meter, name, om.attributes, tags.String())
return s.getGauge(s.meter, name, om.attributes, newTags.String())
case TimerType:
instr := buildOTelInstrument(s.meter, name, s.timers, &s.timersMu)
return &otelTimer{timer: instr, otelMeasurement: om}
Expand Down
76 changes: 76 additions & 0 deletions stats/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
promClient "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/httputil"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/logger/mock_logger"
"github.com/rudderlabs/rudder-go-kit/stats/metric"
statsTest "github.com/rudderlabs/rudder-go-kit/stats/testhelper"
"github.com/rudderlabs/rudder-go-kit/testhelper"
Expand Down Expand Up @@ -776,6 +778,72 @@ func TestPrometheusCustomRegistry(t *testing.T) {
})
}

// TestPrometheusDuplicatedAttributes starts Stats with OpenTelemetry + Prometheus enabled and
// records a counter whose tags include "instanceName". It expects exactly one warning about that
// tag being removed because it is a resource attribute, and expects the scraped metric to carry
// only the labels "a", "job" and "service_name" on top of globalDefaultAttrs.
func TestPrometheusDuplicatedAttributes(t *testing.T) {
freePort, err := testhelper.GetFreePort()
require.NoError(t, err)

c := config.New()
// presumably INSTANCE_ID is what ends up as the "instanceName" resource attribute — confirm against the config loading code
c.Set("INSTANCE_ID", "my-instance-id")
c.Set("OpenTelemetry.enabled", true)
c.Set("OpenTelemetry.metrics.prometheus.enabled", true)
c.Set("OpenTelemetry.metrics.prometheus.port", freePort)
c.Set("OpenTelemetry.metrics.exportInterval", time.Millisecond)
c.Set("RuntimeStats.enabled", false)
ctrl := gomock.NewController(t)
// loggerSpy is the logger handed to the stats implementation: Infof calls are tolerated,
// while the "removing tag" warning must be emitted exactly once with these exact arguments.
loggerSpy := mock_logger.NewMockLogger(ctrl)
loggerSpy.EXPECT().Infof(gomock.Any(), gomock.Any()).AnyTimes()
loggerSpy.EXPECT().Warnf(
"removing tag %q for measurement %q since it is a resource attribute",
"instanceName", "foo",
).Times(1)
// loggerFactory is the logger returned by the factory; its single Child call yields loggerSpy.
loggerFactory := mock_logger.NewMockLogger(ctrl)
loggerFactory.EXPECT().Child(gomock.Any()).Times(1).Return(loggerSpy)
l := newLoggerSpyFactory(loggerFactory)
m := metric.NewManager()
r := prometheus.NewRegistry()
s := NewStats(c, l, m,
WithServiceName(t.Name()),
WithServiceVersion("v1.2.3"),
WithPrometheusRegistry(r, r),
)
require.NoError(t, s.Start(context.Background(), DefaultGoRoutineFactory))
t.Cleanup(s.Stop)

// The "instanceName" tag is the one expected to be dropped; "a" must survive.
metricName := "foo"
s.NewTaggedStat(metricName, CountType, Tags{"a": "b", "instanceName": "from-metric"}).Count(7)

var (
resp *http.Response
metrics map[string]*promClient.MetricFamily
metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", freePort)
)
// Poll the Prometheus endpoint until the metric shows up (up to 10s, every 100ms).
// NOTE(review): the trailing err/metrics arguments are evaluated when Eventuallyf is called,
// i.e. before the condition runs, so a failure message likely prints their initial values — confirm.
require.Eventuallyf(t, func() bool {
resp, err = http.Get(metricsEndpoint)
if err != nil {
return false
}
defer func() { httputil.CloseResponse(resp) }()
metrics, err = statsTest.ParsePrometheusMetrics(resp.Body)
if err != nil {
return false
}
if _, ok := metrics[metricName]; !ok {
return false
}
return true
}, 10*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics)

// A single counter series with value 7 is expected, without any "instanceName" label coming from the tags.
require.EqualValues(t, &metricName, metrics[metricName].Name)
require.EqualValues(t, ptr(promClient.MetricType_COUNTER), metrics[metricName].Type)
require.Len(t, metrics[metricName].Metric, 1)
require.EqualValues(t, &promClient.Counter{Value: ptr(7.0)}, metrics[metricName].Metric[0].Counter)
require.ElementsMatchf(t, append(globalDefaultAttrs,
&promClient.LabelPair{Name: ptr("a"), Value: ptr("b")},
&promClient.LabelPair{Name: ptr("job"), Value: ptr(t.Name())},
&promClient.LabelPair{Name: ptr("service_name"), Value: ptr(t.Name())},
), metrics[metricName].Metric[0].Label, "Got %+v", metrics[metricName].Metric[0].Label)
}

func getDataPoint[T any](ctx context.Context, t *testing.T, rdr sdkmetric.Reader, name string, idx int) (zero T) {
t.Helper()
rm := metricdata.ResourceMetrics{}
Expand Down Expand Up @@ -871,3 +939,11 @@ func (r TestMeasurement) GetTags() map[string]string {
"destType": r.destType,
}
}

// loggerSpyFactory is a logger factory that always hands out the one logger
// it was built with, which lets a test inject a spy or mock logger.
type loggerSpyFactory struct {
	spy logger.Logger
}

// NewLogger returns the logger stored in the factory.
func (f loggerSpyFactory) NewLogger() logger.Logger {
	return f.spy
}

// newLoggerSpyFactory wraps l in a loggerSpyFactory and returns it as a loggerFactory.
func newLoggerSpyFactory(l logger.Logger) loggerFactory {
	factory := loggerSpyFactory{spy: l}
	return &factory
}
26 changes: 25 additions & 1 deletion stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ package stats
import (
"context"
"os"
"strings"
"sync/atomic"
"time"
"unicode"

"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/metric/global"
Expand Down Expand Up @@ -54,9 +56,13 @@ type Stats interface {
Stop()
}

// loggerFactory is the logger-factory contract accepted by NewStats:
// any value able to provide a logger.Logger through NewLogger.
type loggerFactory interface {
// NewLogger returns the logger to be used.
NewLogger() logger.Logger
}

// NewStats creates a new Stats instance using the provided config, logger factory and metric manager as dependencies.
func NewStats(
config *config.Config, loggerFactory *logger.Factory, metricManager svcMetric.Manager, opts ...Option,
config *config.Config, loggerFactory loggerFactory, metricManager svcMetric.Manager, opts ...Option,
) Stats {
excludedTags := make(map[string]struct{})
excludedTagsSlice := config.GetStringSlice("statsExcludedTags", nil)
Expand Down Expand Up @@ -140,3 +146,21 @@ type defaultGoRoutineFactory struct{}
// Go starts function in a new goroutine and returns without waiting for it to finish.
func (defaultGoRoutineFactory) Go(function func()) {
go function()
}

// sanitizeTagKey returns key with each of its runes passed through sanitizeRune.
func sanitizeTagKey(key string) string {
	sanitized := strings.Map(sanitizeRune, key)
	return sanitized
}

// sanitizeRune keeps letters, digits, ':' and '_' untouched and maps every
// other rune to '_'.
//
// It is a copy of the function found in the prometheus exporter, so a change made
// only here might not always produce the desired result when exporting to prometheus
// unless the prometheus exporter is updated as well.
// The duplication exists because this function is used across all our Stats modes
// (statsd, prom, otel...), while the copy in the prometheus exporter is still used to
// sanitize some attributes set on a Resource level from the OpenTelemetry client
// itself or 3rd parties.
// An alternative would be to further customise the prometheus exporter and make it
// use this very function.
func sanitizeRune(r rune) rune {
	switch {
	case unicode.IsLetter(r), unicode.IsDigit(r), r == ':', r == '_':
		return r
	default:
		return '_'
	}
}
30 changes: 18 additions & 12 deletions stats/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,18 +159,24 @@ func (s *statsdStats) internalNewTaggedStat(name, statType string, tags Tags, sa
}

// Clean up tags based on deployment type. No need to send workspace id tag for free tier customers.
for excludedTag := range s.config.excludedTags {
delete(tags, excludedTag)
}
if tags == nil {
tags = make(Tags)
}
if v, ok := tags[""]; ok {
s.logger.Warnf("removing empty tag key with value %s for measurement %s", v, name)
delete(tags, "")
newTags := make(Tags)
for k, v := range tags {
if strings.Trim(k, " ") == "" {
s.logger.Warnf("removing empty tag key with value %q for measurement %q", v, name)
continue
}
if _, ok := s.config.excludedTags[k]; ok {
continue
}
Comment on lines +168 to +170
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this?

Suggested change
if _, ok := s.config.excludedTags[k]; ok {
continue
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought I'd leave it for backwards compatibility in case somewhere there is an exclusion rule that would work only before the tag is sanitized. I thought it wouldn't hurt to keep it for now.

sanitizedKey := sanitizeTagKey(k)
if _, ok := s.config.excludedTags[sanitizedKey]; ok {
continue
}
newTags[sanitizedKey] = v
}
// key comprises of the measurement type plus all tag-value pairs
taggedClientKey := tags.String() + fmt.Sprintf("%f", samplingRate)

// key comprises the measurement type plus all tag-value pairs
taggedClientKey := newTags.String() + fmt.Sprintf("%f", samplingRate)

s.state.clientsLock.RLock()
taggedClient, found := s.state.clients[taggedClientKey]
Expand All @@ -179,7 +185,7 @@ func (s *statsdStats) internalNewTaggedStat(name, statType string, tags Tags, sa
if !found {
s.state.clientsLock.Lock()
if taggedClient, found = s.state.clients[taggedClientKey]; !found { // double check for race
tagVals := tags.Strings()
tagVals := newTags.Strings()
taggedClient = &statsdClient{samplingRate: samplingRate, tags: tagVals}
if s.state.connEstablished {
taggedClient.statsd = s.state.client.statsd.Clone(s.state.conn, s.statsdConfig.statsdTagsFormat(), s.statsdConfig.statsdDefaultTags(), statsd.Tags(tagVals...), statsd.SampleRate(samplingRate))
Expand Down