Skip to content

Commit

Permalink
Always use create op_type with ES 7.5+
Browse files Browse the repository at this point in the history
Elasticsearch introduces the `create_doc` privilege, which always
requires the op_type to be `create`. We would like to take advantage of
this, in order to reduces the privileges Beats users have to set for
Beats.

In the future Elasticsearch will support `op_type == create` if
documents without ID are indexed, but older Elasticsearch versions
don't.

This change always uses `op_type == create` when the Elasticsearch
version is 7.5+.
  • Loading branch information
urso committed Oct 11, 2019
1 parent 62c6b26 commit f0e52a8
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 4 deletions.
8 changes: 5 additions & 3 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ func (client *Client) publishEvents(
}

origCount := len(data)
data = bulkEncodePublishRequest(body, client.index, client.pipeline, eventType, data)
data = bulkEncodePublishRequest(client.GetVersion(), body, client.index, client.pipeline, eventType, data)
newCount := len(data)
if st != nil && origCount > newCount {
st.Dropped(origCount - newCount)
Expand Down Expand Up @@ -384,6 +384,7 @@ func (client *Client) publishEvents(
// fillBulkRequest encodes all bulk requests and returns slice of events
// successfully added to bulk request.
func bulkEncodePublishRequest(
version common.Version,
body bulkWriter,
index outputs.IndexSelector,
pipeline *outil.Selector,
Expand All @@ -393,7 +394,7 @@ func bulkEncodePublishRequest(
okEvents := data[:0]
for i := range data {
event := &data[i].Content
meta, err := createEventBulkMeta(index, pipeline, eventType, event)
meta, err := createEventBulkMeta(version, index, pipeline, eventType, event)
if err != nil {
logp.Err("Failed to encode event meta data: %s", err)
continue
Expand All @@ -409,6 +410,7 @@ func bulkEncodePublishRequest(
}

func createEventBulkMeta(
version common.Version,
indexSel outputs.IndexSelector,
pipelineSel *outil.Selector,
eventType string,
Expand Down Expand Up @@ -444,7 +446,7 @@ func createEventBulkMeta(
ID: id,
}

if id != "" {
if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) {
return bulkCreateAction{meta}, nil
}
return bulkIndexAction{meta}, nil
Expand Down
2 changes: 1 addition & 1 deletion libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func TestBulkEncodeEvents(t *testing.T) {

recorder := &testBulkRecorder{}

encoded := bulkEncodePublishRequest(recorder, index, pipeline, test.docType, events)
encoded := bulkEncodePublishRequest(common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events)
assert.Equal(t, len(events), len(encoded), "all events should have been encoded")
assert.False(t, recorder.inAction, "incomplete bulk")

Expand Down

0 comments on commit f0e52a8

Please sign in to comment.