-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
json_decode_fields processor #2605
Changes from all commits
21555b2
842c996
91b64a8
3e53022
c4c3a8c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
package jsontransform | ||
|
||
import ( | ||
"encoding/json" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
) | ||
|
||
// TransformNumbers walks a json decoded tree an replaces json.Number | ||
// with int64, float64, or string, in this order of preference (i.e. if it | ||
// parses as an int, use int. if it parses as a float, use float. etc). | ||
func TransformNumbers(dict common.MapStr) { | ||
for k, v := range dict { | ||
switch vv := v.(type) { | ||
case json.Number: | ||
dict[k] = transformNumber(vv) | ||
case map[string]interface{}: | ||
TransformNumbers(vv) | ||
case []interface{}: | ||
transformNumbersArray(vv) | ||
} | ||
} | ||
} | ||
|
||
func transformNumber(value json.Number) interface{} { | ||
i64, err := value.Int64() | ||
if err == nil { | ||
return i64 | ||
} | ||
f64, err := value.Float64() | ||
if err == nil { | ||
return f64 | ||
} | ||
return value.String() | ||
} | ||
|
||
func transformNumbersArray(arr []interface{}) { | ||
for i, v := range arr { | ||
switch vv := v.(type) { | ||
case json.Number: | ||
arr[i] = transformNumber(vv) | ||
case map[string]interface{}: | ||
TransformNumbers(vv) | ||
case []interface{}: | ||
transformNumbersArray(vv) | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
package actions | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"fmt" | ||
"strings" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/common/jsontransform" | ||
"github.com/elastic/beats/libbeat/logp" | ||
"github.com/elastic/beats/libbeat/processors" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
type decodeJSONFields struct { | ||
fields []string | ||
maxDepth int | ||
processArray bool | ||
} | ||
|
||
type config struct { | ||
Fields []string `config:"fields"` | ||
MaxDepth int `config:"maxDepth" validate:"min=1"` | ||
ProcessArray bool `config:"processArray"` | ||
} | ||
|
||
var ( | ||
defaultConfig = config{ | ||
MaxDepth: 1, | ||
ProcessArray: false, | ||
} | ||
) | ||
|
||
var debug = logp.MakeDebug("filters") | ||
|
||
func init() { | ||
processors.RegisterPlugin("decode_json_fields", | ||
configChecked(newDecodeJSONFields, | ||
requireFields("fields"), | ||
allowedFields("fields", "maxDepth", "processArray"))) | ||
} | ||
|
||
func newDecodeJSONFields(c common.Config) (processors.Processor, error) { | ||
config := defaultConfig | ||
|
||
err := c.Unpack(&config) | ||
|
||
if err != nil { | ||
logp.Warn("Error unpacking config for decode_json_fields") | ||
return nil, fmt.Errorf("fail to unpack the decode_json_fields configuration: %s", err) | ||
} | ||
|
||
f := decodeJSONFields{fields: config.Fields, maxDepth: config.MaxDepth, processArray: config.ProcessArray} | ||
return f, nil | ||
} | ||
|
||
func (f decodeJSONFields) Run(event common.MapStr) (common.MapStr, error) { | ||
var errs []string | ||
|
||
for _, field := range f.fields { | ||
data, err := event.GetValue(field) | ||
if err != nil && errors.Cause(err) != common.ErrKeyNotFound { | ||
debug("Error trying to GetValue for field : %s in event : %v", field, event) | ||
errs = append(errs, err.Error()) | ||
continue | ||
} | ||
text, ok := data.(string) | ||
if ok { | ||
var output interface{} | ||
err := unmarshal(f.maxDepth, []byte(text), &output, f.processArray) | ||
if err != nil { | ||
debug("Error trying to unmarshal %s", event[field]) | ||
errs = append(errs, err.Error()) | ||
continue | ||
} | ||
|
||
_, err = event.Put(field, output) | ||
if err != nil { | ||
debug("Error trying to Put value %v for field : %s", output, field) | ||
errs = append(errs, err.Error()) | ||
continue | ||
} | ||
} | ||
} | ||
|
||
return event, fmt.Errorf(strings.Join(errs, ", ")) | ||
} | ||
|
||
func unmarshal(maxDepth int, text []byte, fields *interface{}, processArray bool) error { | ||
if err := DecodeJSON(text, fields); err != nil { | ||
return err | ||
} | ||
|
||
maxDepth-- | ||
if maxDepth == 0 { | ||
return nil | ||
} | ||
|
||
tryUnmarshal := func(v interface{}) (interface{}, bool) { | ||
str, isString := v.(string) | ||
if !isString { | ||
return v, false | ||
} | ||
|
||
var tmp interface{} | ||
err := unmarshal(maxDepth, []byte(str), &tmp, processArray) | ||
if err != nil { | ||
return v, false | ||
} | ||
|
||
return tmp, true | ||
} | ||
|
||
// try to deep unmarshal fields | ||
switch O := interface{}(*fields).(type) { | ||
case map[string]interface{}: | ||
for k, v := range O { | ||
if decoded, ok := tryUnmarshal(v); ok { | ||
O[k] = decoded | ||
} | ||
} | ||
// We want to process arrays here | ||
case []interface{}: | ||
if !processArray { | ||
break | ||
} | ||
|
||
for i, v := range O { | ||
if decoded, ok := tryUnmarshal(v); ok { | ||
O[i] = decoded | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func DecodeJSON(text []byte, to *interface{}) error { | ||
dec := json.NewDecoder(bytes.NewReader(text)) | ||
dec.UseNumber() | ||
err := dec.Decode(to) | ||
|
||
if err != nil { | ||
return err | ||
} | ||
|
||
switch O := interface{}(*to).(type) { | ||
case map[string]interface{}: | ||
jsontransform.TransformNumbers(O) | ||
} | ||
return nil | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thinking a little more about
Then (extending unmarshal a little), unmarshal can be generalized a little:
As you can see, I added a maximum parsing depth. Not sure we really want to parse fully recursively here. I'd use Having DecodeJson, there is no need for exporting jsontransform as is (but I'd keep jsontransform package for now, as other modules in beats might make use of it). What do you think? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like the idea of having a maxDepth setup so that we don't recursively parse till the end. |
||
|
||
func (f decodeJSONFields) String() string { | ||
return "decode_json_fields=" + strings.Join(f.fields, ", ") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably not be under bug fixes? This is a new feature as far as I understand.