Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uses custom json-iter decoder for log entries. #3163

Merged
merged 2 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/querytee/response_comparator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (

"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/pkg/loghttp"
)

func compareStreams(expectedRaw, actualRaw json.RawMessage, tolerance float64) error {
var expected, actual loghttp.Streams

err := json.Unmarshal(expectedRaw, &expected)
err := jsoniter.Unmarshal(expectedRaw, &expected)
if err != nil {
return err
}
err = json.Unmarshal(actualRaw, &actual)
err = jsoniter.Unmarshal(actualRaw, &actual)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ require (
github.com/klauspost/compress v1.9.5
github.com/mitchellh/mapstructure v1.3.3
github.com/moby/term v0.0.0-20200915141129-7f0af18e79f2 // indirect
github.com/modern-go/reflect2 v1.0.1
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opentracing/opentracing-go v1.2.0
// github.com/pierrec/lz4 v2.0.5+incompatible
Expand Down
3 changes: 2 additions & 1 deletion pkg/distributor/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ const applicationJSON = "application/json"

// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {

req, err := ParseRequest(r)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -48,6 +47,8 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
case applicationJSON:
var err error

// todo once https://github.com/weaveworks/common/commit/73225442af7da93ec8f6a6e2f7c8aafaee3f8840 is in Loki.
// We can try to pass the body as bytes.buffer instead to avoid reading into another buffer.
if loghttp.GetVersion(r.RequestURI) == loghttp.VersionV1 {
err = unmarshal.DecodePushRequest(r.Body, &req)
} else {
Expand Down
108 changes: 108 additions & 0 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package loghttp

import (
"strconv"
"time"
"unsafe"

jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
)

func init() {
jsoniter.RegisterExtension(&jsonExtension{})
}

// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
}

type jsonExtension struct {
jsoniter.DummyExtension
}

type sliceEntryDecoder struct {
}

func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
i := 0
var ts time.Time
var line string
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
case 0:
ts, ok = readTimestamp(iter)
i++
return ok
case 1:
line = iter.ReadString()
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
return false
}
})
if ok {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
})
return true
}
return false
})
}

func readTimestamp(iter *jsoniter.Iterator) (time.Time, bool) {
s := iter.ReadString()
if iter.Error != nil {
return time.Time{}, false
}
t, err := strconv.ParseInt(s, 10, 64)
if err != nil {
iter.ReportError("error reading entry timestamp", err.Error())
return time.Time{}, false

}
return time.Unix(0, t), true
}

type entryEncoder struct{}

func (entryEncoder) IsEmpty(ptr unsafe.Pointer) bool {
// we don't omit-empty with log entries.
return false
}

func (entryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
e := *((*Entry)(ptr))
stream.WriteArrayStart()
stream.WriteRaw(`"`)
stream.WriteRaw(strconv.FormatInt(e.Timestamp.UnixNano(), 10))
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
stream.WriteArrayEnd()
}

func (e *jsonExtension) CreateDecoder(typ reflect2.Type) jsoniter.ValDecoder {
if typ == reflect2.TypeOf([]Entry{}) {
return sliceEntryDecoder{}
}
return nil
}

func (e *jsonExtension) CreateEncoder(typ reflect2.Type) jsoniter.ValEncoder {
if typ == reflect2.TypeOf(Entry{}) {
return entryEncoder{}
}
return nil
}
42 changes: 3 additions & 39 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"net/http"
"strconv"
"time"
"unsafe"

Expand All @@ -31,7 +30,7 @@ const (
QueryStatusFail = "fail"
)

