Skip to content

Commit

Permalink
ARROW-17276: [Go][Integration] Implement IPC handling for union type (#…
Browse files Browse the repository at this point in the history
…13806)

With this, the Go implementation finally fully supports IPC handling for All the Arrow DataTypes!

Authored-by: Matt Topol <[email protected]>
Signed-off-by: Matt Topol <[email protected]>
  • Loading branch information
zeroshade authored Aug 8, 2022
1 parent b1d36c0 commit d171b6c
Show file tree
Hide file tree
Showing 17 changed files with 808 additions and 54 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ r/man/*.Rd linguist-generated=true
cpp/src/generated/*.h linguist-generated=true
r/NEWS.md merge=union
go/**/*.s linguist-generated=true
go/arrow/unionmode_string.go linguist-generated=true
1 change: 0 additions & 1 deletion dev/archery/archery/integration/datagen.py
Original file line number Diff line number Diff line change
Expand Up @@ -1636,7 +1636,6 @@ def _temp_path():

generate_unions_case()
.skip_category('C#')
.skip_category('Go')
.skip_category('JS'),

generate_custom_metadata_case()
Expand Down
1 change: 1 addition & 0 deletions dev/release/rat_exclude_files.txt
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ go/arrow/flight/internal/flight/Flight_grpc.pb.go
go/arrow/internal/cpu/*
go/arrow/type_string.go
go/arrow/cdata/test/go.sum
go/arrow/unionmode_string.go
go/arrow/compute/datumkind_string.go
go/arrow/compute/valueshape_string.go
go/*.tmpldata
Expand Down
4 changes: 2 additions & 2 deletions docs/source/status.rst
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ Data Types
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Map ||||| | ||
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Dense Union ||| | | | ||
| Dense Union ||| | | | ||
+-------------------+-------+-------+-------+------------+-------+-------+-------+
| Sparse Union ||| | | | ||
| Sparse Union ||| | | | ||
+-------------------+-------+-------+-------+------------+-------+-------+-------+

+-------------------+-------+-------+-------+------------+-------+-------+-------+
Expand Down
10 changes: 6 additions & 4 deletions go/arrow/array/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,16 @@ func (a *SparseUnion) setData(data *Data) {
}

func (a *SparseUnion) getOneForMarshal(i int) interface{} {
typeID := a.RawTypeCodes()[i]

childID := a.ChildID(i)
field := a.unionType.Fields()[childID]
data := a.Field(childID)

if data.IsNull(i) {
return nil
}

return map[string]interface{}{field.Name: data.(arraymarshal).getOneForMarshal(i)}
return []interface{}{typeID, data.(arraymarshal).getOneForMarshal(i)}
}

func (a *SparseUnion) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -570,16 +571,17 @@ func (a *DenseUnion) setData(data *Data) {
}

func (a *DenseUnion) getOneForMarshal(i int) interface{} {
typeID := a.RawTypeCodes()[i]

childID := a.ChildID(i)
field := a.unionType.Fields()[childID]
data := a.Field(childID)

offsets := a.RawValueOffsets()
if data.IsNull(int(offsets[i])) {
return nil
}

return map[string]interface{}{field.Name: data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
return []interface{}{typeID, data.(arraymarshal).getOneForMarshal(int(offsets[i]))}
}

func (a *DenseUnion) MarshalJSON() ([]byte, error) {
Expand Down
1 change: 0 additions & 1 deletion go/arrow/bitutil/bitmaps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ func (s *BitmapOpSuite) testAligned(op bitmapOp, leftBits, rightBits []int, resu
out *memory.Buffer
length int64
)

for _, lOffset := range []int64{0, 1, 3, 5, 7, 8, 13, 21, 38, 75, 120, 65536} {
s.Run(fmt.Sprintf("left offset %d", lOffset), func() {
left = bitmapFromSlice(leftBits, int(lOffset))
Expand Down
4 changes: 2 additions & 2 deletions go/arrow/datatype_nested.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,8 @@ const (
MaxUnionTypeCode UnionTypeCode = 127
InvalidUnionChildID int = -1

SparseMode UnionMode = iota
DenseMode
SparseMode UnionMode = iota // SPARSE
DenseMode // DENSE
)

// UnionType is an interface to encompass both Dense and Sparse Union types.
Expand Down
1 change: 1 addition & 0 deletions go/arrow/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ package arrow

// stringer
//go:generate stringer -type=Type
//go:generate stringer -type=UnionMode -linecomment
64 changes: 63 additions & 1 deletion go/arrow/internal/arrdata/arrdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
Records["decimal128"] = makeDecimal128sRecords()
Records["maps"] = makeMapsRecords()
Records["extension"] = makeExtensionRecords()
// Records["union"] = makeUnionRecords()
Records["union"] = makeUnionRecords()

for k := range Records {
RecordNames = append(RecordNames, k)
Expand Down Expand Up @@ -935,6 +935,68 @@ func makeExtensionRecords() []arrow.Record {
return recs
}

func makeUnionRecords() []arrow.Record {
mem := memory.NewGoAllocator()

unionFields := []arrow.Field{
{Name: "u0", Type: arrow.PrimitiveTypes.Int32, Nullable: true},
{Name: "u1", Type: arrow.PrimitiveTypes.Uint8, Nullable: true},
}

typeCodes := []arrow.UnionTypeCode{5, 10}
sparseType := arrow.SparseUnionOf(unionFields, typeCodes)
denseType := arrow.DenseUnionOf(unionFields, typeCodes)

schema := arrow.NewSchema([]arrow.Field{
{Name: "sparse", Type: sparseType, Nullable: true},
{Name: "dense", Type: denseType, Nullable: true},
}, nil)

sparseChildren := make([]arrow.Array, 4)
denseChildren := make([]arrow.Array, 4)

const length = 7

typeIDsBuffer := memory.NewBufferBytes(arrow.Uint8Traits.CastToBytes([]uint8{5, 10, 5, 5, 10, 10, 5}))
sparseChildren[0] = arrayOf(mem, []int32{0, 1, 2, 3, 4, 5, 6},
[]bool{true, true, true, false, true, true, true})
defer sparseChildren[0].Release()
sparseChildren[1] = arrayOf(mem, []uint8{10, 11, 12, 13, 14, 15, 16},
nil)
defer sparseChildren[1].Release()
sparseChildren[2] = arrayOf(mem, []int32{0, -1, -2, -3, -4, -5, -6},
[]bool{true, true, true, true, true, true, false})
defer sparseChildren[2].Release()
sparseChildren[3] = arrayOf(mem, []uint8{100, 101, 102, 103, 104, 105, 106},
nil)
defer sparseChildren[3].Release()

denseChildren[0] = arrayOf(mem, []int32{0, 2, 3, 7}, []bool{true, false, true, true})
defer denseChildren[0].Release()
denseChildren[1] = arrayOf(mem, []uint8{11, 14, 15}, nil)
defer denseChildren[1].Release()
denseChildren[2] = arrayOf(mem, []int32{0, -2, -3, -7}, []bool{false, true, true, false})
defer denseChildren[2].Release()
denseChildren[3] = arrayOf(mem, []uint8{101, 104, 105}, nil)
defer denseChildren[3].Release()

offsetsBuffer := memory.NewBufferBytes(arrow.Int32Traits.CastToBytes([]int32{0, 0, 1, 2, 1, 2, 3}))
sparse1 := array.NewSparseUnion(sparseType, length, sparseChildren[:2], typeIDsBuffer, 0)
dense1 := array.NewDenseUnion(denseType, length, denseChildren[:2], typeIDsBuffer, offsetsBuffer, 0)

sparse2 := array.NewSparseUnion(sparseType, length, sparseChildren[2:], typeIDsBuffer, 0)
dense2 := array.NewDenseUnion(denseType, length, denseChildren[2:], typeIDsBuffer, offsetsBuffer, 0)

defer sparse1.Release()
defer dense1.Release()
defer sparse2.Release()
defer dense2.Release()

return []arrow.Record{
array.NewRecord(schema, []arrow.Array{sparse1, dense1}, -1),
array.NewRecord(schema, []arrow.Array{sparse2, dense2}, -1)}
}

func extArray(mem memory.Allocator, dt arrow.ExtensionType, a interface{}, valids []bool) arrow.Array {
var storage arrow.Array
switch st := dt.StorageType().(type) {
Expand Down
79 changes: 73 additions & 6 deletions go/arrow/internal/arrjson/arrjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, error) {
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision), 128}
case *arrow.Decimal256Type:
typ = decimalJSON{"decimal", int(dt.Scale), int(dt.Precision), 256}
case arrow.UnionType:
typ = unionJSON{"union", dt.Mode().String(), dt.TypeCodes()}
default:
return nil, fmt.Errorf("unknown arrow.DataType %v", arrowType)
}
Expand Down Expand Up @@ -462,6 +464,17 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow
case 128, 0: // default to 128 bits when missing
arrowType = &arrow.Decimal128Type{Precision: int32(t.Precision), Scale: int32(t.Scale)}
}
case "union":
t := unionJSON{}
if err = json.Unmarshal(typ, &t); err != nil {
return
}
switch t.Mode {
case "SPARSE":
arrowType = arrow.SparseUnionOf(fieldsFromJSON(children), t.TypeIDs)
case "DENSE":
arrowType = arrow.DenseUnionOf(fieldsFromJSON(children), t.TypeIDs)
}
}

if arrowType == nil {
Expand Down Expand Up @@ -598,6 +611,12 @@ type mapJSON struct {
KeysSorted bool `json:"keysSorted,omitempty"`
}

type unionJSON struct {
Name string `json:"name"`
Mode string `json:"mode"`
TypeIDs []arrow.UnionTypeCode `json:"typeIds"`
}

func schemaToJSON(schema *arrow.Schema, mapper *dictutils.Mapper) Schema {
return Schema{
Fields: fieldsToJSON(schema.Fields(), dictutils.NewFieldPos(), mapper),
Expand Down Expand Up @@ -742,12 +761,13 @@ func recordToJSON(rec arrow.Record) Record {
}

type Array struct {
Name string `json:"name"`
Count int `json:"count"`
Valids []int `json:"VALIDITY,omitempty"`
Data []interface{} `json:"DATA,omitempty"`
Offset interface{} `json:"-"`
Children []Array `json:"children,omitempty"`
Name string `json:"name"`
Count int `json:"count"`
Valids []int `json:"VALIDITY,omitempty"`
Data []interface{} `json:"DATA,omitempty"`
TypeID []arrow.UnionTypeCode `json:"TYPE_ID,omitempty"`
Offset interface{} `json:"OFFSET,omitempty"`
Children []Array `json:"children,omitempty"`
}

func (a *Array) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -782,6 +802,10 @@ func (a *Array) UnmarshalJSON(b []byte) (err error) {
return
}

if len(rawOffsets) == 0 {
return
}

switch rawOffsets[0].(type) {
case string:
out := make([]int64, len(rawOffsets))
Expand Down Expand Up @@ -1152,6 +1176,31 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr
defer indices.Release()
return array.NewData(dt, indices.Len(), indices.Buffers(), indices.Children(), indices.NullN(), indices.Offset())

case arrow.UnionType:
fields := make([]arrow.ArrayData, len(dt.Fields()))
for i, f := range dt.Fields() {
child := arrayFromJSON(mem, f.Type, arr.Children[i])
defer child.Release()
fields[i] = child
}

typeIdBuf := memory.NewBufferBytes(arrow.Int8Traits.CastToBytes(arr.TypeID))
defer typeIdBuf.Release()
buffers := []*memory.Buffer{nil, typeIdBuf}
if dt.Mode() == arrow.DenseMode {
var offsets []byte
if arr.Offset == nil {
offsets = []byte{}
} else {
offsets = arrow.Int32Traits.CastToBytes(arr.Offset.([]int32))
}
offsetBuf := memory.NewBufferBytes(offsets)
defer offsetBuf.Release()
buffers = append(buffers, offsetBuf)
}

return array.NewData(dt, arr.Count, buffers, fields, 0, 0)

default:
panic(fmt.Errorf("unknown data type %v %T", dt, dt))
}
Expand Down Expand Up @@ -1478,6 +1527,24 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array {
case *array.Dictionary:
return arrayToJSON(field, arr.Indices())

case array.Union:
dt := arr.DataType().(arrow.UnionType)
o := Array{
Name: field.Name,
Count: arr.Len(),
Valids: validsToJSON(arr),
TypeID: arr.RawTypeCodes(),
Children: make([]Array, len(dt.Fields())),
}
if dt.Mode() == arrow.DenseMode {
o.Offset = arr.(*array.DenseUnion).RawValueOffsets()
}
fields := dt.Fields()
for i := range o.Children {
o.Children[i] = arrayToJSON(fields[i], arr.Field(i))
}
return o

default:
panic(fmt.Errorf("unknown array type %T", arr))
}
Expand Down
Loading

0 comments on commit d171b6c

Please sign in to comment.