Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x](backport #27509) Add a header round tripper option to httpcommon #27788

Merged
merged 3 commits into from
Sep 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Bump AWS SDK version to v0.24.0 for WebIdentity authentication flow {issue}19393[19393] {pull}27126[27126]
- Add Linux pressure metricset {pull}27355[27355]
- Add support for kube-state-metrics v2.0.0 {pull}27552[27552]
- Add User-Agent header to HTTP requests. {issue}18160[18160] {pull}27509[27509]

*Packetbeat*

Expand Down
2 changes: 1 addition & 1 deletion dev-tools/cmd/dashboards/export_dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
Path: u.Path,
SpaceID: *spaceID,
Transport: transport,
})
}, "Beat Development Tools")
if err != nil {
log.Fatalf("Error while connecting to Kibana: %v", err)
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {

overwritePipelines := true
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return err
}
Expand Down Expand Up @@ -235,7 +235,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}

esConfig := b.Config.Output.Config()
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
}
Expand All @@ -256,7 +256,7 @@ func (fb *Filebeat) loadModulesML(b *beat.Beat, kibanaConfig *common.Config) err
}
}

kibanaClient, err := kibana.NewKibanaClient(kibanaConfig)
kibanaClient, err := kibana.NewKibanaClient(kibanaConfig, "Filebeat")
if err != nil {
return errors.Errorf("Error creating Kibana client: %v", err)
}
Expand Down Expand Up @@ -521,7 +521,7 @@ func (fb *Filebeat) Stop() {
// Create a new pipeline loader (es client) factory
func newPipelineLoaderFactory(esConfig *common.Config) fileset.PipelineLoaderFactory {
pipelineLoaderFactory := func() (fileset.PipelineLoader, error) {
esClient, err := eslegclient.NewConnectedClient(esConfig)
esClient, err := eslegclient.NewConnectedClient(esConfig, "Filebeat")
if err != nil {
return nil, errors.Wrap(err, "Error creating Elasticsearch client")
}
Expand Down
4 changes: 4 additions & 0 deletions heartbeat/monitors/active/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
)

Expand All @@ -38,6 +39,8 @@ func init() {

var debugf = logp.MakeDebug("http")

var userAgent = useragent.UserAgent("Heartbeat", true)

// Create makes a new HTTP monitor
func create(
name string,
Expand Down Expand Up @@ -128,5 +131,6 @@ func newRoundTripper(config *Config) (http.RoundTripper, error) {
httpcommon.WithKeepaliveSettings{
Disable: true,
},
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
}
26 changes: 26 additions & 0 deletions heartbeat/monitors/active/http/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/heartbeat/hbtest"
Expand Down Expand Up @@ -674,3 +675,28 @@ func mustParseURL(t *testing.T, url string) *url.URL {
}
return parsed
}

func TestUserAgentInject(t *testing.T) {
ua := ""
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ua = r.Header.Get("User-Agent")
w.WriteHeader(http.StatusOK)
}))
defer ts.Close()

cfg, err := common.NewConfigFrom(map[string]interface{}{
"urls": ts.URL,
})
require.NoError(t, err)

p, err := create("ua", cfg)
require.NoError(t, err)

sched, _ := schedule.Parse("@every 1s")
job := wrappers.WrapCommon(p.Jobs, stdfields.StdMonitorFields{ID: "test", Type: "http", Schedule: sched, Timeout: 1})[0]

event := &beat.Event{}
_, err = job(event)
require.NoError(t, err)
assert.Contains(t, ua, "Heartbeat")
}
44 changes: 20 additions & 24 deletions heartbeat/monitors/active/http/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,10 @@ import (
"github.com/elastic/beats/v7/heartbeat/reason"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
)

var userAgent = useragent.UserAgent("Heartbeat", true)

