From a60f5fbea1176a5f0ea7b7805f0cdbb003ebe771 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 6 May 2020 06:52:44 -0700 Subject: [PATCH 01/22] Adding developer CHANGELOG entry --- CHANGELOG-developer.next.asciidoc | 2 +- libbeat/beat/events/util.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 libbeat/beat/events/util.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 31c6dfcf2b03..2c17536ace8f 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -33,7 +33,6 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693] - `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691] - Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761] - - The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734] - The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734] - The newly generated Beats are using go modules to manage dependencies. {pull}16288[16288] @@ -87,3 +86,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Add support for a `TEST_TAGS` environment variable to add tags for tests selection following go build tags semantics, this environment variable is used by mage test targets to add build tags. Python tests can also be tagged with a decorator (`@beat.tag('sometag')`). {pull}16937[16937] {pull}17075[17075] - Add fields validation for histogram subfields. {pull}17759[17759] - Add IP* fields to `fields.yml` generator script in Filebeat. {issue}17998[17998] {pull}18256[18256] +- Events intended for the Elasticsearch output can now take an `op_type` metadata field to indicate the `op_type` to use for bulk indexing. {pull}12606[12606] diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go new file mode 100644 index 000000000000..b3adf695ccdb --- /dev/null +++ b/libbeat/beat/events/util.go @@ -0,0 +1 @@ +package events From f2f3b652ab8c3e8d90899e2d69b6e7d70226ba0d Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 6 May 2020 06:53:04 -0700 Subject: [PATCH 02/22] Refactoring: extracting helper method --- libbeat/beat/event.go | 13 ---------- libbeat/beat/events/util.go | 33 +++++++++++++++++++++++++ libbeat/outputs/elasticsearch/client.go | 10 ++++---- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 183b56b1ce88..4ef560420396 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -54,19 +54,6 @@ func (e *Event) SetID(id string) { e.Meta["_id"] = id } -func (e *Event) GetMetaStringValue(key string) (string, error) { - tmp, err := e.Meta.GetValue(key) - if err != nil { - return "", err - } - - if s, ok := tmp.(string); ok { - return s, nil - } - - return "", nil -} - func (e *Event) GetValue(key string) (interface{}, error) { if key == "@timestamp" { return e.Timestamp, nil diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index b3adf695ccdb..60ec3cf44f53 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -1 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package events + +import "github.com/elastic/beats/v7/libbeat/beat" + +// GetMetaStringValue returns the value of the given event metadata string field +func GetMetaStringValue(e beat.Event, key string) (string, error) { + tmp, err := e.Meta.GetValue(key) + if err != nil { + return "", err + } + + if s, ok := tmp.(string); ok { + return s, nil + } + + return "", nil +} diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 7a8be4c34b3c..c381f09465df 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -27,15 +27,15 @@ import ( "go.elastic.co/apm" - "github.com/elastic/beats/v7/libbeat/testing" - "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/outputs/outil" "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/beats/v7/libbeat/testing" ) // Client is an elasticsearch client. @@ -290,7 +290,7 @@ func bulkEncodePublishRequest( log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType, err := event.GetMetaStringValue(opTypeKey); err == nil && opType == opTypeDelete { + if opType, err := events.GetMetaStringValue(*event, opTypeKey); err == nil && opType == opTypeDelete { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { @@ -325,8 +325,8 @@ func createEventBulkMeta( return nil, err } - id, _ := event.GetMetaStringValue("_id") - opType, _ := event.GetMetaStringValue(opTypeKey) + id, _ := events.GetMetaStringValue(*event, "_id") + opType, _ := events.GetMetaStringValue(*event, opTypeKey) meta := eslegclient.BulkMeta{ Index: index, From cc9c0978fda839e4b10b9bebabfcfa119476e70c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 6 May 2020 07:01:16 -0700 Subject: [PATCH 03/22] Adding unit tests --- libbeat/beat/events/util_test.go | 90 ++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 libbeat/beat/events/util_test.go diff --git a/libbeat/beat/events/util_test.go b/libbeat/beat/events/util_test.go new file mode 100644 index 000000000000..d9c138130b7c --- /dev/null +++ b/libbeat/beat/events/util_test.go @@ -0,0 +1,90 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" +) + +func TestGetMetaStringValue(t *testing.T) { + tests := map[string]struct { + event beat.Event + metaFieldPath string + expectedValue string + expectedErr error + }{ + "nonexistent_field": { + beat.Event{ + Meta: common.MapStr{ + "foo": "bar", + }, + }, + "nonexistent", + "", + common.ErrKeyNotFound, + }, + "root": { + beat.Event{ + Meta: common.MapStr{ + "foo": "bar", + "baz": "hello", + }, + }, + "baz", + "hello", + nil, + }, + "nested": { + beat.Event{ + Meta: common.MapStr{ + "foo": "bar", + "baz": common.MapStr{ + "qux": "hello", + }, + }, + }, + "baz.qux", + "hello", + nil, + }, + "non_string": { + beat.Event{ + Meta: common.MapStr{ + "foo": "bar", + "baz": 17, + }, + }, + "baz", + "", + nil, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + value, err := GetMetaStringValue(test.event, test.metaFieldPath) + require.Equal(t, test.expectedValue, value) + require.Equal(t, test.expectedErr, err) + }) + } +} From ff1126450cd49ee7038885eef60ef08751c621e0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 7 May 2020 13:37:20 -0700 Subject: [PATCH 04/22] Consolidate event metadata field constants --- filebeat/channel/runner_test.go | 3 ++- journalbeat/input/input_test.go | 3 ++- libbeat/beat/events/util.go | 16 ++++++++++++++ libbeat/idxmgmt/std.go | 7 +++--- .../monitoring/report/elasticsearch/client.go | 5 +++-- libbeat/outputs/elasticsearch/client.go | 22 ++++++------------- libbeat/outputs/elasticsearch/client_test.go | 7 +++--- .../processors/actions/decode_json_fields.go | 3 ++- .../add_formatted_index.go | 3 ++- .../script/javascript/beatevent_v0_test.go | 3 ++- metricbeat/mb/event_test.go | 3 ++- .../function/beater/proccessors_test.go | 3 ++- 12 files changed, 48 insertions(+), 30 deletions(-) diff --git a/filebeat/channel/runner_test.go b/filebeat/channel/runner_test.go index 101904b9260c..cf42a38e4b8e 100644 --- a/filebeat/channel/runner_test.go +++ b/filebeat/channel/runner_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" @@ -195,7 +196,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw_index"] = p.indexStr + event.Meta[events.FieldMetaRawIndex] = p.indexStr return event, nil } diff --git a/journalbeat/input/input_test.go b/journalbeat/input/input_test.go index f80688b786ef..eb925f1ef0b1 100644 --- a/journalbeat/input/input_test.go +++ b/journalbeat/input/input_test.go @@ -25,6 +25,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" _ "github.com/elastic/beats/v7/libbeat/processors/actions" @@ -138,7 +139,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw_index"] = p.indexStr + event.Meta[events.FieldMetaRawIndex] = p.indexStr return event, nil } diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 60ec3cf44f53..914963d5d87a 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -19,6 +19,22 @@ package events import "github.com/elastic/beats/v7/libbeat/beat" +const ( + FieldMetaID = "_id" + FieldMetaIndex = "index" + FieldMetaRawIndex = "raw_index" + FieldMetaAlias = "alias" + FieldMetaPipeline = "pipeline" + + // FieldMetaOpType defines the metadata key name for event operation type. + // The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume + // either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly. + FieldMetaOpType = "op_type" + FieldMetaOpTypeCreate = "create" + FieldMetaOpTypeDelete = "delete" + FieldMetaOpTypeIndex = "index" +) + // GetMetaStringValue returns the value of the given event metadata string field func GetMetaStringValue(e beat.Event, key string) (string, error) { tmp, err := e.Meta.GetValue(key) diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index a6aff9af9d34..20b5b0e7d30b 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -22,6 +22,7 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/atomic" "github.com/elastic/beats/v7/libbeat/idxmgmt/ilm" @@ -352,13 +353,13 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { return "" } - if tmp := evt.Meta["alias"]; tmp != nil { + if tmp := evt.Meta[events.FieldMetaAlias]; tmp != nil { if alias, ok := tmp.(string); ok { return alias } } - if tmp := evt.Meta["index"]; tmp != nil { + if tmp := evt.Meta[events.FieldMetaIndex]; tmp != nil { if idx, ok := tmp.(string); ok { ts := evt.Timestamp.UTC() return fmt.Sprintf("%s-%d.%02d.%02d", @@ -370,7 +371,7 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { // metadata as the index name if present. It is currently used by Filebeat // to send the index for particular inputs to formatted string templates, // which are then expanded by a processor to the "raw_index" field. - if tmp := evt.Meta["raw_index"]; tmp != nil { + if tmp := evt.Meta[events.FieldMetaRawIndex]; tmp != nil { if idx, ok := tmp.(string); ok { return idx } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index fb83a2e636bc..540663d22c97 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -28,6 +28,7 @@ import ( "github.com/pkg/errors" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/logp" @@ -203,9 +204,9 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, action := common.MapStr{} var opType string if esVersion.LessThan(createDocPrivAvailableESVersion) { - opType = "index" + opType = events.FieldMetaOpTypeIndex } else { - opType = "create" + opType = events.FieldMetaOpTypeCreate } action[opType] = meta diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index c381f09465df..fe11f7f2747b 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -68,16 +68,8 @@ type bulkResultStats struct { const ( defaultEventType = "doc" - opTypeCreate = "create" - opTypeDelete = "delete" - opTypeIndex = "index" ) -// opTypeKey defines the metadata key name for event operation type. -// The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume -// either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly. -const opTypeKey = "op_type" - // NewClient instantiates a new client. func NewClient( s ClientSettings, @@ -290,7 +282,7 @@ func bulkEncodePublishRequest( log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType, err := events.GetMetaStringValue(*event, opTypeKey); err == nil && opType == opTypeDelete { + if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.FieldMetaOpTypeDelete { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { @@ -325,8 +317,8 @@ func createEventBulkMeta( return nil, err } - id, _ := events.GetMetaStringValue(*event, "_id") - opType, _ := events.GetMetaStringValue(*event, opTypeKey) + id, _ := events.GetMetaStringValue(*event, events.FieldMetaID) + opType, _ := events.GetMetaStringValue(*event, events.FieldMetaOpType) meta := eslegclient.BulkMeta{ Index: index, @@ -335,15 +327,15 @@ func createEventBulkMeta( ID: id, } - if opType == opTypeDelete { + if opType == events.FieldMetaOpTypeDelete { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { - return nil, fmt.Errorf("%s %s requires _id", opTypeKey, opTypeDelete) + return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.FieldMetaOpTypeDelete) } } if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { - if opType == opTypeIndex { + if opType == events.FieldMetaOpTypeIndex { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil @@ -353,7 +345,7 @@ func createEventBulkMeta( func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) { if event.Meta != nil { - if pipeline, exists := event.Meta["pipeline"]; exists { + if pipeline, exists := event.Meta[events.FieldMetaPipeline]; exists { if p, ok := pipeline.(string); ok { return p, nil } diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 125105ea69bf..d9a0f89c1053 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -32,6 +32,7 @@ import ( "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" + e "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/idxmgmt" @@ -368,12 +369,12 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { caseMessage, _ := cases[i]["message"].(string) switch bulkItems[bulkEventIndex].(type) { case eslegclient.BulkCreateAction: - validOpTypes := []string{opTypeCreate, ""} + validOpTypes := []string{e.FieldMetaOpTypeCreate, ""} require.Contains(t, validOpTypes, caseOpType, caseMessage) case eslegclient.BulkIndexAction: - require.Equal(t, opTypeIndex, caseOpType, caseMessage) + require.Equal(t, e.FieldMetaOpTypeIndex, caseOpType, caseMessage) case eslegclient.BulkDeleteAction: - require.Equal(t, opTypeDelete, caseOpType, caseMessage) + require.Equal(t, e.FieldMetaOpTypeDelete, caseOpType, caseMessage) default: require.FailNow(t, "unknown type") } diff --git a/libbeat/processors/actions/decode_json_fields.go b/libbeat/processors/actions/decode_json_fields.go index b9ea3440db9f..f0e3db61b3e2 100644 --- a/libbeat/processors/actions/decode_json_fields.go +++ b/libbeat/processors/actions/decode_json_fields.go @@ -26,6 +26,7 @@ import ( "github.com/pkg/errors" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/jsontransform" "github.com/elastic/beats/v7/libbeat/logp" @@ -159,7 +160,7 @@ func (f *decodeJSONFields) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["_id"] = id + event.Meta[events.FieldMetaID] = id } } diff --git a/libbeat/processors/add_formatted_index/add_formatted_index.go b/libbeat/processors/add_formatted_index/add_formatted_index.go index 72be2a89775c..bd4e542b14f7 100644 --- a/libbeat/processors/add_formatted_index/add_formatted_index.go +++ b/libbeat/processors/add_formatted_index/add_formatted_index.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/fmtstr" ) @@ -48,7 +49,7 @@ func (p *AddFormattedIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw_index"] = index + event.Meta[events.FieldMetaRawIndex] = index return event, nil } diff --git a/libbeat/processors/script/javascript/beatevent_v0_test.go b/libbeat/processors/script/javascript/beatevent_v0_test.go index 030a260d4240..5d77e802a1be 100644 --- a/libbeat/processors/script/javascript/beatevent_v0_test.go +++ b/libbeat/processors/script/javascript/beatevent_v0_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/monitoring" "github.com/elastic/beats/v7/libbeat/tests/resources" @@ -128,7 +129,7 @@ var eventV0Tests = []testCase{ name: "Delete @metadata", source: `evt.Delete("@metadata.pipeline");`, assert: func(t testing.TB, evt *beat.Event, err error) { - assert.Nil(t, evt.Meta["pipeline"]) + assert.Nil(t, evt.Meta[events.FieldMetaPipeline]) }, }, { diff --git a/metricbeat/mb/event_test.go b/metricbeat/mb/event_test.go index 3de07034fca6..50d6857c7774 100644 --- a/metricbeat/mb/event_test.go +++ b/metricbeat/mb/event_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" ) @@ -139,7 +140,7 @@ func TestEventConversionToBeatEvent(t *testing.T) { e := mbEvent.BeatEvent(module, metricSet) e = mbEvent.BeatEvent(module, metricSet) - assert.Equal(t, "foobar", e.Meta["_id"]) + assert.Equal(t, "foobar", e.Meta[events.FieldMetaID]) assert.Equal(t, timestamp, e.Timestamp) assert.Equal(t, common.MapStr{ "type": "docker", diff --git a/x-pack/functionbeat/function/beater/proccessors_test.go b/x-pack/functionbeat/function/beater/proccessors_test.go index 3a18aa82b02a..c38649fe2e2a 100644 --- a/x-pack/functionbeat/function/beater/proccessors_test.go +++ b/x-pack/functionbeat/function/beater/proccessors_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/beat/events" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" _ "github.com/elastic/beats/v7/libbeat/processors/actions" @@ -123,7 +124,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) { if event.Meta == nil { event.Meta = common.MapStr{} } - event.Meta["raw_index"] = p.indexStr + event.Meta[events.FieldMetaRawIndex] = p.indexStr return event, nil } From 611c877b36e7f72de18d91ae3240ca9d94fef9a0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 11 May 2020 07:46:05 -0700 Subject: [PATCH 05/22] Use events.GetMetaStringValue --- libbeat/idxmgmt/std.go | 22 ++++++++-------------- 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 20b5b0e7d30b..65854af7b75c 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -353,28 +353,22 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { return "" } - if tmp := evt.Meta[events.FieldMetaAlias]; tmp != nil { - if alias, ok := tmp.(string); ok { - return alias - } + if alias, err := events.GetMetaStringValue(evt, events.FieldMetaAlias); err == nil { + return alias } - if tmp := evt.Meta[events.FieldMetaIndex]; tmp != nil { - if idx, ok := tmp.(string); ok { - ts := evt.Timestamp.UTC() - return fmt.Sprintf("%s-%d.%02d.%02d", - idx, ts.Year(), ts.Month(), ts.Day()) - } + if idx, err := events.GetMetaStringValue(evt, events.FieldMetaIndex); err == nil { + ts := evt.Timestamp.UTC() + return fmt.Sprintf("%s-%d.%02d.%02d", + idx, ts.Year(), ts.Month(), ts.Day()) } // This is functionally identical to Meta["alias"], returning the overriding // metadata as the index name if present. It is currently used by Filebeat // to send the index for particular inputs to formatted string templates, // which are then expanded by a processor to the "raw_index" field. - if tmp := evt.Meta[events.FieldMetaRawIndex]; tmp != nil { - if idx, ok := tmp.(string); ok { - return idx - } + if idx, err := events.GetMetaStringValue(evt, events.FieldMetaRawIndex); err == nil { + return idx } return "" From 5ef7bd9251e0afbed3f9e49a45faad83a37c708f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 11 May 2020 08:02:38 -0700 Subject: [PATCH 06/22] Implement op_type values as enum --- libbeat/beat/events/util.go | 15 +++++++++++---- libbeat/monitoring/report/elasticsearch/client.go | 4 ++-- libbeat/outputs/elasticsearch/client.go | 8 ++++---- libbeat/outputs/elasticsearch/client_test.go | 6 +++--- 4 files changed, 20 insertions(+), 13 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 914963d5d87a..d6dcb6e10a69 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -29,12 +29,19 @@ const ( // FieldMetaOpType defines the metadata key name for event operation type. // The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume // either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly. - FieldMetaOpType = "op_type" - FieldMetaOpTypeCreate = "create" - FieldMetaOpTypeDelete = "delete" - FieldMetaOpTypeIndex = "index" + FieldMetaOpType = "op_type" + + FieldMetaOpTypeCreate MetaOpType = iota + FieldMetaOpTypeDelete + FieldMetaOpTypeIndex ) +type MetaOpType int + +func (o MetaOpType) String() string { + return []string{"create", "delete", "index"}[o] +} + // GetMetaStringValue returns the value of the given event metadata string field func GetMetaStringValue(e beat.Event, key string) (string, error) { tmp, err := e.Meta.GetValue(key) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 540663d22c97..ccfc3cef4331 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -202,13 +202,13 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, } action := common.MapStr{} - var opType string + var opType events.MetaOpType if esVersion.LessThan(createDocPrivAvailableESVersion) { opType = events.FieldMetaOpTypeIndex } else { opType = events.FieldMetaOpTypeCreate } - action[opType] = meta + action[opType.String()] = meta event.Content.Fields.Put("timestamp", event.Content.Timestamp) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index fe11f7f2747b..23c455112de4 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -282,7 +282,7 @@ func bulkEncodePublishRequest( log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.FieldMetaOpTypeDelete { + if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.FieldMetaOpTypeDelete.String() { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { @@ -327,15 +327,15 @@ func createEventBulkMeta( ID: id, } - if opType == events.FieldMetaOpTypeDelete { + if opType == events.FieldMetaOpTypeDelete.String() { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { - return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.FieldMetaOpTypeDelete) + return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.FieldMetaOpTypeDelete.String()) } } if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { - if opType == events.FieldMetaOpTypeIndex { + if opType == events.FieldMetaOpTypeIndex.String() { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index d9a0f89c1053..79e30abb5988 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -369,12 +369,12 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { caseMessage, _ := cases[i]["message"].(string) switch bulkItems[bulkEventIndex].(type) { case eslegclient.BulkCreateAction: - validOpTypes := []string{e.FieldMetaOpTypeCreate, ""} + validOpTypes := []string{e.FieldMetaOpTypeCreate.String(), ""} require.Contains(t, validOpTypes, caseOpType, caseMessage) case eslegclient.BulkIndexAction: - require.Equal(t, e.FieldMetaOpTypeIndex, caseOpType, caseMessage) + require.Equal(t, e.FieldMetaOpTypeIndex.String(), caseOpType, caseMessage) case eslegclient.BulkDeleteAction: - require.Equal(t, e.FieldMetaOpTypeDelete, caseOpType, caseMessage) + require.Equal(t, e.FieldMetaOpTypeDelete.String(), caseOpType, caseMessage) default: require.FailNow(t, "unknown type") } From 8914d751d91868ebbf18d5942f826e2a1e4c4333 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 11 May 2020 09:13:43 -0700 Subject: [PATCH 07/22] Add doc strings --- libbeat/beat/events/util.go | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index d6dcb6e10a69..378d75489968 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -20,15 +20,28 @@ package events import "github.com/elastic/beats/v7/libbeat/beat" const ( - FieldMetaID = "_id" - FieldMetaIndex = "index" + // FieldMetaID defines the ID for the event. Also see FieldMetaOpType. + FieldMetaID = "_id" + + // FieldMetaAlias defines the index alias to use for the event. If set, it takes + // precedence over values defined using FieldMetaIndex or FieldMetaRawIndex. + FieldMetaAlias = "alias" + + // FieldMetaIndex defines the base index name to use for the event. The value is suffixed + // with a Y-m-d value based on the event's timestamp. If set, it takes precedence over the + // value defined using FieldMetaRawIndex. + FieldMetaIndex = "index" + + // FieldMetaRawIndex defines the raw index name to use for the event. It is used as-is, without + // any additional manipulation. FieldMetaRawIndex = "raw_index" - FieldMetaAlias = "alias" + + // FieldMetaPipeline defines the ingest node pipeline to use for this event. FieldMetaPipeline = "pipeline" - // FieldMetaOpType defines the metadata key name for event operation type. - // The key's value can be an empty string, `create`, `index`, or `delete`. If empty, it will assume - // either `create` or `index`. See `createEventBulkMeta`. If in doubt, set explicitly. + // FieldMetaOpType defines the metadata key name for event operation type to use with the Elasticsearch + // Bulk API encoding of the event. The key's value can be an empty string, `create`, `index`, or `delete`. + // If empty, `create` will be used if FieldMetaID is set; otherwise `index` will be used. FieldMetaOpType = "op_type" FieldMetaOpTypeCreate MetaOpType = iota From 4d0259aeaf87b94139f1a67fe7fba3f0e517b251 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Mon, 11 May 2020 09:21:08 -0700 Subject: [PATCH 08/22] Deference event pointer --- libbeat/idxmgmt/std.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/idxmgmt/std.go b/libbeat/idxmgmt/std.go index 65854af7b75c..06a566468072 100644 --- a/libbeat/idxmgmt/std.go +++ b/libbeat/idxmgmt/std.go @@ -353,11 +353,11 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { return "" } - if alias, err := events.GetMetaStringValue(evt, events.FieldMetaAlias); err == nil { + if alias, err := events.GetMetaStringValue(*evt, events.FieldMetaAlias); err == nil { return alias } - if idx, err := events.GetMetaStringValue(evt, events.FieldMetaIndex); err == nil { + if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaIndex); err == nil { ts := evt.Timestamp.UTC() return fmt.Sprintf("%s-%d.%02d.%02d", idx, ts.Year(), ts.Month(), ts.Day()) @@ -367,7 +367,7 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string { // metadata as the index name if present. It is currently used by Filebeat // to send the index for particular inputs to formatted string templates, // which are then expanded by a processor to the "raw_index" field. - if idx, err := events.GetMetaStringValue(evt, events.FieldMetaRawIndex); err == nil { + if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaRawIndex); err == nil { return idx } From fcda77de7d51c95f73061d9dde84e12fbb0186f1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 01:49:00 -0700 Subject: [PATCH 09/22] Renaming op type consts and breaking them out into own block --- libbeat/beat/events/util.go | 8 +++++--- libbeat/monitoring/report/elasticsearch/client.go | 4 ++-- libbeat/outputs/elasticsearch/client.go | 8 ++++---- libbeat/outputs/elasticsearch/client_test.go | 6 +++--- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 378d75489968..152cb513a596 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -43,10 +43,12 @@ const ( // Bulk API encoding of the event. The key's value can be an empty string, `create`, `index`, or `delete`. // If empty, `create` will be used if FieldMetaID is set; otherwise `index` will be used. FieldMetaOpType = "op_type" +) - FieldMetaOpTypeCreate MetaOpType = iota - FieldMetaOpTypeDelete - FieldMetaOpTypeIndex +const ( + OpTypeCreate MetaOpType = iota + OpTypeDelete + OpTypeIndex ) type MetaOpType int diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index ccfc3cef4331..5dc19672df63 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -204,9 +204,9 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, action := common.MapStr{} var opType events.MetaOpType if esVersion.LessThan(createDocPrivAvailableESVersion) { - opType = events.FieldMetaOpTypeIndex + opType = events.OpTypeIndex } else { - opType = events.FieldMetaOpTypeCreate + opType = events.OpTypeCreate } action[opType.String()] = meta diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 23c455112de4..6cf9f8af9e76 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -282,7 +282,7 @@ func bulkEncodePublishRequest( log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.FieldMetaOpTypeDelete.String() { + if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.OpTypeDelete.String() { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { @@ -327,15 +327,15 @@ func createEventBulkMeta( ID: id, } - if opType == events.FieldMetaOpTypeDelete.String() { + if opType == events.OpTypeDelete.String() { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { - return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.FieldMetaOpTypeDelete.String()) + return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete.String()) } } if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { - if opType == events.FieldMetaOpTypeIndex.String() { + if opType == events.OpTypeIndex.String() { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index 79e30abb5988..d4277e6ef833 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -369,12 +369,12 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { caseMessage, _ := cases[i]["message"].(string) switch bulkItems[bulkEventIndex].(type) { case eslegclient.BulkCreateAction: - validOpTypes := []string{e.FieldMetaOpTypeCreate.String(), ""} + validOpTypes := []string{e.OpTypeCreate.String(), ""} require.Contains(t, validOpTypes, caseOpType, caseMessage) case eslegclient.BulkIndexAction: - require.Equal(t, e.FieldMetaOpTypeIndex.String(), caseOpType, caseMessage) + require.Equal(t, e.OpTypeIndex.String(), caseOpType, caseMessage) case eslegclient.BulkDeleteAction: - require.Equal(t, e.FieldMetaOpTypeDelete.String(), caseOpType, caseMessage) + require.Equal(t, e.OpTypeDelete.String(), caseOpType, caseMessage) default: require.FailNow(t, "unknown type") } From 5eb058e4e7a43c57a06f60028356111f364aa37a Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 01:49:44 -0700 Subject: [PATCH 10/22] Renaming type --- libbeat/beat/events/util.go | 8 ++++---- libbeat/monitoring/report/elasticsearch/client.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 152cb513a596..24616499d833 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -45,15 +45,15 @@ const ( FieldMetaOpType = "op_type" ) +type OpType int + const ( - OpTypeCreate MetaOpType = iota + OpTypeCreate OpType = iota OpTypeDelete OpTypeIndex ) -type MetaOpType int - -func (o MetaOpType) String() string { +func (o OpType) String() string { return []string{"create", "delete", "index"}[o] } diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 5dc19672df63..40f216716f1c 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -202,7 +202,7 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, } action := common.MapStr{} - var opType events.MetaOpType + var opType events.OpType if esVersion.LessThan(createDocPrivAvailableESVersion) { opType = events.OpTypeIndex } else { From 583b42e6e67ad308a8295ff6a33dec5aa7434b8f Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 01:58:05 -0700 Subject: [PATCH 11/22] Using stringer --- libbeat/beat/events/optype.go | 27 ++++++++++++++++++ libbeat/beat/events/optype_string.go | 42 ++++++++++++++++++++++++++++ libbeat/beat/events/util.go | 12 -------- 3 files changed, 69 insertions(+), 12 deletions(-) create mode 100644 libbeat/beat/events/optype.go create mode 100644 libbeat/beat/events/optype_string.go diff --git a/libbeat/beat/events/optype.go b/libbeat/beat/events/optype.go new file mode 100644 index 000000000000..b0116079df6c --- /dev/null +++ b/libbeat/beat/events/optype.go @@ -0,0 +1,27 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package events + +type OpType int + +//go:generate stringer -linecomment -type OpType +const ( + OpTypeCreate OpType = iota //create + OpTypeIndex // index + OpTypeDelete // delete +) diff --git a/libbeat/beat/events/optype_string.go b/libbeat/beat/events/optype_string.go new file mode 100644 index 000000000000..9d02dd83280d --- /dev/null +++ b/libbeat/beat/events/optype_string.go @@ -0,0 +1,42 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Code generated by "stringer -linecomment -type OpType"; DO NOT EDIT. + +package events + +import "strconv" + +func _() { + // An "invalid array index" compiler error signifies that the constant values have changed. + // Re-run the stringer command to generate them again. + var x [1]struct{} + _ = x[OpTypeCreate-0] + _ = x[OpTypeIndex-1] + _ = x[OpTypeDelete-2] +} + +const _OpType_name = "createindexdelete" + +var _OpType_index = [...]uint8{0, 6, 11, 17} + +func (i OpType) String() string { + if i < 0 || i >= OpType(len(_OpType_index)-1) { + return "OpType(" + strconv.FormatInt(int64(i), 10) + ")" + } + return _OpType_name[_OpType_index[i]:_OpType_index[i+1]] +} diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 24616499d833..9ec565c7ba43 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -45,18 +45,6 @@ const ( FieldMetaOpType = "op_type" ) -type OpType int - -const ( - OpTypeCreate OpType = iota - OpTypeDelete - OpTypeIndex -) - -func (o OpType) String() string { - return []string{"create", "delete", "index"}[o] -} - // GetMetaStringValue returns the value of the given event metadata string field func GetMetaStringValue(e beat.Event, key string) (string, error) { tmp, err := e.Meta.GetValue(key) From 50f3ee5628fd1641396d2d8b9715f156b31717fb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:00:09 -0700 Subject: [PATCH 12/22] Using go idiom instead of if-else --- libbeat/monitoring/report/elasticsearch/client.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/libbeat/monitoring/report/elasticsearch/client.go b/libbeat/monitoring/report/elasticsearch/client.go index 40f216716f1c..e4f7bb500366 100644 --- a/libbeat/monitoring/report/elasticsearch/client.go +++ b/libbeat/monitoring/report/elasticsearch/client.go @@ -201,14 +201,14 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event, meta["_type"] = "doc" } - action := common.MapStr{} - var opType events.OpType + opType := events.OpTypeCreate if esVersion.LessThan(createDocPrivAvailableESVersion) { opType = events.OpTypeIndex - } else { - opType = events.OpTypeCreate } - action[opType.String()] = meta + + action := common.MapStr{ + opType.String(): meta, + } event.Content.Fields.Put("timestamp", event.Content.Timestamp) From 8551c2cdaa338b16879aab5537172788eb18eba4 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:04:02 -0700 Subject: [PATCH 13/22] Adding default op type --- libbeat/beat/events/optype.go | 7 ++++--- libbeat/beat/events/optype_string.go | 11 ++++++----- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/libbeat/beat/events/optype.go b/libbeat/beat/events/optype.go index b0116079df6c..9a6612d7ad36 100644 --- a/libbeat/beat/events/optype.go +++ b/libbeat/beat/events/optype.go @@ -21,7 +21,8 @@ type OpType int //go:generate stringer -linecomment -type OpType const ( - OpTypeCreate OpType = iota //create - OpTypeIndex // index - OpTypeDelete // delete + OpTypeDefault OpType = iota //default + OpTypeCreate //create + OpTypeIndex // index + OpTypeDelete // delete ) diff --git a/libbeat/beat/events/optype_string.go b/libbeat/beat/events/optype_string.go index 9d02dd83280d..5a4a9ed0ca97 100644 --- a/libbeat/beat/events/optype_string.go +++ b/libbeat/beat/events/optype_string.go @@ -25,14 +25,15 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} - _ = x[OpTypeCreate-0] - _ = x[OpTypeIndex-1] - _ = x[OpTypeDelete-2] + _ = x[OpTypeDefault-0] + _ = x[OpTypeCreate-1] + _ = x[OpTypeIndex-2] + _ = x[OpTypeDelete-3] } -const _OpType_name = "createindexdelete" +const _OpType_name = "defaultcreateindexdelete" -var _OpType_index = [...]uint8{0, 6, 11, 17} +var _OpType_index = [...]uint8{0, 7, 13, 18, 24} func (i OpType) String() string { if i < 0 || i >= OpType(len(_OpType_index)-1) { From 8b1f6ea8c460554172188da21c8fec8dc1e97ad1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:08:17 -0700 Subject: [PATCH 14/22] Empty string for default --- libbeat/beat/events/optype.go | 2 +- libbeat/beat/events/optype_string.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/libbeat/beat/events/optype.go b/libbeat/beat/events/optype.go index 9a6612d7ad36..2ffb83d8f36c 100644 --- a/libbeat/beat/events/optype.go +++ b/libbeat/beat/events/optype.go @@ -21,7 +21,7 @@ type OpType int //go:generate stringer -linecomment -type OpType const ( - OpTypeDefault OpType = iota //default + OpTypeDefault OpType = iota // OpTypeCreate //create OpTypeIndex // index OpTypeDelete // delete diff --git a/libbeat/beat/events/optype_string.go b/libbeat/beat/events/optype_string.go index 5a4a9ed0ca97..e13401c73cd6 100644 --- a/libbeat/beat/events/optype_string.go +++ b/libbeat/beat/events/optype_string.go @@ -31,9 +31,9 @@ func _() { _ = x[OpTypeDelete-3] } -const _OpType_name = "defaultcreateindexdelete" +const _OpType_name = "createindexdelete" -var _OpType_index = [...]uint8{0, 7, 13, 18, 24} +var _OpType_index = [...]uint8{0, 0, 6, 11, 17} func (i OpType) String() string { if i < 0 || i >= OpType(len(_OpType_index)-1) { From 49acde0e68a7b08e761e1b18cbca3773108f8be7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:14:41 -0700 Subject: [PATCH 15/22] Store op type enum, not string, in event metadata --- libbeat/beat/events/util.go | 19 +++++++++++++++++++ libbeat/outputs/elasticsearch/client.go | 8 ++++---- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 9ec565c7ba43..9f46f31a3651 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -58,3 +58,22 @@ func GetMetaStringValue(e beat.Event, key string) (string, error) { return "", nil } + +// GetOpType returns the event's op_type, if set +func GetOpType(e beat.Event) OpType { + opType, err := GetMetaStringValue(e, FieldMetaOpType) + if err != nil { + return OpTypeDefault + } + + switch opType { + case "create": + return OpTypeCreate + case "index": + return OpTypeIndex + case "delete": + return OpTypeDelete + default: + return OpTypeDefault + } +} diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 6cf9f8af9e76..1b74abd379c6 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -282,7 +282,7 @@ func bulkEncodePublishRequest( log.Errorf("Failed to encode event meta data: %+v", err) continue } - if opType, err := events.GetMetaStringValue(*event, events.FieldMetaOpType); err == nil && opType == events.OpTypeDelete.String() { + if opType := events.GetOpType(*event); opType == events.OpTypeDelete { // We don't include the event source in a bulk DELETE bulkItems = append(bulkItems, meta) } else { @@ -318,7 +318,7 @@ func createEventBulkMeta( } id, _ := events.GetMetaStringValue(*event, events.FieldMetaID) - opType, _ := events.GetMetaStringValue(*event, events.FieldMetaOpType) + opType := events.GetOpType(*event) meta := eslegclient.BulkMeta{ Index: index, @@ -327,7 +327,7 @@ func createEventBulkMeta( ID: id, } - if opType == events.OpTypeDelete.String() { + if opType == events.OpTypeDelete { if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { @@ -335,7 +335,7 @@ func createEventBulkMeta( } } if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { - if opType == events.OpTypeIndex.String() { + if opType == events.OpTypeIndex { return eslegclient.BulkIndexAction{Index: meta}, nil } return eslegclient.BulkCreateAction{Create: meta}, nil From 9b0debc9ba9d6d078dc8fc2469a795950ff9acfb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:16:15 -0700 Subject: [PATCH 16/22] Using events.GetMetaStringValue --- libbeat/outputs/elasticsearch/client.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 1b74abd379c6..23042cf4cff6 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -345,12 +345,12 @@ func createEventBulkMeta( func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) { if event.Meta != nil { - if pipeline, exists := event.Meta[events.FieldMetaPipeline]; exists { - if p, ok := pipeline.(string); ok { - return p, nil - } + pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) + if err != nil { return "", errors.New("pipeline metadata is no string") } + + return pipeline, nil } if pipelineSel != nil { From 16a9eaa5130846dbb823dcedac7212bacfb792c9 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 02:19:58 -0700 Subject: [PATCH 17/22] Updating dev CHANGELOG entry --- CHANGELOG-developer.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 2c17536ace8f..d893d662e11e 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -86,4 +86,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Add support for a `TEST_TAGS` environment variable to add tags for tests selection following go build tags semantics, this environment variable is used by mage test targets to add build tags. Python tests can also be tagged with a decorator (`@beat.tag('sometag')`). {pull}16937[16937] {pull}17075[17075] - Add fields validation for histogram subfields. {pull}17759[17759] - Add IP* fields to `fields.yml` generator script in Filebeat. {issue}17998[17998] {pull}18256[18256] -- Events intended for the Elasticsearch output can now take an `op_type` metadata field to indicate the `op_type` to use for bulk indexing. {pull}12606[12606] +- Events intended for the Elasticsearch output can now take an `op_type` metadata field of type events.OpType to indicate the `op_type` to use for bulk indexing. {pull}12606[12606] From 1f4fd5d1851095201e8b3aecce99859dec1eab53 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 11:24:16 -0700 Subject: [PATCH 18/22] Allow for op_type metadata field to be set as either string or enum --- libbeat/beat/events/util.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/libbeat/beat/events/util.go b/libbeat/beat/events/util.go index 9f46f31a3651..46967c82d71b 100644 --- a/libbeat/beat/events/util.go +++ b/libbeat/beat/events/util.go @@ -61,19 +61,24 @@ func GetMetaStringValue(e beat.Event, key string) (string, error) { // GetOpType returns the event's op_type, if set func GetOpType(e beat.Event) OpType { - opType, err := GetMetaStringValue(e, FieldMetaOpType) + tmp, err := e.Meta.GetValue(FieldMetaOpType) if err != nil { return OpTypeDefault } - switch opType { - case "create": - return OpTypeCreate - case "index": - return OpTypeIndex - case "delete": - return OpTypeDelete - default: - return OpTypeDefault + switch v := tmp.(type) { + case OpType: + return v + case string: + switch v { + case "create": + return OpTypeCreate + case "index": + return OpTypeIndex + case "delete": + return OpTypeDelete + } } + + return OpTypeDefault } From 5c02b87b6bdbb60ece5a884b87be581d2d1d0e67 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 11:24:36 -0700 Subject: [PATCH 19/22] No need for .String() --- libbeat/outputs/elasticsearch/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index 23042cf4cff6..b88a15cda68d 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -331,7 +331,7 @@ func createEventBulkMeta( if id != "" { return eslegclient.BulkDeleteAction{Delete: meta}, nil } else { - return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete.String()) + return nil, fmt.Errorf("%s %s requires _id", events.FieldMetaOpType, events.OpTypeDelete) } } if id != "" || version.Major > 7 || (version.Major == 7 && version.Minor >= 5) { From 0a7e6b5b67152812c4b98f15a84cb4d8f0ed8a11 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 11:24:46 -0700 Subject: [PATCH 20/22] Handle missing key case gracefully --- libbeat/outputs/elasticsearch/client.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/libbeat/outputs/elasticsearch/client.go b/libbeat/outputs/elasticsearch/client.go index b88a15cda68d..4a3c71df3bfc 100644 --- a/libbeat/outputs/elasticsearch/client.go +++ b/libbeat/outputs/elasticsearch/client.go @@ -346,6 +346,9 @@ func createEventBulkMeta( func getPipeline(event *beat.Event, pipelineSel *outil.Selector) (string, error) { if event.Meta != nil { pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline) + if err == common.ErrKeyNotFound { + return "", nil + } if err != nil { return "", errors.New("pipeline metadata is no string") } From 440e8c9b17bcc78a9cb8d3e5bfa934e35265a7b0 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 11:24:56 -0700 Subject: [PATCH 21/22] Update unit tests --- libbeat/outputs/elasticsearch/client_test.go | 35 +++++++++++--------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/libbeat/outputs/elasticsearch/client_test.go b/libbeat/outputs/elasticsearch/client_test.go index d4277e6ef833..db152bf9045a 100644 --- a/libbeat/outputs/elasticsearch/client_test.go +++ b/libbeat/outputs/elasticsearch/client_test.go @@ -322,12 +322,12 @@ func TestBulkEncodeEvents(t *testing.T) { func TestBulkEncodeEventsWithOpType(t *testing.T) { cases := []common.MapStr{ - {"_id": "111", "op_type": "index", "message": "test 1", "bulkIndex": 0}, - {"_id": "112", "op_type": "", "message": "test 2", "bulkIndex": 2}, - {"_id": "", "op_type": "delete", "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id - {"_id": "", "op_type": "", "message": "test 3", "bulkIndex": 4}, - {"_id": "114", "op_type": "delete", "message": "test 4", "bulkIndex": 6}, - {"_id": "115", "op_type": "index", "message": "test 5", "bulkIndex": 7}, + {"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0}, + {"_id": "112", "message": "test 2", "bulkIndex": 2}, + {"_id": "", "op_type": e.OpTypeDelete, "message": "test 6", "bulkIndex": -1}, // this won't get encoded due to missing _id + {"_id": "", "message": "test 3", "bulkIndex": 4}, + {"_id": "114", "op_type": e.OpTypeDelete, "message": "test 4", "bulkIndex": 6}, + {"_id": "115", "op_type": e.OpTypeIndex, "message": "test 5", "bulkIndex": 7}, } cfg := common.MustNewConfigFrom(common.MapStr{}) @@ -344,16 +344,21 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { events := make([]publisher.Event, len(cases)) for i, fields := range cases { + meta := common.MapStr{ + "_id": fields["_id"], + } + if opType, exists := fields["op_type"]; exists { + meta[e.FieldMetaOpType] = opType + } + events[i] = publisher.Event{ Content: beat.Event{ - Meta: common.MapStr{ - "_id": fields["_id"], - "op_type": fields["op_type"], - }, + Meta: meta, Fields: common.MapStr{ "message": fields["message"], }, - }} + }, + } } encoded, bulkItems := bulkEncodePublishRequest(logp.L(), *common.MustNewVersion(version.GetDefaultVersion()), index, pipeline, events) @@ -365,16 +370,16 @@ func TestBulkEncodeEventsWithOpType(t *testing.T) { if bulkEventIndex == -1 { continue } - caseOpType, _ := cases[i]["op_type"].(string) + caseOpType, _ := cases[i]["op_type"] caseMessage, _ := cases[i]["message"].(string) switch bulkItems[bulkEventIndex].(type) { case eslegclient.BulkCreateAction: - validOpTypes := []string{e.OpTypeCreate.String(), ""} + validOpTypes := []interface{}{e.OpTypeCreate, nil} require.Contains(t, validOpTypes, caseOpType, caseMessage) case eslegclient.BulkIndexAction: - require.Equal(t, e.OpTypeIndex.String(), caseOpType, caseMessage) + require.Equal(t, e.OpTypeIndex, caseOpType, caseMessage) case eslegclient.BulkDeleteAction: - require.Equal(t, e.OpTypeDelete.String(), caseOpType, caseMessage) + require.Equal(t, e.OpTypeDelete, caseOpType, caseMessage) default: require.FailNow(t, "unknown type") } From 85c6e54408abd00a37e5bad784511bf45f73026c Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Wed, 13 May 2020 11:27:33 -0700 Subject: [PATCH 22/22] Update developer CHANGELOG entry --- CHANGELOG-developer.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d893d662e11e..b89bd78fbbbb 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -86,4 +86,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only. - Add support for a `TEST_TAGS` environment variable to add tags for tests selection following go build tags semantics, this environment variable is used by mage test targets to add build tags. Python tests can also be tagged with a decorator (`@beat.tag('sometag')`). {pull}16937[16937] {pull}17075[17075] - Add fields validation for histogram subfields. {pull}17759[17759] - Add IP* fields to `fields.yml` generator script in Filebeat. {issue}17998[17998] {pull}18256[18256] -- Events intended for the Elasticsearch output can now take an `op_type` metadata field of type events.OpType to indicate the `op_type` to use for bulk indexing. {pull}12606[12606] +- Events intended for the Elasticsearch output can now take an `op_type` metadata field of type events.OpType or string to indicate the `op_type` to use for bulk indexing. {pull}12606[12606]