From b5e445923e035d0262921e5b3ef25dc97d1dda87 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Tue, 19 Mar 2019 15:16:26 -0700 Subject: [PATCH] Cherry-pick #11203 to 7.0: Use different monitoring bulk API paths depending on ES version (#11323) Cherry-pick of PR #11203 to 7.0 branch. Original message: Resolves #9480. Starting Elasticsearch 7.0.0, Beats should ship their monitoring data to the `_monitoring/bulk` Elasticsearch API endpoint. Prior to 7.0.0, `_xpack/monitoring/_bulk` should be used. This PR implements this version-based conditional logic. I used Wireshark to look at the ES API endpoints being hit. Running this PR with ES 8.0.0 or ES 7.0.0, I confirmed that the `POST _monitoring/bulk` endpoint was being hit: Screen Shot 2019-03-14 at 10 55 52 AM And running this PR with ES 6.7.0, I confirmed that the `POST _xpack/monitoring/_bulk` endpoint was being hit: Screen Shot 2019-03-14 at 10 56 42 AM --- .../monitoring/report/elasticsearch/client.go | 3 +- libbeat/outputs/elasticsearch/bulkapi.go | 67 +++++++++++++++++++ 2 files changed, 69 insertions(+), 1 deletion(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 7c8939d08f1..b0e6f3f9e6b 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -128,7 +128,8 @@ func (c *publishClient) Publish(batch publisher.Batch) error { // Currently one request per event is sent. Reason is that each event can contain different // interval params and X-Pack requires to send the interval param. - _, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:]) + _, err = c.es.SendMonitoringBulk(params, bulk[:]) + if err != nil { failed = append(failed, event) reason = err diff --git a/libbeat/outputs/elasticsearch/bulkapi.go b/libbeat/outputs/elasticsearch/bulkapi.go index 91eb5a298a3..f013c2a113a 100644 --- a/libbeat/outputs/elasticsearch/bulkapi.go +++ b/libbeat/outputs/elasticsearch/bulkapi.go @@ -23,6 +23,8 @@ import ( "io/ioutil" "net/http" "strings" + + "github.com/elastic/beats/libbeat/common" ) // MetaBuilder creates meta data for bulk requests @@ -77,6 +79,41 @@ func (conn *Connection) BulkWith( return readQueryResult(result.raw) } +// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of +// operations and sends them to Elasticsearch. The request is retransmitted up to max_retries +// before returning an error. +func (conn *Connection) SendMonitoringBulk( + params map[string]string, + body []interface{}, +) (*QueryResult, error) { + if len(body) == 0 { + return nil, nil + } + + enc := conn.encoder + enc.Reset() + if err := bulkEncode(enc, nil, body); err != nil { + return nil, err + } + + if !conn.version.IsValid() { + if err := conn.Connect(); err != nil { + return nil, err + } + } + + requ, err := newMonitoringBulkRequest(conn.version, conn.URL, params, enc) + if err != nil { + return nil, err + } + + _, result, err := conn.sendBulkRequest(requ) + if err != nil { + return nil, err + } + return readQueryResult(result.raw) +} + func newBulkRequest( urlStr string, index, docType string, @@ -88,6 +125,36 @@ func newBulkRequest( return nil, err } + return newBulkRequestWithPath(urlStr, path, params, body) +} + +func newMonitoringBulkRequest( + esVersion common.Version, + urlStr string, + params map[string]string, + body bodyEncoder, +) (*bulkRequest, error) { + var path string + var err error + if esVersion.Major < 7 { + path, err = makePath("_xpack", "monitoring", "_bulk") + } else { + path, err = makePath("_monitoring", "bulk", "") + } + + if err != nil { + return nil, err + } + + return newBulkRequestWithPath(urlStr, path, params, body) +} + +func newBulkRequestWithPath( + urlStr string, + path string, + params map[string]string, + body bodyEncoder, +) (*bulkRequest, error) { url := addToURL(urlStr, path, "", params) var reader io.Reader