func newHTTPMonitorHostJob(
addr string,
config *Config,
Expand Down Expand Up @@ -141,27 +139,28 @@ func createPingFactory(
// prevents following redirects in this case, we know that
// config.MaxRedirects must be zero to even be here
checkRedirect := makeCheckRedirect(0, nil)
transport := &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
}
client := &http.Client{
CheckRedirect: checkRedirect,
Timeout: timeout,
Transport: &SimpleTransport{
Dialer: dialer,
OnStartWrite: func() {
cbMutex.Lock()
writeStart = time.Now()
cbMutex.Unlock()
},
OnEndWrite: func() {
cbMutex.Lock()
writeEnd = time.Now()
cbMutex.Unlock()
},
OnStartRead: func() {
cbMutex.Lock()
readStart = time.Now()
cbMutex.Unlock()
},
},
Transport: httpcommon.HeaderRoundTripper(transport, map[string]string{"User-Agent": userAgent}),
}

_, end, err := execPing(event, client, request, body, timeout, validator, config.Response)
Expand Down Expand Up @@ -206,9 +205,6 @@ func buildRequest(addr string, config *Config, enc contentEncoder) (*http.Reques

request.Header.Add(k, v)
}
if ua := request.Header.Get("User-Agent"); ua == "" {
request.Header.Set("User-Agent", userAgent)
}

if enc != nil {
enc.AddHeaders(&request.Header)
Expand Down
9 changes: 0 additions & 9 deletions heartbeat/monitors/active/http/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
"reflect"
"testing"

"github.com/elastic/beats/v7/libbeat/common/useragent"

"github.com/stretchr/testify/require"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -174,13 +172,6 @@ func TestRequestBuildingWithCustomHost(t *testing.T) {
}
}

func TestRequestBuildingWithNoUserAgent(t *testing.T) {
request, err := buildRequest("localhost", &Config{}, nilEncoder{})

require.Nil(t, err)
assert.Equal(t, useragent.UserAgent("Heartbeat", true), request.Header.Get("User-Agent"))
}

func TestRequestBuildingWithExplicitUserAgent(t *testing.T) {
expectedUserAgent := "some-user-agent"

Expand Down
2 changes: 1 addition & 1 deletion libbeat/cmd/export/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func GenDashboardCmd(settings instance.Settings) *cobra.Command {
b.Config.Kibana = common.NewConfig()
}

client, err := kibana.NewKibanaClient(b.Config.Kibana)
client, err := kibana.NewKibanaClient(b.Config.Kibana, b.Info.Beat)
if err != nil {
fatalf("Error creating Kibana client: %+v.\n", err)
}
Expand Down
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ func (b *Beat) Setup(settings Settings, bt beat.Creator, setup SetupSettings) er
if outCfg.Name() != "elasticsearch" {
return fmt.Errorf("Index management requested but the Elasticsearch output is not configured/enabled")
}
esClient, err := eslegclient.NewConnectedClient(outCfg.Config())
esClient, err := eslegclient.NewConnectedClient(outCfg.Config(), b.Info.Beat)
if err != nil {
return err
}
Expand Down Expand Up @@ -822,7 +822,7 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error {
return fmt.Errorf("error initKibanaConfig: %v", err)
}

client, err := kibana.NewKibanaClient(kibanaConfig)
client, err := kibana.NewKibanaClient(kibanaConfig, b.Info.Beat)
if err != nil {
return fmt.Errorf("error connecting to Kibana: %v", err)
}
Expand Down
27 changes: 27 additions & 0 deletions libbeat/common/transport/httpcommon/httpcommon.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,20 @@ type extraOptionFunc func(*extraSettings)
func (extraOptionFunc) sealTransportOption() {}
func (fn extraOptionFunc) applyExtra(s *extraSettings) { fn(s) }

type headerRoundTripper struct {
headers map[string]string
rt http.RoundTripper
}

func (rt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
for k, v := range rt.headers {
if len(req.Header.Get(k)) == 0 {
req.Header.Set(k, v)
}
}
return rt.rt.RoundTrip(req)
}

// DefaultHTTPTransportSettings returns the default HTTP transport setting.
func DefaultHTTPTransportSettings() HTTPTransportSettings {
return HTTPTransportSettings{
Expand Down Expand Up @@ -373,6 +387,19 @@ func WithAPMHTTPInstrumentation() TransportOption {
return withAPMHTTPRountTripper
}

// HeaderRoundTripper will return a RoundTripper that sets header KVs if the key is not present.
func HeaderRoundTripper(rt http.RoundTripper, headers map[string]string) http.RoundTripper {
return &headerRoundTripper{headers, rt}
}

// WithHeaderRoundTripper instuments the HTTP client via a custom http.RoundTripper.
// This RoundTripper will add headers to each request if the key is not present.
func WithHeaderRoundTripper(headers map[string]string) TransportOption {
return WithModRoundtripper(func(rt http.RoundTripper) http.RoundTripper {
return HeaderRoundTripper(rt, headers)
})
}

// WithLogger sets the internal logger that will be used to log dial or TCP level errors.
// Logging at the connection level will only happen if the logger has been set.
func WithLogger(logger *logp.Logger) TransportOption {
Expand Down
6 changes: 3 additions & 3 deletions libbeat/dashboards/dashboards.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func ImportDashboards(
return errors.New("kibana configuration missing for loading dashboards.")
}

return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, kibanaConfig, &dashConfig, msgOutputter, pattern)
return setupAndImportDashboardsViaKibana(ctx, beatInfo.Hostname, beatInfo.Beat, kibanaConfig, &dashConfig, msgOutputter, pattern)
}

func setupAndImportDashboardsViaKibana(ctx context.Context, hostname string, kibanaConfig *common.Config,
func setupAndImportDashboardsViaKibana(ctx context.Context, hostname, beatname string, kibanaConfig *common.Config,
dashboardsConfig *Config, msgOutputter MessageOutputter, fields common.MapStr) error {

kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter)
kibanaLoader, err := NewKibanaLoader(ctx, kibanaConfig, dashboardsConfig, hostname, msgOutputter, beatname)
if err != nil {
return fmt.Errorf("fail to create the Kibana loader: %v", err)
}
Expand Down
10 changes: 5 additions & 5 deletions libbeat/dashboards/kibana_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ type KibanaLoader struct {
}

// NewKibanaLoader creates a new loader to load Kibana files
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter) (*KibanaLoader, error) {
func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *Config, hostname string, msgOutputter MessageOutputter, beatname string) (*KibanaLoader, error) {

if cfg == nil || !cfg.Enabled() {
return nil, fmt.Errorf("Kibana is not configured or enabled")
}

client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0)
client, err := getKibanaClient(ctx, cfg, dashboardsConfig.Retry, 0, beatname)
if err != nil {
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
}
Expand All @@ -73,15 +73,15 @@ func NewKibanaLoader(ctx context.Context, cfg *common.Config, dashboardsConfig *
return &loader, nil
}

func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg)
func getKibanaClient(ctx context.Context, cfg *common.Config, retryCfg *Retry, retryAttempt uint, beatname string) (*kibana.Client, error) {
client, err := kibana.NewKibanaClient(cfg, beatname)
if err != nil {
if retryCfg.Enabled && (retryCfg.Maximum == 0 || retryCfg.Maximum > retryAttempt) {
select {
case <-ctx.Done():
return nil, err
case <-time.After(retryCfg.Interval):
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1)
return getKibanaClient(ctx, cfg, retryCfg, retryAttempt+1, beatname)
}
}
return nil, fmt.Errorf("Error creating Kibana client: %v", err)
Expand Down
17 changes: 13 additions & 4 deletions libbeat/esleg/eslegclient/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/elastic/beats/v7/libbeat/common/transport/httpcommon"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/common/useragent"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)
Expand All @@ -57,7 +58,8 @@ type Connection struct {

// ConnectionSettings are the settings needed for a Connection
type ConnectionSettings struct {
URL string
URL string
Beatname string

Username string
Password string
Expand Down Expand Up @@ -110,6 +112,11 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
}
}

if s.Beatname == "" {
s.Beatname = "Libbeat"
}
userAgent := useragent.UserAgent(s.Beatname, true)

httpClient, err := s.Transport.Client(
httpcommon.WithLogger(logger),
httpcommon.WithIOStats(s.Observer),
Expand All @@ -119,6 +126,7 @@ func NewConnection(s ConnectionSettings) (*Connection, error) {
// eg, like in https://github.com/elastic/apm-server/blob/7.7/elasticsearch/client.go
return apmelasticsearch.WrapRoundTripper(rt)
}),
httpcommon.WithHeaderRoundTripper(map[string]string{"User-Agent": userAgent}),
)
if err != nil {
return nil, err
Expand Down Expand Up @@ -160,7 +168,7 @@ func settingsWithDefaults(s ConnectionSettings) ConnectionSettings {
// configuration. It accepts the same configuration parameters as the Elasticsearch
// output, except for the output specific configuration options. If multiple hosts
// are defined in the configuration, a client is returned for each of them.
func NewClients(cfg *common.Config) ([]Connection, error) {
func NewClients(cfg *common.Config, beatname string) ([]Connection, error) {
config := defaultConfig()
if err := cfg.Unpack(&config); err != nil {
return nil, err
Expand All @@ -185,6 +193,7 @@ func NewClients(cfg *common.Config) ([]Connection, error) {

client, err := NewConnection(ConnectionSettings{
URL: esURL,
Beatname: beatname,
Kerberos: config.Kerberos,
Username: config.Username,
Password: config.Password,
Expand All @@ -205,8 +214,8 @@ func NewClients(cfg *common.Config) ([]Connection, error) {
return clients, nil
}

func NewConnectedClient(cfg *common.Config) (*Connection, error) {
clients, err := NewClients(cfg)
func NewConnectedClient(cfg *common.Config, beatname string) (*Connection, error) {
clients, err := NewClients(cfg, beatname)
if err != nil {
return nil, err
}
Expand Down
Loading