[filebeat][gcs] - Added support for retry config (elastic#41862)
* Added support for retry config along with necessary documentation and tests
ShourieG authored Dec 4, 2024
1 parent 91070bf commit 3f51793
Showing 8 changed files with 190 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -353,6 +353,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]

*Auditbeat*
40 changes: 40 additions & 0 deletions x-pack/filebeat/docs/inputs/input-gcs.asciidoc
@@ -164,6 +164,7 @@ Now let's explore the configuration attributes a bit more elaborately.
11. <<attrib-file_selectors-gcs,file_selectors>>
12. <<attrib-expand_event_list_from_field-gcs,expand_event_list_from_field>>
13. <<attrib-timestamp_epoch-gcs,timestamp_epoch>>
14. <<attrib-retry-gcs,retry>>


[id="attrib-project-id"]
@@ -397,6 +398,45 @@ filebeat.inputs:

The GCS APIs don't provide a direct way to filter files based on timestamp, so the input downloads all the files and then filters them based on the timestamp. This can become a processing bottleneck if the number of files is very high. It is recommended to use this attribute only when the number of files is limited or ample resources are available. This option scales vertically and not horizontally.

[id="attrib-retry-gcs"]
[float]
==== `retry`

This attribute configures a set of sub-attributes that directly control how the input behaves when a download for a file/object fails or gets interrupted.

- `max_attempts`: This attribute defines the maximum number of attempts (including the initial API call) that are made for a retryable error. The default value for this is `3`.
- `initial_backoff_duration`: This attribute defines the initial backoff time. The default value for this is `1s`.
- `max_backoff_duration`: This attribute defines the maximum backoff time. The default value for this is `30s`.
- `backoff_multiplier`: This attribute defines the backoff multiplication factor. The default value for this is `2`.

NOTE: The `initial_backoff_duration` and `max_backoff_duration` attributes must have time units. Valid time units are `ns`, `us` (or `µs`), `ms`, `s`, `m`, `h`.
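
For example, with the default values the input waits up to `1s` before the second attempt and up to `2s` (`1s × 2`) before the third, for a cumulative worst-case backoff of roughly `3s` across the default `3` attempts, ignoring any jitter the underlying client may apply.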

These attributes give the user the flexibility to control how the input behaves when a download fails or gets interrupted. The `retry` attribute can only be specified at the root level of the configuration, not at the bucket level, and it applies uniformly to all buckets.

An example configuration is given below:

[source, yml]
----
filebeat.inputs:
- type: gcs
project_id: my_project_id
auth.credentials_file.path: {{file_path}}/{{creds_file_name}}.json
retry:
max_attempts: 3
initial_backoff_duration: 2s
max_backoff_duration: 60s
backoff_multiplier: 2
buckets:
- name: obs-bucket
max_workers: 3
poll: true
poll_interval: 11m
bucket_timeout: 10m
----

When configuring the `retry` attribute, the user should keep the `bucket_timeout` value in mind: retries for an object should complete within the `bucket_timeout` window. If they cannot, the input suffers a `context timeout` for the specific object/file it was retrying, and these gaps in the ingested data can pile up over time.
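
As a rough worked example for the configuration above: with `max_attempts: 3`, `initial_backoff_duration: 2s` and `backoff_multiplier: 2`, at most two retries occur, with backoffs of up to `2s` and `4s`, about `6s` of cumulative backoff, which leaves ample room inside the `10m` `bucket_timeout` even after accounting for download time (jitter aside).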

[id="bucket-overrides"]
*The sample configs below explain bucket-level overriding of attributes a bit further:*

21 changes: 21 additions & 0 deletions x-pack/filebeat/input/gcs/config.go
@@ -50,6 +50,8 @@ type config struct {
ExpandEventListFromField string `config:"expand_event_list_from_field"`
// This field is only used for system test purposes, to override the HTTP endpoint.
AlternativeHost string `config:"alternative_host"`
// Retry - Defines the retry configuration for the input.
Retry retryConfig `config:"retry"`
}

// bucket contains the config for each specific object storage bucket in the root account
@@ -92,6 +94,19 @@ type jsonCredentialsConfig struct {
AccountKey string `config:"account_key"`
}

type retryConfig struct {
// MaxAttempts configures the maximum number of times an API call can be made in the case of retryable errors.
// For example, if you set MaxAttempts(5), the operation will be attempted up to 5 times total (initial call plus 4 retries).
// If you set MaxAttempts(1), the operation will be attempted only once and there will be no retries. This setting defaults to 3.
MaxAttempts int `config:"max_attempts" validate:"min=1"`
// InitialBackOffDuration is the initial value of the retry period, defaults to 1 second.
InitialBackOffDuration time.Duration `config:"initial_backoff_duration" validate:"min=1"`
// MaxBackOffDuration is the maximum value of the retry period, defaults to 30 seconds.
MaxBackOffDuration time.Duration `config:"max_backoff_duration" validate:"min=2"`
// BackOffMultiplier is the factor by which the retry period increases. It should be greater than 1 and defaults to 2.
BackOffMultiplier float64 `config:"backoff_multiplier" validate:"min=1.1"`
}

func (c authConfig) Validate() error {
// credentials_file
if c.CredentialsFile != nil {
@@ -126,5 +141,11 @@ func defaultConfig() config {
PollInterval: 5 * time.Minute,
BucketTimeOut: 120 * time.Second,
ParseJSON: false,
Retry: retryConfig{
MaxAttempts: 3,
InitialBackOffDuration: time.Second,
MaxBackOffDuration: 30 * time.Second,
BackOffMultiplier: 2,
},
}
}
7 changes: 6 additions & 1 deletion x-pack/filebeat/input/gcs/input.go
@@ -72,6 +72,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) {
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: config.Retry,
})
}

@@ -158,9 +159,13 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source,
}

bucket := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
8 changes: 6 additions & 2 deletions x-pack/filebeat/input/gcs/input_stateless.go
@@ -6,7 +6,6 @@ package gcs

import (
"context"
"time"

"cloud.google.com/go/storage"
gax "github.com/googleapis/gax-go/v2"
@@ -64,6 +63,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
ExpandEventListFromField: bucket.ExpandEventListFromField,
FileSelectors: bucket.FileSelectors,
ReaderConfig: bucket.ReaderConfig,
Retry: in.config.Retry,
}

st := newState()
@@ -80,9 +80,13 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher
}()

bkt := client.Bucket(currentSource.BucketName).Retryer(
// Use WithMaxAttempts to change the maximum number of attempts.
storage.WithMaxAttempts(currentSource.Retry.MaxAttempts),
// Use WithBackoff to change the timing of the exponential backoff.
storage.WithBackoff(gax.Backoff{
Initial: 2 * time.Second,
Initial: currentSource.Retry.InitialBackOffDuration,
Max: currentSource.Retry.MaxBackOffDuration,
Multiplier: currentSource.Retry.BackOffMultiplier,
}),
// RetryAlways will retry the operation even if it is non-idempotent.
// Since we are only reading, the operation is always idempotent
75 changes: 75 additions & 0 deletions x-pack/filebeat/input/gcs/input_test.go
@@ -520,6 +520,81 @@ func Test_StorageClient(t *testing.T) {
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryWithDefaultValues",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "1m",
"bucket_timeout": "1m",
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{
mock.Gcs_test_new_object_ata_json: true,
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryWithCustomValues",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
"max_backoff_duration": "3s",
"backoff_multiplier": 1.4,
},
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{
mock.Gcs_test_new_object_ata_json: true,
mock.Gcs_test_new_object_data3_json: true,
mock.Gcs_test_new_object_docs_ata_json: true,
},
},
{
name: "RetryMinimumValueCheck",
baseConfig: map[string]interface{}{
"project_id": "elastic-sa",
"auth.credentials_file.path": "testdata/gcs_creds.json",
"max_workers": 1,
"poll": true,
"poll_interval": "10s",
"bucket_timeout": "10s",
"retry": map[string]interface{}{
"max_attempts": 5,
"initial_backoff_duration": "1s",
"max_backoff_duration": "3s",
"backoff_multiplier": 1,
},
"buckets": []map[string]interface{}{
{
"name": "gcs-test-new",
},
},
},
mockHandler: mock.GCSRetryServer,
expected: map[string]bool{},
isError: errors.New("requires value >= 1.1 accessing 'retry.backoff_multiplier'"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
40 changes: 40 additions & 0 deletions x-pack/filebeat/input/gcs/mock/mock.go
@@ -98,3 +98,43 @@ func GCSFileServer() http.Handler {
w.Write([]byte("resource not found"))
})
}

//nolint:errcheck // We can ignore errors here, as this is just for testing
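// GCSRetryServer mimics a temporarily failing GCS backend for retry tests:
// it responds to the first two requests with a 500 Internal Server Error
// and serves the matching bucket/object data from the third request onwards.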
func GCSRetryServer() http.Handler {
retries := 0
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
retries++
path := strings.Split(strings.TrimLeft(r.URL.Path, "/"), "/")
if r.Method == http.MethodGet && retries >= 3 {
switch len(path) {
case 2:
if path[0] == "b" {
if buckets[path[1]] {
w.Write([]byte(fetchBucket[path[1]]))
return
}
} else if buckets[path[0]] && availableObjects[path[0]][path[1]] {
w.Write([]byte(objects[path[0]][path[1]]))
return
}
case 3:
if path[0] == "b" && path[2] == "o" {
if buckets[path[1]] {
w.Write([]byte(objectList[path[1]]))
return
}
} else if buckets[path[0]] {
objName := strings.Join(path[1:], "/")
if availableObjects[path[0]][objName] {
w.Write([]byte(objects[path[0]][objName]))
return
}
}
default:
w.WriteHeader(http.StatusNotFound)
return
}
}
w.WriteHeader(http.StatusInternalServerError)
})
}
1 change: 1 addition & 0 deletions x-pack/filebeat/input/gcs/types.go
@@ -22,6 +22,7 @@ type Source struct {
FileSelectors []fileSelectorConfig
ReaderConfig readerConfig
ExpandEventListFromField string
Retry retryConfig
}

func (s *Source) Name() string {
