Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve monitoring reporter #8090

Merged
merged 6 commits into from
Aug 29, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff]
- Deregister pipeline loader callback when inputsRunner is stopped. {pull}7893[7893]
- Replace index patterns in TSVB visualizations. {pull}7929[7929]
- Fixed Support `add_docker_metadata` in Windows by identifying systems' path separator. {issue}7797[7797]
- Add backoff support to x-pack monitoring outputs. {issue}7966[7966]

*Auditbeat*

Expand Down
11 changes: 11 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1803,6 +1803,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1250,6 +1250,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions libbeat/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
24 changes: 23 additions & 1 deletion libbeat/docs/monitoring/shared-monitor-config.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,21 @@ configuration option contains the following fields:
The maximum number of metrics to bulk in a single {es} bulk API index request.
The default is `50`. For more information, see <<elasticsearch-output>>.

[float]
==== `backoff.init`

The number of seconds to wait before trying to reconnect to Elasticsearch after
a network error. After waiting `backoff.init` seconds, {beatname_uc} tries to
reconnect. If the attempt fails, the backoff timer is increased exponentially up
to `backoff.max`. After a successful connection, the backoff timer is reset. The
default is 1s.

[float]
===== `backoff.max`

The maximum number of seconds to wait before attempting to connect to
Elasticsearch after a network error. The default is 60s.

[float]
==== `compression_level`

Expand Down Expand Up @@ -79,10 +94,17 @@ The password that {beatname_uc} uses to authenticate with the {es} instances for
shipping monitoring data.

[float]
==== `period`
==== `metrics.period`

The time interval (in seconds) when metrics are sent to the {es} cluster. A new
snapshot of {beatname_uc} metrics is generated and scheduled for publishing each
period. The default value is 10 * time.Second.

[float]
==== `state.period`

The time interval (in seconds) when state information are sent to the {es} cluster. A new
snapshot of {beatname_uc} state is generated and scheduled for publishing each
period. The default value is 60 * time.Second.

[float]
Expand Down
10 changes: 10 additions & 0 deletions libbeat/monitoring/report/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ type config struct {
BulkMaxSize int `config:"bulk_max_size" validate:"min=0"`
BufferSize int `config:"buffer_size"`
Tags []string `config:"tags"`
Backoff backoff `config:"backoff"`
}

type backoff struct {
Init time.Duration
Max time.Duration
}

var defaultConfig = config{
Expand All @@ -61,4 +67,8 @@ var defaultConfig = config{
BulkMaxSize: 50,
BufferSize: 50,
Tags: nil,
Backoff: backoff{
Init: 1 * time.Second,
Max: 60 * time.Second,
},
}
35 changes: 22 additions & 13 deletions libbeat/monitoring/report/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type reporter struct {
// pipeline
pipeline *pipeline.Pipeline
client beat.Client
out outputs.Group

out []outputs.NetworkClient
}

var debugf = logp.MakeDebug("monitoring")
Expand Down Expand Up @@ -104,22 +105,21 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
params[k] = v
}

out := outputs.Group{
Clients: nil,
BatchSize: windowSize,
Retry: 0, // no retry. on error drop events
}

hosts, err := outputs.ReadHostList(cfg)
if err != nil {
return nil, err
}
if len(hosts) == 0 {
return nil, errors.New("empty hosts list")
}

var clients []outputs.NetworkClient
for _, host := range hosts {
client, err := makeClient(host, params, proxyURL, tlsConfig, &config)
if err != nil {
return nil, err
}
out.Clients = append(out.Clients, client)
clients = append(clients, client)
}

queueFactory := func(e queue.Eventer) (queue.Queue, error) {
Expand All @@ -131,18 +131,27 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {

monitoring := monitoring.Default.GetRegistry("xpack.monitoring")

outClient := outputs.NewFailoverClient(clients)
outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max)

pipeline, err := pipeline.New(
beat,
monitoring,
queueFactory, out, pipeline.Settings{
queueFactory,
outputs.Group{
Clients: []outputs.Client{outClient},
BatchSize: windowSize,
Retry: 0, // no retry. Drop event on error.
},
pipeline.Settings{
WaitClose: 0,
WaitCloseMode: pipeline.NoWaitOnClose,
})
if err != nil {
return nil, err
}

client, err := pipeline.Connect()
pipeConn, err := pipeline.Connect()
if err != nil {
pipeline.Close()
return nil, err
Expand All @@ -154,8 +163,8 @@ func makeReporter(beat beat.Info, cfg *common.Config) (report.Reporter, error) {
tags: config.Tags,
checkRetry: checkRetry,
pipeline: pipeline,
client: client,
out: out,
client: pipeConn,
out: clients,
}
go r.initLoop(config)
return r, nil
Expand All @@ -175,7 +184,7 @@ func (r *reporter) initLoop(c config) {

for {
// Select one configured endpoint by random and check if xpack is available
client := r.out.Clients[rand.Intn(len(r.out.Clients))].(outputs.NetworkClient)
client := r.out[rand.Intn(len(r.out))]
err := client.Connect()
if err == nil {
closing(client)
Expand Down
11 changes: 11 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1521,6 +1521,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down
11 changes: 11 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,17 @@ logging.files:
# The default is 50.
#bulk_max_size: 50

# The number of seconds to wait before trying to reconnect to Elasticsearch
# after a network error. After waiting backoff.init seconds, the Beat
# tries to reconnect. If the attempt fails, the backoff timer is increased
# exponentially up to backoff.max. After a successful connection, the backoff
# timer is reset. The default is 1s.
#backoff.init: 1s

# The maximum number of seconds to wait before attempting to connect to
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# Configure http request timeout before failing an request to Elasticsearch.
#timeout: 90

Expand Down