Skip to content

Commit

Permalink
apacheGH-38345: [Go][Parquet] Handle Boolean RLE encoding/decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
zeroshade committed Oct 19, 2023
1 parent 0428c5e commit c121de0
Show file tree
Hide file tree
Showing 7 changed files with 160 additions and 2 deletions.
8 changes: 7 additions & 1 deletion go/parquet/file/column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package file

import (
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -345,14 +346,19 @@ func (c *columnChunkReader) initDataDecoder(page Page, lvlByteLen int64) error {
c.curDecoder = decoder
} else {
switch encoding {
case format.Encoding_RLE:
if c.descr.PhysicalType() != parquet.Types.Boolean {
return fmt.Errorf("parquet: only boolean supports RLE encoding, got %s", c.descr.PhysicalType())
}
fallthrough
case format.Encoding_PLAIN,
format.Encoding_DELTA_BYTE_ARRAY,
format.Encoding_DELTA_LENGTH_BYTE_ARRAY,
format.Encoding_DELTA_BINARY_PACKED:
c.curDecoder = c.decoderTraits.Decoder(parquet.Encoding(encoding), c.descr, false, c.mem)
c.decoders[encoding] = c.curDecoder
case format.Encoding_RLE_DICTIONARY:
return xerrors.New("parquet: dictionary page must be before data page")
return errors.New("parquet: dictionary page must be before data page")
case format.Encoding_BYTE_STREAM_SPLIT:
return fmt.Errorf("parquet: unsupported data encoding %s", encoding)
default:
Expand Down
78 changes: 78 additions & 0 deletions go/parquet/internal/encoding/boolean_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
package encoding

import (
"bytes"
"encoding/binary"
"fmt"
"io"

"github.com/apache/arrow/go/v14/arrow/bitutil"
shared_utils "github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
Expand Down Expand Up @@ -109,3 +114,76 @@ func (dec *PlainBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBit
}
return dec.Decode(out)
}

type RleBooleanDecoder struct {
decoder

rleDec *utils.RleDecoder
}

func (RleBooleanDecoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (dec *RleBooleanDecoder) SetData(nvals int, data []byte) error {
dec.nvals = nvals

if len(data) < 4 {
return fmt.Errorf("invalid length - %d (corrupt data page?)", len(data))
}

// load the first 4 bytes in little-endian which indicates the length
nbytes := binary.LittleEndian.Uint32(data[:4])
if nbytes > uint32(len(data)-4) {
return fmt.Errorf("received invalid number of bytes - %d (corrupt data page?)", nbytes)
}

dec.data = data[4:]
if dec.rleDec == nil {
dec.rleDec = utils.NewRleDecoder(bytes.NewReader(dec.data), 1)
} else {
dec.rleDec.Reset(bytes.NewReader(dec.data), 1)
}
return nil
}

func (dec *RleBooleanDecoder) Decode(out []bool) (int, error) {
max := shared_utils.MinInt(len(out), dec.nvals)

var (
buf [1024]uint64
n = max
)

for n > 0 {
batch := shared_utils.MinInt(1024, n)
decoded := dec.rleDec.GetBatch(buf[:batch])
if decoded != batch {
return max - n, io.ErrUnexpectedEOF
}

for i := 0; i < batch; i++ {
out[i] = buf[i] != 0
}
n -= batch
out = out[batch:]
}

dec.nvals -= max
return max, nil
}

func (dec *RleBooleanDecoder) DecodeSpaced(out []bool, nullCount int, validBits []byte, validBitsOffset int64) (int, error) {
if nullCount > 0 {
toRead := len(out) - nullCount
valuesRead, err := dec.Decode(out[:toRead])
if err != nil {
return 0, err
}
if valuesRead != toRead {
return valuesRead, xerrors.New("parquet: rle boolean decoder: number of values / definition levels read did not match")
}
return spacedExpand(out, nullCount, validBits, validBitsOffset), nil
}
return dec.Decode(out)
}
52 changes: 52 additions & 0 deletions go/parquet/internal/encoding/boolean_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package encoding

import (
"encoding/binary"

"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/internal/utils"
Expand Down Expand Up @@ -87,3 +89,53 @@ func (enc *PlainBooleanEncoder) FlushValues() (Buffer, error) {

return enc.sink.Finish(), nil
}

type RleBooleanEncoder struct {
encoder

bufferedValues []bool
}

func (RleBooleanEncoder) Type() parquet.Type {
return parquet.Types.Boolean
}

func (enc *RleBooleanEncoder) Put(in []bool) {
enc.bufferedValues = append(enc.bufferedValues, in...)
}

func (enc *RleBooleanEncoder) PutSpaced(in []bool, validBits []byte, validBitsOffset int64) {
bufferOut := make([]bool, len(in))
nvalid := spacedCompress(in, bufferOut, validBits, validBitsOffset)
enc.Put(bufferOut[:nvalid])
}

func (enc *RleBooleanEncoder) EstimatedDataEncodedSize() int64 {
const rleLengthInBytes = 4
return rleLengthInBytes + int64(enc.maxRleBufferSize())
}

func (enc *RleBooleanEncoder) maxRleBufferSize() int {
return utils.MaxBufferSize(1, len(enc.bufferedValues))
}

func (enc *RleBooleanEncoder) FlushValues() (Buffer, error) {
const rleLengthInBytes = 4
rleBufferSizeMax := enc.maxRleBufferSize()
enc.sink.SetOffset(rleLengthInBytes)
enc.sink.Reserve(rleBufferSizeMax)

encoder := utils.NewRleEncoder(enc.sink, 1)
for _, v := range enc.bufferedValues {
if v {
encoder.Put(1)
} else {
encoder.Put(0)
}
}
n := encoder.Flush()
buf := enc.sink.Finish()
binary.LittleEndian.PutUint32(buf.Bytes(), uint32(n))

return buf, nil
}
10 changes: 10 additions & 0 deletions go/parquet/internal/encoding/encoding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ func (b *BaseEncodingTestSuite) TestBasicRoundTrip() {
b.checkRoundTrip(parquet.Encodings.Plain)
}

func (b *BaseEncodingTestSuite) TestRleBooleanEncodingRoundTrip() {
switch b.typ {
case reflect.TypeOf(true):
b.initData(2000, 200)
b.checkRoundTrip(parquet.Encodings.RLE)
default:
b.T().SkipNow()
}
}

func (b *BaseEncodingTestSuite) TestDeltaEncodingRoundTrip() {
b.initData(10000, 1)

Expand Down
4 changes: 4 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions go/parquet/internal/encoding/typed_encoder.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func ({{.lower}}EncoderTraits) Encoder(e format.Encoding, useDict bool, descr *s
switch e {
case format.Encoding_PLAIN:
return &Plain{{.Name}}Encoder{encoder: newEncoderBase(e, descr, mem)}
{{- if eq .Name "Boolean" }}
case format.Encoding_RLE:
return &RleBooleanEncoder{encoder: newEncoderBase(e, descr, mem)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case format.Encoding_DELTA_BINARY_PACKED:
return DeltaBitPack{{.Name}}Encoder{&deltaBitPackEncoder{
Expand Down Expand Up @@ -117,6 +121,10 @@ func ({{.lower}}DecoderTraits) Decoder(e parquet.Encoding, descr *schema.Column,
switch e {
case parquet.Encodings.Plain:
return &Plain{{.Name}}Decoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- if eq .Name "Boolean" }}
case parquet.Encodings.RLE:
return &RleBooleanDecoder{decoder: newDecoderBase(format.Encoding(e), descr)}
{{- end}}
{{- if or (eq .Name "Int32") (eq .Name "Int64")}}
case parquet.Encodings.DeltaBinaryPacked:
if mem == nil {
Expand Down

0 comments on commit c121de0

Please sign in to comment.