Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] keep metric type during the aggregation #2941

Merged
merged 27 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e260f56
[aggregator] keep metric type during the aggregation
gediminasgu Nov 23, 2020
560fdec
metric's type integration test
gediminasgu Nov 24, 2020
abcb9f6
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Nov 24, 2020
0e7a2f0
better integration of metric type into a aggregation workflow
gediminasgu Nov 27, 2020
3419d12
newline
gediminasgu Nov 27, 2020
fdf8c5f
remote/write header converted into a map
gediminasgu Nov 27, 2020
e0d673f
build fix
gediminasgu Nov 27, 2020
d055c8f
test fix
gediminasgu Nov 30, 2020
5ec2d44
test fix
gediminasgu Nov 30, 2020
a0f702a
more tests
gediminasgu Nov 30, 2020
0bf7d20
fix code generation
gediminasgu Nov 30, 2020
30a7407
Merge remote-tracking branch 'origin/master' into gg/aggregator-keep-…
gediminasgu Nov 30, 2020
d7b695f
lint
gediminasgu Dec 2, 2020
3678ba6
lint
gediminasgu Dec 2, 2020
861c217
lint
gediminasgu Dec 2, 2020
de52701
some refactoring
gediminasgu Dec 2, 2020
78abe4c
test fix
gediminasgu Dec 2, 2020
ba439f6
config fix
gediminasgu Dec 2, 2020
f183803
refactoring
gediminasgu Dec 2, 2020
70972be
test fix
gediminasgu Dec 3, 2020
79004ca
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 3, 2020
1069a86
minor refactoring
gediminasgu Dec 9, 2020
7cbe252
minor changes
gediminasgu Dec 9, 2020
00c00b3
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 9, 2020
35698d2
lint
gediminasgu Dec 9, 2020
2460baf
Merge branch 'gg/aggregator-keep-metric-type' of github.com:m3db/m3 i…
gediminasgu Dec 9, 2020
a73fad5
lint
gediminasgu Dec 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-standard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ ingest:
retry:
maxBackoff: 10s
jitter: true

