Skip to content

Commit

Permalink
mdbx: Return err early in iter.Next() (#10078)
Browse files Browse the repository at this point in the history
`HasNext` will return true even with existing error and the application
will expect a next entry. The `Next` function can get into an internal
error (such as a `panic()`) while fetching the next cursor item and thus
fail to return the error.

---------

Co-authored-by: alex.sharov <[email protected]>
  • Loading branch information
somnathb1 and AskAlexSharov authored Apr 27, 2024
1 parent b14b766 commit d7d0960
Showing 1 changed file with 47 additions and 39 deletions.
86 changes: 47 additions & 39 deletions erigon-lib/kv/mdbx/kv_mdbx.go
Original file line number Diff line number Diff line change
Expand Up @@ -1968,7 +1968,6 @@ type cursor2iter struct {
tx *MdbxTx

fromPrefix, toPrefix, nextK, nextV []byte
err error
orderAscend order.By
limit int64
ctx context.Context
Expand Down Expand Up @@ -1998,44 +1997,51 @@ func (s *cursor2iter) init(table string, tx kv.Tx) (*cursor2iter, error) {

if s.fromPrefix == nil { // no initial position
if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.First()
s.nextK, s.nextV, err = s.c.First()
} else {
s.nextK, s.nextV, s.err = s.c.Last()
s.nextK, s.nextV, err = s.c.Last()
}
return s, s.err
return s, err
}

if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.Seek(s.fromPrefix)
return s, s.err
s.nextK, s.nextV, err = s.c.Seek(s.fromPrefix)
return s, err
} else {
// seek exactly to given key or previous one
s.nextK, s.nextV, s.err = s.c.SeekExact(s.fromPrefix)
if s.err != nil {
return s, s.err
s.nextK, s.nextV, err = s.c.SeekExact(s.fromPrefix)
if err != nil {
return s, err
}
if s.nextK != nil { // go to last value of this key
if casted, ok := s.c.(kv.CursorDupSort); ok {
s.nextV, s.err = casted.LastDup()
s.nextV, err = casted.LastDup()
}
} else { // key not found, go to prev one
s.nextK, s.nextV, s.err = s.c.Prev()
s.nextK, s.nextV, err = s.c.Prev()
}
return s, s.err
return s, err
}
}

func (s *cursor2iter) advance() (err error) {
if s.orderAscend {
s.nextK, s.nextV, err = s.c.Next()
} else {
s.nextK, s.nextV, err = s.c.Prev()
}
return err
}

func (s *cursor2iter) Close() {
if s.c != nil {
s.c.Close()
delete(s.tx.streams, s.id)
s.c = nil
}
}

func (s *cursor2iter) HasNext() bool {
if s.err != nil { // always true, then .Next() call will return this error
return true
}
if s.limit == 0 { // limit reached
return false
}
Expand All @@ -2051,20 +2057,19 @@ func (s *cursor2iter) HasNext() bool {
cmp := bytes.Compare(s.nextK, s.toPrefix)
return (bool(s.orderAscend) && cmp < 0) || (!bool(s.orderAscend) && cmp > 0)
}

func (s *cursor2iter) Next() (k, v []byte, err error) {
select {
case <-s.ctx.Done():
return nil, nil, s.ctx.Err()
default:
}
s.limit--
k, v, err = s.nextK, s.nextV, s.err
if s.orderAscend {
s.nextK, s.nextV, s.err = s.c.Next()
} else {
s.nextK, s.nextV, s.err = s.c.Prev()
k, v = s.nextK, s.nextV
if err = s.advance(); err != nil {
return nil, nil, err
}
return k, v, err
return k, v, nil
}

func (tx *MdbxTx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (iter.KV, error) {
Expand All @@ -2084,7 +2089,6 @@ type cursorDup2iter struct {

key []byte
fromPrefix, toPrefix, nextV []byte
err error
orderAscend bool
limit int64
ctx context.Context
Expand Down Expand Up @@ -2112,26 +2116,35 @@ func (s *cursorDup2iter) init(table string, tx kv.Tx) (*cursorDup2iter, error) {

if s.fromPrefix == nil { // no initial position
if s.orderAscend {
s.nextV, s.err = s.c.FirstDup()
s.nextV, err = s.c.FirstDup()
} else {
s.nextV, s.err = s.c.LastDup()
s.nextV, err = s.c.LastDup()
}
return s, s.err
return s, err
}

if s.orderAscend {
s.nextV, s.err = s.c.SeekBothRange(s.key, s.fromPrefix)
return s, s.err
s.nextV, err = s.c.SeekBothRange(s.key, s.fromPrefix)
return s, err
} else {
// seek exactly to given key or previous one
_, s.nextV, s.err = s.c.SeekBothExact(s.key, s.fromPrefix)
_, s.nextV, err = s.c.SeekBothExact(s.key, s.fromPrefix)
if s.nextV == nil { // no such key
_, s.nextV, s.err = s.c.PrevDup()
_, s.nextV, err = s.c.PrevDup()
}
return s, s.err
return s, err
}
}

func (s *cursorDup2iter) advance() (err error) {
if s.orderAscend {
_, s.nextV, err = s.c.NextDup()
} else {
_, s.nextV, err = s.c.PrevDup()
}
return err
}

func (s *cursorDup2iter) Close() {
if s.c != nil {
s.c.Close()
Expand All @@ -2140,9 +2153,6 @@ func (s *cursorDup2iter) Close() {
}
}
func (s *cursorDup2iter) HasNext() bool {
if s.err != nil { // always true, then .Next() call will return this error
return true
}
if s.limit == 0 { // limit reached
return false
}
Expand All @@ -2165,13 +2175,11 @@ func (s *cursorDup2iter) Next() (k, v []byte, err error) {
default:
}
s.limit--
v, err = s.nextV, s.err
if s.orderAscend {
_, s.nextV, s.err = s.c.NextDup()
} else {
_, s.nextV, s.err = s.c.PrevDup()
v = s.nextV
if err = s.advance(); err != nil {
return nil, nil, err
}
return s.key, v, err
return s.key, v, nil
}

func (tx *MdbxTx) ForAmount(bucket string, fromPrefix []byte, amount uint32, walker func(k, v []byte) error) error {
Expand Down

0 comments on commit d7d0960

Please sign in to comment.