Skip to content

Commit

Permalink
Add metadata to push payload (#9694)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:
We are adding support for attaching labels to each log line. This is one
of the series of the PRs broken up to make it easier to review changes.

This PR updates the push payload to send labels with each log entry
optionally. The log labels are supposed to be in the same format as the
stream labels. To illustrate, here is how it would look for the proto
and JSON push payloads with the same data:

**proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`,
`Content-Type`: `application/x-protobuf`)**(payload built using
[push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)):
```
push.Stream{
	Entries: []logproto.Entry{
			{
				Timestamp: time.Unix(0, 1688515200000000000),
				Line:      "log line",
				Labels:    `{foo="bar"}`,
			},
	},
	Labels: `{app="test"}`,
}
```

**v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`:
`application/json`)**:
```json
{
    "streams": [{
        "stream": {
            "app": "test"
        },
        "values": [
            ["1688515200000000000", "log line", {
                "foo": "bar"
            }]
        ]
    }]
}
```
**legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**:
```json
{
    "streams": [{
        "labels": "{app=\"test\"}",
        "entries": [{
                "ts": "2023-07-05T00:00:00.000000000Z",
                "line": "log line",
                "labels": "{foo=\"bar\"}"
            }]
    }]
}
```
**Which issue(s) this PR fixes**:

**Special notes for your reviewer**:

We may need to add more thoughtful tests.

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e)

---------

Co-authored-by: Sandeep Sukhani <[email protected]>
  • Loading branch information
salvacorts and sandeepsukhani authored Jul 13, 2023
1 parent 90fc510 commit aae13c3
Show file tree
Hide file tree
Showing 24 changed files with 1,740 additions and 372 deletions.
29 changes: 23 additions & 6 deletions integration/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"strings"
"time"

"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)

Expand Down Expand Up @@ -86,13 +87,28 @@ func New(instanceID, token, baseURL string, opts ...Option) *Client {

// PushLogLine creates a new logline with the current time as timestamp
func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) error {
return c.pushLogLine(line, c.Now, extraLabels...)
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}

// PushLogLineWithMetadata creates a new log line with the current time as timestamp,
// attaching logLabels as per-entry labels (metadata) and extraLabels as stream labels.
func (c *Client) PushLogLineWithMetadata(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
	return c.PushLogLineWithTimestampAndMetadata(line, c.Now, logLabels, extraLabels...)
}

// PushLogLineWithTimestamp creates a new logline at the given timestamp
// The timestamp has to be a Unix timestamp (epoch seconds)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
return c.pushLogLine(line, timestamp, extraLabelList...)
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabels ...map[string]string) error {
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}

// PushLogLineWithTimestampAndMetadata pushes a single log line at the given
// timestamp, attaching logLabels to the entry itself and extraLabelList to the
// stream labels.
func (c *Client) PushLogLineWithTimestampAndMetadata(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
	// labels.FromMap allocates even for an empty map. This path runs for every
	// log line we push, so skip the conversion entirely when there is nothing
	// to convert; a nil labels.Labels is equivalent to the zero value.
	if len(logLabels) == 0 {
		return c.pushLogLine(line, timestamp, nil, extraLabelList...)
	}
	return c.pushLogLine(line, timestamp, labels.FromMap(logLabels), extraLabelList...)
}

