From ac2ef9b9da9685cfdad2312aca35bf093c1ebb38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Tue, 6 Oct 2020 11:11:50 +0300 Subject: [PATCH] [coordinator] Store metrics type into the annotation (#2628) * Rename existing protobuf TimeSeries.type field to m3_type to avoid collision * Add new Prometheus protobuf fields * Rename the internal MetricsType to M3MetricsType * Implement conversions * Write metrics type as annotation payload * Avoid reusing annotation slices * Fix test * Introduce metric family type * Address review feedback * Revert "Introduce metric family type" This reverts commit d108b4f47716b60862913b8cba5f98a896ded259. * Introduce annotation.Payload.handle_value_resets field * Minor changes according to PR feedback --- .../m3coordinator/downsample/downsampler.go | 2 +- .../downsample/downsampler_test.go | 4 +- .../downsample/metrics_appender.go | 8 +- .../services/m3coordinator/ingest/write.go | 10 +- .../m3coordinator/ingest/write_test.go | 10 +- .../proto/annotation/annotation.pb.go | 405 ++++++++++++++++++ .../proto/annotation/annotation.proto | 19 + .../api/v1/handler/prometheus/remote/write.go | 34 +- .../handler/prometheus/remote/write_test.go | 150 ++++++- src/query/generated/proto/prompb/types.pb.go | 288 ++++++++++--- src/query/generated/proto/prompb/types.proto | 31 +- src/query/storage/converter.go | 127 +++++- src/query/storage/converter_test.go | 126 ++++++ src/query/ts/metadata.go | 60 ++- 14 files changed, 1155 insertions(+), 119 deletions(-) create mode 100644 src/dbnode/generated/proto/annotation/annotation.pb.go create mode 100644 src/dbnode/generated/proto/annotation/annotation.proto diff --git a/src/cmd/services/m3coordinator/downsample/downsampler.go b/src/cmd/services/m3coordinator/downsample/downsampler.go index 0be8515118..61809944bb 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler.go @@ -58,7 +58,7 @@ type SamplesAppenderResult struct { type SampleAppenderOptions struct { Override bool OverrideRules SamplesAppenderOverrideRules - MetricType ts.MetricType + MetricType ts.M3MetricType } // SamplesAppenderOverrideRules provides override rules to diff --git a/src/cmd/services/m3coordinator/downsample/downsampler_test.go b/src/cmd/services/m3coordinator/downsample/downsampler_test.go index 6d3ea5bfb2..7b7e462824 100644 --- a/src/cmd/services/m3coordinator/downsample/downsampler_test.go +++ b/src/cmd/services/m3coordinator/downsample/downsampler_test.go @@ -382,7 +382,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilter(t *testing. }, }, sampleAppenderOpts: &SampleAppenderOptions{ - MetricType: ts.MetricTypeCounter, + MetricType: ts.M3MetricTypeCounter, }, ingest: &testDownsamplerOptionsIngest{ gaugeMetrics: []testGaugeMetric{gaugeMetric}, @@ -436,7 +436,7 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesTypeFilterNoMatch(t *t }, }, sampleAppenderOpts: &SampleAppenderOptions{ - MetricType: ts.MetricTypeGauge, + MetricType: ts.M3MetricTypeGauge, }, ingest: &testDownsamplerOptionsIngest{ gaugeMetrics: []testGaugeMetric{gaugeMetric}, diff --git a/src/cmd/services/m3coordinator/downsample/metrics_appender.go b/src/cmd/services/m3coordinator/downsample/metrics_appender.go index 4f82fbd2ea..8811af493f 100644 --- a/src/cmd/services/m3coordinator/downsample/metrics_appender.go +++ b/src/cmd/services/m3coordinator/downsample/metrics_appender.go @@ -153,13 +153,13 @@ func (a *metricsAppender) SamplesAppender(opts SampleAppenderOptions) (SamplesAp if a.augmentM3Tags { // NB (@shreyas): Add the metric type tag. The tag has the prefix // __m3_. All tags with that prefix are only used for the purpose of - // filter matchiand then stripped off before we actually send to the aggregator. + // filter match and then stripped off before we actually send to the aggregator. switch opts.MetricType { - case ts.MetricTypeCounter: + case ts.M3MetricTypeCounter: tags.append(metric.M3TypeTag, metric.M3CounterValue) - case ts.MetricTypeGauge: + case ts.M3MetricTypeGauge: tags.append(metric.M3TypeTag, metric.M3GaugeValue) - case ts.MetricTypeTimer: + case ts.M3MetricTypeTimer: tags.append(metric.M3TypeTag, metric.M3TimerValue) } } diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index 9889c23d89..b5902b6dcd 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -462,7 +462,7 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } opts := downsample.SampleAppenderOptions{ - MetricType: value.Attributes.Type, + MetricType: value.Attributes.M3Type, } if downsampleMappingRuleOverrides, ok := d.downsampleOverrideRules(overrides); ok { opts = downsample.SampleAppenderOptions{ @@ -484,12 +484,12 @@ func (d *downsamplerAndWriter) writeAggregatedBatch( } for _, dp := range value.Datapoints { - switch value.Attributes.Type { - case ts.MetricTypeGauge: + switch value.Attributes.M3Type { + case ts.M3MetricTypeGauge: err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value) - case ts.MetricTypeCounter: + case ts.M3MetricTypeCounter: err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value)) - case ts.MetricTypeTimer: + case ts.M3MetricTypeTimer: err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value) } if err != nil { diff --git a/src/cmd/services/m3coordinator/ingest/write_test.go b/src/cmd/services/m3coordinator/ingest/write_test.go index 43281876c1..b55df9037e 100644 --- a/src/cmd/services/m3coordinator/ingest/write_test.go +++ b/src/cmd/services/m3coordinator/ingest/write_test.go @@ -129,13 +129,13 @@ var ( testAnnotation2 = []byte("second") testAttributesGauge = ts.SeriesAttributes{ - Type: ts.MetricTypeGauge, + M3Type: ts.M3MetricTypeGauge, } testAttributesCounter = ts.SeriesAttributes{ - Type: ts.MetricTypeCounter, + M3Type: ts.M3MetricTypeCounter, } testAttributesTimer = ts.SeriesAttributes{ - Type: ts.MetricTypeTimer, + M3Type: ts.M3MetricTypeTimer, } testEntries = []testIterEntry{ @@ -565,11 +565,11 @@ func TestDownsampleAndWriteBatchDifferentTypes(t *testing.T) { mockMetricsAppender. EXPECT(). - SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.MetricTypeCounter}). + SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeCounter}). Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1) mockMetricsAppender. EXPECT(). - SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.MetricTypeTimer}). + SamplesAppender(downsample.SampleAppenderOptions{MetricType: ts.M3MetricTypeTimer}). Return(downsample.SamplesAppenderResult{SamplesAppender: mockSamplesAppender}, nil).Times(1) for _, tag := range testTags1.Tags { mockMetricsAppender.EXPECT().AddTag(tag.Name, tag.Value) diff --git a/src/dbnode/generated/proto/annotation/annotation.pb.go b/src/dbnode/generated/proto/annotation/annotation.pb.go new file mode 100644 index 0000000000..4151c04dde --- /dev/null +++ b/src/dbnode/generated/proto/annotation/annotation.pb.go @@ -0,0 +1,405 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: github.com/m3db/m3/src/dbnode/generated/proto/annotation/annotation.proto + +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +/* + Package annotation is a generated protocol buffer package. + + It is generated from these files: + github.com/m3db/m3/src/dbnode/generated/proto/annotation/annotation.proto + + It has these top-level messages: + Payload +*/ +package annotation + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type MetricType int32 + +const ( + MetricType_UNKNOWN MetricType = 0 + MetricType_COUNTER MetricType = 1 + MetricType_GAUGE MetricType = 2 + MetricType_HISTOGRAM MetricType = 3 + MetricType_GAUGE_HISTOGRAM MetricType = 4 + MetricType_SUMMARY MetricType = 5 + MetricType_INFO MetricType = 6 + MetricType_STATESET MetricType = 7 +) + +var MetricType_name = map[int32]string{ + 0: "UNKNOWN", + 1: "COUNTER", + 2: "GAUGE", + 3: "HISTOGRAM", + 4: "GAUGE_HISTOGRAM", + 5: "SUMMARY", + 6: "INFO", + 7: "STATESET", +} +var MetricType_value = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGE_HISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, +} + +func (x MetricType) String() string { + return proto.EnumName(MetricType_name, int32(x)) +} +func (MetricType) EnumDescriptor() ([]byte, []int) { return fileDescriptorAnnotation, []int{0} } + +type Payload struct { + MetricType MetricType `protobuf:"varint,1,opt,name=metric_type,json=metricType,proto3,enum=annotation.MetricType" json:"metric_type,omitempty"` + HandleValueResets bool `protobuf:"varint,2,opt,name=handle_value_resets,json=handleValueResets,proto3" json:"handle_value_resets,omitempty"` +} + +func (m *Payload) Reset() { *m = Payload{} } +func (m *Payload) String() string { return proto.CompactTextString(m) } +func (*Payload) ProtoMessage() {} +func (*Payload) Descriptor() ([]byte, []int) { return fileDescriptorAnnotation, []int{0} } + +func (m *Payload) GetMetricType() MetricType { + if m != nil { + return m.MetricType + } + return MetricType_UNKNOWN +} + +func (m *Payload) GetHandleValueResets() bool { + if m != nil { + return m.HandleValueResets + } + return false +} + +func init() { + proto.RegisterType((*Payload)(nil), "annotation.Payload") + proto.RegisterEnum("annotation.MetricType", MetricType_name, MetricType_value) +} +func (m *Payload) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Payload) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.MetricType != 0 { + dAtA[i] = 0x8 + i++ + i = encodeVarintAnnotation(dAtA, i, uint64(m.MetricType)) + } + if m.HandleValueResets { + dAtA[i] = 0x10 + i++ + if m.HandleValueResets { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + +func encodeVarintAnnotation(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Payload) Size() (n int) { + var l int + _ = l + if m.MetricType != 0 { + n += 1 + sovAnnotation(uint64(m.MetricType)) + } + if m.HandleValueResets { + n += 2 + } + return n +} + +func sovAnnotation(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozAnnotation(x uint64) (n int) { + return sovAnnotation(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Payload) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAnnotation + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Payload: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Payload: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field MetricType", wireType) + } + m.MetricType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAnnotation + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.MetricType |= (MetricType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field HandleValueResets", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowAnnotation + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.HandleValueResets = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipAnnotation(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthAnnotation + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipAnnotation(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAnnotation + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAnnotation + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAnnotation + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthAnnotation + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowAnnotation + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipAnnotation(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthAnnotation = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowAnnotation = fmt.Errorf("proto: integer overflow") +) + +func init() { + proto.RegisterFile("github.com/m3db/m3/src/dbnode/generated/proto/annotation/annotation.proto", fileDescriptorAnnotation) +} + +var fileDescriptorAnnotation = []byte{ + // 297 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0xd0, 0x4d, 0x4b, 0xc3, 0x30, + 0x1c, 0x06, 0xf0, 0x65, 0xee, 0xf5, 0x3f, 0x5f, 0x62, 0x06, 0xb2, 0x53, 0x19, 0x9e, 0x86, 0x87, + 0x06, 0xdc, 0xc1, 0x73, 0x95, 0x3a, 0x87, 0xb4, 0x95, 0x36, 0x55, 0x3c, 0x95, 0x74, 0x09, 0xdb, + 0x60, 0x4d, 0x46, 0x96, 0x09, 0xf3, 0x53, 0xf8, 0xb1, 0x3c, 0xfa, 0x11, 0x64, 0x7e, 0x11, 0x59, + 0x05, 0xb7, 0xdb, 0x93, 0xe7, 0xc7, 0x93, 0xc3, 0x1f, 0xc6, 0xd3, 0xb9, 0x9d, 0xad, 0x73, 0x77, + 0xa2, 0x0b, 0x5a, 0x0c, 0x45, 0x4e, 0x8b, 0x21, 0x5d, 0x99, 0x09, 0x15, 0xb9, 0xd2, 0x42, 0xd2, + 0xa9, 0x54, 0xd2, 0x70, 0x2b, 0x05, 0x5d, 0x1a, 0x6d, 0x35, 0xe5, 0x4a, 0x69, 0xcb, 0xed, 0x5c, + 0xab, 0x83, 0xe8, 0x96, 0x46, 0x60, 0xdf, 0x5c, 0x1a, 0x68, 0x3e, 0xf1, 0xcd, 0x42, 0x73, 0x41, + 0x6e, 0xa0, 0x53, 0x48, 0x6b, 0xe6, 0x93, 0xcc, 0x6e, 0x96, 0xb2, 0x87, 0xfa, 0x68, 0x70, 0x7a, + 0x7d, 0xe1, 0x1e, 0xcc, 0x83, 0x92, 0xd9, 0x66, 0x29, 0x63, 0x28, 0xfe, 0x33, 0x71, 0xa1, 0x3b, + 0xe3, 0x4a, 0x2c, 0x64, 0xf6, 0xc6, 0x17, 0x6b, 0x99, 0x19, 0xb9, 0x92, 0x76, 0xd5, 0xab, 0xf6, + 0xd1, 0xa0, 0x15, 0x9f, 0xff, 0xd1, 0xf3, 0x4e, 0xe2, 0x12, 0xae, 0xde, 0x01, 0xf6, 0x3f, 0x91, + 0x0e, 0x34, 0xd3, 0xf0, 0x31, 0x8c, 0x5e, 0x42, 0x5c, 0xd9, 0x3d, 0xee, 0xa2, 0x34, 0x64, 0x7e, + 0x8c, 0x11, 0x69, 0x43, 0x7d, 0xe4, 0xa5, 0x23, 0x1f, 0x57, 0xc9, 0x09, 0xb4, 0x1f, 0xc6, 0x09, + 0x8b, 0x46, 0xb1, 0x17, 0xe0, 0x23, 0xd2, 0x85, 0xb3, 0x52, 0xb2, 0x7d, 0x59, 0xdb, 0x6d, 0x93, + 0x34, 0x08, 0xbc, 0xf8, 0x15, 0xd7, 0x49, 0x0b, 0x6a, 0xe3, 0xf0, 0x3e, 0xc2, 0x0d, 0x72, 0x0c, + 0xad, 0x84, 0x79, 0xcc, 0x4f, 0x7c, 0x86, 0x9b, 0xb7, 0xf8, 0x73, 0xeb, 0xa0, 0xaf, 0xad, 0x83, + 0xbe, 0xb7, 0x0e, 0xfa, 0xf8, 0x71, 0x2a, 0x79, 0xa3, 0x3c, 0xca, 0xf0, 0x37, 0x00, 0x00, 0xff, + 0xff, 0x25, 0xe6, 0x7a, 0x90, 0x61, 0x01, 0x00, 0x00, +} diff --git a/src/dbnode/generated/proto/annotation/annotation.proto b/src/dbnode/generated/proto/annotation/annotation.proto new file mode 100644 index 0000000000..762e7cc049 --- /dev/null +++ b/src/dbnode/generated/proto/annotation/annotation.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package annotation; + +message Payload { + MetricType metric_type = 1; + bool handle_value_resets = 2; +} + +enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGE_HISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; +} diff --git a/src/query/api/v1/handler/prometheus/remote/write.go b/src/query/api/v1/handler/prometheus/remote/write.go index d1ecebc7d4..930583ea9a 100644 --- a/src/query/api/v1/handler/prometheus/remote/write.go +++ b/src/query/api/v1/handler/prometheus/remote/write.go @@ -590,15 +590,41 @@ func newPromTSIter(timeseries []prompb.TimeSeries, tagOpts models.TagOptions) (* type promTSIter struct { idx int + err error attributes []ts.SeriesAttributes tags []models.Tags datapoints []ts.Datapoints metadatas []ts.Metadata + annotation []byte } func (i *promTSIter) Next() bool { + if i.err != nil { + return false + } + i.idx++ - return i.idx < len(i.tags) + if i.idx >= len(i.tags) { + return false + } + + annotationPayload, err := storage.SeriesAttributesToAnnotationPayload(i.attributes[i.idx]) + if err != nil { + i.err = err + return false + } + + i.annotation, err = annotationPayload.Marshal() + if err != nil { + i.err = err + return false + } + + if len(i.annotation) == 0 { + i.annotation = nil + } + + return true } func (i *promTSIter) Current() ingest.IterValue { @@ -611,6 +637,7 @@ func (i *promTSIter) Current() ingest.IterValue { Datapoints: i.datapoints[i.idx], Attributes: i.attributes[i.idx], Unit: xtime.Millisecond, + Annotation: i.annotation, } if i.idx < len(i.metadatas) { value.Metadata = i.metadatas[i.idx] @@ -620,11 +647,14 @@ func (i *promTSIter) Current() ingest.IterValue { func (i *promTSIter) Reset() error { i.idx = -1 + i.err = nil + i.annotation = nil + return nil } func (i *promTSIter) Error() error { - return nil + return i.err } func (i *promTSIter) SetCurrentMetadata(metadata ts.Metadata) { diff --git a/src/query/api/v1/handler/prometheus/remote/write_test.go b/src/query/api/v1/handler/prometheus/remote/write_test.go index b5c6218e5d..2936ba216b 100644 --- a/src/query/api/v1/handler/prometheus/remote/write_test.go +++ b/src/query/api/v1/handler/prometheus/remote/write_test.go @@ -22,6 +22,7 @@ package remote import ( "bytes" + "context" "errors" "fmt" "io/ioutil" @@ -33,18 +34,22 @@ import ( "github.com/m3db/m3/src/cmd/services/m3coordinator/ingest" "github.com/m3db/m3/src/cmd/services/m3query/config" + "github.com/m3db/m3/src/dbnode/generated/proto/annotation" "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/handleroptions" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote/test" "github.com/m3db/m3/src/query/api/v1/options" + "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage/m3/storagemetadata" xclock "github.com/m3db/m3/src/x/clock" xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/headers" "github.com/m3db/m3/src/x/instrument" + xtest "github.com/m3db/m3/src/x/test" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) @@ -62,7 +67,7 @@ func makeOptions(ds ingest.DownsamplerAndWriter) options.HandlerOptions { } func TestPromWriteParsing(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) @@ -81,7 +86,7 @@ func TestPromWriteParsing(t *testing.T) { } func TestPromWrite(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) @@ -104,7 +109,7 @@ func TestPromWrite(t *testing.T) { } func TestPromWriteError(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() multiErr := xerrors.NewMultiError().Add(errors.New("an error")) @@ -135,7 +140,7 @@ func TestPromWriteError(t *testing.T) { } func TestWriteErrorMetricCount(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) @@ -159,7 +164,7 @@ func TestWriteErrorMetricCount(t *testing.T) { } func TestWriteDatapointDelayMetric(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) @@ -217,7 +222,7 @@ func TestWriteDatapointDelayMetric(t *testing.T) { } func TestPromWriteUnaggregatedMetricsWithHeader(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() expectedIngestWriteOptions := ingest.WriteOptions{ @@ -249,7 +254,7 @@ func TestPromWriteUnaggregatedMetricsWithHeader(t *testing.T) { } func TestPromWriteAggregatedMetricsWithHeader(t *testing.T) { - ctrl := gomock.NewController(t) + ctrl := xtest.NewController(t) defer ctrl.Finish() expectedIngestWriteOptions := ingest.WriteOptions{ @@ -284,8 +289,116 @@ func TestPromWriteAggregatedMetricsWithHeader(t *testing.T) { require.Equal(t, http.StatusOK, resp.StatusCode) } +func TestPromWriteMetricsTypes(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + var capturedIter ingest.DownsampleAndWriteIter + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + mockDownsamplerAndWriter. + EXPECT(). + WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, iter ingest.DownsampleAndWriteIter, _ ingest.WriteOptions) ingest.BatchError { + capturedIter = iter + return nil + }) + + opts := makeOptions(mockDownsamplerAndWriter) + handler, err := NewPromWriteHandler(opts) + require.NoError(t, err) + + promReq := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + {Type: prompb.MetricType_UNKNOWN}, + {Type: prompb.MetricType_COUNTER}, + {Type: prompb.MetricType_GAUGE}, + {Type: prompb.MetricType_GAUGE}, + {Type: prompb.MetricType_SUMMARY}, + {Type: prompb.MetricType_HISTOGRAM}, + {Type: prompb.MetricType_GAUGE_HISTOGRAM}, + {Type: prompb.MetricType_INFO}, + {Type: prompb.MetricType_STATESET}, + }, + } + + promReqBody := test.GeneratePromWriteRequestBody(t, promReq) + req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) + + writer := httptest.NewRecorder() + handler.ServeHTTP(writer, req) + resp := writer.Result() + require.Equal(t, http.StatusOK, resp.StatusCode) + + firstValue := verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_UNKNOWN, false) + secondValue := verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_COUNTER, true) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_GAUGE, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_GAUGE, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_SUMMARY, true) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_HISTOGRAM, true) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_GAUGE_HISTOGRAM, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_INFO, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_STATESET, false) + + require.False(t, capturedIter.Next()) + require.NoError(t, capturedIter.Error()) + + assert.Nil(t, firstValue.Annotation, "first annotation invalidation") + + secondAnnotationPayload := unmarshalAnnotation(t, secondValue.Annotation) + assert.Equal(t, annotation.Payload{ + MetricType: annotation.MetricType_COUNTER, + HandleValueResets: true, + }, secondAnnotationPayload, "second annotation invalidated") +} + +func TestPromWriteGraphiteMetricsTypes(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + var capturedIter ingest.DownsampleAndWriteIter + mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) + mockDownsamplerAndWriter. + EXPECT(). + WriteBatch(gomock.Any(), gomock.Any(), gomock.Any()). + Do(func(_ context.Context, iter ingest.DownsampleAndWriteIter, _ ingest.WriteOptions) ingest.BatchError { + capturedIter = iter + return nil + }) + + opts := makeOptions(mockDownsamplerAndWriter) + handler, err := NewPromWriteHandler(opts) + require.NoError(t, err) + + promReq := &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + {Source: prompb.Source_GRAPHITE, M3Type: prompb.M3Type_M3_TIMER}, + {Source: prompb.Source_GRAPHITE, M3Type: prompb.M3Type_M3_COUNTER}, + {Source: prompb.Source_GRAPHITE, M3Type: prompb.M3Type_M3_GAUGE}, + {Source: prompb.Source_GRAPHITE, M3Type: prompb.M3Type_M3_GAUGE}, + {Source: prompb.Source_GRAPHITE, M3Type: prompb.M3Type_M3_TIMER}, + }, + } + + promReqBody := test.GeneratePromWriteRequestBody(t, promReq) + req := httptest.NewRequest(PromWriteHTTPMethod, PromWriteURL, promReqBody) + + writer := httptest.NewRecorder() + handler.ServeHTTP(writer, req) + resp := writer.Result() + require.Equal(t, http.StatusOK, resp.StatusCode) + + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_UNKNOWN, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_COUNTER, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_GAUGE, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_GAUGE, false) + verifyIterValueAnnotation(t, capturedIter, annotation.MetricType_UNKNOWN, false) + + require.False(t, capturedIter.Next()) + require.NoError(t, capturedIter.Error()) +} + func BenchmarkWriteDatapoints(b *testing.B) { - ctrl := gomock.NewController(b) + ctrl := xtest.NewController(b) defer ctrl.Finish() mockDownsamplerAndWriter := ingest.NewMockDownsamplerAndWriter(ctrl) @@ -308,3 +421,24 @@ func BenchmarkWriteDatapoints(b *testing.B) { handler.ServeHTTP(httptest.NewRecorder(), req) } } + +func verifyIterValueAnnotation( + t *testing.T, + iter ingest.DownsampleAndWriteIter, + expectedMetricType annotation.MetricType, + expectedHandleValueResets bool, +) ingest.IterValue { + require.True(t, iter.Next()) + value := iter.Current() + + expectedPayload := annotation.Payload{MetricType: expectedMetricType, HandleValueResets: expectedHandleValueResets} + assert.Equal(t, expectedPayload, unmarshalAnnotation(t, value.Annotation)) + + return value +} + +func unmarshalAnnotation(t *testing.T, annot []byte) annotation.Payload { + payload := annotation.Payload{} + require.NoError(t, payload.Unmarshal(annot)) + return payload +} diff --git a/src/query/generated/proto/prompb/types.pb.go b/src/query/generated/proto/prompb/types.pb.go index 542008f7dc..afb3144d71 100644 --- a/src/query/generated/proto/prompb/types.pb.go +++ b/src/query/generated/proto/prompb/types.pb.go @@ -37,29 +37,68 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf -type Type int32 +type MetricType int32 const ( - Type_GAUGE Type = 0 - Type_COUNTER Type = 1 - Type_TIMER Type = 2 + MetricType_UNKNOWN MetricType = 0 + MetricType_COUNTER MetricType = 1 + MetricType_GAUGE MetricType = 2 + MetricType_HISTOGRAM MetricType = 3 + MetricType_GAUGE_HISTOGRAM MetricType = 4 + MetricType_SUMMARY MetricType = 5 + MetricType_INFO MetricType = 6 + MetricType_STATESET MetricType = 7 ) -var Type_name = map[int32]string{ - 0: "GAUGE", +var MetricType_name = map[int32]string{ + 0: "UNKNOWN", 1: "COUNTER", - 2: "TIMER", + 2: "GAUGE", + 3: "HISTOGRAM", + 4: "GAUGE_HISTOGRAM", + 5: "SUMMARY", + 6: "INFO", + 7: "STATESET", } -var Type_value = map[string]int32{ - "GAUGE": 0, - "COUNTER": 1, - "TIMER": 2, +var MetricType_value = map[string]int32{ + "UNKNOWN": 0, + "COUNTER": 1, + "GAUGE": 2, + "HISTOGRAM": 3, + "GAUGE_HISTOGRAM": 4, + "SUMMARY": 5, + "INFO": 6, + "STATESET": 7, } -func (x Type) String() string { - return proto.EnumName(Type_name, int32(x)) +func (x MetricType) String() string { + return proto.EnumName(MetricType_name, int32(x)) } -func (Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorTypes, []int{0} } +func (MetricType) EnumDescriptor() ([]byte, []int) { return fileDescriptorTypes, []int{0} } + +type M3Type int32 + +const ( + M3Type_M3_GAUGE M3Type = 0 + M3Type_M3_COUNTER M3Type = 1 + M3Type_M3_TIMER M3Type = 2 +) + +var M3Type_name = map[int32]string{ + 0: "M3_GAUGE", + 1: "M3_COUNTER", + 2: "M3_TIMER", +} +var M3Type_value = map[string]int32{ + "M3_GAUGE": 0, + "M3_COUNTER": 1, + "M3_TIMER": 2, +} + +func (x M3Type) String() string { + return proto.EnumName(M3Type_name, int32(x)) +} +func (M3Type) EnumDescriptor() ([]byte, []int) { return fileDescriptorTypes, []int{1} } type Source int32 @@ -80,7 +119,7 @@ var Source_value = map[string]int32{ func (x Source) String() string { return proto.EnumName(Source_name, int32(x)) } -func (Source) EnumDescriptor() ([]byte, []int) { return fileDescriptorTypes, []int{1} } +func (Source) EnumDescriptor() ([]byte, []int) { return fileDescriptorTypes, []int{2} } type LabelMatcher_Type int32 @@ -134,11 +173,14 @@ func (m *Sample) GetTimestamp() int64 { } type TimeSeries struct { - Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` - Samples []Sample `protobuf:"bytes,2,rep,name=samples" json:"samples"` + Labels []Label `protobuf:"bytes,1,rep,name=labels" json:"labels"` + Samples []Sample `protobuf:"bytes,2,rep,name=samples" json:"samples"` + Type MetricType `protobuf:"varint,3,opt,name=type,proto3,enum=m3prometheus.MetricType" json:"type,omitempty"` + Unit string `protobuf:"bytes,4,opt,name=unit,proto3" json:"unit,omitempty"` + Help string `protobuf:"bytes,5,opt,name=help,proto3" json:"help,omitempty"` // NB: These are custom fields that M3 uses. They start at 101 so that they // should never clash with prometheus fields. - Type Type `protobuf:"varint,101,opt,name=type,proto3,enum=m3prometheus.Type" json:"type,omitempty"` + M3Type M3Type `protobuf:"varint,101,opt,name=m3_type,json=m3Type,proto3,enum=m3prometheus.M3Type" json:"m3_type,omitempty"` Source Source `protobuf:"varint,102,opt,name=source,proto3,enum=m3prometheus.Source" json:"source,omitempty"` } @@ -161,11 +203,32 @@ func (m *TimeSeries) GetSamples() []Sample { return nil } -func (m *TimeSeries) GetType() Type { +func (m *TimeSeries) GetType() MetricType { if m != nil { return m.Type } - return Type_GAUGE + return MetricType_UNKNOWN +} + +func (m *TimeSeries) GetUnit() string { + if m != nil { + return m.Unit + } + return "" +} + +func (m *TimeSeries) GetHelp() string { + if m != nil { + return m.Help + } + return "" +} + +func (m *TimeSeries) GetM3Type() M3Type { + if m != nil { + return m.M3Type + } + return M3Type_M3_GAUGE } func (m *TimeSeries) GetSource() Source { @@ -254,7 +317,8 @@ func init() { proto.RegisterType((*Label)(nil), "m3prometheus.Label") proto.RegisterType((*Labels)(nil), "m3prometheus.Labels") proto.RegisterType((*LabelMatcher)(nil), "m3prometheus.LabelMatcher") - proto.RegisterEnum("m3prometheus.Type", Type_name, Type_value) + proto.RegisterEnum("m3prometheus.MetricType", MetricType_name, MetricType_value) + proto.RegisterEnum("m3prometheus.M3Type", M3Type_name, M3Type_value) proto.RegisterEnum("m3prometheus.Source", Source_name, Source_value) proto.RegisterEnum("m3prometheus.LabelMatcher_Type", LabelMatcher_Type_name, LabelMatcher_Type_value) } @@ -327,11 +391,28 @@ func (m *TimeSeries) MarshalTo(dAtA []byte) (int, error) { } } if m.Type != 0 { + dAtA[i] = 0x18 + i++ + i = encodeVarintTypes(dAtA, i, uint64(m.Type)) + } + if len(m.Unit) > 0 { + dAtA[i] = 0x22 + i++ + i = encodeVarintTypes(dAtA, i, uint64(len(m.Unit))) + i += copy(dAtA[i:], m.Unit) + } + if len(m.Help) > 0 { + dAtA[i] = 0x2a + i++ + i = encodeVarintTypes(dAtA, i, uint64(len(m.Help))) + i += copy(dAtA[i:], m.Help) + } + if m.M3Type != 0 { dAtA[i] = 0xa8 i++ dAtA[i] = 0x6 i++ - i = encodeVarintTypes(dAtA, i, uint64(m.Type)) + i = encodeVarintTypes(dAtA, i, uint64(m.M3Type)) } if m.Source != 0 { dAtA[i] = 0xb0 @@ -475,7 +556,18 @@ func (m *TimeSeries) Size() (n int) { } } if m.Type != 0 { - n += 2 + sovTypes(uint64(m.Type)) + n += 1 + sovTypes(uint64(m.Type)) + } + l = len(m.Unit) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + l = len(m.Help) + if l > 0 { + n += 1 + l + sovTypes(uint64(l)) + } + if m.M3Type != 0 { + n += 2 + sovTypes(uint64(m.M3Type)) } if m.Source != 0 { n += 2 + sovTypes(uint64(m.Source)) @@ -710,7 +802,7 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex - case 101: + case 3: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) } @@ -724,7 +816,84 @@ func (m *TimeSeries) Unmarshal(dAtA []byte) error { } b := dAtA[iNdEx] iNdEx++ - m.Type |= (Type(b) & 0x7F) << shift + m.Type |= (MetricType(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Unit", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Unit = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Help", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthTypes + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Help = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 101: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field M3Type", wireType) + } + m.M3Type = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTypes + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.M3Type |= (M3Type(b) & 0x7F) << shift if b < 0x80 { break } @@ -1203,35 +1372,42 @@ func init() { } var fileDescriptorTypes = []byte{ - // 467 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x4f, 0x6b, 0xdb, 0x4e, - 0x10, 0xd5, 0x4a, 0xb6, 0xfc, 0xcb, 0xc4, 0x84, 0x65, 0x7f, 0x39, 0x88, 0x52, 0x1c, 0xa3, 0x43, - 0x50, 0x43, 0x6b, 0x91, 0xa8, 0xb7, 0x16, 0x4a, 0x52, 0x16, 0x27, 0x10, 0xe7, 0xcf, 0x5a, 0xbe, - 0xf4, 0x26, 0x39, 0x13, 0xdb, 0xe0, 0x8d, 0x54, 0xad, 0x54, 0xf0, 0xb7, 0xe8, 0xad, 0x5f, 0x29, - 0xd0, 0x4b, 0x3f, 0x41, 0x29, 0xee, 0x17, 0x29, 0xda, 0x55, 0x88, 0x53, 0x72, 0xe9, 0x45, 0xec, - 0xbc, 0x79, 0x6f, 0xf4, 0xe6, 0x31, 0xf0, 0x61, 0xb6, 0x28, 0xe7, 0x55, 0x3a, 0x98, 0x66, 0x32, - 0x94, 0xd1, 0x4d, 0x1a, 0xca, 0x28, 0x54, 0xc5, 0x34, 0xfc, 0x5c, 0x61, 0xb1, 0x0a, 0x67, 0x78, - 0x87, 0x45, 0x52, 0xe2, 0x4d, 0x98, 0x17, 0x59, 0x99, 0xd5, 0x5f, 0x99, 0xa7, 0x61, 0xb9, 0xca, - 0x51, 0x0d, 0x34, 0xc4, 0xba, 0x32, 0xaa, 0x51, 0x2c, 0xe7, 0x58, 0xa9, 0x17, 0x6f, 0x36, 0xc6, - 0xcd, 0xb2, 0x59, 0x66, 0x74, 0x69, 0x75, 0xab, 0x2b, 0x33, 0xa4, 0x7e, 0x19, 0xb1, 0xff, 0x1e, - 0xdc, 0x71, 0x22, 0xf3, 0x25, 0xb2, 0x5d, 0x68, 0x7f, 0x49, 0x96, 0x15, 0x7a, 0xa4, 0x4f, 0x02, - 0x22, 0x4c, 0xc1, 0x5e, 0xc2, 0x56, 0xb9, 0x90, 0xa8, 0xca, 0x44, 0xe6, 0x9e, 0xdd, 0x27, 0x81, - 0x23, 0x1e, 0x01, 0xff, 0x3b, 0x01, 0x88, 0x17, 0x12, 0xc7, 0x58, 0x2c, 0x50, 0xb1, 0x43, 0x70, - 0x97, 0x49, 0x8a, 0x4b, 0xe5, 0x91, 0xbe, 0x13, 0x6c, 0x1f, 0xfd, 0x3f, 0xd8, 0xb4, 0x36, 0x38, - 0xaf, 0x7b, 0x27, 0xad, 0xfb, 0x9f, 0x7b, 0x96, 0x68, 0x88, 0xec, 0x2d, 0x74, 0x94, 0xfe, 0xbf, - 0xf2, 0x6c, 0xad, 0xd9, 0x7d, 0xaa, 0x31, 0xe6, 0x1a, 0xd1, 0x03, 0x95, 0xed, 0x43, 0xab, 0x4e, - 0xc0, 0xc3, 0x3e, 0x09, 0x76, 0x8e, 0xd8, 0x53, 0x49, 0xbc, 0xca, 0x51, 0xe8, 0x3e, 0x7b, 0x0d, - 0xae, 0xca, 0xaa, 0x62, 0x8a, 0xde, 0xad, 0x66, 0xfe, 0x3d, 0x5c, 0xf7, 0x44, 0xc3, 0xf1, 0x0f, - 0xa1, 0xad, 0x2d, 0x32, 0x06, 0xad, 0xbb, 0x44, 0x9a, 0x24, 0xba, 0x42, 0xbf, 0x1f, 0xe3, 0xb1, - 0x35, 0x68, 0x0a, 0xff, 0x1d, 0xb8, 0xe7, 0x66, 0x91, 0x7f, 0xdf, 0xdd, 0xff, 0x46, 0xa0, 0xab, - 0xf1, 0x51, 0x52, 0x4e, 0xe7, 0x58, 0xb0, 0xa8, 0x59, 0x8b, 0x68, 0xb3, 0x7b, 0xcf, 0x4c, 0x68, - 0x98, 0x9b, 0x3b, 0x3e, 0x98, 0xb5, 0x9f, 0x33, 0xeb, 0x6c, 0x9a, 0x0d, 0xa0, 0x55, 0xeb, 0x98, - 0x0b, 0x36, 0xbf, 0xa6, 0x16, 0xeb, 0x80, 0x73, 0xc1, 0xaf, 0x29, 0xa9, 0x01, 0xc1, 0xa9, 0xad, - 0x01, 0xc1, 0xa9, 0x73, 0xf0, 0xaa, 0x61, 0x6e, 0x41, 0x7b, 0x78, 0x3c, 0x19, 0x72, 0x6a, 0xb1, - 0x6d, 0xe8, 0x7c, 0xbc, 0x9c, 0x5c, 0xc4, 0x5c, 0x50, 0x52, 0xe3, 0xf1, 0xd9, 0x88, 0x0b, 0x6a, - 0x1f, 0xec, 0x83, 0x6b, 0x62, 0x64, 0x3b, 0x00, 0x57, 0xe2, 0x72, 0xc4, 0xe3, 0x53, 0x3e, 0x19, - 0x53, 0x8b, 0x75, 0xe1, 0xbf, 0xa1, 0x38, 0xbe, 0x3a, 0x3d, 0x8b, 0x39, 0x25, 0x27, 0xde, 0xfd, - 0xba, 0x47, 0x7e, 0xac, 0x7b, 0xe4, 0xd7, 0xba, 0x47, 0xbe, 0xfe, 0xee, 0x59, 0x9f, 0x5c, 0x73, - 0xcb, 0xa9, 0xab, 0x2f, 0x31, 0xfa, 0x13, 0x00, 0x00, 0xff, 0xff, 0xe2, 0xad, 0x94, 0xb6, 0x09, - 0x03, 0x00, 0x00, + // 590 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcb, 0x6e, 0xd3, 0x40, + 0x14, 0xcd, 0xd8, 0x8e, 0xd3, 0xdc, 0x86, 0x32, 0x9a, 0x76, 0x61, 0x21, 0x94, 0x46, 0x59, 0xa0, + 0xa8, 0x6a, 0x63, 0xb5, 0xee, 0x0e, 0x24, 0x94, 0x22, 0x93, 0x46, 0xd4, 0x4e, 0x3b, 0x76, 0x84, + 0x60, 0x13, 0xd9, 0xe9, 0x34, 0xb1, 0x94, 0xa9, 0x8d, 0x1f, 0x48, 0xe5, 0x2b, 0xd8, 0xf1, 0x0b, + 0x7c, 0x4a, 0x97, 0x7c, 0x01, 0x42, 0xe5, 0x47, 0xd0, 0xcc, 0x04, 0xa5, 0x45, 0xdd, 0xb0, 0x49, + 0xe6, 0x9e, 0x7b, 0xce, 0xbd, 0xc7, 0x67, 0x34, 0xf0, 0x7a, 0x9e, 0x94, 0x8b, 0x2a, 0xee, 0xcf, + 0x52, 0x6e, 0x73, 0xe7, 0x32, 0xb6, 0xb9, 0x63, 0x17, 0xf9, 0xcc, 0xfe, 0x54, 0xb1, 0xfc, 0xc6, + 0x9e, 0xb3, 0x6b, 0x96, 0x47, 0x25, 0xbb, 0xb4, 0xb3, 0x3c, 0x2d, 0x53, 0xf1, 0xcb, 0xb3, 0xd8, + 0x2e, 0x6f, 0x32, 0x56, 0xf4, 0x25, 0x44, 0x5a, 0xdc, 0x11, 0x28, 0x2b, 0x17, 0xac, 0x2a, 0x9e, + 0x1d, 0xdc, 0x1b, 0x37, 0x4f, 0xe7, 0xa9, 0xd2, 0xc5, 0xd5, 0x95, 0xac, 0xd4, 0x10, 0x71, 0x52, + 0xe2, 0xee, 0x2b, 0x30, 0x83, 0x88, 0x67, 0x4b, 0x46, 0x76, 0xa0, 0xfe, 0x39, 0x5a, 0x56, 0xcc, + 0x42, 0x1d, 0xd4, 0x43, 0x54, 0x15, 0xe4, 0x39, 0x34, 0xcb, 0x84, 0xb3, 0xa2, 0x8c, 0x78, 0x66, + 0x69, 0x1d, 0xd4, 0xd3, 0xe9, 0x1a, 0xe8, 0x7e, 0xd7, 0x00, 0xc2, 0x84, 0xb3, 0x80, 0xe5, 0x09, + 0x2b, 0xc8, 0x21, 0x98, 0xcb, 0x28, 0x66, 0xcb, 0xc2, 0x42, 0x1d, 0xbd, 0xb7, 0x79, 0xb4, 0xdd, + 0xbf, 0x6f, 0xad, 0x7f, 0x26, 0x7a, 0x27, 0xc6, 0xed, 0xcf, 0xdd, 0x1a, 0x5d, 0x11, 0xc9, 0x31, + 0x34, 0x0a, 0xb9, 0xbf, 0xb0, 0x34, 0xa9, 0xd9, 0x79, 0xa8, 0x51, 0xe6, 0x56, 0xa2, 0xbf, 0x54, + 0xb2, 0x0f, 0x86, 0x48, 0xc0, 0xd2, 0x3b, 0xa8, 0xb7, 0x75, 0x64, 0x3d, 0x94, 0x78, 0xac, 0xcc, + 0x93, 0x59, 0x78, 0x93, 0x31, 0x2a, 0x59, 0x84, 0x80, 0x51, 0x5d, 0x27, 0xa5, 0x65, 0x74, 0x50, + 0xaf, 0x49, 0xe5, 0x59, 0x60, 0x0b, 0xb6, 0xcc, 0xac, 0xba, 0xc2, 0xc4, 0x99, 0x1c, 0x40, 0x83, + 0x3b, 0x53, 0x39, 0x98, 0xc9, 0xc1, 0xff, 0x78, 0xf1, 0x1c, 0x39, 0xd4, 0xe4, 0xf2, 0x9f, 0xec, + 0x83, 0x59, 0xa4, 0x55, 0x3e, 0x63, 0xd6, 0xd5, 0x63, 0xec, 0x40, 0xf6, 0xe8, 0x8a, 0xd3, 0x3d, + 0x84, 0xba, 0xfc, 0x7e, 0xb1, 0xf9, 0x3a, 0xe2, 0x2a, 0xe6, 0x16, 0x95, 0xe7, 0x75, 0xf6, 0x9a, + 0x04, 0x55, 0xd1, 0x7d, 0x09, 0xe6, 0x99, 0x4a, 0xe9, 0xff, 0x83, 0xed, 0x7e, 0x43, 0xd0, 0x92, + 0xb8, 0x17, 0x95, 0xb3, 0x05, 0xcb, 0x89, 0xb3, 0xca, 0x0c, 0x49, 0xb3, 0xbb, 0x8f, 0x4c, 0x58, + 0x31, 0xfb, 0x0f, 0xa3, 0x93, 0x66, 0xb5, 0xc7, 0xcc, 0xea, 0xf7, 0xcd, 0xf6, 0xc0, 0x90, 0xa9, + 0x98, 0xa0, 0xb9, 0x17, 0xb8, 0x46, 0x1a, 0xa0, 0xfb, 0xee, 0x05, 0x46, 0x02, 0xa0, 0x2e, 0xd6, + 0x24, 0x40, 0x5d, 0xac, 0xef, 0x7d, 0x01, 0x58, 0x5f, 0x11, 0xd9, 0x84, 0xc6, 0xc4, 0x7f, 0xe7, + 0x8f, 0xdf, 0xfb, 0xb8, 0x26, 0x8a, 0x37, 0xe3, 0x89, 0x1f, 0xba, 0x14, 0x23, 0xd2, 0x84, 0xfa, + 0x70, 0x30, 0x19, 0x0a, 0xed, 0x13, 0x68, 0x9e, 0x8e, 0x82, 0x70, 0x3c, 0xa4, 0x03, 0x0f, 0xeb, + 0x64, 0x1b, 0x9e, 0xca, 0xce, 0x74, 0x0d, 0x1a, 0x42, 0x1b, 0x4c, 0x3c, 0x6f, 0x40, 0x3f, 0xe0, + 0x3a, 0xd9, 0x00, 0x63, 0xe4, 0xbf, 0x1d, 0x63, 0x93, 0xb4, 0x60, 0x23, 0x08, 0x07, 0xa1, 0x1b, + 0xb8, 0x21, 0x6e, 0xec, 0x1d, 0x83, 0xa9, 0x6e, 0x51, 0xe0, 0x9e, 0x33, 0x55, 0x0b, 0x6a, 0x64, + 0x0b, 0xc0, 0x73, 0xa6, 0xeb, 0xdd, 0xaa, 0x1b, 0x8e, 0x3c, 0x97, 0x62, 0x6d, 0xef, 0x05, 0x98, + 0xea, 0x36, 0x05, 0xef, 0x9c, 0x8e, 0x3d, 0x37, 0x3c, 0x75, 0x27, 0x01, 0xae, 0x09, 0xde, 0x90, + 0x0e, 0xce, 0x4f, 0x47, 0xa1, 0x8b, 0xd1, 0x89, 0x75, 0x7b, 0xd7, 0x46, 0x3f, 0xee, 0xda, 0xe8, + 0xd7, 0x5d, 0x1b, 0x7d, 0xfd, 0xdd, 0xae, 0x7d, 0x34, 0xd5, 0x7b, 0x8d, 0x4d, 0xf9, 0xda, 0x9c, + 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0xcf, 0x9f, 0x2c, 0x6d, 0xed, 0x03, 0x00, 0x00, } diff --git a/src/query/generated/proto/prompb/types.proto b/src/query/generated/proto/prompb/types.proto index 68cdf32d7e..20dfaa572e 100644 --- a/src/query/generated/proto/prompb/types.proto +++ b/src/query/generated/proto/prompb/types.proto @@ -14,10 +14,14 @@ message Sample { message TimeSeries { repeated Label labels = 1 [(gogoproto.nullable) = false]; repeated Sample samples = 2 [(gogoproto.nullable) = false]; + MetricType type = 3; + string unit = 4; + string help = 5; + // NB: These are custom fields that M3 uses. They start at 101 so that they // should never clash with prometheus fields. - Type type = 101; - Source source = 102; + M3Type m3_type = 101; + Source source = 102; } message Label { @@ -37,18 +41,29 @@ message LabelMatcher { RE = 2; NRE = 3; } - Type type = 1; + Type type = 1; bytes name = 2; bytes value = 3; } -enum Type { - GAUGE = 0; - COUNTER = 1; - TIMER = 2; +enum MetricType { + UNKNOWN = 0; + COUNTER = 1; + GAUGE = 2; + HISTOGRAM = 3; + GAUGE_HISTOGRAM = 4; + SUMMARY = 5; + INFO = 6; + STATESET = 7; +} + +enum M3Type { + M3_GAUGE = 0; + M3_COUNTER = 1; + M3_TIMER = 2; } enum Source { PROMETHEUS = 0; GRAPHITE = 1; -} \ No newline at end of file +} diff --git a/src/query/storage/converter.go b/src/query/storage/converter.go index 5c20f89521..9cd4ecbfcd 100644 --- a/src/query/storage/converter.go +++ b/src/query/storage/converter.go @@ -26,6 +26,7 @@ import ( "sort" "time" + "github.com/m3db/m3/src/dbnode/generated/proto/annotation" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" @@ -33,11 +34,14 @@ import ( "github.com/prometheus/common/model" ) -// The default name for the name and bucket tags in Prometheus metrics. -// This can be overwritten by setting tagOptions in the config. var ( + // The default name for the name and bucket tags in Prometheus metrics. + // This can be overwritten by setting tagOptions in the config. promDefaultName = []byte(model.MetricNameLabel) // __name__ promDefaultBucketName = []byte(model.BucketLabel) // le + + // The suffix of count metric name in Prometheus histogram/summary metric families. + promDefaultCountSuffix = []byte("_count") ) // PromLabelsToM3Tags converts Prometheus labels to M3 tags @@ -70,9 +74,12 @@ func PromLabelsToM3Tags( // timeseries. func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttributes, error) { var ( - sourceType ts.SourceType - metricType ts.MetricType + sourceType ts.SourceType + m3MetricType ts.M3MetricType + promMetricType ts.PromMetricType + handleValueResets bool ) + switch series.Source { case prompb.Source_PROMETHEUS: sourceType = ts.SourceTypePrometheus @@ -81,19 +88,104 @@ func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttrib default: return ts.SeriesAttributes{}, fmt.Errorf("invalid source type %v", series.Source) } + switch series.Type { - case prompb.Type_COUNTER: - metricType = ts.MetricTypeCounter - case prompb.Type_GAUGE: - metricType = ts.MetricTypeGauge - case prompb.Type_TIMER: - metricType = ts.MetricTypeTimer + + case prompb.MetricType_UNKNOWN: + promMetricType = ts.PromMetricTypeUnknown + + case prompb.MetricType_COUNTER: + promMetricType = ts.PromMetricTypeCounter + handleValueResets = true + + case prompb.MetricType_GAUGE: + promMetricType = ts.PromMetricTypeGauge + + case prompb.MetricType_HISTOGRAM: + promMetricType = ts.PromMetricTypeHistogram + handleValueResets = true + + case prompb.MetricType_GAUGE_HISTOGRAM: + promMetricType = ts.PromMetricTypeGaugeHistogram + name := metricNameFromLabels(series.Labels) + handleValueResets = bytes.HasSuffix(name, promDefaultCountSuffix) + + case prompb.MetricType_SUMMARY: + promMetricType = ts.PromMetricTypeSummary + handleValueResets = true + + case prompb.MetricType_INFO: + promMetricType = ts.PromMetricTypeInfo + + case prompb.MetricType_STATESET: + promMetricType = ts.PromMetricTypeStateSet + + default: + return ts.SeriesAttributes{}, fmt.Errorf("invalid Prometheus metric type %v", series.Type) + } + + switch series.M3Type { + case prompb.M3Type_M3_COUNTER: + m3MetricType = ts.M3MetricTypeCounter + if promMetricType == ts.PromMetricTypeUnknown && series.Source == prompb.Source_GRAPHITE { + promMetricType = ts.PromMetricTypeCounter + } + case prompb.M3Type_M3_GAUGE: + m3MetricType = ts.M3MetricTypeGauge + if promMetricType == ts.PromMetricTypeUnknown && series.Source == prompb.Source_GRAPHITE { + promMetricType = ts.PromMetricTypeGauge + } + case prompb.M3Type_M3_TIMER: + m3MetricType = ts.M3MetricTypeTimer default: - return ts.SeriesAttributes{}, fmt.Errorf("invalid metric type %v", series.Type) + return ts.SeriesAttributes{}, fmt.Errorf("invalid M3 metric type %v", series.M3Type) } + return ts.SeriesAttributes{ - Type: metricType, - Source: sourceType, + M3Type: m3MetricType, + PromType: promMetricType, + Source: sourceType, + HandleValueResets: handleValueResets, + }, nil +} + +// SeriesAttributesToAnnotationPayload converts ts.SeriesAttributes into an annotation.Payload. +func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) (annotation.Payload, error) { + var metricType annotation.MetricType + + switch seriesAttributes.PromType { + + case ts.PromMetricTypeUnknown: + metricType = annotation.MetricType_UNKNOWN + + case ts.PromMetricTypeCounter: + metricType = annotation.MetricType_COUNTER + + case ts.PromMetricTypeGauge: + metricType = annotation.MetricType_GAUGE + + case ts.PromMetricTypeHistogram: + metricType = annotation.MetricType_HISTOGRAM + + case ts.PromMetricTypeGaugeHistogram: + metricType = annotation.MetricType_GAUGE_HISTOGRAM + + case ts.PromMetricTypeSummary: + metricType = annotation.MetricType_SUMMARY + + case ts.PromMetricTypeInfo: + metricType = annotation.MetricType_INFO + + case ts.PromMetricTypeStateSet: + metricType = annotation.MetricType_STATESET + + default: + return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", seriesAttributes.PromType) + } + + return annotation.Payload{ + MetricType: metricType, + HandleValueResets: seriesAttributes.HandleValueResets, }, nil } @@ -267,3 +359,12 @@ func SeriesToPromSamples(series *ts.Series) []prompb.Sample { return samples } + +func metricNameFromLabels(labels []prompb.Label) []byte { + for _, label := range labels { + if bytes.Equal(promDefaultName, label.GetName()) { + return label.GetValue() + } + } + return nil +} diff --git a/src/query/storage/converter_test.go b/src/query/storage/converter_test.go index b684dd23ef..ca808ab4f5 100644 --- a/src/query/storage/converter_test.go +++ b/src/query/storage/converter_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/m3db/m3/src/dbnode/generated/proto/annotation" "github.com/m3db/m3/src/query/generated/proto/prompb" "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/ts" @@ -275,3 +276,128 @@ func BenchmarkFetchResultToPromResult(b *testing.B) { benchResult = FetchResultToPromResult(fr, false) } } + +func TestPromTimeSeriesToSeriesAttributesSource(t *testing.T) { + attrs, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{}) + require.NoError(t, err) + assert.Equal(t, ts.SourceTypePrometheus, attrs.Source) + + attrs, err = PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{Source: prompb.Source_PROMETHEUS}) + require.NoError(t, err) + assert.Equal(t, ts.SourceTypePrometheus, attrs.Source) + + attrs, err = PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{Source: prompb.Source_GRAPHITE}) + require.NoError(t, err) + assert.Equal(t, ts.SourceTypeGraphite, attrs.Source) + + attrs, err = PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{Source: -1}) + require.Error(t, err) +} + +func TestPromTimeSeriesToSeriesAttributesPromMetricsType(t *testing.T) { + type prompbMetricTypeWithNameSuffix struct { + metricType prompb.MetricType + nameSuffix string + } + + type promMetricTypeWithBool struct { + metricType ts.PromMetricType + handleValueResets bool + } + + mapping := map[prompbMetricTypeWithNameSuffix]promMetricTypeWithBool{ + {metricType: prompb.MetricType_UNKNOWN}: {metricType: ts.PromMetricTypeUnknown}, + {metricType: prompb.MetricType_COUNTER}: {metricType: ts.PromMetricTypeCounter, handleValueResets: true}, + {metricType: prompb.MetricType_GAUGE}: {metricType: ts.PromMetricTypeGauge}, + {metricType: prompb.MetricType_INFO}: {metricType: ts.PromMetricTypeInfo}, + {metricType: prompb.MetricType_STATESET}: {metricType: ts.PromMetricTypeStateSet}, + + {prompb.MetricType_HISTOGRAM, "bucket"}: {metricType: ts.PromMetricTypeHistogram, handleValueResets: true}, + {prompb.MetricType_HISTOGRAM, "count"}: {metricType: ts.PromMetricTypeHistogram, handleValueResets: true}, + {prompb.MetricType_HISTOGRAM, "sum"}: {metricType: ts.PromMetricTypeHistogram, handleValueResets: true}, + + {prompb.MetricType_GAUGE_HISTOGRAM, "bucket"}: {metricType: ts.PromMetricTypeGaugeHistogram}, + {prompb.MetricType_GAUGE_HISTOGRAM, "count"}: {metricType: ts.PromMetricTypeGaugeHistogram, handleValueResets: true}, + {prompb.MetricType_GAUGE_HISTOGRAM, "sum"}: {metricType: ts.PromMetricTypeGaugeHistogram}, + + {metricType: prompb.MetricType_SUMMARY}: {metricType: ts.PromMetricTypeSummary, handleValueResets: true}, + {prompb.MetricType_SUMMARY, "count"}: {metricType: ts.PromMetricTypeSummary, handleValueResets: true}, + {prompb.MetricType_SUMMARY, "sum"}: {metricType: ts.PromMetricTypeSummary, handleValueResets: true}, + } + + for proto, expected := range mapping { + var labels []prompb.Label + if proto.nameSuffix != "" { + labels = append(labels, prompb.Label{Name: promDefaultName, Value: []byte("foo_" + proto.nameSuffix)}) + } + attrs, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{Type: proto.metricType, Labels: labels}) + require.NoError(t, err) + assert.Equal(t, expected.metricType, attrs.PromType) + assert.Equal(t, expected.handleValueResets, attrs.HandleValueResets) + } + + _, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{Type: -1}) + require.Error(t, err) +} + +func TestPromTimeSeriesToSeriesAttributesM3Type(t *testing.T) { + mapping := map[prompb.M3Type]ts.M3MetricType{ + prompb.M3Type_M3_GAUGE: ts.M3MetricTypeGauge, + prompb.M3Type_M3_COUNTER: ts.M3MetricTypeCounter, + prompb.M3Type_M3_TIMER: ts.M3MetricTypeTimer, + } + + for proto, expected := range mapping { + attrs, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{M3Type: proto}) + require.NoError(t, err) + assert.Equal(t, expected, attrs.M3Type) + } + + _, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{M3Type: -1}) + require.Error(t, err) +} + +func TestPromTimeSeriesToSeriesAttributesPromMetricsTypeFromGraphite(t *testing.T) { + mapping := map[prompb.M3Type]ts.PromMetricType{ + prompb.M3Type_M3_GAUGE: ts.PromMetricTypeGauge, + prompb.M3Type_M3_COUNTER: ts.PromMetricTypeCounter, + prompb.M3Type_M3_TIMER: ts.PromMetricTypeUnknown, + } + + for proto, expected := range mapping { + attrs, err := PromTimeSeriesToSeriesAttributes(prompb.TimeSeries{M3Type: proto, Source: prompb.Source_GRAPHITE}) + require.NoError(t, err) + assert.Equal(t, expected, attrs.PromType) + assert.False(t, attrs.HandleValueResets) + } +} + +func TestSeriesAttributesToAnnotationPayload(t *testing.T) { + mapping := map[ts.PromMetricType]annotation.MetricType{ + ts.PromMetricTypeUnknown: annotation.MetricType_UNKNOWN, + ts.PromMetricTypeCounter: annotation.MetricType_COUNTER, + ts.PromMetricTypeGauge: annotation.MetricType_GAUGE, + ts.PromMetricTypeHistogram: annotation.MetricType_HISTOGRAM, + ts.PromMetricTypeGaugeHistogram: annotation.MetricType_GAUGE_HISTOGRAM, + ts.PromMetricTypeSummary: annotation.MetricType_SUMMARY, + ts.PromMetricTypeInfo: annotation.MetricType_INFO, + ts.PromMetricTypeStateSet: annotation.MetricType_STATESET, + } + + for promType, expected := range mapping { + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: promType}) + require.NoError(t, err) + assert.Equal(t, expected, payload.MetricType) + } + + _, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{PromType: math.MaxUint8}) + require.Error(t, err) + + payload, err := SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: true}) + require.NoError(t, err) + assert.True(t, payload.HandleValueResets) + + payload, err = SeriesAttributesToAnnotationPayload(ts.SeriesAttributes{HandleValueResets: false}) + require.NoError(t, err) + assert.False(t, payload.HandleValueResets) +} diff --git a/src/query/ts/metadata.go b/src/query/ts/metadata.go index ab451c3195..09b2335ac8 100644 --- a/src/query/ts/metadata.go +++ b/src/query/ts/metadata.go @@ -20,22 +20,53 @@ package ts -// MetricType is the enum for metric types. -type MetricType int +// M3MetricType is the enum for M3 metric types. +// NB: the current use case for this is Graphite metrics. Also see PromMetricType (below). +// In future, it is worth considering a merge of these two enumerations. +type M3MetricType uint8 const ( - // MetricTypeGauge is the gauge metric type. - MetricTypeGauge MetricType = iota + // M3MetricTypeGauge is the gauge metric type. + M3MetricTypeGauge M3MetricType = iota - // MetricTypeCounter is the counter metric type. - MetricTypeCounter + // M3MetricTypeCounter is the counter metric type. + M3MetricTypeCounter - // MetricTypeTimer is the timer metric type. - MetricTypeTimer + // M3MetricTypeTimer is the timer metric type. + M3MetricTypeTimer +) + +// PromMetricType is the enum for Prometheus metric types. +type PromMetricType uint8 + +const ( + // PromMetricTypeUnknown is the unknown Prometheus metric type. + PromMetricTypeUnknown PromMetricType = iota + + // PromMetricTypeCounter is the counter Prometheus metric type. + PromMetricTypeCounter + + // PromMetricTypeGauge is the gauge Prometheus metric type. + PromMetricTypeGauge + + // PromMetricTypeHistogram is the histogram Prometheus metric type. + PromMetricTypeHistogram + + // PromMetricTypeGaugeHistogram is the gauge histogram Prometheus metric type. + PromMetricTypeGaugeHistogram + + // PromMetricTypeSummary is the summary Prometheus metric type. + PromMetricTypeSummary + + // PromMetricTypeInfo is the info Prometheus metric type. + PromMetricTypeInfo + + // PromMetricTypeStateSet is the state set Prometheus metric type. + PromMetricTypeStateSet ) // SourceType is the enum for metric source types. -type SourceType int +type SourceType uint8 const ( // SourceTypePrometheus is the prometheus source type. @@ -47,16 +78,15 @@ const ( // SeriesAttributes has attributes about the time series. type SeriesAttributes struct { - Type MetricType - Source SourceType + M3Type M3MetricType + PromType PromMetricType + Source SourceType + HandleValueResets bool } // DefaultSeriesAttributes returns a default series attributes. func DefaultSeriesAttributes() SeriesAttributes { - return SeriesAttributes{ - Type: MetricTypeGauge, - Source: SourceTypePrometheus, - } + return SeriesAttributes{} } // Metadata is metadata associated with a time series.