From 8c1080206ff8e1ebe14e6138bb0623e8ef926acb Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Fri, 16 Dec 2022 17:48:26 +1030 Subject: [PATCH] x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014) The behaviour defaults to the old behaviour, but exposes the full set of httpcommon.WithKeepaliveSettings configuration options. (cherry picked from commit 3cd8d811d61ada755321f89a4a72b9c691aef6b1) # Conflicts: # x-pack/filebeat/input/cel/input.go # x-pack/filebeat/input/httpjson/input.go # x-pack/filebeat/input/httpjson/request_chain_helper.go --- CHANGELOG.next.asciidoc | 18 ++++++ .../filebeat/docs/inputs/input-cel.asciidoc | 20 +++++++ .../docs/inputs/input-httpjson.asciidoc | 20 +++++++ x-pack/filebeat/input/cel/config.go | 33 +++++++++++ x-pack/filebeat/input/cel/config_test.go | 58 +++++++++++++++++++ x-pack/filebeat/input/cel/input.go | 50 ++++++++++++++++ .../filebeat/input/httpjson/config_request.go | 33 +++++++++++ x-pack/filebeat/input/httpjson/config_test.go | 58 +++++++++++++++++++ x-pack/filebeat/input/httpjson/input.go | 50 ++++++++++++++++ .../input/httpjson/request_chain_helper.go | 4 ++ 10 files changed, 344 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2ac0b9108e85..78916db1166f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -106,6 +106,24 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412] - Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620] - Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811] +- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896] +- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] +- Improve httpjson documentation for split processor. {pull}33473[33473] +- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499] +- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] +- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656] +- Update `aws.vpcflow` dataset in AWS module have a configurable log `format` and to produce ECS 8.x fields. {pull}33699[33699] +- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658] +- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673] +- Add Common Expression Language input. {pull}31233[31233] +- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610] +- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712] +- Add `decode_duration`, `move_fields` processors. {pull}31301[31301] +- Add metrics for UDP packet processing. {pull}33870[33870] +- Convert UDP input to v2 input. {pull}33930[33930] +- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030] +- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044] +- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index c0fc9d6b7701..163aaa13490f 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -431,6 +431,26 @@ This specifies SSL/TLS configuration. If the ssl section is missing, the host's CAs are used for HTTPS connections. See <> for more information. +[float] +==== `resource.keep_alive.disable` + +This specifies whether to disable keep-alives for HTTP end-points. Default: `true`. + +[float] +==== `resource.keep_alive.max_idle_connections` + +The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`. + +[float] +==== `resource.keep_alive.max_idle_connections_per_host` + +The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`. + +[float] +==== `resource.keep_alive.idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`. + [float] ==== `resource.retry.max_attempts` diff --git a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc index ed832f56ef7a..35d82bf7cac5 100644 --- a/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-httpjson.asciidoc @@ -453,6 +453,26 @@ filebeat.inputs: request.proxy_url: http://proxy.example:8080 ---- +[float] +==== `request.keep_alive.disable` + +This specifies whether to disable keep-alives for HTTP end-points. Default: `true`. + +[float] +==== `request.keep_alive.max_idle_connections` + +The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`. + +[float] +==== `request.keep_alive.max_idle_connections_per_host` + +The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`. + +[float] +==== `request.keep_alive.idle_connection_timeout` + +The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`. + [float] ==== `request.retry.max_attempts` diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go index 49758bd3ad19..f896514a1d7d 100644 --- a/x-pack/filebeat/input/cel/config.go +++ b/x-pack/filebeat/input/cel/config.go @@ -142,6 +142,38 @@ func (c rateLimitConfig) Validate() error { return nil } +type keepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. + IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c keepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c keepAlive) settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + type ResourceConfig struct { URL *urlConfig `config:"url" validate:"required"` Retry retryConfig `config:"retry"` @@ -149,6 +181,7 @@ type ResourceConfig struct { RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` RedirectMaxRedirects int `config:"redirect.max_redirects"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive keepAlive `config:"keep_alive"` Transport httpcommon.HTTPTransportSettings `config:",inline"` diff --git a/x-pack/filebeat/input/cel/config_test.go b/x-pack/filebeat/input/cel/config_test.go index 0fde669f649a..c4fcd82c9996 100644 --- a/x-pack/filebeat/input/cel/config_test.go +++ b/x-pack/filebeat/input/cel/config_test.go @@ -17,6 +17,7 @@ import ( "golang.org/x/oauth2/google" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) func TestProviderCanonical(t *testing.T) { @@ -466,3 +467,60 @@ func TestConfigOauth2Validation(t *testing.T) { }) } } + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. + input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "resource.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "resource.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "resource.keep_alive.disable": false, + "resource.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'resource.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["resource.url"] = "localhost" + cfg := conf.MustNewConfigFrom(test.input) + conf := defaultConfig() + conf.Program = "{}" // Provide an empty program to avoid validation error from that. + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Resource.KeepAlive.settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index c99e8520380b..05c24b4f2e3b 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -643,10 +643,14 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client, if !wantClient(cfg) { return nil, nil } +<<<<<<< HEAD c, err := cfg.Resource.Transport.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: true}, ) +======= + c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...) +>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)) if err != nil { return nil, err } @@ -695,6 +699,52 @@ func wantClient(cfg config) bool { } } +<<<<<<< HEAD +======= +// clientOption returns constructed client configuration options, including +// setting up http+unix and http+npipe transports if requested. +func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { + scheme, trans, ok := strings.Cut(u.Scheme, "+") + var dialer transport.Dialer + switch { + default: + fallthrough + case !ok: + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + keepalive, + } + + // We set the host for the unix socket and Windows named + // pipes schemes because the http.Transport expects to + // have a host and will error out if it is not present. + // The values here are just non-zero with a helpful name. + // They are not used in any logic. + case trans == "unix": + u.Host = "unix-socket" + dialer = socketDialer{u.Path} + case trans == "npipe": + u.Host = "windows-npipe" + dialer = npipeDialer{u.Path} + } + u.Scheme = scheme + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + keepalive, + httpcommon.WithBaseDialer(dialer), + } +} + +// socketDialer implements transport.Dialer to a constant socket path. +type socketDialer struct { + path string +} + +func (d socketDialer) Dial(_, _ string) (net.Conn, error) { + return net.Dial("unix", d.path) +} + +>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)) func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { log.Debug("http client: checking redirect") diff --git a/x-pack/filebeat/input/httpjson/config_request.go b/x-pack/filebeat/input/httpjson/config_request.go index dcfda22ee1da..86ec0a68a581 100644 --- a/x-pack/filebeat/input/httpjson/config_request.go +++ b/x-pack/filebeat/input/httpjson/config_request.go @@ -74,6 +74,38 @@ func (c rateLimitConfig) Validate() error { return nil } +type keepAlive struct { + Disable *bool `config:"disable"` + MaxIdleConns int `config:"max_idle_connections"` + MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport. + IdleConnTimeout time.Duration `config:"idle_connection_timeout"` +} + +func (c keepAlive) Validate() error { + if c.Disable == nil || *c.Disable { + return nil + } + if c.MaxIdleConns < 0 { + return errors.New("max_idle_connections must not be negative") + } + if c.MaxIdleConnsPerHost < 0 { + return errors.New("max_idle_connections_per_host must not be negative") + } + if c.IdleConnTimeout < 0 { + return errors.New("idle_connection_timeout must not be negative") + } + return nil +} + +func (c keepAlive) settings() httpcommon.WithKeepaliveSettings { + return httpcommon.WithKeepaliveSettings{ + Disable: c.Disable == nil || *c.Disable, + MaxIdleConns: c.MaxIdleConns, + MaxIdleConnsPerHost: c.MaxIdleConnsPerHost, + IdleConnTimeout: c.IdleConnTimeout, + } +} + type urlConfig struct { *url.URL } @@ -99,6 +131,7 @@ type requestConfig struct { RedirectHeadersBanList []string `config:"redirect.headers_ban_list"` RedirectMaxRedirects int `config:"redirect.max_redirects"` RateLimit *rateLimitConfig `config:"rate_limit"` + KeepAlive keepAlive `config:"keep_alive"` Transforms transformsConfig `config:"transforms"` Transport httpcommon.HTTPTransportSettings `config:",inline"` diff --git a/x-pack/filebeat/input/httpjson/config_test.go b/x-pack/filebeat/input/httpjson/config_test.go index 116ea617e3b1..01b34afad238 100644 --- a/x-pack/filebeat/input/httpjson/config_test.go +++ b/x-pack/filebeat/input/httpjson/config_test.go @@ -7,6 +7,7 @@ package httpjson import ( "context" "errors" + "fmt" "os" "testing" @@ -15,6 +16,7 @@ import ( "golang.org/x/oauth2/google" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) func TestProviderCanonical(t *testing.T) { @@ -485,3 +487,59 @@ func TestCursorEntryConfig(t *testing.T) { assert.True(t, conf["entry3"].mustIgnoreEmptyValue()) assert.True(t, conf["entry4"].mustIgnoreEmptyValue()) } + +var keepAliveTests = []struct { + name string + input map[string]interface{} + want httpcommon.WithKeepaliveSettings + wantErr error +}{ + { + name: "keep_alive_none", // Default to the old behaviour of true. + input: map[string]interface{}{}, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_true", + input: map[string]interface{}{ + "request.keep_alive.disable": true, + }, + want: httpcommon.WithKeepaliveSettings{Disable: true}, + }, + { + name: "keep_alive_false", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + }, + want: httpcommon.WithKeepaliveSettings{Disable: false}, + }, + { + name: "keep_alive_invalid_max", + input: map[string]interface{}{ + "request.keep_alive.disable": false, + "request.keep_alive.max_idle_connections": -1, + }, + wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"), + }, +} + +func TestKeepAliveSetting(t *testing.T) { + for _, test := range keepAliveTests { + t.Run(test.name, func(t *testing.T) { + test.input["request.url"] = "localhost" + cfg := conf.MustNewConfigFrom(test.input) + conf := defaultConfig() + err := cfg.Unpack(&conf) + if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { + t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr) + } + if err != nil { + return + } + got := conf.Request.KeepAlive.settings() + if got != test.want { + t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want) + } + }) + } +} diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 54b60495f0cc..952d90575488 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -155,10 +155,14 @@ func run( func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpClient, error) { // Make retryable HTTP client +<<<<<<< HEAD netHTTPClient, err := config.Request.Transport.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: true}, ) +======= + netHTTPClient, err := config.Request.Transport.Client(clientOptions(config.Request.URL.URL, config.Request.KeepAlive.settings())...) +>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)) if err != nil { return nil, err } @@ -200,6 +204,52 @@ func newHTTPClient(ctx context.Context, config config, log *logp.Logger) (*httpC return &httpClient{client: client.StandardClient(), limiter: limiter}, nil } +<<<<<<< HEAD +======= +// clientOption returns constructed client configuration options, including +// setting up http+unix and http+npipe transports if requested. +func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption { + scheme, trans, ok := strings.Cut(u.Scheme, "+") + var dialer transport.Dialer + switch { + default: + fallthrough + case !ok: + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + keepalive, + } + + // We set the host for the unix socket and Windows named + // pipes schemes because the http.Transport expects to + // have a host and will error out if it is not present. + // The values here are just non-zero with a helpful name. + // They are not used in any logic. + case trans == "unix": + u.Host = "unix-socket" + dialer = socketDialer{u.Path} + case trans == "npipe": + u.Host = "windows-npipe" + dialer = npipeDialer{u.Path} + } + u.Scheme = scheme + return []httpcommon.TransportOption{ + httpcommon.WithAPMHTTPInstrumentation(), + keepalive, + httpcommon.WithBaseDialer(dialer), + } +} + +// socketDialer implements transport.Dialer to a constant socket path. +type socketDialer struct { + path string +} + +func (d socketDialer) Dial(_, _ string) (net.Conn, error) { + return net.Dial("unix", d.path) +} + +>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)) func checkRedirect(config *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error { return func(req *http.Request, via []*http.Request) error { log.Debug("http client: checking redirect") diff --git a/x-pack/filebeat/input/httpjson/request_chain_helper.go b/x-pack/filebeat/input/httpjson/request_chain_helper.go index a894e4c6248a..5d35e006f430 100644 --- a/x-pack/filebeat/input/httpjson/request_chain_helper.go +++ b/x-pack/filebeat/input/httpjson/request_chain_helper.go @@ -33,10 +33,14 @@ const ( func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) { // Make retryable HTTP client +<<<<<<< HEAD netHTTPClient, err := requestCfg.Transport.Client( httpcommon.WithAPMHTTPInstrumentation(), httpcommon.WithKeepaliveSettings{Disable: true}, ) +======= + netHTTPClient, err := requestCfg.Transport.Client(clientOptions(requestCfg.URL.URL, requestCfg.KeepAlive.settings())...) +>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014)) if err != nil { return nil, err }