Skip to content

Commit

Permalink
[coordinator] Add Graphite rewrite cleanup directive for cleansing in…
Browse files Browse the repository at this point in the history
…coming metrics (#3047)
  • Loading branch information
robskillington authored Dec 27, 2020
1 parent f19fa9a commit 7f34add
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 30 deletions.
17 changes: 8 additions & 9 deletions src/cluster/etcd/watchmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
"golang.org/x/net/context"

"github.com/m3db/m3/src/x/clock"
)

func TestWatchChan(t *testing.T) {
Expand Down Expand Up @@ -242,13 +244,9 @@ func TestWatchNoLeader(t *testing.T) {
require.NoError(t, err)

// give some time for watch to be updated
for i := 0; i < 10; i++ {
if atomic.LoadInt32(&updateCalled) == int32(2) {
break
}
time.Sleep(watchInitAndRetryDelay)
runtime.Gosched()
}
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(&updateCalled) >= 2
}, 30*time.Second))

updates := atomic.LoadInt32(&updateCalled)
if updates < 2 {
Expand Down Expand Up @@ -295,9 +293,10 @@ func TestWatchCompactedRevision(t *testing.T) {
})

go wh.Watch("foo")
time.Sleep(3 * wh.opts.WatchChanInitTimeout())

assert.Equal(t, int32(3), atomic.LoadInt32(updateCalled))
require.True(t, clock.WaitUntil(func() bool {
return atomic.LoadInt32(updateCalled) == 3
}, 30*time.Second))

lastRead := atomic.LoadInt32(updateCalled)
ec.Put(context.Background(), "foo", "bar-11")
Expand Down
17 changes: 9 additions & 8 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package ingestcarbon implements a carbon ingester.
package ingestcarbon

