Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Followup to 12606 #18316

Merged
merged 22 commits into from
May 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- The disk spool types `spool.Spool` and `spool.Settings` have been renamed to the internal types `spool.diskSpool` and `spool.settings`. {pull}16693[16693]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]

- The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734]
- The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734]
- The newly generated Beats are using go modules to manage dependencies. {pull}16288[16288]
Expand Down Expand Up @@ -87,3 +86,4 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Add support for a `TEST_TAGS` environment variable to add tags for tests selection following go build tags semantics, this environment variable is used by mage test targets to add build tags. Python tests can also be tagged with a decorator (`@beat.tag('sometag')`). {pull}16937[16937] {pull}17075[17075]
- Add fields validation for histogram subfields. {pull}17759[17759]
- Add IP* fields to `fields.yml` generator script in Filebeat. {issue}17998[17998] {pull}18256[18256]
- Events intended for the Elasticsearch output can now take an `op_type` metadata field of type events.OpType or string to indicate the `op_type` to use for bulk indexing. {pull}12606[12606]
3 changes: 2 additions & 1 deletion filebeat/channel/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
Expand Down Expand Up @@ -195,7 +196,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
event.Meta[events.FieldMetaRawIndex] = p.indexStr
return event, nil
}

Expand Down
3 changes: 2 additions & 1 deletion journalbeat/input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/assert"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/processors"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
Expand Down Expand Up @@ -138,7 +139,7 @@ func (p *setRawIndex) Run(event *beat.Event) (*beat.Event, error) {
if event.Meta == nil {
event.Meta = common.MapStr{}
}
event.Meta["raw_index"] = p.indexStr
event.Meta[events.FieldMetaRawIndex] = p.indexStr
return event, nil
}

Expand Down
13 changes: 0 additions & 13 deletions libbeat/beat/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,6 @@ func (e *Event) SetID(id string) {
e.Meta["_id"] = id
}

func (e *Event) GetMetaStringValue(key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

func (e *Event) GetValue(key string) (interface{}, error) {
if key == "@timestamp" {
return e.Timestamp, nil
Expand Down
28 changes: 28 additions & 0 deletions libbeat/beat/events/optype.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

type OpType int

//go:generate stringer -linecomment -type OpType
const (
OpTypeDefault OpType = iota //
OpTypeCreate //create
OpTypeIndex // index
OpTypeDelete // delete
)
43 changes: 43 additions & 0 deletions libbeat/beat/events/optype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 84 additions & 0 deletions libbeat/beat/events/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

import "github.com/elastic/beats/v7/libbeat/beat"

const (
// FieldMetaID defines the ID for the event. Also see FieldMetaOpType.
FieldMetaID = "_id"

// FieldMetaAlias defines the index alias to use for the event. If set, it takes
// precedence over values defined using FieldMetaIndex or FieldMetaRawIndex.
FieldMetaAlias = "alias"

// FieldMetaIndex defines the base index name to use for the event. The value is suffixed
// with a Y-m-d value based on the event's timestamp. If set, it takes precedence over the
// value defined using FieldMetaRawIndex.
FieldMetaIndex = "index"

// FieldMetaRawIndex defines the raw index name to use for the event. It is used as-is, without
// any additional manipulation.
FieldMetaRawIndex = "raw_index"

// FieldMetaPipeline defines the ingest node pipeline to use for this event.
FieldMetaPipeline = "pipeline"
ycombinator marked this conversation as resolved.
Show resolved Hide resolved

// FieldMetaOpType defines the metadata key name for event operation type to use with the Elasticsearch
// Bulk API encoding of the event. The key's value can be an empty string, `create`, `index`, or `delete`.
// If empty, `create` will be used if FieldMetaID is set; otherwise `index` will be used.
FieldMetaOpType = "op_type"
)

// GetMetaStringValue returns the value of the given event metadata string field
func GetMetaStringValue(e beat.Event, key string) (string, error) {
tmp, err := e.Meta.GetValue(key)
if err != nil {
return "", err
}

if s, ok := tmp.(string); ok {
return s, nil
}

return "", nil
}

// GetOpType returns the event's op_type, if set
func GetOpType(e beat.Event) OpType {
tmp, err := e.Meta.GetValue(FieldMetaOpType)
if err != nil {
return OpTypeDefault
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, now I'm curious. Why is the @metadata.op_type a string? Do we read it from the log file as is?

In case the field is set internally we should prefer it to be an enum always. In case it can indeed be both let's use:

tmp, err := e.Meta.GetValue(FieldMetaOpType)
if err != nil {
  ...
}

switch v := tmp.(type) {
case OpType:
  return v, nil
case string:
 ...
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, I'm not thinking about this correctly 😠. @metadata.op_type is expected to be set by developers in code. So there's no reason it shouldn't always be an enum. The only time we should use it as a string is if we need to serialize it for printing in log messages or sending in in JSON, etc. Will fix.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think it makes sense to support it as 'string' input. E.g. in setups like Beat -> json file -> Filebeat -> Elasticsearch or Beat-> Kafka -> Filebeat -> Elasticsearch we want it to continue to work (JSON unfortunately hasn't that many types).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, in those setups we would need to read op_type from the metadata as well (in the Filebeat step). Will add the string input support.


switch v := tmp.(type) {
case OpType:
return v
case string:
switch v {
case "create":
return OpTypeCreate
case "index":
return OpTypeIndex
case "delete":
return OpTypeDelete
}
}

return OpTypeDefault
}
90 changes: 90 additions & 0 deletions libbeat/beat/events/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package events

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
)

func TestGetMetaStringValue(t *testing.T) {
tests := map[string]struct {
event beat.Event
metaFieldPath string
expectedValue string
expectedErr error
}{
"nonexistent_field": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
},
},
"nonexistent",
"",
common.ErrKeyNotFound,
},
"root": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": "hello",
},
},
"baz",
"hello",
nil,
},
"nested": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": common.MapStr{
"qux": "hello",
},
},
},
"baz.qux",
"hello",
nil,
},
"non_string": {
beat.Event{
Meta: common.MapStr{
"foo": "bar",
"baz": 17,
},
},
"baz",
"",
nil,
},
}

