Skip to content

Commit

Permalink
Enable require_alias for Bulk requests for all actions when target is…
Browse files Browse the repository at this point in the history
… a write alias (elastic#29879)

## What does this PR do?

This PR adds support for requiring alias when using ILM. From now on a `Selector` can tell Elasticsearch client if the target we are shipping events to is an alias or an index. By default, we consider everything an index, and only consider a target an alias when ILM is enabled.

The feature is only supported since ES 7.10, so if the user tries to connect to an older version, we cannot help them with this parameter.

## Why is it important?

We see issues around ILM sometimes where users have deleted their write alias causing running beats instances to auto-create an index where the write alias should (with auto-mappings to boot, since the template won't be applied).
  • Loading branch information
kvch authored Jan 18, 2022
1 parent 4c4400e commit 84bf434
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 19 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

*Affecting all Beats*

- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879]


*Auditbeat*

Expand Down
9 changes: 5 additions & 4 deletions libbeat/esleg/eslegclient/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ type BulkDeleteAction struct {
}

type BulkMeta struct {
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
Index string `json:"_index" struct:"_index"`
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"`
}

type bulkRequest struct {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,17 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
return idx, err
}

func (s ilmIndexSelector) IsAlias() bool { return true }

func (s indexSelector) Select(evt *beat.Event) (string, error) {
if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
return idx, nil
}
return s.sel.Select(evt)
}

func (s indexSelector) IsAlias() bool { return false }

func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
if len(evt.Meta) == 0 {
return ""
Expand Down
8 changes: 8 additions & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
ID: id,
}

if isRequireAliasSupported(version) {
meta.RequireAlias = client.index.IsAlias()
}

if opType == events.OpTypeDelete {
if id != "" {
return eslegclient.BulkDeleteAction{Delete: meta}, nil
Expand All @@ -333,6 +337,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
return eslegclient.BulkIndexAction{Index: meta}, nil
}

func isRequireAliasSupported(version common.Version) bool {
return !version.LessThan(common.MustNewVersion("7.10.0"))
}

func (client *Client) getPipeline(event *beat.Event) (string, error) {
if event.Meta != nil {
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)
Expand Down
7 changes: 6 additions & 1 deletion libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
}

info := beat.Info{Beat: "libbeat"}
im, _ := idxmgmt.DefaultSupport(nil, info, nil)
// ILM must be disabled otherwise custom index settings are ignored.
im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig())
output, err := makeES(im, info, stats, config)
if err != nil {
t.Fatal(err)
Expand All @@ -438,6 +439,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
return client, client
}

func disabledILMConfig() *common.Config {
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
}

// setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC
func setupRoleMapping(t *testing.T, host string) error {
_, client := connectTestEsWithoutStats(t, map[string]interface{}{
Expand Down
60 changes: 46 additions & 14 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,22 +409,49 @@ func TestClientWithHeaders(t *testing.T) {

func TestBulkEncodeEvents(t *testing.T) {
cases := map[string]struct {
version string
docType string
config common.MapStr
events []common.MapStr
version string
docType string
config common.MapStr
ilmConfig *common.Config
isAlias bool
events []common.MapStr
}{
"6.x": {
version: "6.8.0",
docType: "doc",
config: common.MapStr{},
events: []common.MapStr{{"message": "test"}},
version: "6.8.0",
docType: "doc",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
events: []common.MapStr{{"message": "test"}},
},
"latest": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
events: []common.MapStr{{"message": "test"}},
"require_alias not supported": {
version: "7.9.0",
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
events: []common.MapStr{{"message": "test"}},
},
"require_alias is supported": {
version: "7.10.0",
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
isAlias: true,
events: []common.MapStr{{"message": "test"}},
},
"latest with ILM": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
ilmConfig: common.NewConfig(),
isAlias: true,
events: []common.MapStr{{"message": "test"}},
},
"latest without ILM": {
version: version.GetDefaultVersion(),
docType: "",
config: common.MapStr{},
ilmConfig: disabledILMConfig(),
events: []common.MapStr{{"message": "test"}},
},
}

Expand All @@ -437,7 +464,7 @@ func TestBulkEncodeEvents(t *testing.T) {
Version: test.version,
}

im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig)
require.NoError(t, err)

index, pipeline, err := buildSelectors(im, info, cfg)
Expand Down Expand Up @@ -479,6 +506,7 @@ func TestBulkEncodeEvents(t *testing.T) {
}

assert.NotEqual(t, "", meta.Index)
assert.Equal(t, test.isAlias, meta.RequireAlias)
assert.Equal(t, test.docType, meta.DocType)
}

Expand All @@ -487,6 +515,10 @@ func TestBulkEncodeEvents(t *testing.T) {
}
}

func disabledILMConfig() *common.Config {
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
}

func TestBulkEncodeEventsWithOpType(t *testing.T) {
cases := []common.MapStr{
{"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0},
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/elasticsearch/death_letter_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,5 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) {
}
return d.Selector.Select(event)
}

func (d DeadLetterSelector) IsAlias() bool { return false }
2 changes: 2 additions & 0 deletions libbeat/outputs/outil/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ func (s Selector) Select(evt *beat.Event) (string, error) {
return s.sel.sel(evt)
}

func (s Selector) IsAlias() bool { return false }

// IsEmpty checks if the selector is not configured and will always return an empty string.
func (s Selector) IsEmpty() bool {
return s.sel == nilSelector || s.sel == nil
Expand Down
2 changes: 2 additions & 0 deletions libbeat/outputs/output_reg.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,10 @@ type IndexManager interface {
}

// IndexSelector is used to find the index name an event shall be indexed to.
// It also used to check if during indexing required_alias should be set.
type IndexSelector interface {
Select(event *beat.Event) (string, error)
IsAlias() bool
}

// Group configures and combines multiple clients into load-balanced group of clients
Expand Down

0 comments on commit 84bf434

Please sign in to comment.