[query] Add ClusterNamespacesWatcher and use to generate downsample automapper rules #2782

Merged · 3 commits · Oct 26, 2020
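
As a reading aid before the diff: a rough sketch of how a caller drives the new watcher, assuming ClusterNamespacesWatcher is exposed as an interface in the m3 storage package. The constructor, namespace definitions, and Update call mirror the test helpers added in this PR; the wrapping function itself is hypothetical.

```go
package example

import (
	"time"

	dbclient "github.com/m3db/m3/src/dbnode/client"
	"github.com/m3db/m3/src/query/storage/m3"
	"github.com/m3db/m3/src/x/ident"
)

// pushAggregatedNamespaces is a hypothetical helper: each call to Update fans
// out to listeners registered on the watcher, and the downsampler's OnUpdate
// then rebuilds its automapping rules and default staged metadatas from the
// supplied cluster namespaces.
func pushAggregatedNamespaces(
	watcher m3.ClusterNamespacesWatcher,
	session dbclient.Session,
) error {
	clusters, err := m3.NewClusters(
		m3.UnaggregatedClusterNamespaceDefinition{
			NamespaceID: ident.StringID("default"),
			Retention:   48 * time.Hour,
			Session:     session,
		},
		m3.AggregatedClusterNamespaceDefinition{
			NamespaceID: ident.StringID("2s:1d"),
			Resolution:  2 * time.Second,
			Retention:   24 * time.Hour,
			Session:     session,
		},
	)
	if err != nil {
		return err
	}
	return watcher.Update(clusters.ClusterNamespaces())
}
```

The watcher itself is created with m3.NewClusterNamespacesWatcher() and handed to the downsampler via DownsamplerOptions, which registers the downsampler as a listener on construction (see the diff below).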
99 changes: 78 additions & 21 deletions src/cmd/services/m3coordinator/downsample/downsampler.go
@@ -21,10 +21,14 @@
package downsample

import (
"sync"
"time"

"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3/src/query/ts"

"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

@@ -79,8 +83,10 @@ type SamplesAppender interface {
}

type downsampler struct {
opts DownsamplerOptions
agg agg
opts DownsamplerOptions
agg agg

sync.RWMutex
metricsAppenderOpts metricsAppenderOptions
}

@@ -94,34 +100,85 @@ func newDownsampler(opts downsamplerOptions) (*downsampler, error) {
return nil, err
}

downsampler := &downsampler{
opts: opts.opts,
agg: opts.agg,
metricsAppenderOpts: defaultMetricsAppenderOptions(opts.opts, opts.agg),
}

// No need to retain watch as NamespaceWatcher.Close() will handle closing any watches
// generated by creating listeners.
downsampler.opts.ClusterNamespacesWatcher.RegisterListener(downsampler)

return downsampler, nil
}

func defaultMetricsAppenderOptions(opts DownsamplerOptions, agg agg) metricsAppenderOptions {
debugLogging := false
logger := opts.opts.InstrumentOptions.Logger()
logger := opts.InstrumentOptions.Logger()
if logger.Check(zapcore.DebugLevel, "debug") != nil {
debugLogging = true
}

metricsAppenderOpts := metricsAppenderOptions{
agg: opts.agg.aggregator,
clientRemote: opts.agg.clientRemote,
defaultStagedMetadatasProtos: opts.agg.defaultStagedMetadatasProtos,
clockOpts: opts.agg.clockOpts,
tagEncoderPool: opts.agg.pools.tagEncoderPool,
matcher: opts.agg.matcher,
metricTagsIteratorPool: opts.agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: opts.agg.m3PrefixFilter,
return metricsAppenderOptions{
agg: agg.aggregator,
clientRemote: agg.clientRemote,
clockOpts: agg.clockOpts,
tagEncoderPool: agg.pools.tagEncoderPool,
matcher: agg.matcher,
metricTagsIteratorPool: agg.pools.metricTagsIteratorPool,
debugLogging: debugLogging,
logger: logger,
augmentM3Tags: agg.m3PrefixFilter,
}

return &downsampler{
opts: opts.opts,
agg: opts.agg,
metricsAppenderOpts: metricsAppenderOpts,
}, nil
}

func (d *downsampler) NewMetricsAppender() (MetricsAppender, error) {
metricsAppender := d.agg.pools.metricsAppenderPool.Get()
metricsAppender.reset(d.metricsAppenderOpts)

d.RLock()
newMetricsAppenderOpts := d.metricsAppenderOpts
d.RUnlock()

metricsAppender.reset(newMetricsAppenderOpts)

return metricsAppender, nil
}

func (d *downsampler) OnUpdate(namespaces m3.ClusterNamespaces) {
logger := d.opts.InstrumentOptions.Logger()

if len(namespaces) == 0 {
logger.Debug("received empty list of namespaces. not updating staged metadata")
return
}

autoMappingRules, err := NewAutoMappingRules(namespaces)
if err != nil {
logger.Error("could not generate automapping rules for aggregated namespaces."+
" aggregations will continue with current configuration.", zap.Error(err))
return
}
defaultStagedMetadatasProtos := make([]metricpb.StagedMetadatas, 0, len(autoMappingRules))
for _, rule := range autoMappingRules {
metadatas, err := rule.StagedMetadatas()
if err != nil {
logger.Error("could not generate staged metadata from automapping rules."+
" aggregations will continue with current configuration.", zap.Error(err))
return
}

var metadatasProto metricpb.StagedMetadatas
if err := metadatas.ToProto(&metadatasProto); err != nil {
logger.Error("could not generate staged metadata from automapping rules."+
" aggregations will continue with current configuration.", zap.Error(err))
return
}

defaultStagedMetadatasProtos = append(defaultStagedMetadatasProtos, metadatasProto)
}

d.Lock()
d.metricsAppenderOpts.defaultStagedMetadatasProtos = defaultStagedMetadatasProtos
Comment on lines +162 to +182

Collaborator:

nit: it may be better to do this whole section under d.Lock() so that you can mutate d.metricsAppenderOpts.defaultStagedMetadatasProtos directly without needing the alloc on 162?

Collaborator (Author):

Ah, I think that only works if you end up with the same number of automapping rules after the update -- which is probably a corner case. Most likely, you'll have more automapping rules than before (as a result of a namespace addition), meaning we'd need to re-size the existing one.

d.Unlock()
}
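
To make the locking trade-off from the review thread above concrete, here is a minimal sketch of the two variants discussed. The helper names are hypothetical and the rule-building step is compressed into a pre-built slice; only the downsampler fields used here (the embedded sync.RWMutex and metricsAppenderOpts.defaultStagedMetadatasProtos) come from the diff.

```go
// Variant taken in this PR: build the new protos outside the lock, then swap
// the slice in under a short write lock. This allocates a fresh slice on every
// update, but the lock is held only for the assignment.
func (d *downsampler) swapStagedMetadatas(protos []metricpb.StagedMetadatas) {
	d.Lock()
	d.metricsAppenderOpts.defaultStagedMetadatasProtos = protos
	d.Unlock()
}

// Variant suggested in review: reuse the existing slice under the lock instead
// of allocating. As the author notes, this only avoids an allocation when the
// number of automapping rules is unchanged; adding a namespace grows the slice
// and forces a reallocation anyway.
func (d *downsampler) reuseStagedMetadatas(protos []metricpb.StagedMetadatas) {
	d.Lock()
	defer d.Unlock()
	existing := d.metricsAppenderOpts.defaultStagedMetadatasProtos[:0]
	d.metricsAppenderOpts.defaultStagedMetadatasProtos = append(existing, protos...)
}
```

Either way, readers go through NewMetricsAppender, which takes the read lock and copies the options before resetting the appender, as in the diff above.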
146 changes: 118 additions & 28 deletions src/cmd/services/m3coordinator/downsample/downsampler_test.go
@@ -31,7 +31,9 @@ import (
"github.com/m3db/m3/src/aggregator/client"
clusterclient "github.com/m3db/m3/src/cluster/client"
"github.com/m3db/m3/src/cluster/kv/mem"
dbclient "github.com/m3db/m3/src/dbnode/client"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/metrics/generated/proto/metricpb"
"github.com/m3db/m3/src/metrics/generated/proto/rulepb"
"github.com/m3db/m3/src/metrics/matcher"
"github.com/m3db/m3/src/metrics/metadata"
@@ -44,10 +46,12 @@ import (
"github.com/m3db/m3/src/metrics/transformation"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/storage/m3"
"github.com/m3db/m3/src/query/storage/m3/storagemetadata"
"github.com/m3db/m3/src/query/storage/mock"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xio "github.com/m3db/m3/src/x/io"
"github.com/m3db/m3/src/x/pool"
@@ -71,16 +75,45 @@ const (
nameTag = "__name__"
)

func TestDownsamplerAggregationWithAutoMappingRules(t *testing.T) {
func TestDownsamplerAggregationWithAutoMappingRulesFromNamespacesWatcher(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

gaugeMetrics, _ := testGaugeMetrics(testGaugeMetricsOptions{})
require.Equal(t, 1, len(gaugeMetrics))

gaugeMetric := gaugeMetrics[0]
numSamples := len(gaugeMetric.samples)

testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{testAggregationType},
Policies: testAggregationStoragePolicies,
ingest: &testDownsamplerOptionsIngest{
gaugeMetrics: gaugeMetrics,
},
expect: &testDownsamplerOptionsExpect{
writes: []testExpectedWrite{
{
tags: gaugeMetric.tags,
// NB(nate): Automapping rules generated from cluster namespaces currently
// hardcode 'Last' as the aggregation type. As such, expect value to be the last value
// in the sample.
values: []expectedValue{{value: gaugeMetric.samples[numSamples-1]}},
},
},
},
})

origStagedMetadata := originalStagedMetadata(t, testDownsampler)

session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:1d"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
})

waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}
@@ -181,7 +214,10 @@ func TestDownsamplerAggregationWithRulesConfigMappingRules(t *testing.T) {
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRule(t *testing.T) {
func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMappingRuleFromNamespacesWatcher(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
@@ -192,15 +228,6 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesPartialReplaceAutoMapp
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
policy.MustParseStoragePolicy("4s:48h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
@@ -231,11 +258,14 @@
Retention: 24 * time.Hour,
},
},
// Expect the sum to still be used for the storage
// Expect last to still be used for the storage
// policy 4s:48h.
{
tags: gaugeMetric.tags,
values: []expectedValue{{value: 60}},
tags: gaugeMetric.tags,
// NB(nate): Automapping rules generated from cluster namespaces currently
// hardcode 'Last' as the aggregation type. As such, expect value to be the last value
// in the sample.
values: []expectedValue{{value: 0}},
attributes: &storagemetadata.Attributes{
MetricsType: storagemetadata.AggregatedMetricsType,
Resolution: 4 * time.Second,
@@ -246,11 +276,31 @@
},
})

origStagedMetadata := originalStagedMetadata(t, testDownsampler)

session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:24h"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
}, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("4s:48h"),
Resolution: 4 * time.Second,
Retention: 48 * time.Hour,
Session: session,
})

waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}

func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule(t *testing.T) {
func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRuleFromNamespacesWatcher(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

gaugeMetric := testGaugeMetric{
tags: map[string]string{
nameTag: "foo_metric",
@@ -261,14 +311,6 @@ func TestDownsamplerAggregationWithRulesConfigMappingRulesReplaceAutoMappingRule
},
}
testDownsampler := newTestDownsampler(t, testDownsamplerOptions{
autoMappingRules: []AutoMappingRule{
{
Aggregations: []aggregation.Type{aggregation.Sum},
Policies: policy.StoragePolicies{
policy.MustParseStoragePolicy("2s:24h"),
},
},
},
rulesConfig: &RulesConfiguration{
MappingRules: []MappingRuleConfiguration{
{
@@ -303,6 +345,18 @@
},
})

origStagedMetadata := originalStagedMetadata(t, testDownsampler)

session := dbclient.NewMockSession(ctrl)
setAggregatedNamespaces(t, testDownsampler, session, m3.AggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("2s:24h"),
Resolution: 2 * time.Second,
Retention: 24 * time.Hour,
Session: session,
})

waitForStagedMetadataUpdate(t, testDownsampler, origStagedMetadata)

// Test expected output
testDownsamplerAggregation(t, testDownsampler)
}
@@ -1181,6 +1235,27 @@ func TestDownsamplerAggregationWithRemoteAggregatorClient(t *testing.T) {
testDownsamplerRemoteAggregation(t, testDownsampler)
}

func originalStagedMetadata(t *testing.T, testDownsampler testDownsampler) []metricpb.StagedMetadatas {
ds, ok := testDownsampler.downsampler.(*downsampler)
require.True(t, ok)

origStagedMetadata := ds.metricsAppenderOpts.defaultStagedMetadatasProtos
return origStagedMetadata
}

func waitForStagedMetadataUpdate(t *testing.T, testDownsampler testDownsampler, origStagedMetadata []metricpb.StagedMetadatas) {
ds, ok := testDownsampler.downsampler.(*downsampler)
require.True(t, ok)

require.True(t, clock.WaitUntil(func() bool {
ds.RLock()
defer ds.RUnlock()

return !assert.ObjectsAreEqual(origStagedMetadata, ds.metricsAppenderOpts.defaultStagedMetadatasProtos)
}, time.Second))

}

type testExpectedWrite struct {
tags map[string]string
values []expectedValue // use values for multi expected values
@@ -1582,6 +1657,21 @@ func testDownsamplerAggregationIngest(
}
}

func setAggregatedNamespaces(
t *testing.T,
testDownsampler testDownsampler,
session dbclient.Session,
namespaces ...m3.AggregatedClusterNamespaceDefinition,
) {
clusters, err := m3.NewClusters(m3.UnaggregatedClusterNamespaceDefinition{
NamespaceID: ident.StringID("default"),
Retention: 48 * time.Hour,
Session: session,
}, namespaces...)
require.NoError(t, err)
require.NoError(t, testDownsampler.opts.ClusterNamespacesWatcher.Update(clusters.ClusterNamespaces()))
}

func tagsToStringMap(tags models.Tags) map[string]string {
stringMap := make(map[string]string, tags.Len())
for _, t := range tags.Tags {
@@ -1686,7 +1776,7 @@ func newTestDownsampler(t *testing.T, opts testDownsamplerOptions) testDownsampl
Storage: storage,
ClusterClient: clusterclient.NewMockClient(gomock.NewController(t)),
RulesKVStore: rulesKVStore,
AutoMappingRules: opts.autoMappingRules,
ClusterNamespacesWatcher: m3.NewClusterNamespacesWatcher(),
ClockOptions: clockOpts,
InstrumentOptions: instrumentOpts,
TagEncoderOptions: tagEncoderOptions,