Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding #38367

Merged
merged 7 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package file

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -345,14 +346,19 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
c.curDecoder = decoder
} else {
switch encoding {
case format.Encoding_RLE:
if c.descr.PhysicalType() != parquet.Types.Boolean {
return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
}
fallthrough
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
return xerrors.New("parquet: dictionary page must be before data page")
return errors.New("parquet: dictionary page must be before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
default:
Expand Down
61 changes: 61 additions & 0 deletions go/parquet/file/file_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"crypto/rand"
"encoding/binary"
"io"
"os"
"path"
"testing"

"github.com/apache/arrow/go/v14/arrow/memory"
Expand Down Expand Up @@ -385,3 +387,62 @@ func TestDeltaLengthByteArrayPackingWithNulls(t *testing.T) {
assert.NotNil(t, readData[0])
}
}

// TestRleBooleanEncodingFileRead opens rle_boolean_encoding.parquet from the
// directory named by the PARQUET_TEST_DATA environment variable (skipping when
// it is unset), reads the single boolean column of its only row group, and
// compares the decoded values and definition levels with the known contents.
func TestRleBooleanEncodingFileRead(t *testing.T) {
	const numRows = 68

	dataDir := os.Getenv("PARQUET_TEST_DATA")
	if dataDir == "" {
		t.Skip("no path supplied with PARQUET_TEST_DATA")
	}
	assert.DirExists(t, dataDir)

	readProps := parquet.NewReaderProperties(memory.DefaultAllocator)
	fileName := path.Join(dataDir, "rle_boolean_encoding.parquet")
	fileReader, err := file.OpenParquetFile(fileName, false, file.WithReadProps(readProps))
	require.NoError(t, err)
	defer fileReader.Close()

	assert.Equal(t, 1, fileReader.NumRowGroups())
	rowGroup := fileReader.RowGroup(0)
	assert.EqualValues(t, numRows, rowGroup.NumRows())

	colReader, err := rowGroup.Column(0)
	require.NoError(t, err)
	boolReader := colReader.(*file.BooleanColumnChunkReader)

	var (
		values  = make([]bool, numRows)
		defLvls = make([]int16, numRows)
		repLvls = make([]int16, numRows)
	)
	total, read, err := boolReader.ReadBatch(numRows, values, defLvls, repLvls)
	require.NoError(t, err)

	assert.EqualValues(t, numRows, total)
	chunkMeta, err := rowGroup.MetaData().ColumnChunk(0)
	require.NoError(t, err)
	stats, err := chunkMeta.Statistics()
	require.NoError(t, err)
	// Every level that is not null must have produced a decoded value.
	assert.EqualValues(t, total-stats.NullCount(), read)

	// The non-null values of the column, in file order.
	expected := []bool{
		true, false, true, true, false, false,
		true, true, true, false, false, true, true,
		false, true, true, false, false, true, true,
		false, true, true, false, false, true, true,
		true, false, false, false, false, true, true,
		false, true, true, false, false, true, true,
		true, false, false, true, true, false, false,
		true, true, true, false, true, true, false,
		true, true, false, false, true, true, true,
	}

	// Row positions expected to carry a definition level of zero.
	nullRows := map[int]struct{}{
		2: {}, 15: {}, 23: {}, 38: {}, 48: {}, 60: {},
	}
	for row, lvl := range defLvls {
		if _, isNull := nullRows[row]; isNull {
			assert.Zero(t, lvl)
			continue
		}
		assert.EqualValues(t, 1, lvl)
	}

	assert.Equal(t, expected, values[:len(expected)])
}
82 changes: 80 additions & 2 deletions go/parquet/internal/encoding/boolean_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,16 @@
package encoding

import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"io"

"github.com/apache/arrow/go/v14/arrow/bitutil"
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
"golang.org/x/xerrors"
)

// PlainBooleanDecoder is for the Plain Encoding type, there is no
Expand Down Expand Up @@ -103,7 +108,80 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBit
return 0, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: boolean decoder: number of values / definition levels read did not match")
return valuesRead, errors.New("parquet: boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
return dec.Decode(out)
}

// RleBooleanDecoder decodes boolean values from data that carries a 4-byte
// little-endian length prefix followed by RLE-encoded bits of width 1.
type RleBooleanDecoder struct {
decoder

// rleDec is created by the first SetData call and Reset by later ones.
rleDec *utils.RleDecoder
}

// Type returns parquet.Types.Boolean, the physical type this decoder produces.
func (RleBooleanDecoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
dec.nvals = nvals

if len(data) < 4 {
return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
}

// load the first 4 bytes in little-endian which indicates the length
nbytes := binary.LittleEndian.Uint32(data[:4])
if nbytes > uint32(len(data)-4) {
return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
}

dec.data = data[4:]
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
if dec.rleDec == nil {
dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
} else {
dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
}
return nil
}

func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
max := shared_utils.MinInt(len(out), dec.nvals)

var (
buf [1024]uint64
n = max
pitrou marked this conversation as resolved.
Show resolved Hide resolved
)

for n > 0 {
batch := shared_utils.MinInt(len(buf), n)
decoded := dec.rleDec.GetBatch(buf[:batch])
if decoded != batch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should dec.nvals be decremented in this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In theory, returning the error indicates that the decoder should not be used any further, so I don't know that we necessarily need to decrement dec.nvals, but it also wouldn't be wrong to do so.

return max - n, io.ErrUnexpectedEOF
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a point in returning max - n here? What is the caller supposed to do with it?
(note you aren't actually writing out the decoded values, so I assume the decoded bytes are lost, which means the decoder can't be used anymore afterwards)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we may hit this case if a user uses the Decoder directly.

However, when reading through Arrow, I guess the upper-level reader keeps track of the non-null value count and so won't hit this branch?

Copy link
Member Author

@zeroshade zeroshade Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are writing out the decoded values via the loop at line 165. The input to this function is an output slice to write to and we populate it after the call to GetBatch. So returning max - n here informs the caller of how many values were populated into that slice before the error was hit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But dec.rleDec.GetBatch has consumed some input and decoded some bytes that are just lost in buf, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. After receiving an error, a given decoder should not continue to be used, but we return the number of successfully output values so that a caller knows what values are there before the error was encountered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't matter unless the raw decode API is used, because RecordReader handles Decode correctly. But it still makes the semantics a bit inconsistent.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you have a point though I think that would be a greater issue to think about attempting to fix.

For DataPageV2Header it contains the number of nulls so we can easily handle changing dec.nvals to the correct number, but for DataPageV1 you have a point that technically this might be slightly off or is at least a bug waiting to happen that should be addressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's create an issue for that. I think users would not easily run into it because we always suggest using RecordReader, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or going through the ColumnChunkReader which has similar correct handling. We don't actually expose the raw decoder api for users to access at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha that's right.

}

for i := 0; i < batch; i++ {
out[i] = buf[i] != 0
}
n -= batch
out = out[batch:]
}