func formatTS(ts time.Time) string {
Expand All @@ -101,21 +117,22 @@ func formatTS(ts time.Time) string {

type stream struct {
Stream map[string]string `json:"stream"`
Values [][]string `json:"values"`
Values [][]any `json:"values"`
}

// pushLogLine creates a new logline
func (c *Client) pushLogLine(line string, timestamp time.Time, extraLabelList ...map[string]string) error {
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)

s := stream{
Stream: map[string]string{
"job": "varlog",
},
Values: [][]string{
Values: [][]any{
{
formatTS(timestamp),
line,
logLabels,
},
},
}
Expand Down
1 change: 0 additions & 1 deletion integration/loki_micro_services_delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"

"github.com/grafana/loki/pkg/storage"
)

Expand Down
10 changes: 5 additions & 5 deletions integration/loki_micro_services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,13 @@ func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T)
cliQueryFrontend.Now = now

t.Run("ingest-logs", func(t *testing.T) {
// ingest logs to the previous period
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineA", time.Now().Add(-48*time.Hour), map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestampAndMetadata("lineB", time.Now().Add(-36*time.Hour), map[string]string{"traceID": "456"}, map[string]string{"job": "fake"}))

// ingest logs to the current period
require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineC", map[string]string{"traceID": "789"}, map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithMetadata("lineD", map[string]string{"traceID": "123"}, map[string]string{"job": "fake"}))

})

t.Run("query-lookback-default", func(t *testing.T) {
Expand Down
72 changes: 63 additions & 9 deletions pkg/loghttp/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/buger/jsonparser"
jsoniter "github.com/json-iterator/go"
"github.com/modern-go/reflect2"
"github.com/prometheus/prometheus/model/labels"
)

func init() {
Expand All @@ -16,8 +17,9 @@ func init() {

// Entry represents a log entry. It includes a log message and the time it occurred at.
type Entry struct {
Timestamp time.Time
Line string
Timestamp time.Time
Line string
NonIndexedLabels labels.Labels
}

func (e *Entry) UnmarshalJSON(data []byte) error {
Expand All @@ -27,25 +29,49 @@ func (e *Entry) UnmarshalJSON(data []byte) error {
)
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
// assert that both items in array are of type string
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
switch i {
case 0: // timestamp
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
ts, err := jsonparser.ParseInt(value)
if err != nil {
parseError = err
return
}
e.Timestamp = time.Unix(0, ts)
case 1: // value
if t != jsonparser.String {
parseError = jsonparser.MalformedStringError
return
}
v, err := jsonparser.ParseString(value)
if err != nil {
parseError = err
return
}
e.Line = v
case 2: // labels
if t != jsonparser.Object {
parseError = jsonparser.MalformedObjectError
return
}
var nonIndexedLabels labels.Labels
if err := jsonparser.ObjectEach(value, func(key []byte, value []byte, dataType jsonparser.ValueType, _ int) error {
if dataType != jsonparser.String {
return jsonparser.MalformedStringError
}
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: string(key),
Value: string(value),
})
return nil
}); err != nil {
parseError = err
return
}
e.NonIndexedLabels = nonIndexedLabels
}
i++
})
Expand All @@ -67,6 +93,7 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
i := 0
var ts time.Time
var line string
var nonIndexedLabels labels.Labels
ok := iter.ReadArrayCB(func(iter *jsoniter.Iterator) bool {
var ok bool
switch i {
Expand All @@ -81,15 +108,30 @@ func (sliceEntryDecoder) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
return false
}
return true
case 2:
iter.ReadMapCB(func(iter *jsoniter.Iterator, labelName string) bool {
labelValue := iter.ReadString()
nonIndexedLabels = append(nonIndexedLabels, labels.Label{
Name: labelName,
Value: labelValue,
})
return true
})
i++
if iter.Error != nil {
return false
}
return true
default:
iter.ReportError("error reading entry", "array must contains 2 values")
iter.ReportError("error reading entry", "array must have at least 2 and up to 3 values")
return false
}
})
if ok {
*((*[]Entry)(ptr)) = append(*((*[]Entry)(ptr)), Entry{
Timestamp: ts,
Line: line,
Timestamp: ts,
Line: line,
NonIndexedLabels: nonIndexedLabels,
})
return true
}
Expand Down Expand Up @@ -126,6 +168,18 @@ func (EntryEncoder) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
stream.WriteRaw(`"`)
stream.WriteMore()
stream.WriteStringWithHTMLEscaped(e.Line)
if len(e.NonIndexedLabels) > 0 {
stream.WriteMore()
stream.WriteObjectStart()
for i, lbl := range e.NonIndexedLabels {
if i > 0 {
stream.WriteMore()
}
stream.WriteObjectField(lbl.Name)
stream.WriteString(lbl.Value)
}
stream.WriteObjectEnd()
}
stream.WriteArrayEnd()
}

Expand Down
128 changes: 124 additions & 4 deletions pkg/loghttp/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"time"
"unsafe"

"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"

"github.com/buger/jsonparser"
json "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"

"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage/stores/index/seriesvolume"
)

var (
Expand Down Expand Up @@ -57,9 +57,129 @@ func (q *QueryResponse) UnmarshalJSON(data []byte) error {
})
}

// PushRequest models a log stream push
// PushRequest models a log stream push but is unmarshalled to proto push format.
type PushRequest struct {
Streams []*Stream `json:"streams"`
Streams []LogProtoStream `json:"streams"`
}

