Skip to content

Commit

Permalink
ARROW-17275: [Go][Integration] Handle Large offset types in IPC read/…
Browse files Browse the repository at this point in the history
…write (#13770)

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Aug 3, 2022
1 parent 3b987d9 commit db6c099
Show file tree
Hide file tree
Showing 33 changed files with 1,297 additions and 305 deletions.
5 changes: 3 additions & 2 deletions ci/scripts/go_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,17 @@ set -ex

source_dir=${1}/go

# when we upgrade to at least go1.18, we can add the new -asan option here
testargs="-race"
case "$(uname)" in
MINGW*)
# -race doesn't work on windows currently
# -asan and -race don't work on windows currently
testargs=""
;;
esac

if [[ "$(go env GOHOSTARCH)" = "s390x" ]]; then
testargs="" # -race not supported on s390x
testargs="" # -race and -asan not supported on s390x
fi

# Go static check (skipped in MinGW)
Expand Down
2 changes: 0 additions & 2 deletions dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1587,7 +1587,6 @@ def _temp_path():

generate_primitive_large_offsets_case([17, 20])
.skip_category('C#')
.skip_category('Go')
.skip_category('JS'),

generate_null_case([10, 0])
Expand Down Expand Up @@ -1634,7 +1633,6 @@ def _temp_path():

generate_nested_large_offsets_case()
.skip_category('C#')
.skip_category('Go')
.skip_category('JS'),

generate_unions_case()
Expand Down
2 changes: 1 addition & 1 deletion docs/source/status.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Data Types
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| List ||||||||
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Large List ||| | | |||
| Large List ||| | | |||
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Struct ||||||||
+-------------------+-------+-------+-------+------------+-------+-------+-------+
Expand Down
2 changes: 1 addition & 1 deletion go/arrow/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func init() {
arrow.DURATION: func(data arrow.ArrayData) arrow.Array { return NewDurationData(data) },
arrow.LARGE_STRING: func(data arrow.ArrayData) arrow.Array { return NewLargeStringData(data) },
arrow.LARGE_BINARY: func(data arrow.ArrayData) arrow.Array { return NewLargeBinaryData(data) },
arrow.LARGE_LIST: unsupportedArrayType,
arrow.LARGE_LIST: func(data arrow.ArrayData) arrow.Array { return NewLargeListData(data) },
arrow.INTERVAL: func(data arrow.ArrayData) arrow.Array { return NewIntervalData(data) },
arrow.INTERVAL_MONTH_DAY_NANO: func(data arrow.ArrayData) arrow.Array { return NewMonthDayNanoIntervalData(data) },

Expand Down
6 changes: 5 additions & 1 deletion go/arrow/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ func TestMakeFromData(t *testing.T) {
array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */),
}},

{name: "large list", d: &testDataType{arrow.LARGE_LIST}, child: []arrow.ArrayData{
array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */),
array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */),
}},

