Skip to content

Commit

Permalink
Fixes unmarshalling of tailing responses.
Browse files Browse the repository at this point in the history
Some websocket were still using old json package.

Signed-off-by: Cyril Tovena <[email protected]>
  • Loading branch information
cyriltovena committed Jan 26, 2021
1 parent 33abab5 commit ac84d01
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 6 deletions.
5 changes: 2 additions & 3 deletions pkg/canary/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/build"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

var (
Expand Down Expand Up @@ -303,7 +304,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
tss = append(tss, *ts)
}

}
default:
return nil, fmt.Errorf("unexpected result type, expected a log stream result instead received %v", value.Type())
Expand All @@ -313,7 +313,6 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}

func (r *Reader) run() {

r.closeAndReconnect()

tailResponse := &loghttp.TailResponse{}
Expand All @@ -332,7 +331,7 @@ func (r *Reader) run() {
// Set a read timeout of 10x the interval we expect to see messages
// Ignore the error as it will get caught when we call ReadJSON
_ = r.conn.SetReadDeadline(time.Now().Add(10 * r.interval))
err := r.conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, r.conn)
if err != nil {
fmt.Fprintf(r.w, "error reading websocket, will retry in 10 seconds: %s\n", err)
// Even though we sleep between connection retries, we found it's possible to DOS Loki if the connection
Expand Down
5 changes: 2 additions & 3 deletions pkg/logcli/query/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql/unmarshal"
)

// TailQuery connects to the Loki websocket endpoint and tails logs
Expand Down Expand Up @@ -43,7 +44,7 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
}

for {
err := conn.ReadJSON(tailResponse)
err := unmarshal.ReadTailResponseJSON(tailResponse, conn)
if err != nil {
log.Println("Error reading stream:", err)
return
Expand All @@ -52,7 +53,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
labels := loghttp.LabelSet{}
for _, stream := range tailResponse.Streams {
if !q.NoLabels {

if len(q.IgnoreLabelsKey) > 0 || len(q.ShowLabelsKey) > 0 {

ls := stream.Labels
Expand All @@ -70,7 +70,6 @@ func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
} else {
labels = stream.Labels
}

}

for _, entry := range stream.Entries {
Expand Down
15 changes: 15 additions & 0 deletions pkg/logql/unmarshal/unmarshal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"unsafe"

json "github.com/json-iterator/go"
jsoniter "github.com/json-iterator/go"

"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
Expand Down Expand Up @@ -42,3 +43,17 @@ func NewStream(s *loghttp.Stream) logproto.Stream {
Labels: s.Labels.String(),
}
}

// WebsocketReader knows how to read message to a websocket connection.
type WebsocketReader interface {
ReadMessage() (int, []byte, error)
}

// ReadTailResponseJSON unmarshals the loghttp.TailResponse from a websocket reader.
func ReadTailResponseJSON(r *loghttp.TailResponse, reader WebsocketReader) error {
_, data, err := reader.ReadMessage()
if err != nil {
return err
}
return jsoniter.Unmarshal(data, r)
}
42 changes: 42 additions & 0 deletions pkg/logql/unmarshal/unmarshal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (

"github.com/stretchr/testify/require"

"github.com/grafana/loki/pkg/loghttp"
legacy_loghttp "github.com/grafana/loki/pkg/loghttp/legacy"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/marshal"
)

// covers requests to /loki/api/v1/push
Expand Down Expand Up @@ -88,3 +91,42 @@ func Benchmark_DecodePushRequest(b *testing.B) {
require.Equal(b, 10000, len(actual.Streams[0].Entries))
}
}

type websocket struct {
buf []byte
}

func (ws *websocket) WriteMessage(t int, data []byte) error {
ws.buf = append(ws.buf, data...)
return nil
}

func (ws *websocket) ReadMessage() (int, []byte, error) {
return 0, ws.buf, nil
}

func Test_ReadTailResponse(t *testing.T) {
ws := &websocket{}
require.NoError(t, marshal.WriteTailResponseJSON(legacy_loghttp.TailResponse{
Streams: []logproto.Stream{
{Labels: `{app="bar"}`, Entries: []logproto.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}}},
},
DroppedEntries: []legacy_loghttp.DroppedEntry{
{Timestamp: time.Unix(0, 1), Labels: `{app="foo"}`},
},
}, ws))
res := &loghttp.TailResponse{}
require.NoError(t, ReadTailResponseJSON(res, ws))

require.Equal(t, &loghttp.TailResponse{
Streams: []loghttp.Stream{
{
Labels: loghttp.LabelSet{"app": "bar"},
Entries: []loghttp.Entry{{Timestamp: time.Unix(0, 2), Line: "2"}},
},
},
DroppedStreams: []loghttp.DroppedStream{
{Timestamp: time.Unix(0, 1), Labels: loghttp.LabelSet{"app": "foo"}},
},
}, res)
}

0 comments on commit ac84d01

Please sign in to comment.