Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] keep metric type during the aggregation #2941

Merged
merged 27 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e260f56
[aggregator] keep metric type during the aggregation
gediminasgu Nov 23, 2020
560fdec
metric's type integration test
gediminasgu Nov 24, 2020
abcb9f6
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Nov 24, 2020
0e7a2f0
better integration of metric type into a aggregation workflow
gediminasgu Nov 27, 2020
3419d12
newline
gediminasgu Nov 27, 2020
fdf8c5f
remote/write header converted into a map
gediminasgu Nov 27, 2020
e0d673f
build fix
gediminasgu Nov 27, 2020
d055c8f
test fix
gediminasgu Nov 30, 2020
5ec2d44
test fix
gediminasgu Nov 30, 2020
a0f702a
more tests
gediminasgu Nov 30, 2020
0bf7d20
fix code generation
gediminasgu Nov 30, 2020
30a7407
Merge remote-tracking branch 'origin/master' into gg/aggregator-keep-…
gediminasgu Nov 30, 2020
d7b695f
lint
gediminasgu Dec 2, 2020
3678ba6
lint
gediminasgu Dec 2, 2020
861c217
lint
gediminasgu Dec 2, 2020
de52701
some refactoring
gediminasgu Dec 2, 2020
78abe4c
test fix
gediminasgu Dec 2, 2020
ba439f6
config fix
gediminasgu Dec 2, 2020
f183803
refactoring
gediminasgu Dec 2, 2020
70972be
test fix
gediminasgu Dec 3, 2020
79004ca
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 3, 2020
1069a86
minor refactoring
gediminasgu Dec 9, 2020
7cbe252
minor changes
gediminasgu Dec 9, 2020
00c00b3
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 9, 2020
35698d2
lint
gediminasgu Dec 9, 2020
2460baf
Merge branch 'gg/aggregator-keep-metric-type' of github.com:m3db/m3 i…
gediminasgu Dec 9, 2020
a73fad5
lint
gediminasgu Dec 9, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-standard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,5 @@ ingest:
retry:
maxBackoff: 10s
jitter: true

