diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 88562b2df6ee..8eeac98795f8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Affecting all Beats* +- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879] + *Auditbeat* diff --git a/libbeat/esleg/eslegclient/bulkapi.go b/libbeat/esleg/eslegclient/bulkapi.go index 70118d57fd5d..44027a837d3c 100644 --- a/libbeat/esleg/eslegclient/bulkapi.go +++ b/libbeat/esleg/eslegclient/bulkapi.go @@ -51,10 +51,11 @@ type BulkDeleteAction struct { } type BulkMeta struct { - Index string `json:"_index" struct:"_index"` - DocType string `json:"_type,omitempty" struct:"_type,omitempty"` - Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` - ID string `json:"_id,omitempty" struct:"_id,omitempty"` + Index string `json:"_index" struct:"_index"` + DocType string `json:"_type,omitempty" struct:"_type,omitempty"` + Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"` + ID string `json:"_id,omitempty" struct:"_id,omitempty"` + RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"` } type bulkRequest struct { diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index b1f0071ade25..d8f032c4f422 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -337,6 +337,8 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) { return idx, err } +func (s ilmIndexSelector) IsAlias() bool { return true } + func (s indexSelector) Select(evt *beat.Event) (string, error) { if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" { return idx, nil @@ -344,6 +346,8 @@ func (s indexSelector) Select(evt *beat.Event) (string, error) { return s.sel.Select(evt) } +func (s indexSelector) IsAlias() bool { return false } + func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { if len(evt.Meta) == 0 { return "" diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index deab29c3dcd8..3900bdeb4794 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -317,6 +317,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev ID: id, } + if isRequireAliasSupported(version) { + meta.RequireAlias = client.index.IsAlias() + } + if opType == events.OpTypeDelete { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil @@ -333,6 +337,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev return eslegclient.BulkIndexAction{Index: meta}, nil } +func isRequireAliasSupported(version common.Version) bool { + return !version.LessThan(common.MustNewVersion("7.10.0")) +} + func (client *Client) getPipeline(event *beat.Event) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) diff --git a/libbeat/outputs/elasticsearch/client_integration_test.go b/libbeat/outputs/elasticsearch/client_integration_test.go index 2b05f8a3cdb4..1bf5d99ce290 100644 --- a/libbeat/outputs/elasticsearch/client_integration_test.go +++ b/libbeat/outputs/elasticsearch/client_integration_test.go @@ -420,7 +420,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu } info := beat.Info{Beat: "libbeat"} - im, _ := idxmgmt.DefaultSupport(nil, info, nil) + // ILM must be disabled otherwise custom index settings are ignored. + im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig()) output, err := makeES(im, info, stats, config) if err != nil { t.Fatal(err) @@ -438,6 +439,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu return client, client } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + // setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC func setupRoleMapping(t *testing.T, host string) error { _, client := connectTestEsWithoutStats(t, map[string]interface{}{ diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 2a03d10481dd..9cdd43dca08f 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -409,22 +409,49 @@ func TestClientWithHeaders(t *testing.T) { func TestBulkEncodeEvents(t *testing.T) { cases := map[string]struct { - version string - docType string - config common.MapStr - events []common.MapStr + version string + docType string + config common.MapStr + ilmConfig *common.Config + isAlias bool + events []common.MapStr }{ "6.x": { - version: "6.8.0", - docType: "doc", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + version: "6.8.0", + docType: "doc", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, }, - "latest": { - version: version.GetDefaultVersion(), - docType: "", - config: common.MapStr{}, - events: []common.MapStr{{"message": "test"}}, + "require_alias not supported": { + version: "7.9.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + events: []common.MapStr{{"message": "test"}}, + }, + "require_alias is supported": { + version: "7.10.0", + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, + "latest with ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: common.NewConfig(), + isAlias: true, + events: []common.MapStr{{"message": "test"}}, + }, + "latest without ILM": { + version: version.GetDefaultVersion(), + docType: "", + config: common.MapStr{}, + ilmConfig: disabledILMConfig(), + events: []common.MapStr{{"message": "test"}}, }, } @@ -437,7 +464,7 @@ func TestBulkEncodeEvents(t *testing.T) { Version: test.version, } - im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig()) + im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig) require.NoError(t, err) index, pipeline, err := buildSelectors(im, info, cfg) @@ -479,6 +506,7 @@ func TestBulkEncodeEvents(t *testing.T) { } assert.NotEqual(t, "", meta.Index) + assert.Equal(t, test.isAlias, meta.RequireAlias) assert.Equal(t, test.docType, meta.DocType) } @@ -487,6 +515,10 @@ func TestBulkEncodeEvents(t *testing.T) { } } +func disabledILMConfig() *common.Config { + return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}}) +} + func TestBulkEncodeEventsWithOpType(t *testing.T) { cases := []common.MapStr{ {"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0}, diff --git a/libbeat/outputs/elasticsearch/death_letter_selector.go b/libbeat/outputs/elasticsearch/death_letter_selector.go index 02bd3780cab7..34184c80c1ad 100644 --- a/libbeat/outputs/elasticsearch/death_letter_selector.go +++ b/libbeat/outputs/elasticsearch/death_letter_selector.go @@ -34,3 +34,5 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) { } return d.Selector.Select(event) } + +func (d DeadLetterSelector) IsAlias() bool { return false } diff --git a/libbeat/outputs/outil/select.go b/libbeat/outputs/outil/select.go index 1615a3bdb112..ebe55674e4aa 100644 --- a/libbeat/outputs/outil/select.go +++ b/libbeat/outputs/outil/select.go @@ -87,6 +87,8 @@ func (s Selector) Select(evt *beat.Event) (string, error) { return s.sel.sel(evt) } +func (s Selector) IsAlias() bool { return false } + // IsEmpty checks if the selector is not configured and will always return an empty string. func (s Selector) IsEmpty() bool { return s.sel == nilSelector || s.sel == nil diff --git a/libbeat/outputs/output_reg.go b/libbeat/outputs/output_reg.go index 86c1323c505b..f4abc63298ab 100644 --- a/libbeat/outputs/output_reg.go +++ b/libbeat/outputs/output_reg.go @@ -43,8 +43,10 @@ type IndexManager interface { } // IndexSelector is used to find the index name an event shall be indexed to. +// It also used to check if during indexing required_alias should be set. type IndexSelector interface { Select(event *beat.Event) (string, error) + IsAlias() bool } // Group configures and combines multiple clients into load-balanced group of clients