[aggregator] keep metric type during the aggregation #2941

Merged: 27 commits, Dec 9, 2020
Commits (27 total; the changes shown below are from 21 commits)
e260f56
[aggregator] keep metric type during the aggregation
gediminasgu Nov 23, 2020
560fdec
metric's type integration test
gediminasgu Nov 24, 2020
abcb9f6
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Nov 24, 2020
0e7a2f0
better integration of metric type into a aggregation workflow
gediminasgu Nov 27, 2020
3419d12
newline
gediminasgu Nov 27, 2020
fdf8c5f
remote/write header converted into a map
gediminasgu Nov 27, 2020
e0d673f
build fix
gediminasgu Nov 27, 2020
d055c8f
test fix
gediminasgu Nov 30, 2020
5ec2d44
test fix
gediminasgu Nov 30, 2020
a0f702a
more tests
gediminasgu Nov 30, 2020
0bf7d20
fix code generation
gediminasgu Nov 30, 2020
30a7407
Merge remote-tracking branch 'origin/master' into gg/aggregator-keep-…
gediminasgu Nov 30, 2020
d7b695f
lint
gediminasgu Dec 2, 2020
3678ba6
lint
gediminasgu Dec 2, 2020
861c217
lint
gediminasgu Dec 2, 2020
de52701
some refactoring
gediminasgu Dec 2, 2020
78abe4c
test fix
gediminasgu Dec 2, 2020
ba439f6
config fix
gediminasgu Dec 2, 2020
f183803
refactoring
gediminasgu Dec 2, 2020
70972be
test fix
gediminasgu Dec 3, 2020
79004ca
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 3, 2020
1069a86
minor refactoring
gediminasgu Dec 9, 2020
7cbe252
minor changes
gediminasgu Dec 9, 2020
00c00b3
Merge branch 'master' into gg/aggregator-keep-metric-type
gediminasgu Dec 9, 2020
35698d2
lint
gediminasgu Dec 9, 2020
2460baf
Merge branch 'gg/aggregator-keep-metric-type' of github.com:m3db/m3 i…
gediminasgu Dec 9, 2020
a73fad5
lint
gediminasgu Dec 9, 2020
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-aggregator.yml
@@ -80,3 +80,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/development/m3_stack/m3coordinator-standard.yml
@@ -36,3 +36,5 @@ carbon:

tagOptions:
idScheme: quoted

storeMetricsType: true
2 changes: 2 additions & 0 deletions scripts/docker-integration-tests/aggregator/m3coordinator.yml
@@ -77,3 +77,5 @@ ingest:
retry:
maxBackoff: 10s
jitter: true

storeMetricsType: true
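
The storeMetricsType flag enabled above is what the coordinator uses to keep each metric's Prometheus type (counter, gauge, etc.) alongside written datapoints. As a minimal sketch of decoding such a top-level YAML flag, where the struct and field names are illustrative assumptions rather than the actual m3coordinator configuration types:

package main

import (
    "fmt"

    "gopkg.in/yaml.v2"
)

// coordinatorConfig is a hypothetical stand-in; only the storeMetricsType key
// mirrors the YAML added in this change.
type coordinatorConfig struct {
    StoreMetricsType bool `yaml:"storeMetricsType"`
}

func main() {
    var cfg coordinatorConfig
    if err := yaml.Unmarshal([]byte("storeMetricsType: true"), &cfg); err != nil {
        panic(err)
    }
    fmt.Println(cfg.StoreMetricsType) // prints: true
}
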
52 changes: 52 additions & 0 deletions scripts/docker-integration-tests/aggregator/test.sh
@@ -170,12 +170,14 @@ function prometheus_remote_write {
local label1_value=${label1_value:-label1}
local label2_name=${label2_name:-label2}
local label2_value=${label2_value:-label2}
local metric_type=${metric_type:counter}

network_name="aggregator"
network=$(docker network ls | fgrep $network_name | tr -s ' ' | cut -f 1 -d ' ' | tail -n 1)
out=$((docker run -it --rm --network $network \
$PROMREMOTECLI_IMAGE \
-u http://m3coordinator01:7202/api/v1/prom/remote/write \
-h M3-Prom-Type:${metric_type} \
-t __name__:${metric_name} \
-t ${label0_name}:${label0_value} \
-t ${label1_name}:${label1_value} \
@@ -217,6 +219,22 @@ function prometheus_query_native {
return $?
}

function dbnode_fetch {
local namespace=${namespace}
local id=${id}
local rangeStart=${rangeStart}
local rangeEnd=${rangeEnd}
local jq_path=${jq_path:-}
local expected_value=${expected_value:-}

result=$(curl -s \
"0.0.0.0:9002/fetch" \
"-d" \
"{\"namespace\": \"${namespace}\", \"id\": \"${id}\", \"rangeStart\": ${rangeStart}, \"rangeEnd\": ${rangeEnd}}" | jq -r "${jq_path}")
test "$result" = "$expected_value"
return $?
}

function test_aggregated_rollup_rule {
resolution_seconds="10"
now=$(date +"%s")
@@ -234,6 +252,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/bar" \
metric_type="counter" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
@@ -251,6 +270,7 @@ function test_aggregated_rollup_rule {
label0_name="app" label0_value="nginx_edge" \
label1_name="status_code" label1_value="500" \
label2_name="endpoint" label2_value="/foo/baz" \
metric_type="gauge" \
prometheus_remote_write \
http_requests $write_at $value \
true "Expected request to succeed" \
@@ -284,6 +304,38 @@ function test_aggregated_rollup_rule {
retry_with_backoff prometheus_query_native
}

function test_metric_type_survives_aggregation {
now=$(date +"%s")

echo "Test metric type should be kept after aggregation"

# Emit values for endpoint /foo/bar (to ensure right values aggregated)
write_at="$now_truncated"
value="42"

metric_type="counter" \
prometheus_remote_write \
metric_type_test $now $value \
true "Expected request to succeed" \
200 "Expected request to return status code 200"

start=$(( $now - 3600 ))
end=$(( $now + 3600 ))
jq_path=".datapoints[0].annotation"

echo "Test query metric type"

# Test by metric types are stored in aggregated namespace
ATTEMPTS=50 TIMEOUT=2 MAX_TIMEOUT=4 \
namespace="agg" \
id='{__name__=\"metric_type_test\",label0=\"label0\",label1=\"label1\",label2=\"label2\"}' \
rangeStart=${start} \
rangeEnd=${end} \
jq_path="$jq_path" expected_value="CAEQAQ==" \
retry_with_backoff dbnode_fetch
}

echo "Run tests"
test_aggregated_graphite_metric
test_aggregated_rollup_rule
test_metric_type_survives_aggregation
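
The new test writes a counter through the remote write endpoint with the M3-Prom-Type header set, then retries a raw fetch from dbnode until the datapoint appears in the aggregated "agg" namespace and asserts its annotation equals the base64 value CAEQAQ==. Decoding that string yields two protobuf varint fields both set to 1, which is how the metric type is carried on the stored datapoint; the exact annotation schema lives in M3's protobuf definitions and is only assumed here. A small Go sketch of the decode:

package main

import (
    "encoding/base64"
    "fmt"
)

func main() {
    // Expected annotation from the integration test above.
    raw, err := base64.StdEncoding.DecodeString("CAEQAQ==")
    if err != nil {
        panic(err)
    }
    // Prints [8 1 16 1]: tag 0x08 (field 1, varint) = 1 and tag 0x10
    // (field 2, varint) = 1, a compact record of the counter type that
    // survived aggregation into the "agg" namespace.
    fmt.Println(raw)
}
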
2 changes: 2 additions & 0 deletions src/aggregator/aggregation/counter_test.go
@@ -70,6 +70,8 @@ func TestCounterCustomAggregationType(t *testing.T) {
require.Equal(t, float64(338350), v)
case aggregation.Stdev:
require.InDelta(t, 29.01149, v, 0.001)
case aggregation.Last:
require.Equal(t, 0.0, v)
default:
require.Equal(t, float64(0), v)
require.False(t, aggType.IsValidForCounter())
1 change: 1 addition & 0 deletions src/aggregator/aggregator/aggregator.go
@@ -287,6 +287,7 @@ func (agg *aggregator) AddPassthrough(
ChunkedID: id.ChunkedID{
Data: []byte(metric.ID),
},
Type: metric.Type,
TimeNanos: metric.TimeNanos,
Value: metric.Value,
},
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/counter_elem_gen.go

(Generated file; diff not rendered by default.)

4 changes: 2 additions & 2 deletions src/aggregator/aggregator/elem_base_test.go
@@ -203,9 +203,9 @@ func TestCounterElemBaseResetSetData(t *testing.T) {

func TestCounterElemBaseResetSetDataInvalidTypes(t *testing.T) {
e := counterElemBase{}
err := e.ResetSetData(nil, maggregation.Types{maggregation.Last}, false)
err := e.ResetSetData(nil, maggregation.Types{maggregation.P10}, false)
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "invalid aggregation types Last for counter"))
require.True(t, strings.Contains(err.Error(), "invalid aggregation types P10 for counter"))
}

func TestTimerElemBase(t *testing.T) {
7 changes: 5 additions & 2 deletions src/aggregator/aggregator/elem_test.go
@@ -158,8 +158,10 @@ func TestCounterResetSetData(t *testing.T) {

func TestCounterResetSetDataInvalidAggregationType(t *testing.T) {
opts := NewOptions()
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes, applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.Last}, applied.DefaultPipeline, 0, NoPrefixNoSuffix)
ce := MustNewCounterElem(nil, policy.EmptyStoragePolicy, maggregation.DefaultTypes,
applied.DefaultPipeline, testNumForwardedTimes, NoPrefixNoSuffix, opts)
err := ce.ResetSetData(testCounterID, testStoragePolicy, maggregation.Types{maggregation.P10},
applied.DefaultPipeline, 0, NoPrefixNoSuffix)
require.Error(t, err)
}

@@ -1810,6 +1812,7 @@ func testFlushLocalMetricFn() (
return func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/flush.go
@@ -23,6 +23,7 @@ package aggregator
import (
"time"

"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
)
@@ -83,6 +84,7 @@ const (
type flushLocalMetricFn func(
idPrefix []byte,
id id.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/gauge_elem_gen.go

(Generated file; diff not rendered by default.)

6 changes: 3 additions & 3 deletions src/aggregator/aggregator/generic_elem.go
@@ -537,10 +537,10 @@ func (e *GenericElem) processValueWithAggregationLock(
for _, point := range toFlush {
switch e.idPrefixSuffixType {
case NoPrefixNoSuffix:
flushLocalFn(nil, e.id, nil, point.TimeNanos, point.Value, e.sp)
flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp)
case WithPrefixWithSuffix:
flushLocalFn(e.FullPrefix(e.opts), e.id, e.TypeStringFor(e.aggTypesOpts, aggType),
point.TimeNanos, point.Value, e.sp)
flushLocalFn(e.FullPrefix(e.opts), e.id, metric.GaugeType,
e.TypeStringFor(e.aggTypesOpts, aggType), point.TimeNanos, point.Value, e.sp)
}
}
} else {
1 change: 1 addition & 0 deletions src/aggregator/aggregator/handler/writer/protobuf.go
@@ -133,6 +133,7 @@ func (w *protobufWriter) prepare(mp aggregated.ChunkedMetricWithStoragePolicy) (
w.m.ID = append(w.m.ID, mp.Suffix...)
w.m.Metric.TimeNanos = mp.TimeNanos
w.m.Metric.Value = mp.Value
w.m.Metric.Type = mp.Type
w.m.StoragePolicy = mp.StoragePolicy
shard := w.shardFn(w.m.ID, w.numShards)
return w.m, shard
4 changes: 4 additions & 0 deletions src/aggregator/aggregator/list.go
@@ -30,6 +30,7 @@ import (

"github.com/m3db/m3/src/aggregator/aggregator/handler"
"github.com/m3db/m3/src/aggregator/aggregator/handler/writer"
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/metric/aggregated"
metricid "github.com/m3db/m3/src/metrics/metric/id"
"github.com/m3db/m3/src/metrics/policy"
@@ -434,6 +435,7 @@ func (l *baseMetricList) flushBefore(beforeNanos int64, flushType flushType) {
func (l *baseMetricList) consumeLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
@@ -447,6 +449,7 @@
chunkedMetricWithPolicy := aggregated.ChunkedMetricWithStoragePolicy{
ChunkedMetric: aggregated.ChunkedMetric{
ChunkedID: chunkedID,
Type: metricType,
TimeNanos: timeNanos,
Value: value,
},
@@ -463,6 +466,7 @@
func (l *baseMetricList) discardLocalMetric(
idPrefix []byte,
id metricid.RawID,
metricType metric.Type,
idSuffix []byte,
timeNanos int64,
value float64,
2 changes: 2 additions & 0 deletions src/aggregator/aggregator/list_test.go
@@ -604,6 +604,7 @@ func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) {
ChunkedID: id.ChunkedID{
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Value,
},
@@ -1056,6 +1057,7 @@ func TestForwardedMetricListLastStepLocalFlush(t *testing.T) {
Prefix: ep.expectedPrefix,
Data: ep.metric.ID,
},
Type: ep.metric.Type,
TimeNanos: alignedStart,
Value: ep.metric.Values[0],
},
7 changes: 4 additions & 3 deletions src/aggregator/aggregator/timer_elem_gen.go

(Generated file; diff not rendered by default.)

1 change: 1 addition & 0 deletions src/aggregator/generated-source-files.mk
@@ -13,6 +13,7 @@ genny-all: genny-aggregator-counter-elem genny-aggregator-timer-elem genny-aggre
genny-aggregator-counter-elem:
cat $(m3db_package_path)/src/aggregator/aggregator/generic_elem.go \
| awk '/^package/{i++}i' \
| sed 's/metric.GaugeType/metric.CounterType/' \
| genny -out=$(m3db_package_path)/src/aggregator/aggregator/counter_elem_gen.go -pkg=aggregator gen \
"timedAggregation=timedCounter lockedAggregation=lockedCounterAggregation typeSpecificAggregation=counterAggregation typeSpecificElemBase=counterElemBase genericElemPool=CounterElemPool GenericElem=CounterElem"

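
The generic element in generic_elem.go always flushes metric.GaugeType, and the per-type elements are generated from it by genny; the Makefile change above adds a sed step so the counter element is generated with metric.CounterType instead (the timer element presumably gets a similar treatment, not shown here). A Go stand-in for that one-line substitution, purely illustrative of what the sed command does:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // The flush call as it appears in generic_elem.go.
    generic := `flushLocalFn(nil, e.id, metric.GaugeType, nil, point.TimeNanos, point.Value, e.sp)`
    // The sed step rewrites the type before counter_elem_gen.go is generated.
    counter := strings.ReplaceAll(generic, "metric.GaugeType", "metric.CounterType")
    fmt.Println(counter)
}
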
5 changes: 4 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/config.go
@@ -46,8 +46,9 @@ func (cfg Configuration) NewIngester(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions)
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions, storeMetricsType)
if err != nil {
return nil, err
}
@@ -58,6 +59,7 @@ func (cfg Configuration) newOptions(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
storeMetricsType bool,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
map[string]string{"component": "ingester"},
@@ -98,5 +100,6 @@
RetryOptions: cfg.Retry.NewOptions(scope),
Sampler: sampler,
InstrumentOptions: instrumentOptions,
storeMetricsType: storeMetricsType,
}, nil
}
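
Existing callers of Configuration.NewIngester now pass the extra storeMetricsType argument, which is threaded into the ingester Options above. A hypothetical call site: only the added boolean parameter comes from the signature in this diff, everything else is assumed setup.

// Sketch only: appender, tagOptions and instrumentOptions are assumed to be
// built elsewhere; storeMetricsType would come from the coordinator's new
// storeMetricsType config flag.
ingester, err := cfg.NewIngester(appender, tagOptions, instrumentOptions, storeMetricsType)
if err != nil {
    return nil, err
}
return ingester, nil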