for name, test := range tests {
t.Run(name, func(t *testing.T) {
value, err := GetMetaStringValue(test.event, test.metaFieldPath)
require.Equal(t, test.expectedValue, value)
require.Equal(t, test.expectedErr, err)
})
}
}
23 changes: 9 additions & 14 deletions libbeat/idxmgmt/std.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/atomic"
"github.com/elastic/beats/v7/libbeat/idxmgmt/ilm"
Expand Down Expand Up @@ -352,28 +353,22 @@ func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
return ""
}

if tmp := evt.Meta["alias"]; tmp != nil {
if alias, ok := tmp.(string); ok {
return alias
}
if alias, err := events.GetMetaStringValue(*evt, events.FieldMetaAlias); err == nil {
return alias
}

if tmp := evt.Meta["index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
}
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaIndex); err == nil {
ts := evt.Timestamp.UTC()
return fmt.Sprintf("%s-%d.%02d.%02d",
idx, ts.Year(), ts.Month(), ts.Day())
}

// This is functionally identical to Meta["alias"], returning the overriding
// metadata as the index name if present. It is currently used by Filebeat
// to send the index for particular inputs to formatted string templates,
// which are then expanded by a processor to the "raw_index" field.
if tmp := evt.Meta["raw_index"]; tmp != nil {
if idx, ok := tmp.(string); ok {
return idx
}
if idx, err := events.GetMetaStringValue(*evt, events.FieldMetaRawIndex); err == nil {
return idx
}

return ""
Expand Down
13 changes: 7 additions & 6 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/pkg/errors"

"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/logp"
Expand Down Expand Up @@ -200,14 +201,14 @@ func (c *publishClient) publishBulk(ctx context.Context, event publisher.Event,
meta["_type"] = "doc"
}

action := common.MapStr{}
var opType string
opType := events.OpTypeCreate
if esVersion.LessThan(createDocPrivAvailableESVersion) {
opType = "index"
} else {
opType = "create"
opType = events.OpTypeIndex
}

action := common.MapStr{
opType.String(): meta,
}
urso marked this conversation as resolved.
Show resolved Hide resolved
action[opType] = meta

event.Content.Fields.Put("timestamp", event.Content.Timestamp)

Expand Down
Loading