// LogProtoStream helps with unmarshalling of each log stream for push request.
// This might look unnecessary, but without it CPU usage in benchmarks increased by ~25%.
type LogProtoStream logproto.Stream

// UnmarshalJSON decodes one stream object ({"stream": {...}, "values": [...]})
// of the v1 JSON push payload directly into the proto stream representation:
// the "stream" object becomes the serialized label string and the "values"
// array becomes the entries. A null "values" array is accepted as no entries.
func (s *LogProtoStream) UnmarshalJSON(data []byte) error {
	err := jsonparser.ObjectEach(data, func(key, val []byte, ty jsonparser.ValueType, _ int) error {
		switch string(key) {
		case "stream":
			// Stream labels must be a flat object of string -> string.
			// Named labelSet (not "labels") to avoid shadowing the
			// prometheus labels package imported by this file.
			labelSet := make(LabelSet)
			err := jsonparser.ObjectEach(val, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
				if dataType != jsonparser.String {
					return jsonparser.MalformedStringError
				}
				labelSet[string(key)] = string(val)
				return nil
			})
			if err != nil {
				return err
			}
			s.Labels = labelSet.String()
		case "values":
			if ty == jsonparser.Null {
				return nil
			}
			entries, err := unmarshalHTTPToLogProtoEntries(val)
			if err != nil {
				return err
			}
			s.Entries = entries
		}
		return nil
	})
	return err
}

// unmarshalHTTPToLogProtoEntries decodes the "values" array of a v1 JSON push
// payload into proto entries. Null array elements are tolerated and dropped;
// the first decoding error aborts and is returned.
func unmarshalHTTPToLogProtoEntries(data []byte) ([]logproto.Entry, error) {
	var entries []logproto.Entry
	var firstErr error

	_, arrayErr := jsonparser.ArrayEach(data, func(value []byte, ty jsonparser.ValueType, _ int, err error) {
		// Stop processing further elements once any element has failed.
		if err != nil || firstErr != nil {
			return
		}
		// Skip null entries rather than failing the whole payload.
		if ty == jsonparser.Null {
			return
		}
		entry, entryErr := unmarshalHTTPToLogProtoEntry(value)
		if entryErr != nil {
			firstErr = entryErr
			return
		}
		entries = append(entries, entry)
	})
	if arrayErr != nil {
		firstErr = arrayErr
	}
	if firstErr != nil {
		return nil, firstErr
	}
	return entries, nil
}

// unmarshalHTTPToLogProtoEntry decodes a single entry array of the form
// [timestamp, line] or [timestamp, line, {labels}] into a proto entry.
// The timestamp is a string of Unix nanoseconds; the optional third element
// is a flat string -> string object of per-entry (non-indexed) labels.
func unmarshalHTTPToLogProtoEntry(data []byte) (logproto.Entry, error) {
	var (
		i          int
		parseError error
		e          logproto.Entry
	)
	_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
		// The first two elements must be strings; the optional third must be an object.
		if (i == 0 || i == 1) && t != jsonparser.String {
			parseError = jsonparser.MalformedStringError
			return
		} else if i == 2 && t != jsonparser.Object {
			parseError = jsonparser.MalformedObjectError
			return
		}
		switch i {
		case 0: // timestamp
			ts, err := jsonparser.ParseInt(value)
			if err != nil {
				parseError = err
				return
			}
			e.Timestamp = time.Unix(0, ts)
		case 1: // value
			v, err := jsonparser.ParseString(value)
			if err != nil {
				parseError = err
				return
			}
			e.Line = v
		case 2: // nonIndexedLabels
			var nonIndexedLabels labels.Labels
			err := jsonparser.ObjectEach(value, func(key, val []byte, dataType jsonparser.ValueType, _ int) error {
				if dataType != jsonparser.String {
					return jsonparser.MalformedStringError
				}
				nonIndexedLabels = append(nonIndexedLabels, labels.Label{
					Name:  string(key),
					Value: string(val),
				})
				return nil
			})
			if err != nil {
				parseError = err
				return
			}
			e.NonIndexedLabels = nonIndexedLabels
		}
		i++
	})
	// Element-level errors take precedence over the top-level ArrayEach error.
	if parseError != nil {
		return e, parseError
	}
	return e, err
}

// ResultType holds the type of the result
Expand Down
Loading

0 comments on commit aae13c3

Please sign in to comment.