diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 7c8939d08f18..b0e6f3f9e6bd 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -128,7 +128,8 @@ func (c *publishClient) Publish(batch publisher.Batch) error { // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. - _, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:]) + _, err = c.es.SendMonitoringBulk(params, bulk[:]) + if err != nil { failed = append(failed, event) reason = err diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 91eb5a298a3c..f013c2a113ac 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -23,6 +23,8 @@ import ( "io/ioutil" "net/http" "strings" + + "github.com/elastic/beats/libbeat/common" ) // MetaBuilder creates meta data for bulk requests @@ -77,6 +79,41 @@ func (conn *Connection) BulkWith( return readQueryResult(result.raw) } +// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of +// operations and sends them to Elasticsearch. The request is retransmitted up to max_retries +// before returning an error. +func (conn *Connection) SendMonitoringBulk( + params map[string]string, + body []interface{}, +) (*QueryResult, error) { + if len(body) == 0 { + return nil, nil + } + + enc := conn.encoder + enc.Reset() + if err := bulkEncode(enc, nil, body); err != nil { + return nil, err + } + + if !conn.version.IsValid() { + if err := conn.Connect(); err != nil { + return nil, err + } + } + + requ, err := newMonitoringBulkRequest(conn.version, conn.URL, params, enc) + if err != nil { + return nil, err + } + + _, result, err := conn.sendBulkRequest(requ) + if err != nil { + return nil, err + } + return readQueryResult(result.raw) +} + func newBulkRequest( urlStr string, index, docType string, @@ -88,6 +125,36 @@ func newBulkRequest( return nil, err } + return newBulkRequestWithPath(urlStr, path, params, body) +} + +func newMonitoringBulkRequest( + esVersion common.Version, + urlStr string, + params map[string]string, + body bodyEncoder, +) (*bulkRequest, error) { + var path string + var err error + if esVersion.Major < 7 { + path, err = makePath("_xpack", "monitoring", "_bulk") + } else { + path, err = makePath("_monitoring", "bulk", "") + } + + if err != nil { + return nil, err + } + + return newBulkRequestWithPath(urlStr, path, params, body) +} + +func newBulkRequestWithPath( + urlStr string, + path string, + params map[string]string, + body bodyEncoder, +) (*bulkRequest, error) { url := addToURL(urlStr, path, "", params) var reader io.Reader