Skip to content

Commit

Permalink
ARROW-17219: [Go][IPC] Endianness Conversion for Non-Native Endianness (
Browse files Browse the repository at this point in the history
#13716)

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Jul 29, 2022
1 parent a4d4bd0 commit bb31c9a
Show file tree
Hide file tree
Showing 18 changed files with 688 additions and 33 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,6 @@ cpp/Brewfile.lock.json
java-dist/
java-native-c/
java-native-cpp/

# archery files
dev/archery/build
2 changes: 0 additions & 2 deletions dev/archery/archery/integration/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ def _gold_tests(self, gold_dir):
skip.add("Java")
if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian':
skip.add("C#")
skip.add("Go")
skip.add("Java")
skip.add("JS")
skip.add("Rust")
Expand All @@ -148,7 +147,6 @@ def _gold_tests(self, gold_dir):

if prefix == '4.0.0-shareddict':
skip.add("C#")
skip.add("Go")

quirks = set()
if prefix in {'0.14.1', '0.17.1',
Expand Down
4 changes: 2 additions & 2 deletions docs/source/status.rst
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ IPC Format
+-----------------------------+-------+-------+-------+------------+-------+-------+-------+
| Buffer compression || ✓ (3) || | | ||
+-----------------------------+-------+-------+-------+------------+-------+-------+-------+
| Endianness conversion | ✓ (2) | | | | | | |
| Endianness conversion | ✓ (2) | | ✓ (2) | | | | |
+-----------------------------+-------+-------+-------+------------+-------+-------+-------+
| Custom schema metadata |||| ||||
+-----------------------------+-------+-------+-------+------------+-------+-------+-------+
Expand Down Expand Up @@ -249,7 +249,7 @@ C Stream Interface
| Feature | C++ | Python | R | Rust | Go | Java | C/GLib | Ruby | Julia |
| | | | | | | | | | |
+=============================+=====+========+===+======+====+======+========+======+=======+
| Stream export ||||| | ||| |
| Stream export ||||| | ||| |
+-----------------------------+-----+--------+---+------+----+------+--------+------+-------+
| Stream import |||||| ||| |
+-----------------------------+-----+--------+---+------+----+------+--------+------+-------+
Expand Down
1 change: 1 addition & 0 deletions go/arrow/array/array_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (d *testDataType) Name() string { panic("implement me") }
func (d *testDataType) BitWidth() int { return 8 }
func (d *testDataType) Fingerprint() string { return "" }
func (testDataType) Layout() arrow.DataTypeLayout { return arrow.DataTypeLayout{} }
func (testDataType) String() string { return "" }

func TestMakeFromData(t *testing.T) {
tests := []struct {
Expand Down
13 changes: 13 additions & 0 deletions go/arrow/array/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ func NewDataWithDictionary(dtype arrow.DataType, length int, buffers []*memory.B
return data
}

// Copy returns a shallow copy of this Data. The returned object shares
// the same buffers, child data and dictionary as d, but holds them in
// freshly allocated slices so that mutating the copy's slice contents
// does not affect the original.
func (d *Data) Copy() *Data {
	// Fresh slice headers are required here: handing d's own slices to
	// NewData would keep the copy aliased to this Data's internal storage.
	buffers := append([]*memory.Buffer(nil), d.buffers...)
	childData := append([]arrow.ArrayData(nil), d.childData...)

	out := NewData(d.dtype, d.length, buffers, childData, d.nulls, d.offset)
	out.SetDictionary(d.dictionary)
	return out
}

// Reset sets the Data for re-use.
func (d *Data) Reset(dtype arrow.DataType, length int, buffers []*memory.Buffer, childData []arrow.ArrayData, nulls, offset int) {
// Retain new buffers before releasing existing buffers in-case they're the same ones to prevent accidental premature
Expand Down
2 changes: 2 additions & 0 deletions go/arrow/datatype.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package arrow

import (
"fmt"
"hash/maphash"

"github.com/apache/arrow/go/v9/arrow/internal/debug"
Expand Down Expand Up @@ -161,6 +162,7 @@ const (

// DataType is the representation of an Arrow type.
type DataType interface {
fmt.Stringer
ID() Type
// Name is name of the data type.
Name() string
Expand Down
7 changes: 6 additions & 1 deletion go/arrow/endian/big.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build s390x
// +build s390x

package endian
Expand All @@ -22,4 +23,8 @@ import "encoding/binary"

var Native = binary.BigEndian

const IsBigEndian = true
const (
IsBigEndian = true
NativeEndian = BigEndian
NonNativeEndian = LittleEndian
)
41 changes: 41 additions & 0 deletions go/arrow/endian/endian.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package endian

import (
"github.com/apache/arrow/go/v9/arrow/internal/debug"
"github.com/apache/arrow/go/v9/arrow/internal/flatbuf"
)

// Endianness mirrors the flatbuffer-generated Endianness enum so that
// callers can refer to a byte order without importing the internal
// flatbuf package directly.
type Endianness flatbuf.Endianness

const (
	// LittleEndian is the Endianness value corresponding to flatbuf.EndiannessLittle.
	LittleEndian Endianness = Endianness(flatbuf.EndiannessLittle)
	// BigEndian is the Endianness value corresponding to flatbuf.EndiannessBig.
	BigEndian Endianness = Endianness(flatbuf.EndiannessBig)
)

// String implements fmt.Stringer, returning "little" or "big" for the
// two defined Endianness values. Any other value is a programming
// error: it trips a debug assertion and yields the placeholder "???".
func (e Endianness) String() string {
	switch e {
	case LittleEndian:
		return "little"
	case BigEndian:
		return "big"
	default:
		// Debug-only message was "wtf? bad endianness value"; replaced
		// with a descriptive assertion message.
		debug.Assert(false, "invalid endianness value")
		return "???"
	}
}
7 changes: 6 additions & 1 deletion go/arrow/endian/little.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !s390x
// +build !s390x

package endian
Expand All @@ -22,4 +23,8 @@ import "encoding/binary"

var Native = binary.LittleEndian

var IsBigEndian = false
const (
IsBigEndian = false
NativeEndian = LittleEndian
NonNativeEndian = BigEndian
)
36 changes: 36 additions & 0 deletions go/arrow/internal/testing/types/extension_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,15 +273,51 @@ func (p *DictExtensionType) Deserialize(storage arrow.DataType, data string) (ar
return NewDictExtensionType(), nil
}

// SmallintArray is the array form of the "smallint" extension type;
// its storage is an int16 array.
type SmallintArray struct {
	array.ExtensionArrayBase
}

// SmallintType is a test extension type named "smallint" whose storage
// type is int16.
type SmallintType struct {
	arrow.ExtensionBase
}

// NewSmallintType constructs a SmallintType backed by Int16 storage.
func NewSmallintType() *SmallintType {
	return &SmallintType{ExtensionBase: arrow.ExtensionBase{
		Storage: arrow.PrimitiveTypes.Int16}}
}

// ArrayType returns the reflect.Type of the array form of this extension type.
func (SmallintType) ArrayType() reflect.Type { return reflect.TypeOf(SmallintArray{}) }

// ExtensionName returns the registered name of this type, "smallint".
func (SmallintType) ExtensionName() string { return "smallint" }

// Serialize returns the metadata string used to identify this type in IPC.
func (SmallintType) Serialize() string { return "smallint" }

// ExtensionEquals reports whether other is the same extension type,
// comparing by name only.
func (s *SmallintType) ExtensionEquals(other arrow.ExtensionType) bool {
	return s.Name() == other.Name()
}

// Deserialize reconstructs a SmallintType from its serialized metadata,
// validating both the "smallint" identifier and the int16 storage type.
func (SmallintType) Deserialize(storageType arrow.DataType, data string) (arrow.ExtensionType, error) {
	if data != "smallint" {
		return nil, fmt.Errorf("type identifier did not match: '%s'", data)
	}
	if !arrow.TypeEqual(storageType, arrow.PrimitiveTypes.Int16) {
		return nil, fmt.Errorf("invalid storage type for SmallintType: %s", storageType)
	}
	return NewSmallintType(), nil
}

// compile-time assertions that each test type satisfies the extension
// type and extension array interfaces.
var (
	_ arrow.ExtensionType = (*UUIDType)(nil)
	_ arrow.ExtensionType = (*Parametric1Type)(nil)
	_ arrow.ExtensionType = (*Parametric2Type)(nil)
	_ arrow.ExtensionType = (*ExtStructType)(nil)
	_ arrow.ExtensionType = (*DictExtensionType)(nil)
	_ arrow.ExtensionType = (*SmallintType)(nil)
	_ array.ExtensionArray = (*UUIDArray)(nil)
	_ array.ExtensionArray = (*Parametric1Array)(nil)
	_ array.ExtensionArray = (*Parametric2Array)(nil)
	_ array.ExtensionArray = (*ExtStructArray)(nil)
	_ array.ExtensionArray = (*DictExtensionArray)(nil)
	_ array.ExtensionArray = (*SmallintArray)(nil)
)
144 changes: 144 additions & 0 deletions go/arrow/ipc/endian_swap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ipc

import (
"errors"
"math/bits"

"github.com/apache/arrow/go/v9/arrow"
"github.com/apache/arrow/go/v9/arrow/array"
"github.com/apache/arrow/go/v9/arrow/memory"
)

// swapEndianArrayData byte-swaps the buffers of data, and recursively
// those of all its children, in-place to avoid the cost of reallocation.
//
// It assumes nested data buffers are never shared: if an *array.Data
// child were reused among the children or the dictionary, its buffers
// could be swapped twice, restoring the original endianness. Should
// buffer reuse ever need to be supported, refactor this to build and
// return a NEW array.Data with freshly allocated, swapped buffers
// instead of mutating in place.
//
// For now this is intended for use by the IPC readers right after
// loading arrays from an IPC message, which currently guarantees that
// buffers are not shared between arrays.
func swapEndianArrayData(data *array.Data) error {
	if data.Offset() != 0 {
		return errors.New("unsupported data format: data.offset != 0")
	}

	err := swapType(data.DataType(), data)
	if err == nil {
		err = swapChildren(data.Children())
	}
	return err
}

// swapChildren applies swapEndianArrayData to each child, stopping at
// and returning the first error encountered.
func swapChildren(children []arrow.ArrayData) error {
	for _, child := range children {
		if err := swapEndianArrayData(child.(*array.Data)); err != nil {
			return err
		}
	}
	return nil
}

// swapType dispatches on the data type to byte-swap data's buffers in
// place. Types whose values are single bytes (or whose layout carries
// no numeric data of its own) need no swapping; variable-length binary
// types only need their offsets buffer swapped.
//
// NOTE: the ordering of cases matters. The ID switch handles the types
// that either need nothing or only offset swaps and returns early; the
// type switch below must list the interface cases (arrow.ExtensionType,
// *arrow.DictionaryType) before the catch-all arrow.FixedWidthDataType,
// since Go type switches match the first applicable case.
func swapType(dt arrow.DataType, data *array.Data) (err error) {
	switch dt.ID() {
	case arrow.BINARY, arrow.STRING:
		// values are raw bytes; only the 32-bit offsets need swapping
		swapOffsets(1, data)
		return
	case arrow.NULL, arrow.BOOL, arrow.INT8, arrow.UINT8,
		arrow.FIXED_SIZE_BINARY, arrow.FIXED_SIZE_LIST, arrow.STRUCT:
		// single-byte or layout-only types: nothing to swap at this level
		// (children of nested types are handled by swapChildren)
		return
	case arrow.DENSE_UNION, arrow.SPARSE_UNION:
		panic("arrow endian swap not yet implemented for union types")
	case arrow.LARGE_BINARY, arrow.LARGE_LIST, arrow.LARGE_STRING:
		panic("arrow endian swap not yet implemented for large types")
	}

	switch dt := dt.(type) {
	case *arrow.Decimal128Type:
		// a 128-bit value is stored as two 64-bit words: swapping the
		// whole value means reversing the bytes of each word AND
		// exchanging the words themselves.
		rawdata := arrow.Uint64Traits.CastFromBytes(data.Buffers()[1].Bytes())
		length := data.Buffers()[1].Len() / arrow.Decimal128SizeBytes
		for i := 0; i < length; i++ {
			idx := i * 2
			tmp := bits.ReverseBytes64(rawdata[idx])
			rawdata[idx] = bits.ReverseBytes64(rawdata[idx+1])
			rawdata[idx+1] = tmp
		}
	case *arrow.ListType:
		swapOffsets(1, data)
	case *arrow.MapType:
		swapOffsets(1, data)
	case *arrow.DayTimeIntervalType:
		// two packed int32 fields; swapping as a run of 32-bit values works
		byteSwapBuffer(32, data.Buffers()[1])
	case *arrow.MonthDayNanoIntervalType:
		// mixed 32/64-bit fields; each must be swapped at its own width
		rawdata := arrow.MonthDayNanoIntervalTraits.CastFromBytes(data.Buffers()[1].Bytes())
		for i, tmp := range rawdata {
			rawdata[i].Days = int32(bits.ReverseBytes32(uint32(tmp.Days)))
			rawdata[i].Months = int32(bits.ReverseBytes32(uint32(tmp.Months)))
			rawdata[i].Nanoseconds = int64(bits.ReverseBytes64(uint64(tmp.Nanoseconds)))
		}
	case arrow.ExtensionType:
		// swap according to the underlying storage representation
		return swapType(dt.StorageType(), data)
	case *arrow.DictionaryType:
		// dictionary itself was already swapped in ReadDictionary calls
		return swapType(dt.IndexType, data)
	case arrow.FixedWidthDataType:
		byteSwapBuffer(dt.BitWidth(), data.Buffers()[1])
	}
	return
}

// byteSwapBuffer reverses the byte order of every element of buf,
// treating the buffer as a packed array of bw-bit values.
//
// The IPC reader may call this on an Array Data object whose length
// field is not yet valid, so the element count is derived from the
// buffer's own size rather than from data.length.
func byteSwapBuffer(bw int, buf *memory.Buffer) {
	// single-byte values look the same in either byte order, and a nil
	// buffer has nothing to swap
	if bw == 1 || buf == nil {
		return
	}

	switch bw {
	case 16:
		vals := arrow.Uint16Traits.CastFromBytes(buf.Bytes())
		for i, v := range vals {
			vals[i] = bits.ReverseBytes16(v)
		}
	case 32:
		vals := arrow.Uint32Traits.CastFromBytes(buf.Bytes())
		for i, v := range vals {
			vals[i] = bits.ReverseBytes32(v)
		}
	case 64:
		vals := arrow.Uint64Traits.CastFromBytes(buf.Bytes())
		for i, v := range vals {
			vals[i] = bits.ReverseBytes64(v)
		}
	}
}

// swapOffsets byte-swaps the offsets buffer at the given buffer index
// of data, if that buffer is present and non-empty.
//
// Other than unions, an offsets buffer has one more element than
// data.length. Large (64-bit offset) types are not implemented yet, so
// 32-bit offsets are hardcoded here for now.
func swapOffsets(index int, data *array.Data) {
	buf := data.Buffers()[index]
	if buf == nil || buf.Len() == 0 {
		return
	}

	byteSwapBuffer(32, buf)
}
Loading

0 comments on commit bb31c9a

Please sign in to comment.