Skip to content

Commit

Permalink
[badger] Filter duplicate results from a single indexSeek [fixes #1604]…
Browse files Browse the repository at this point in the history
… (#1649)

Signed-off-by: Michael Burman <[email protected]>
  • Loading branch information
burmanm authored and yurishkuro committed Jul 4, 2019
1 parent c36a281 commit 62cf818
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 34 deletions.
9 changes: 8 additions & 1 deletion plugin/storage/badger/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,9 +278,16 @@ func (r *TraceReader) indexSeeksToTraceIDs(query *spanstore.TraceQueryParameters
if err != nil {
return nil, err
}

// Same traceID can be returned multiple times, but always in sorted order so checking the previous key is enough
prevTraceID := []byte{}
ids = append(ids, make([][]byte, 0, len(indexResults)))
for _, k := range indexResults {
ids[i] = append(ids[i], k[len(k)-sizeOfTraceID:])
traceID := k[len(k)-sizeOfTraceID:]
if !bytes.Equal(prevTraceID, traceID) {
ids[i] = append(ids[i], traceID)
prevTraceID = traceID
}
}
}
return ids, nil
Expand Down
123 changes: 90 additions & 33 deletions plugin/storage/badger/spanstore/rw_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,52 +14,24 @@
package spanstore

import (
"bytes"
"context"
"encoding/binary"
"testing"
"time"

"github.com/dgraph-io/badger"
"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/storage/spanstore"
)

func TestEncodingTypes(t *testing.T) {

tid := time.Now()

dummyKv := []model.KeyValue{
model.KeyValue{
Key: "key",
VType: model.StringType,
VStr: "value",
},
}

testSpan := model.Span{
TraceID: model.TraceID{
Low: uint64(0),
High: 1,
},
SpanID: model.SpanID(0),
OperationName: "operation",
Process: &model.Process{
ServiceName: "service",
Tags: dummyKv,
},
StartTime: tid.Add(time.Duration(1 * time.Millisecond)),
Duration: time.Duration(1 * time.Millisecond),
Tags: dummyKv,
Logs: []model.Log{
model.Log{
Timestamp: tid,
Fields: dummyKv,
},
},
}

// JSON encoding
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil)
rw := NewTraceReader(store, cache)
Expand All @@ -75,6 +47,8 @@ func TestEncodingTypes(t *testing.T) {

// Unknown encoding write
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil)
// rw := NewTraceReader(store, cache)
Expand All @@ -86,6 +60,8 @@ func TestEncodingTypes(t *testing.T) {

// Unknown encoding reader
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()

cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil)
rw := NewTraceReader(store, cache)
Expand Down Expand Up @@ -119,3 +95,84 @@ func TestDecodeErrorReturns(t *testing.T) {
_, err = decodeValue(garbage, jsonEncoding)
assert.Error(t, err)
}

func TestSortMergeIdsDuplicateDetection(t *testing.T) {
// Different IndexSeeks return the same results
ids := make([][][]byte, 2)
ids[0] = make([][]byte, 1)
ids[1] = make([][]byte, 1)
buf := new(bytes.Buffer)
binary.Write(buf, binary.BigEndian, uint64(0))
binary.Write(buf, binary.BigEndian, uint64(156697987635))
b := buf.Bytes()
ids[0][0] = b
ids[1][0] = b

query := &spanstore.TraceQueryParameters{
NumTraces: 64,
}

traces := sortMergeIds(query, ids)
assert.Equal(t, 1, len(traces))
}

func TestDuplicateTraceIDDetection(t *testing.T) {
runWithBadger(t, func(store *badger.DB, t *testing.T) {
testSpan := createDummySpan()
cache := NewCacheStore(store, time.Duration(1*time.Hour), true)
sw := NewSpanWriter(store, cache, time.Duration(1*time.Hour), nil)
rw := NewTraceReader(store, cache)

for i := 0; i < 8; i++ {
testSpan.SpanID = model.SpanID(i)
testSpan.StartTime = testSpan.StartTime.Add(time.Millisecond)
err := sw.WriteSpan(&testSpan)
assert.NoError(t, err)
}

traces, err := rw.FindTraceIDs(context.Background(), &spanstore.TraceQueryParameters{
ServiceName: "service",
StartTimeMax: time.Now().Add(time.Hour),
StartTimeMin: testSpan.StartTime.Add(-1 * time.Hour),
})

assert.NoError(t, err)
assert.Equal(t, 1, len(traces))
})
}

func createDummySpan() model.Span {
tid := time.Now()

dummyKv := []model.KeyValue{
model.KeyValue{
Key: "key",
VType: model.StringType,
VStr: "value",
},
}

testSpan := model.Span{
TraceID: model.TraceID{
Low: uint64(0),
High: 1,
},
SpanID: model.SpanID(0),
OperationName: "operation",
Process: &model.Process{
ServiceName: "service",
Tags: dummyKv,
},
StartTime: tid.Add(time.Duration(1 * time.Millisecond)),
Duration: time.Duration(1 * time.Millisecond),
Tags: dummyKv,
Logs: []model.Log{
model.Log{
Timestamp: tid,
Fields: dummyKv,
},
},
}

return testSpan
}

0 comments on commit 62cf818

Please sign in to comment.