Skip to content

Commit

Permalink
feat: add _extracted suffix to detected fields conflicts (#13993)
Browse files Browse the repository at this point in the history
  • Loading branch information
trevorwhitney authored Aug 29, 2024
1 parent a0c7598 commit ab1caea
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 26 deletions.
15 changes: 12 additions & 3 deletions pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -1199,6 +1199,11 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
emtpyparser := ""

for _, stream := range streams {
streamLbls, err := syntax.ParseLabels(stream.Labels)
if err != nil {
streamLbls = labels.EmptyLabels()
}

for _, entry := range stream.Entries {
structuredMetadata := getStructuredMetadata(entry)
for k, vals := range structuredMetadata {
Expand Down Expand Up @@ -1226,7 +1231,7 @@ func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*p
}
}

detected, parser := parseLine(entry.Line)
detected, parser := parseLine(entry.Line, streamLbls)
for k, vals := range detected {
df, ok := detectedFields[k]
if !ok && fieldCount < limit {
Expand Down Expand Up @@ -1283,11 +1288,11 @@ func getStructuredMetadata(entry push.Entry) map[string][]string {
return result
}

func parseLine(line string) (map[string][]string, *string) {
func parseLine(line string, streamLbls labels.Labels) (map[string][]string, *string) {
parser := "logfmt"
logFmtParser := logql_log.NewLogfmtParser(true, false)

lbls := logql_log.NewBaseLabelsBuilder().ForLabels(labels.EmptyLabels(), 0)
lbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, 0)
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lbls)
if !logfmtSuccess || lbls.HasErr() {
parser = "json"
Expand All @@ -1301,6 +1306,10 @@ func parseLine(line string) (map[string][]string, *string) {

parsedLabels := map[string]map[string]struct{}{}
for _, lbl := range lbls.LabelsResult().Labels() {
// skip indexed labels, as we only want detected fields
if streamLbls.Has(lbl.Name) {
continue
}
if values, ok := parsedLabels[lbl.Name]; ok {
values[lbl.Value] = struct{}{}
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/querier_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}

func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
return mockLogfmtStreamWithLabels(from, quantity, `{type="test", name="foo"}`)
}

func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
Expand All @@ -586,7 +586,7 @@ func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Str
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t name=bar`,
i,
i,
(i * 10),
Expand Down
93 changes: 72 additions & 21 deletions pkg/querier/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1777,15 +1777,16 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
assert.Len(t, detectedFields, 8)
expectedCardinality := map[string]uint64{
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"name_extracted": 1,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
Expand Down Expand Up @@ -1821,17 +1822,18 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)
assert.Len(t, detectedFields, 10)
expectedCardinality := map[string]uint64{
"variable": 5,
"constant": 1,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"variable": 5,
"constant": 1,
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
"name_extracted": 1,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
Expand Down Expand Up @@ -1867,7 +1869,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
assert.Len(t, detectedFields, 8)

var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
for _, field := range detectedFields {
Expand Down Expand Up @@ -1923,7 +1925,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 9)
assert.Len(t, detectedFields, 10)

var messageField, countField, bytesField, durationField, floatField, evenField, constantField, variableField *logproto.DetectedField
for _, field := range detectedFields {
Expand Down Expand Up @@ -1955,7 +1957,56 @@ func TestQuerier_DetectedFields(t *testing.T) {
assert.Equal(t, []string{"logfmt"}, evenField.Parsers)
assert.Equal(t, []string{""}, constantField.Parsers)
assert.Equal(t, []string{""}, variableField.Parsers)
})
},
)

t.Run(
"adds _extracted suffix to detected fields that conflict with indexed labels",
func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)

queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStreamWithStructuredMetadata(1, 2)}), nil)

ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)

querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)

resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)

detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 10)

var nameField *logproto.DetectedField
for _, field := range detectedFields {
switch field.Label {
case "name_extracted":
nameField = field
}
}

assert.NotNil(t, nameField)
assert.Equal(t, "name_extracted", nameField.Label)
assert.Equal(t, logproto.DetectedFieldString, nameField.Type)
assert.Equal(t, []string{"logfmt"}, nameField.Parsers)
assert.Equal(t, uint64(1), nameField.Cardinality)
},
)
}

func BenchmarkQuerierDetectedFields(b *testing.B) {
Expand Down

0 comments on commit ab1caea

Please sign in to comment.