Skip to content

Commit

Permalink
Adding aws-s3 metric for sqs worker utilization (#34793)
Browse files Browse the repository at this point in the history
  • Loading branch information
kgeller authored Apr 3, 2023
1 parent c4f86d6 commit 91906c9
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ automatic splitting at root level, if root level element is an array. {pull}3415
- Add Basic Authentication support on constructed requests to CEL input {issue}34609[34609] {pull}34689[34689]
- Add string manipulation extensions to CEL input {issue}34610[34610] {pull}34689[34689]
- Add unix socket log parsing for nginx ingress_controller {pull}34732[34732]
- Added metric `sqs_worker_utilization` for aws-s3 input. {pull}34793[34793]
- Improve CEL input documentation {pull}34831[34831]
- Add metrics documentation for CEL and AWS CloudWatch inputs. {issue}34887[34887] {pull}34889[34889]
- Register MIME handlers for CSV types in CEL input. {pull}34934[34934]
Expand Down
1 change: 1 addition & 0 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -738,6 +738,7 @@ observe the activity of the input.
| `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
| `sqs_messages_deleted_total` | Number of SQS messages deleted.
| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
| `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
| `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
| `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
| `s3_objects_requested_total` | Number of S3 objects downloaded.
Expand Down
12 changes: 6 additions & 6 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}
defer receiver.metrics.Close()

// Poll sqs waiting metric periodically in the background.
// Poll metrics periodically in the background
go pollSqsWaitingMetric(ctx, receiver)

if err := receiver.Receive(ctx); err != nil {
Expand Down Expand Up @@ -208,9 +208,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s
if err != nil {
return nil, err
}
in.metrics = newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory)
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), in.metrics, sqsAPI, script, in.config.VisibilityTimeout, in.config.SQSMaxReceiveCount, pipeline, s3EventHandlerFactory, in.config.MaxNumberOfMessages)
sqsReader := newSQSReader(log.Named("sqs"), in.metrics, sqsAPI, in.config.MaxNumberOfMessages, sqsMessageHandler)

return sqsReader, nil
Expand Down Expand Up @@ -281,8 +281,8 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
if len(in.config.FileSelectors) == 0 {
fileSelectors = []fileSelectorConfig{{ReaderConfig: in.config.ReaderConfig}}
}
in.metrics = newInputMetrics(ctx.ID, nil)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig)
in.metrics = newInputMetrics(ctx.ID, nil, in.config.MaxNumberOfMessages)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), in.metrics, s3API, fileSelectors, in.config.BackupConfig, in.config.MaxNumberOfMessages)
s3Poller := newS3Poller(log.Named("s3_poller"),
in.metrics,
s3API,
Expand Down
10 changes: 5 additions & 5 deletions x-pack/filebeat/input/awss3/input_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,14 +211,14 @@ func benchmarkInputSQS(t *testing.T, maxMessagesInflight int) testing.BenchmarkR
return testing.Benchmark(func(b *testing.B) {
log := logp.NewLogger(inputName)
metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics("test_id", metricRegistry)
metrics := newInputMetrics("test_id", metricRegistry, maxMessagesInflight)
sqsAPI := newConstantSQS()
s3API := newConstantS3(t)
pipeline := &fakePipeline{}
conf := makeBenchmarkConfig(t)

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{})
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory)
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, conf.FileSelectors, backupConfig{}, maxMessagesInflight)
sqsMessageHandler := newSQSS3EventProcessor(log.Named("sqs_s3_event"), metrics, sqsAPI, nil, time.Minute, 5, pipeline, s3EventHandlerFactory, maxMessagesInflight)
sqsReader := newSQSReader(log.Named("sqs"), metrics, sqsAPI, maxMessagesInflight, sqsMessageHandler)

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -302,7 +302,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
log.Infof("benchmark with %d number of workers", numberOfWorkers)

metricRegistry := monitoring.NewRegistry()
metrics := newInputMetrics("test_id", metricRegistry)
metrics := newInputMetrics("test_id", metricRegistry, numberOfWorkers)

