From 14da4086cab07a522b4d5db2e5c9ed2ee906df73 Mon Sep 17 00:00:00 2001 From: Jaime Soriano Pastor Date: Tue, 3 Jul 2018 08:31:57 +0200 Subject: [PATCH] libbeat: Refactor error handling in schema.Apply() (#7335) `schema.Apply()` returned a `schema.Errors` object as error, with this object is not easy to check if there was really an error, specially when optional fields are involved. This leads to lots of cases where these errors were being ignored. This change introduces a series of changes to have better control on the errors happening during the application of an schema. The interface is kept as `event, err := schema.Apply(data)`, but now the error returned is always a plain error in case of one or more errors happened, or nil if no error happened. `schema.ApplyTo()` returns the list of errors, and can be used where multiple schemas are applied for the same event. `Apply()` and `ApplyTo()` can now receive a list of options, these options are functions that can be used to filter the errors returned or to access the unfiltered list of errors. This allows different combinations of resulting errors in case of optional or required fields. It also allows to add additional checks on all the errors happened, even on the ones ignored for the final result. Three options are added by now: * `AllRequired` to fail if any non-optional field is missing, this mimics the current behaviour, and is set by default if no other option is set. * `FailOnRequired` to fail if any required field is missing. A new `Required` option is also added to be able to declare required fields. * `NotFoundKeys(cb func([]string))` is an option builder that receives a function, this function is called with the list of missing keys, so additional custom checks or tests can be done. For lists of errors, the `multierror` library is used now, some custom errors have been added to help on previous modifications and to improve some error messages. Errors on structured data contain now the full paths of the fields. --- CHANGELOG-developer.asciidoc | 1 + CHANGELOG.asciidoc | 1 + .../providers/jolokia/discovery.go | 18 ++- libbeat/common/schema/error.go | 81 +++++++--- libbeat/common/schema/error_test.go | 37 ----- libbeat/common/schema/errors.go | 87 ---------- libbeat/common/schema/errors_test.go | 32 ---- .../common/schema/mapstriface/mapstriface.go | 89 +++++++---- .../schema/mapstriface/mapstriface_test.go | 133 +++++++++++++++- libbeat/common/schema/mapstrstr/mapstrstr.go | 16 +- .../common/schema/mapstrstr/mapstrstr_test.go | 62 +++++++- libbeat/common/schema/options.go | 76 +++++++++ libbeat/common/schema/options_test.go | 148 ++++++++++++++++++ libbeat/common/schema/schema.go | 60 ++++--- metricbeat/module/apache/status/data.go | 6 +- .../module/apache/status/status_test.go | 4 +- metricbeat/module/elasticsearch/index/data.go | 15 +- .../elasticsearch/index_summary/data.go | 15 +- .../elasticsearch/index_summary/data_test.go | 1 + metricbeat/module/elasticsearch/node/data.go | 15 +- .../module/elasticsearch/node/data_test.go | 11 +- .../module/elasticsearch/node_stats/data.go | 12 +- .../_meta/test/invalid_format.json | 2 +- .../elasticsearch/pending_tasks/data.go | 18 ++- .../elasticsearch/pending_tasks/data_test.go | 38 ++--- metricbeat/module/elasticsearch/shard/data.go | 1 - .../module/elasticsearch/shard/data_xpack.go | 6 +- metricbeat/module/elasticsearch/testing.go | 21 +-- metricbeat/module/kibana/stats/data.go | 9 +- metricbeat/module/kibana/stats/data_test.go | 10 +- metricbeat/module/logstash/node/data.go | 2 +- metricbeat/module/logstash/node_stats/data.go | 2 +- metricbeat/module/rabbitmq/connection/data.go | 18 ++- metricbeat/module/rabbitmq/exchange/data.go | 16 +- metricbeat/module/rabbitmq/node/data.go | 6 +- metricbeat/module/rabbitmq/queue/data.go | 20 ++- 36 files changed, 713 insertions(+), 376 deletions(-) delete mode 100644 libbeat/common/schema/error_test.go delete mode 100644 libbeat/common/schema/errors.go delete mode 100644 libbeat/common/schema/errors_test.go create mode 100644 libbeat/common/schema/options.go create mode 100644 libbeat/common/schema/options_test.go diff --git a/CHANGELOG-developer.asciidoc b/CHANGELOG-developer.asciidoc index 2858f667405..54fdb5e7697 100644 --- a/CHANGELOG-developer.asciidoc +++ b/CHANGELOG-developer.asciidoc @@ -33,3 +33,4 @@ The list below covers the major changes between 6.3.0 and master only. ==== Added - Libbeat provides a global registry for beats developer that allow to register and retrieve plugin. {pull}7392[7392] +- Added more options to control required and optional fields in schema.Apply(), error returned is a plain nil if no error happened {pull}7335[7335] diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 25274dc66e1..3cb72f465aa 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -170,6 +170,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff] - Add ability to define input configuration as stringified JSON for autodiscover. {pull}7372[7372] - Add processor definition support for hints builder {pull}7386[7386] - Add support to disable html escaping in outputs. {pull}7445[7445] +- Refactor error handing in schema.Apply(). {pull}7335[7335] - Add additional types to kubernetes metadata {pull}7457[7457] *Auditbeat* diff --git a/libbeat/autodiscover/providers/jolokia/discovery.go b/libbeat/autodiscover/providers/jolokia/discovery.go index d702cb947ee..fc704dafb16 100644 --- a/libbeat/autodiscover/providers/jolokia/discovery.go +++ b/libbeat/autodiscover/providers/jolokia/discovery.go @@ -58,14 +58,14 @@ import ( // Message contains the information of a Jolokia Discovery message var messageSchema = s.Schema{ "agent": s.Object{ - "id": c.Str("agent_id"), - "version": c.Str("agent_version", s.Optional), + "id": c.Str("agent_id", s.Required), + "version": c.Str("agent_version"), }, - "secured": c.Bool("secured", s.Optional), + "secured": c.Bool("secured"), "server": s.Object{ - "product": c.Str("server_product", s.Optional), - "vendor": c.Str("server_vendor", s.Optional), - "version": c.Str("server_version", s.Optional), + "product": c.Str("server_product"), + "vendor": c.Str("server_vendor"), + "version": c.Str("server_version"), }, "url": c.Str("url"), } @@ -245,7 +245,11 @@ func (d *Discovery) sendProbe(config InterfaceConfig) { logp.Err(err.Error()) continue } - message, _ := messageSchema.Apply(m) + message, err := messageSchema.Apply(m, s.FailOnRequired) + if err != nil { + logp.Err(err.Error()) + continue + } d.update(config, message) } }() diff --git a/libbeat/common/schema/error.go b/libbeat/common/schema/error.go index 85e2dfb0027..05be95abe15 100644 --- a/libbeat/common/schema/error.go +++ b/libbeat/common/schema/error.go @@ -17,37 +17,76 @@ package schema -import "fmt" - -const ( - RequiredType ErrorType = iota - OptionalType ErrorType = iota +import ( + "fmt" ) -type ErrorType int +// KeyError is an error with a field key +type KeyError interface { + Key() string + SetKey(k string) +} + +type errorKey struct { + key string +} + +// Key returns the value of the field key +func (k *errorKey) Key() string { + return k.key +} + +// SetKey sets the value of the field key +func (k *errorKey) SetKey(v string) { + k.key = v +} + +// KeyNotFoundError is an error happening when a field key is not found +type KeyNotFoundError struct { + errorKey + + Err error + Optional bool + Required bool +} -type Error struct { - key string - message string - errorType ErrorType +// NewKeyNotFoundError builds a KeyNotFoundError +func NewKeyNotFoundError(key string) *KeyNotFoundError { + var e KeyNotFoundError + e.SetKey(key) + return &e } -func NewError(key string, message string) *Error { - return &Error{ - key: key, - message: message, - errorType: RequiredType, +// Error returns the error message of a KeyNotFoundError +func (err *KeyNotFoundError) Error() string { + msg := fmt.Sprintf("key `%s` not found", err.Key()) + if err.Err != nil { + msg += ": " + err.Err.Error() } + return msg } -func (err *Error) SetType(errorType ErrorType) { - err.errorType = errorType +// WrongFormatError is an error happening when a field format is incorrect +type WrongFormatError struct { + errorKey + + Msg string } -func (err *Error) IsType(errorType ErrorType) bool { - return err.errorType == errorType +// NewWrongFormatError builds a new WrongFormatError +func NewWrongFormatError(key string, msg string) *WrongFormatError { + e := WrongFormatError{ + Msg: msg, + } + e.SetKey(key) + return &e } -func (err *Error) Error() string { - return fmt.Sprintf("Missing field: %s, Error: %s", err.key, err.message) +// Error returns the error message of a WrongFormatError +func (err *WrongFormatError) Error() string { + msg := fmt.Sprintf("wrong format in `%s`", err.Key()) + if err.Msg != "" { + msg += ": " + err.Msg + } + return msg } diff --git a/libbeat/common/schema/error_test.go b/libbeat/common/schema/error_test.go deleted file mode 100644 index 50f9afaf9a3..00000000000 --- a/libbeat/common/schema/error_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package schema - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestIsError(t *testing.T) { - err := NewError("test", "Hello World") - assert.Error(t, err) -} - -func TestType(t *testing.T) { - err := NewError("test", "Hello World") - assert.True(t, err.IsType(RequiredType)) - - err.SetType(OptionalType) - assert.True(t, err.IsType(OptionalType)) -} diff --git a/libbeat/common/schema/errors.go b/libbeat/common/schema/errors.go deleted file mode 100644 index 660a6910691..00000000000 --- a/libbeat/common/schema/errors.go +++ /dev/null @@ -1,87 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package schema - -import ( - "strings" - - "github.com/elastic/beats/libbeat/logp" -) - -type Errors []Error - -func NewErrors() *Errors { - return &Errors{} -} - -func (errs *Errors) AddError(err *Error) { - *errs = append(*errs, *err) -} - -func (errs *Errors) AddErrors(errors *Errors) { - if errors == nil { - return - } - *errs = append(*errs, *errors...) -} - -func (errs *Errors) HasRequiredErrors() bool { - for _, err := range *errs { - if err.IsType(RequiredType) { - return true - } - } - return false -} - -func (errs *Errors) Error() string { - error := "Required fields are missing: " - for _, err := range *errs { - if err.IsType(RequiredType) { - error = error + "," + err.key - } - } - return error -} - -// Log logs all missing required and optional fields to the debug log. -func (errs *Errors) Log() { - if len(*errs) == 0 { - return - } - var optional, required []string - - for _, err := range *errs { - if err.IsType(RequiredType) { - required = append(required, err.key) - } else { - optional = append(optional, err.key) - } - } - - log := "" - if len(required) > 0 { - log = log + "required: " + strings.Join(required, ",") + "; " - } - - if len(optional) > 0 { - log = log + "optional: " + strings.Join(optional, ",") + ";" - } - - logp.Debug("schema", "Fields missing - %s", log) -} diff --git a/libbeat/common/schema/errors_test.go b/libbeat/common/schema/errors_test.go deleted file mode 100644 index 5450554d5af..00000000000 --- a/libbeat/common/schema/errors_test.go +++ /dev/null @@ -1,32 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package schema - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestErrors(t *testing.T) { - errs := NewErrors() - err := NewError("test", "Hello World") - errs.AddError(err) - - assert.True(t, errs.HasRequiredErrors()) -} diff --git a/libbeat/common/schema/mapstriface/mapstriface.go b/libbeat/common/schema/mapstriface/mapstriface.go index 91e811e9ec2..f43cc62e79e 100644 --- a/libbeat/common/schema/mapstriface/mapstriface.go +++ b/libbeat/common/schema/mapstriface/mapstriface.go @@ -75,6 +75,8 @@ import ( "fmt" "time" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" "github.com/elastic/beats/libbeat/common/schema" "github.com/elastic/beats/libbeat/logp" @@ -84,29 +86,35 @@ type ConvMap struct { Key string // The key in the data map Schema schema.Schema // The schema describing how to convert the sub-map Optional bool + Required bool } // Map drills down in the data dictionary by using the key -func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) *schema.Errors { - subData, ok := data[convMap.Key].(map[string]interface{}) +func (convMap ConvMap) Map(key string, event common.MapStr, data map[string]interface{}) multierror.Errors { + d, found := data[convMap.Key] + if !found { + err := schema.NewKeyNotFoundError(convMap.Key) + err.Optional = convMap.Optional + err.Required = convMap.Required + return multierror.Errors{err} + } + subData, ok := d.(map[string]interface{}) if !ok { - err := schema.NewError(convMap.Key, "Error accessing sub-dictionary") - if convMap.Optional { - err.SetType(schema.OptionalType) - } else { - logp.Err("Error accessing sub-dictionary `%s`", convMap.Key) - } - - errors := schema.NewErrors() - errors.AddError(err) - - return errors + msg := fmt.Sprintf("expected dictionary, found %T", subData) + err := schema.NewWrongFormatError(convMap.Key, msg) + logp.Err(err.Error()) + return multierror.Errors{err} } subEvent := common.MapStr{} - convMap.Schema.ApplyTo(subEvent, subData) + _, errors := convMap.Schema.ApplyTo(subEvent, subData) + for _, err := range errors { + if err, ok := err.(schema.KeyError); ok { + err.SetKey(convMap.Key + "." + err.Key()) + } + } event[key] = subEvent - return nil + return errors } func (convMap ConvMap) HasKey(key string) bool { @@ -124,7 +132,7 @@ func Dict(key string, s schema.Schema, opts ...DictSchemaOption) ConvMap { func toStrFromNum(key string, data map[string]interface{}) (interface{}, error) { emptyIface, exists := data[key] if !exists { - return false, fmt.Errorf("Key %s not found", key) + return "", schema.NewKeyNotFoundError(key) } switch emptyIface.(type) { case int, int32, int64, uint, uint32, uint64, float32, float64: @@ -132,7 +140,8 @@ func toStrFromNum(key string, data map[string]interface{}) (interface{}, error) case json.Number: return string(emptyIface.(json.Number)), nil default: - return "", fmt.Errorf("Expected number, found %T", emptyIface) + msg := fmt.Sprintf("expected number, found %T", emptyIface) + return "", schema.NewWrongFormatError(key, msg) } } @@ -144,11 +153,12 @@ func StrFromNum(key string, opts ...schema.SchemaOption) schema.Conv { func toStr(key string, data map[string]interface{}) (interface{}, error) { emptyIface, err := common.MapStr(data).GetValue(key) if err != nil { - return "", fmt.Errorf("Key %s not found: %s", key, err.Error()) + return "", schema.NewKeyNotFoundError(key) } str, ok := emptyIface.(string) if !ok { - return "", fmt.Errorf("Expected string, found %T", emptyIface) + msg := fmt.Sprintf("expected string, found %T", emptyIface) + return "", schema.NewWrongFormatError(key, msg) } return str, nil } @@ -161,7 +171,9 @@ func Str(key string, opts ...schema.SchemaOption) schema.Conv { func toIfc(key string, data map[string]interface{}) (interface{}, error) { intf, err := common.MapStr(data).GetValue(key) if err != nil { - return "", fmt.Errorf("Key %s not found: %s", key, err.Error()) + e := schema.NewKeyNotFoundError(key) + e.Err = err + return nil, e } return intf, nil } @@ -174,11 +186,12 @@ func Ifc(key string, opts ...schema.SchemaOption) schema.Conv { func toBool(key string, data map[string]interface{}) (interface{}, error) { emptyIface, exists := data[key] if !exists { - return false, fmt.Errorf("Key %s not found", key) + return false, schema.NewKeyNotFoundError(key) } boolean, ok := emptyIface.(bool) if !ok { - return false, fmt.Errorf("Expected bool, found %T", emptyIface) + msg := fmt.Sprintf("expected bool, found %T", emptyIface) + return false, schema.NewWrongFormatError(key, msg) } return boolean, nil } @@ -191,7 +204,7 @@ func Bool(key string, opts ...schema.SchemaOption) schema.Conv { func toInteger(key string, data map[string]interface{}) (interface{}, error) { emptyIface, exists := data[key] if !exists { - return 0, fmt.Errorf("Key %s not found", key) + return 0, schema.NewKeyNotFoundError(key) } switch emptyIface.(type) { case int64: @@ -210,9 +223,11 @@ func toInteger(key string, data map[string]interface{}) (interface{}, error) { if err == nil { return int64(f64), nil } - return 0, fmt.Errorf("expected integer, found json.Number (%v) that cannot be converted", num) + msg := fmt.Sprintf("expected integer, found json.Number (%v) that cannot be converted", num) + return 0, schema.NewWrongFormatError(key, msg) default: - return 0, fmt.Errorf("expected integer, found %T", emptyIface) + msg := fmt.Sprintf("expected integer, found %T", emptyIface) + return 0, schema.NewWrongFormatError(key, msg) } } @@ -225,7 +240,7 @@ func Float(key string, opts ...schema.SchemaOption) schema.Conv { func toFloat(key string, data map[string]interface{}) (interface{}, error) { emptyIface, exists := data[key] if !exists { - return 0, fmt.Errorf("key %s not found", key) + return 0.0, schema.NewKeyNotFoundError(key) } switch emptyIface.(type) { case float64: @@ -244,9 +259,11 @@ func toFloat(key string, data map[string]interface{}) (interface{}, error) { if err == nil { return f64, nil } - return 0, fmt.Errorf("expected float, found json.Number (%v) that cannot be converted", num) + msg := fmt.Sprintf("expected float, found json.Number (%v) that cannot be converted", num) + return 0.0, schema.NewWrongFormatError(key, msg) default: - return 0, fmt.Errorf("expected float, found %T", emptyIface) + msg := fmt.Sprintf("expected float, found %T", emptyIface) + return 0.0, schema.NewWrongFormatError(key, msg) } } @@ -259,7 +276,7 @@ func Int(key string, opts ...schema.SchemaOption) schema.Conv { func toTime(key string, data map[string]interface{}) (interface{}, error) { emptyIface, exists := data[key] if !exists { - return common.Time(time.Unix(0, 0)), fmt.Errorf("Key %s not found", key) + return common.Time(time.Unix(0, 0)), schema.NewKeyNotFoundError(key) } switch emptyIface.(type) { @@ -275,7 +292,8 @@ func toTime(key string, data map[string]interface{}) (interface{}, error) { } } - return common.Time(time.Unix(0, 0)), fmt.Errorf("Expected date, found %T", emptyIface) + msg := fmt.Sprintf("expected date, found %T", emptyIface) + return common.Time(time.Unix(0, 0)), schema.NewWrongFormatError(key, msg) } // Time creates a Conv object for converting Time objects. @@ -287,13 +305,20 @@ func Time(key string, opts ...schema.SchemaOption) schema.Conv { // functions type DictSchemaOption func(c ConvMap) ConvMap -// The optional flag suppresses the error message in case the key -// doesn't exist or results in an error. +// DictOptional sets the optional flag, which suppresses the error in +// case the key doesn't exist or results in an error. func DictOptional(c ConvMap) ConvMap { c.Optional = true return c } +// DictRequired sets the required flag, which forces an error even if fields +// are optional by default +func DictRequired(c ConvMap) ConvMap { + c.Required = true + return c +} + // setOptions adds the optional flags to the Conv object func dictSetOptions(c ConvMap, opts []DictSchemaOption) ConvMap { for _, opt := range opts { diff --git a/libbeat/common/schema/mapstriface/mapstriface_test.go b/libbeat/common/schema/mapstriface/mapstriface_test.go index 372b95ab81b..36905a6cd4f 100644 --- a/libbeat/common/schema/mapstriface/mapstriface_test.go +++ b/libbeat/common/schema/mapstriface/mapstriface_test.go @@ -120,6 +120,135 @@ func TestConversions(t *testing.T) { "test_array": []string{"a", "b", "c"}, } - output, _ := schema.Apply(input) - assert.Equal(t, output, expected) + event, _ := schema.Apply(input) + assert.Equal(t, event, expected) +} + +func TestOptionalField(t *testing.T) { + cases := []struct { + Description string + Input map[string]interface{} + Schema s.Schema + Expected common.MapStr + ExpectError bool + }{ + { + "missing optional field", + map[string]interface{}{ + "testString": "hello", + "testInt": 42, + }, + s.Schema{ + "test_string": Str("testString"), + "test_int": Int("testInt"), + "test_opt": Bool("testOptionalInt", s.Optional), + }, + common.MapStr{ + "test_string": "hello", + "test_int": int64(42), + }, + false, + }, + { + "wrong format in optional field", + map[string]interface{}{ + "testInt": "hello", + }, + s.Schema{ + "test_int": Int("testInt", s.Optional), + }, + common.MapStr{}, + true, + }, + } + + for _, c := range cases { + event, err := c.Schema.Apply(c.Input) + if c.ExpectError { + assert.Error(t, err, c.Description) + } else { + assert.NoError(t, err, c.Description) + assert.Equal(t, c.Expected, event, c.Description) + } + } +} + +func TestFullFieldPathInErrors(t *testing.T) { + cases := []struct { + Description string + Schema s.Schema + Input map[string]interface{} + Expected string + }{ + { + "missing nested key", + s.Schema{ + "a": Dict("A", s.Schema{ + "b": Dict("B", s.Schema{ + "c": Bool("C"), + }), + }), + }, + map[string]interface{}{ + "A": map[string]interface{}{ + "B": map[string]interface{}{}, + }, + }, + `A.B.C`, + }, + { + "wrong nested format key", + s.Schema{ + "test_dict": Dict("testDict", s.Schema{ + "test_bool": Bool("testBool"), + }), + }, + map[string]interface{}{ + "testDict": map[string]interface{}{ + "testBool": "foo", + }, + }, + `testDict.testBool`, + }, + { + "wrong nested sub-dictionary", + s.Schema{ + "test_dict": Dict("testDict", s.Schema{ + "test_dict": Dict("testDict", s.Schema{}), + }), + }, + map[string]interface{}{ + "testDict": map[string]interface{}{ + "testDict": "foo", + }, + }, + `testDict.testDict`, + }, + { + "empty input", + s.Schema{ + "test_dict": Dict("rootDict", s.Schema{ + "test_dict": Dict("testDict", s.Schema{}), + }), + }, + map[string]interface{}{}, + `rootDict`, + }, + } + + for _, c := range cases { + _, err := c.Schema.Apply(c.Input) + if assert.Error(t, err, c.Description) { + assert.Contains(t, err.Error(), c.Expected, c.Description) + } + + _, errs := c.Schema.ApplyTo(common.MapStr{}, c.Input) + assert.Error(t, errs.Err(), c.Description) + if assert.Equal(t, 1, len(errs), c.Description) { + keyErr, ok := errs[0].(s.KeyError) + if assert.True(t, ok, c.Description) { + assert.Equal(t, c.Expected, keyErr.Key(), c.Description) + } + } + } } diff --git a/libbeat/common/schema/mapstrstr/mapstrstr.go b/libbeat/common/schema/mapstrstr/mapstrstr.go index 4b7219531f4..7b9dae83f14 100644 --- a/libbeat/common/schema/mapstrstr/mapstrstr.go +++ b/libbeat/common/schema/mapstrstr/mapstrstr.go @@ -75,7 +75,8 @@ func toBool(key string, data map[string]interface{}) (interface{}, error) { value, err := strconv.ParseBool(str) if err != nil { - return false, fmt.Errorf("Error converting param to bool: %s", key) + msg := fmt.Sprintf("error converting param to bool: `%s`", str) + return false, schema.NewWrongFormatError(key, msg) } return value, nil @@ -95,7 +96,8 @@ func toFloat(key string, data map[string]interface{}) (interface{}, error) { value, err := strconv.ParseFloat(str, 64) if err != nil { - return 0.0, fmt.Errorf("Error converting param to float: %s", key) + msg := fmt.Sprintf("error converting param to float: `%s`", str) + return 0.0, schema.NewWrongFormatError(key, msg) } return value, nil @@ -115,7 +117,8 @@ func toInt(key string, data map[string]interface{}) (interface{}, error) { value, err := strconv.ParseInt(str, 10, 64) if err != nil { - return 0, fmt.Errorf("Error converting param to int: %s", key) + msg := fmt.Sprintf("error converting param to int: `%s`", str) + return 0, schema.NewWrongFormatError(key, msg) } return value, nil @@ -145,7 +148,8 @@ func Time(layout, key string, opts ...schema.SchemaOption) schema.Conv { value, err := time.Parse(layout, str) if err != nil { - return 0, fmt.Errorf("Error converting param to time.Time: %s. Original: %s", key, str) + msg := fmt.Sprintf("error converting param to time.Time: `%s`", str) + return common.Time{}, schema.NewWrongFormatError(key, msg) } return common.Time(value), nil @@ -162,12 +166,12 @@ func Str(key string, opts ...schema.SchemaOption) schema.Conv { func getString(key string, data map[string]interface{}) (string, error) { val, exists := data[key] if !exists { - return "", fmt.Errorf("Key `%s` not found", key) + return "", schema.NewKeyNotFoundError(key) } str, ok := val.(string) if !ok { - return "", fmt.Errorf("Expected value of `%s` to have type string but has %T", key, val) + return "", schema.NewWrongFormatError(key, fmt.Sprintf("expected type string but has %T", val)) } return str, nil diff --git a/libbeat/common/schema/mapstrstr/mapstrstr_test.go b/libbeat/common/schema/mapstrstr/mapstrstr_test.go index a2d4b624969..d1815ff0847 100644 --- a/libbeat/common/schema/mapstrstr/mapstrstr_test.go +++ b/libbeat/common/schema/mapstrstr/mapstrstr_test.go @@ -71,6 +71,64 @@ func TestConversions(t *testing.T) { }, } - output, _ := schema.Apply(input) - assert.Equal(t, output, expected) + event, _ := schema.Apply(input) + assert.Equal(t, event, expected) +} + +func TestKeyInErrors(t *testing.T) { + cases := []struct { + Description string + Schema s.Schema + Input map[string]interface{} + Expected string + }{ + { + "missing nested key", + s.Schema{ + "a": s.Object{ + "b": s.Object{ + "c": Bool("C"), + }, + }, + }, + map[string]interface{}{}, + `C`, + }, + { + "wrong nested format key", + s.Schema{ + "test": s.Object{ + "bool": Bool("testBool"), + }, + }, + map[string]interface{}{ + "testBool": "foo", + }, + `testBool`, + }, + { + "empty input", + s.Schema{ + "root": Str("Root"), + }, + map[string]interface{}{}, + `Root`, + }, + } + + for _, c := range cases { + _, err := c.Schema.Apply(c.Input) + if assert.Error(t, err, c.Description) { + assert.Contains(t, err.Error(), c.Expected, c.Description) + } + + _, errs := c.Schema.ApplyTo(common.MapStr{}, c.Input) + assert.Error(t, errs.Err(), c.Description) + if assert.Equal(t, 1, len(errs), c.Description) { + keyErr, ok := errs[0].(s.KeyError) + if assert.True(t, ok, c.Description) { + assert.Equal(t, c.Expected, keyErr.Key(), c.Description) + } + } + } } diff --git a/libbeat/common/schema/options.go b/libbeat/common/schema/options.go new file mode 100644 index 00000000000..b7d36536700 --- /dev/null +++ b/libbeat/common/schema/options.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schema + +import ( + "github.com/joeshaw/multierror" + + "github.com/elastic/beats/libbeat/common" +) + +// DefaultApplyOptions are the default options for Apply() +var DefaultApplyOptions = []ApplyOption{AllRequired} + +// ApplyOption modifies the result of Apply +type ApplyOption func(common.MapStr, multierror.Errors) (common.MapStr, multierror.Errors) + +// AllRequired considers any missing field as an error, except if explicitly +// set as optional +func AllRequired(event common.MapStr, errors multierror.Errors) (common.MapStr, multierror.Errors) { + k := 0 + for i, err := range errors { + if err, ok := err.(*KeyNotFoundError); ok { + if err.Optional { + continue + } + } + errors[k] = errors[i] + k++ + } + return event, errors[:k] +} + +// FailOnRequired considers missing fields as an error only if they are set +// as required +func FailOnRequired(event common.MapStr, errors multierror.Errors) (common.MapStr, multierror.Errors) { + k := 0 + for i, err := range errors { + if err, ok := err.(*KeyNotFoundError); ok { + if !err.Required { + continue + } + } + errors[k] = errors[i] + k++ + } + return event, errors[:k] +} + +// NotFoundKeys calls a function with the list of missing keys as parameter +func NotFoundKeys(cb func(keys []string)) ApplyOption { + return func(event common.MapStr, errors multierror.Errors) (common.MapStr, multierror.Errors) { + var keys []string + for _, err := range errors { + if err, ok := err.(*KeyNotFoundError); ok { + keys = append(keys, err.Key()) + } + } + cb(keys) + return event, errors + } +} diff --git a/libbeat/common/schema/options_test.go b/libbeat/common/schema/options_test.go new file mode 100644 index 00000000000..0e4271d7c21 --- /dev/null +++ b/libbeat/common/schema/options_test.go @@ -0,0 +1,148 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package schema + +import ( + "testing" + + "github.com/joeshaw/multierror" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/libbeat/common" +) + +func TestApplyOptions(t *testing.T) { + cases := []struct { + Description string + Options []ApplyOption + Errors multierror.Errors + ExpectError bool + }{ + { + "all fields required, no error", + []ApplyOption{AllRequired}, + multierror.Errors{}, + false, + }, + { + "all fields required, an error", + []ApplyOption{AllRequired}, + multierror.Errors{ + NewKeyNotFoundError("foo"), + }, + true, + }, + { + "all fields required, some other error, it should fail", + []ApplyOption{AllRequired}, + multierror.Errors{ + errors.New("something bad happened"), + }, + true, + }, + { + "all fields required, an error, collecting missing keys doesn't alter result", + []ApplyOption{NotFoundKeys(func([]string) {}), AllRequired}, + multierror.Errors{ + NewKeyNotFoundError("foo"), + }, + true, + }, + { + "fail on required, an error, not required", + []ApplyOption{FailOnRequired}, + multierror.Errors{ + &KeyNotFoundError{errorKey: errorKey{"foo"}, Required: false}, + }, + false, + }, + { + "fail on required, an error, required", + []ApplyOption{FailOnRequired}, + multierror.Errors{ + &KeyNotFoundError{errorKey: errorKey{"foo"}, Required: true}, + }, + true, + }, + { + "fail on required, some other error, it should fail", + []ApplyOption{FailOnRequired}, + multierror.Errors{ + errors.New("something bad happened"), + }, + true, + }, + } + + for _, c := range cases { + event := common.MapStr{} + errors := c.Errors + for _, opt := range c.Options { + event, errors = opt(event, errors) + } + if c.ExpectError { + assert.Error(t, errors.Err(), c.Description) + } else { + assert.NoError(t, errors.Err(), c.Description) + } + } +} + +func TestNotFoundKeys(t *testing.T) { + cases := []struct { + Description string + Errors multierror.Errors + Expected []string + }{ + { + "empty errors, no key", + multierror.Errors{}, + []string{}, + }, + { + "key not found error", + multierror.Errors{ + NewKeyNotFoundError("foo"), + }, + []string{"foo"}, + }, + { + "only another error, so no key", + multierror.Errors{ + NewWrongFormatError("foo", ""), + }, + []string{}, + }, + { + "two errors, only one is key not found", + multierror.Errors{ + NewKeyNotFoundError("foo"), + NewWrongFormatError("bar", ""), + }, + []string{"foo"}, + }, + } + + for _, c := range cases { + opt := NotFoundKeys(func(keys []string) { + assert.ElementsMatch(t, c.Expected, keys, c.Description) + }) + opt(common.MapStr{}, c.Errors) + } +} diff --git a/libbeat/common/schema/schema.go b/libbeat/common/schema/schema.go index ecfe760ba09..491da93d3c3 100644 --- a/libbeat/common/schema/schema.go +++ b/libbeat/common/schema/schema.go @@ -18,6 +18,8 @@ package schema import ( + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" ) @@ -30,7 +32,7 @@ type Schema map[string]Mapper type Mapper interface { // Map applies the Mapper conversion on the data and adds the result // to the event on the key. - Map(key string, event common.MapStr, data map[string]interface{}) *Errors + Map(key string, event common.MapStr, data map[string]interface{}) multierror.Errors HasKey(key string) bool } @@ -39,29 +41,25 @@ type Mapper interface { type Conv struct { Func Converter // Convertor function Key string // The key in the data map - Optional bool // Whether to log errors if the key is not found + Optional bool // Whether to ignore errors if the key is not found + Required bool // Whether to provoke errors if the key is not found } -// Convertor function type +// Converter function type type Converter func(key string, data map[string]interface{}) (interface{}, error) // Map applies the conversion on the data and adds the result // to the event on the key. -func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{}) *Errors { +func (conv Conv) Map(key string, event common.MapStr, data map[string]interface{}) multierror.Errors { value, err := conv.Func(conv.Key, data) if err != nil { - err := NewError(key, err.Error()) - if conv.Optional { - err.SetType(OptionalType) + if err, keyNotFound := err.(*KeyNotFoundError); keyNotFound { + err.Optional = conv.Optional + err.Required = conv.Required } - - errs := NewErrors() - errs.AddError(err) - return errs - - } else { - event[key] = value + return multierror.Errors{err} } + event[key] = value return nil } @@ -72,7 +70,8 @@ func (conv Conv) HasKey(key string) bool { // implements Mapper interface for structure type Object map[string]Mapper -func (o Object) Map(key string, event common.MapStr, data map[string]interface{}) *Errors { +// Map applies the schema for an object +func (o Object) Map(key string, event common.MapStr, data map[string]interface{}) multierror.Errors { subEvent := common.MapStr{} errs := applySchemaToEvent(subEvent, data, o) event[key] = subEvent @@ -85,15 +84,21 @@ func (o Object) HasKey(key string) bool { // ApplyTo adds the fields extracted from data, converted using the schema, to the // event map. -func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}) (common.MapStr, *Errors) { +func (s Schema) ApplyTo(event common.MapStr, data map[string]interface{}, opts ...ApplyOption) (common.MapStr, multierror.Errors) { + if len(opts) == 0 { + opts = DefaultApplyOptions + } errors := applySchemaToEvent(event, data, s) - errors.Log() + for _, opt := range opts { + event, errors = opt(event, errors) + } return event, errors } // Apply converts the fields extracted from data, using the schema, into a new map and reports back the errors. -func (s Schema) Apply(data map[string]interface{}) (common.MapStr, *Errors) { - return s.ApplyTo(common.MapStr{}, data) +func (s Schema) Apply(data map[string]interface{}, opts ...ApplyOption) (common.MapStr, error) { + event, errors := s.ApplyTo(common.MapStr{}, data, opts...) + return event, errors.Err() } // HasKey checks if the key is part of the schema @@ -110,11 +115,11 @@ func hasKey(key string, mappers map[string]Mapper) bool { return false } -func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) *Errors { - errs := NewErrors() +func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conversions map[string]Mapper) multierror.Errors { + var errs multierror.Errors for key, mapper := range conversions { errors := mapper.Map(key, event, data) - errs.AddErrors(errors) + errs = append(errs, errors...) } return errs } @@ -123,13 +128,20 @@ func applySchemaToEvent(event common.MapStr, data map[string]interface{}, conver // functions type SchemaOption func(c Conv) Conv -// The optional flag suppresses the error message in case the key -// doesn't exist or results in an error. +// Optional sets the optional flag, that suppresses the error in case +// the key doesn't exist func Optional(c Conv) Conv { c.Optional = true return c } +// Required sets the required flag, that provokes an error in case the key +// doesn't exist, even if other missing keys can be ignored +func Required(c Conv) Conv { + c.Required = true + return c +} + // setOptions adds the optional flags to the Conv object func SetOptions(c Conv, opts []SchemaOption) Conv { for _, opt := range opts { diff --git a/metricbeat/module/apache/status/data.go b/metricbeat/module/apache/status/data.go index 3a2a4325944..f20c61d0284 100644 --- a/metricbeat/module/apache/status/data.go +++ b/metricbeat/module/apache/status/data.go @@ -93,17 +93,17 @@ var ( } ) -func applySchema(event common.MapStr, fullEvent map[string]interface{}) *s.Errors { +func applySchema(event common.MapStr, fullEvent map[string]interface{}) error { applicableSchema := schema if _, found := fullEvent["ServerUptimeSeconds"]; !found { applicableSchema = schemaOld } _, err := applicableSchema.ApplyTo(event, fullEvent) - return err + return err.Err() } // Map body to MapStr -func eventMapping(scanner *bufio.Scanner, hostname string) (common.MapStr, *s.Errors) { +func eventMapping(scanner *bufio.Scanner, hostname string) (common.MapStr, error) { var ( totalS int totalR int diff --git a/metricbeat/module/apache/status/status_test.go b/metricbeat/module/apache/status/status_test.go index 03dcb563618..2fde31afb56 100644 --- a/metricbeat/module/apache/status/status_test.go +++ b/metricbeat/module/apache/status/status_test.go @@ -252,7 +252,7 @@ func TestStatusOutputs(t *testing.T) { assert.NoError(t, err, "cannot open test file "+filename) scanner := bufio.NewScanner(f) - _, errors := eventMapping(scanner, "localhost") - assert.False(t, errors.HasRequiredErrors(), "error mapping "+filename) + _, err = eventMapping(scanner, "localhost") + assert.NoError(t, err, "error mapping "+filename) } } diff --git a/metricbeat/module/elasticsearch/index/data.go b/metricbeat/module/elasticsearch/index/data.go index 1c79273adf6..95dfe7e32d3 100644 --- a/metricbeat/module/elasticsearch/index/data.go +++ b/metricbeat/module/elasticsearch/index/data.go @@ -20,6 +20,8 @@ package index import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -49,8 +51,7 @@ var ( } ) -func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []error { - +func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { var indicesStruct struct { Indices map[string]map[string]interface{} `json:"indices"` } @@ -58,15 +59,16 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []e err := json.Unmarshal(content, &indicesStruct) if err != nil { r.Error(err) - return []error{err} + return err } - var errs []error + var errors multierror.Errors for name, index := range indicesStruct.Indices { event := mb.Event{} event.MetricSetFields, err = schema.Apply(index) if err != nil { - errs = append(errs, err) + r.Error(err) + errors = append(errors, err) } // Write name here as full name only available as key event.MetricSetFields["name"] = name @@ -77,5 +79,6 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []e event.ModuleFields.Put("cluster.id", info.ClusterID) r.Event(event) } - return errs + + return errors.Err() } diff --git a/metricbeat/module/elasticsearch/index_summary/data.go b/metricbeat/module/elasticsearch/index_summary/data.go index eb53bea3a30..6b7fe459259 100644 --- a/metricbeat/module/elasticsearch/index_summary/data.go +++ b/metricbeat/module/elasticsearch/index_summary/data.go @@ -66,7 +66,7 @@ var ( } ) -func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []error { +func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) error { var all struct { Data map[string]interface{} `json:"_all"` } @@ -74,17 +74,16 @@ func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []er err := json.Unmarshal(content, &all) if err != nil { r.Error(err) - return []error{err} + return err } - var errs []error - - fields, err := schema.Apply(all.Data) + fields, err := schema.Apply(all.Data, s.FailOnRequired) if err != nil { - errs = append(errs, err) + r.Error(err) + return err } - event := mb.Event{} + var event mb.Event event.RootFields = common.MapStr{} event.RootFields.Put("service.name", "elasticsearch") @@ -96,5 +95,5 @@ func eventMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) []er r.Event(event) - return errs + return nil } diff --git a/metricbeat/module/elasticsearch/index_summary/data_test.go b/metricbeat/module/elasticsearch/index_summary/data_test.go index 3ef5dcf2609..4d9fe5f27bc 100644 --- a/metricbeat/module/elasticsearch/index_summary/data_test.go +++ b/metricbeat/module/elasticsearch/index_summary/data_test.go @@ -44,5 +44,6 @@ func TestEmpty(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} eventMapping(reporter, info, input) + assert.Empty(t, reporter.GetErrors()) assert.Equal(t, 1, len(reporter.GetEvents())) } diff --git a/metricbeat/module/elasticsearch/node/data.go b/metricbeat/module/elasticsearch/node/data.go index 9ee631210b9..62f18896d10 100644 --- a/metricbeat/module/elasticsearch/node/data.go +++ b/metricbeat/module/elasticsearch/node/data.go @@ -20,6 +20,8 @@ package node import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -57,7 +59,7 @@ var ( } ) -func eventsMapping(r mb.ReporterV2, content []byte) []error { +func eventsMapping(r mb.ReporterV2, content []byte) error { nodesStruct := struct { ClusterName string `json:"cluster_name"` Nodes map[string]map[string]interface{} `json:"nodes"` @@ -66,15 +68,12 @@ func eventsMapping(r mb.ReporterV2, content []byte) []error { err := json.Unmarshal(content, &nodesStruct) if err != nil { r.Error(err) - return []error{err} + return err } - var errs []error - + var errs multierror.Errors for name, node := range nodesStruct.Nodes { - event := mb.Event{} - event.MetricSetFields, err = eventMapping(node) if err != nil { errs = append(errs, err) @@ -92,9 +91,9 @@ func eventsMapping(r mb.ReporterV2, content []byte) []error { r.Event(event) } - return errs + return errs.Err() } -func eventMapping(node map[string]interface{}) (common.MapStr, *s.Errors) { +func eventMapping(node map[string]interface{}) (common.MapStr, error) { return schema.Apply(node) } diff --git a/metricbeat/module/elasticsearch/node/data_test.go b/metricbeat/module/elasticsearch/node/data_test.go index de87210af8d..de2537ddfa8 100644 --- a/metricbeat/module/elasticsearch/node/data_test.go +++ b/metricbeat/module/elasticsearch/node/data_test.go @@ -23,7 +23,6 @@ import ( "io/ioutil" "testing" - s "github.com/elastic/beats/libbeat/common/schema" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/stretchr/testify/assert" @@ -42,12 +41,6 @@ func TestInvalid(t *testing.T) { assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - errs := eventsMapping(reporter, content) - - errors, ok := errs[0].(*s.Errors) - if ok { - assert.True(t, errors.HasRequiredErrors(), "mapping error: %s", errors) - } else { - t.Error(err) - } + err = eventsMapping(reporter, content) + assert.Error(t, err) } diff --git a/metricbeat/module/elasticsearch/node_stats/data.go b/metricbeat/module/elasticsearch/node_stats/data.go index 1fbeb02d9fe..576a3a8683a 100644 --- a/metricbeat/module/elasticsearch/node_stats/data.go +++ b/metricbeat/module/elasticsearch/node_stats/data.go @@ -20,6 +20,8 @@ package node_stats import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -103,22 +105,22 @@ type nodesStruct struct { Nodes map[string]map[string]interface{} `json:"nodes"` } -func eventsMapping(r mb.ReporterV2, content []byte) []error { +func eventsMapping(r mb.ReporterV2, content []byte) error { nodeData := &nodesStruct{} err := json.Unmarshal(content, nodeData) if err != nil { r.Error(err) - return []error{err} + return err } - var errs []error + var errs multierror.Errors for name, node := range nodeData.Nodes { event := mb.Event{} event.MetricSetFields, err = schema.Apply(node) if err != nil { - errs = append(errs, err) + r.Error(err) } event.ModuleFields = common.MapStr{ @@ -133,5 +135,5 @@ func eventsMapping(r mb.ReporterV2, content []byte) []error { event.RootFields.Put("service.name", "elasticsearch") r.Event(event) } - return errs + return errs.Err() } diff --git a/metricbeat/module/elasticsearch/pending_tasks/_meta/test/invalid_format.json b/metricbeat/module/elasticsearch/pending_tasks/_meta/test/invalid_format.json index 6f8c7376fde..6ff68b5642c 100644 --- a/metricbeat/module/elasticsearch/pending_tasks/_meta/test/invalid_format.json +++ b/metricbeat/module/elasticsearch/pending_tasks/_meta/test/invalid_format.json @@ -8,4 +8,4 @@ "time_in_queue": "86ms" } ] - } \ No newline at end of file + } diff --git a/metricbeat/module/elasticsearch/pending_tasks/data.go b/metricbeat/module/elasticsearch/pending_tasks/data.go index f219ea4bb11..28c87c2055c 100644 --- a/metricbeat/module/elasticsearch/pending_tasks/data.go +++ b/metricbeat/module/elasticsearch/pending_tasks/data.go @@ -20,6 +20,8 @@ package pending_tasks import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -34,7 +36,7 @@ var ( } ) -func eventsMapping(content []byte) ([]common.MapStr, error) { +func eventsMapping(content []byte, applyOpts ...s.ApplyOption) ([]common.MapStr, error) { tasksStruct := struct { Tasks []map[string]interface{} `json:"tasks"` }{} @@ -42,15 +44,21 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { if err := json.Unmarshal(content, &tasksStruct); err != nil { return nil, err } + if tasksStruct.Tasks == nil { + return nil, s.NewKeyNotFoundError("tasks") + } var events []common.MapStr - errors := s.NewErrors() + var errors multierror.Errors + opts := append(applyOpts, s.AllRequired) for _, task := range tasksStruct.Tasks { - event, errs := schema.Apply(task) - errors.AddErrors(errs) + event, err := schema.Apply(task, opts...) + if err != nil { + errors = append(errors, err) + } events = append(events, event) } - return events, errors + return events, errors.Err() } diff --git a/metricbeat/module/elasticsearch/pending_tasks/data_test.go b/metricbeat/module/elasticsearch/pending_tasks/data_test.go index b4d8564ad2e..c312f9795c3 100644 --- a/metricbeat/module/elasticsearch/pending_tasks/data_test.go +++ b/metricbeat/module/elasticsearch/pending_tasks/data_test.go @@ -37,14 +37,8 @@ func TestEmptyQueueShouldGiveNoError(t *testing.T) { content, err := ioutil.ReadFile(file) assert.NoError(t, err) - _, errs := eventsMapping(content) - - errors, ok := errs.(*s.Errors) - if ok { - assert.False(t, errors.HasRequiredErrors(), "mapping error: %s", errors) - } else { - t.Error(err) - } + _, err = eventsMapping(content) + assert.NoError(t, err) } func TestNotEmptyQueueShouldGiveNoError(t *testing.T) { @@ -52,14 +46,8 @@ func TestNotEmptyQueueShouldGiveNoError(t *testing.T) { content, err := ioutil.ReadFile(file) assert.NoError(t, err) - _, errs := eventsMapping(content) - - errors, ok := errs.(*s.Errors) - if ok { - assert.False(t, errors.HasRequiredErrors(), "mapping error: %s", errors) - } else { - t.Error(err) - } + _, err = eventsMapping(content) + assert.NoError(t, err) } func TestEmptyQueueShouldGiveZeroEvent(t *testing.T) { @@ -68,7 +56,6 @@ func TestEmptyQueueShouldGiveZeroEvent(t *testing.T) { assert.NoError(t, err) events, _ := eventsMapping(content) - assert.Zero(t, len(events)) } @@ -97,15 +84,13 @@ func TestInvalidJsonForRequiredFieldShouldThrowError(t *testing.T) { content, err := ioutil.ReadFile(file) assert.NoError(t, err) - _, errs := eventsMapping(content) - - errors, ok := errs.(*s.Errors) - if ok { - assert.True(t, errors.HasRequiredErrors(), "mapping error: %s", errors) - assert.EqualError(t, errors, "Required fields are missing: ,source") - } else { - t.Error(err) - } + var notFoundKeys []string + expectedNotFoundKeys := []string{"source"} + _, err = eventsMapping(content, s.NotFoundKeys(func(keys []string) { + notFoundKeys = keys + })) + assert.ElementsMatch(t, expectedNotFoundKeys, notFoundKeys) + assert.Error(t, err) } func TestInvalidJsonForBadFormatShouldThrowError(t *testing.T) { @@ -114,7 +99,6 @@ func TestInvalidJsonForBadFormatShouldThrowError(t *testing.T) { assert.NoError(t, err) _, err = eventsMapping(content) - assert.Error(t, err) } diff --git a/metricbeat/module/elasticsearch/shard/data.go b/metricbeat/module/elasticsearch/shard/data.go index e6e0a00cde1..00de2076896 100644 --- a/metricbeat/module/elasticsearch/shard/data.go +++ b/metricbeat/module/elasticsearch/shard/data.go @@ -62,7 +62,6 @@ func eventsMapping(r mb.ReporterV2, content []byte) { event := mb.Event{} fields, _ := schema.Apply(shard) - event.ModuleFields = common.MapStr{} event.ModuleFields.Put("node.name", fields["node"]) delete(fields, "node") diff --git a/metricbeat/module/elasticsearch/shard/data_xpack.go b/metricbeat/module/elasticsearch/shard/data_xpack.go index fbce9531d37..6ed9c917e9e 100644 --- a/metricbeat/module/elasticsearch/shard/data_xpack.go +++ b/metricbeat/module/elasticsearch/shard/data_xpack.go @@ -62,7 +62,11 @@ func eventsMappingXPack(r mb.ReporterV2, m *MetricSet, content []byte) { for _, shards := range index.Shards { for _, shard := range shards { event := mb.Event{} - fields, _ := schema.Apply(shard) + fields, err := schema.Apply(shard) + if err != nil { + r.Error(err) + continue + } fields["shard"] = fields["number"] delete(fields, "number") diff --git a/metricbeat/module/elasticsearch/testing.go b/metricbeat/module/elasticsearch/testing.go index 60a8eedd64f..612ba74a0d2 100644 --- a/metricbeat/module/elasticsearch/testing.go +++ b/metricbeat/module/elasticsearch/testing.go @@ -24,7 +24,6 @@ import ( "path/filepath" "testing" - s "github.com/elastic/beats/libbeat/common/schema" "github.com/elastic/beats/metricbeat/mb" mbtest "github.com/elastic/beats/metricbeat/mb/testing" @@ -32,7 +31,7 @@ import ( ) // TestMapper tests mapping methods -func TestMapper(t *testing.T, glob string, mapper func(mb.ReporterV2, []byte) []error) { +func TestMapper(t *testing.T, glob string, mapper func(mb.ReporterV2, []byte) error) { files, err := filepath.Glob(glob) assert.NoError(t, err) // Makes sure glob matches at least 1 file @@ -44,12 +43,8 @@ func TestMapper(t *testing.T, glob string, mapper func(mb.ReporterV2, []byte) [] assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - errors := mapper(reporter, input) - for _, errs := range errors { - if e, ok := errs.(*s.Errors); ok { - assert.False(t, e.HasRequiredErrors(), "mapping error: %s", e) - } - } + err = mapper(reporter, input) + assert.NoError(t, err) assert.True(t, len(reporter.GetEvents()) >= 1) assert.Equal(t, 0, len(reporter.GetErrors())) }) @@ -57,7 +52,7 @@ func TestMapper(t *testing.T, glob string, mapper func(mb.ReporterV2, []byte) [] } // TestMapperWithInfo tests mapping methods with Info fields -func TestMapperWithInfo(t *testing.T, glob string, mapper func(mb.ReporterV2, Info, []byte) []error) { +func TestMapperWithInfo(t *testing.T, glob string, mapper func(mb.ReporterV2, Info, []byte) error) { files, err := filepath.Glob(glob) assert.NoError(t, err) // Makes sure glob matches at least 1 file @@ -74,12 +69,8 @@ func TestMapperWithInfo(t *testing.T, glob string, mapper func(mb.ReporterV2, In assert.NoError(t, err) reporter := &mbtest.CapturingReporterV2{} - errors := mapper(reporter, info, input) - for _, errs := range errors { - if e, ok := errs.(*s.Errors); ok { - assert.False(t, e.HasRequiredErrors(), "mapping error: %s", e) - } - } + err = mapper(reporter, info, input) + assert.NoError(t, err) assert.True(t, len(reporter.GetEvents()) >= 1) assert.Equal(t, 0, len(reporter.GetErrors())) }) diff --git a/metricbeat/module/kibana/stats/data.go b/metricbeat/module/kibana/stats/data.go index e1aa7d480e2..126b01c3f2b 100644 --- a/metricbeat/module/kibana/stats/data.go +++ b/metricbeat/module/kibana/stats/data.go @@ -48,8 +48,8 @@ var ( }, }), "requests": c.Dict("requests", s.Schema{ - "total": c.Int("total"), - "disconnects": c.Int("disconnects"), + "total": c.Int("total", s.Optional), + "disconnects": c.Int("disconnects", s.Optional), }), "concurrent_connections": c.Int("concurrent_connections"), "sockets": c.Dict("sockets", s.Schema{ @@ -95,8 +95,11 @@ func eventMapping(r mb.ReporterV2, content []byte) error { } dataFields, err := schema.Apply(data) - event := mb.Event{} + if err != nil { + r.Error(err) + } + var event mb.Event event.RootFields = common.MapStr{} event.RootFields.Put("service.name", "kibana") diff --git a/metricbeat/module/kibana/stats/data_test.go b/metricbeat/module/kibana/stats/data_test.go index 447179325ea..ca01331137c 100644 --- a/metricbeat/module/kibana/stats/data_test.go +++ b/metricbeat/module/kibana/stats/data_test.go @@ -24,7 +24,6 @@ import ( "path/filepath" "testing" - s "github.com/elastic/beats/libbeat/common/schema" mbtest "github.com/elastic/beats/metricbeat/mb/testing" "github.com/stretchr/testify/assert" @@ -41,11 +40,8 @@ func TestStats(t *testing.T) { reporter := &mbtest.CapturingReporterV2{} err = eventMapping(reporter, input) - if e, ok := err.(*s.Errors); ok { - assert.False(t, e.HasRequiredErrors(), "mapping error: %s", e) - } - - assert.True(t, len(reporter.GetEvents()) >= 1) - assert.Equal(t, 0, len(reporter.GetErrors())) + assert.NoError(t, err, f) + assert.True(t, len(reporter.GetEvents()) >= 1, f) + assert.Equal(t, 0, len(reporter.GetErrors()), f) } } diff --git a/metricbeat/module/logstash/node/data.go b/metricbeat/module/logstash/node/data.go index 229eb6c25a1..eb1b5ba61f5 100644 --- a/metricbeat/module/logstash/node/data.go +++ b/metricbeat/module/logstash/node/data.go @@ -34,6 +34,6 @@ var ( } ) -func eventMapping(node map[string]interface{}) (common.MapStr, *s.Errors) { +func eventMapping(node map[string]interface{}) (common.MapStr, error) { return schema.Apply(node) } diff --git a/metricbeat/module/logstash/node_stats/data.go b/metricbeat/module/logstash/node_stats/data.go index 9c40caa7e22..6dad0598b83 100644 --- a/metricbeat/module/logstash/node_stats/data.go +++ b/metricbeat/module/logstash/node_stats/data.go @@ -33,6 +33,6 @@ var ( } ) -func eventMapping(node map[string]interface{}) (common.MapStr, *s.Errors) { +func eventMapping(node map[string]interface{}) (common.MapStr, error) { return schema.Apply(node) } diff --git a/metricbeat/module/rabbitmq/connection/data.go b/metricbeat/module/rabbitmq/connection/data.go index b1ce201c3a9..b4ee6d20a44 100644 --- a/metricbeat/module/rabbitmq/connection/data.go +++ b/metricbeat/module/rabbitmq/connection/data.go @@ -20,6 +20,8 @@ package connection import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -62,19 +64,19 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { return nil, err } - events := []common.MapStr{} - errors := s.NewErrors() - + var events []common.MapStr + var errors multierror.Errors for _, node := range connections { - event, errs := eventMapping(node) + event, err := eventMapping(node) events = append(events, event) - errors.AddErrors(errs) - + if err != nil { + errors = append(errors, err) + } } - return events, errors + return events, errors.Err() } -func eventMapping(connection map[string]interface{}) (common.MapStr, *s.Errors) { +func eventMapping(connection map[string]interface{}) (common.MapStr, error) { return schema.Apply(connection) } diff --git a/metricbeat/module/rabbitmq/exchange/data.go b/metricbeat/module/rabbitmq/exchange/data.go index e3c81c8eb3a..72e869fb595 100644 --- a/metricbeat/module/rabbitmq/exchange/data.go +++ b/metricbeat/module/rabbitmq/exchange/data.go @@ -20,6 +20,8 @@ package exchange import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -61,18 +63,20 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { return nil, err } - events := []common.MapStr{} - errors := s.NewErrors() + var events []common.MapStr + var errors multierror.Errors for _, exchange := range exchanges { - event, errs := eventMapping(exchange) + event, err := eventMapping(exchange) + if err != nil { + errors = append(errors, err) + } events = append(events, event) - errors.AddErrors(errs) } - return events, errors + return events, errors.Err() } -func eventMapping(exchange map[string]interface{}) (common.MapStr, *s.Errors) { +func eventMapping(exchange map[string]interface{}) (common.MapStr, error) { return schema.Apply(exchange) } diff --git a/metricbeat/module/rabbitmq/node/data.go b/metricbeat/module/rabbitmq/node/data.go index 6526cef0565..4318887d70f 100644 --- a/metricbeat/module/rabbitmq/node/data.go +++ b/metricbeat/module/rabbitmq/node/data.go @@ -159,7 +159,11 @@ func eventsMapping(r mb.ReporterV2, content []byte) { } func eventMapping(r mb.ReporterV2, node map[string]interface{}) { - event, _ := schema.Apply(node) + event, err := schema.Apply(node) + if err != nil { + r.Error(err) + return + } r.Event(mb.Event{ MetricSetFields: event, }) diff --git a/metricbeat/module/rabbitmq/queue/data.go b/metricbeat/module/rabbitmq/queue/data.go index a2cb7949741..56b977eab12 100644 --- a/metricbeat/module/rabbitmq/queue/data.go +++ b/metricbeat/module/rabbitmq/queue/data.go @@ -20,6 +20,8 @@ package queue import ( "encoding/json" + "github.com/joeshaw/multierror" + "github.com/elastic/beats/libbeat/common" s "github.com/elastic/beats/libbeat/common/schema" c "github.com/elastic/beats/libbeat/common/schema/mapstriface" @@ -89,18 +91,20 @@ func eventsMapping(content []byte) ([]common.MapStr, error) { return nil, err } - events := []common.MapStr{} - errors := s.NewErrors() - + var events []common.MapStr + var errors multierror.Errors for _, queue := range queues { - event, errs := eventMapping(queue) + event, err := eventMapping(queue) events = append(events, event) - errors.AddErrors(errs) + if err != nil { + errors = append(errors, err) + } } - return events, errors + return events, errors.Err() } -func eventMapping(queue map[string]interface{}) (common.MapStr, *s.Errors) { - return schema.Apply(queue) +func eventMapping(queue map[string]interface{}) (common.MapStr, error) { + event, err := schema.Apply(queue) + return event, err }