diff --git a/src/cmd/services/m3coordinator/ingest/write.go b/src/cmd/services/m3coordinator/ingest/write.go index b5902b6dcd..199f3325a1 100644 --- a/src/cmd/services/m3coordinator/ingest/write.go +++ b/src/cmd/services/m3coordinator/ingest/write.go @@ -294,13 +294,21 @@ func (d *downsamplerAndWriter) writeToStorage( ) error { storagePolicies, ok := d.writeOverrideStoragePolicies(overrides) if !ok { - return d.writeWithOptions(ctx, storage.WriteQueryOptions{ + // NB(r): Allocate the write query at the top + // of the pooled worker instead of need to pass + // the options down the stack which can cause + // the stack to grow (and sometimes cause stack splits). + writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ Tags: tags, Datapoints: datapoints, Unit: unit, Annotation: annotation, Attributes: storageAttributesFromPolicy(unaggregatedStoragePolicy), }) + if err != nil { + return err + } + return d.store.Write(ctx, writeQuery) } var ( @@ -314,18 +322,26 @@ func (d *downsamplerAndWriter) writeToStorage( wg.Add(1) d.workerPool.Go(func() { - err := d.writeWithOptions(ctx, storage.WriteQueryOptions{ + // NB(r): Allocate the write query at the top + // of the pooled worker instead of need to pass + // the options down the stack which can cause + // the stack to grow (and sometimes cause stack splits). + writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ Tags: tags, Datapoints: datapoints, Unit: unit, Annotation: annotation, Attributes: storageAttributesFromPolicy(p), }) + if err == nil { + err = d.store.Write(ctx, writeQuery) + } if err != nil { errLock.Lock() multiErr = multiErr.Add(err) errLock.Unlock() } + wg.Done() }) } @@ -334,17 +350,6 @@ func (d *downsamplerAndWriter) writeToStorage( return multiErr.FinalError() } -func (d *downsamplerAndWriter) writeWithOptions( - ctx context.Context, - opts storage.WriteQueryOptions, -) error { - writeQuery, err := storage.NewWriteQuery(opts) - if err != nil { - return err - } - return d.store.Write(ctx, writeQuery) -} - func (d *downsamplerAndWriter) WriteBatch( ctx context.Context, iter DownsampleAndWriteIter, @@ -397,13 +402,20 @@ func (d *downsamplerAndWriter) WriteBatch( p := p // Capture for lambda. wg.Add(1) d.workerPool.Go(func() { - err := d.writeWithOptions(ctx, storage.WriteQueryOptions{ + // NB(r): Allocate the write query at the top + // of the pooled worker instead of need to pass + // the options down the stack which can cause + // the stack to grow (and sometimes cause stack splits). + writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ Tags: value.Tags, Datapoints: value.Datapoints, Unit: value.Unit, Annotation: value.Annotation, Attributes: storageAttributesFromPolicy(p), }) + if err == nil { + err = d.store.Write(ctx, writeQuery) + } if err != nil { addError(err) } diff --git a/src/cmd/services/m3query/config/config.go b/src/cmd/services/m3query/config/config.go index 099ee151fb..2d7fd642f4 100644 --- a/src/cmd/services/m3query/config/config.go +++ b/src/cmd/services/m3query/config/config.go @@ -85,6 +85,12 @@ var ( // By default, raise errors instead of truncating results so // users do not experience see unexpected results. defaultRequireExhaustive = true + + defaultWriteWorkerPool = xconfig.WorkerPoolPolicy{ + GrowOnDemand: true, + Size: 4096, + KillWorkerProbability: 0.001, + } ) // Configuration is the configuration for the query service. @@ -129,7 +135,7 @@ type Configuration struct { ReadWorkerPool xconfig.WorkerPoolPolicy `yaml:"readWorkerPoolPolicy"` // WriteWorkerPool is the worker pool policy for write requests. - WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` + WriteWorkerPool *xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` // WriteForwarding is the write forwarding options. WriteForwarding WriteForwardingConfiguration `yaml:"writeForwarding"` @@ -177,6 +183,14 @@ type Configuration struct { Debug config.DebugConfiguration `yaml:"debug"` } +// WriteWorkerPoolOrDefault returns the write worker pool config or default. +func (c Configuration) WriteWorkerPoolOrDefault() xconfig.WorkerPoolPolicy { + if c.WriteWorkerPool != nil { + return *c.WriteWorkerPool + } + return defaultWriteWorkerPool +} + // WriteForwardingConfiguration is the write forwarding configuration. type WriteForwardingConfiguration struct { PromRemoteWrite handleroptions.PromWriteHandlerForwardingOptions `yaml:"promRemoteWrite"` diff --git a/src/query/server/query.go b/src/query/server/query.go index 8352ecf0e5..490b065ce8 100644 --- a/src/query/server/query.go +++ b/src/query/server/query.go @@ -103,9 +103,8 @@ var ( }, } - defaultDownsamplerAndWriterWorkerPoolSize = 1024 - defaultCarbonIngesterWorkerPoolSize = 1024 - defaultPerCPUMultiProcess = 0.5 + defaultCarbonIngesterWorkerPoolSize = 1024 + defaultPerCPUMultiProcess = 0.5 ) type cleanupFn func() error @@ -324,7 +323,7 @@ func Run(runOpts RunOptions) { readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools( instrumentOptions, cfg.ReadWorkerPool, - cfg.WriteWorkerPool, + cfg.WriteWorkerPoolOrDefault(), scope) if err != nil { logger.Fatal("could not create worker pools", zap.Error(err)) @@ -450,6 +449,7 @@ func Run(runOpts RunOptions) { downsamplerAndWriter, err := newDownsamplerAndWriter( backendStorage, downsampler, + cfg.WriteWorkerPoolOrDefault(), instrumentOptions, ) if err != nil { @@ -1202,15 +1202,15 @@ func startCarbonIngestion( func newDownsamplerAndWriter( storage storage.Storage, downsampler downsample.Downsampler, + workerPoolPolicy xconfig.WorkerPoolPolicy, iOpts instrument.Options, ) (ingest.DownsamplerAndWriter, error) { // Make sure the downsampler and writer gets its own PooledWorkerPool and that its not shared with any other // codepaths because PooledWorkerPools can deadlock if used recursively. - downAndWriterWorkerPoolOpts := xsync.NewPooledWorkerPoolOptions(). - SetGrowOnDemand(true). - SetKillWorkerProbability(0.001) - downAndWriteWorkerPool, err := xsync.NewPooledWorkerPool( - defaultDownsamplerAndWriterWorkerPoolSize, downAndWriterWorkerPoolOpts) + downAndWriterWorkerPoolOpts, writePoolSize := workerPoolPolicy.Options() + downAndWriterWorkerPoolOpts = downAndWriterWorkerPoolOpts.SetInstrumentOptions(iOpts. + SetMetricsScope(iOpts.MetricsScope().SubScope("ingest-writer-worker-pool"))) + downAndWriteWorkerPool, err := xsync.NewPooledWorkerPool(writePoolSize, downAndWriterWorkerPoolOpts) if err != nil { return nil, err }