Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[coordinator] Use tag options specified in config with M3Msg ingester #2212

Merged
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/ingest/m3msg/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package ingestm3msg

import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
Expand All @@ -43,9 +44,10 @@ type Configuration struct {
// NewIngester creates an ingester with an appender.
func (cfg Configuration) NewIngester(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
) (*Ingester, error) {
opts, err := cfg.newOptions(appender, instrumentOptions)
opts, err := cfg.newOptions(appender, tagOptions, instrumentOptions)
if err != nil {
return nil, err
}
Expand All @@ -54,6 +56,7 @@ func (cfg Configuration) NewIngester(

func (cfg Configuration) newOptions(
appender storage.Appender,
tagOptions models.TagOptions,
instrumentOptions instrument.Options,
) (Options, error) {
scope := instrumentOptions.MetricsScope().Tagged(
Expand Down Expand Up @@ -90,6 +93,7 @@ func (cfg Configuration) newOptions(
Appender: appender,
Workers: workers,
PoolOptions: cfg.OpPool.NewObjectPoolOptions(instrumentOptions),
TagOptions: tagOptions,
TagDecoderPool: tagDecoderPool,
RetryOptions: cfg.Retry.NewOptions(scope),
Sampler: sampler,
Expand Down
5 changes: 3 additions & 2 deletions src/cmd/services/m3coordinator/ingest/m3msg/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ import (
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/query/storage"
"github.com/m3db/m3/src/query/ts"
"github.com/m3db/m3/src/x/serialize"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/serialize"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
Expand All @@ -54,7 +54,8 @@ func TestIngest(t *testing.T) {
},
}
appender := &mockAppender{}
ingester, err := cfg.NewIngester(appender, instrument.NewOptions())
ingester, err := cfg.NewIngester(appender, models.NewTagOptions(),
instrument.NewOptions())
require.NoError(t, err)

id := newTestID(t, "__name__", "foo", "app", "bar")
Expand Down
42 changes: 35 additions & 7 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ type RunOptions struct {
// on once it has opened.
ListenerCh chan<- net.Listener

// M3MsgListenerCh is a programmatic channel to receive the M3Msg server
// listener on once it has opened.
M3MsgListenerCh chan<- net.Listener

// DownsamplerReadyCh is a programmatic channel to receive the downsampler
// ready signal once it is open.
DownsamplerReadyCh chan<- struct{}

// CustomHandlers is a list of custom 3rd party handlers.
CustomHandlers []options.CustomHandler

Expand Down Expand Up @@ -286,7 +294,8 @@ func Run(runOpts RunOptions) {
var cleanup cleanupFn
backendStorage, clusterClient, downsampler, cleanup, err = newM3DBStorage(
cfg, m3dbClusters, m3dbPoolWrapper,
runOpts, queryCtxOpts, tsdbOpts, instrumentOptions)
runOpts, queryCtxOpts, tsdbOpts,
runOpts.DownsamplerReadyCh, instrumentOptions)

if err != nil {
logger.Fatal("unable to setup m3db backend", zap.Error(err))
Expand Down Expand Up @@ -379,25 +388,33 @@ func Run(runOpts RunOptions) {
if cfg.Ingest != nil {
logger.Info("starting m3msg server",
zap.String("address", cfg.Ingest.M3Msg.Server.ListenAddress))
ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage, instrumentOptions)
ingester, err := cfg.Ingest.Ingester.NewIngester(backendStorage,
tagOptions, instrumentOptions)
if err != nil {
logger.Fatal("unable to create ingester", zap.Error(err))
}

server, err := cfg.Ingest.M3Msg.NewServer(
ingester.Ingest,
instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")),
)

instrumentOptions.SetMetricsScope(scope.SubScope("ingest-m3msg")))
if err != nil {
logger.Fatal("unable to create m3msg server", zap.Error(err))
}

if err := server.ListenAndServe(); err != nil {
listener, err := net.Listen("tcp", cfg.Ingest.M3Msg.Server.ListenAddress)
if err != nil {
logger.Fatal("unable to open m3msg server", zap.Error(err))
}

if runOpts.M3MsgListenerCh != nil {
runOpts.M3MsgListenerCh <- listener
}

if err := server.Serve(listener); err != nil {
logger.Fatal("unable to listen on ingest server", zap.Error(err))
}

logger.Info("started m3msg server ")
logger.Info("started m3msg server", zap.Stringer("addr", listener.Addr()))
defer server.Close()
} else {
logger.Info("no m3msg server configured")
Expand Down Expand Up @@ -425,6 +442,7 @@ func newM3DBStorage(
runOpts RunOptions,
queryContextOptions models.QueryContextOptions,
tsdbOpts tsdb.Options,
downsamplerReadyCh chan<- struct{},
instrumentOptions instrument.Options,
) (storage.Storage, clusterclient.Client, downsample.Downsampler, cleanupFn, error) {
var (
Expand Down Expand Up @@ -495,6 +513,12 @@ func newM3DBStorage(
// since the cluster client will return errors until it's initialized itself
// and will fail constructing the downsampler consequently
downsampler = downsample.NewAsyncDownsampler(func() (downsample.Downsampler, error) {
if downsamplerReadyCh != nil {
defer func() {
downsamplerReadyCh <- struct{}{}
}()
}

<-clusterClientWaitCh
return newDownsamplerFn()
}, nil)
Expand All @@ -504,6 +528,10 @@ func newM3DBStorage(
if err != nil {
return nil, nil, nil, nil, err
}

if downsamplerReadyCh != nil {
downsamplerReadyCh <- struct{}{}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

delete this? should get done in the defer right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in an else clause - defer is inside an inner function inside the other side of the else.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I think)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, then won’t you miss a send when the newDownsamplerFn errors on this branch? Can we defer before this if else to avoid this? Could see this being a bit dangerous going forward

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll move it into newDownsamplerFn so it's only done in one place.

I can't defer from the top level function since some of these are async clients and are not ready by the end of the top level function creating these async clients.

For some further contextDownsamplerReadyCh is only used by tests to ensure failures to write a datapoint doesn't fail due to the async downsampler being not ready. The DownsamplerReadyCh should not be notified unless the downsampler is truly ready, if an error occurs then the channel should never receive an update.

}
}

Expand Down
Loading