Skip to content

Commit

Permalink
apacheGH-38399: [Go][Parquet] DeltaBitPack decoder reset usedFirst af…
Browse files Browse the repository at this point in the history
…ter SetData (apache#38413)

### Rationale for this change

As apache#38399 says. DeltaBitPack will corrupt when we meet a column chunk
with more than one page. During first page decoding, it works well. But when the second page comes, the
`d.usedFirst` haven't been reset, which cause the bug.

### What changes are included in this PR?

1. Some style enhancement
2. Bug fix

### Are these changes tested?

Currently not

### Are there any user-facing changes?

bugfix

* Closes: apache#38399

Authored-by: mwish <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
mapleFU authored and loicalleyne committed Nov 13, 2023
1 parent 189f24a commit 33603e6
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 18 deletions.
44 changes: 26 additions & 18 deletions go/parquet/internal/encoding/delta_bit_packing.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package encoding

import (
"bytes"
"errors"
"math"
"math/bits"
"reflect"
Expand All @@ -27,7 +28,6 @@ import (
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
"golang.org/x/xerrors"
)

// see the deltaBitPack encoder for a description of the encoding format that is
Expand All @@ -41,7 +41,7 @@ type deltaBitPackDecoder struct {
bitdecoder *utils.BitReader
blockSize uint64
currentBlockVals uint32
miniBlocks uint64
miniBlocksPerBlock uint64
valsPerMini uint32
currentMiniBlockVals uint32
minDelta int64
Expand Down Expand Up @@ -79,24 +79,26 @@ func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error {
var ok bool
d.blockSize, ok = d.bitdecoder.GetVlqInt()
if !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
if d.miniBlocksPerBlock, ok = d.bitdecoder.GetVlqInt(); !ok {
return errors.New("parquet: eof exception")
}
if d.miniBlocksPerBlock == 0 {
return errors.New("parquet: cannot have zero miniblock per block")
}

if d.totalValues, ok = d.bitdecoder.GetVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

if d.miniBlocks != 0 {
d.valsPerMini = uint32(d.blockSize / d.miniBlocks)
}
d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock)
d.usedFirst = false
return nil
}

Expand All @@ -105,14 +107,14 @@ func (d *deltaBitPackDecoder) initBlock() error {
// first we grab the min delta value that we'll start from
var ok bool
if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

// ensure we have enough space for our miniblocks to decode the widths
d.deltaBitWidths.Resize(int(d.miniBlocks))
d.deltaBitWidths.Resize(int(d.miniBlocksPerBlock))

var err error
for i := uint64(0); i < d.miniBlocks; i++ {
for i := uint64(0); i < d.miniBlocksPerBlock; i++ {
if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil {
return err
}
Expand Down Expand Up @@ -143,7 +145,7 @@ func (d *DeltaBitPackInt32Decoder) unpackNextMini() error {
for j := 0; j < int(d.valsPerMini); j++ {
delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
if !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

d.lastVal += int64(delta) + int64(d.minDelta)
Expand Down Expand Up @@ -172,6 +174,9 @@ func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) {
for len(out) > 0 { // unpack mini blocks until we get all the values we need
if d.currentBlockVals == 0 {
err = d.initBlock()
if err != nil {
return 0, err
}
}
if d.currentMiniBlockVals == 0 {
err = d.unpackNextMini()
Expand Down Expand Up @@ -200,7 +205,7 @@ func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, vali
return values, err
}
if values != toread {
return values, xerrors.New("parquet: number of values / definition levels read did not match")
return values, errors.New("parquet: number of values / definition levels read did not match")
}

return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
Expand Down Expand Up @@ -231,10 +236,10 @@ func (d *DeltaBitPackInt64Decoder) unpackNextMini() error {
for j := 0; j < int(d.valsPerMini); j++ {
delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth))
if !ok {
return xerrors.New("parquet: eof exception")
return errors.New("parquet: eof exception")
}

d.lastVal += int64(delta) + int64(d.minDelta)
d.lastVal += int64(delta) + d.minDelta
d.miniBlockValues = append(d.miniBlockValues, d.lastVal)
}
d.miniBlockIdx++
Expand All @@ -260,6 +265,9 @@ func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) {
for len(out) > 0 {
if d.currentBlockVals == 0 {
err = d.initBlock()
if err != nil {
return 0, err
}
}
if d.currentMiniBlockVals == 0 {
err = d.unpackNextMini()
Expand Down Expand Up @@ -293,7 +301,7 @@ func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, valid
return values, err
}
if values != toread {
return values, xerrors.New("parquet: number of values / definition levels read did not match")
return values, errors.New("parquet: number of values / definition levels read did not match")
}

return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
Expand Down
44 changes: 44 additions & 0 deletions go/parquet/internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,28 @@ func TestWriteDeltaBitPackedInt32(t *testing.T) {
assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
}
})

t.Run("test decoding multiple pages", func(t *testing.T) {
values := make([]int32, 1000)
testutils.FillRandomInt32(0, values)

enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
enc.(encoding.Int32Encoder).Put(values)
buf, _ := enc.FlushValues()
defer buf.Release()

// Using same Decoder to decode the data.
dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
for i := 0; i < 5; i += 1 {
dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes())

valueBuf := make([]int32, 100)
for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
dec.(encoding.Int32Decoder).Decode(valueBuf)
assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
}
}
})
}

func TestWriteDeltaBitPackedInt64(t *testing.T) {
Expand Down Expand Up @@ -670,6 +692,28 @@ func TestWriteDeltaBitPackedInt64(t *testing.T) {
assert.Equal(t, len(valueBuf), decoded)
assert.Equal(t, values, valueBuf)
})

t.Run("test decoding multiple pages", func(t *testing.T) {
values := make([]int64, 1000)
testutils.FillRandomInt64(0, values)

enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator)
enc.(encoding.Int64Encoder).Put(values)
buf, _ := enc.FlushValues()
defer buf.Release()

// Using same Decoder to decode the data.
dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator)
for i := 0; i < 5; i += 1 {
dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes())

valueBuf := make([]int64, 100)
for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) {
dec.(encoding.Int64Decoder).Decode(valueBuf)
assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j)
}
}
})
}

func TestDeltaLengthByteArrayEncoding(t *testing.T) {
Expand Down

0 comments on commit 33603e6

Please sign in to comment.