From cc740f9554ed7345760353d7aa812a55b3a9c353 Mon Sep 17 00:00:00 2001 From: Bharat Pasupula <123897612+bhapas@users.noreply.github.com> Date: Mon, 24 Jul 2023 22:34:48 +0200 Subject: [PATCH] x-pack/filebeat/input/awss3 ; Fix nil hit panic when a getter is invoked on input metric (#36101) --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-aws-s3.asciidoc | 2 +- x-pack/filebeat/input/awss3/input.go | 38 +++++++++++++------ .../input/awss3/input_integration_test.go | 6 +-- x-pack/filebeat/input/awss3/metrics.go | 16 ++------ x-pack/filebeat/input/awss3/metrics_test.go | 32 ++++++++++++++++ x-pack/filebeat/input/awss3/sqs.go | 1 + 7 files changed, 68 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 14021299b72..f3e9a396e47 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -151,6 +151,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix handling of NUL-terminated log lines in Fortinet Firewall module. {issue}36026[36026] {pull}36027[36027] - Make redact field configuration recommended in CEL input and log warning if missing. {pull}36008[36008] - Fix handling of region name configuration in awss3 input {pull}36034[36034] +- Fix panic when sqs input metrics getter is invoked {pull}36101[36101] {issue}36077[36077] - Make CEL input's `now` global variable static for evaluation lifetime. {pull}36107[36107] *Heartbeat* diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 1df7f4bb341..794a51de081 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -797,7 +797,7 @@ observe the activity of the input. | `sqs_messages_inflight_gauge` | Number of SQS messages inflight (gauge). | `sqs_messages_returned_total` | Number of SQS message returned to queue (happens on errors implicitly after visibility timeout passes). | `sqs_messages_deleted_total` | Number of SQS messages deleted. -| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from GetQueueAttributes. +| `sqs_messages_waiting_gauge` | Number of SQS messages waiting in the SQS queue (gauge). The value is refreshed every minute via data from https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_GetQueueAttributes.html. A value of `-1` indicates the metric is uninitialized or could not be collected due to an error. | `sqs_worker_utilization` | Rate of SQS worker utilization over previous 5 seconds. 0 indicates idle, 1 indicates all workers utilized. | `sqs_message_processing_time` | Histogram of the elapsed SQS processing times in nanoseconds (time of receipt to time of delete/return). | `sqs_lag_time` | Histogram of the difference between the SQS SentTimestamp attribute and the time when the SQS message was received expressed in nanoseconds. diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 221084881f8..fa2bedfeebe 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -409,6 +409,12 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string { } func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { + // Run GetApproximateMessageCount before start of timer to set initial count for sqs waiting metric + // This is to avoid misleading values in metric when sqs messages are processed before the ticker channel kicks in + if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn { + return + } + t := time.NewTicker(time.Minute) defer t.Stop() for { @@ -416,21 +422,31 @@ func pollSqsWaitingMetric(ctx context.Context, receiver *sqsReader) { case <-ctx.Done(): return case <-t.C: - count, err := receiver.GetApproximateMessageCount(ctx) - - var apiError smithy.APIError - if errors.As(err, &apiError) { - switch apiError.ErrorCode() { - case sqsAccessDeniedErrorCode: - // stop polling if auth error is encountered - receiver.metrics.setSQSMessagesWaiting(int64(count)) - return - } + if shouldReturn := updateMessageCount(receiver, ctx); shouldReturn { + return } + } + } +} - receiver.metrics.setSQSMessagesWaiting(int64(count)) +// updateMessageCount runs GetApproximateMessageCount for the given context and updates the receiver metric with the count returning false on no error +// If there is an error, the metric is reinitialized to -1 and true is returned +func updateMessageCount(receiver *sqsReader, ctx context.Context) bool { + count, err := receiver.GetApproximateMessageCount(ctx) + + var apiError smithy.APIError + if errors.As(err, &apiError) { + switch apiError.ErrorCode() { + case sqsAccessDeniedErrorCode: + // stop polling if auth error is encountered + // Set it back to -1 because there is a permission error + receiver.metrics.sqsMessagesWaiting.Set(int64(-1)) + return true } } + + receiver.metrics.sqsMessagesWaiting.Set(int64(count)) + return false } // boolPtr returns a pointer to b. diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 33f6f406776..c843e47e93f 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -187,12 +187,11 @@ func TestInputRunSQS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } func TestInputRunS3(t *testing.T) { @@ -426,10 +425,9 @@ func TestInputRunSNS(t *testing.T) { assert.EqualValues(t, s3Input.metrics.sqsMessagesDeletedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.sqsMessagesReturnedTotal.Get(), 1) // Invalid JSON is returned so that it can eventually be DLQed. assert.EqualValues(t, s3Input.metrics.sqsVisibilityTimeoutExtensionsTotal.Get(), 0) - //assert.EqualValues(t, s3Input.metrics.sqsMessagesWaiting.Get(), 0) - Issue created - https://github.com/elastic/beats/issues/36077 assert.EqualValues(t, s3Input.metrics.s3ObjectsInflight.Get(), 0) assert.EqualValues(t, s3Input.metrics.s3ObjectsRequestedTotal.Get(), 7) assert.EqualValues(t, s3Input.metrics.s3EventsCreatedTotal.Get(), 12) assert.Greater(t, s3Input.metrics.sqsLagTime.Mean(), 0.0) - //assert.Greater(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) - Issue created - https://github.com/elastic/beats/issues/36077 + assert.EqualValues(t, s3Input.metrics.sqsWorkerUtilization.Get(), 0.0) // Workers are reset after processing and hence utilization should be 0 at the end } diff --git a/x-pack/filebeat/input/awss3/metrics.go b/x-pack/filebeat/input/awss3/metrics.go index df535a2d473..bef57210ca6 100644 --- a/x-pack/filebeat/input/awss3/metrics.go +++ b/x-pack/filebeat/input/awss3/metrics.go @@ -79,18 +79,6 @@ func (m *inputMetrics) Close() { m.unregister() } -func (m *inputMetrics) setSQSMessagesWaiting(count int64) { - if m.sqsMessagesWaiting == nil { - // if metric not initialized, and count is -1, do nothing - if count == -1 { - return - } - m.sqsMessagesWaiting = monitoring.NewInt(m.registry, "sqs_messages_waiting_gauge") - } - - m.sqsMessagesWaiting.Set(count) -} - // beginSQSWorker tracks the start of a new SQS worker. The returned ID // must be used to call endSQSWorker when the worker finishes. It also // increments the sqsMessagesInflight counter. @@ -174,6 +162,7 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers sqsMessagesInflight: monitoring.NewUint(reg, "sqs_messages_inflight_gauge"), sqsMessagesReturnedTotal: monitoring.NewUint(reg, "sqs_messages_returned_total"), sqsMessagesDeletedTotal: monitoring.NewUint(reg, "sqs_messages_deleted_total"), + sqsMessagesWaiting: monitoring.NewInt(reg, "sqs_messages_waiting_gauge"), sqsWorkerUtilization: monitoring.NewFloat(reg, "sqs_worker_utilization"), sqsMessageProcessingTime: metrics.NewUniformSample(1024), sqsLagTime: metrics.NewUniformSample(1024), @@ -186,6 +175,9 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers s3ObjectsInflight: monitoring.NewUint(reg, "s3_objects_inflight_gauge"), s3ObjectProcessingTime: metrics.NewUniformSample(1024), } + + // Initializing the sqs_messages_waiting_gauge value to -1 so that we can distinguish between no messages waiting (0) and never collected / error collecting (-1). + out.sqsMessagesWaiting.Set(int64(-1)) adapter.NewGoMetrics(reg, "sqs_message_processing_time", adapter.Accept). Register("histogram", metrics.NewHistogram(out.sqsMessageProcessingTime)) //nolint:errcheck // A unique namespace is used so name collisions are impossible. adapter.NewGoMetrics(reg, "sqs_lag_time", adapter.Accept). diff --git a/x-pack/filebeat/input/awss3/metrics_test.go b/x-pack/filebeat/input/awss3/metrics_test.go index fc39786cf0b..e153d321e9f 100644 --- a/x-pack/filebeat/input/awss3/metrics_test.go +++ b/x-pack/filebeat/input/awss3/metrics_test.go @@ -29,6 +29,38 @@ func TestInputMetricsClose(t *testing.T) { }) } +// TestNewInputMetricsInstance asserts that all the metrics are initialized +// when a newInputMetrics method is invoked. This avoids nil hit panics when +// a getter is invoked on any uninitialized metric. +func TestNewInputMetricsInstance(t *testing.T) { + reg := monitoring.NewRegistry() + metrics := newInputMetrics("some-new-metric-test", reg, 1) + + assert.NotNil(t, metrics.sqsMessagesWaiting, + metrics.sqsMaxMessagesInflight, + metrics.sqsWorkerStartTimes, + metrics.sqsWorkerUtilizationLastUpdate, + metrics.sqsMessagesReceivedTotal, + metrics.sqsVisibilityTimeoutExtensionsTotal, + metrics.sqsMessagesInflight, + metrics.sqsMessagesReturnedTotal, + metrics.sqsMessagesDeletedTotal, + metrics.sqsMessagesWaiting, + metrics.sqsWorkerUtilization, + metrics.sqsMessageProcessingTime, + metrics.sqsLagTime, + metrics.s3ObjectsRequestedTotal, + metrics.s3ObjectsAckedTotal, + metrics.s3ObjectsListedTotal, + metrics.s3ObjectsProcessedTotal, + metrics.s3BytesProcessedTotal, + metrics.s3EventsCreatedTotal, + metrics.s3ObjectsInflight, + metrics.s3ObjectProcessingTime) + + assert.Equal(t, int64(-1), metrics.sqsMessagesWaiting.Get()) +} + func TestInputMetricsSQSWorkerUtilization(t *testing.T) { const interval = 5000 diff --git a/x-pack/filebeat/input/awss3/sqs.go b/x-pack/filebeat/input/awss3/sqs.go index 01ed8bfb183..dd454a3bfb9 100644 --- a/x-pack/filebeat/input/awss3/sqs.go +++ b/x-pack/filebeat/input/awss3/sqs.go @@ -80,6 +80,7 @@ func (r *sqsReader) Receive(ctx context.Context) error { r.log.Debugf("Received %v SQS messages.", len(msgs)) r.metrics.sqsMessagesReceivedTotal.Add(uint64(len(msgs))) workerWg.Add(len(msgs)) + for _, msg := range msgs { go func(msg types.Message, start time.Time) { id := r.metrics.beginSQSWorker()