This repository has been archived by the owner on Sep 23, 2023. It is now read-only.
etl: distinct empty values from nil (#902)
AskAlexSharov authored Mar 7, 2023
1 parent f645d8d commit f4a0286
Showing 6 changed files with 157 additions and 26 deletions.
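The heart of the change is an encoding tweak: the ETL buffers' length prefixes move from unsigned varints (binary.PutUvarint) to signed ones (binary.PutVarint), so that a length of -1 can mark a nil slice while 0 keeps meaning "present but empty". A minimal standalone sketch of that convention (illustrative names, not erigon-lib code):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// writeSlice emits a signed-varint length prefix: -1 marks nil, 0 an empty slice.
func writeSlice(w *bytes.Buffer, b []byte) {
	var num [binary.MaxVarintLen64]byte
	l := int64(len(b))
	if b == nil {
		l = -1
	}
	w.Write(num[:binary.PutVarint(num[:], l)])
	w.Write(b) // writing a nil or empty slice is a no-op
}

// readSlice restores the original value: nil, []byte{}, or n payload bytes.
func readSlice(r *bytes.Reader) ([]byte, error) {
	n, err := binary.ReadVarint(r)
	if err != nil {
		return nil, err
	}
	if n < 0 {
		return nil, nil // -1 sentinel: the slice was nil
	}
	out := make([]byte, n) // n == 0 yields an empty but non-nil slice
	_, err = io.ReadFull(r, out)
	return out, err
}

func main() {
	var buf bytes.Buffer
	for _, v := range [][]byte{nil, {}, {7}} {
		writeSlice(&buf, v)
	}
	r := bytes.NewReader(buf.Bytes())
	for i := 0; i < 3; i++ {
		v, _ := readSlice(r)
		fmt.Printf("entry %d: nil=%t len=%d\n", i, v == nil, len(v))
	}
}

Entry 0 comes back nil and entry 1 empty but non-nil: exactly the distinction the buffers below must preserve across a spill to disk.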
64 changes: 51 additions & 13 deletions etl/buffers.go
@@ -80,16 +80,19 @@ type sortableBuffer struct {
 // Put adds key and value to the buffer. These slices will not be accessed later,
 // so no copying is necessary
 func (b *sortableBuffer) Put(k, v []byte) {
-	b.offsets = append(b.offsets, len(b.data))
-	b.lens = append(b.lens, len(k))
-	if len(k) > 0 {
-		b.data = append(b.data, k...)
+	lk, lv := len(k), len(v)
+	if k == nil {
+		lk = -1
 	}
-	b.offsets = append(b.offsets, len(b.data))
-	b.lens = append(b.lens, len(v))
-	if len(v) > 0 {
-		b.data = append(b.data, v...)
+	if v == nil {
+		lv = -1
 	}
+	b.lens = append(b.lens, lk, lv)
+
+	b.offsets = append(b.offsets, len(b.data))
+	b.data = append(b.data, k...)
+	b.offsets = append(b.offsets, len(b.data))
+	b.data = append(b.data, v...)
 }
 
 func (b *sortableBuffer) Size() int {
@@ -121,9 +124,25 @@ func (b *sortableBuffer) Get(i int, keyBuf, valBuf []byte) ([]byte, []byte) {
 	keyLen, valLen := b.lens[i2], b.lens[i2+1]
 	if keyLen > 0 {
 		keyBuf = append(keyBuf, b.data[keyOffset:keyOffset+keyLen]...)
+	} else if keyLen == 0 {
+		if keyBuf != nil {
+			keyBuf = keyBuf[:0]
+		} else {
+			keyBuf = []byte{}
+		}
+	} else {
+		keyBuf = nil
 	}
 	if valLen > 0 {
 		valBuf = append(valBuf, b.data[valOffset:valOffset+valLen]...)
+	} else if valLen == 0 {
+		if valBuf != nil {
+			valBuf = valBuf[:0]
+		} else {
+			valBuf = []byte{}
+		}
+	} else {
+		valBuf = nil
 	}
 	return keyBuf, valBuf
 }
@@ -148,10 +167,13 @@ func (b *sortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	for i, offset := range b.offsets {
 		l := b.lens[i]
-		n := binary.PutUvarint(numBuf[:], uint64(l))
+		n := binary.PutVarint(numBuf[:], int64(l))
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
+		if l <= 0 {
+			continue
+		}
 		if _, err := w.Write(b.data[offset : offset+l]); err != nil {
 			return err
 		}
@@ -221,14 +243,22 @@ func (b *appendSortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	entries := b.sortedBuf
 	for _, entry := range entries {
-		n := binary.PutUvarint(numBuf[:], uint64(len(entry.key)))
+		lk := int64(len(entry.key))
+		if entry.key == nil {
+			lk = -1
+		}
+		n := binary.PutVarint(numBuf[:], lk)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
 		if _, err := w.Write(entry.key); err != nil {
 			return err
 		}
-		n = binary.PutUvarint(numBuf[:], uint64(len(entry.value)))
+		lv := int64(len(entry.value))
+		if entry.value == nil {
+			lv = -1
+		}
+		n = binary.PutVarint(numBuf[:], lv)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
@@ -307,14 +337,22 @@ func (b *oldestEntrySortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	entries := b.sortedBuf
 	for _, entry := range entries {
-		n := binary.PutUvarint(numBuf[:], uint64(len(entry.key)))
+		lk := int64(len(entry.key))
+		if entry.key == nil {
+			lk = -1
+		}
+		n := binary.PutVarint(numBuf[:], lk)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
 		if _, err := w.Write(entry.key); err != nil {
 			return err
 		}
-		n = binary.PutUvarint(numBuf[:], uint64(len(entry.value)))
+		lv := int64(len(entry.value))
+		if entry.value == nil {
+			lv = -1
+		}
+		n = binary.PutVarint(numBuf[:], lv)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
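Taken together, Put now always appends the raw bytes (appending a nil slice is a no-op) and records the nil/empty distinction in lens, which Get reconstructs. A minimal usage sketch (buffer size is arbitrary; BufferOptimalSize is assumed to be the package's default size constant):

	b := NewSortableBuffer(BufferOptimalSize)
	b.Put([]byte{1}, nil)      // value recorded with length -1
	b.Put([]byte{2}, []byte{}) // value recorded with length 0
	b.Put([]byte{3}, []byte{7})

	_, v := b.Get(0, nil, nil) // v == nil
	_, v = b.Get(1, nil, nil)  // v != nil && len(v) == 0
	_, v = b.Get(2, nil, nil)  // v equals []byte{7}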
11 changes: 7 additions & 4 deletions etl/collector.go
@@ -195,10 +195,13 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
 			log.Info(fmt.Sprintf("[%s] ETL [2/2] Loading", c.logPrefix), logArs...)
 		}
 
-		if canUseAppend && len(v) == 0 {
-			return nil // nothing to delete after end of bucket
-		}
-		if len(v) == 0 {
+		isNil := (c.bufType == SortableSliceBuffer && v == nil) ||
+			(c.bufType == SortableAppendBuffer && len(v) == 0) || //backward compatibility
+			(c.bufType == SortableOldestAppearedBuffer && len(v) == 0)
+		if isNil {
+			if canUseAppend {
+				return nil // nothing to delete after end of bucket
+			}
 			if err := cursor.Delete(k); err != nil {
 				return err
 			}
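Note the asymmetry this check codifies: only SortableSliceBuffer distinguishes v == nil (a delete) from an empty value; the append and oldest-appeared buffers keep the old len(v) == 0 behavior for backward compatibility. A hedged sketch of the resulting semantics (tx and table come from the caller, as in TestEmptyKeyValue further down):

	c := NewCollector("example", "", NewSortableBuffer(BufferOptimalSize))
	defer c.Close()
	_ = c.Collect([]byte("k1"), []byte{}) // loads as an empty, non-nil value
	_ = c.Collect([]byte("k2"), nil)      // SortableSliceBuffer: acts as a delete
	if err := c.Load(tx, table, IdentityLoadFunc, TransformArgs{}); err != nil {
		// handle err
	}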
16 changes: 10 additions & 6 deletions etl/dataprovider.go
@@ -103,13 +103,13 @@ func (p *fileDataProvider) String() string {
 }
 
 func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ([]byte, []byte, error) {
-	n, err := binary.ReadUvarint(br)
+	n, err := binary.ReadVarint(br)
 	if err != nil {
 		return nil, nil, err
 	}
-	if n > 0 {
+	if n >= 0 {
 		// Reallocate the slice or extend it if there is enough capacity
-		if len(keyBuf)+int(n) > cap(keyBuf) {
+		if keyBuf == nil || len(keyBuf)+int(n) > cap(keyBuf) {
 			newKeyBuf := make([]byte, len(keyBuf)+int(n))
 			copy(newKeyBuf, keyBuf)
 			keyBuf = newKeyBuf
@@ -119,13 +119,15 @@ func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) (
 		if _, err = io.ReadFull(r, keyBuf[len(keyBuf)-int(n):]); err != nil {
 			return nil, nil, err
 		}
+	} else {
+		keyBuf = nil
 	}
-	if n, err = binary.ReadUvarint(br); err != nil {
+	if n, err = binary.ReadVarint(br); err != nil {
 		return nil, nil, err
 	}
-	if n > 0 {
+	if n >= 0 {
 		// Reallocate the slice or extend it if there is enough capacity
-		if len(valBuf)+int(n) > cap(valBuf) {
+		if valBuf == nil || len(valBuf)+int(n) > cap(valBuf) {
 			newValBuf := make([]byte, len(valBuf)+int(n))
 			copy(newValBuf, valBuf)
 			valBuf = newValBuf
@@ -135,6 +137,8 @@ func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) (
 		if _, err = io.ReadFull(r, valBuf[len(valBuf)-int(n):]); err != nil {
 			return nil, nil, err
 		}
+	} else {
+		valBuf = nil
 	}
 	return keyBuf, valBuf, err
 }
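The new keyBuf == nil / valBuf == nil guards are what turn an n == 0 prefix into an empty, non-nil slice even when the caller passed no reuse buffer: make([]byte, 0) is non-nil, while appending nothing to a nil slice leaves it nil. A small standalone illustration of that Go subtlety:

package main

import "fmt"

func main() {
	var buf []byte // nil: a caller with no reuse buffer
	n := 0         // an empty-but-present value read from disk
	if buf == nil || len(buf)+n > cap(buf) {
		buf = make([]byte, len(buf)+n) // yields []byte{}: non-nil, len 0
	}
	fmt.Println(buf == nil, len(buf)) // false 0 — emptiness survives the round trip
}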
70 changes: 70 additions & 0 deletions etl/etl_test.go
@@ -39,6 +39,76 @@ func decodeHex(in string) []byte {
 	return payload
 }
 
+func TestEmptyValueIsNotANil(t *testing.T) {
+	t.Run("sortable", func(t *testing.T) {
+		collector := NewCollector(t.Name(), "", NewSortableBuffer(1))
+		defer collector.Close()
+		require := require.New(t)
+		require.NoError(collector.Collect([]byte{1}, []byte{}))
+		require.NoError(collector.Collect([]byte{2}, nil))
+		require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error {
+			if k[0] == 1 {
+				require.Equal([]byte{}, v)
+			} else {
+				require.Nil(v)
+			}
+			return nil
+		}, TransformArgs{}))
+	})
+	t.Run("append", func(t *testing.T) {
+		// append buffer doesn't support nil values
+		collector := NewCollector(t.Name(), "", NewAppendBuffer(1))
+		defer collector.Close()
+		require := require.New(t)
+		require.NoError(collector.Collect([]byte{1}, []byte{}))
+		require.NoError(collector.Collect([]byte{2}, nil))
+		require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error {
+			require.Nil(v)
+			return nil
+		}, TransformArgs{}))
+	})
+	t.Run("oldest", func(t *testing.T) {
+		collector := NewCollector(t.Name(), "", NewOldestEntryBuffer(1))
+		defer collector.Close()
+		require := require.New(t)
+		require.NoError(collector.Collect([]byte{1}, []byte{}))
+		require.NoError(collector.Collect([]byte{2}, nil))
+		require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error {
+			if k[0] == 1 {
+				require.Equal([]byte{}, v)
+			} else {
+				require.Nil(v)
+			}
+			return nil
+		}, TransformArgs{}))
+	})
+}
+
+func TestEmptyKeyValue(t *testing.T) {
+	_, tx := memdb.NewTestTx(t)
+	require := require.New(t)
+	table := kv.ChaindataTables[0]
+	collector := NewCollector(t.Name(), "", NewSortableBuffer(1))
+	defer collector.Close()
+	require.NoError(collector.Collect([]byte{2}, []byte{}))
+	require.NoError(collector.Collect([]byte{1}, []byte{1}))
+	require.NoError(collector.Load(tx, table, IdentityLoadFunc, TransformArgs{}))
+	v, err := tx.GetOne(table, []byte{2})
+	require.NoError(err)
+	require.Equal([]byte{}, v)
+	v, err = tx.GetOne(table, []byte{1})
+	require.NoError(err)
+	require.Equal([]byte{1}, v)
+
+	collector = NewCollector(t.Name(), "", NewSortableBuffer(1))
+	defer collector.Close()
+	require.NoError(collector.Collect([]byte{}, nil))
+	require.NoError(collector.Load(tx, table, IdentityLoadFunc, TransformArgs{}))
+	v, err = tx.GetOne(table, []byte{})
+	require.NoError(err)
+	require.Nil(v)
+}
+
 func TestWriteAndReadBufferEntry(t *testing.T) {
 	b := NewSortableBuffer(128)
 	buffer := bytes.NewBuffer(make([]byte, 0))
16 changes: 13 additions & 3 deletions kv/helpers.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"os"
+	"sync"
 	"time"
 
 	"github.com/ledgerwatch/erigon-lib/common"
@@ -133,14 +134,22 @@ func GetBool(tx Getter, bucket string, k []byte) (enabled bool, err error) {
 	return bytes2bool(vBytes), nil
 }
 
-func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) {
+func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) (clean func()) {
 	if db == nil {
-		return
+		return func() {}
 	}
 	if ok := progress.CompareAndSwap(false, true); !ok {
-		return
+		return func() {}
 	}
+	ctx, cancel := context.WithCancel(ctx)
+	wg := sync.WaitGroup{}
+	clean = func() {
+		cancel()
+		wg.Wait()
+	}
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		defer progress.Store(false)
 		_ = db.View(ctx, func(tx Tx) error {
 			c, err := tx.Cursor(table)
@@ -163,6 +172,7 @@ func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string
 			return nil
 		})
 	}()
+	return clean
 }
 
 // FirstKey - candidate on move to kv.Tx interface
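ReadAhead now hands back a clean function that cancels the derived context and waits (via the WaitGroup) for the prefetch goroutine to exit, so callers no longer leak it past their own lifetime. A hedged usage sketch (ctx, db, and table come from the caller; assumes Go 1.19+ sync/atomic.Bool, matching the progress parameter):

	var inProgress atomic.Bool
	clean := kv.ReadAhead(ctx, db, &inProgress, table, nil, 10_000)
	defer clean() // stop prefetching and wait for the goroutine before returning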
6 changes: 6 additions & 0 deletions state/aggregator_v3.go
@@ -226,6 +226,9 @@ func (a *AggregatorV3) BuildOptionalMissedIndicesInBackground(ctx context.Contex
 		defer a.wg.Done()
 		defer a.workingOptionalIndices.Store(false)
 		if err := a.BuildOptionalMissedIndices(ctx, workers); err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
 			log.Warn("merge", "err", err)
 		}
 	}()
@@ -1183,6 +1186,9 @@ func (a *AggregatorV3) BuildFilesInBackground() {
 		defer a.wg.Done()
 		defer a.workingMerge.Store(false)
 		if err := a.MergeLoop(a.ctx, 1); err != nil {
+			if errors.Is(err, context.Canceled) {
+				return
+			}
 			log.Warn("merge", "err", err)
 		}
 
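Both background loops now treat cancellation as a normal shutdown instead of logging a warning. errors.Is is the right check because the cancellation error usually arrives wrapped; a minimal illustration:

package main

import (
	"context"
	"errors"
	"fmt"
)

func main() {
	err := fmt.Errorf("merge interrupted: %w", context.Canceled)
	fmt.Println(err == context.Canceled)          // false: the error is wrapped
	fmt.Println(errors.Is(err, context.Canceled)) // true: Is unwraps the chain
}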