storeMetricsType: true
52 changes: 52 additions & 0 deletions scripts/docker-integration-tests/aggregator/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -170,12 +170,14 @@ function prometheus_remote_write {
local label1_value=${label1_value:-label1}
local label2_name=${label2_name:-label2}
local label2_value=${label2_value:-label2}
local metric_type=${metric_type:counter}

network_name="aggregator"
network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://m3coordinator01:7202/api/v1/prom/remote/write \
-h M3-Prom-Type:${metric_type} \
-t __name__:${metric_name} \
-t ${label0_name}:${label0_value} \
-t ${label1_name}:${label1_value} \
Expand Down Expand Up @@ -217,6 +219,22 @@ function prometheus_query_native {
return $?
}

function dbnode_fetch {
local namespace=${namespace}
local id=${id}
local rangeStart=${rangeStart}
local rangeEnd=${rangeEnd}
local jq_path=${jq_path:-}
local expected_value=${expected_value:-}

result=$(curl -s \
"0.0.0.0:9002/fetch" \
"-d" \
"{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}")
test "$result" = "$expected_value"
return $?
}

function test_aggregated_rollup_rule {
resolution_seconds="10"
now=$(date +"%s")
Expand All @@ -234,6 +252,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/bar" \
metric_type="counter" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand All @@ -251,6 +270,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/baz" \
metric_type="gauge" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
Expand Down Expand Up @@ -284,6 +304,38 @@ function test_aggregated_rollup_rule {
retry_with_backoff prometheus_query_native
}

function test_metric_type_survives_aggregation {
now=$(date +"%s")

echo "Test metric type should be kept after aggregation"

# Emit values for endpoint /foo/bar (to ensure right values aggregated)
write_at="$now_truncated"
value="42"

metric_type="counter" \
prometheus_remote_write \
metric_type_test $now $value \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

start=$(( $now - 3600 ))
end=$(( $now + 3600 ))
jq_path=".datapoints[0].annotation"

echo "Test query metric type"

# Test by metric types are stored in aggregated namespace
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
namespace="agg" \
id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \
rangeStart=${start} \
rangeEnd=${end} \
jq_path="$jq_path" expected_value="CAE=" \
retry_with_backoff dbnode_fetch
}

echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
test_metric_type_survives_aggregation
1 change: 1 addition & 0 deletions src/aggregator/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ func (agg *aggregator) AddPassthrough(
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
Type: metric.Type,
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
Expand Down
5 changes: 3 additions & 2 deletions src/aggregator/aggregator/counter_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/aggregator/aggregator/elem_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,7 @@ func testFlushLocalMetricFn() (
return func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package aggregator
import (
"time"

"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
)
Expand Down Expand Up @@ -83,6 +84,7 @@ const (
type flushLocalMetricFn func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
5 changes: 3 additions & 2 deletions src/aggregator/aggregator/gauge_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions src/aggregator/aggregator/generic_elem.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,9 +537,9 @@ func (e *GenericElem) processValueWithAggregationLock(
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType, e.TypeStringFor(e.aggTypesOpts, aggType),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it a Gauge? Is the type not available here? If so, some comment might be useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a base code for the code generation. By default, we set metric-type to Gauge (as most of them are) and just for Counters we replace it to Counter: https://github.com/m3db/m3/blob/gg/aggregator-keep-metric-type/src/aggregator/generated-source-files.mk#L16

point.TimeNanos, point.Value, e.sp)
}
}
Expand Down
1 change: 1 addition & 0 deletions src/aggregator/aggregator/handler/writer/protobuf.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) (
w.m.ID = append(w.m.ID, mp.Suffix...)
w.m.Metric.TimeNanos = mp.TimeNanos
w.m.Metric.Value = mp.Value
w.m.Metric.Type = mp.Type
w.m.StoragePolicy = mp.StoragePolicy
shard := w.shardFn(w.m.ID, w.numShards)
return w.m, shard
Expand Down
4 changes: 4 additions & 0 deletions src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
Expand Down Expand Up @@ -434,6 +435,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
func (l *baseMetricList) consumeLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand All @@ -447,6 +449,7 @@ func (l *baseMetricList) consumeLocalMetric(
chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: chunkedID,
Type: metricType,
TimeNanos: timeNanos,
Value: value,
},
Expand All @@ -463,6 +466,7 @@ func (l *baseMetricList) consumeLocalMetric(
func (l *baseMetricList) discardLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
Expand Down
5 changes: 3 additions & 2 deletions src/aggregator/aggregator/timer_elem_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ func NewIngester(
func (i *Ingester) Ingest(
ctx context.Context,
id []byte,
metricType ts.PromMetricType,
metricNanos, encodeNanos int64,
value float64,
sp policy.StoragePolicy,
Expand All @@ -131,6 +132,7 @@ func (i *Ingester) Ingest(
op := i.p.Get().(*ingestOp)
op.c = ctx
op.id = id
op.metricType = metricType
op.metricNanos = metricNanos
op.value = value
op.sp = sp
Expand All @@ -152,6 +154,7 @@ type ingestOp struct {

c context.Context
id []byte
metricType ts.PromMetricType
metricNanos int64
value float64
sp policy.StoragePolicy
Expand Down Expand Up @@ -216,6 +219,7 @@ func (op *ingestOp) resetWriteQuery() error {
op.resetDataPoints()
return op.q.Reset(storage.WriteQueryOptions{
Tags: op.tags,
Type: op.metricType,
Datapoints: op.datapoints,
Unit: convert.UnitForM3DB(op.sp.Resolution().Precision),
Attributes: storagemetadata.Attributes{
Expand Down
6 changes: 3 additions & 3 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestIngest(t *testing.T) {
callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg)

m.EXPECT().Ack()
ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback)
ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback)

for appender.cnt() != 1 {
time.Sleep(100 * time.Millisecond)
Expand All @@ -93,7 +93,7 @@ func TestIngest(t *testing.T) {
},
Tags: models.NewTags(2, nil).AddTags(
[]models.Tag{
models.Tag{
{
Name: []byte("__name__"),
Value: []byte("foo"),
},
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestIngestNonRetryableError(t *testing.T) {
callback := m3msg.NewProtobufCallback(m, protobuf.NewAggregatedDecoder(nil), &wg)

m.EXPECT().Ack()
ingester.Ingest(context.TODO(), id, metricNanos, 0, val, sp, callback)
ingester.Ingest(context.TODO(), id, ts.PromMetricTypeGauge, metricNanos, 0, val, sp, callback)

for appender.cntErr() != 1 {
time.Sleep(100 * time.Millisecond)
Expand Down
24 changes: 17 additions & 7 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,14 +495,24 @@ func (d *downsamplerAndWriter) writeAggregatedBatch(
}

for _, dp := range value.Datapoints {
switch value.Attributes.M3Type {
case ts.M3MetricTypeGauge:
err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value)
case ts.M3MetricTypeCounter:
err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value))
case ts.M3MetricTypeTimer:
err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value)
if value.Attributes.PromType != ts.PromMetricTypeUnknown {
switch value.Attributes.PromType {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prom does not have timer types?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case ts.PromMetricTypeCounter:
err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value))
default:
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value)
}
} else {
switch value.Attributes.M3Type {
case ts.M3MetricTypeGauge:
err = result.SamplesAppender.AppendGaugeTimedSample(dp.Timestamp, dp.Value)
case ts.M3MetricTypeCounter:
err = result.SamplesAppender.AppendCounterTimedSample(dp.Timestamp, int64(dp.Value))
case ts.M3MetricTypeTimer:
err = result.SamplesAppender.AppendTimerTimedSample(dp.Timestamp, dp.Value)
gediminasgu marked this conversation as resolved.
Show resolved Hide resolved
}
}

if err != nil {
// If we see an error break out so we can try processing the
// next datapoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (h *pbHandler) Process(msg consumer.Message) {
}
}

h.writeFn(h.ctx, dec.ID(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r)
h.writeFn(h.ctx, dec.ID(), dec.Type(), dec.TimeNanos(), dec.EncodeNanos(), dec.Value(), sp, r)
}

func (h *pbHandler) Close() { h.wg.Wait() }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/m3db/m3/src/msg/consumer"
"github.com/m3db/m3/src/msg/generated/proto/msgpb"
"github.com/m3db/m3/src/msg/protocol/proto"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/server"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -233,6 +234,7 @@ type mockWriter struct {
func (m *mockWriter) write(
ctx context.Context,
name []byte,
metricType ts.PromMetricType,
metricNanos, encodeNanos int64,
value float64,
sp policy.StoragePolicy,
Expand Down
2 changes: 2 additions & 0 deletions src/cmd/services/m3coordinator/server/m3msg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ import (
"context"

"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/query/ts"
)

// WriteFn is the function that writes a metric.
type WriteFn func(
ctx context.Context,
id []byte,
metricType ts.PromMetricType,
metricNanos, encodeNanos int64,
value float64,
sp policy.StoragePolicy,
Expand Down
2 changes: 1 addition & 1 deletion src/metrics/aggregation/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (a Type) IsValidForGauge() bool {
// IsValidForCounter if an Type is valid for Counter.
func (a Type) IsValidForCounter() bool {
switch a {
case Min, Max, Mean, Count, Sum, SumSq, Stdev:
case Min, Max, Mean, Count, Sum, SumSq, Stdev, Last:
return true
default:
return false
Expand Down
Loading