-
Notifications
You must be signed in to change notification settings - Fork 455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[coordinator] Expand and add metrics + configurability of downsample and ingest worker pool #2797
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -294,13 +294,21 @@ func (d *downsamplerAndWriter) writeToStorage( | |
) error { | ||
storagePolicies, ok := d.writeOverrideStoragePolicies(overrides) | ||
if !ok { | ||
return d.writeWithOptions(ctx, storage.WriteQueryOptions{ | ||
// NB(r): Allocate the write query at the top | ||
// of the pooled worker instead of need to pass | ||
// the options down the stack which can cause | ||
// the stack to grow (and sometimes cause stack splits). | ||
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ | ||
Tags: tags, | ||
Datapoints: datapoints, | ||
Unit: unit, | ||
Annotation: annotation, | ||
Attributes: storageAttributesFromPolicy(unaggregatedStoragePolicy), | ||
}) | ||
if err != nil { | ||
return err | ||
} | ||
return d.store.Write(ctx, writeQuery) | ||
} | ||
|
||
var ( | ||
|
@@ -314,18 +322,26 @@ func (d *downsamplerAndWriter) writeToStorage( | |
|
||
wg.Add(1) | ||
d.workerPool.Go(func() { | ||
err := d.writeWithOptions(ctx, storage.WriteQueryOptions{ | ||
// NB(r): Allocate the write query at the top | ||
// of the pooled worker instead of need to pass | ||
// the options down the stack which can cause | ||
// the stack to grow (and sometimes cause stack splits). | ||
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ | ||
Tags: tags, | ||
Datapoints: datapoints, | ||
Unit: unit, | ||
Annotation: annotation, | ||
Attributes: storageAttributesFromPolicy(p), | ||
}) | ||
if err == nil { | ||
err = d.store.Write(ctx, writeQuery) | ||
} | ||
if err != nil { | ||
errLock.Lock() | ||
multiErr = multiErr.Add(err) | ||
errLock.Unlock() | ||
} | ||
|
||
wg.Done() | ||
}) | ||
} | ||
|
@@ -334,17 +350,6 @@ func (d *downsamplerAndWriter) writeToStorage( | |
return multiErr.FinalError() | ||
} | ||
|
||
func (d *downsamplerAndWriter) writeWithOptions( | ||
ctx context.Context, | ||
opts storage.WriteQueryOptions, | ||
) error { | ||
writeQuery, err := storage.NewWriteQuery(opts) | ||
if err != nil { | ||
return err | ||
} | ||
return d.store.Write(ctx, writeQuery) | ||
} | ||
|
||
func (d *downsamplerAndWriter) WriteBatch( | ||
ctx context.Context, | ||
iter DownsampleAndWriteIter, | ||
|
@@ -397,13 +402,20 @@ func (d *downsamplerAndWriter) WriteBatch( | |
p := p // Capture for lambda. | ||
wg.Add(1) | ||
d.workerPool.Go(func() { | ||
err := d.writeWithOptions(ctx, storage.WriteQueryOptions{ | ||
// NB(r): Allocate the write query at the top | ||
// of the pooled worker instead of need to pass | ||
// the options down the stack which can cause | ||
// the stack to grow (and sometimes cause stack splits). | ||
writeQuery, err := storage.NewWriteQuery(storage.WriteQueryOptions{ | ||
Tags: value.Tags, | ||
Datapoints: value.Datapoints, | ||
Unit: value.Unit, | ||
Annotation: value.Annotation, | ||
Attributes: storageAttributesFromPolicy(p), | ||
}) | ||
if err == nil { | ||
err = d.store.Write(ctx, writeQuery) | ||
} | ||
if err != nil { | ||
Comment on lines
+416
to
419
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: change to |
||
addError(err) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -85,6 +85,12 @@ var ( | |
// By default, raise errors instead of truncating results so | ||
// users do not experience see unexpected results. | ||
defaultRequireExhaustive = true | ||
|
||
defaultWriteWorkerPool = xconfig.WorkerPoolPolicy{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit; |
||
GrowOnDemand: true, | ||
Size: 4096, | ||
KillWorkerProbability: 0.001, | ||
} | ||
) | ||
|
||
// Configuration is the configuration for the query service. | ||
|
@@ -129,7 +135,7 @@ type Configuration struct { | |
ReadWorkerPool xconfig.WorkerPoolPolicy `yaml:"readWorkerPoolPolicy"` | ||
|
||
// WriteWorkerPool is the worker pool policy for write requests. | ||
WriteWorkerPool xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` | ||
WriteWorkerPool *xconfig.WorkerPoolPolicy `yaml:"writeWorkerPoolPolicy"` | ||
|
||
// WriteForwarding is the write forwarding options. | ||
WriteForwarding WriteForwardingConfiguration `yaml:"writeForwarding"` | ||
|
@@ -177,6 +183,14 @@ type Configuration struct { | |
Debug config.DebugConfiguration `yaml:"debug"` | ||
} | ||
|
||
// WriteWorkerPoolOrDefault returns the write worker pool config or default. | ||
func (c Configuration) WriteWorkerPoolOrDefault() xconfig.WorkerPoolPolicy { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: may be worth renaming these to |
||
if c.WriteWorkerPool != nil { | ||
return *c.WriteWorkerPool | ||
} | ||
return defaultWriteWorkerPool | ||
} | ||
|
||
// WriteForwardingConfiguration is the write forwarding configuration. | ||
type WriteForwardingConfiguration struct { | ||
PromRemoteWrite handleroptions.PromWriteHandlerForwardingOptions `yaml:"promRemoteWrite"` | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: change to
else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't change to else unfortunately since error reporting needs to happen for both code paths, this is a way to reuse the same error reporting.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah yeah misread my bad