Skip to content

Commit

Permalink
GH-37419: [Go][Parquet] Decimal256 support for pqarrow (#37503)
Browse files Browse the repository at this point in the history
### Rationale for this change
When support for decimal128 was added to the Go Parquet lib, decimal256 wasn't yet implemented in the library. This adds proper support to read/write decimal256 with Parquet based on origin schemas and precision.

### Are these changes tested?
yes

* Closes: #37419

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Sep 5, 2023
1 parent b5d36e9 commit 697a2cb
Show file tree
Hide file tree
Showing 6 changed files with 298 additions and 43 deletions.
5 changes: 5 additions & 0 deletions go/arrow/decimal128/decimal128.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ import (
"github.com/apache/arrow/go/v14/arrow/internal/debug"
)

const (
MaxPrecision = 38
MaxScale = 38
)

var (
MaxDecimal128 = New(542101086242752217, 687399551400673280-1)
)
Expand Down
122 changes: 108 additions & 14 deletions go/parquet/pqarrow/column_readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/arrow/decimal128"
"github.com/apache/arrow/go/v14/arrow/decimal256"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
Expand Down Expand Up @@ -493,14 +494,14 @@ func transferColumnData(rdr file.RecordReader, valueType arrow.DataType, descr *
data = transferDate64(rdr, valueType)
case arrow.FIXED_SIZE_BINARY, arrow.BINARY, arrow.STRING, arrow.LARGE_BINARY, arrow.LARGE_STRING:
return transferBinary(rdr, valueType), nil
case arrow.DECIMAL:
case arrow.DECIMAL, arrow.DECIMAL256:
switch descr.PhysicalType() {
case parquet.Types.Int32, parquet.Types.Int64:
data = transferDecimalInteger(rdr, valueType)
case parquet.Types.ByteArray, parquet.Types.FixedLenByteArray:
return transferDecimalBytes(rdr.(file.BinaryRecordReader), valueType)
default:
return nil, errors.New("physical type for decimal128 must be int32, int64, bytearray or fixed len byte array")
return nil, errors.New("physical type for decimal128/decimal256 must be int32, int64, bytearray or fixed len byte array")
}
case arrow.TIMESTAMP:
tstype := valueType.(*arrow.TimestampType)
Expand Down Expand Up @@ -722,10 +723,20 @@ func transferDecimalInteger(rdr file.RecordReader, dt arrow.DataType) arrow.Arra
values = reflect.ValueOf(arrow.Int64Traits.CastFromBytes(rdr.Values())[:length])
}

data := make([]byte, arrow.Decimal128Traits.BytesRequired(length))
out := arrow.Decimal128Traits.CastFromBytes(data)
for i := 0; i < values.Len(); i++ {
out[i] = decimal128.FromI64(values.Index(i).Int())
var data []byte
switch dt.ID() {
case arrow.DECIMAL128:
data = make([]byte, arrow.Decimal128Traits.BytesRequired(length))
out := arrow.Decimal128Traits.CastFromBytes(data)
for i := 0; i < values.Len(); i++ {
out[i] = decimal128.FromI64(values.Index(i).Int())
}
case arrow.DECIMAL256:
data = make([]byte, arrow.Decimal256Traits.BytesRequired(length))
out := arrow.Decimal256Traits.CastFromBytes(data)
for i := 0; i < values.Len(); i++ {
out[i] = decimal256.FromI64(values.Index(i).Int())
}
}

var nullmap *memory.Buffer
Expand Down Expand Up @@ -801,28 +812,72 @@ func bigEndianToDecimal128(buf []byte) (decimal128.Num, error) {
return decimal128.New(hi, uint64(lo)), nil
}

func bigEndianToDecimal256(buf []byte) (decimal256.Num, error) {
const (
minDecimalBytes = 1
maxDecimalBytes = 32
)

if len(buf) < minDecimalBytes || len(buf) > maxDecimalBytes {
return decimal256.Num{},
fmt.Errorf("%w: length of byte array for bigEndianToDecimal256 was %d but must be between %d and %d",
arrow.ErrInvalid, len(buf), minDecimalBytes, maxDecimalBytes)
}

var littleEndian [4]uint64
// bytes are coming in big-endian, so the first byte is the MSB and
// therefore holds the sign bit
initWord, isNeg := uint64(0), int8(buf[0]) < 0
if isNeg {
// sign extend if necessary
initWord = uint64(0xFFFFFFFFFFFFFFFF)
}

for wordIdx := 0; wordIdx < 4; wordIdx++ {
wordLen := utils.MinInt(len(buf), arrow.Uint64SizeBytes)
word := buf[len(buf)-wordLen:]

if wordLen == 8 {
// full words can be assigned as-is
littleEndian[wordIdx] = binary.BigEndian.Uint64(word)
} else {
result := initWord
if len(buf) > 0 {
// incorporate the actual values if present
// shift left enough bits to make room for the incoming int64
result = result << uint64(wordLen)
// preserve the upper bits by inplace OR-ing the int64
result |= uint64FromBigEndianShifted(word)
}
littleEndian[wordIdx] = result
}

buf = buf[:len(buf)-wordLen]
}

return decimal256.New(littleEndian[3], littleEndian[2], littleEndian[1], littleEndian[0]), nil
}

type varOrFixedBin interface {
arrow.Array
Value(i int) []byte
}

// convert physical byte storage, instead of integers, to decimal128
func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arrow.Chunked, error) {
convert := func(arr arrow.Array) (arrow.Array, error) {
length := arr.Len()
convert128 := func(in varOrFixedBin) (arrow.Array, error) {
length := in.Len()
data := make([]byte, arrow.Decimal128Traits.BytesRequired(length))
out := arrow.Decimal128Traits.CastFromBytes(data)

input := arr.(varOrFixedBin)
nullCount := input.NullN()

nullCount := in.NullN()
var err error
for i := 0; i < length; i++ {
if nullCount > 0 && input.IsNull(i) {
if nullCount > 0 && in.IsNull(i) {
continue
}

rec := input.Value(i)
rec := in.Value(i)
if len(rec) <= 0 {
return nil, fmt.Errorf("invalud BYTEARRAY length for type: %s", dt)
}
Expand All @@ -833,12 +888,51 @@ func transferDecimalBytes(rdr file.BinaryRecordReader, dt arrow.DataType) (*arro
}

ret := array.NewData(dt, length, []*memory.Buffer{
input.Data().Buffers()[0], memory.NewBufferBytes(data),
in.Data().Buffers()[0], memory.NewBufferBytes(data),
}, nil, nullCount, 0)
defer ret.Release()
return array.MakeFromData(ret), nil
}

convert256 := func(in varOrFixedBin) (arrow.Array, error) {
length := in.Len()
data := make([]byte, arrow.Decimal256Traits.BytesRequired(length))
out := arrow.Decimal256Traits.CastFromBytes(data)

nullCount := in.NullN()
var err error
for i := 0; i < length; i++ {
if nullCount > 0 && in.IsNull(i) {
continue
}

rec := in.Value(i)
if len(rec) <= 0 {
return nil, fmt.Errorf("invalid BYTEARRAY length for type: %s", dt)
}
out[i], err = bigEndianToDecimal256(rec)
if err != nil {
return nil, err
}
}

ret := array.NewData(dt, length, []*memory.Buffer{
in.Data().Buffers()[0], memory.NewBufferBytes(data),
}, nil, nullCount, 0)
defer ret.Release()
return array.MakeFromData(ret), nil
}

convert := func(arr arrow.Array) (arrow.Array, error) {
switch dt.ID() {
case arrow.DECIMAL128:
return convert128(arr.(varOrFixedBin))
case arrow.DECIMAL256:
return convert256(arr.(varOrFixedBin))
}
return nil, arrow.ErrNotImplemented
}

chunks := rdr.GetBuilderChunks()
var err error
for idx, chunk := range chunks {
Expand Down
63 changes: 63 additions & 0 deletions go/parquet/pqarrow/encode_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,20 @@ import (
"encoding/binary"
"errors"
"fmt"
"math"
"time"
"unsafe"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/arrow/decimal128"
"github.com/apache/arrow/go/v14/arrow/decimal256"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/internal/utils"
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/internal/debug"
)

// get the count of the number of leaf arrays for the type
Expand Down Expand Up @@ -327,6 +330,18 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
for idx, val := range leafArr.(*array.Date64).Date64Values() {
data[idx] = int32(val / 86400000) // coerce date64 values
}
case arrow.DECIMAL128:
for idx, val := range leafArr.(*array.Decimal128).Values() {
debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
data[idx] = int32(val.LowBits())
}
case arrow.DECIMAL256:
for idx, val := range leafArr.(*array.Decimal256).Values() {
debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "casting Decimal128 greater than the value range; high bits must be 0 or -1")
debug.Assert(val.LowBits() <= math.MaxUint32, "casting Decimal128 to int32 when value > MaxUint32")
data[idx] = int32(val.LowBits())
}
default:
return fmt.Errorf("type mismatch, column is int32 writer, arrow array is %s, and not a compatible type", leafArr.DataType().Name())
}
Expand Down Expand Up @@ -396,6 +411,20 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
data = arrow.Int64Traits.CastFromBytes(leafArr.Data().Buffers()[1].Bytes())
data = data[leafArr.Data().Offset() : leafArr.Data().Offset()+leafArr.Len()]
}
case arrow.DECIMAL128:
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Decimal128).Values() {
debug.Assert(val.HighBits() == 0 || val.HighBits() == -1, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
data[idx] = int64(val.LowBits())
}
case arrow.DECIMAL256:
ctx.dataBuffer.ResizeNoShrink(arrow.Int64Traits.BytesRequired(leafArr.Len()))
data = arrow.Int64Traits.CastFromBytes(ctx.dataBuffer.Bytes())
for idx, val := range leafArr.(*array.Decimal256).Values() {
debug.Assert(val.Array()[3] == 0 || val.Array()[3] == 0xFFFFFFFF, "trying to cast Decimal128 to int64 greater than range, high bits must be 0 or -1")
data[idx] = int64(val.LowBits())
}
default:
return fmt.Errorf("unimplemented arrow type to write to int64 column: %s", leafArr.DataType().Name())
}
Expand Down Expand Up @@ -519,6 +548,40 @@ func writeDenseArrow(ctx *arrowWriteContext, cw file.ColumnChunkWriter, leafArr
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
case *arrow.Decimal256Type:
// parquet decimal are stored with FixedLength values where the length is
// proportional to the precision. Arrow's Decimal are always stored with 16/32
// bytes. thus the internal FLBA must be adjusted by the offset calculation
offset := int(bitutil.BytesForBits(int64(dt.BitWidth()))) - int(DecimalSize(dt.Precision))
ctx.dataBuffer.ResizeNoShrink((leafArr.Len() - leafArr.NullN()) * dt.BitWidth())
scratch := ctx.dataBuffer.Bytes()
typeLen := wr.Descr().TypeLength()
fixDecimalEndianness := func(in decimal256.Num) parquet.FixedLenByteArray {
out := scratch[offset : offset+typeLen]
vals := in.Array()
binary.BigEndian.PutUint64(scratch, vals[3])
binary.BigEndian.PutUint64(scratch[arrow.Uint64SizeBytes:], vals[2])
binary.BigEndian.PutUint64(scratch[2*arrow.Uint64SizeBytes:], vals[1])
binary.BigEndian.PutUint64(scratch[3*arrow.Uint64SizeBytes:], vals[0])
scratch = scratch[4*arrow.Uint64SizeBytes:]
return out
}

data := make([]parquet.FixedLenByteArray, leafArr.Len())
arr := leafArr.(*array.Decimal256)
if leafArr.NullN() == 0 {
for idx := range data {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
_, err = wr.WriteBatch(data, defLevels, repLevels)
} else {
for idx := range data {
if arr.IsValid(idx) {
data[idx] = fixDecimalEndianness(arr.Value(idx))
}
}
wr.WriteBatchSpaced(data, defLevels, repLevels, arr.NullBitmapBytes(), int64(arr.Data().Offset()))
}
default:
return fmt.Errorf("%w: invalid column type to write to FixedLenByteArray: %s", arrow.ErrInvalid, leafArr.DataType().Name())
}
Expand Down
65 changes: 58 additions & 7 deletions go/parquet/pqarrow/encode_arrow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/bitutil"
"github.com/apache/arrow/go/v14/arrow/decimal128"
"github.com/apache/arrow/go/v14/arrow/decimal256"
"github.com/apache/arrow/go/v14/arrow/ipc"
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/internal/types"
Expand Down Expand Up @@ -474,9 +475,9 @@ func getLogicalType(typ arrow.DataType) schema.LogicalType {
default:
panic("only micro and nano seconds are supported for arrow TIME64")
}
case arrow.DECIMAL:
dec := typ.(*arrow.Decimal128Type)
return schema.NewDecimalLogicalType(dec.Precision, dec.Scale)
case arrow.DECIMAL, arrow.DECIMAL256:
dec := typ.(arrow.DecimalType)
return schema.NewDecimalLogicalType(dec.GetPrecision(), dec.GetScale())
}
return schema.NoLogicalType{}
}
Expand Down Expand Up @@ -552,15 +553,15 @@ func (ps *ParquetIOTestSuite) makeSimpleSchema(typ arrow.DataType, rep parquet.R
switch typ := typ.(type) {
case *arrow.FixedSizeBinaryType:
byteWidth = int32(typ.ByteWidth)
case *arrow.Decimal128Type:
byteWidth = pqarrow.DecimalSize(typ.Precision)
case arrow.DecimalType:
byteWidth = pqarrow.DecimalSize(typ.GetPrecision())
case *arrow.DictionaryType:
valuesType := typ.ValueType
switch dt := valuesType.(type) {
case *arrow.FixedSizeBinaryType:
byteWidth = int32(dt.ByteWidth)
case *arrow.Decimal128Type:
byteWidth = pqarrow.DecimalSize(dt.Precision)
case arrow.DecimalType:
byteWidth = pqarrow.DecimalSize(dt.GetPrecision())
}
}

Expand Down Expand Up @@ -932,6 +933,56 @@ func (ps *ParquetIOTestSuite) TestReadDecimals() {
ps.True(array.Equal(expected, chunked.Chunk(0)))
}

func (ps *ParquetIOTestSuite) TestReadDecimal256() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)

bigEndian := []parquet.ByteArray{
// 123456
[]byte{1, 226, 64},
// 987654
[]byte{15, 18, 6},
// -123456
[]byte{255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254, 29, 192},
}

bldr := array.NewDecimal256Builder(mem, &arrow.Decimal256Type{Precision: 40, Scale: 3})
defer bldr.Release()

bldr.Append(decimal256.FromU64(123456))
bldr.Append(decimal256.FromU64(987654))
bldr.Append(decimal256.FromI64(-123456))

expected := bldr.NewDecimal256Array()
defer expected.Release()

sc := schema.MustGroup(schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{
schema.Must(schema.NewPrimitiveNodeLogical("decimals", parquet.Repetitions.Required, schema.NewDecimalLogicalType(40, 3), parquet.Types.ByteArray, -1, -1)),
}, -1))

sink := encoding.NewBufferWriter(0, mem)
defer sink.Release()
writer := file.NewParquetWriter(sink, sc)

rgw := writer.AppendRowGroup()
cw, _ := rgw.NextColumn()
cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(bigEndian, nil, nil)
cw.Close()
rgw.Close()
writer.Close()

rdr := ps.createReader(mem, sink.Bytes())
cr, err := rdr.GetColumn(context.TODO(), 0)
ps.NoError(err)

chunked, err := cr.NextBatch(smallSize)
ps.NoError(err)
defer chunked.Release()

ps.Len(chunked.Chunks(), 1)
ps.Truef(array.Equal(expected, chunked.Chunk(0)), "expected: %s\ngot: %s", expected, chunked.Chunk(0))
}

func (ps *ParquetIOTestSuite) TestReadNestedStruct() {
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
defer mem.AssertSize(ps.T(), 0)
Expand Down
Loading

0 comments on commit 697a2cb

Please sign in to comment.