Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-38462: [Go][Parquet] Handle Boolean RLE encoding/decoding #38367

Merged
merged 7 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package file

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -345,14 +346,19 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
c.curDecoder = decoder
} else {
switch encoding {
case format.Encoding_RLE:
if c.descr.PhysicalType() != parquet.Types.Boolean {
return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
}
fallthrough
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
return xerrors.New("parquet: dictionary page must be before data page")
return errors.New("parquet: dictionary page must be before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
default:
Expand Down
78 changes: 78 additions & 0 deletions go/parquet/internal/encoding/boolean_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package encoding

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/apache/arrow/go/v14/arrow/bitutil"
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
Expand Down Expand Up @@ -109,3 +114,76 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBit
}
return dec.Decode(out)
}

type RleBooleanDecoder struct {
decoder

rleDec *utils.RleDecoder
}

func (RleBooleanDecoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
dec.nvals = nvals

if len(data) < 4 {
return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
}

// load the first 4 bytes in little-endian which indicates the length
nbytes := binary.LittleEndian.Uint32(data[:4])
if nbytes > uint32(len(data)-4) {
return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
}

dec.data = data[4:]
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
if dec.rleDec == nil {
dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
} else {
dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
}
return nil
}

func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
max := shared_utils.MinInt(len(out), dec.nvals)

var (
buf [1024]uint64
n = max
pitrou marked this conversation as resolved.
Show resolved Hide resolved
)

for n > 0 {
batch := shared_utils.MinInt(len(buf), n)
decoded := dec.rleDec.GetBatch(buf[:batch])
if decoded != batch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should dec.nvals dec in this branch?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in theory the return of the error would indicate that the decoder should not be used any further, so I don't know if we necessarily need to decode dec.nvals but it also wouldn't be wrong to do so.

return max - n, io.ErrUnexpectedEOF
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a point in returning max - n here? What is the caller supposed to do with it?
(note you aren't actually writing out the decoded values, so I assume the decoded bytes are lost, which means the decoder can't be used anymore afterwards)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we may meet the case if user use Decoder directly.

However, during arrow reading it, I guess the upper reader will keep non-null value count and didn't hit the branch here?

Copy link
Member Author

@zeroshade zeroshade Oct 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are writing out the decoded values via the loop at line 165. The input to this function is an output slice to write to and we populate it after the call to GetBatch. So returning max - n here informs the caller of how many values were populated into that slice before the error was hit.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But dec.rleDec.GetBatch has consumed some input and decoded some bytes that are just lost in buf, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. After receiving an error, a given decoder should not continue to be used, but we return the number of successfully output values so that a caller knows what values are there before the error was encountered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This wouldn't matter without using raw-decode api. Because RecordReader will handing Decode well. But it's still make the syntax a bit inconsistent

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, you have a point though I think that would be a greater issue to think about attempting to fix.

For DataPageV2Header it contains the number of nulls so we can easily handle changing dec.nvals to the correct number, but for DataPageV1 you have a point that technically this might be slightly off or is at least a bug waiting to happen that should be addressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, lets create an issue for that. I think user would not easily touch it because we always suggest using RecordReader, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or going through the ColumnChunkReader which has similar correct handling. We don't actually expose the raw decoder api for users to access at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha that's right.

}

for i := 0; i < batch; i++ {
out[i] = buf[i] != 0
}
n -= batch
out = out[batch:]
}

dec.nvals -= max
return max, nil
}

func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount > 0 {
toRead := len(out) - nullCount
valuesRead, err := dec.Decode(out[:toRead])
if err != nil {
return 0, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what is the rationale for using xerrors here rather than the go stdlib as in other places?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

at the time I originally wrote the parquet code, xerrors was used for particular benefits of wrapping and otherwise that weren't available in the stdlib. Since then, all the features of xerrors have been folded into the go stdlib and there really isn't a reason to use it anymore, I intend to phase it out as I make changes to the code. So I'm going to fix this to just use errors, it was my mistake to propagate xerrors here so thanks for catching it.

}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
return dec.Decode(out)
}
52 changes: 52 additions & 0 deletions go/parquet/internal/encoding/boolean_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package encoding

import (
"encoding/binary"

"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
Expand Down Expand Up @@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) {

return enc.sink.Finish(), nil
}

type RleBooleanEncoder struct {
encoder

bufferedValues []bool
}

func (RleBooleanEncoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (enc *RleBooleanEncoder) Put(in []bool) {
enc.bufferedValues = append(enc.bufferedValues, in...)
}

func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
bufferOut := make([]bool, len(in))
nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
enc.Put(bufferOut[:nvalid])
}

func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
const rleLengthInBytes = 4
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
return rleLengthInBytes + int64(enc.maxRleBufferSize())
}

func (enc *RleBooleanEncoder) maxRleBufferSize() int {
return utils.MaxBufferSize(1, len(enc.bufferedValues))
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
}

func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
const rleLengthInBytes = 4
rleBufferSizeMax := enc.maxRleBufferSize()
enc.sink.SetOffset(rleLengthInBytes)
enc.sink.Reserve(rleBufferSizeMax)

encoder := utils.NewRleEncoder(enc.sink, 1)
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
for _, v := range enc.bufferedValues {
if v {
encoder.Put(1)
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
} else {
encoder.Put(0)
}
}
n := encoder.Flush()
zeroshade marked this conversation as resolved.
Show resolved Hide resolved
buf := enc.sink.Finish()
binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))

return buf, nil
}
10 changes: 10 additions & 0 deletions go/parquet/internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}

func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
switch b.typ {
case reflect.TypeOf(true):
b.initData(2000, 200)
pitrou marked this conversation as resolved.
Show resolved Hide resolved
b.checkRoundTrip(parquet.Encodings.RLE)
default:
b.T().SkipNow()
}
}

func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
b.initData(10000, 1)

Expand Down
4 changes: 4 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
{{- if eq .Name "Boolean" }}
case format.Encoding_RLE:
return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
Expand Down Expand Up @@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column,
switch e {
case parquet.Encodings.Plain:
return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- if eq .Name "Boolean" }}
case parquet.Encodings.RLE:
return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case parquet.Encodings.DeltaBinaryPacked:
if mem == nil {
Expand Down
Loading