Skip to content

Commit

Permalink
x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives conf…
Browse files Browse the repository at this point in the history
…igurable (#34014)

The behaviour defaults to the old behaviour, but exposes the full set of
httpcommon.WithKeepaliveSettings configuration options.

(cherry picked from commit 3cd8d81)

# Conflicts:
#	x-pack/filebeat/input/cel/input.go
#	x-pack/filebeat/input/httpjson/input.go
#	x-pack/filebeat/input/httpjson/request_chain_helper.go
  • Loading branch information
efd6 authored and mergify[bot] committed Jan 18, 2023
1 parent 6400b56 commit 8c10802
Show file tree
Hide file tree
Showing 10 changed files with 344 additions and 0 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- httpjson input: Add request tracing logger. {issue}32402[32402] {pull}32412[32412]
- Add cloudflare R2 to provider list in AWS S3 input. {pull}32620[32620]
- Add support for single string containing multiple relation-types in getRFC5988Link. {pull}32811[32811]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456]
- Add `parse_aws_vpc_flow_log` processor. {pull}33656[33656]
- Update `aws.vpcflow` dataset in AWS module have a configurable log `format` and to produce ECS 8.x fields. {pull}33699[33699]
- Modified `aws-s3` input to reduce mutex contention when multiple SQS message are being processed concurrently. {pull}33658[33658]
- Disable "event normalization" processing for the aws-s3 input to reduce allocations. {pull}33673[33673]
- Add Common Expression Language input. {pull}31233[31233]
- Add support for http+unix and http+npipe schemes in httpjson input. {issue}33571[33571] {pull}33610[33610]
- Add support for http+unix and http+npipe schemes in cel input. {issue}33571[33571] {pull}33712[33712]
- Add `decode_duration`, `move_fields` processors. {pull}31301[31301]
- Add metrics for UDP packet processing. {pull}33870[33870]
- Convert UDP input to v2 input. {pull}33930[33930]
- Improve collection of risk information from Okta debug data. {issue}33677[33677] {pull}34030[34030]
- Adding filename details from zip to response for httpjson {issue}33952[33952] {pull}34044[34044]
- Allow user configuration of keep-alive behaviour for HTTPJSON and CEL inputs. {issue}33951[33951] {pull}34014[34014]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-cel.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,26 @@ This specifies SSL/TLS configuration. If the ssl section is missing, the host's
CAs are used for HTTPS connections. See <<configuration-ssl>> for more
information.

[float]
==== `resource.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `resource.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `resource.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `resource.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `resource.retry.max_attempts`

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,26 @@ filebeat.inputs:
request.proxy_url: http://proxy.example:8080
----

[float]
==== `request.keep_alive.disable`

This specifies whether to disable keep-alives for HTTP end-points. Default: `true`.

[float]
==== `request.keep_alive.max_idle_connections`

The maximum number of idle connections across all hosts. Zero means no limit. Default: `0`.

[float]
==== `request.keep_alive.max_idle_connections_per_host`

The maximum idle connections to keep per-host. If zero, defaults to two. Default: `0`.

[float]
==== `request.keep_alive.idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Valid time units are `ns`, `us`, `ms`, `s`, `m`, `h`. Zero means no limit. Default: `0s`.

[float]
==== `request.retry.max_attempts`

Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/cel/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,46 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type ResourceConfig struct {
URL *urlConfig `config:"url" validate:"required"`
Retry retryConfig `config:"retry"`
RedirectForwardHeaders bool `config:"redirect.forward_headers"`
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`

Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/cel/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -466,3 +467,60 @@ func TestConfigOauth2Validation(t *testing.T) {
})
}
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"resource.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"resource.keep_alive.disable": false,
"resource.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'resource.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["resource.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
conf.Program = "{}" // Provide an empty program to avoid validation error from that.
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %v want: %v", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Resource.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
50 changes: 50 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,14 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger) (*http.Client,
if !wantClient(cfg) {
return nil, nil
}
<<<<<<< HEAD
c, err := cfg.Resource.Transport.Client(
httpcommon.WithAPMHTTPInstrumentation(),
httpcommon.WithKeepaliveSettings{Disable: true},
)
=======
c, err := cfg.Resource.Transport.Client(clientOptions(cfg.Resource.URL.URL, cfg.Resource.KeepAlive.settings())...)
>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -695,6 +699,52 @@ func wantClient(cfg config) bool {
}
}

<<<<<<< HEAD
=======
// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
scheme, trans, ok := strings.Cut(u.Scheme, "+")
var dialer transport.Dialer
switch {
default:
fallthrough
case !ok:
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
keepalive,
}

