diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/config.go b/src/cmd/services/m3coordinator/ingest/m3msg/config.go index 3ddeadf505..9d7c17cce2 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/config.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/config.go @@ -21,6 +21,7 @@ package ingestm3msg import ( + "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -43,9 +44,10 @@ type Configuration struct { // NewIngester creates an ingester with an appender. func (cfg Configuration) NewIngester( appender storage.Appender, + tagOptions models.TagOptions, instrumentOptions instrument.Options, ) (*Ingester, error) { - opts, err := cfg.newOptions(appender, instrumentOptions) + opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions) if err != nil { return nil, err } @@ -54,6 +56,7 @@ func (cfg Configuration) NewIngester( func (cfg Configuration) newOptions( appender storage.Appender, + tagOptions models.TagOptions, instrumentOptions instrument.Options, ) (Options, error) { scope := instrumentOptions.MetricsScope().Tagged( @@ -90,6 +93,7 @@ func (cfg Configuration) newOptions( Appender: appender, Workers: workers, PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions), + TagOptions: tagOptions, TagDecoderPool: tagDecoderPool, RetryOptions: cfg.Retry.NewOptions(scope), Sampler: sampler, diff --git a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go index 5118ca1537..65579afbf1 100644 --- a/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go +++ b/src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go @@ -33,10 +33,10 @@ import ( "github.com/m3db/m3/src/query/models" "github.com/m3db/m3/src/query/storage" "github.com/m3db/m3/src/query/ts" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" 
"github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -54,7 +54,8 @@ func TestIngest(t *testing.T) { }, } appender := &mockAppender{} - ingester, err := cfg.NewIngester(appender, instrument.NewOptions()) + ingester, err := cfg.NewIngester(appender, models.NewTagOptions(), + instrument.NewOptions()) require.NoError(t, err) id := newTestID(t, "__name__", "foo", "app", "bar") diff --git a/src/query/server/query.go b/src/query/server/query.go index f04b7dd685..faeb6c2bca 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -121,6 +121,14 @@ type RunOptions struct { // on once it has opened. ListenerCh chan<- net.Listener + // M3MsgListenerCh is a programmatic channel to receive the M3Msg server + // listener on once it has opened. + M3MsgListenerCh chan<- net.Listener + + // DownsamplerReadyCh is a programmatic channel to receive the downsampler + // ready signal once it is open. + DownsamplerReadyCh chan<- struct{} + // CustomHandlers is a list of custom 3rd party handlers. 
CustomHandlers []options.CustomHandler @@ -228,8 +236,7 @@ func Run(runOpts RunOptions) { instrumentOptions, cfg.ReadWorkerPool, cfg.WriteWorkerPool, - scope, - ) + scope) if err != nil { logger.Fatal("could not create worker pools", zap.Error(err)) } @@ -286,7 +293,8 @@ func Run(runOpts RunOptions) { var cleanup cleanupFn backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage( cfg, m3dbClusters, m3dbPoolWrapper, - runOpts, queryCtxOpts, tsdbOpts, instrumentOptions) + runOpts, queryCtxOpts, tsdbOpts, + runOpts.DownsamplerReadyCh, instrumentOptions) if err != nil { logger.Fatal("unable to setup m3db backend", zap.Error(err)) @@ -379,25 +387,33 @@ func Run(runOpts RunOptions) { if cfg.Ingest != nil { logger.Info("starting m3msg server", zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress)) - ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions) + ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, + tagOptions, instrumentOptions) if err != nil { logger.Fatal("unable to create ingester", zap.Error(err)) } server, err := cfg.Ingest.M3Msg.NewServer( ingester.Ingest, - instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")), - ) - + instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg"))) if err != nil { logger.Fatal("unable to create m3msg server", zap.Error(err)) } - if err := server.ListenAndServe(); err != nil { + listener, err := net.Listen("tcp", cfg.Ingest.M3Msg.Server.ListenAddress) + if err != nil { + logger.Fatal("unable to open m3msg server", zap.Error(err)) + } + + if runOpts.M3MsgListenerCh != nil { + runOpts.M3MsgListenerCh <- listener + } + + if err := server.Serve(listener); err != nil { logger.Fatal("unable to listen on ingest server", zap.Error(err)) } - logger.Info("started m3msg server ") + logger.Info("started m3msg server", zap.Stringer("addr", listener.Addr())) defer server.Close() } else { logger.Info("no m3msg server configured") @@ -425,6 +441,7 @@ func 
newM3DBStorage( runOpts RunOptions, queryContextOptions models.QueryContextOptions, tsdbOpts tsdb.Options, + downsamplerReadyCh chan<- struct{}, instrumentOptions instrument.Options, ) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) { var ( @@ -486,8 +503,19 @@ func newM3DBStorage( } newDownsamplerFn := func() (downsample.Downsampler, error) { - return newDownsampler(cfg.Downsample, clusterClient, + downsampler, err := newDownsampler(cfg.Downsample, clusterClient, fanoutStorage, autoMappingRules, tsdbOpts.TagOptions(), instrumentOptions) + if err != nil { + return nil, err + } + + // Notify the downsampler ready channel that + // the downsampler has now been created and is ready. + if downsamplerReadyCh != nil { + downsamplerReadyCh <- struct{}{} + } + + return downsampler, nil } if clusterClientWaitCh != nil { diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index eb9bd62af6..ab2eb12e23 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -31,17 +31,28 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" + clusterclient "github.com/m3db/m3/src/cluster/client" + "github.com/m3db/m3/src/cluster/kv" "github.com/m3db/m3/src/cmd/services/m3query/config" "github.com/m3db/m3/src/dbnode/client" + "github.com/m3db/m3/src/metrics/generated/proto/metricpb" + "github.com/m3db/m3/src/metrics/generated/proto/rulepb" + "github.com/m3db/m3/src/metrics/policy" + "github.com/m3db/m3/src/msg/generated/proto/msgpb" + m3msgproto "github.com/m3db/m3/src/msg/protocol/proto" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote" "github.com/m3db/m3/src/query/api/v1/handler/prometheus/remote/test" "github.com/m3db/m3/src/query/cost" rpc "github.com/m3db/m3/src/query/generated/proto/rpcpb" "github.com/m3db/m3/src/query/storage/m3" + xclock "github.com/m3db/m3/src/x/clock" xconfig "github.com/m3db/m3/src/x/config" "github.com/m3db/m3/src/x/ident" 
"github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/serialize" xtest "github.com/m3db/m3/src/x/test" + "go.uber.org/atomic" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -71,6 +82,28 @@ clusters: - namespace: prometheus_metrics type: unaggregated retention: 48h + - namespace: prometheus_metrics_1m_aggregated + type: aggregated + retention: 120h + resolution: 1m + downsample: + all: false + +ingest: + ingester: + workerPoolSize: 100 + opPool: + size: 100 + retry: + maxRetries: 3 + jitter: true + logSampleRate: 0.01 + m3msg: + server: + listenAddress: "0.0.0.0:0" + retry: + maxBackoff: 10s + jitter: true tagOptions: metricName: "_new" @@ -79,18 +112,17 @@ tagOptions: readWorkerPoolPolicy: grow: true size: 100 - shards: 1000 + shards: 100 killProbability: 0.3 writeWorkerPoolPolicy: grow: true size: 100 - shards: 1000 + shards: 100 killProbability: 0.3 - ` -func TestRun(t *testing.T) { +func TestWrite(t *testing.T) { ctrl := gomock.NewController(xtest.Reporter{T: t}) defer ctrl.Finish() @@ -123,10 +155,10 @@ func TestRun(t *testing.T) { gomock.Any(), nil) } - session.EXPECT().Close() + session.EXPECT().Close().AnyTimes() dbClient := client.NewMockClient(ctrl) - dbClient.EXPECT().DefaultSession().Return(session, nil) + dbClient.EXPECT().DefaultSession().Return(session, nil).AnyTimes() cfg.Clusters[0].NewClientFromConfig = m3.NewClientFromConfig( func( @@ -140,15 +172,41 @@ func TestRun(t *testing.T) { interruptCh := make(chan error, 1) doneCh := make(chan struct{}, 1) listenerCh := make(chan net.Listener, 1) + + rulesNamespacesValue := kv.NewMockValue(ctrl) + rulesNamespacesValue.EXPECT().Version().Return(0).AnyTimes() + rulesNamespacesValue.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v proto.Message) error { + msg := v.(*rulepb.Namespaces) + *msg = rulepb.Namespaces{} + return nil + }) + rulesNamespacesWatchable := kv.NewValueWatchable() + rulesNamespacesWatchable.Update(rulesNamespacesValue) + _, rulesNamespacesWatch, err 
:= rulesNamespacesWatchable.Watch() + require.NoError(t, err) + kvClient := kv.NewMockStore(ctrl) + kvClient.EXPECT().Watch(gomock.Any()).Return(rulesNamespacesWatch, nil).AnyTimes() + clusterClient := clusterclient.NewMockClient(ctrl) + clusterClient.EXPECT().KV().Return(kvClient, nil).AnyTimes() + clusterClientCh := make(chan clusterclient.Client, 1) + clusterClientCh <- clusterClient + + downsamplerReadyCh := make(chan struct{}, 1) + go func() { Run(RunOptions{ - Config: cfg, - InterruptCh: interruptCh, - ListenerCh: listenerCh, + Config: cfg, + InterruptCh: interruptCh, + ListenerCh: listenerCh, + ClusterClient: clusterClientCh, + DownsamplerReadyCh: downsamplerReadyCh, }) doneCh <- struct{}{} }() + // Wait for downsampler to be ready. + <-downsamplerReadyCh + // Wait for listener listener := <-listenerCh addr := listener.Addr().String() @@ -171,6 +229,149 @@ func TestRun(t *testing.T) { <-doneCh } +// TestIngest will test an M3Msg being ingested by the coordinator. It also +// makes sure that the tag options are correctly propagated from the config +// all the way to the M3Msg ingester and when written to the DB will include +// the correctly formed ID. +func TestIngest(t *testing.T) { + ctrl := gomock.NewController(xtest.Reporter{T: t}) + defer ctrl.Finish() + + configFile, close := newTestFile(t, "config.yaml", configYAML) + defer close() + + var cfg config.Configuration + err := xconfig.LoadFile(&cfg, configFile.Name(), xconfig.Options{}) + require.NoError(t, err) + + // Override the client creation + require.Equal(t, 1, len(cfg.Clusters)) + + numWrites := atomic.NewInt32(0) + + session := client.NewMockSession(ctrl) + session.EXPECT(). + WriteTagged(ident.NewIDMatcher("prometheus_metrics_1m_aggregated"), + ident.NewIDMatcher(`{_new="first",biz="baz",foo="bar"}`), + gomock.Any(), + gomock.Any(), + 42.0, + gomock.Any(), + nil). 
+ Do(func(_, _, _, _, _, _, _ interface{}) { + numWrites.Add(1) + }) + session.EXPECT().Close().AnyTimes() + + dbClient := client.NewMockClient(ctrl) + dbClient.EXPECT().DefaultSession().Return(session, nil).AnyTimes() + + cfg.Clusters[0].NewClientFromConfig = m3.NewClientFromConfig( + func( + cfg client.Configuration, + params client.ConfigurationParameters, + custom ...client.CustomAdminOption, + ) (client.Client, error) { + return dbClient, nil + }) + + interruptCh := make(chan error, 1) + doneCh := make(chan struct{}, 1) + listenerCh := make(chan net.Listener, 1) + m3msgListenerCh := make(chan net.Listener, 1) + + rulesNamespacesValue := kv.NewMockValue(ctrl) + rulesNamespacesValue.EXPECT().Version().Return(0).AnyTimes() + rulesNamespacesValue.EXPECT().Unmarshal(gomock.Any()).DoAndReturn(func(v proto.Message) error { + msg := v.(*rulepb.Namespaces) + *msg = rulepb.Namespaces{} + return nil + }) + rulesNamespacesWatchable := kv.NewValueWatchable() + rulesNamespacesWatchable.Update(rulesNamespacesValue) + _, rulesNamespacesWatch, err := rulesNamespacesWatchable.Watch() + require.NoError(t, err) + kvClient := kv.NewMockStore(ctrl) + kvClient.EXPECT().Watch(gomock.Any()).Return(rulesNamespacesWatch, nil).AnyTimes() + clusterClient := clusterclient.NewMockClient(ctrl) + clusterClient.EXPECT().KV().Return(kvClient, nil).AnyTimes() + clusterClientCh := make(chan clusterclient.Client, 1) + clusterClientCh <- clusterClient + + downsamplerReadyCh := make(chan struct{}, 1) + + go func() { + Run(RunOptions{ + Config: cfg, + InterruptCh: interruptCh, + ListenerCh: listenerCh, + M3MsgListenerCh: m3msgListenerCh, + ClusterClient: clusterClientCh, + DownsamplerReadyCh: downsamplerReadyCh, + }) + doneCh <- struct{}{} + }() + + // Wait for downsampler to be ready. + <-downsamplerReadyCh + + // Wait for listener + listener := <-listenerCh + addr := listener.Addr().String() + + // Wait for server to come up + waitForServerHealthy(t, addr) + + // Send ingest message. 
+ tagEncoderPool := serialize.NewTagEncoderPool(serialize.NewTagEncoderOptions(), nil) + tagEncoderPool.Init() + tagEncoder := tagEncoderPool.Get() + err = tagEncoder.Encode(ident.MustNewTagStringsIterator( + "_new", "first", + "biz", "baz", + "foo", "bar")) + require.NoError(t, err) + id, ok := tagEncoder.Data() + require.True(t, ok) + sp, err := policy.MustParseStoragePolicy("1m:120h").Proto() + require.NoError(t, err) + + // Marshal the aggregated metric. + message, err := proto.Marshal(&metricpb.AggregatedMetric{ + Metric: metricpb.TimedMetricWithStoragePolicy{ + TimedMetric: metricpb.TimedMetric{ + Type: metricpb.MetricType_GAUGE, + Id: id.Bytes(), + TimeNanos: time.Now().UnixNano(), + Value: 42, + }, + StoragePolicy: *sp, + }, + }) + require.NoError(t, err) + + // Encode as m3msg protobuf message. + encoder := m3msgproto.NewEncoder(m3msgproto.NewOptions()) + err = encoder.Encode(&msgpb.Message{ + Value: message, + }) + require.NoError(t, err) + m3msgListener := <-m3msgListenerCh + conn, err := net.Dial("tcp", m3msgListener.Addr().String()) + require.NoError(t, err) + _, err = conn.Write(encoder.Bytes()) + require.NoError(t, err) + + // Now wait for write. + xclock.WaitUntil(func() bool { + return numWrites.Load() == 1 + }, 30*time.Second) + + // Ensure close server performs as expected + interruptCh <- fmt.Errorf("interrupt") + <-doneCh +} + type closeFn func() func newTestFile(t *testing.T, fileName, contents string) (*os.File, closeFn) {