diff --git a/go/parquet/internal/encoding/delta_bit_packing.go b/go/parquet/internal/encoding/delta_bit_packing.go index 5b246f4591657..1fb91634e977b 100644 --- a/go/parquet/internal/encoding/delta_bit_packing.go +++ b/go/parquet/internal/encoding/delta_bit_packing.go @@ -18,6 +18,7 @@ package encoding import ( "bytes" + "errors" "math" "math/bits" "reflect" @@ -27,7 +28,6 @@ import ( shared_utils "github.com/apache/arrow/go/v14/internal/utils" "github.com/apache/arrow/go/v14/parquet" "github.com/apache/arrow/go/v14/parquet/internal/utils" - "golang.org/x/xerrors" ) // see the deltaBitPack encoder for a description of the encoding format that is @@ -41,7 +41,7 @@ type deltaBitPackDecoder struct { bitdecoder *utils.BitReader blockSize uint64 currentBlockVals uint32 - miniBlocks uint64 + miniBlocksPerBlock uint64 valsPerMini uint32 currentMiniBlockVals uint32 minDelta int64 @@ -79,24 +79,26 @@ func (d *deltaBitPackDecoder) SetData(nvalues int, data []byte) error { var ok bool d.blockSize, ok = d.bitdecoder.GetVlqInt() if !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } - if d.miniBlocks, ok = d.bitdecoder.GetVlqInt(); !ok { - return xerrors.New("parquet: eof exception") + if d.miniBlocksPerBlock, ok = d.bitdecoder.GetVlqInt(); !ok { + return errors.New("parquet: eof exception") + } + if d.miniBlocksPerBlock == 0 { + return errors.New("parquet: cannot have zero miniblock per block") } if d.totalValues, ok = d.bitdecoder.GetVlqInt(); !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } if d.lastVal, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } - if d.miniBlocks != 0 { - d.valsPerMini = uint32(d.blockSize / d.miniBlocks) - } + d.valsPerMini = uint32(d.blockSize / d.miniBlocksPerBlock) + d.usedFirst = false return nil } @@ -105,14 +107,14 @@ func (d *deltaBitPackDecoder) initBlock() error { // first we grab the min delta value that we'll start from var ok bool if d.minDelta, ok = d.bitdecoder.GetZigZagVlqInt(); !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } // ensure we have enough space for our miniblocks to decode the widths - d.deltaBitWidths.Resize(int(d.miniBlocks)) + d.deltaBitWidths.Resize(int(d.miniBlocksPerBlock)) var err error - for i := uint64(0); i < d.miniBlocks; i++ { + for i := uint64(0); i < d.miniBlocksPerBlock; i++ { if d.deltaBitWidths.Bytes()[i], err = d.bitdecoder.ReadByte(); err != nil { return err } @@ -143,7 +145,7 @@ func (d *DeltaBitPackInt32Decoder) unpackNextMini() error { for j := 0; j < int(d.valsPerMini); j++ { delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) if !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } d.lastVal += int64(delta) + int64(d.minDelta) @@ -172,6 +174,9 @@ func (d *DeltaBitPackInt32Decoder) Decode(out []int32) (int, error) { for len(out) > 0 { // unpack mini blocks until we get all the values we need if d.currentBlockVals == 0 { err = d.initBlock() + if err != nil { + return 0, err + } } if d.currentMiniBlockVals == 0 { err = d.unpackNextMini() @@ -200,7 +205,7 @@ func (d *DeltaBitPackInt32Decoder) DecodeSpaced(out []int32, nullCount int, vali return values, err } if values != toread { - return values, xerrors.New("parquet: number of values / definition levels read did not match") + return values, errors.New("parquet: number of values / definition levels read did not match") } return spacedExpand(out, nullCount, validBits, validBitsOffset), nil @@ -231,10 +236,10 @@ func (d *DeltaBitPackInt64Decoder) unpackNextMini() error { for j := 0; j < int(d.valsPerMini); j++ { delta, ok := d.bitdecoder.GetValue(int(d.deltaBitWidth)) if !ok { - return xerrors.New("parquet: eof exception") + return errors.New("parquet: eof exception") } - d.lastVal += int64(delta) + int64(d.minDelta) + d.lastVal += int64(delta) + d.minDelta d.miniBlockValues = append(d.miniBlockValues, d.lastVal) } d.miniBlockIdx++ @@ -260,6 +265,9 @@ func (d *DeltaBitPackInt64Decoder) Decode(out []int64) (int, error) { for len(out) > 0 { if d.currentBlockVals == 0 { err = d.initBlock() + if err != nil { + return 0, err + } } if d.currentMiniBlockVals == 0 { err = d.unpackNextMini() @@ -293,7 +301,7 @@ func (d DeltaBitPackInt64Decoder) DecodeSpaced(out []int64, nullCount int, valid return values, err } if values != toread { - return values, xerrors.New("parquet: number of values / definition levels read did not match") + return values, errors.New("parquet: number of values / definition levels read did not match") } return spacedExpand(out, nullCount, validBits, validBitsOffset), nil diff --git a/go/parquet/internal/encoding/encoding_test.go b/go/parquet/internal/encoding/encoding_test.go index 65b085d0ea167..50e72de004e19 100644 --- a/go/parquet/internal/encoding/encoding_test.go +++ b/go/parquet/internal/encoding/encoding_test.go @@ -594,6 +594,28 @@ func TestWriteDeltaBitPackedInt32(t *testing.T) { assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j) } }) + + t.Run("test decoding multiple pages", func(t *testing.T) { + values := make([]int32, 1000) + testutils.FillRandomInt32(0, values) + + enc := encoding.NewEncoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator) + enc.(encoding.Int32Encoder).Put(values) + buf, _ := enc.FlushValues() + defer buf.Release() + + // Using same Decoder to decode the data. + dec := encoding.NewDecoder(parquet.Types.Int32, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator) + for i := 0; i < 5; i += 1 { + dec.(encoding.Int32Decoder).SetData(len(values), buf.Bytes()) + + valueBuf := make([]int32, 100) + for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) { + dec.(encoding.Int32Decoder).Decode(valueBuf) + assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j) + } + } + }) } func TestWriteDeltaBitPackedInt64(t *testing.T) { @@ -670,6 +692,28 @@ func TestWriteDeltaBitPackedInt64(t *testing.T) { assert.Equal(t, len(valueBuf), decoded) assert.Equal(t, values, valueBuf) }) + + t.Run("test decoding multiple pages", func(t *testing.T) { + values := make([]int64, 1000) + testutils.FillRandomInt64(0, values) + + enc := encoding.NewEncoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, false, column, memory.DefaultAllocator) + enc.(encoding.Int64Encoder).Put(values) + buf, _ := enc.FlushValues() + defer buf.Release() + + // Using same Decoder to decode the data. + dec := encoding.NewDecoder(parquet.Types.Int64, parquet.Encodings.DeltaBinaryPacked, column, memory.DefaultAllocator) + for i := 0; i < 5; i += 1 { + dec.(encoding.Int64Decoder).SetData(len(values), buf.Bytes()) + + valueBuf := make([]int64, 100) + for i, j := 0, len(valueBuf); j <= len(values); i, j = i+len(valueBuf), j+len(valueBuf) { + dec.(encoding.Int64Decoder).Decode(valueBuf) + assert.Equalf(t, values[i:j], valueBuf, "indexes %d:%d", i, j) + } + } + }) } func TestDeltaLengthByteArrayEncoding(t *testing.T) {