Skip to content

Commit

Permalink
[coordinator] Store metrics type into the annotation (#2628)
Browse files Browse the repository at this point in the history
* Rename existing protobuf TimeSeries.type field to m3_type to avoid collision

* Add new Prometheus protobuf fields

* Rename the internal MetricsType to M3MetricsType

* Implement conversions

* Write metrics type as annotation payload

* Avoid reusing annotation slices

* Fix test

* Introduce metric family type

* Address review feedback

* Revert "Introduce metric family type"

This reverts commit d108b4f.

* Introduce annotation.Payload.handle_value_resets field

* Minor changes according to PR feedback
  • Loading branch information
linasm authored Oct 6, 2020
1 parent 2df33bf commit ac2ef9b
Show file tree
Hide file tree
Showing 14 changed files with 1,155 additions and 119 deletions.
2 changes: 1 addition & 1 deletion src/cmd/services/m3coordinator/downsample/downsampler.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type SamplesAppenderResult struct {
type SampleAppenderOptions struct {
Override bool
OverrideRules SamplesAppenderOverrideRules
MetricType ts.MetricType
MetricType ts.M3MetricType
}

// SamplesAppenderOverrideRules provides override rules to
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing.
},
},
sampleAppenderOpts: &SampleAppenderOptions{
MetricType: ts.MetricTypeCounter,
MetricType: ts.M3MetricTypeCounter,
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
Expand Down Expand Up @@ -436,7 +436,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *t
},
},
sampleAppenderOpts: &SampleAppenderOptions{
MetricType: ts.MetricTypeGauge,
MetricType: ts.M3MetricTypeGauge,
},
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: []testGaugeMetric{gaugeMetric},
Expand Down
8 changes: 4 additions & 4 deletions src/cmd/services/m3coordinator/downsample/metrics_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,13 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp
if a.augmentM3Tags {
// NB (@shreyas): Add the metric type tag. The tag has the prefix
// __m3_. All tags with that prefix are only used for the purpose of
// filter matchiand then stripped off before we actually send to the aggregator.
// filter match and then stripped off before we actually send to the aggregator.
switch opts.MetricType {
case ts.MetricTypeCounter:
case ts.M3MetricTypeCounter:
tags.append(metric.M3TypeTag, metric.M3CounterValue)
case ts.MetricTypeGauge:
case ts.M3MetricTypeGauge:
tags.append(metric.M3TypeTag, metric.M3GaugeValue)
case ts.MetricTypeTimer:
case ts.M3MetricTypeTimer:
tags.append(metric.M3TypeTag, metric.M3TimerValue)
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
}

opts := downsample.SampleAppenderOptions{
MetricType: value.Attributes.Type,
MetricType: value.Attributes.M3Type,
}
if downsampleMappingRuleOverrides, ok := d.downsampleOverrideRules(overrides); ok {
opts = downsample.SampleAppenderOptions{
Expand All @@ -484,12 +484,12 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
}

for _, dp := range value.Datapoints {
switch value.Attributes.Type {
case ts.MetricTypeGauge:
switch value.Attributes.M3Type {
case ts.M3MetricTypeGauge:
err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value)
case ts.MetricTypeCounter:
case ts.M3MetricTypeCounter:
err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value))
case ts.MetricTypeTimer:
case ts.M3MetricTypeTimer:
err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value)
}
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions src/cmd/services/m3coordinator/ingest/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,13 @@ var (
testAnnotation2 = []byte("second")

testAttributesGauge = ts.SeriesAttributes{
Type: ts.MetricTypeGauge,
M3Type: ts.M3MetricTypeGauge,
}
testAttributesCounter = ts.SeriesAttributes{
Type: ts.MetricTypeCounter,
M3Type: ts.M3MetricTypeCounter,
}
testAttributesTimer = ts.SeriesAttributes{
Type: ts.MetricTypeTimer,
M3Type: ts.M3MetricTypeTimer,
}

testEntries = []testIterEntry{
Expand Down Expand Up @@ -565,11 +565,11 @@ func TestDownsampleAndWriteBatchDifferentTypes(t *testing.T) {

mockMetricsAppender.
EXPECT().
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.MetricTypeCounter}).
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeCounter}).
Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1)
mockMetricsAppender.
EXPECT().
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.MetricTypeTimer}).
SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeTimer}).
Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1)
for _, tag := range testTags1.Tags {
mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value)
Expand Down
Loading

0 comments on commit ac2ef9b

Please sign in to comment.