dec.nvals -= max
return max, nil
}

func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount > 0 {
toRead := len(out) - nullCount
valuesRead, err := dec.Decode(out[:toRead])
if err != nil {
return 0, err
}
if valuesRead != toRead {
return valuesRead, errors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
Expand Down
55 changes: 55 additions & 0 deletions go/parquet/internal/encoding/boolean_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package encoding

import (
"encoding/binary"

"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/debug"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
)

Expand Down Expand Up @@ -87,3 +90,55 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) {

return enc.sink.Finish(), nil
}

// rleLengthInBytes is the number of bytes reserved at the start of the output
// buffer for the little-endian uint32 holding the size of the RLE payload.
const rleLengthInBytes = 4

// RleBooleanEncoder buffers boolean values and, on FlushValues, writes them
// as RLE data of bit width 1 preceded by a 4-byte little-endian length.
type RleBooleanEncoder struct {
encoder

// bufferedValues holds every value given to Put until FlushValues encodes it.
bufferedValues []bool
}

// Type returns parquet.Types.Boolean, the physical type this encoder accepts.
func (RleBooleanEncoder) Type() parquet.Type {
return parquet.Types.Boolean
}

// Put appends the values of in to the encoder's internal buffer. Nothing is
// encoded here; the RLE encoding happens in FlushValues.
func (enc *RleBooleanEncoder) Put(in []bool) {
enc.bufferedValues = append(enc.bufferedValues, in...)
}

// PutSpaced runs in through spacedCompress together with the validity bitmap
// (validBits, starting at bit validBitsOffset) and buffers the leading entries
// of the result, as many as spacedCompress reports.
func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
	// Scratch space as large as the input; only the prefix reported by
	// spacedCompress is kept.
	compacted := make([]bool, len(in))
	kept := spacedCompress(in, compacted, validBits, validBitsOffset)
	enc.bufferedValues = append(enc.bufferedValues, compacted[:kept]...)
}

// EstimatedDataEncodedSize returns the length-prefix size plus the maximum RLE
// buffer size computed for the values buffered so far.
func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
	payloadMax := int64(enc.maxRleBufferSize())
	return rleLengthInBytes + payloadMax
}

// maxRleBufferSize sums utils.MaxRLEBufferSize for the buffered value count
// and utils.MinRLEBufferSize, both at bit width 1.
func (enc *RleBooleanEncoder) maxRleBufferSize() int {
	const bitWidth = 1

	upperBound := utils.MaxRLEBufferSize(bitWidth, len(enc.bufferedValues))
	return upperBound + utils.MinRLEBufferSize(bitWidth)
}

