From ecdecd1357a971ac3c0e7fc1445b425c4594fda7 Mon Sep 17 00:00:00 2001 From: Michael Burman Date: Mon, 19 Aug 2019 10:48:59 +0300 Subject: [PATCH] Fix badger merge-join algorithm to correctly filter indexes (#1721) * Fix merge-join algorithm to correctly filter indexes, closes #1719 Signed-off-by: Michael Burman * Address comments Signed-off-by: Michael Burman --- .../badger/spanstore/read_write_test.go | 6 ++ plugin/storage/badger/spanstore/reader.go | 85 +++++++++++-------- .../badger/spanstore/rw_internal_test.go | 34 ++++++++ 3 files changed, 88 insertions(+), 37 deletions(-) diff --git a/plugin/storage/badger/spanstore/read_write_test.go b/plugin/storage/badger/spanstore/read_write_test.go index 42abb204dd8..0ddec8e1747 100644 --- a/plugin/storage/badger/spanstore/read_write_test.go +++ b/plugin/storage/badger/spanstore/read_write_test.go @@ -161,6 +161,11 @@ func TestIndexSeeks(t *testing.T) { VStr: fmt.Sprintf("val%d", j), VType: model.StringType, }, + { + Key: "error", + VType: model.BoolType, + VBool: true, + }, }, } err := sw.WriteSpan(&s) @@ -200,6 +205,7 @@ func TestIndexSeeks(t *testing.T) { params.OperationName = "operation-1" tags := make(map[string]string) tags["k11"] = "val0" + tags["error"] = "true" params.Tags = tags params.DurationMin = time.Duration(1 * time.Millisecond) // params.DurationMax = time.Duration(1 * time.Hour) diff --git a/plugin/storage/badger/spanstore/reader.go b/plugin/storage/badger/spanstore/reader.go index 629e9c7860e..5d0e1563c45 100644 --- a/plugin/storage/badger/spanstore/reader.go +++ b/plugin/storage/badger/spanstore/reader.go @@ -105,13 +105,12 @@ func (r *TraceReader) getTraces(traceIDs []model.TraceID) ([]*model.Trace, error err := r.store.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 10 // TraceIDs are not sorted, pointless to prefetch large amount of values it := txn.NewIterator(opts) defer it.Close() val := []byte{} for _, prefix := range prefixes { - spans := make([]*model.Span, 0, 4) // reduce reallocation requirements by defining some initial length + spans := make([]*model.Span, 0, 32) // reduce reallocation requirements by defining some initial length for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { // Add value to the span store (decode from JSON / defined encoding first) @@ -346,53 +345,65 @@ func (r *TraceReader) durationQueries(query *spanstore.TraceQueryParameters, ids return ids } +func mergeJoinIds(left, right [][]byte) [][]byte { + // len(left) or len(right) is the maximum, whichever is the smallest + allocateSize := len(left) + if len(right) < allocateSize { + allocateSize = len(right) + } + + merged := make([][]byte, 0, allocateSize) + + lMax := len(left) - 1 + rMax := len(right) - 1 + for r, l := 0, 0; r <= rMax && l <= lMax; { + switch bytes.Compare(left[l], right[r]) { + case 0: + // Left matches right - merge + merged = append(merged, left[l]) + // Advance both + l++ + r++ + case 1: + // left > right, increase right one + r++ + case -1: + // left < right, increase left one + l++ + } + } + return merged +} + // sortMergeIds does a sort-merge join operation to the list of TraceIDs to remove duplicates func sortMergeIds(query *spanstore.TraceQueryParameters, ids [][][]byte) []model.TraceID { // Key only scan is a lot faster in the badger - use sort-merge join algorithm instead of hash join since we have the keys in sorted order already - intersected := ids[0] - mergeIntersected := make([][]byte, 0, len(intersected)) // intersected is the maximum size + + var merged [][]byte if len(ids) > 1 { - for i := 1; i < len(ids); i++ { - mergeIntersected = make([][]byte, 0, len(intersected)) // intersected is the maximum size - k := len(intersected) - 1 - for j := len(ids[i]) - 1; j >= 0 && k >= 0; { - // The result will be 0 if a==b, -1 if a < b, and +1 if a > b. - switch bytes.Compare(intersected[k], ids[i][j]) { - case 1: - k-- // Move on to the next item in the intersected list - // a > b - case -1: - j-- - // a < b - // Move on to next iteration of j - case 0: - mergeIntersected = append(mergeIntersected, intersected[k]) - k-- // Move on to next item - // Match - } - } - intersected = mergeIntersected + merged = mergeJoinIds(ids[0], ids[1]) + for i := 2; i < len(ids); i++ { + merged = mergeJoinIds(merged, ids[i]) } - } else { - // mergeIntersected should be reversed intersected - for i, j := 0, len(intersected)-1; j >= 0; i, j = i+1, j-1 { - mergeIntersected = append(mergeIntersected, intersected[j]) - } - intersected = mergeIntersected + merged = ids[0] + } + + // Get top query.NumTraces results (order in DESC) + if query.NumTraces < len(merged) { + merged = merged[len(merged)-query.NumTraces:] } - // Get top query.NumTraces results (note, the slice is now in descending timestamp order) - if query.NumTraces < len(intersected) { - intersected = intersected[:query.NumTraces] + // Results are in ASC (badger's default order), but Jaeger uses DESC, thus we need to reverse the array + for left, right := 0, len(merged)-1; left < right; left, right = left+1, right-1 { + merged[left], merged[right] = merged[right], merged[left] } - // Enrich the traceIds to model.Trace - // result := make([]*model.Trace, 0, len(intersected)) - keys := make([]model.TraceID, 0, len(intersected)) + // Create the structs from [][]byte to TraceID + keys := make([]model.TraceID, 0, len(merged)) - for _, key := range intersected { + for _, key := range merged { keys = append(keys, model.TraceID{ High: binary.BigEndian.Uint64(key[:8]), Low: binary.BigEndian.Uint64(key[8:]), diff --git a/plugin/storage/badger/spanstore/rw_internal_test.go b/plugin/storage/badger/spanstore/rw_internal_test.go index 16f5694455d..cccbf118ffe 100644 --- a/plugin/storage/badger/spanstore/rw_internal_test.go +++ b/plugin/storage/badger/spanstore/rw_internal_test.go @@ -176,3 +176,37 @@ func createDummySpan() model.Span { return testSpan } + +func TestMergeJoin(t *testing.T) { + assert := assert.New(t) + + // Test equals + + left := make([][]byte, 16) + right := make([][]byte, 16) + + for i := 0; i < 16; i++ { + left[i] = make([]byte, 4) + binary.BigEndian.PutUint32(left[i], uint32(i)) + + right[i] = make([]byte, 4) + binary.BigEndian.PutUint32(right[i], uint32(i)) + } + + merged := mergeJoinIds(left, right) + assert.Equal(16, len(merged)) + + // Check order + assert.Equal(uint32(15), binary.BigEndian.Uint32(merged[15])) + + // Test simple non-equality different size + + merged = mergeJoinIds(left[1:2], right[13:]) + assert.Empty(merged) + + // Different size, some equalities + + merged = mergeJoinIds(left[0:3], right[1:7]) + assert.Equal(2, len(merged)) + assert.Equal(uint32(2), binary.BigEndian.Uint32(merged[1])) +}