// We set the host for the unix socket and Windows named
// pipes schemes because the http.Transport expects to
// have a host and will error out if it is not present.
// The values here are just non-zero with a helpful name.
// They are not used in any logic.
case trans == "unix":
u.Host = "unix-socket"
dialer = socketDialer{u.Path}
case trans == "npipe":
u.Host = "windows-npipe"
dialer = npipeDialer{u.Path}
}
u.Scheme = scheme
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
keepalive,
httpcommon.WithBaseDialer(dialer),
}
}

// socketDialer implements transport.Dialer to a constant socket path.
type socketDialer struct {
path string
}

func (d socketDialer) Dial(_, _ string) (net.Conn, error) {
return net.Dial("unix", d.path)
}

>>>>>>> 3cd8d811d6 (x-pack/filebeat/input/{cel,httpjson}: make transport keep-alives configurable (#34014))
func checkRedirect(cfg *ResourceConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
log.Debug("http client: checking redirect")
Expand Down
33 changes: 33 additions & 0 deletions x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,38 @@ func (c rateLimitConfig) Validate() error {
return nil
}

type keepAlive struct {
Disable *bool `config:"disable"`
MaxIdleConns int `config:"max_idle_connections"`
MaxIdleConnsPerHost int `config:"max_idle_connections_per_host"` // If zero, http.DefaultMaxIdleConnsPerHost is the value used by http.Transport.
IdleConnTimeout time.Duration `config:"idle_connection_timeout"`
}

func (c keepAlive) Validate() error {
if c.Disable == nil || *c.Disable {
return nil
}
if c.MaxIdleConns < 0 {
return errors.New("max_idle_connections must not be negative")
}
if c.MaxIdleConnsPerHost < 0 {
return errors.New("max_idle_connections_per_host must not be negative")
}
if c.IdleConnTimeout < 0 {
return errors.New("idle_connection_timeout must not be negative")
}
return nil
}

func (c keepAlive) settings() httpcommon.WithKeepaliveSettings {
return httpcommon.WithKeepaliveSettings{
Disable: c.Disable == nil || *c.Disable,
MaxIdleConns: c.MaxIdleConns,
MaxIdleConnsPerHost: c.MaxIdleConnsPerHost,
IdleConnTimeout: c.IdleConnTimeout,
}
}

type urlConfig struct {
*url.URL
}
Expand All @@ -99,6 +131,7 @@ type requestConfig struct {
RedirectHeadersBanList []string `config:"redirect.headers_ban_list"`
RedirectMaxRedirects int `config:"redirect.max_redirects"`
RateLimit *rateLimitConfig `config:"rate_limit"`
KeepAlive keepAlive `config:"keep_alive"`
Transforms transformsConfig `config:"transforms"`

Transport httpcommon.HTTPTransportSettings `config:",inline"`
Expand Down
58 changes: 58 additions & 0 deletions x-pack/filebeat/input/httpjson/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package httpjson
import (
"context"
"errors"
"fmt"
"os"
"testing"

Expand All @@ -15,6 +16,7 @@ import (
"golang.org/x/oauth2/google"

conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
)

func TestProviderCanonical(t *testing.T) {
Expand Down Expand Up @@ -485,3 +487,59 @@ func TestCursorEntryConfig(t *testing.T) {
assert.True(t, conf["entry3"].mustIgnoreEmptyValue())
assert.True(t, conf["entry4"].mustIgnoreEmptyValue())
}

var keepAliveTests = []struct {
name string
input map[string]interface{}
want httpcommon.WithKeepaliveSettings
wantErr error
}{
{
name: "keep_alive_none", // Default to the old behaviour of true.
input: map[string]interface{}{},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_true",
input: map[string]interface{}{
"request.keep_alive.disable": true,
},
want: httpcommon.WithKeepaliveSettings{Disable: true},
},
{
name: "keep_alive_false",
input: map[string]interface{}{
"request.keep_alive.disable": false,
},
want: httpcommon.WithKeepaliveSettings{Disable: false},
},
{
name: "keep_alive_invalid_max",
input: map[string]interface{}{
"request.keep_alive.disable": false,
"request.keep_alive.max_idle_connections": -1,
},
wantErr: errors.New("max_idle_connections must not be negative accessing 'request.keep_alive'"),
},
}

func TestKeepAliveSetting(t *testing.T) {
for _, test := range keepAliveTests {
t.Run(test.name, func(t *testing.T) {
test.input["request.url"] = "localhost"
cfg := conf.MustNewConfigFrom(test.input)
conf := defaultConfig()
err := cfg.Unpack(&conf)
if fmt.Sprint(err) != fmt.Sprint(test.wantErr) {
t.Errorf("unexpected error return from Unpack: got: %q want: %q", err, test.wantErr)
}
if err != nil {
return
}
got := conf.Request.KeepAlive.settings()
if got != test.want {
t.Errorf("unexpected setting for %s: got: %#v\nwant:%#v", test.name, got, test.want)
}
})
}
}
Loading

0 comments on commit 8c10802

Please sign in to comment.