// FlushValues RLE-encodes every buffered value at bit width 1 and returns the
// finished buffer. The first rleLengthInBytes bytes of the buffer hold the
// encoded payload size as a little-endian uint32.
//
// The internal value buffer is emptied afterwards, so values encoded by one
// flush are not encoded again by the next one when the encoder is reused.
// The returned error is always nil.
func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
	rleBufferSizeMax := enc.maxRleBufferSize()
	// Leave room for the length prefix, which is filled in once the encoded
	// size is known.
	enc.sink.SetOffset(rleLengthInBytes)
	enc.sink.Reserve(rleBufferSizeMax)

	rleEncoder := utils.NewRleEncoder(enc.sink, 1)
	for _, v := range enc.bufferedValues {
		if v {
			rleEncoder.Put(1)
		} else {
			rleEncoder.Put(0)
		}
	}
	n := rleEncoder.Flush()
	debug.Assert(n <= rleBufferSizeMax, "num encoded bytes larger than expected max")
	buf := enc.sink.Finish()
	binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))

	// Drop the flushed values but keep the backing array for later Put calls.
	enc.bufferedValues = enc.bufferedValues[:0]

	return buf, nil
}
2 changes: 1 addition & 1 deletion go/parquet/internal/encoding/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ func (d *dictEncoder) FlushValues() (Buffer, error) {
// EstimatedDataEncodedSize returns the maximum number of bytes needed to store the RLE encoded indexes, not including the
// dictionary index in the computation.
func (d *dictEncoder) EstimatedDataEncodedSize() int64 {
return 1 + int64(utils.MaxBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinBufferSize(d.BitWidth()))
return 1 + int64(utils.MaxRLEBufferSize(d.BitWidth(), len(d.idxValues))+utils.MinRLEBufferSize(d.BitWidth()))
}

// NumEntries returns the number of entries in the dictionary index for this encoder.
Expand Down
12 changes: 12 additions & 0 deletions go/parquet/internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}

func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
switch b.typ {
case reflect.TypeOf(true):
b.initData(2000, 200)
pitrou marked this conversation as resolved.
Show resolved Hide resolved
b.checkRoundTrip(parquet.Encodings.RLE)
default:
b.T().SkipNow()
}
}

func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
b.initData(10000, 1)

Expand Down Expand Up @@ -408,6 +418,8 @@ func (b *BaseEncodingTestSuite) TestSpacedRoundTrip() {
if validBits != nil {
b.checkRoundTripSpaced(parquet.Encodings.Plain, validBits, validBitsOffset)
switch b.typ {
case reflect.TypeOf(false):
b.checkRoundTripSpaced(parquet.Encodings.RLE, validBits, validBitsOffset)
case reflect.TypeOf(int32(0)), reflect.TypeOf(int64(0)):
b.checkRoundTripSpaced(parquet.Encodings.DeltaBinaryPacked, validBits, validBitsOffset)
case reflect.TypeOf(parquet.ByteArray{}):
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/internal/encoding/levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func LevelEncodingMaxBufferSize(encoding parquet.Encoding, maxLvl int16, nbuffer
nbytes := 0
switch encoding {
case parquet.Encodings.RLE:
nbytes = utils.MaxBufferSize(bitWidth, nbuffered) + utils.MinBufferSize(bitWidth)
nbytes = utils.MaxRLEBufferSize(bitWidth, nbuffered) + utils.MinRLEBufferSize(bitWidth)
case parquet.Encodings.BitPacked:
nbytes = int(bitutil.BytesForBits(int64(nbuffered * bitWidth)))
default:
Expand Down
4 changes: 4 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
{{- if eq .Name "Boolean" }}
case format.Encoding_RLE:
return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
Expand Down Expand Up @@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column,
switch e {
case parquet.Encodings.Plain:
return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- if eq .Name "Boolean" }}
case parquet.Encodings.RLE:
return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case parquet.Encodings.DeltaBinaryPacked:
if mem == nil {
Expand Down
13 changes: 7 additions & 6 deletions go/parquet/internal/testutils/random.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,15 +438,16 @@ func fillRandomIsValid(seed uint64, pctNull float64, out []bool) {
// If the type is parquet.ByteArray or parquet.FixedLenByteArray, heap must not be null.
//
// The default values are:
// []bool uses the current time as the seed with only values of 1 being false, for use
// of creating validity boolean slices.
// all other types use 0 as the seed
// a []parquet.ByteArray is populated with lengths between 2 and 12
// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12.
//
// []bool uses the current time as the seed with only values of 1 being false, for use
// of creating validity boolean slices.
// all other types use 0 as the seed
// a []parquet.ByteArray is populated with lengths between 2 and 12
// a []parquet.FixedLenByteArray is populated with fixed size random byte arrays of length 12.
func InitValues(values interface{}, heap *memory.Buffer) {
switch arr := values.(type) {
case []bool:
fillRandomIsValid(uint64(time.Now().Unix()), 1.0, arr)
fillRandomIsValid(uint64(time.Now().Unix()), 0.5, arr)
case []int32:
FillRandomInt32(0, arr)
case []int64:
Expand Down
2 changes: 1 addition & 1 deletion go/parquet/internal/utils/bit_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ func (r *RLERandomSuite) checkRoundTrip(vals []uint64, width int) bool {

func (r *RLERandomSuite) checkRoundTripSpaced(vals arrow.Array, width int) {
nvalues := vals.Len()
bufsize := utils.MaxBufferSize(width, nvalues)
bufsize := utils.MaxRLEBufferSize(width, nvalues)

buffer := make([]byte, bufsize)
encoder := utils.NewRleEncoder(utils.NewWriterAtBuffer(buffer), width)
Expand Down
Loading
Loading