storeMetricsType: true
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
52 changes: 52 additions & 0 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ function prometheus_remote_write {
local label1_value=${label1_value:-label1}
local label2_name=${label2_name:-label2}
local label2_value=${label2_value:-label2}
local metric_type=${metric_type:counter}

network_name="aggregator"
network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://m3coordinator01:7202/api/v1/prom/remote/write \
-h M3-Prom-Type:${metric_type} \
-t __name__:${metric_name} \
-t ${label0_name}:${label0_value} \
-t ${label1_name}:${label1_value} \
Expand Down Expand Up @@ -217,6 +219,22 @@ function prometheus_query_native {
return $?
}

function dbnode_fetch {
local namespace=${namespace}
local id=${id}
local rangeStart=${rangeStart}
local rangeEnd=${rangeEnd}
local jq_path=${jq_path:-}
local expected_value=${expected_value:-}

result=$(curl -s \
"0.0.0.0:9002/fetch" \
"-d" \
"{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}")
test "$result" = "$expected_value"
return $?
}

function test_aggregated_rollup_rule {
resolution_seconds="10"
now=$(date +"%s")
Expand All @@ -234,6 +252,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/bar" \
metric_type="counter" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand All @@ -251,6 +270,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/baz" \
metric_type="gauge" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand Down Expand Up @@ -284,6 +304,38 @@ function test_aggregated_rollup_rule {
retry_with_backoff prometheus_query_native
}

function test_metric_type_survives_aggregation {
now=$(date +"%s")

echo "Test metric type should be kept after aggregation"

# Emit values for endpoint /foo/bar (to ensure right values aggregated)
write_at="$now_truncated"
value="42"

metric_type="counter" \
prometheus_remote_write \
metric_type_test $now $value \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

start=$(( $now - 3600 ))
end=$(( $now + 3600 ))
jq_path=".datapoints[0].annotation"

echo "Test query metric type"

# Test by metric types are stored in aggregated namespace
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
namespace="agg" \
id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \
rangeStart=${start} \
rangeEnd=${end} \
jq_path="$jq_path" expected_value="CAE=" \
retry_with_backoff dbnode_fetch
}

echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
test_metric_type_survives_aggregation
2 changes: 2 additions & 0 deletions src/cmd/services/m3coordinator/downsample/flush_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ var (
// GraphiteIDSchemeTagValue specifies that the graphite ID
// scheme should be used for a metric.
GraphiteIDSchemeTagValue = []byte("graphite")
// PromTypeTagValue specifies metric type
PromTypeTagValue = []byte("__promt__")
)

var (
Expand Down
8 changes: 8 additions & 0 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ type ingestOp struct {
sp policy.StoragePolicy
callback m3msg.Callbackable
tags models.Tags
promType ts.PromMetricType
datapoints ts.Datapoints
q storage.WriteQuery
}
Expand Down Expand Up @@ -218,6 +219,7 @@ func (op *ingestOp) resetWriteQuery() error {
Tags: op.tags,
Datapoints: op.datapoints,
Unit: convert.UnitForM3DB(op.sp.Resolution().Precision),
Type: op.promType,
Attributes: storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: op.sp.Resolution().Window,
Expand Down Expand Up @@ -249,6 +251,12 @@ func (op *ingestOp) resetTags() error {
// or if passing for the second time
continue
}
if bytes.Equal(name, downsample.PromTypeTagValue) {
if len(value) == 1 {
op.promType = ts.PromMetricType(value[0])
}
continue
}

op.tags = op.tags.AddTagWithoutNormalizing(models.Tag{
Name: name,
Expand Down
7 changes: 7 additions & 0 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,13 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
appender.AddTag(downsample.MetricsOptionIDSchemeTagName,
downsample.GraphiteIDSchemeTagValue)
}
// If prom type is set then send it to aggregator too to do not loose it.
// Currently it's passed as a tag and later filtered out when series will back to
// the coordinator.
// Also, see note above.
if value.Attributes.PromType != ts.PromMetricTypeUnknown {
appender.AddTag(downsample.PromTypeTagValue, []byte{byte(value.Attributes.PromType)})
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
}

opts := downsample.SampleAppenderOptions{
MetricType: value.Attributes.M3Type,
Expand Down
25 changes: 25 additions & 0 deletions src/query/api/v1/handler/prometheus/remote/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,31 @@ func (h *PromWriteHandler) parseRequest(
}
}

if promType := r.Header.Get(headers.PromTypeHeader); promType != "" {
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
var tp prompb.MetricType
switch strings.ToLower(promType) {
case "counter":
tp = prompb.MetricType_COUNTER
case "gauge":
tp = prompb.MetricType_GAUGE
case "gauge-histogram":
tp = prompb.MetricType_GAUGE_HISTOGRAM
case "histogram":
tp = prompb.MetricType_HISTOGRAM
case "info":
tp = prompb.MetricType_INFO
case "stateset":
tp = prompb.MetricType_STATESET
case "summary":
tp = prompb.MetricType_SUMMARY
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
default:
return parseRequestResult{}, fmt.Errorf("unknown prom metric type %s", promType)
}
for i := range req.Timeseries {
req.Timeseries[i].Type = tp
}
}

return parseRequestResult{
Request: &req,
Options: opts,
Expand Down
37 changes: 21 additions & 16 deletions src/query/storage/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,42 +151,47 @@ func PromTimeSeriesToSeriesAttributes(series prompb.TimeSeries) (ts.SeriesAttrib

// SeriesAttributesToAnnotationPayload converts ts.SeriesAttributes into an annotation.Payload.
func SeriesAttributesToAnnotationPayload(seriesAttributes ts.SeriesAttributes) (annotation.Payload, error) {
var metricType annotation.MetricType
metricType, err := PromMetricTypeToAnnotationPayloadType(seriesAttributes.PromType)
if err != nil {
return annotation.Payload{}, err
}

switch seriesAttributes.PromType {
return annotation.Payload{
MetricType: metricType,
HandleValueResets: seriesAttributes.HandleValueResets,
}, nil
}

func PromMetricTypeToAnnotationPayloadType(t ts.PromMetricType) (annotation.MetricType, error) {
switch t {

case ts.PromMetricTypeUnknown:
metricType = annotation.MetricType_UNKNOWN
return annotation.MetricType_UNKNOWN, nil

case ts.PromMetricTypeCounter:
metricType = annotation.MetricType_COUNTER
return annotation.MetricType_COUNTER, nil

case ts.PromMetricTypeGauge:
metricType = annotation.MetricType_GAUGE
return annotation.MetricType_GAUGE, nil

case ts.PromMetricTypeHistogram:
metricType = annotation.MetricType_HISTOGRAM
return annotation.MetricType_HISTOGRAM, nil

case ts.PromMetricTypeGaugeHistogram:
metricType = annotation.MetricType_GAUGE_HISTOGRAM
return annotation.MetricType_GAUGE_HISTOGRAM, nil

case ts.PromMetricTypeSummary:
metricType = annotation.MetricType_SUMMARY
return annotation.MetricType_SUMMARY, nil

case ts.PromMetricTypeInfo:
metricType = annotation.MetricType_INFO
return annotation.MetricType_INFO, nil

case ts.PromMetricTypeStateSet:
metricType = annotation.MetricType_STATESET
return annotation.MetricType_STATESET, nil

default:
return annotation.Payload{}, fmt.Errorf("invalid Prometheus metric type %v", seriesAttributes.PromType)
return annotation.MetricType_UNKNOWN, fmt.Errorf("invalid Prometheus metric type %v", t)
}

return annotation.Payload{
MetricType: metricType,
HandleValueResets: seriesAttributes.HandleValueResets,
}, nil
}

// PromSamplesToM3Datapoints converts Prometheus samples to M3 datapoints
Expand Down
22 changes: 21 additions & 1 deletion src/query/storage/m3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/dbnode/generated/proto/annotation"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/query/block"
"github.com/m3db/m3/src/query/errors"
Expand Down Expand Up @@ -719,6 +720,25 @@ func (s *m3storage) writeSingle(

namespaceID := namespace.NamespaceID()
session := namespace.Session()

annot := query.Annotation()
if query.Options().Type != ts.PromMetricTypeUnknown && (annot == nil || len(annot) == 0) {
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
tp, err := storage.PromMetricTypeToAnnotationPayloadType(query.Options().Type)
if err != nil {
return err
}

annotationPayload := annotation.Payload{MetricType: tp}
annot, err = annotationPayload.Marshal()
if err != nil {
return err
}

if len(annot) == 0 {
annot = nil
}
}

return session.WriteTagged(namespaceID, identID, iterator,
datapoint.Timestamp, datapoint.Value, query.Unit(), query.Annotation())
datapoint.Timestamp, datapoint.Value, query.Unit(), annot)
}
1 change: 1 addition & 0 deletions src/query/storage/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type WriteQueryOptions struct {
Datapoints ts.Datapoints
Unit xtime.Unit
Annotation []byte
Type ts.PromMetricType
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
Attributes storagemetadata.Attributes
}

Expand Down
4 changes: 4 additions & 0 deletions src/x/headers/headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
// Valid values are "unaggregated" or "aggregated".
MetricsTypeHeader = M3HeaderPrefix + "Metrics-Type"

// PromTypeHeader sets the prometheus metric type. Valid values are
// "counter", "gauge", etc.
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
PromTypeHeader = M3HeaderPrefix + "Prom-Type"

// WriteTypeHeader is a header that controls if default
// writes should be written to both unaggregated and aggregated
// namespaces, or if unaggregated values are skipped and
Expand Down