Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iterator: optimize merged iterator (again) #421

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 101 additions & 61 deletions leveldb/iterator/merged_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
package iterator

import (
"container/heap"

"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/util"
Expand Down Expand Up @@ -36,8 +34,7 @@ type mergedIterator struct {
errf func(err error)
releaser util.Releaser

indexes []int // the heap of iterator indexes
reverse bool //nolint: structcheck // if true, indexes is a max-heap
heap indexHeap
}

func assertKey(key []byte) []byte {
Expand Down Expand Up @@ -72,20 +69,19 @@ func (i *mergedIterator) First() bool {
return false
}

h := i.indexHeap()
h.Reset(false)
i.heap.Reset()
for x, iter := range i.iters {
switch {
case iter.First():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
i.heap.Add(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.heap.Init(false)
i.dir = dirSOI
return i.next()
}
Expand All @@ -98,20 +94,19 @@ func (i *mergedIterator) Last() bool {
return false
}

h := i.indexHeap()
h.Reset(true)
i.heap.Reset()
for x, iter := range i.iters {
switch {
case iter.Last():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
i.heap.Add(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.heap.Init(true)
i.dir = dirEOI
return i.prev()
}
Expand All @@ -124,31 +119,29 @@ func (i *mergedIterator) Seek(key []byte) bool {
return false
}

h := i.indexHeap()
h.Reset(false)
i.heap.Reset()
for x, iter := range i.iters {
switch {
case iter.Seek(key):
i.keys[x] = assertKey(iter.Key())
h.Push(x)
i.heap.Add(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.heap.Init(false)
i.dir = dirSOI
return i.next()
}

func (i *mergedIterator) next() bool {
h := i.indexHeap()
if h.Len() == 0 {
if i.heap.Empty() {
i.dir = dirEOI
return false
}
i.index = heap.Pop(h).(int)
i.index = i.heap.Top()
i.dir = dirForward
return true
}
Expand Down Expand Up @@ -177,22 +170,22 @@ func (i *mergedIterator) Next() bool {
switch {
case iter.Next():
i.keys[x] = assertKey(iter.Key())
heap.Push(i.indexHeap(), x)
i.heap.FixTopWith(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
i.heap.Pop()
}
return i.next()
}

func (i *mergedIterator) prev() bool {
h := i.indexHeap()
if h.Len() == 0 {
if i.heap.Empty() {
i.dir = dirSOI
return false
}
i.index = heap.Pop(h).(int)
i.index = i.heap.Top()
i.dir = dirBackward
return true
}
Expand All @@ -209,37 +202,37 @@ func (i *mergedIterator) Prev() bool {
case dirEOI:
return i.Last()
case dirForward:
key := append([]byte(nil), i.keys[i.index]...)
h := i.indexHeap()
h.Reset(true)
i.heap.Reset()
for x, iter := range i.iters {
if x == i.index {
i.heap.Add(x)
continue
}
seek := iter.Seek(key)
seek := iter.Seek(i.keys[i.index])
switch {
case seek && iter.Prev(), !seek && iter.Last():
i.keys[x] = assertKey(iter.Key())
h.Push(x)
i.heap.Add(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
}
}
heap.Init(h)
i.heap.Init(true)
}

x := i.index
iter := i.iters[x]
switch {
case iter.Prev():
i.keys[x] = assertKey(iter.Key())
heap.Push(i.indexHeap(), x)
i.heap.FixTopWith(x)
case i.iterErr(iter):
return false
default:
i.keys[x] = nil
i.heap.Pop()
}
return i.prev()
}
Expand All @@ -266,7 +259,8 @@ func (i *mergedIterator) Release() {
}
i.iters = nil
i.keys = nil
i.indexes = nil
i.heap.indexes = nil
i.heap.keys = nil
if i.releaser != nil {
i.releaser.Release()
i.releaser = nil
Expand All @@ -292,10 +286,6 @@ func (i *mergedIterator) SetErrorCallback(f func(err error)) {
i.errf = f
}

func (i *mergedIterator) indexHeap() *indexHeap {
return (*indexHeap)(i)
}

// NewMergedIterator returns an iterator that merges its input. Walking the
// resultant iterator will return all key/value pairs of all input iterators
// in strictly increasing key order, as defined by cmp.
Expand All @@ -307,44 +297,94 @@ func (i *mergedIterator) indexHeap() *indexHeap {
// won't be ignored and will halt 'merged iterator', otherwise the iterator will
// continue to the next 'input iterator'.
func NewMergedIterator(iters []Iterator, cmp comparer.Comparer, strict bool) Iterator {
keys := make([][]byte, len(iters))
return &mergedIterator{
iters: iters,
cmp: cmp,
strict: strict,
keys: make([][]byte, len(iters)),
indexes: make([]int, 0, len(iters)),
iters: iters,
cmp: cmp,
strict: strict,
keys: keys,
heap: indexHeap{
indexes: make([]int, 0, len(iters)),
keys: keys,
cmp: cmp,
},
}
}

// indexHeap implements heap.Interface.
type indexHeap mergedIterator
// indexHeap provides heap operations for indexes.
// It specializes 'heap' with int element type.
type indexHeap struct {
indexes []int
keys [][]byte
cmp comparer.Comparer
reverse bool
}

func (h *indexHeap) Len() int { return len(h.indexes) }
func (h *indexHeap) Less(i, j int) bool {
i, j = h.indexes[i], h.indexes[j]
r := h.cmp.Compare(h.keys[i], h.keys[j])
if h.reverse {
return r > 0
func (h *indexHeap) Init(reverse bool) {
h.reverse = reverse
// heapify
n := len(h.indexes)
for i := n/2 - 1; i >= 0; i-- {
h.down(i, n)
}
return r < 0
}

func (h *indexHeap) Swap(i, j int) {
h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
func (h *indexHeap) Reset() {
h.indexes = h.indexes[:0]
}

func (h *indexHeap) Push(value interface{}) {
h.indexes = append(h.indexes, value.(int))
func (h *indexHeap) Add(x int) {
h.indexes = append(h.indexes, x)
}

func (h *indexHeap) Pop() interface{} {
e := len(h.indexes) - 1
popped := h.indexes[e]
h.indexes = h.indexes[:e]
return popped
func (h *indexHeap) Empty() bool { return len(h.indexes) == 0 }
func (h *indexHeap) Top() int { return h.indexes[0] }

func (h *indexHeap) Pop() int {
top := h.indexes[0]
n := len(h.indexes) - 1
h.swap(0, n)
h.down(0, n)
h.indexes = h.indexes[:n]
return top
}

func (h *indexHeap) Reset(reverse bool) {
h.reverse = reverse
h.indexes = h.indexes[:0]
func (h *indexHeap) FixTopWith(x int) {
// replace top
h.indexes[0] = x
// and then fix
h.down(0, len(h.indexes))
}

func (h *indexHeap) down(i0, n int) bool {
i := i0
for {
j1 := 2*i + 1
if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
break
}
j := j1 // left child
if j2 := j1 + 1; j2 < n && h.less(j2, j1) {
j = j2 // = 2*i + 2 // right child
}
if !h.less(j, i) {
break
}
h.swap(i, j)
i = j
}
return i > i0
}

func (h *indexHeap) less(i, j int) bool {
i, j = h.indexes[i], h.indexes[j]
r := h.cmp.Compare(h.keys[i], h.keys[j])
if h.reverse {
return r > 0
}
return r < 0
}

func (h *indexHeap) swap(i, j int) {
h.indexes[i], h.indexes[j] = h.indexes[j], h.indexes[i]
}
13 changes: 11 additions & 2 deletions leveldb/iterator/merged_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ var _ = testutil.Defer(func() {
})
})

func BenchmarkMergedIterator(b *testing.B) {
n := 11
func benchmarkMergedIteratorN(b *testing.B, n int) {
iters := make([]Iterator, n)
for i := range iters {
kv := testutil.KeyValue_Generate(nil, 100, 1, 1, 10, 4, 4)
Expand All @@ -79,3 +78,13 @@ func BenchmarkMergedIterator(b *testing.B) {
}
}
}

func BenchmarkMergedIterator(b *testing.B) {
b.Run("2 iters", func(b *testing.B) {
benchmarkMergedIteratorN(b, 2)
})

b.Run("50 iters", func(b *testing.B) {
benchmarkMergedIteratorN(b, 50)
})
}