Skip to content

Commit

Permalink
[coordinator] Add metrics and configurability of downsample and inges…
Browse files Browse the repository at this point in the history
…t writer worker pool (#2797)
  • Loading branch information
robskillington authored Oct 27, 2020
1 parent 0e2f31f commit 8510dd7
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 24 deletions.
40 changes: 26 additions & 14 deletions src/cmd/services/m3coordinator/ingest/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,13 +294,21 @@ func (d *downsamplerAndWriter) writeToStorage(
) error {
storagePolicies, ok := d.writeOverrideStoragePolicies(overrides)
if !ok {
return d.writeWithOptions(ctx, storage.WriteQueryOptions{
// NB(r): Allocate the write query at the top
// of the pooled worker instead of need to pass
// the options down the stack which can cause
// the stack to grow (and sometimes cause stack splits).
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{
Tags: tags,
Datapoints: datapoints,
Unit: unit,
Annotation: annotation,
Attributes: storageAttributesFromPolicy(unaggregatedStoragePolicy),
})
if err != nil {
return err
}
return d.store.Write(ctx, writeQuery)
}

var (
Expand All @@ -314,18 +322,26 @@ func (d *downsamplerAndWriter) writeToStorage(

wg.Add(1)
d.workerPool.Go(func() {
err := d.writeWithOptions(ctx, storage.WriteQueryOptions{
// NB(r): Allocate the write query at the top
// of the pooled worker instead of need to pass
// the options down the stack which can cause
// the stack to grow (and sometimes cause stack splits).
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{
Tags: tags,
Datapoints: datapoints,
Unit: unit,
Annotation: annotation,
Attributes: storageAttributesFromPolicy(p),
})
if err == nil {
err = d.store.Write(ctx, writeQuery)
}
if err != nil {
errLock.Lock()
multiErr = multiErr.Add(err)
errLock.Unlock()
}

wg.Done()
})
}
Expand All @@ -334,17 +350,6 @@ func (d *downsamplerAndWriter) writeToStorage(
return multiErr.FinalError()
}

func (d *downsamplerAndWriter) writeWithOptions(
ctx context.Context,
opts storage.WriteQueryOptions,
) error {
writeQuery, err := storage.NewWriteQuery(opts)
if err != nil {
return err
}
return d.store.Write(ctx, writeQuery)
}

func (d *downsamplerAndWriter) WriteBatch(
ctx context.Context,
iter DownsampleAndWriteIter,
Expand Down Expand Up @@ -397,13 +402,20 @@ func (d *downsamplerAndWriter) WriteBatch(
p := p // Capture for lambda.
wg.Add(1)
d.workerPool.Go(func() {
err := d.writeWithOptions(ctx, storage.WriteQueryOptions{
// NB(r): Allocate the write query at the top
// of the pooled worker instead of need to pass
// the options down the stack which can cause
// the stack to grow (and sometimes cause stack splits).
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{
Tags: value.Tags,
Datapoints: value.Datapoints,
Unit: value.Unit,
Annotation: value.Annotation,
Attributes: storageAttributesFromPolicy(p),
})
if err == nil {
err = d.store.Write(ctx, writeQuery)
}
if err != nil {
addError(err)
}
Expand Down
16 changes: 15 additions & 1 deletion src/cmd/services/m3query/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ var (
// By default, raise errors instead of truncating results so
// users do not experience see unexpected results.
defaultRequireExhaustive = true

defaultWriteWorkerPool = xconfig.WorkerPoolPolicy{
GrowOnDemand: true,
Size: 4096,
KillWorkerProbability: 0.001,
}
)

// Configuration is the configuration for the query service.
Expand Down Expand Up @@ -129,7 +135,7 @@ type Configuration struct {
ReadWorkerPool xconfig.WorkerPoolPolicy `yaml:"readWorkerPoolPolicy"`

// WriteWorkerPool is the worker pool policy for write requests.
WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"`
WriteWorkerPool *xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"`

// WriteForwarding is the write forwarding options.
WriteForwarding WriteForwardingConfiguration `yaml:"writeForwarding"`
Expand Down Expand Up @@ -177,6 +183,14 @@ type Configuration struct {
Debug config.DebugConfiguration `yaml:"debug"`
}

// WriteWorkerPoolOrDefault returns the write worker pool config or default.
func (c Configuration) WriteWorkerPoolOrDefault() xconfig.WorkerPoolPolicy {
if c.WriteWorkerPool != nil {
return *c.WriteWorkerPool
}
return defaultWriteWorkerPool
}

// WriteForwardingConfiguration is the write forwarding configuration.
type WriteForwardingConfiguration struct {
PromRemoteWrite handleroptions.PromWriteHandlerForwardingOptions `yaml:"promRemoteWrite"`
Expand Down
18 changes: 9 additions & 9 deletions src/query/server/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ var (
},
}

defaultDownsamplerAndWriterWorkerPoolSize = 1024
defaultCarbonIngesterWorkerPoolSize = 1024
defaultPerCPUMultiProcess = 0.5
defaultCarbonIngesterWorkerPoolSize = 1024
defaultPerCPUMultiProcess = 0.5
)

type cleanupFn func() error
Expand Down Expand Up @@ -321,7 +320,7 @@ func Run(runOpts RunOptions) {
readWorkerPool, writeWorkerPool, err := pools.BuildWorkerPools(
instrumentOptions,
cfg.ReadWorkerPool,
cfg.WriteWorkerPool,
cfg.WriteWorkerPoolOrDefault(),
scope)
if err != nil {
logger.Fatal("could not create worker pools", zap.Error(err))
Expand Down Expand Up @@ -447,6 +446,7 @@ func Run(runOpts RunOptions) {
downsamplerAndWriter, err := newDownsamplerAndWriter(
backendStorage,
downsampler,
cfg.WriteWorkerPoolOrDefault(),
instrumentOptions,
)
if err != nil {
Expand Down Expand Up @@ -1122,15 +1122,15 @@ func startCarbonIngestion(
func newDownsamplerAndWriter(
storage storage.Storage,
downsampler downsample.Downsampler,
workerPoolPolicy xconfig.WorkerPoolPolicy,
iOpts instrument.Options,
) (ingest.DownsamplerAndWriter, error) {
// Make sure the downsampler and writer gets its own PooledWorkerPool and that its not shared with any other
// codepaths because PooledWorkerPools can deadlock if used recursively.
downAndWriterWorkerPoolOpts := xsync.NewPooledWorkerPoolOptions().
SetGrowOnDemand(true).
SetKillWorkerProbability(0.001)
downAndWriteWorkerPool, err := xsync.NewPooledWorkerPool(
defaultDownsamplerAndWriterWorkerPoolSize, downAndWriterWorkerPoolOpts)
downAndWriterWorkerPoolOpts, writePoolSize := workerPoolPolicy.Options()
downAndWriterWorkerPoolOpts = downAndWriterWorkerPoolOpts.SetInstrumentOptions(iOpts.
SetMetricsScope(iOpts.MetricsScope().SubScope("ingest-writer-worker-pool")))
downAndWriteWorkerPool, err := xsync.NewPooledWorkerPool(writePoolSize, downAndWriterWorkerPoolOpts)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8510dd7

Please sign in to comment.