client := pubtest.NewChanClientWithCallback(100, func(event beat.Event) {
event.Private.(*awscommon.EventACKTracker).ACK()
Expand Down Expand Up @@ -348,7 +348,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
return
}

s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{})
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, config.FileSelectors, backupConfig{}, numberOfWorkers)
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, client, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", listPrefix, "region", "provider", numberOfWorkers, time.Second)

if err := s3Poller.Poll(ctx); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ func TestInputRunSQS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}

func TestInputRunS3(t *testing.T) {
Expand Down Expand Up @@ -430,4 +431,5 @@ func TestInputRunSNS(t *testing.T) {
assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7)
assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12)
assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0)
assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0)
}
121 changes: 111 additions & 10 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,46 @@
package awss3

import (
"context"
"io"
"math"
"sync"
"time"

"github.com/rcrowley/go-metrics"

"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
"github.com/elastic/elastic-agent-libs/monitoring"
"github.com/elastic/elastic-agent-libs/monitoring/adapter"
"github.com/elastic/go-concert/timed"
)

// currentTime returns the current time. This exists to allow unit tests
// simulate the passage of time.
var currentTime = time.Now

type inputMetrics struct {
registry *monitoring.Registry
unregister func()
ctx context.Context // ctx signals when to stop the sqs worker utilization goroutine.
cancel context.CancelFunc // cancel cancels the ctx context.

sqsMaxMessagesInflight int // Maximum number of SQS workers allowed.
sqsWorkerUtilizationMutex sync.Mutex // Guards the sqs worker utilization fields.
sqsWorkerUtilizationLastUpdate time.Time // Time of the last SQS worker utilization calculation.
sqsWorkerUtilizationCurrentPeriod time.Duration // Elapsed execution duration of any SQS workers that completed during the current period.
sqsWorkerIDCounter uint64 // Counter used to assigned unique IDs to SQS workers.
sqsWorkerStartTimes map[uint64]time.Time // Map of SQS worker ID to the time at which the worker started.

sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.
sqsMessagesReceivedTotal *monitoring.Uint // Number of SQS messages received (not necessarily processed fully).
sqsVisibilityTimeoutExtensionsTotal *monitoring.Uint // Number of SQS visibility timeout extensions.
sqsMessagesInflight *monitoring.Uint // Number of SQS messages inflight (gauge).
sqsMessagesReturnedTotal *monitoring.Uint // Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes).
sqsMessagesDeletedTotal *monitoring.Uint // Number of SQS messages deleted.
sqsMessagesWaiting *monitoring.Int // Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes.
sqsWorkerUtilization *monitoring.Float // Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized.
sqsMessageProcessingTime metrics.Sample // Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return).
sqsLagTime metrics.Sample // Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds.

s3ObjectsRequestedTotal *monitoring.Uint // Number of S3 objects downloaded.
s3ObjectsAckedTotal *monitoring.Uint // Number of S3 objects processed that were fully ACKed.
Expand All @@ -37,8 +56,9 @@ type inputMetrics struct {
s3ObjectProcessingTime metrics.Sample // Histogram of the elapsed S3 object processing times in nanoseconds (start of download to completion of parsing).
}

// Close removes the metrics from the registry.
// Close cancels the context and removes the metrics from the registry.
func (m *inputMetrics) Close() {
m.cancel()
m.unregister()
}

Expand All @@ -54,17 +74,90 @@ func (m *inputMetrics) setSQSMessagesWaiting(count int64) {
m.sqsMessagesWaiting.Set(count)
}

func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetrics {
// beginSQSWorker tracks the start of a new SQS worker. The returned ID
// must be used to call endSQSWorker when the worker finishes. It also
// increments the sqsMessagesInflight counter.
func (m *inputMetrics) beginSQSWorker() (id uint64) {
m.sqsMessagesInflight.Inc()

m.sqsWorkerUtilizationMutex.Lock()
defer m.sqsWorkerUtilizationMutex.Unlock()
m.sqsWorkerIDCounter++
m.sqsWorkerStartTimes[m.sqsWorkerIDCounter] = currentTime()
return m.sqsWorkerIDCounter
}

// endSQSWorker is used to signal that the specified worker has
// finished. This is used update the SQS worker utilization metric.
// It also decrements the sqsMessagesInflight counter and
// sqsMessageProcessingTime histogram.
func (m *inputMetrics) endSQSWorker(id uint64) {
m.sqsMessagesInflight.Dec()

m.sqsWorkerUtilizationMutex.Lock()
defer m.sqsWorkerUtilizationMutex.Unlock()
now := currentTime()
start := m.sqsWorkerStartTimes[id]
delete(m.sqsWorkerStartTimes, id)
m.sqsMessageProcessingTime.Update(now.Sub(start).Nanoseconds())
if start.Before(m.sqsWorkerUtilizationLastUpdate) {
m.sqsWorkerUtilizationCurrentPeriod += now.Sub(m.sqsWorkerUtilizationLastUpdate)
} else {
m.sqsWorkerUtilizationCurrentPeriod += now.Sub(start)
}
}

// updateSqsWorkerUtilization updates the sqsWorkerUtilization metric.
// This is invoked periodically to compute the utilization level
// of the SQS workers. 0 indicates no workers were utilized during
// the period. And 1 indicates that all workers fully utilized
// during the period.
func (m *inputMetrics) updateSqsWorkerUtilization() {
m.sqsWorkerUtilizationMutex.Lock()
defer m.sqsWorkerUtilizationMutex.Unlock()

now := currentTime()
lastPeriodDuration := now.Sub(m.sqsWorkerUtilizationLastUpdate)
maxUtilization := float64(m.sqsMaxMessagesInflight) * lastPeriodDuration.Seconds()

for _, startTime := range m.sqsWorkerStartTimes {
// If the worker started before the current period then only compute
// from elapsed time since the last update. Otherwise, it started
// during the current period so compute time elapsed since it started.
if startTime.Before(m.sqsWorkerUtilizationLastUpdate) {
m.sqsWorkerUtilizationCurrentPeriod += lastPeriodDuration
} else {
m.sqsWorkerUtilizationCurrentPeriod += now.Sub(startTime)
}
}

utilization := math.Round(m.sqsWorkerUtilizationCurrentPeriod.Seconds()/maxUtilization*1000) / 1000
if utilization > 1 {
utilization = 1
}
m.sqsWorkerUtilization.Set(utilization)
m.sqsWorkerUtilizationCurrentPeriod = 0
m.sqsWorkerUtilizationLastUpdate = now
}

func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers int) *inputMetrics {
reg, unreg := inputmon.NewInputRegistry(inputName, id, optionalParent)
ctx, cancel := context.WithCancel(context.Background())

out := &inputMetrics{
registry: reg,
unregister: unreg,
ctx: ctx,
cancel: cancel,
sqsMaxMessagesInflight: maxWorkers,
sqsWorkerStartTimes: map[uint64]time.Time{},
sqsWorkerUtilizationLastUpdate: currentTime(),
sqsMessagesReceivedTotal: monitoring.NewUint(reg, "sqs_messages_received_total"),
sqsVisibilityTimeoutExtensionsTotal: monitoring.NewUint(reg, "sqs_visibility_timeout_extensions_total"),
sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"),
sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"),
sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"),
sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"),
sqsMessageProcessingTime: metrics.NewUniformSample(1024),
sqsLagTime: metrics.NewUniformSample(1024),
s3ObjectsRequestedTotal: monitoring.NewUint(reg, "s3_objects_requested_total"),
Expand All @@ -82,6 +175,14 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry) *inputMetri
Register("histogram", metrics.NewHistogram(out.sqsLagTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.
adapter.NewGoMetrics(reg, "s3_object_processing_time", adapter.Accept).
Register("histogram", metrics.NewHistogram(out.s3ObjectProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible.

// Periodically update the sqs worker utilization metric.
//nolint:errcheck // This never returns an error.
go timed.Periodic(ctx, 5*time.Second, func() error {
out.updateSqsWorkerUtilization()
return nil
})

return out
}

Expand Down
79 changes: 78 additions & 1 deletion x-pack/filebeat/input/awss3/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ package awss3

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/elastic-agent-libs/monitoring"
)

Expand All @@ -17,10 +21,83 @@ import (
func TestInputMetricsClose(t *testing.T) {
reg := monitoring.NewRegistry()

metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg)
metrics := newInputMetrics("aws-s3-aws.cloudfront_logs-8b312b5f-9f99-492c-b035-3dff354a1f01", reg, 1)
metrics.Close()

reg.Do(monitoring.Full, func(s string, _ interface{}) {
t.Errorf("registry should be empty, but found %v", s)
})
}

func TestInputMetricsSQSWorkerUtilization(t *testing.T) {
const interval = 5000

t.Run("worker ends before one interval", func(t *testing.T) {
fakeTimeMs.Store(0)
defer useFakeCurrentTimeThenReset()()

reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg, 1)
metrics.Close()

id := metrics.beginSQSWorker()
fakeTimeMs.Add(2500)
metrics.endSQSWorker(id)

fakeTimeMs.Store(1 * interval)
metrics.updateSqsWorkerUtilization()
assert.Equal(t, 0.5, metrics.sqsWorkerUtilization.Get())
})
t.Run("worker ends mid interval", func(t *testing.T) {
fakeTimeMs.Store(0)
defer useFakeCurrentTimeThenReset()()

reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg, 1)
metrics.Close()

fakeTimeMs.Add(4000)
id := metrics.beginSQSWorker()

fakeTimeMs.Store(1 * interval)
metrics.updateSqsWorkerUtilization()

fakeTimeMs.Add(1000)
metrics.endSQSWorker(id)

fakeTimeMs.Store(2 * interval)
metrics.updateSqsWorkerUtilization()
assert.Equal(t, 0.2, metrics.sqsWorkerUtilization.Get())
})
t.Run("running worker goes longer than an interval", func(t *testing.T) {
fakeTimeMs.Store(0)
defer useFakeCurrentTimeThenReset()()

reg := monitoring.NewRegistry()
metrics := newInputMetrics("test", reg, 1)
metrics.Close()

id := metrics.beginSQSWorker()

fakeTimeMs.Store(1 * interval)
metrics.updateSqsWorkerUtilization()
assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get())

fakeTimeMs.Store(2 * interval)
metrics.updateSqsWorkerUtilization()
assert.Equal(t, 1.0, metrics.sqsWorkerUtilization.Get())

metrics.endSQSWorker(id)
})
}

var fakeTimeMs = &atomic.Int64{}

func useFakeCurrentTimeThenReset() (reset func()) {
currentTime = func() time.Time {
return time.UnixMilli(fakeTimeMs.Load())
}
return func() {
currentTime = time.Now
}
}
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/awss3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func newS3Poller(log *logp.Logger,
bucketPollInterval time.Duration,
) *s3Poller {
if metrics == nil {
metrics = newInputMetrics("", monitoring.NewRegistry())
metrics = newInputMetrics("", monitoring.NewRegistry(), numberOfWorkers)
}
return &s3Poller{
numberOfWorkers: numberOfWorkers,
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ type s3ObjectProcessorFactory struct {
backupConfig backupConfig
}

func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig) *s3ObjectProcessorFactory {
func newS3ObjectProcessorFactory(log *logp.Logger, metrics *inputMetrics, s3 s3API, sel []fileSelectorConfig, backupConfig backupConfig, maxWorkers int) *s3ObjectProcessorFactory {
if metrics == nil {
metrics = newInputMetrics("", monitoring.NewRegistry())
metrics = newInputMetrics("", monitoring.NewRegistry(), maxWorkers)
}
if len(sel) == 0 {
sel = []fileSelectorConfig{
Expand Down
Loading

0 comments on commit 91906c9

Please sign in to comment.