From bb31c9a2b68c3cf17ef82fb6b41544ad240419b5 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 29 Jul 2022 11:26:59 -0400 Subject: [PATCH] ARROW-17219: [Go][IPC] Endianness Conversion for Non-Native Endianness (#13716) Authored-by: Matt Topol Signed-off-by: Matt Topol --- .gitignore | 3 + dev/archery/archery/integration/runner.py | 2 - docs/source/status.rst | 4 +- go/arrow/array/array_test.go | 1 + go/arrow/array/data.go | 13 + go/arrow/datatype.go | 2 + go/arrow/endian/big.go | 7 +- go/arrow/endian/endian.go | 41 +++ go/arrow/endian/little.go | 7 +- .../internal/testing/types/extension_types.go | 36 +++ go/arrow/ipc/endian_swap.go | 144 +++++++++ go/arrow/ipc/endian_swap_test.go | 299 ++++++++++++++++++ go/arrow/ipc/file_reader.go | 29 +- go/arrow/ipc/ipc.go | 24 +- go/arrow/ipc/metadata.go | 5 +- go/arrow/ipc/reader.go | 16 +- go/arrow/schema.go | 41 ++- go/arrow/schema_test.go | 47 +++ 18 files changed, 688 insertions(+), 33 deletions(-) create mode 100644 go/arrow/endian/endian.go create mode 100644 go/arrow/ipc/endian_swap.go create mode 100644 go/arrow/ipc/endian_swap_test.go diff --git a/.gitignore b/.gitignore index 1406c30689f8c..103889cb9bc01 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,6 @@ cpp/Brewfile.lock.json java-dist/ java-native-c/ java-native-cpp/ + +# archery files +dev/archery/build \ No newline at end of file diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index e652ff88af58d..6d6adb3c29cd3 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -134,7 +134,6 @@ def _gold_tests(self, gold_dir): skip.add("Java") if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian': skip.add("C#") - skip.add("Go") skip.add("Java") skip.add("JS") skip.add("Rust") @@ -148,7 +147,6 @@ def _gold_tests(self, gold_dir): if prefix == '4.0.0-shareddict': skip.add("C#") - skip.add("Go") quirks = set() if prefix in {'0.14.1', '0.17.1', diff --git 
a/docs/source/status.rst b/docs/source/status.rst index 3c35a582c6524..0259538f87567 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -130,7 +130,7 @@ IPC Format +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Buffer compression | ✓ | ✓ (3) | ✓ | | | | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ -| Endianness conversion | ✓ (2) | | | | | | | +| Endianness conversion | ✓ (2) | | ✓ (2) | | | | | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ | Custom schema metadata | ✓ | ✓ | ✓ | | ✓ | ✓ | ✓ | +-----------------------------+-------+-------+-------+------------+-------+-------+-------+ @@ -249,7 +249,7 @@ C Stream Interface | Feature | C++ | Python | R | Rust | Go | Java | C/GLib | Ruby | Julia | | | | | | | | | | | | +=============================+=====+========+===+======+====+======+========+======+=======+ -| Stream export | ✓ | ✓ | ✓ | ✓ | | | ✓ | ✓ | | +| Stream export | ✓ | ✓ | ✓ | ✓ | ✓ | | ✓ | ✓ | | +-----------------------------+-----+--------+---+------+----+------+--------+------+-------+ | Stream import | ✓ | ✓ | ✓ | ✓ | ✓ | | ✓ | ✓ | | +-----------------------------+-----+--------+---+------+----+------+--------+------+-------+ diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go index 316db2b034656..7c7a0085c748c 100644 --- a/go/arrow/array/array_test.go +++ b/go/arrow/array/array_test.go @@ -36,6 +36,7 @@ func (d *testDataType) Name() string { panic("implement me") } func (d *testDataType) BitWidth() int { return 8 } func (d *testDataType) Fingerprint() string { return "" } func (testDataType) Layout() arrow.DataTypeLayout { return arrow.DataTypeLayout{} } +func (testDataType) String() string { return "" } func TestMakeFromData(t *testing.T) { tests := []struct { diff --git a/go/arrow/array/data.go b/go/arrow/array/data.go index b7a1993c1a3b8..765c428689dee 100644 --- 
a/go/arrow/array/data.go +++ b/go/arrow/array/data.go @@ -77,6 +77,19 @@ func NewDataWithDictionary(dtype arrow.DataType, length int, buffers []*memory.B return data } +func (d *Data) Copy() *Data { + // don't pass the slices directly, otherwise it retains the connection + // we need to make new slices and populate them with the same pointers + bufs := make([]*memory.Buffer, len(d.buffers)) + copy(bufs, d.buffers) + children := make([]arrow.ArrayData, len(d.childData)) + copy(children, d.childData) + + data := NewData(d.dtype, d.length, bufs, children, d.nulls, d.offset) + data.SetDictionary(d.dictionary) + return data +} + // Reset sets the Data for re-use. func (d *Data) Reset(dtype arrow.DataType, length int, buffers []*memory.Buffer, childData []arrow.ArrayData, nulls, offset int) { // Retain new buffers before releasing existing buffers in-case they're the same ones to prevent accidental premature diff --git a/go/arrow/datatype.go b/go/arrow/datatype.go index 1503f655e7ce7..b20de8d5665b6 100644 --- a/go/arrow/datatype.go +++ b/go/arrow/datatype.go @@ -17,6 +17,7 @@ package arrow import ( + "fmt" "hash/maphash" "github.com/apache/arrow/go/v9/arrow/internal/debug" @@ -161,6 +162,7 @@ const ( // DataType is the representation of an Arrow type. type DataType interface { + fmt.Stringer ID() Type // Name is name of the data type. Name() string diff --git a/go/arrow/endian/big.go b/go/arrow/endian/big.go index ebd36539db052..0b92585745f42 100644 --- a/go/arrow/endian/big.go +++ b/go/arrow/endian/big.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//go:build s390x // +build s390x package endian @@ -22,4 +23,8 @@ import "encoding/binary" var Native = binary.BigEndian -const IsBigEndian = true +const ( + IsBigEndian = true + NativeEndian = BigEndian + NonNativeEndian = LittleEndian +) diff --git a/go/arrow/endian/endian.go b/go/arrow/endian/endian.go new file mode 100644 index 0000000000000..39fe74c782ded --- /dev/null +++ b/go/arrow/endian/endian.go @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endian + +import ( + "github.com/apache/arrow/go/v9/arrow/internal/debug" + "github.com/apache/arrow/go/v9/arrow/internal/flatbuf" +) + +type Endianness flatbuf.Endianness + +const ( + LittleEndian Endianness = Endianness(flatbuf.EndiannessLittle) + BigEndian Endianness = Endianness(flatbuf.EndiannessBig) +) + +func (e Endianness) String() string { + switch e { + case LittleEndian: + return "little" + case BigEndian: + return "big" + default: + debug.Assert(false, "wtf? bad endianness value") + return "???" 
+ } +} diff --git a/go/arrow/endian/little.go b/go/arrow/endian/little.go index d98b5c97a9d75..def1fc64b9e64 100644 --- a/go/arrow/endian/little.go +++ b/go/arrow/endian/little.go @@ -14,6 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build !s390x // +build !s390x package endian @@ -22,4 +23,8 @@ import "encoding/binary" var Native = binary.LittleEndian -var IsBigEndian = false +const ( + IsBigEndian = false + NativeEndian = LittleEndian + NonNativeEndian = BigEndian +) diff --git a/go/arrow/internal/testing/types/extension_types.go b/go/arrow/internal/testing/types/extension_types.go index c266de169e10b..6179d6bb5dca0 100644 --- a/go/arrow/internal/testing/types/extension_types.go +++ b/go/arrow/internal/testing/types/extension_types.go @@ -273,15 +273,51 @@ func (p *DictExtensionType) Deserialize(storage arrow.DataType, data string) (ar return NewDictExtensionType(), nil } +// SmallintArray is an int16 array +type SmallintArray struct { + array.ExtensionArrayBase +} + +type SmallintType struct { + arrow.ExtensionBase +} + +func NewSmallintType() *SmallintType { + return &SmallintType{ExtensionBase: arrow.ExtensionBase{ + Storage: arrow.PrimitiveTypes.Int16}} +} + +func (SmallintType) ArrayType() reflect.Type { return reflect.TypeOf(SmallintArray{}) } + +func (SmallintType) ExtensionName() string { return "smallint" } + +func (SmallintType) Serialize() string { return "smallint" } + +func (s *SmallintType) ExtensionEquals(other arrow.ExtensionType) bool { + return s.Name() == other.Name() +} + +func (SmallintType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) { + if data != "smallint" { + return nil, fmt.Errorf("type identifier did not match: '%s'", data) + } + if !arrow.TypeEqual(storageType, arrow.PrimitiveTypes.Int16) { + return nil, fmt.Errorf("invalid storage type for SmallintType: %s", storageType) + } + return NewSmallintType(), nil +} + var ( _ 
arrow.ExtensionType = (*UUIDType)(nil) _ arrow.ExtensionType = (*Parametric1Type)(nil) _ arrow.ExtensionType = (*Parametric2Type)(nil) _ arrow.ExtensionType = (*ExtStructType)(nil) _ arrow.ExtensionType = (*DictExtensionType)(nil) + _ arrow.ExtensionType = (*SmallintType)(nil) _ array.ExtensionArray = (*UUIDArray)(nil) _ array.ExtensionArray = (*Parametric1Array)(nil) _ array.ExtensionArray = (*Parametric2Array)(nil) _ array.ExtensionArray = (*ExtStructArray)(nil) _ array.ExtensionArray = (*DictExtensionArray)(nil) + _ array.ExtensionArray = (*SmallintArray)(nil) ) diff --git a/go/arrow/ipc/endian_swap.go b/go/arrow/ipc/endian_swap.go new file mode 100644 index 0000000000000..4fec07d314510 --- /dev/null +++ b/go/arrow/ipc/endian_swap.go @@ -0,0 +1,144 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipc + +import ( + "errors" + "math/bits" + + "github.com/apache/arrow/go/v9/arrow" + "github.com/apache/arrow/go/v9/arrow/array" + "github.com/apache/arrow/go/v9/arrow/memory" +) + +// swap the endianness of the array's buffers as needed in-place to save +// the cost of reallocation. 
+// +// assumes that nested data buffers are never re-used, if an *array.Data +// child is re-used among the children or the dictionary then this might +// end up double-swapping (putting it back into the original endianness). +// if it is needed to support re-using the buffers, then this can be +// re-factored to instead return a NEW array.Data object with newly +// allocated buffers, rather than doing it in place. +// +// For now this is intended to be used by the IPC readers after loading +// arrays from an IPC message which currently is guaranteed to not re-use +// buffers between arrays. +func swapEndianArrayData(data *array.Data) error { + if data.Offset() != 0 { + return errors.New("unsupported data format: data.offset != 0") + } + if err := swapType(data.DataType(), data); err != nil { + return err + } + return swapChildren(data.Children()) +} + +func swapChildren(children []arrow.ArrayData) (err error) { + for i := range children { + if err = swapEndianArrayData(children[i].(*array.Data)); err != nil { + break + } + } + return +} + +func swapType(dt arrow.DataType, data *array.Data) (err error) { + switch dt.ID() { + case arrow.BINARY, arrow.STRING: + swapOffsets(1, data) + return + case arrow.NULL, arrow.BOOL, arrow.INT8, arrow.UINT8, + arrow.FIXED_SIZE_BINARY, arrow.FIXED_SIZE_LIST, arrow.STRUCT: + return + case arrow.DENSE_UNION, arrow.SPARSE_UNION: + panic("arrow endian swap not yet implemented for union types") + case arrow.LARGE_BINARY, arrow.LARGE_LIST, arrow.LARGE_STRING: + panic("arrow endian swap not yet implemented for large types") + } + + switch dt := dt.(type) { + case *arrow.Decimal128Type: + rawdata := arrow.Uint64Traits.CastFromBytes(data.Buffers()[1].Bytes()) + length := data.Buffers()[1].Len() / arrow.Decimal128SizeBytes + for i := 0; i < length; i++ { + idx := i * 2 + tmp := bits.ReverseBytes64(rawdata[idx]) + rawdata[idx] = bits.ReverseBytes64(rawdata[idx+1]) + rawdata[idx+1] = tmp + } + case *arrow.ListType: + swapOffsets(1, data) + 
case *arrow.MapType: + swapOffsets(1, data) + case *arrow.DayTimeIntervalType: + byteSwapBuffer(32, data.Buffers()[1]) + case *arrow.MonthDayNanoIntervalType: + rawdata := arrow.MonthDayNanoIntervalTraits.CastFromBytes(data.Buffers()[1].Bytes()) + for i, tmp := range rawdata { + rawdata[i].Days = int32(bits.ReverseBytes32(uint32(tmp.Days))) + rawdata[i].Months = int32(bits.ReverseBytes32(uint32(tmp.Months))) + rawdata[i].Nanoseconds = int64(bits.ReverseBytes64(uint64(tmp.Nanoseconds))) + } + case arrow.ExtensionType: + return swapType(dt.StorageType(), data) + case *arrow.DictionaryType: + // dictionary itself was already swapped in ReadDictionary calls + return swapType(dt.IndexType, data) + case arrow.FixedWidthDataType: + byteSwapBuffer(dt.BitWidth(), data.Buffers()[1]) + } + return +} + +// this can get called on an invalid Array Data object by the IPC reader, +// so we won't rely on the data.length and will rely on the buffer's +// own size instead. +func byteSwapBuffer(bw int, buf *memory.Buffer) { + if bw == 1 || buf == nil { + // if bit width == 1 (bw is given in bits), no need to swap anything + return + } + + switch bw { + case 16: + data := arrow.Uint16Traits.CastFromBytes(buf.Bytes()) + for i := range data { + data[i] = bits.ReverseBytes16(data[i]) + } + case 32: + data := arrow.Uint32Traits.CastFromBytes(buf.Bytes()) + for i := range data { + data[i] = bits.ReverseBytes32(data[i]) + } + case 64: + data := arrow.Uint64Traits.CastFromBytes(buf.Bytes()) + for i := range data { + data[i] = bits.ReverseBytes64(data[i]) + } + } +} + +func swapOffsets(index int, data *array.Data) { + if data.Buffers()[index] == nil || data.Buffers()[index].Len() == 0 { + return + } + + // other than unions, offset has one more element than the data.length + // don't yet implement large types, so hardcode 32bit offsets for now + byteSwapBuffer(32, data.Buffers()[index]) +} diff --git a/go/arrow/ipc/endian_swap_test.go b/go/arrow/ipc/endian_swap_test.go new file mode 100644 index 
0000000000000..2c62e5d59cd9e --- /dev/null +++ b/go/arrow/ipc/endian_swap_test.go @@ -0,0 +1,299 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ipc + +import ( + "strings" + "testing" + + "github.com/apache/arrow/go/v9/arrow" + "github.com/apache/arrow/go/v9/arrow/array" + "github.com/apache/arrow/go/v9/arrow/endian" + "github.com/apache/arrow/go/v9/arrow/internal/testing/types" + "github.com/apache/arrow/go/v9/arrow/memory" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func AssertArrayDataEqualWithSwappedEndian(t *testing.T, target, expected arrow.ArrayData) bool { + assert.NoError(t, swapEndianArrayData(target.(*array.Data))) + swappedArray := array.MakeFromData(target) + defer swappedArray.Release() + + expectedArray := array.MakeFromData(expected) + defer expectedArray.Release() + + return assert.Truef(t, array.Equal(swappedArray, expectedArray), "got: %s, expected: %s\n", swappedArray, expectedArray) +} + +func TestSwapEndianPrimitiveArrayData(t *testing.T) { + nullBuffer := memory.NewBufferBytes([]byte("\xff")) + + tests := []struct { + dt arrow.DataType + len int + input, expected string + }{ + {arrow.Null, 0, "", ""}, + {arrow.PrimitiveTypes.Int32, 0, "", ""}, 
+ {arrow.FixedWidthTypes.Boolean, 8, "01234567", "01234567"}, + {arrow.PrimitiveTypes.Int8, 8, "01234567", "01234567"}, + {arrow.PrimitiveTypes.Uint16, 4, "01234567", "10325476"}, + {arrow.PrimitiveTypes.Int32, 2, "01234567", "32107654"}, + {arrow.PrimitiveTypes.Uint64, 1, "01234567", "76543210"}, + {&arrow.Decimal128Type{Precision: 38, Scale: 10}, 1, "0123456789abcdef", "fedcba9876543210"}, + {arrow.PrimitiveTypes.Float32, 2, "01200560", "02100650"}, + {arrow.PrimitiveTypes.Float64, 1, "01200560", "06500210"}, + } + + for _, tt := range tests { + t.Run(tt.dt.String(), func(t *testing.T) { + var target, expected arrow.ArrayData + if tt.dt == arrow.Null { + target = array.NewData(arrow.Null, 0, []*memory.Buffer{nil}, nil, 0, 0) + expected = target + } else { + target = array.NewData(tt.dt, tt.len, []*memory.Buffer{nullBuffer, memory.NewBufferBytes([]byte(tt.input))}, nil, 0, 0) + expected = array.NewData(tt.dt, tt.len, []*memory.Buffer{nullBuffer, memory.NewBufferBytes([]byte(tt.expected))}, nil, 0, 0) + defer target.Release() + defer expected.Release() + } + AssertArrayDataEqualWithSwappedEndian(t, target, expected) + }) + } + + data := array.NewData(arrow.PrimitiveTypes.Int64, 1, []*memory.Buffer{nullBuffer, memory.NewBufferBytes([]byte("01234567"))}, nil, 0, 1) + assert.Error(t, swapEndianArrayData(data)) +} + +func replaceBuffer(data *array.Data, idx int, bufdata []byte) *array.Data { + out := data.Copy() + buffers := out.Buffers() + buffers[idx].Release() + buffers[idx] = memory.NewBufferBytes(bufdata) + return out +} + +func replaceBuffersInChild(data *array.Data, childIdx int, bufdata []byte) *array.Data { + out := data.Copy() + // assume updating only buffer[1] in child data + children := out.Children() + child := children[childIdx].(*array.Data).Copy() + children[childIdx].Release() + child.Buffers()[1].Release() + child.Buffers()[1] = memory.NewBufferBytes(bufdata) + children[childIdx] = child + + return out +} + +func replaceBuffersInDict(data 
*array.Data, bufferIdx int, bufdata []byte) *array.Data { + out := data.Copy() + dictData := out.Dictionary().(*array.Data).Copy() + dictData.Buffers()[bufferIdx].Release() + dictData.Buffers()[bufferIdx] = memory.NewBufferBytes(bufdata) + defer dictData.Release() + out.SetDictionary(dictData) + return out +} + +func TestSwapEndianArrayDataBinary(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + // binary type assumes the json string data is base64 encoded + // MDEyMw== -> 0123 + // NDU= -> 45 + arr, _, err := array.FromJSON(mem, arrow.BinaryTypes.Binary, strings.NewReader(`["MDEyMw==", null, "NDU="]`)) + require.NoError(t, err) + defer arr.Release() + + var offsets []byte + if endian.IsBigEndian { + offsets = []byte{0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0} + } else { + offsets = []byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6} + } + expected := arr.Data().(*array.Data) + test := replaceBuffer(expected, 1, offsets) + defer test.Release() + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayString(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + arr, _, err := array.FromJSON(mem, arrow.BinaryTypes.String, strings.NewReader(`["ABCD", null, "EF"]`)) + require.NoError(t, err) + defer arr.Release() + + var offsets []byte + if endian.IsBigEndian { + offsets = []byte{0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0} + } else { + offsets = []byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffer(expected, 1, offsets) + defer test.Release() + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayListType(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + dt := arrow.ListOf(arrow.PrimitiveTypes.Int32) + arr, _, err := array.FromJSON(mem, dt, 
strings.NewReader(`[[0, 1, 2, 3], null, [4, 5]]`)) + require.NoError(t, err) + defer arr.Release() + + var ( + offsets, data []byte + ) + if endian.IsBigEndian { + offsets = []byte{0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6, 0, 0, 0} + data = []byte{0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5, 0, 0, 0} + } else { + offsets = []byte{0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 6} + data = []byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0, 4, 0, 0, 0, 5} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffer(expected, 1, offsets) + defer test.Release() + test = replaceBuffersInChild(test, 0, data) + defer test.Release() + + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayFixedSizeList(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + dt := arrow.FixedSizeListOf(2, arrow.PrimitiveTypes.Int32) + arr, _, err := array.FromJSON(mem, dt, strings.NewReader(`[[0, 1], null, [2, 3]]`)) + require.NoError(t, err) + defer arr.Release() + + var data []byte + if endian.IsBigEndian { + data = []byte{0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0} + } else { + data = []byte{0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffersInChild(expected, 0, data) + defer test.Release() + + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayDictType(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + dt := &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.PrimitiveTypes.Int16} + dict, _, err := array.FromJSON(mem, dt.ValueType, strings.NewReader(`[4, 5, 6, 7]`)) + require.NoError(t, err) + defer dict.Release() + + indices, _, _ := array.FromJSON(mem, dt.IndexType, strings.NewReader("[0, 2, 3]")) + defer indices.Release() + + arr 
:= array.NewDictionaryArray(dt, indices, dict) + defer arr.Release() + + var ( + data1, data2 []byte + ) + if endian.IsBigEndian { + data1 = []byte{0, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0} + data2 = []byte{4, 0, 5, 0, 6, 0, 7, 0} + } else { + data1 = []byte{0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 3} + data2 = []byte{0, 4, 0, 5, 0, 6, 0, 7} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffer(expected, 1, data1) + defer test.Release() + test = replaceBuffersInDict(test, 1, data2) + defer test.Release() + + // dictionary must be explicitly swapped! + assert.NoError(t, swapEndianArrayData(test.Dictionary().(*array.Data))) + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayStruct(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + arr, _, err := array.FromJSON(mem, arrow.StructOf( + arrow.Field{Name: "a", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, + arrow.Field{Name: "b", Type: arrow.BinaryTypes.String, Nullable: true}, + ), strings.NewReader(`[{"a": 4, "b": null}, {"a": null, "b": "foo"}]`)) + require.NoError(t, err) + defer arr.Release() + + var data1, data2 []byte + if endian.IsBigEndian { + data1 = []byte{4, 0, 0, 0, 0, 0, 0, 0} + data2 = []byte{0, 0, 0, 0, 0, 0, 0, 0, 3, 0, 0, 0} + } else { + data1 = []byte{0, 0, 0, 4, 0, 0, 0, 0} + data2 = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 3} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffersInChild(expected, 0, data1) + defer test.Release() + test = replaceBuffersInChild(test, 1, data2) + defer test.Release() + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} + +func TestSwapEndianArrayExtensionType(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.DefaultAllocator) + defer mem.AssertSize(t, 0) + + arrInt16, _, _ := array.FromJSON(mem, arrow.PrimitiveTypes.Int16, strings.NewReader(`[0, 1, 2, 3]`)) + defer arrInt16.Release() + + extData := array.NewData(types.NewSmallintType(), 
arrInt16.Len(), arrInt16.Data().Buffers(), nil, 0, 0) + defer extData.Release() + + arr := array.MakeFromData(extData) + defer arr.Release() + + var data []byte + if endian.IsBigEndian { + data = []byte{0, 0, 1, 0, 2, 0, 3, 0} + } else { + data = []byte{0, 0, 0, 1, 0, 2, 0, 3} + } + + expected := arr.Data().(*array.Data) + test := replaceBuffer(expected, 1, data) + defer test.Release() + AssertArrayDataEqualWithSwappedEndian(t, test, expected) +} diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index ab6e7bf108faa..6e5439a75b41a 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow/go/v9/arrow" "github.com/apache/arrow/go/v9/arrow/array" "github.com/apache/arrow/go/v9/arrow/bitutil" + "github.com/apache/arrow/go/v9/arrow/endian" "github.com/apache/arrow/go/v9/arrow/internal/dictutils" "github.com/apache/arrow/go/v9/arrow/internal/flatbuf" "github.com/apache/arrow/go/v9/arrow/memory" @@ -50,7 +51,8 @@ type FileReader struct { irec int // current record index. used for the arrio.Reader interface err error // last error - mem memory.Allocator + mem memory.Allocator + swapEndianness bool } // NewFileReader opens an Arrow file using the provided reader r. 
@@ -79,7 +81,7 @@ func NewFileReader(r ReadAtSeeker, opts ...Option) (*FileReader, error) { return nil, fmt.Errorf("arrow/ipc: could not decode footer: %w", err) } - err = f.readSchema() + err = f.readSchema(cfg.ensureNativeEndian) if err != nil { return nil, fmt.Errorf("arrow/ipc: could not decode schema: %w", err) } @@ -131,7 +133,7 @@ func (f *FileReader) readFooter() error { return err } -func (f *FileReader) readSchema() error { +func (f *FileReader) readSchema(ensureNativeEndian bool) error { var ( err error kind dictutils.Kind @@ -146,6 +148,11 @@ func (f *FileReader) readSchema() error { return fmt.Errorf("arrow/ipc: could not read schema: %w", err) } + if ensureNativeEndian && !f.schema.IsNativeEndian() { + f.swapEndianness = true + f.schema = f.schema.WithEndianness(endian.NativeEndian) + } + for i := 0; i < f.NumDictionaries(); i++ { blk, err := f.dict(i) if err != nil { @@ -165,7 +172,7 @@ func (f *FileReader) readSchema() error { return err } - kind, err = readDictionary(&f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.mem) + kind, err = readDictionary(&f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem) if err != nil { return err } @@ -293,7 +300,7 @@ func (f *FileReader) RecordAt(i int) (arrow.Record, error) { return nil, fmt.Errorf("arrow/ipc: message %d is not a Record", i) } - return newRecord(f.schema, &f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.mem), nil + return newRecord(f.schema, &f.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), f.swapEndianness, f.mem), nil } // Read reads the current record from the underlying stream and an error, if any. 
@@ -315,7 +322,7 @@ func (f *FileReader) ReadAt(i int64) (arrow.Record, error) { return f.Record(int(i)) } -func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, mem memory.Allocator) arrow.Record { +func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, swapEndianness bool, mem memory.Allocator) arrow.Record { var ( msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) md flatbuf.RecordBatch @@ -351,6 +358,10 @@ func newRecord(schema *arrow.Schema, memo *dictutils.Memo, meta *memory.Buffer, panic(err) } + if swapEndianness { + swapEndianArrayData(data.(*array.Data)) + } + cols[i] = array.MakeFromData(data) defer cols[i].Release() } @@ -598,7 +609,7 @@ func (ctx *arrayLoaderContext) loadStruct(dt *arrow.StructType) arrow.ArrayData return array.NewData(dt, int(field.Length()), buffers, subs, int(field.NullCount()), 0) } -func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, mem memory.Allocator) (dictutils.Kind, error) { +func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker, swapEndianness bool, mem memory.Allocator) (dictutils.Kind, error) { var ( msg = flatbuf.GetRootAsMessage(meta.Bytes(), 0) md flatbuf.DictionaryBatch @@ -635,6 +646,10 @@ func readDictionary(memo *dictutils.Memo, meta *memory.Buffer, body ReadAtSeeker dict := ctx.loadArray(valueType) defer dict.Release() + if swapEndianness { + swapEndianArrayData(dict.(*array.Data)) + } + if md.IsDelta() { memo.AddDelta(id, dict) return dictutils.KindDelta, nil diff --git a/go/arrow/ipc/ipc.go b/go/arrow/ipc/ipc.go index 65207a6d6589a..87c37cfdf7346 100644 --- a/go/arrow/ipc/ipc.go +++ b/go/arrow/ipc/ipc.go @@ -66,14 +66,16 @@ type config struct { footer struct { offset int64 } - codec flatbuf.CompressionType - compressNP int + codec flatbuf.CompressionType + compressNP int + ensureNativeEndian bool } func newConfig(opts ...Option) *config { cfg := &config{ - alloc: 
memory.NewGoAllocator(), - codec: -1, // uncompressed + alloc: memory.NewGoAllocator(), + codec: -1, // uncompressed + ensureNativeEndian: true, } for _, opt := range opts { @@ -134,6 +136,20 @@ func WithCompressConcurrency(n int) Option { } } +// WithEnsureNativeEndian specifies whether or not to automatically byte-swap +// buffers with endian-sensitive data if the schema's endianness is not the +// platform-native endianness. This includes all numeric types, temporal types, +// decimal types, as well as the offset buffers of variable-sized binary and +// list-like types. +// +// This is only relevant to ipc Reader objects, not to writers. This defaults +// to true. +func WithEnsureNativeEndian(v bool) Option { + return func(cfg *config) { + cfg.ensureNativeEndian = v + } +} + var ( _ arrio.Reader = (*Reader)(nil) _ arrio.Writer = (*Writer)(nil) diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index eaf2228f3786d..c074a2f98fdf0 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -23,6 +23,7 @@ import ( "sort" "github.com/apache/arrow/go/v9/arrow" + "github.com/apache/arrow/go/v9/arrow/endian" "github.com/apache/arrow/go/v9/arrow/internal/dictutils" "github.com/apache/arrow/go/v9/arrow/internal/flatbuf" "github.com/apache/arrow/go/v9/arrow/memory" @@ -922,7 +923,7 @@ func schemaFromFB(schema *flatbuf.Schema, memo *dictutils.Memo) (*arrow.Schema, return nil, fmt.Errorf("arrow/ipc: could not convert schema metadata from flatbuf: %w", err) } - return arrow.NewSchema(fields, &md), nil + return arrow.NewSchemaWithEndian(fields, &md, endian.Endianness(schema.Endianness())), nil } func schemaToFB(b *flatbuffers.Builder, schema *arrow.Schema, memo *dictutils.Mapper) flatbuffers.UOffsetT { @@ -941,7 +942,7 @@ func schemaToFB(b *flatbuffers.Builder, schema *arrow.Schema, memo *dictutils.Ma metaFB := metadataToFB(b, schema.Metadata(), flatbuf.SchemaStartCustomMetadataVector) flatbuf.SchemaStart(b) - flatbuf.SchemaAddEndianness(b, 
flatbuf.EndiannessLittle) + flatbuf.SchemaAddEndianness(b, flatbuf.Endianness(schema.Endianness())) flatbuf.SchemaAddFields(b, fieldsFB) flatbuf.SchemaAddCustomMetadata(b, metaFB) offset := flatbuf.SchemaEnd(b) diff --git a/go/arrow/ipc/reader.go b/go/arrow/ipc/reader.go index 69f1097eac154..12b0eb52a671f 100644 --- a/go/arrow/ipc/reader.go +++ b/go/arrow/ipc/reader.go @@ -25,6 +25,7 @@ import ( "github.com/apache/arrow/go/v9/arrow" "github.com/apache/arrow/go/v9/arrow/array" + "github.com/apache/arrow/go/v9/arrow/endian" "github.com/apache/arrow/go/v9/arrow/internal/debug" "github.com/apache/arrow/go/v9/arrow/internal/dictutils" "github.com/apache/arrow/go/v9/arrow/internal/flatbuf" @@ -45,10 +46,10 @@ type Reader struct { // types dictTypeMap memo dictutils.Memo readInitialDicts bool + done bool + swapEndianness bool mem memory.Allocator - - done bool } // NewReaderFromMessageReader allows constructing a new reader object with the @@ -78,6 +79,11 @@ func NewReaderFromMessageReader(r MessageReader, opts ...Option) (reader *Reader return nil, fmt.Errorf("arrow/ipc: could not read schema from stream: %w", err) } + if cfg.ensureNativeEndian && !rr.schema.IsNativeEndian() { + rr.swapEndianness = true + rr.schema = rr.schema.WithEndianness(endian.NativeEndian) + } + return rr, nil } @@ -180,7 +186,7 @@ func (r *Reader) getInitialDicts() bool { if msg.Type() != MessageDictionaryBatch { r.err = fmt.Errorf("arrow/ipc: IPC stream did not have the expected (%d) dictionaries at the start of the stream", numDicts) } - if _, err := readDictionary(&r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.mem); err != nil { + if _, err := readDictionary(&r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); err != nil { r.done = true r.err = err return false @@ -205,7 +211,7 @@ func (r *Reader) next() bool { msg, r.err = r.r.Message() for msg != nil && msg.Type() == MessageDictionaryBatch { - if _, r.err = readDictionary(&r.memo, msg.meta, 
bytes.NewReader(msg.body.Bytes()), r.mem); r.err != nil { + if _, r.err = readDictionary(&r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem); r.err != nil { r.done = true return false } @@ -224,7 +230,7 @@ func (r *Reader) next() bool { return false } - r.rec = newRecord(r.schema, &r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.mem) + r.rec = newRecord(r.schema, &r.memo, msg.meta, bytes.NewReader(msg.body.Bytes()), r.swapEndianness, r.mem) return true } diff --git a/go/arrow/schema.go b/go/arrow/schema.go index 80a45bda0e61b..b2181313a16fc 100644 --- a/go/arrow/schema.go +++ b/go/arrow/schema.go @@ -20,6 +20,8 @@ import ( "fmt" "sort" "strings" + + "github.com/apache/arrow/go/v9/arrow/endian" ) type Metadata struct { @@ -136,18 +138,24 @@ func (md Metadata) Equal(rhs Metadata) bool { // Schema is a sequence of Field values, describing the columns of a table or // a record batch. type Schema struct { - fields []Field - index map[string][]int - meta Metadata + fields []Field + index map[string][]int + meta Metadata + endianness endian.Endianness } // NewSchema returns a new Schema value from the slice of fields and metadata. // // NewSchema panics if there is a field with an invalid DataType. 
func NewSchema(fields []Field, metadata *Metadata) *Schema { + return NewSchemaWithEndian(fields, metadata, endian.NativeEndian) +} + +func NewSchemaWithEndian(fields []Field, metadata *Metadata, e endian.Endianness) *Schema { sc := &Schema{ - fields: make([]Field, 0, len(fields)), - index: make(map[string][]int, len(fields)), + fields: make([]Field, 0, len(fields)), + index: make(map[string][]int, len(fields)), + endianness: e, } if metadata != nil { sc.meta = metadata.clone() @@ -162,9 +170,15 @@ func NewSchema(fields []Field, metadata *Metadata) *Schema { return sc } -func (sc *Schema) Metadata() Metadata { return sc.meta } -func (sc *Schema) Fields() []Field { return sc.fields } -func (sc *Schema) Field(i int) Field { return sc.fields[i] } +func (sc *Schema) WithEndianness(e endian.Endianness) *Schema { + return NewSchemaWithEndian(sc.fields, &sc.meta, e) +} + +func (sc *Schema) Endianness() endian.Endianness { return sc.endianness } +func (sc *Schema) IsNativeEndian() bool { return sc.endianness == endian.NativeEndian } +func (sc *Schema) Metadata() Metadata { return sc.meta } +func (sc *Schema) Fields() []Field { return sc.fields } +func (sc *Schema) Field(i int) Field { return sc.fields[i] } func (sc *Schema) FieldsByName(n string) ([]Field, bool) { indices, ok := sc.index[n] @@ -196,6 +210,8 @@ func (sc *Schema) Equal(o *Schema) bool { return false case len(sc.fields) != len(o.fields): return false + case sc.endianness != o.endianness: + return false } for i := range sc.fields { @@ -215,6 +231,9 @@ func (s *Schema) String() string { } fmt.Fprintf(o, " - %v", f) } + if s.endianness != endian.NativeEndian { + fmt.Fprintf(o, "\n endianness: %v", s.endianness) + } if meta := s.Metadata(); meta.Len() > 0 { fmt.Fprintf(o, "\n metadata: %v", meta) } @@ -237,7 +256,11 @@ func (s *Schema) Fingerprint() string { b.WriteString(fieldFingerprint) b.WriteByte(';') } - // endianness + if s.endianness == endian.LittleEndian { + b.WriteByte('L') + } else { + 
b.WriteByte('B') + } b.WriteByte('}') return b.String() } diff --git a/go/arrow/schema_test.go b/go/arrow/schema_test.go index 0c7dc90745733..fe1b7cac249cc 100644 --- a/go/arrow/schema_test.go +++ b/go/arrow/schema_test.go @@ -20,6 +20,8 @@ import ( "fmt" "reflect" "testing" + + "github.com/apache/arrow/go/v9/arrow/endian" ) func TestMetadata(t *testing.T) { @@ -135,6 +137,7 @@ func TestSchema(t *testing.T) { md *Metadata err error serialize string + addEndian bool }{ { fields: []Field{ @@ -185,6 +188,27 @@ func TestSchema(t *testing.T) { - dup: type=int32 - dup: type=int64`, }, + { + fields: []Field{ + {Name: "f1", Type: PrimitiveTypes.Int32, Nullable: true}, + {Name: "f2", Type: PrimitiveTypes.Uint8}, + {Name: "f3", Type: BinaryTypes.String, Nullable: true}, + {Name: "f4", Type: ListOf(PrimitiveTypes.Int16), Nullable: true}, + }, + md: func() *Metadata { + md := MetadataFrom(map[string]string{"k1": "v1", "k2": "v2"}) + return &md + }(), + addEndian: true, // only print endianness if non-native endian + serialize: `schema: + fields: 4 + - f1: type=int32, nullable + - f2: type=uint8 + - f3: type=utf8, nullable + - f4: type=list, nullable + endianness: ` + endian.NonNativeEndian.String() + ` + metadata: ["k1": "v1", "k2": "v2"]`, + }, } { t.Run("", func(t *testing.T) { if tc.err != nil { @@ -209,6 +233,9 @@ func TestSchema(t *testing.T) { } s := NewSchema(tc.fields, tc.md) + if tc.addEndian { + s = s.WithEndianness(endian.NonNativeEndian) + } if got, want := len(s.Fields()), len(tc.fields); got != want { t.Fatalf("invalid number of fields. 
got=%d, want=%d", got, want) @@ -342,6 +369,26 @@ func TestSchemaEqual(t *testing.T) { }, md), want: false, }, + { + a: NewSchemaWithEndian(fields, nil, endian.LittleEndian), + b: NewSchemaWithEndian(fields, nil, endian.LittleEndian), + want: true, + }, + { + a: NewSchemaWithEndian(fields, nil, endian.LittleEndian), + b: NewSchemaWithEndian(fields, nil, endian.BigEndian), + want: false, + }, + { + a: NewSchemaWithEndian(fields, nil, endian.LittleEndian), + b: NewSchema(fields, nil), + want: !endian.IsBigEndian, + }, + { + a: NewSchemaWithEndian(fields, nil, endian.BigEndian), + b: NewSchema(fields, nil), + want: endian.IsBigEndian, + }, } { t.Run("", func(t *testing.T) { if !tc.a.Equal(tc.a) {