Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add flag to enable span size metrics reporting #3782

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions cmd/collector/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

const (
flagDynQueueSizeMemory = "collector.queue-size-memory"
flagNumWorkers = "collector.num-workers"
flagQueueSize = "collector.queue-size"
flagCollectorTags = "collector.tags"
flagDynQueueSizeMemory = "collector.queue-size-memory"
flagNumWorkers = "collector.num-workers"
flagQueueSize = "collector.queue-size"
flagCollectorTags = "collector.tags"
flagSpanSizeMetricsEnabled = "collector.enable-span-size-metrics"

flagSuffixHostPort = "host-port"

Expand Down Expand Up @@ -124,6 +125,8 @@ type CollectorOptions struct {
}
// CollectorTags is the string representing collector tags to append to each and every span
CollectorTags map[string]string
// SpanSizeMetricsEnabled determines whether to enable metrics based on processed span size
SpanSizeMetricsEnabled bool
}

type serverFlagsConfig struct {
Expand Down Expand Up @@ -163,6 +166,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(flagQueueSize, DefaultQueueSize, "The queue size of the collector")
flags.Uint(flagDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.String(flagCollectorTags, "", "One or more tags to be added to the Process tags of all spans passing through this collector. Ex: key1=value1,key2=${envVar:defaultValue}")
flags.Bool(flagSpanSizeMetricsEnabled, false, "Enables metrics based on processed span size, which are more expensive to calculate.")

addHTTPFlags(flags, httpServerFlagsCfg, ports.PortToHostPort(ports.CollectorHTTP))
addGRPCFlags(flags, grpcServerFlagsCfg, ports.PortToHostPort(ports.CollectorGRPC))
Expand Down Expand Up @@ -239,6 +243,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper, logger *zap.Logger)
cOpts.NumWorkers = v.GetInt(flagNumWorkers)
cOpts.QueueSize = v.GetInt(flagQueueSize)
cOpts.DynQueueSizeMemory = v.GetUint(flagDynQueueSizeMemory) * 1024 * 1024 // we receive in MiB and store in bytes
cOpts.SpanSizeMetricsEnabled = v.GetBool(flagSpanSizeMetricsEnabled)

if err := cOpts.HTTP.initFromViper(v, logger, httpServerFlagsCfg); err != nil {
return cOpts, fmt.Errorf("failed to parse HTTP server options: %w", err)
Expand Down
38 changes: 23 additions & 15 deletions cmd/collector/app/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,22 @@ import (
)

type options struct {
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans // see docs in PreProcessSpans option.
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
reportBusy bool
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
logger *zap.Logger
serviceMetrics metrics.Factory
hostMetrics metrics.Factory
preProcessSpans ProcessSpans // see docs in PreProcessSpans option.
sanitizer sanitizer.SanitizeSpan
preSave ProcessSpan
spanFilter FilterSpan
numWorkers int
blockingSubmit bool
queueSize int
dynQueueSizeWarmup uint
dynQueueSizeMemory uint
reportBusy bool
extraFormatTypes []processor.SpanFormat
collectorTags map[string]string
spanSizeMetricsEnabled bool
}

// Option is a function that sets some option on StorageBuilder.
Expand Down Expand Up @@ -156,6 +157,13 @@ func (options) CollectorTags(extraTags map[string]string) Option {
}
}

// SpanSizeMetricsEnabled creates an Option that initializes the spanSizeMetrics boolean
func (options) SpanSizeMetricsEnabled(spanSizeMetrics bool) Option {
return func(b *options) {
b.spanSizeMetricsEnabled = spanSizeMetrics
}
}

func (o options) apply(opts ...Option) options {
ret := options{}
for _, opt := range opts {
Expand Down
3 changes: 3 additions & 0 deletions cmd/collector/app/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,14 @@ func TestAllOptionSet(t *testing.T) {
Options.DynQueueSizeMemory(1024),
Options.PreSave(func(span *model.Span, tenant string) {}),
Options.CollectorTags(map[string]string{"extra": "tags"}),
Options.SpanSizeMetricsEnabled(true),
)
assert.EqualValues(t, 5, opts.numWorkers)
assert.EqualValues(t, 10, opts.queueSize)
assert.EqualValues(t, map[string]string{"extra": "tags"}, opts.collectorTags)
assert.EqualValues(t, 1000, opts.dynQueueSizeWarmup)
assert.EqualValues(t, 1024, opts.dynQueueSizeMemory)
assert.True(t, opts.spanSizeMetricsEnabled)
}

func TestNoOptionsSet(t *testing.T) {
Expand All @@ -66,4 +68,5 @@ func TestNoOptionsSet(t *testing.T) {
span := model.Span{}
assert.EqualValues(t, &span, opts.sanitizer(&span))
assert.EqualValues(t, 0, opts.dynQueueSizeWarmup)
assert.False(t, opts.spanSizeMetricsEnabled)
}
1 change: 1 addition & 0 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce
Options.CollectorTags(b.CollectorOpts.CollectorTags),
Options.DynQueueSizeWarmup(uint(b.CollectorOpts.QueueSize)), // same as queue size for now
Options.DynQueueSizeMemory(b.CollectorOpts.DynQueueSizeMemory),
Options.SpanSizeMetricsEnabled(b.CollectorOpts.SpanSizeMetricsEnabled),
)
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,12 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt

processSpanFuncs := []ProcessSpan{options.preSave, sp.saveSpan}
yurishkuro marked this conversation as resolved.
Show resolved Hide resolved
if options.dynQueueSizeMemory > 0 {
// add to processSpanFuncs
options.logger.Info("Dynamically adjusting the queue size at runtime.",
zap.Uint("memory-mib", options.dynQueueSizeMemory/1024/1024),
zap.Uint("queue-size-warmup", options.dynQueueSizeWarmup))
}
if options.dynQueueSizeMemory > 0 || options.spanSizeMetricsEnabled {
// add to processSpanFuncs
processSpanFuncs = append(processSpanFuncs, sp.countSpan)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in #2126 (comment) , sp.spansProcessed.Inc() could be independent but I didn't change this for simplicity.

}

Expand Down
81 changes: 64 additions & 17 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,27 +400,74 @@ func TestSpanProcessorWithCollectorTags(t *testing.T) {
}

func TestSpanProcessorCountSpan(t *testing.T) {
mb := metricstest.NewFactory(time.Hour)
m := mb.Namespace(metrics.NSOptions{})
tests := []struct {
name string
enableDynQueueSizeMem bool
enableSpanMetrics bool
expectedUpdateGauge bool
}{
{
name: "enable-dyn-queue-size-enable-metrics",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
name: "enable-dyn-queue-size-enable-metrics",
name: "enable dyn-queue-size, enable metrics",

nit: remove dashes for better readability, separate with commas

enableDynQueueSizeMem: true,
enableSpanMetrics: true,
expectedUpdateGauge: true,
},
{
name: "enable-dyn-queue-size-disable-metrics",
enableDynQueueSizeMem: true,
enableSpanMetrics: false,
expectedUpdateGauge: true,
},
{
name: "disable-dyn-queue-size-enable-metrics",
enableDynQueueSizeMem: false,
enableSpanMetrics: true,
expectedUpdateGauge: true,
},
{
name: "disable-dyn-queue-size-disable-metrics",
enableDynQueueSizeMem: false,
enableSpanMetrics: false,
expectedUpdateGauge: false,
},
}

w := &fakeSpanWriter{}
p := NewSpanProcessor(w, nil, Options.HostMetrics(m), Options.DynQueueSizeMemory(1000)).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mb := metricstest.NewFactory(time.Hour)
m := mb.Namespace(metrics.NSOptions{})

p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)
w := &fakeSpanWriter{}
opts := []Option{Options.HostMetrics(m), Options.SpanSizeMetricsEnabled(tt.enableSpanMetrics)}
if tt.enableDynQueueSizeMem {
opts = append(opts, Options.DynQueueSizeMemory(1000))
} else {
opts = append(opts, Options.DynQueueSizeMemory(0))
}
p := NewSpanProcessor(w, nil, opts...).(*spanProcessor)
p.background(10*time.Millisecond, p.updateGauges)

p.processSpan(&model.Span{}, "")
assert.NotEqual(t, uint64(0), p.bytesProcessed)

for i := 0; i < 15; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for i := 0; i < 15; i++ {
for i := 0; i < 10000; i++ {

doesn't affect test speed in most cases, but makes it more resilient in case of VM freezes

_, g := mb.Snapshot()
if b := g["spans.bytes"]; b > 0 {
if !tt.expectedUpdateGauge {
assert.Fail(t, "gauge has been updated unexpectedly")
}
assert.Equal(t, p.bytesProcessed.Load(), uint64(g["spans.bytes"]))
return
}
time.Sleep(time.Millisecond)
}

for i := 0; i < 15; i++ {
_, g := mb.Snapshot()
if b := g["spans.bytes"]; b > 0 {
assert.Equal(t, p.bytesProcessed.Load(), uint64(g["spans.bytes"]))
return
}
time.Sleep(time.Millisecond)
if tt.expectedUpdateGauge {
assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
}
assert.NoError(t, p.Close())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this won't be executed if you return in L460. I suggest using defer to close p.

})
}

assert.Fail(t, "gauge hasn't been updated within a reasonable amount of time")
assert.NoError(t, p.Close())
}

func TestUpdateDynQueueSize(t *testing.T) {
Expand Down