Skip to content

Commit

Permalink
[Spanmetrics] Add events_total metric to get the measurement for list…
Browse files Browse the repository at this point in the history
… of configured event attributes for a span (#27811)

**Description:**
We have an events section for a span. The details for all the exceptions
like exception.type and exception.message are recorded as Events for a
span. Right now, we don't have a feature to add event attributes to span
metrics.

The idea of this PR is to develop a feature which adds a new metric
`events_total` with a default set of dimensions like `service_name,
span_name, span_kind, status_code`. We can configure to add additional
set of dimensions like `exception.type` and `exception.message` which
will be fetched from the Events section for a span

**Link to tracking Issue:**
[27451](#27451)

---------

Co-authored-by: Albert <[email protected]>
  • Loading branch information
aishyandapalli and albertteoh authored Oct 31, 2023
1 parent d2c2265 commit b3328a7
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 11 deletions.
27 changes: 27 additions & 0 deletions .chloggen/events-metric-to-span-metrics.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Events metric to span metrics connector that adds list of event attributes as dimensions

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27451]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
10 changes: 9 additions & 1 deletion connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ The following settings can be optionally configured:
- `metrics_flush_interval` (default: `15s`): Defines the flush interval of the generated metrics.
- `exemplars`: Use to configure how to attach exemplars to histograms
- `enabled` (default: `false`): enabling will add spans as Exemplars.
- `events`: Use to configure the events metric.
- `enabled`: (default: `false`): enabling will add the events metric.
- `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.

## Examples

Expand Down Expand Up @@ -132,7 +135,12 @@ connectors:
exclude_dimensions: ['status.code']
dimensions_cache_size: 1000
aggregation_temporality: "AGGREGATION_TEMPORALITY_CUMULATIVE"
metrics_flush_interval: 15s
metrics_flush_interval: 15s
events:
enabled: true
dimensions:
- name: exception.type
- name: exception.message

service:
pipelines:
Expand Down
30 changes: 27 additions & 3 deletions connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ type Config struct {

// Exemplars defines the configuration for exemplars.
Exemplars ExemplarsConfig `mapstructure:"exemplars"`

// Events defines the configuration for events section of spans.
Events EventsConfig `mapstructure:"events"`
}

type HistogramConfig struct {
Expand All @@ -80,13 +83,22 @@ type ExplicitHistogramConfig struct {
Buckets []time.Duration `mapstructure:"buckets"`
}

type EventsConfig struct {
// Enabled is a flag to enable events.
Enabled bool `mapstructure:"enabled"`
// Dimensions defines the list of dimensions to add to the events metric.
Dimensions []Dimension `mapstructure:"dimensions"`
}

var _ component.ConfigValidator = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (c Config) Validate() error {
err := validateDimensions(c.Dimensions)
if err != nil {
return err
if err := validateDimensions(c.Dimensions); err != nil {
return fmt.Errorf("failed validating dimensions: %w", err)
}
if err := validateEventDimensions(c.Events.Enabled, c.Events.Dimensions); err != nil {
return fmt.Errorf("failed validating event dimensions: %w", err)
}

if c.DimensionsCacheSize <= 0 {
Expand All @@ -99,6 +111,7 @@ func (c Config) Validate() error {
if c.Histogram.Explicit != nil && c.Histogram.Exponential != nil {
return errors.New("use either `explicit` or `exponential` buckets histogram")
}

return nil
}

Expand Down Expand Up @@ -127,3 +140,14 @@ func validateDimensions(dimensions []Dimension) error {

return nil
}

// validateEventDimensions checks for empty and duplicates for the dimensions configured.
func validateEventDimensions(enabled bool, dimensions []Dimension) error {
if !enabled {
return nil
}
if len(dimensions) == 0 {
return fmt.Errorf("no dimensions configured for events")
}
return validateDimensions(dimensions)
}
44 changes: 44 additions & 0 deletions connector/spanmetricsconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,47 @@ func TestValidateDimensions(t *testing.T) {
})
}
}

func TestValidateEventDimensions(t *testing.T) {
for _, tc := range []struct {
enabled bool
name string
dimensions []Dimension
expectedErr string
}{
{
enabled: false,
name: "disabled - no additional dimensions",
dimensions: []Dimension{},
},
{
enabled: true,
name: "enabled - no additional dimensions",
dimensions: []Dimension{},
expectedErr: "no dimensions configured for events",
},
{
enabled: true,
name: "enabled - no duplicate dimensions",
dimensions: []Dimension{{Name: "exception_type"}},
},
{
enabled: true,
name: "enabled - duplicate dimensions",
dimensions: []Dimension{
{Name: "exception_type"},
{Name: "exception_type"},
},
expectedErr: "duplicate dimension name exception_type",
},
} {
t.Run(tc.name, func(t *testing.T) {
err := validateEventDimensions(tc.enabled, tc.dimensions)
if tc.expectedErr != "" {
assert.EqualError(t, err, tc.expectedErr)
} else {
assert.NoError(t, err)
}
})
}
}
55 changes: 48 additions & 7 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (

metricNameDuration = "duration"
metricNameCalls = "calls"
metricNameEvents = "events"

defaultUnit = metrics.Milliseconds
)
Expand Down Expand Up @@ -66,11 +67,17 @@ type connectorImp struct {
started bool

shutdownOnce sync.Once

// Event dimensions to add to the events metric.
eDimensions []dimension

events EventsConfig
}

type resourceMetrics struct {
histograms metrics.HistogramMetrics
sums metrics.SumMetrics
events metrics.SumMetrics
attributes pcommon.Map
}

Expand Down Expand Up @@ -113,6 +120,8 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
metricKeyToDimensions: metricKeyToDimensionsCache,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
}, nil
}

Expand Down Expand Up @@ -245,6 +254,13 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics {
metric.SetUnit(p.config.Histogram.Unit.String())
histograms.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
}

events := rawMetrics.events
if p.events.Enabled {
metric = sm.Metrics().AppendEmpty()
metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents))
events.BuildMetrics(metric, p.startTimestamp, p.config.GetAggregationTemporality())
}
}

return m
Expand Down Expand Up @@ -288,6 +304,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
rm := p.getOrCreateResourceMetrics(resourceAttr)
sums := rm.sums
histograms := rm.histograms
events := rm.events

unitDivider := unitDivider(p.config.Histogram.Unit)
serviceName := serviceAttr.Str()
Expand All @@ -308,7 +325,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {

attributes, ok := p.metricKeyToDimensions.Get(key)
if !ok {
attributes = p.buildAttributes(serviceName, span, resourceAttr)
attributes = p.buildAttributes(serviceName, span, resourceAttr, p.dimensions)
p.metricKeyToDimensions.Add(key, attributes)
}
if !p.config.Histogram.Disable {
Expand All @@ -321,6 +338,29 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) {
// aggregate sums metrics
s := sums.GetOrCreate(key, attributes)
s.Add(1)

// aggregate events metrics
if p.events.Enabled {
for l := 0; l < span.Events().Len(); l++ {
event := span.Events().At(l)
eDimensions := p.dimensions
eDimensions = append(eDimensions, p.eDimensions...)

rscAndEventAttrs := pcommon.NewMap()
rscAndEventAttrs.EnsureCapacity(resourceAttr.Len() + event.Attributes().Len())
resourceAttr.CopyTo(rscAndEventAttrs)
event.Attributes().CopyTo(rscAndEventAttrs)

eKey := p.buildKey(serviceName, span, eDimensions, rscAndEventAttrs)
eAttributes, ok := p.metricKeyToDimensions.Get(eKey)
if !ok {
eAttributes = p.buildAttributes(serviceName, span, rscAndEventAttrs, eDimensions)
p.metricKeyToDimensions.Add(eKey, eAttributes)
}
e := events.GetOrCreate(eKey, eAttributes)
e.Add(1)
}
}
}
}
}
Expand All @@ -346,6 +386,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet
v = &resourceMetrics{
histograms: initHistogramMetrics(p.config),
sums: metrics.NewSumMetrics(),
events: metrics.NewSumMetrics(),
attributes: attr,
}
p.resourceMetrics[key] = v
Expand All @@ -363,9 +404,9 @@ func contains(elements []string, value string) bool {
return false
}

func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map) pcommon.Map {
func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, resourceAttrs pcommon.Map, dimensions []dimension) pcommon.Map {
attr := pcommon.NewMap()
attr.EnsureCapacity(4 + len(p.dimensions))
attr.EnsureCapacity(4 + len(dimensions))
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
attr.PutStr(serviceNameKey, serviceName)
}
Expand All @@ -378,7 +419,7 @@ func (p *connectorImp) buildAttributes(serviceName string, span ptrace.Span, res
if !contains(p.config.ExcludeDimensions, statusCodeKey) {
attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
}
for _, d := range p.dimensions {
for _, d := range dimensions {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
v.CopyTo(attr.PutEmpty(d.name))
}
Expand All @@ -395,10 +436,10 @@ func concatDimensionValue(dest *bytes.Buffer, value string, prefixSep bool) {

// buildKey builds the metric key from the service name and span metadata such as name, kind, status_code and
// will attempt to add any additional dimensions the user has configured that match the span's attributes
// or resource attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
// or resource/event attributes. If the dimension exists in both, the span's attributes, being the most specific, takes precedence.
//
// The metric key is a simple concatenation of dimension values, delimited by a null character.
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceAttrs pcommon.Map) metrics.Key {
func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDims []dimension, resourceOrEventAttrs pcommon.Map) metrics.Key {
p.keyBuf.Reset()
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
concatDimensionValue(p.keyBuf, serviceName, false)
Expand All @@ -414,7 +455,7 @@ func (p *connectorImp) buildKey(serviceName string, span ptrace.Span, optionalDi
}

for _, d := range optionalDims {
if v, ok := getDimensionValue(d, span.Attributes(), resourceAttrs); ok {
if v, ok := getDimensionValue(d, span.Attributes(), resourceOrEventAttrs); ok {
concatDimensionValue(p.keyBuf, v.AsString(), true)
}
}
Expand Down
59 changes: 59 additions & 0 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ const (
notInSpanAttrName0 = "shouldBeInMetric"
notInSpanAttrName1 = "shouldNotBeInMetric"
regionResourceAttrName = "region"
exceptionTypeAttrName = "exception.type"
DimensionsCacheSize = 2

sampleRegion = "us-east-1"
Expand Down Expand Up @@ -394,6 +395,10 @@ func initSpan(span span, s ptrace.Span) {
s.Attributes().PutEmptySlice(arrayAttrName)
s.SetTraceID(pcommon.TraceID(span.traceID))
s.SetSpanID(pcommon.SpanID(span.spanID))

e := s.Events().AppendEmpty()
e.SetName("exception")
e.Attributes().PutStr(exceptionTypeAttrName, "NullPointerException")
}

func disabledExemplarsConfig() ExemplarsConfig {
Expand Down Expand Up @@ -1259,3 +1264,57 @@ func TestConnector_initHistogramMetrics(t *testing.T) {
})
}
}

func TestSpanMetrics_Events(t *testing.T) {
tests := []struct {
name string
eventsConfig EventsConfig
shouldEventsMetricExist bool
}{
{
name: "events disabled",
eventsConfig: EventsConfig{Enabled: false, Dimensions: []Dimension{{Name: "exception.type", Default: stringp("NullPointerException")}}},
shouldEventsMetricExist: false,
},
{
name: "events enabled",
eventsConfig: EventsConfig{Enabled: true, Dimensions: []Dimension{{Name: "exception.type", Default: stringp("NullPointerException")}}},
shouldEventsMetricExist: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Events = tt.eventsConfig
c, err := newConnector(zaptest.NewLogger(t), cfg, nil)
require.NoError(t, err)
err = c.ConsumeTraces(context.Background(), buildSampleTrace())
require.NoError(t, err)
metrics := c.buildMetrics()
for i := 0; i < metrics.ResourceMetrics().Len(); i++ {
rm := metrics.ResourceMetrics().At(i)
ism := rm.ScopeMetrics()
for ilmC := 0; ilmC < ism.Len(); ilmC++ {
m := ism.At(ilmC).Metrics()
if !tt.shouldEventsMetricExist {
assert.Equal(t, 2, m.Len())
continue
}
assert.Equal(t, 3, m.Len())
for mC := 0; mC < m.Len(); mC++ {
metric := m.At(mC)
if metric.Name() != "events" {
continue
}
assert.Equal(t, pmetric.MetricTypeSum, metric.Type())
for idp := 0; idp < metric.Sum().DataPoints().Len(); idp++ {
attrs := metric.Sum().DataPoints().At(idp).Attributes()
assert.Contains(t, attrs.AsRaw(), exceptionTypeAttrName)
}
}
}
}
})
}
}

0 comments on commit b3328a7

Please sign in to comment.