Skip to content

Commit

Permalink
[7.x] Followup to 12606 (#18316) (#18520)
Browse files Browse the repository at this point in the history
* Followup to 12606 (#18316)

* Adding developer CHANGELOG entry

* Refactoring: extracting helper method

* Adding unit tests

* Consolidate event metadata field constants

* Use events.GetMetaStringValue

* Implement op_type values as enum

* Add doc strings

* Deference event pointer

* Renaming op type consts and breaking them out into own block

* Renaming type

* Using stringer

* Using go idiom instead of if-else

* Adding default op type

* Empty string for default

* Store op type enum, not string, in event metadata

* Using events.GetMetaStringValue

* Updating dev CHANGELOG entry

* Allow for op_type metadata field to be set as either string or enum

* No need for .String()

* Handle missing key case gracefully

* Update unit tests

* Update developer CHANGELOG entry

* Fixing up CHANGELOG
  • Loading branch information
ycombinator authored May 19, 2020
1 parent c36648c commit fe1a9a9
Show file tree
Hide file tree
Showing 17 changed files with 311 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Add support for MODULE environment variable in `mage goIntegTest` in metricbeat to run integration tests for a single module. {pull}17147[17147]
- Add support for a `TEST_TAGS` environment variable to add tags for tests selection following go build tags semantics, this environment variable is used by mage test targets to add build tags. Python tests can also be tagged with a decorator (`@beat.tag('sometag')`). {pull}16937[16937] {pull}17075[17075]
- Add fields validation for histogram subfields. {pull}17759[17759]
- Events intended for the Elasticsearch output can now take an `op_type` metadata field of type events.OpType or string to indicate the `op_type` to use for bulk indexing. {pull}12606[12606]
3 changes: 2 additions & 1 deletion filebeat/channel/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
Expand Down Expand Up @@ -183,7 +184,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
event.Meta[events.FieldMetaRawIndex] = p.indexStr
return event, nil
}

Expand Down
3 changes: 2 additions & 1 deletion journalbeat/input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
event.Meta[events.FieldMetaRawIndex] = p.indexStr
return event, nil
}

Expand Down
13 changes: 0 additions & 13 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,6 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
28 changes: 28 additions & 0 deletions libbeat/beat/events/optype.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

type OpType int

//go:generate stringer -linecomment -type OpType
const (
OpTypeDefault OpType = iota //
OpTypeCreate //create
OpTypeIndex // index
OpTypeDelete // delete
)
43 changes: 43 additions & 0 deletions libbeat/beat/events/optype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions libbeat/beat/events/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

import "github.com/elastic/beats/v7/libbeat/beat"

const (
// FieldMetaID defines the ID for the event. Also see FieldMetaOpType.
FieldMetaID = "_id"

// FieldMetaAlias defines the index alias to use for the event. If set, it takes
// precedence over values defined using FieldMetaIndex or FieldMetaRawIndex.
FieldMetaAlias = "alias"

// FieldMetaIndex defines the base index name to use for the event. The value is suffixed
// with a Y-m-d value based on the event's timestamp. If set, it takes precedence over the
// value defined using FieldMetaRawIndex.
FieldMetaIndex = "index"

// FieldMetaRawIndex defines the raw index name to use for the event. It is used as-is, without
// any additional manipulation.
FieldMetaRawIndex = "raw_index"

// FieldMetaPipeline defines the ingest node pipeline to use for this event.
FieldMetaPipeline = "pipeline"

// FieldMetaOpType defines the metadata key name for event operation type to use with the Elasticsearch
// Bulk API encoding of the event. The key's value can be an empty string, `create`, `index`, or `delete`.
// If empty, `create` will be used if FieldMetaID is set; otherwise `index` will be used.
FieldMetaOpType = "op_type"
)

// GetMetaStringValue returns the value of the given event metadata string field
func GetMetaStringValue(e beat.Event, key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

// GetOpType returns the event's op_type, if set
func GetOpType(e beat.Event) OpType {
tmp, err := e.Meta.GetValue(FieldMetaOpType)
if err != nil {
return OpTypeDefault
}

switch v := tmp.(type) {
case OpType:
return v
case string:
switch v {
case "create":
return OpTypeCreate
case "index":
return OpTypeIndex
case "delete":
return OpTypeDelete
}
}

return OpTypeDefault
}
90 changes: 90 additions & 0 deletions libbeat/beat/events/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestGetMetaStringValue(t *testing.T) {
tests := map[string]struct {
event beat.Event
metaFieldPath string
expectedValue string
expectedErr error
}{
"nonexistent_field": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
},
},
"nonexistent",
"",
common.ErrKeyNotFound,
},
"root": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": "hello",
},
},
"baz",
"hello",
nil,
},
"nested": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": common.MapStr{
"qux": "hello",
},
},
},
"baz.qux",
"hello",
nil,
},
"non_string": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": 17,
},
},
"baz",
"",
nil,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
value, err := GetMetaStringValue(test.event, test.metaFieldPath)
require.Equal(t, test.expectedValue, value)
require.Equal(t, test.expectedErr, err)
})
}
}
23 changes: 9 additions & 14 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
Expand Down Expand Up @@ -352,28 +353,22 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
return ""
}

if tmp := evt.Meta["alias"]; tmp != nil {
if alias, ok := tmp.(string); ok {
return alias
}
if alias, err := events.GetMetaStringValue(*evt, events.FieldMetaAlias); err == nil {
return alias
}

if tmp := evt.Meta["index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
}
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaIndex); err == nil {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
}

// This is functionally identical to Meta["alias"], returning the overriding
// metadata as the index name if present. It is currently used by Filebeat
// to send the index for particular inputs to formatted string templates,
// which are then expanded by a processor to the "raw_index" field.
if tmp := evt.Meta["raw_index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
return idx
}
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaRawIndex); err == nil {
return idx
}

return ""
Expand Down
13 changes: 7 additions & 6 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -200,14 +201,14 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event,
meta["_type"] = "doc"
}

action := common.MapStr{}
var opType string
opType := events.OpTypeCreate
if esVersion.LessThan(createDocPrivAvailableESVersion) {
opType = "index"
} else {
opType = "create"
opType = events.OpTypeIndex
}

action := common.MapStr{
opType.String(): meta,
}
action[opType] = meta

event.Content.Fields.Put("timestamp", event.Content.Timestamp)

Expand Down
Loading

0 comments on commit fe1a9a9

Please sign in to comment.