{name: "struct", d: &testDataType{arrow.STRUCT}},
{name: "struct", d: &testDataType{arrow.STRUCT}, child: []arrow.ArrayData{
array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */),
Expand Down Expand Up @@ -117,7 +122,6 @@ func TestMakeFromData(t *testing.T) {
// unsupported types
{name: "sparse union", d: &testDataType{arrow.SPARSE_UNION}, expPanic: true, expError: "unsupported data type: SPARSE_UNION"},
{name: "dense union", d: &testDataType{arrow.DENSE_UNION}, expPanic: true, expError: "unsupported data type: DENSE_UNION"},
{name: "large list", d: &testDataType{arrow.LARGE_LIST}, expPanic: true, expError: "unsupported data type: LARGE_LIST"},
{name: "decimal256", d: &testDataType{arrow.DECIMAL256}, expPanic: true, expError: "unsupported data type: DECIMAL256"},

// invalid types
Expand Down
14 changes: 14 additions & 0 deletions go/arrow/array/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import (
"github.com/goccy/go-json"
)

// BinaryLike is the common interface for variable-length binary-style
// arrays regardless of offset width (Binary with 32-bit offsets,
// LargeBinary with 64-bit offsets). ValueOffset64 exposes offsets
// uniformly as int64 so generic code can handle both without a type
// switch. NOTE(review): presumably String/LargeString satisfy this as
// well — confirm against the rest of the package.
type BinaryLike interface {
	arrow.Array
	ValueBytes() []byte
	ValueOffset64(int) int64
}

// A type which represents an immutable sequence of variable-length binary strings.
type Binary struct {
array
Expand Down Expand Up @@ -64,6 +70,10 @@ func (a *Binary) ValueOffset(i int) int {
return int(a.valueOffsets[a.array.data.offset+i])
}

// ValueOffset64 returns the offset of element i widened to int64.
// It delegates to ValueOffset (which bounds-checks and applies the
// array's slice offset) and exists to satisfy the BinaryLike interface.
func (a *Binary) ValueOffset64(i int) int64 {
	off := a.ValueOffset(i)
	return int64(off)
}

func (a *Binary) ValueLen(i int) int {
if i < 0 || i >= a.array.data.length {
panic("arrow/array: index out of range")
Expand Down Expand Up @@ -193,6 +203,10 @@ func (a *LargeBinary) ValueOffset(i int) int64 {
return a.valueOffsets[a.array.data.offset+i]
}

// ValueOffset64 returns the offset of element i. LargeBinary offsets
// are already 64-bit, so this is a direct pass-through provided to
// satisfy the BinaryLike interface.
func (a *LargeBinary) ValueOffset64(i int) int64 {
	off := a.ValueOffset(i)
	return off
}

func (a *LargeBinary) ValueLen(i int) int {
if i < 0 || i >= a.array.data.length {
panic("arrow/array: index out of range")
Expand Down
2 changes: 2 additions & 0 deletions go/arrow/array/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder {
typ := dtype.(*arrow.DictionaryType)
return NewDictionaryBuilder(mem, typ)
case arrow.LARGE_LIST:
typ := dtype.(*arrow.LargeListType)
return NewLargeListBuilder(mem, typ.Elem())
case arrow.MAP:
typ := dtype.(*arrow.MapType)
return NewMapBuilder(mem, typ.KeyType(), typ.ItemType(), typ.KeysSorted)
Expand Down
25 changes: 25 additions & 0 deletions go/arrow/array/compare.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,9 @@ func Equal(left, right arrow.Array) bool {
case *List:
r := right.(*List)
return arrayEqualList(l, r)
case *LargeList:
r := right.(*LargeList)
return arrayEqualLargeList(l, r)
case *FixedSizeList:
r := right.(*FixedSizeList)
return arrayEqualFixedSizeList(l, r)
Expand Down Expand Up @@ -535,6 +538,9 @@ func arrayApproxEqual(left, right arrow.Array, opt equalOption) bool {
case *List:
r := right.(*List)
return arrayApproxEqualList(l, r, opt)
case *LargeList:
r := right.(*LargeList)
return arrayApproxEqualLargeList(l, r, opt)
case *FixedSizeList:
r := right.(*FixedSizeList)
return arrayApproxEqualFixedSizeList(l, r, opt)
Expand Down Expand Up @@ -650,6 +656,25 @@ func arrayApproxEqualList(left, right *List, opt equalOption) bool {
return true
}

// arrayApproxEqualLargeList reports whether two LargeList arrays are
// approximately equal under opt, comparing the element lists pairwise.
// Entries that are null on the left are skipped (mirroring the other
// arrayApproxEqual* list helpers in this file).
func arrayApproxEqualLargeList(left, right *LargeList, opt equalOption) bool {
	// Compare one pair of list values; a named local keeps the deferred
	// Releases scoped to a single iteration instead of the whole loop.
	elemsEqual := func(idx int) bool {
		lv := left.newListValue(idx)
		defer lv.Release()
		rv := right.newListValue(idx)
		defer rv.Release()
		return arrayApproxEqual(lv, rv, opt)
	}
	n := left.Len()
	for idx := 0; idx < n; idx++ {
		if left.IsNull(idx) {
			continue
		}
		if !elemsEqual(idx) {
			return false
		}
	}
	return true
}

func arrayApproxEqualFixedSizeList(left, right *FixedSizeList, opt equalOption) bool {
for i := 0; i < left.Len(); i++ {
if left.IsNull(i) {
Expand Down
17 changes: 17 additions & 0 deletions go/arrow/array/concat.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,23 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) (arrow.ArrayData, erro
defer c.Release()
}

out.buffers[1] = offsetBuffer
out.childData = make([]arrow.ArrayData, 1)
out.childData[0], err = concat(childData, mem)
if err != nil {
return nil, err
}
case *arrow.LargeListType:
offsetWidth := dt.Layout().Buffers[1].ByteWidth
offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem)
if err != nil {
return nil, err
}
childData := gatherChildrenRanges(data, 0, valueRanges)
for _, c := range childData {
defer c.Release()
}

out.buffers[1] = offsetBuffer
out.childData = make([]arrow.ArrayData, 1)
out.childData[0], err = concat(childData, mem)
Expand Down
37 changes: 37 additions & 0 deletions go/arrow/array/concat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func TestConcatenate(t *testing.T) {
{arrow.BinaryTypes.String},
{arrow.BinaryTypes.LargeString},
{arrow.ListOf(arrow.PrimitiveTypes.Int8)},
{arrow.LargeListOf(arrow.PrimitiveTypes.Int8)},
{arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int8)},
{arrow.StructOf()},
{arrow.MapOf(arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Int8)},
Expand Down Expand Up @@ -158,6 +159,32 @@ func (cts *ConcatTestSuite) generateArr(size int64, nullprob float64) arrow.Arra
bldr := array.NewListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int8)
defer bldr.Release()

valid := make([]bool, len(offsetsVector)-1)
for i := range valid {
valid[i] = true
}
bldr.AppendValues(offsetsVector, valid)
vb := bldr.ValueBuilder().(*array.Int8Builder)
for i := 0; i < values.Len(); i++ {
if values.IsValid(i) {
vb.Append(values.Value(i))
} else {
vb.AppendNull()
}
}
return bldr.NewArray()
case arrow.LARGE_LIST:
valuesSize := size * 8
values := cts.rng.Int8(valuesSize, 0, 127, nullprob).(*array.Int8)
defer values.Release()
offsetsVector := cts.largeoffsets(int64(valuesSize), int32(size))
// ensure the first and last offsets encompass the whole values
offsetsVector[0] = 0
offsetsVector[len(offsetsVector)-1] = int64(valuesSize)

bldr := array.NewLargeListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int8)
defer bldr.Release()

valid := make([]bool, len(offsetsVector)-1)
for i := range valid {
valid[i] = true
Expand Down Expand Up @@ -263,6 +290,16 @@ func (cts *ConcatTestSuite) offsets(length, slicecount int32) []int32 {
return offsets
}

// largeoffsets produces slicecount+1 sorted random offsets in the range
// [0, length] for building LargeList test data. The generator is seeded
// from the suite so results are deterministic per seed.
func (cts *ConcatTestSuite) largeoffsets(length int64, slicecount int32) []int64 {
	rng := rand.New(rand.NewSource(cts.seed))
	offsets := make([]int64, 0, slicecount+1)
	for k := int32(0); k <= slicecount; k++ {
		offsets = append(offsets, rng.Int63n(length+1))
	}
	sort.Slice(offsets, func(a, b int) bool { return offsets[a] < offsets[b] })
	return offsets
}

func (cts *ConcatTestSuite) TestCheckConcat() {
for _, sz := range cts.sizes {
cts.Run(fmt.Sprintf("size %d", sz), func() {
Expand Down
Loading

0 comments on commit db6c099

Please sign in to comment.