From ab2a2c00e50317aca9d1f2fbe8409603310415aa Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Mon, 12 Dec 2022 11:15:54 +1030 Subject: [PATCH] x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable The behaviour defaults to the old behaviour, but exposes the full set of httpcommon.WithKeepaliveSettings configuration options. --- CHANGELOG.next.asciidoc | 1 + .../filebeat/docs/inputs/input-cel.asciidoc | 20 +++++++ .../docs/inputs/input-httpjson.asciidoc | 20 +++++++ x-pack/filebeat/input/cel/config.go | 33 +++++++++++ x-pack/filebeat/input/cel/config_test.go | 58 +++++++++++++++++++ x-pack/filebeat/input/cel/input.go | 8 +-- .../filebeat/input/httpjson/config_request.go | 33 +++++++++++ x-pack/filebeat/input/httpjson/config_test.go | 58 +++++++++++++++++++ x-pack/filebeat/input/httpjson/input.go | 8 +-- .../input/httpjson/request_chain_helper.go | 2 +- 10 files changed, 232 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0a1ad764144e..8fbcaa8e0d94 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -183,6 +183,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Convert UDP input to v2 input. {pull}33930[33930] - Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030] - Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044] +- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index db07a3e62a23..dccca87f9aaf 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -434,6 +434,26 @@ This specifies SSL/TLS configuration. If the ssl section is missing, the host's CAs are used for HTTPS connections. See <> for more information. +[float] +==== `resource.keep_alive.disable` + +This specifies whether to disable keep-alives for HTTP end-points. Default: `true`. + +[float] +==== `resource.keep_alive.max_idle_connections` + +The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`. + +[float] +==== `resource.keep_alive.max_idle_connections_per_host` + +The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`. + +[float] +==== `resource.keep_alive.idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. Default: `0s`. + [float] ==== `resource.retry.max_attempts` diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index 9bda680b6153..9de259688f1b 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -456,6 +456,26 @@ filebeat.inputs: request.proxy_url: http://proxy.example:8080 ---- +[float] +==== `request.keep_alive.disable` + +This specifies whether to disable keep-alives for HTTP end-points. Default: `true`. + +[float] +==== `request.keep_alive.max_idle_connections` + +The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`. + +[float] +==== `request.keep_alive.max_idle_connections_per_host` + +The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`. + +[float] +==== `request.keep_alive.idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. Default: `0s`. + [float] ==== `request.retry.max_attempts` diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go index 7188ce57056f..b286df3d202c 100644 --- a/x-pack/filebeat/input/cel/config.go +++ b/x-pack/filebeat/input/cel/config.go @@ -143,6 +143,38 @@ func (c rateLimitConfig) Validate() error { return nil } +type keepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. + IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c keepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c keepAlive) settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + type ResourceConfig struct { URL *urlConfig `config:"url" validate:"required"` Retry retryConfig `config:"retry"` @@ -150,6 +182,7 @@ type ResourceConfig struct { RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` RedirectMaxRedirects int `config:"redirect.max_redirects"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive keepAlive `config:"keep_alive"` Transport httpcommon.HTTPTransportSettings `config:",inline"` diff --git a/x-pack/filebeat/input/cel/config_test.go b/x-pack/filebeat/input/cel/config_test.go index 0fde669f649a..c4fcd82c9996 100644 --- a/x-pack/filebeat/input/cel/config_test.go +++ b/x-pack/filebeat/input/cel/config_test.go @@ -17,6 +17,7 @@ import ( "golang.org/x/oauth2/google" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) func TestProviderCanonical(t *testing.T) { @@ -466,3 +467,60 @@ func TestConfigOauth2Validation(t *testing.T) { }) } } + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. + input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "resource.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "resource.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "resource.keep_alive.disable": false, + "resource.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'resource.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["resource.url"] = "localhost" + cfg := conf.MustNewConfigFrom(test.input) + conf := defaultConfig() + conf.Program = "{}" // Provide an empty program to avoid validation error from that. + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Resource.KeepAlive.settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 19bbaf81c883..9f3c93ae37b9 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -645,7 +645,7 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, if !wantClient(cfg) { return nil, nil } - c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL)...) + c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...) if err != nil { return nil, err } @@ -696,7 +696,7 @@ func wantClient(cfg config) bool { // clientOption returns constructed client configuration options, including // setting up http+unix and http+npipe transports if requested. -func clientOptions(u *url.URL) []httpcommon.TransportOption { +func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { scheme, trans, ok := strings.Cut(u.Scheme, "+") var dialer transport.Dialer switch { @@ -705,7 +705,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption { case !ok: return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, + keepalive, } // We set the host for the unix socket and Windows named @@ -723,7 +723,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption { u.Scheme = scheme return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, + keepalive, httpcommon.WithBaseDialer(dialer), } } diff --git a/x-pack/filebeat/input/httpjson/config_request.go b/x-pack/filebeat/input/httpjson/config_request.go index dcfda22ee1da..86ec0a68a581 100644 --- a/x-pack/filebeat/input/httpjson/config_request.go +++ b/x-pack/filebeat/input/httpjson/config_request.go @@ -74,6 +74,38 @@ func (c rateLimitConfig) Validate() error { return nil } +type keepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. + IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c keepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c keepAlive) settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + type urlConfig struct { *url.URL } @@ -99,6 +131,7 @@ type requestConfig struct { RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` RedirectMaxRedirects int `config:"redirect.max_redirects"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive keepAlive `config:"keep_alive"` Transforms transformsConfig `config:"transforms"` Transport httpcommon.HTTPTransportSettings `config:",inline"` diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index 116ea617e3b1..01b34afad238 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -7,6 +7,7 @@ package httpjson import ( "context" "errors" + "fmt" "os" "testing" @@ -15,6 +16,7 @@ import ( "golang.org/x/oauth2/google" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) func TestProviderCanonical(t *testing.T) { @@ -485,3 +487,59 @@ func TestCursorEntryConfig(t *testing.T) { assert.True(t, conf["entry3"].mustIgnoreEmptyValue()) assert.True(t, conf["entry4"].mustIgnoreEmptyValue()) } + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. + input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "request.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + "request.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["request.url"] = "localhost" + cfg := conf.MustNewConfigFrom(test.input) + conf := defaultConfig() + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Request.KeepAlive.settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 241c08fe2b2d..2abf2d7e7307 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -157,7 +157,7 @@ func run( func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) { // Make retryable HTTP client - netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL)...) + netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.settings())...) if err != nil { return nil, err } @@ -201,7 +201,7 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC // clientOption returns constructed client configuration options, including // setting up http+unix and http+npipe transports if requested. -func clientOptions(u *url.URL) []httpcommon.TransportOption { +func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { scheme, trans, ok := strings.Cut(u.Scheme, "+") var dialer transport.Dialer switch { @@ -210,7 +210,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption { case !ok: return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, + keepalive, } // We set the host for the unix socket and Windows named @@ -228,7 +228,7 @@ func clientOptions(u *url.URL) []httpcommon.TransportOption { u.Scheme = scheme return []httpcommon.TransportOption{ httpcommon.WithAPMHTTPInstrumentation(), - httpcommon.WithKeepaliveSettings{Disable: true}, + keepalive, httpcommon.WithBaseDialer(dialer), } } diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper.go b/x-pack/filebeat/input/httpjson/request_chain_helper.go index 57510b049a6d..cc31b9062a72 100644 --- a/x-pack/filebeat/input/httpjson/request_chain_helper.go +++ b/x-pack/filebeat/input/httpjson/request_chain_helper.go @@ -32,7 +32,7 @@ const ( func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) { // Make retryable HTTP client - netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL)...) + netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL, requestCfg.KeepAlive.settings())...) if err != nil { return nil, err }