//QueryResponse represents the http json response to a label query
// QueryResponse represents the http json response to a Loki range and instant query
type QueryResponse struct {
Status string `json:"status"`
Data QueryResponseData `json:"data"`
Expand All @@ -58,7 +57,7 @@ type ResultValue interface {
Type() ResultType
}

//QueryResponseData represents the http json response to a label query
// QueryResponseData represents the http json response to a label query
type QueryResponseData struct {
ResultType ResultType `json:"resultType"`
Result ResultValue `json:"result"`
Expand Down Expand Up @@ -92,18 +91,12 @@ func (s Streams) ToProto() []logproto.Stream {
return result
}

//Stream represents a log stream. It includes a set of log entries and their labels.
// Stream represents a log stream. It includes a set of log entries and their labels.
type Stream struct {
Labels LabelSet `json:"stream"`
Entries []Entry `json:"values"`
}

//Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
unmarshal := struct {
Expand Down Expand Up @@ -152,35 +145,6 @@ func (q *QueryResponseData) UnmarshalJSON(data []byte) error {
return nil
}

// MarshalJSON implements the json.Marshaler interface.
func (e *Entry) MarshalJSON() ([]byte, error) {
l, err := json.Marshal(e.Line)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf("[\"%d\",%s]", e.Timestamp.UnixNano(), l)), nil
}

// UnmarshalJSON implements the json.Unmarshaler interface.
func (e *Entry) UnmarshalJSON(data []byte) error {
var unmarshal []string

err := json.Unmarshal(data, &unmarshal)
if err != nil {
return err
}

t, err := strconv.ParseInt(unmarshal[0], 10, 64)
if err != nil {
return err
}

e.Timestamp = time.Unix(0, t)
e.Line = unmarshal[1]

return nil
}

// Scalar is a single timestamp/float with no labels
type Scalar model.Scalar

Expand Down
3 changes: 0 additions & 3 deletions pkg/logql/marshal/marshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ import (
// WriteQueryResponseJSON marshals the promql.Value to v1 loghttp JSON and then
// writes it to the provided io.Writer.
func WriteQueryResponseJSON(v logql.Result, w io.Writer) error {

value, err := NewResultValue(v.Data)

if err != nil {
return err
}
Expand Down Expand Up @@ -52,7 +50,6 @@ func WriteLabelResponseJSON(l logproto.LabelResponse, w io.Writer) error {
// then writes it to the provided connection.
func WriteTailResponseJSON(r legacy.TailResponse, c *websocket.Conn) error {
v1Response, err := NewTailResponse(r)

if err != nil {
return err
}
Expand Down
11 changes: 10 additions & 1 deletion pkg/logql/marshal/marshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,6 @@ func Test_TailResponseMarshalLoop(t *testing.T) {
}

func Test_WriteSeriesResponseJSON(t *testing.T) {

for i, tc := range []struct {
input logproto.SeriesResponse
expected string
Expand Down Expand Up @@ -512,3 +511,13 @@ func testJSONBytesEqual(t *testing.T, expected []byte, actual []byte, msg string

require.Equalf(t, expectedValue, actualValue, msg, args)
}

func Benchmark_Encode(b *testing.B) {
buf := bytes.NewBuffer(nil)

for n := 0; n < b.N; n++ {
for _, queryTest := range queryTests {
require.NoError(b, WriteQueryResponseJSON(logql.Result{Data: queryTest.actual}, buf))
}
}
}
24 changes: 4 additions & 20 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package unmarshal

import (
"io"
"unsafe"

json "github.com/json-iterator/go"

Expand All @@ -13,12 +14,9 @@ import (
func DecodePushRequest(b io.Reader, r *logproto.PushRequest) error {
var request loghttp.PushRequest

err := json.NewDecoder(b).Decode(&request)

if err != nil {
if err := json.NewDecoder(b).Decode(&request); err != nil {
return err
}

*r = NewPushRequest(request)

return nil
Expand All @@ -39,22 +37,8 @@ func NewPushRequest(r loghttp.PushRequest) logproto.PushRequest {

// NewStream constructs a logproto.Stream from a Stream
func NewStream(s *loghttp.Stream) logproto.Stream {
ret := logproto.Stream{
Entries: make([]logproto.Entry, len(s.Entries)),
return logproto.Stream{
Entries: *(*[]logproto.Entry)(unsafe.Pointer(&s.Entries)),
Labels: s.Labels.String(),
}

for i, e := range s.Entries {
ret.Entries[i] = NewEntry(e)
}

return ret
}

// NewEntry constructs a logproto.Entry from a Entry
func NewEntry(e loghttp.Entry) logproto.Entry {
return logproto.Entry{
Timestamp: e.Timestamp,
Line: e.Line,
}
}
35 changes: 34 additions & 1 deletion pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package unmarshal

import (
"fmt"
"io/ioutil"
"strings"
"testing"
Expand Down Expand Up @@ -44,7 +45,6 @@ var pushTests = []struct {
}

func Test_DecodePushRequest(t *testing.T) {

for i, pushTest := range pushTests {
var actual logproto.PushRequest
closer := ioutil.NopCloser(strings.NewReader(pushTest.actual))
Expand All @@ -55,3 +55,36 @@ func Test_DecodePushRequest(t *testing.T) {
require.Equalf(t, pushTest.expected, actual.Streams, "Push Test %d failed", i)
}
}

func Benchmark_DecodePushRequest(b *testing.B) {
requestFmt := `{
"streams": [
{
"stream": {
"test": "test",
"foo" : "bar"
},
"values":[
%s
]
}
]
}`
var entries strings.Builder
for i := 0; i < 10000; i++ {
entries.WriteString(`[ "123456789012345", "WARN [CompactionExecutor:61771] 2021-01-12 09:40:23,192 TimeWindowCompactionController.java:41 - You are running with sstables overlapping checks disabled, it can result in loss of data" ],`)
}
requestString := fmt.Sprintf(requestFmt, entries.String()[:len(entries.String())-1])
r := strings.NewReader("")

b.ResetTimer()
b.ReportAllocs()

for n := 0; n < b.N; n++ {
var actual logproto.PushRequest
r.Reset(requestString)
err := DecodePushRequest(r, &actual)
require.NoError(b, err)
require.Equal(b, 10000, len(actual.Streams[0].Entries))
}
}
1 change: 1 addition & 0 deletions vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ github.com/moby/term/windows
# github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
## explicit
github.com/modern-go/reflect2
# github.com/morikuni/aec v1.0.0
github.com/morikuni/aec
Expand Down