import (
Expand Down Expand Up @@ -73,7 +74,7 @@ var (
type Options struct {
InstrumentOptions instrument.Options
WorkerPool xsync.PooledWorkerPool
IngesterConfig *config.CarbonIngesterConfiguration
IngesterConfig config.CarbonIngesterConfiguration
}

// CarbonIngesterRules contains the carbon ingestion rules.
Expand All @@ -86,11 +87,9 @@ func (o *Options) Validate() error {
if o.InstrumentOptions == nil {
return errIOptsMustBeSet
}

if o.WorkerPool == nil {
return errWorkerPoolMustBeSet
}

return nil
}

Expand Down Expand Up @@ -277,10 +276,11 @@ func (i *ingester) Handle(conn net.Conn) {
// Interfaces require a context be passed, but M3DB client already has timeouts
// built in and allocating a new context each time is expensive so we just pass
// the same context always and rely on M3DB client timeouts.
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
ctx = context.Background()
wg = sync.WaitGroup{}
s = carbon.NewScanner(conn, i.opts.InstrumentOptions)
logger = i.opts.InstrumentOptions.Logger()
rewrite = &i.opts.IngesterConfig.Rewrite
)

logger.Debug("handling new carbon ingestion connection")
Expand All @@ -289,8 +289,9 @@ func (i *ingester) Handle(conn net.Conn) {
name, timestamp, value := s.Metric()

resources := i.getLineResources()

// Copy name since scanner bytes are recycled.
resources.name = append(resources.name[:0], name...)
resources.name = copyAndRewrite(resources.name, name, rewrite)

wg.Add(1)
i.opts.WorkerPool.Go(func() {
Expand Down
3 changes: 1 addition & 2 deletions src/cmd/services/m3coordinator/ingest/carbon/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,7 @@ func TestGenerateTagsFromName(t *testing.T) {
func newTestOpts(rules CarbonIngesterRules) Options {
cfg := config.CarbonIngesterConfiguration{Rules: rules.Rules}
opts := testOptions
opts.IngesterConfig = &cfg

opts.IngesterConfig = cfg
return opts
}

Expand Down
88 changes: 88 additions & 0 deletions src/cmd/services/m3coordinator/ingest/carbon/rewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"github.com/m3db/m3/src/cmd/services/m3query/config"
)

// copyAndRewrite copies src into dst, reusing dst's backing storage from
// index zero, and returns the resulting slice.
//
// When cfg is nil or cfg.Cleanup is false the bytes are copied verbatim.
// Otherwise the bytes are cleaned while they are copied:
//   - leading and trailing dots are dropped,
//   - a run of consecutive dots is reduced to a single dot,
//   - every byte outside [0-9a-zA-Z.-_:#] is replaced by an underscore, and
//     no underscore is emitted when the previous output byte already is one.
// nolint: gocyclo
func copyAndRewrite(
	dst, src []byte,
	cfg *config.CarbonIngesterRewriteConfiguration,
) []byte {
	out := dst[:0]
	if cfg == nil || !cfg.Cleanup {
		// Cleanup disabled: plain copy.
		return append(out, src...)
	}

	var (
		// seenNonDot flips to true once the first non-dot byte is read, which
		// ends the leading-dot region.
		seenNonDot bool
		// dotRun counts the dots in the current uninterrupted run.
		dotRun int
	)
	for _, b := range src {
		if b == '.' {
			dotRun++
			// Keep only the first dot of a run, and none of the leading dots.
			if seenNonDot && dotRun == 1 {
				out = append(out, b)
			}
			continue
		}

		dotRun = 0
		seenNonDot = true

		switch {
		case b >= 'a' && b <= 'z',
			b >= 'A' && b <= 'Z',
			b >= '0' && b <= '9',
			b == '-',
			b == '_',
			b == ':',
			b == '#':
			// Regular byte, keep it unchanged.
			out = append(out, b)
		default:
			// Irregular byte: substitute an underscore unless the output
			// already ends with one.
			if last := len(out) - 1; last < 0 || out[last] != '_' {
				out = append(out, '_')
			}
		}
	}

	// Trim any dots left at the end of the output.
	end := len(out)
	for end > 0 && out[end-1] == '.' {
		end--
	}
	return out[:end]
}
124 changes: 124 additions & 0 deletions src/cmd/services/m3coordinator/ingest/carbon/rewrite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package ingestcarbon

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/m3db/m3/src/cmd/services/m3query/config"
)

// TestCopyAndRewrite runs copyAndRewrite against a table of inputs covering
// the disabled (nil config / Cleanup false) and enabled (Cleanup true) modes.
func TestCopyAndRewrite(t *testing.T) {
	// Shared configurations; copyAndRewrite only reads them.
	var (
		cleanupOff = &config.CarbonIngesterRewriteConfiguration{Cleanup: false}
		cleanupOn  = &config.CarbonIngesterRewriteConfiguration{Cleanup: true}
	)

	type testCase struct {
		name     string
		cfg      *config.CarbonIngesterRewriteConfiguration
		input    string
		expected string
	}

	cases := []testCase{
		{
			name:     "bad but no rewrite",
			cfg:      nil,
			input:    "foo$$.bar%%.baz@@",
			expected: "foo$$.bar%%.baz@@",
		},
		{
			name:     "bad but no rewrite cleanup",
			cfg:      cleanupOff,
			input:    "foo$$.bar%%.baz@@",
			expected: "foo$$.bar%%.baz@@",
		},
		{
			name:     "good with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo.bar.baz",
			expected: "foo.bar.baz",
		},
		{
			name:     "bad with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo$$.bar%%.baz@@",
			expected: "foo_.bar_.baz_",
		},
		{
			name:     "collapse two dots with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo..bar.baz",
			expected: "foo.bar.baz",
		},
		{
			name:     "collapse three and two dots with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo...bar..baz",
			expected: "foo.bar.baz",
		},
		{
			name:     "remove leading dot with rewrite cleanup",
			cfg:      cleanupOn,
			input:    ".foo.bar.baz",
			expected: "foo.bar.baz",
		},
		{
			name:     "remove multiple leading dots with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "..foo.bar.baz",
			expected: "foo.bar.baz",
		},
		{
			name:     "remove trailing dot with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo.bar.baz.",
			expected: "foo.bar.baz",
		},
		{
			name:     "remove multiple trailing dots with rewrite cleanup",
			cfg:      cleanupOn,
			input:    "foo.bar.baz..",
			expected: "foo.bar.baz",
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := copyAndRewrite(nil, []byte(tc.input), tc.cfg)
			require.Equal(t, tc.expected, string(got))
		})
	}
}
18 changes: 15 additions & 3 deletions src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,9 +452,21 @@ type CarbonConfiguration struct {

// CarbonIngesterConfiguration is the configuration struct for carbon ingestion.
type CarbonIngesterConfiguration struct {
ListenAddress string `yaml:"listenAddress"`
MaxConcurrency int `yaml:"maxConcurrency"`
Rules []CarbonIngesterRuleConfiguration `yaml:"rules"`
ListenAddress string `yaml:"listenAddress"`
MaxConcurrency int `yaml:"maxConcurrency"`
Rewrite CarbonIngesterRewriteConfiguration `yaml:"rewrite"`
Rules []CarbonIngesterRuleConfiguration `yaml:"rules"`
}

// CarbonIngesterRewriteConfiguration is the configuration for rewriting
// metrics at ingestion.
type CarbonIngesterRewriteConfiguration struct {
// Cleanup enables cleanup of incoming metric names; when false (the zero
// value) names are copied through unchanged. When true, cleanup will perform:
// - Trailing/leading dot elimination.
// - Double dot elimination (a run of consecutive dots becomes one dot).
// - Irregular char replacement with underscores (_), currently irregular
// is defined as not being in [0-9a-zA-Z-_:#]; dots are also left in
// place, and an underscore is not added directly after an underscore.
Cleanup bool `yaml:"cleanup"`
}

// LookbackDurationOrDefault validates the LookbackDuration
Expand Down
13 changes: 5 additions & 8 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,10 @@ func Run(runOpts RunOptions) {
}

if cfg.Carbon != nil && cfg.Carbon.Ingester != nil {
server, ok := startCarbonIngestion(cfg.Carbon, listenerOpts,
server := startCarbonIngestion(*cfg.Carbon.Ingester, listenerOpts,
instrumentOptions, logger, m3dbClusters, clusterNamespacesWatcher,
downsamplerAndWriter)
if ok {
defer server.Close()
}
defer server.Close()
}

// Wait for process interrupt.
Expand Down Expand Up @@ -1095,15 +1093,14 @@ func startGRPCServer(
}

func startCarbonIngestion(
cfg *config.CarbonConfiguration,
ingesterCfg config.CarbonIngesterConfiguration,
listenerOpts xnet.ListenerOptions,
iOpts instrument.Options,
logger *zap.Logger,
m3dbClusters m3.Clusters,
clusterNamespacesWatcher m3.ClusterNamespacesWatcher,
downsamplerAndWriter ingest.DownsamplerAndWriter,
) (xserver.Server, bool) {
ingesterCfg := cfg.Ingester
) xserver.Server {
logger.Info("carbon ingestion enabled, configuring ingester")

// Setup worker pool.
Expand Down Expand Up @@ -1167,7 +1164,7 @@ func startCarbonIngestion(

logger.Info("started carbon ingestion server", zap.String("listenAddress", carbonListenAddress))

return carbonServer, true
return carbonServer
}

func newDownsamplerAndWriter(
Expand Down

0 comments on commit 7f34add

Please sign in to comment.