diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 421e9a2d723..68b414ee4a3 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -327,7 +327,7 @@ func (client *Client) publishEvents( } origCount := len(data) - data = bulkEncodePublishRequest(body, client.index, client.pipeline, eventType, data) + data = bulkEncodePublishRequest(client.GetVersion(), body, client.index, client.pipeline, eventType, data) newCount := len(data) if st != nil && origCount > newCount { st.Dropped(origCount - newCount) @@ -384,6 +384,7 @@ func (client *Client) publishEvents( // fillBulkRequest encodes all bulk requests and returns slice of events // successfully added to bulk request. func bulkEncodePublishRequest( + version common.Version, body bulkWriter, index outputs.IndexSelector, pipeline *outil.Selector, @@ -393,7 +394,7 @@ func bulkEncodePublishRequest( okEvents := data[:0] for i := range data { event := &data[i].Content - meta, err := createEventBulkMeta(index, pipeline, eventType, event) + meta, err := createEventBulkMeta(version, index, pipeline, eventType, event) if err != nil { logp.Err("Failed to encode event meta data: %s", err) continue @@ -409,6 +410,7 @@ func bulkEncodePublishRequest( } func createEventBulkMeta( + version common.Version, indexSel outputs.IndexSelector, pipelineSel *outil.Selector, eventType string, @@ -444,7 +446,7 @@ func createEventBulkMeta( ID: id, } - if id != "" { + if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { return bulkCreateAction{meta}, nil } return bulkIndexAction{meta}, nil diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 27825c65353..098753cbc23 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -388,7 +388,7 @@ func TestBulkEncodeEvents(t *testing.T) { recorder := &testBulkRecorder{} - encoded := bulkEncodePublishRequest(recorder, index, pipeline, test.docType, events) + encoded := bulkEncodePublishRequest(common.Version{Major: 7, Minor: 5}, recorder, index, pipeline, test.docType, events) assert.Equal(t, len(events), len(encoded), "all events should have been encoded") assert.False(t, recorder.inAction, "incomplete bulk")