From 3b129288a202558d9703901cfdf50556bb49f910 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Tue, 7 Mar 2023 10:22:45 +0700 Subject: [PATCH] Revert "Revert "etl: distinct empty values from nil" (#918)" This reverts commit f0a051c44c9ddede535ab5073b957ae8b525c845. --- etl/buffers.go | 64 ++++++++++++++++++++++++++++++-------- etl/collector.go | 11 ++++--- etl/dataprovider.go | 16 ++++++---- etl/etl_test.go | 70 ++++++++++++++++++++++++++++++++++++++++++ kv/helpers.go | 16 ++++++++-- state/aggregator_v3.go | 6 ++++ 6 files changed, 157 insertions(+), 26 deletions(-) diff --git a/etl/buffers.go b/etl/buffers.go index 09597c59c..b73ecb5f4 100644 --- a/etl/buffers.go +++ b/etl/buffers.go @@ -80,16 +80,19 @@ type sortableBuffer struct { // Put adds key and value to the buffer. These slices will not be accessed later, // so no copying is necessary func (b *sortableBuffer) Put(k, v []byte) { - b.offsets = append(b.offsets, len(b.data)) - b.lens = append(b.lens, len(k)) - if len(k) > 0 { - b.data = append(b.data, k...) + lk, lv := len(k), len(v) + if k == nil { + lk = -1 } - b.offsets = append(b.offsets, len(b.data)) - b.lens = append(b.lens, len(v)) - if len(v) > 0 { - b.data = append(b.data, v...) + if v == nil { + lv = -1 } + b.lens = append(b.lens, lk, lv) + + b.offsets = append(b.offsets, len(b.data)) + b.data = append(b.data, k...) + b.offsets = append(b.offsets, len(b.data)) + b.data = append(b.data, v...) } func (b *sortableBuffer) Size() int { @@ -121,9 +124,25 @@ func (b *sortableBuffer) Get(i int, keyBuf, valBuf []byte) ([]byte, []byte) { keyLen, valLen := b.lens[i2], b.lens[i2+1] if keyLen > 0 { keyBuf = append(keyBuf, b.data[keyOffset:keyOffset+keyLen]...) + } else if keyLen == 0 { + if keyBuf != nil { + keyBuf = keyBuf[:0] + } else { + keyBuf = []byte{} + } + } else { + keyBuf = nil } if valLen > 0 { valBuf = append(valBuf, b.data[valOffset:valOffset+valLen]...) 
+	} else if valLen == 0 {
+		if valBuf != nil {
+			valBuf = valBuf[:0]
+		} else {
+			valBuf = []byte{}
+		}
+	} else {
+		valBuf = nil
 	}
 	return keyBuf, valBuf
 }
@@ -148,10 +167,13 @@ func (b *sortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	for i, offset := range b.offsets {
 		l := b.lens[i]
-		n := binary.PutUvarint(numBuf[:], uint64(l))
+		n := binary.PutVarint(numBuf[:], int64(l))
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
+		if l <= 0 {
+			continue
+		}
 		if _, err := w.Write(b.data[offset : offset+l]); err != nil {
 			return err
 		}
@@ -221,14 +243,22 @@ func (b *appendSortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	entries := b.sortedBuf
 	for _, entry := range entries {
-		n := binary.PutUvarint(numBuf[:], uint64(len(entry.key)))
+		lk := int64(len(entry.key))
+		if entry.key == nil {
+			lk = -1
+		}
+		n := binary.PutVarint(numBuf[:], lk)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
 		if _, err := w.Write(entry.key); err != nil {
 			return err
 		}
-		n = binary.PutUvarint(numBuf[:], uint64(len(entry.value)))
+		lv := int64(len(entry.value))
+		if entry.value == nil {
+			lv = -1
+		}
+		n = binary.PutVarint(numBuf[:], lv)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
@@ -307,14 +337,22 @@ func (b *oldestEntrySortableBuffer) Write(w io.Writer) error {
 	var numBuf [binary.MaxVarintLen64]byte
 	entries := b.sortedBuf
 	for _, entry := range entries {
-		n := binary.PutUvarint(numBuf[:], uint64(len(entry.key)))
+		lk := int64(len(entry.key))
+		if entry.key == nil {
+			lk = -1
+		}
+		n := binary.PutVarint(numBuf[:], lk)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
 		if _, err := w.Write(entry.key); err != nil {
 			return err
 		}
-		n = binary.PutUvarint(numBuf[:], uint64(len(entry.value)))
+		lv := int64(len(entry.value))
+		if entry.value == nil {
+			lv = -1
+		}
+		n = binary.PutVarint(numBuf[:], lv)
 		if _, err := w.Write(numBuf[:n]); err != nil {
 			return err
 		}
diff --git a/etl/collector.go b/etl/collector.go
index 60d56e26c..9a7c8d8ce 100644
--- a/etl/collector.go
+++ b/etl/collector.go
@@ -195,10 +195,13 @@ func (c *Collector) Load(db kv.RwTx, toBucket string, loadFunc LoadFunc, args Tr
 			log.Info(fmt.Sprintf("[%s] ETL [2/2] Loading", c.logPrefix), logArs...)
} - if canUseAppend && len(v) == 0 { - return nil // nothing to delete after end of bucket - } - if len(v) == 0 { + isNil := (c.bufType == SortableSliceBuffer && v == nil) || + (c.bufType == SortableAppendBuffer && len(v) == 0) || //backward compatibility + (c.bufType == SortableOldestAppearedBuffer && len(v) == 0) + if isNil { + if canUseAppend { + return nil // nothing to delete after end of bucket + } if err := cursor.Delete(k); err != nil { return err } diff --git a/etl/dataprovider.go b/etl/dataprovider.go index 28cd41985..baab747b9 100644 --- a/etl/dataprovider.go +++ b/etl/dataprovider.go @@ -103,13 +103,13 @@ func (p *fileDataProvider) String() string { } func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ([]byte, []byte, error) { - n, err := binary.ReadUvarint(br) + n, err := binary.ReadVarint(br) if err != nil { return nil, nil, err } - if n > 0 { + if n >= 0 { // Reallocate the slice or extend it if there is enough capacity - if len(keyBuf)+int(n) > cap(keyBuf) { + if keyBuf == nil || len(keyBuf)+int(n) > cap(keyBuf) { newKeyBuf := make([]byte, len(keyBuf)+int(n)) copy(newKeyBuf, keyBuf) keyBuf = newKeyBuf @@ -119,13 +119,15 @@ func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ( if _, err = io.ReadFull(r, keyBuf[len(keyBuf)-int(n):]); err != nil { return nil, nil, err } + } else { + keyBuf = nil } - if n, err = binary.ReadUvarint(br); err != nil { + if n, err = binary.ReadVarint(br); err != nil { return nil, nil, err } - if n > 0 { + if n >= 0 { // Reallocate the slice or extend it if there is enough capacity - if len(valBuf)+int(n) > cap(valBuf) { + if valBuf == nil || len(valBuf)+int(n) > cap(valBuf) { newValBuf := make([]byte, len(valBuf)+int(n)) copy(newValBuf, valBuf) valBuf = newValBuf @@ -135,6 +137,8 @@ func readElementFromDisk(r io.Reader, br io.ByteReader, keyBuf, valBuf []byte) ( if _, err = io.ReadFull(r, valBuf[len(valBuf)-int(n):]); err != nil { return nil, nil, err } + } else { + valBuf = nil } return keyBuf, valBuf, err } diff --git a/etl/etl_test.go b/etl/etl_test.go index d549b0e22..c568680cf 100644 --- a/etl/etl_test.go +++ b/etl/etl_test.go @@ -39,6 +39,76 @@ func decodeHex(in string) []byte { return payload } +func TestEmptyValueIsNotANil(t *testing.T) { + t.Run("sortable", func(t *testing.T) { + collector := NewCollector(t.Name(), "", NewSortableBuffer(1)) + defer collector.Close() + require := require.New(t) + require.NoError(collector.Collect([]byte{1}, []byte{})) + require.NoError(collector.Collect([]byte{2}, nil)) + require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error { + if k[0] == 1 { + require.Equal([]byte{}, v) + } else { + require.Nil(v) + } + return nil + }, TransformArgs{})) + }) + t.Run("append", func(t *testing.T) { + // append buffer doesn't support nil values + collector := NewCollector(t.Name(), "", NewAppendBuffer(1)) + defer collector.Close() + require := require.New(t) + require.NoError(collector.Collect([]byte{1}, []byte{})) + require.NoError(collector.Collect([]byte{2}, nil)) + require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error { + require.Nil(v) + return nil + }, TransformArgs{})) + }) + t.Run("oldest", func(t *testing.T) { + collector := NewCollector(t.Name(), "", NewOldestEntryBuffer(1)) + defer collector.Close() + require := require.New(t) + require.NoError(collector.Collect([]byte{1}, []byte{})) + require.NoError(collector.Collect([]byte{2}, nil)) + 
require.NoError(collector.Load(nil, "", func(k, v []byte, table CurrentTableReader, next LoadNextFunc) error { + if k[0] == 1 { + require.Equal([]byte{}, v) + } else { + require.Nil(v) + } + return nil + }, TransformArgs{})) + }) +} + +func TestEmptyKeyValue(t *testing.T) { + _, tx := memdb.NewTestTx(t) + require := require.New(t) + table := kv.ChaindataTables[0] + collector := NewCollector(t.Name(), "", NewSortableBuffer(1)) + defer collector.Close() + require.NoError(collector.Collect([]byte{2}, []byte{})) + require.NoError(collector.Collect([]byte{1}, []byte{1})) + require.NoError(collector.Load(tx, table, IdentityLoadFunc, TransformArgs{})) + v, err := tx.GetOne(table, []byte{2}) + require.NoError(err) + require.Equal([]byte{}, v) + v, err = tx.GetOne(table, []byte{1}) + require.NoError(err) + require.Equal([]byte{1}, v) + + collector = NewCollector(t.Name(), "", NewSortableBuffer(1)) + defer collector.Close() + require.NoError(collector.Collect([]byte{}, nil)) + require.NoError(collector.Load(tx, table, IdentityLoadFunc, TransformArgs{})) + v, err = tx.GetOne(table, []byte{}) + require.NoError(err) + require.Nil(v) +} + func TestWriteAndReadBufferEntry(t *testing.T) { b := NewSortableBuffer(128) buffer := bytes.NewBuffer(make([]byte, 0)) diff --git a/kv/helpers.go b/kv/helpers.go index e584ff24a..871c71152 100644 --- a/kv/helpers.go +++ b/kv/helpers.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "sync" "time" "github.com/ledgerwatch/erigon-lib/common" @@ -133,14 +134,22 @@ func GetBool(tx Getter, bucket string, k []byte) (enabled bool, err error) { return bytes2bool(vBytes), nil } -func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) { +func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string, from []byte, amount uint32) (clean func()) { if db == nil { - return + return func() {} } if ok := progress.CompareAndSwap(false, true); !ok { - return + return func() {} } + ctx, cancel := context.WithCancel(ctx) + wg := sync.WaitGroup{} + clean = func() { + cancel() + wg.Wait() + } + wg.Add(1) go func() { + defer wg.Done() defer progress.Store(false) _ = db.View(ctx, func(tx Tx) error { c, err := tx.Cursor(table) @@ -163,6 +172,7 @@ func ReadAhead(ctx context.Context, db RoDB, progress *atomic.Bool, table string return nil }) }() + return clean } // FirstKey - candidate on move to kv.Tx interface diff --git a/state/aggregator_v3.go b/state/aggregator_v3.go index bde6544e0..a7ac06940 100644 --- a/state/aggregator_v3.go +++ b/state/aggregator_v3.go @@ -226,6 +226,9 @@ func (a *AggregatorV3) BuildOptionalMissedIndicesInBackground(ctx context.Contex defer a.wg.Done() defer a.workingOptionalIndices.Store(false) if err := a.BuildOptionalMissedIndices(ctx, workers); err != nil { + if errors.Is(err, context.Canceled) { + return + } log.Warn("merge", "err", err) } }() @@ -1183,6 +1186,9 @@ func (a *AggregatorV3) BuildFilesInBackground() { defer a.wg.Done() defer a.workingMerge.Store(false) if err := a.MergeLoop(a.ctx, 1); err != nil { + if errors.Is(err, context.Canceled) { + return + } log.Warn("merge", "err", err) }
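---

For reference, a minimal self-contained sketch of the length-encoding convention the buffers above adopt: a signed varint length precedes each element, where -1 marks a nil slice, 0 an empty-but-non-nil slice, and n > 0 the number of payload bytes that follow. The helper names writeElem/readElem are illustrative only, not part of the erigon-lib API:

	package main

	import (
		"bufio"
		"bytes"
		"encoding/binary"
		"fmt"
		"io"
	)

	// writeElem encodes one slice with the patch's convention:
	// varint -1 => nil, 0 => empty non-nil, n > 0 => n payload bytes follow.
	func writeElem(w io.Writer, b []byte) error {
		l := int64(len(b))
		if b == nil {
			l = -1
		}
		var num [binary.MaxVarintLen64]byte
		n := binary.PutVarint(num[:], l)
		if _, err := w.Write(num[:n]); err != nil {
			return err
		}
		if l <= 0 {
			return nil // no payload for nil or empty slices
		}
		_, err := w.Write(b)
		return err
	}

	// readElem decodes one slice, restoring the nil/empty distinction.
	func readElem(r *bufio.Reader) ([]byte, error) {
		l, err := binary.ReadVarint(r)
		if err != nil {
			return nil, err
		}
		if l < 0 {
			return nil, nil // -1 marks a nil slice
		}
		buf := make([]byte, l) // l == 0 yields an empty, non-nil slice
		if _, err := io.ReadFull(r, buf); err != nil {
			return nil, err
		}
		return buf, nil
	}

	func main() {
		var out bytes.Buffer
		for _, v := range [][]byte{nil, {}, []byte("abc")} {
			if err := writeElem(&out, v); err != nil {
				panic(err)
			}
		}
		in := bufio.NewReader(&out)
		for i := 0; i < 3; i++ {
			v, err := readElem(in)
			if err != nil {
				panic(err)
			}
			fmt.Printf("nil=%v len=%d\n", v == nil, len(v))
		}
		// Output:
		// nil=true len=0
		// nil=false len=0
		// nil=false len=3
	}

This mirrors what sortableBuffer.Write and readElementFromDisk do in the patch; the append/oldest buffers keep writing empty values as length 0 for backward compatibility, which is why their Load round-trip cannot distinguish empty from nil.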