Skip to content

Commit

Permalink
Fix badger merge-join algorithm to correctly filter indexes (#1721)
Browse files Browse the repository at this point in the history
* Fix merge-join algorithm to correctly filter indexes, closes #1719

Signed-off-by: Michael Burman <[email protected]>

* Address comments

Signed-off-by: Michael Burman <[email protected]>
  • Loading branch information
burmanm authored and pavolloffay committed Aug 19, 2019
1 parent 9740087 commit ecdecd1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 37 deletions.
6 changes: 6 additions & 0 deletions plugin/storage/badger/spanstore/read_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ func TestIndexSeeks(t *testing.T) {
VStr: fmt.Sprintf("val%d", j),
VType: model.StringType,
},
{
Key: "error",
VType: model.BoolType,
VBool: true,
},
},
}
err := sw.WriteSpan(&s)
Expand Down Expand Up @@ -200,6 +205,7 @@ func TestIndexSeeks(t *testing.T) {
params.OperationName = "operation-1"
tags := make(map[string]string)
tags["k11"] = "val0"
tags["error"] = "true"
params.Tags = tags
params.DurationMin = time.Duration(1 * time.Millisecond)
// params.DurationMax = time.Duration(1 * time.Hour)
Expand Down
85 changes: 48 additions & 37 deletions plugin/storage/badger/spanstore/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,12 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error

err := r.store.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchSize = 10 // TraceIDs are not sorted, pointless to prefetch large amount of values
it := txn.NewIterator(opts)
defer it.Close()

val := []byte{}
for _, prefix := range prefixes {
spans := make([]*model.Span, 0, 4) // reduce reallocation requirements by defining some initial length
spans := make([]*model.Span, 0, 32) // reduce reallocation requirements by defining some initial length

for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
// Add value to the span store (decode from JSON / defined encoding first)
Expand Down Expand Up @@ -346,53 +345,65 @@ func (r *TraceReader) durationQueries(query *spanstore.TraceQueryParameters, ids
return ids
}

func mergeJoinIds(left, right [][]byte) [][]byte {
// len(left) or len(right) is the maximum, whichever is the smallest
allocateSize := len(left)
if len(right) < allocateSize {
allocateSize = len(right)
}

merged := make([][]byte, 0, allocateSize)

lMax := len(left) - 1
rMax := len(right) - 1
for r, l := 0, 0; r <= rMax && l <= lMax; {
switch bytes.Compare(left[l], right[r]) {
case 0:
// Left matches right - merge
merged = append(merged, left[l])
// Advance both
l++
r++
case 1:
// left > right, increase right one
r++
case -1:
// left < right, increase left one
l++
}
}
return merged
}

// sortMergeIds does a sort-merge join operation to the list of TraceIDs to remove duplicates
func sortMergeIds(query *spanstore.TraceQueryParameters, ids [][][]byte) []model.TraceID {
// Key only scan is a lot faster in the badger - use sort-merge join algorithm instead of hash join since we have the keys in sorted order already
intersected := ids[0]
mergeIntersected := make([][]byte, 0, len(intersected)) // intersected is the maximum size

var merged [][]byte

if len(ids) > 1 {
for i := 1; i < len(ids); i++ {
mergeIntersected = make([][]byte, 0, len(intersected)) // intersected is the maximum size
k := len(intersected) - 1
for j := len(ids[i]) - 1; j >= 0 && k >= 0; {
// The result will be 0 if a==b, -1 if a < b, and +1 if a > b.
switch bytes.Compare(intersected[k], ids[i][j]) {
case 1:
k-- // Move on to the next item in the intersected list
// a > b
case -1:
j--
// a < b
// Move on to next iteration of j
case 0:
mergeIntersected = append(mergeIntersected, intersected[k])
k-- // Move on to next item
// Match
}
}
intersected = mergeIntersected
merged = mergeJoinIds(ids[0], ids[1])
for i := 2; i < len(ids); i++ {
merged = mergeJoinIds(merged, ids[i])
}

} else {
// mergeIntersected should be reversed intersected
for i, j := 0, len(intersected)-1; j >= 0; i, j = i+1, j-1 {
mergeIntersected = append(mergeIntersected, intersected[j])
}
intersected = mergeIntersected
merged = ids[0]
}

// Get top query.NumTraces results (order in DESC)
if query.NumTraces < len(merged) {
merged = merged[len(merged)-query.NumTraces:]
}

// Get top query.NumTraces results (note, the slice is now in descending timestamp order)
if query.NumTraces < len(intersected) {
intersected = intersected[:query.NumTraces]
// Results are in ASC (badger's default order), but Jaeger uses DESC, thus we need to reverse the array
for left, right := 0, len(merged)-1; left < right; left, right = left+1, right-1 {
merged[left], merged[right] = merged[right], merged[left]
}

// Enrich the traceIds to model.Trace
// result := make([]*model.Trace, 0, len(intersected))
keys := make([]model.TraceID, 0, len(intersected))
// Create the structs from [][]byte to TraceID
keys := make([]model.TraceID, 0, len(merged))

for _, key := range intersected {
for _, key := range merged {
keys = append(keys, model.TraceID{
High: binary.BigEndian.Uint64(key[:8]),
Low: binary.BigEndian.Uint64(key[8:]),
Expand Down
34 changes: 34 additions & 0 deletions plugin/storage/badger/spanstore/rw_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,37 @@ func createDummySpan() model.Span {

return testSpan
}

func TestMergeJoin(t *testing.T) {
assert := assert.New(t)

// Test equals

left := make([][]byte, 16)
right := make([][]byte, 16)

for i := 0; i < 16; i++ {
left[i] = make([]byte, 4)
binary.BigEndian.PutUint32(left[i], uint32(i))

right[i] = make([]byte, 4)
binary.BigEndian.PutUint32(right[i], uint32(i))
}

merged := mergeJoinIds(left, right)
assert.Equal(16, len(merged))

// Check order
assert.Equal(uint32(15), binary.BigEndian.Uint32(merged[15]))

// Test simple non-equality different size

merged = mergeJoinIds(left[1:2], right[13:])
assert.Empty(merged)

// Different size, some equalities

merged = mergeJoinIds(left[0:3], right[1:7])
assert.Equal(2, len(merged))
assert.Equal(uint32(2), binary.BigEndian.Uint32(merged[1]))
}

0 comments on commit ecdecd1

Please sign in to comment.