Skip to content

Commit

Permalink
Cherry-pick #11203 to 7.0: Use different monitoring bulk API paths de…
Browse files Browse the repository at this point in the history
…pending on ES version (#11323)

Cherry-pick of PR #11203 to 7.0 branch. Original message: 

Resolves #9480.

Starting Elasticsearch 7.0.0, Beats should ship their monitoring data to the `_monitoring/bulk` Elasticsearch API endpoint. Prior to 7.0.0, `_xpack/monitoring/_bulk` should be used. This PR implements this version-based conditional logic.

I used Wireshark to look at the ES API endpoints being hit.

Running this PR with ES 8.0.0 or ES 7.0.0, I confirmed that the `POST _monitoring/bulk` endpoint was being hit:

<img width="1436" alt="Screen Shot 2019-03-14 at 10 55 52 AM" src="https://user-images.githubusercontent.com/51061/54380101-ed567780-4647-11e9-8ed1-9b9020bb85d4.png">

And running this PR with ES 6.7.0, I confirmed that the `POST _xpack/monitoring/_bulk` endpoint was being hit:

<img width="1437" alt="Screen Shot 2019-03-14 at 10 56 42 AM" src="https://user-images.githubusercontent.com/51061/54380094-eaf41d80-4647-11e9-8658-d9a6ba14541b.png">
  • Loading branch information
ycombinator authored Mar 19, 2019
1 parent 6a0f8ec commit b5e4459
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 1 deletion.
3 changes: 2 additions & 1 deletion libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ func (c *publishClient) Publish(batch publisher.Batch) error {

// Currently one request per event is sent. Reason is that each event can contain different
// interval params and X-Pack requires to send the interval param.
_, err = c.es.BulkWith("_xpack", "monitoring", params, nil, bulk[:])
_, err = c.es.SendMonitoringBulk(params, bulk[:])

if err != nil {
failed = append(failed, event)
reason = err
Expand Down
67 changes: 67 additions & 0 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"io/ioutil"
"net/http"
"strings"

"github.com/elastic/beats/libbeat/common"
)

// MetaBuilder creates meta data for bulk requests
Expand Down Expand Up @@ -77,6 +79,41 @@ func (conn *Connection) BulkWith(
return readQueryResult(result.raw)
}

// SendMonitoringBulk creates a HTTP request to the X-Pack Monitoring API containing a bunch of
// operations and sends them to Elasticsearch. The request is retransmitted up to max_retries
// before returning an error.
func (conn *Connection) SendMonitoringBulk(
params map[string]string,
body []interface{},
) (*QueryResult, error) {
if len(body) == 0 {
return nil, nil
}

enc := conn.encoder
enc.Reset()
if err := bulkEncode(enc, nil, body); err != nil {
return nil, err
}

if !conn.version.IsValid() {
if err := conn.Connect(); err != nil {
return nil, err
}
}

requ, err := newMonitoringBulkRequest(conn.version, conn.URL, params, enc)
if err != nil {
return nil, err
}

_, result, err := conn.sendBulkRequest(requ)
if err != nil {
return nil, err
}
return readQueryResult(result.raw)
}

func newBulkRequest(
urlStr string,
index, docType string,
Expand All @@ -88,6 +125,36 @@ func newBulkRequest(
return nil, err
}

return newBulkRequestWithPath(urlStr, path, params, body)
}

func newMonitoringBulkRequest(
esVersion common.Version,
urlStr string,
params map[string]string,
body bodyEncoder,
) (*bulkRequest, error) {
var path string
var err error
if esVersion.Major < 7 {
path, err = makePath("_xpack", "monitoring", "_bulk")
} else {
path, err = makePath("_monitoring", "bulk", "")
}

if err != nil {
return nil, err
}

return newBulkRequestWithPath(urlStr, path, params, body)
}

func newBulkRequestWithPath(
urlStr string,
path string,
params map[string]string,
body bodyEncoder,
) (*bulkRequest, error) {
url := addToURL(urlStr, path, "", params)

var reader io.Reader
Expand Down

0 comments on commit b5e4459

Please sign in to comment.