-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve the JSON parser performance. #7723
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,9 +5,9 @@ import ( | |
"errors" | ||
"fmt" | ||
"io" | ||
"strings" | ||
"unicode/utf8" | ||
|
||
"github.com/buger/jsonparser" | ||
"github.com/grafana/loki/pkg/logql/log/jsonexpr" | ||
"github.com/grafana/loki/pkg/logql/log/logfmt" | ||
"github.com/grafana/loki/pkg/logql/log/pattern" | ||
|
@@ -23,170 +23,162 @@ const ( | |
duplicateSuffix = "_extracted" | ||
trueString = "true" | ||
falseString = "false" | ||
// How much stack space to allocate for unescaping JSON strings; if a string longer | ||
// than this needs to be unescaped, it will result in a heap allocation | ||
unescapeStackBufSize = 64 | ||
) | ||
|
||
var ( | ||
_ Stage = &JSONParser{} | ||
_ Stage = &RegexpParser{} | ||
_ Stage = &LogfmtParser{} | ||
|
||
trueBytes = []byte("true") | ||
|
||
errUnexpectedJSONObject = fmt.Errorf("expecting json object(%d), but it is not", jsoniter.ObjectValue) | ||
errMissingCapture = errors.New("at least one named capture must be supplied") | ||
) | ||
|
||
type JSONParser struct { | ||
buf []byte // buffer used to build json keys | ||
lbs *LabelsBuilder | ||
prefixBuffer []byte // buffer used to build json keys | ||
lbs *LabelsBuilder | ||
|
||
keys internedStringSet | ||
} | ||
|
||
// NewJSONParser creates a log stage that can parse a json log line and add properties as labels. | ||
func NewJSONParser() *JSONParser { | ||
return &JSONParser{ | ||
buf: make([]byte, 0, 1024), | ||
keys: internedStringSet{}, | ||
prefixBuffer: make([]byte, 0, 1024), | ||
keys: internedStringSet{}, | ||
} | ||
} | ||
|
||
func (j *JSONParser) Process(_ int64, line []byte, lbs *LabelsBuilder) ([]byte, bool) { | ||
if lbs.ParserLabelHints().NoLabels() { | ||
return line, true | ||
} | ||
it := jsoniter.ConfigFastest.BorrowIterator(line) | ||
defer jsoniter.ConfigFastest.ReturnIterator(it) | ||
|
||
// reset the state. | ||
j.buf = j.buf[:0] | ||
j.prefixBuffer = j.prefixBuffer[:0] | ||
j.lbs = lbs | ||
|
||
if err := j.readObject(it); err != nil { | ||
if err := jsonparser.ObjectEach(line, j.parseObject); err != nil { | ||
lbs.SetErr(errJSON) | ||
lbs.SetErrorDetails(err.Error()) | ||
return line, true | ||
} | ||
return line, true | ||
} | ||
|
||
func (j *JSONParser) readObject(it *jsoniter.Iterator) error { | ||
// we only care about object and values. | ||
if nextType := it.WhatIsNext(); nextType != jsoniter.ObjectValue { | ||
return errUnexpectedJSONObject | ||
} | ||
_ = it.ReadMapCB(j.parseMap("")) | ||
if it.Error != nil && it.Error != io.EOF { | ||
return it.Error | ||
func (j *JSONParser) parseObject(key, value []byte, dataType jsonparser.ValueType, offset int) error { | ||
switch dataType { | ||
case jsonparser.String, jsonparser.Number, jsonparser.Boolean: | ||
j.parseLabelValue(key, value, dataType) | ||
case jsonparser.Object: | ||
prefixLen := len(j.prefixBuffer) | ||
var err error | ||
if ok := j.nextKeyPrefix(key); ok { | ||
err = jsonparser.ObjectEach(value, j.parseObject) | ||
} | ||
// rollback the prefix as we exit the current object. | ||
j.prefixBuffer = j.prefixBuffer[:prefixLen] | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (j *JSONParser) parseMap(prefix string) func(iter *jsoniter.Iterator, field string) bool { | ||
return func(iter *jsoniter.Iterator, field string) bool { | ||
switch iter.WhatIsNext() { | ||
// are we looking at a value that needs to be added ? | ||
case jsoniter.StringValue, jsoniter.NumberValue, jsoniter.BoolValue: | ||
j.parseLabelValue(iter, prefix, field) | ||
// Or another new object based on a prefix. | ||
case jsoniter.ObjectValue: | ||
if key, ok := j.nextKeyPrefix(prefix, field); ok { | ||
return iter.ReadMapCB(j.parseMap(key)) | ||
} | ||
// If this keys is not expected we skip the object | ||
iter.Skip() | ||
default: | ||
iter.Skip() | ||
} | ||
return true | ||
// nextKeyPrefix loads the next prefix into the buffer and reports whether it should be processed based on hints. | ||
func (j *JSONParser) nextKeyPrefix(key []byte) bool { | ||
// first add the spacer if needed. | ||
if len(j.prefixBuffer) != 0 { | ||
j.prefixBuffer = append(j.prefixBuffer, byte(jsonSpacer)) | ||
} | ||
j.prefixBuffer = appendSanitized(j.prefixBuffer, key) | ||
return j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.prefixBuffer)) | ||
} | ||
|
||
func (j *JSONParser) nextKeyPrefix(prefix, field string) (string, bool) { | ||
// first time we add return the field as prefix. | ||
if len(prefix) == 0 { | ||
field = sanitizeLabelKey(field, true) | ||
if j.lbs.ParserLabelHints().ShouldExtractPrefix(field) { | ||
return field, true | ||
} | ||
return "", false | ||
} | ||
// otherwise we build the prefix and check using the buffer | ||
j.buf = j.buf[:0] | ||
j.buf = append(j.buf, prefix...) | ||
j.buf = append(j.buf, byte(jsonSpacer)) | ||
j.buf = append(j.buf, sanitizeLabelKey(field, false)...) | ||
// if matches keep going | ||
if j.lbs.ParserLabelHints().ShouldExtractPrefix(unsafeGetString(j.buf)) { | ||
return string(j.buf), true | ||
} | ||
return "", false | ||
} | ||
|
||
func (j *JSONParser) parseLabelValue(iter *jsoniter.Iterator, prefix, field string) { | ||
func (j *JSONParser) parseLabelValue(key, value []byte, dataType jsonparser.ValueType) { | ||
// the first time we use the field as label key. | ||
if len(prefix) == 0 { | ||
key, ok := j.keys.Get(unsafeGetBytes(field), func() (string, bool) { | ||
field = sanitizeLabelKey(field, true) | ||
if !j.lbs.ParserLabelHints().ShouldExtract(field) { | ||
return "", false | ||
} | ||
if len(j.prefixBuffer) == 0 { | ||
key, ok := j.keys.Get(key, func() (string, bool) { | ||
field := sanitizeLabelKey(string(key), true) | ||
if j.lbs.BaseHas(field) { | ||
field = field + duplicateSuffix | ||
} | ||
if !j.lbs.ParserLabelHints().ShouldExtract(field) { | ||
return "", false | ||
} | ||
return field, true | ||
}) | ||
if !ok { | ||
iter.Skip() | ||
return | ||
} | ||
j.lbs.Set(key, readValue(iter)) | ||
j.lbs.Set(key, readValue(value, dataType)) | ||
return | ||
|
||
} | ||
// otherwise we build the label key using the buffer | ||
j.buf = j.buf[:0] | ||
j.buf = append(j.buf, prefix...) | ||
j.buf = append(j.buf, byte(jsonSpacer)) | ||
j.buf = append(j.buf, sanitizeLabelKey(field, false)...) | ||
key, ok := j.keys.Get(j.buf, func() (string, bool) { | ||
if j.lbs.BaseHas(string(j.buf)) { | ||
j.buf = append(j.buf, duplicateSuffix...) | ||
|
||
// snapshot the current prefix position | ||
prefixLen := len(j.prefixBuffer) | ||
j.prefixBuffer = append(j.prefixBuffer, byte(jsonSpacer)) | ||
j.prefixBuffer = appendSanitized(j.prefixBuffer, key) | ||
keyString, ok := j.keys.Get(j.prefixBuffer, func() (string, bool) { | ||
if j.lbs.BaseHas(string(j.prefixBuffer)) { | ||
j.prefixBuffer = append(j.prefixBuffer, duplicateSuffix...) | ||
} | ||
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.buf)) { | ||
if !j.lbs.ParserLabelHints().ShouldExtract(string(j.prefixBuffer)) { | ||
return "", false | ||
} | ||
return string(j.buf), true | ||
return string(j.prefixBuffer), true | ||
}) | ||
|
||
// reset the prefix position | ||
j.prefixBuffer = j.prefixBuffer[:prefixLen] | ||
if !ok { | ||
iter.Skip() | ||
return | ||
} | ||
j.lbs.Set(key, readValue(iter)) | ||
j.lbs.Set(keyString, readValue(value, dataType)) | ||
} | ||
|
||
func (j *JSONParser) RequiredLabelNames() []string { return []string{} } | ||
|
||
func readValue(iter *jsoniter.Iterator) string { | ||
switch iter.WhatIsNext() { | ||
case jsoniter.StringValue: | ||
v := iter.ReadString() | ||
// the rune error replacement is rejected by Prometheus, so we skip it. | ||
if strings.ContainsRune(v, utf8.RuneError) { | ||
return "" | ||
} | ||
return v | ||
case jsoniter.NumberValue: | ||
return iter.ReadNumber().String() | ||
case jsoniter.BoolValue: | ||
if iter.ReadBool() { | ||
func readValue(v []byte, dataType jsonparser.ValueType) string { | ||
switch dataType { | ||
case jsonparser.String: | ||
return unescapeJSONString(v) | ||
case jsonparser.Null: | ||
return "" | ||
case jsonparser.Number: | ||
return string(v) | ||
case jsonparser.Boolean: | ||
if bytes.Equal(v, trueBytes) { | ||
return trueString | ||
} | ||
return falseString | ||
default: | ||
iter.Skip() | ||
return "" | ||
} | ||
} | ||
|
||
func unescapeJSONString(b []byte) string { | ||
var stackbuf [unescapeStackBufSize]byte // stack-allocated array for allocation-free unescaping of small strings | ||
bU, err := jsonparser.Unescape(b, stackbuf[:]) | ||
Comment on lines
+167
to
+168
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This is really nice. Do you know how much overhead the unescaping adds? I'm wondering if we should go the other way around and use the escaped raw string. This makes it a little bit more complicated later on when the labels are matched or read out. Overall, I'm wondering if labels should be lazy. I've talked to Ed about this. If we can somehow ensure the original line is not garbage collected until the process is done, it would be enough to keep an index, or rather an unsafe pointer, into the line. No allocation is required. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Most of the CPU time is spent there; I'm not sure we can change this without risk. However, I do like the lazy idea, so give it a go. What would be great is to limit extraction, but I wasn't able to because the sanitize function is irreversible. |
||
if err != nil { | ||
return "" | ||
} | ||
res := string(bU) | ||
// rune error is rejected by Prometheus | ||
for _, r := range res { | ||
if r == utf8.RuneError { | ||
return "" | ||
} | ||
} | ||
return res | ||
} | ||
|
||
type RegexpParser struct { | ||
regex *regexp.Regexp | ||
nameIndex map[int]string | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We could sanitize and unescape at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only sanitize keys, not values; this is to comply with the Prometheus spec.