From df7f1056cdf23d927b8cb02c0d94b3029fcf6759 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 27 Jul 2022 15:04:35 -0400 Subject: [PATCH 01/13] Handle Large Offset primitives in IPC and integration tests --- dev/archery/archery/integration/datagen.py | 1 - go/arrow/datatype.go | 5 + go/arrow/datatype_binary.go | 8 ++ go/arrow/datatype_nested.go | 4 + go/arrow/internal/arrjson/arrjson.go | 127 ++++++++++++++++++++- go/arrow/ipc/file_reader.go | 2 +- go/arrow/ipc/metadata.go | 16 +++ go/arrow/ipc/writer.go | 86 +++++++++++++- 8 files changed, 235 insertions(+), 14 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 591aa6d0be1e9..69eb374a4f7f1 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1587,7 +1587,6 @@ def _temp_path(): generate_primitive_large_offsets_case([17, 20]) .skip_category('C#') - .skip_category('Go') .skip_category('JS'), generate_null_case([10, 0]) diff --git a/go/arrow/datatype.go b/go/arrow/datatype.go index 42976d473b402..7bbf480872cf5 100644 --- a/go/arrow/datatype.go +++ b/go/arrow/datatype.go @@ -183,6 +183,11 @@ type BinaryDataType interface { binary() } +type OffsetsDataType interface { + DataType + OffsetTypeTraits() OffsetTraits +} + func HashType(seed maphash.Seed, dt DataType) uint64 { var h maphash.Hash h.SetSeed(seed) diff --git a/go/arrow/datatype_binary.go b/go/arrow/datatype_binary.go index 19d1dcc3bde07..e77e3df44be42 100644 --- a/go/arrow/datatype_binary.go +++ b/go/arrow/datatype_binary.go @@ -16,6 +16,10 @@ package arrow +type OffsetTraits interface { + BytesRequired(int) int +} + type BinaryType struct{} func (t *BinaryType) ID() Type { return BINARY } @@ -27,6 +31,7 @@ func (t *BinaryType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int32SizeBytes), SpecVariableWidth()}} } +func (t *BinaryType) OffsetTypeTraits() OffsetTraits { return Int32Traits } type StringType struct{} @@ -39,6 +44,7 @@ func (t *StringType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int32SizeBytes), SpecVariableWidth()}} } +func (t *StringType) OffsetTypeTraits() OffsetTraits { return Int32Traits } type LargeBinaryType struct{} @@ -51,6 +57,7 @@ func (t *LargeBinaryType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int64SizeBytes), SpecVariableWidth()}} } +func (t *LargeBinaryType) OffsetTypeTraits() OffsetTraits { return Int64Traits } type LargeStringType struct{} @@ -63,6 +70,7 @@ func (t *LargeStringType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int64SizeBytes), SpecVariableWidth()}} } +func (t *LargeStringType) OffsetTypeTraits() OffsetTraits { return Int64Traits } var ( BinaryTypes = struct { diff --git a/go/arrow/datatype_nested.go b/go/arrow/datatype_nested.go index 108ef82779e54..4b5118534fb64 100644 --- a/go/arrow/datatype_nested.go +++ b/go/arrow/datatype_nested.go @@ -94,6 +94,8 @@ func (ListType) Layout() DataTypeLayout { return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int32SizeBytes)}} } +func (ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits } + // FixedSizeListType describes a nested type in which each array slot contains // a fixed-size sequence of values, all having the same relative type. 
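
A brief aside on the OffsetsDataType/OffsetTraits hook introduced above: it lets generic code (such as the IPC writer changes later in this patch) ask a variable-length type how wide its offsets are instead of hard-coding int32. The following is only a usage sketch, separate from the diff itself; the import path matches the v10 module referenced elsewhere in this series, and the values in the trailing comment are what the traits imply, not captured output.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v10/arrow"
)

func main() {
	for _, dt := range []arrow.DataType{
		arrow.BinaryTypes.String,      // 32-bit offsets
		arrow.BinaryTypes.LargeString, // 64-bit offsets
	} {
		// Both types satisfy the new arrow.OffsetsDataType interface, so the
		// size of an offsets buffer holding n+1 entries can be computed
		// without switching on the concrete type.
		odt := dt.(arrow.OffsetsDataType)
		fmt.Println(dt.ID(), odt.OffsetTypeTraits().BytesRequired(10+1)) // STRING 44, then LARGE_STRING 88
	}
}
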
type FixedSizeListType struct { @@ -329,6 +331,8 @@ func (t *MapType) Layout() DataTypeLayout { return t.value.Layout() } +func (MapType) OffsetTypeTraits() OffsetTraits { return Int32Traits } + type Field struct { Name string // Field name Type DataType // The field's data type diff --git a/go/arrow/internal/arrjson/arrjson.go b/go/arrow/internal/arrjson/arrjson.go index de862a169d3ee..b85da410bc8fd 100644 --- a/go/arrow/internal/arrjson/arrjson.go +++ b/go/arrow/internal/arrjson/arrjson.go @@ -151,8 +151,12 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, error) { typ = floatJSON{"floatingpoint", "DOUBLE"} case *arrow.BinaryType: typ = nameJSON{"binary"} + case *arrow.LargeBinaryType: + typ = nameJSON{"largebinary"} case *arrow.StringType: typ = nameJSON{"utf8"} + case *arrow.LargeStringType: + typ = nameJSON{"largeutf8"} case *arrow.Date32Type: typ = unitZoneJSON{Name: "date", Unit: "DAY"} case *arrow.Date64Type: @@ -319,8 +323,12 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow } case "binary": arrowType = arrow.BinaryTypes.Binary + case "largebinary": + arrowType = arrow.BinaryTypes.LargeBinary case "utf8": arrowType = arrow.BinaryTypes.String + case "largeutf8": + arrowType = arrow.BinaryTypes.LargeString case "date": t := unitZoneJSON{} if err = json.Unmarshal(typ, &t); err != nil { @@ -439,7 +447,6 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow arrowType = &arrow.Decimal128Type{Precision: int32(t.Precision), Scale: int32(t.Scale)} } - if arrowType == nil { err = fmt.Errorf("unhandled type unmarshalling from json: %s", tmp.Name) } @@ -728,10 +735,63 @@ type Array struct { Count int `json:"count"` Valids []int `json:"VALIDITY,omitempty"` Data []interface{} `json:"DATA,omitempty"` - Offset []int32 `json:"OFFSET,omitempty"` + Offset interface{} `json:"-"` Children []Array `json:"children,omitempty"` } +func (a *Array) MarshalJSON() ([]byte, error) { + type Alias Array + aux := struct { + *Alias + OutOffset interface{} `json:"OFFSET,omitempty"` + }{Alias: (*Alias)(a), OutOffset: a.Offset} + return json.Marshal(aux) +} + +func (a *Array) UnmarshalJSON(b []byte) (err error) { + type Alias Array + aux := &struct { + *Alias + RawOffset json.RawMessage `json:"OFFSET,omitempty"` + }{Alias: (*Alias)(a)} + + dec := json.NewDecoder(bytes.NewReader(b)) + dec.UseNumber() + + if err = dec.Decode(&aux); err != nil { + return + } + + if len(aux.RawOffset) == 0 { + return + } + + var rawOffsets []interface{} + if err = json.Unmarshal(aux.RawOffset, &rawOffsets); err != nil { + return + } + + switch rawOffsets[0].(type) { + case string: + out := make([]int64, len(rawOffsets)) + for i, o := range rawOffsets { + out[i], err = strconv.ParseInt(o.(string), 10, 64) + if err != nil { + return + } + } + a.Offset = out + case float64: + out := make([]int32, len(rawOffsets)) + for i, o := range rawOffsets { + out[i] = int32(o.(float64)) + } + a.Offset = out + } + + return nil +} + func arraysFromJSON(mem memory.Allocator, schema *arrow.Schema, arrs []Array) []arrow.ArrayData { o := make([]arrow.ArrayData, len(arrs)) for i, v := range arrs { @@ -874,6 +934,22 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr bldr.AppendValues(data, valids) return returnNewArrayData(bldr) + case *arrow.LargeStringType: + bldr := array.NewLargeStringBuilder(mem) + defer bldr.Release() + data := strFromJSON(arr.Data) + valids := validsFromJSON(arr.Valids) + bldr.AppendValues(data, valids) + return 
returnNewArrayData(bldr) + + case *arrow.LargeBinaryType: + bldr := array.NewBinaryBuilder(mem, dt) + defer bldr.Release() + data := bytesFromJSON(arr.Data) + valids := validsFromJSON(arr.Valids) + bldr.AppendValues(data, valids) + return returnNewArrayData(bldr) + case *arrow.BinaryType: bldr := array.NewBinaryBuilder(mem, dt) defer bldr.Release() @@ -892,7 +968,7 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr nulls := arr.Count - bitutil.CountSetBits(bitmap.Bytes(), 0, arr.Count) return array.NewData(dt, arr.Count, []*memory.Buffer{bitmap, - memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(arr.Offset))}, + memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(arr.Offset.([]int32)))}, []arrow.ArrayData{elems}, nulls, 0) case *arrow.FixedSizeListType: @@ -951,7 +1027,7 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr nulls := arr.Count - bitutil.CountSetBits(bitmap.Bytes(), 0, arr.Count) return array.NewData(dt, arr.Count, []*memory.Buffer{bitmap, - memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(arr.Offset))}, + memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(arr.Offset.([]int32)))}, []arrow.ArrayData{elems}, nulls, 0) case *arrow.Date32Type: @@ -1159,6 +1235,21 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array { Count: arr.Len(), Data: strToJSON(arr), Valids: validsToJSON(arr), + Offset: arr.ValueOffsets(), + } + + case *array.LargeString: + offsets := arr.ValueOffsets() + strOffsets := make([]string, len(offsets)) + for i, o := range offsets { + strOffsets[i] = strconv.FormatInt(o, 10) + } + return Array{ + Name: field.Name, + Count: arr.Len(), + Data: strToJSON(arr), + Valids: validsToJSON(arr), + Offset: strOffsets, } case *array.Binary: @@ -1170,6 +1261,20 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array { Offset: arr.ValueOffsets(), } + case *array.LargeBinary: + offsets := arr.ValueOffsets() + strOffsets := make([]string, len(offsets)) + for i, o := range offsets { + strOffsets[i] = strconv.FormatInt(o, 10) + } + return Array{ + Name: field.Name, + Count: arr.Len(), + Data: bytesToJSON(arr), + Valids: validsToJSON(arr), + Offset: strOffsets, + } + case *array.List: o := Array{ Name: field.Name, @@ -1622,7 +1727,12 @@ func strFromJSON(vs []interface{}) []string { return o } -func strToJSON(arr *array.String) []interface{} { +type strlike interface { + arrow.Array + Value(int) string +} + +func strToJSON(arr strlike) []interface{} { o := make([]interface{}, arr.Len()) for i := range o { o[i] = arr.Value(i) @@ -1649,7 +1759,12 @@ func bytesFromJSON(vs []interface{}) [][]byte { return o } -func bytesToJSON(arr *array.Binary) []interface{} { +type binarylike interface { + arrow.Array + Value(int) []byte +} + +func bytesToJSON(arr binarylike) []interface{} { o := make([]interface{}, arr.Len()) for i := range o { o[i] = strings.ToUpper(hex.EncodeToString(arr.Value(i))) diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 4bc95ad41666e..7b835dacda51f 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -469,7 +469,7 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData { *arrow.DurationType: return ctx.loadPrimitive(dt) - case *arrow.BinaryType, *arrow.StringType: + case *arrow.BinaryType, *arrow.StringType, *arrow.LargeStringType, *arrow.LargeBinaryType: return ctx.loadBinary(dt) case *arrow.FixedSizeBinaryType: diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index f2aa663155235..c2cd3a06ba196 
100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -298,11 +298,21 @@ func (fv *fieldVisitor) visit(field arrow.Field) { flatbuf.BinaryStart(fv.b) fv.offset = flatbuf.BinaryEnd(fv.b) + case *arrow.LargeBinaryType: + fv.dtype = flatbuf.TypeLargeBinary + flatbuf.LargeBinaryStart(fv.b) + fv.offset = flatbuf.LargeBinaryEnd(fv.b) + case *arrow.StringType: fv.dtype = flatbuf.TypeUtf8 flatbuf.Utf8Start(fv.b) fv.offset = flatbuf.Utf8End(fv.b) + case *arrow.LargeStringType: + fv.dtype = flatbuf.TypeLargeUtf8 + flatbuf.LargeUtf8Start(fv.b) + fv.offset = flatbuf.LargeUtf8End(fv.b) + case *arrow.Date32Type: fv.dtype = flatbuf.TypeDate flatbuf.DateStart(fv.b) @@ -629,6 +639,12 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr case flatbuf.TypeUtf8: return arrow.BinaryTypes.String, nil + case flatbuf.TypeLargeBinary: + return arrow.BinaryTypes.LargeBinary, nil + + case flatbuf.TypeLargeUtf8: + return arrow.BinaryTypes.LargeString, nil + case flatbuf.TypeBool: return arrow.FixedWidthTypes.Boolean, nil diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 5371d8a7e8a5b..2a7b3aac920a1 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -562,6 +562,36 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { p.body = append(p.body, voffsets) p.body = append(p.body, values) + case *arrow.LargeBinaryType: + arr := arr.(*array.LargeBinary) + voffsets, err := w.getZeroBasedValueOffsets(arr) + if err != nil { + return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) + } + data := arr.Data() + values := data.Buffers()[2] + + var totalDataBytes int64 + if voffsets != nil { + totalDataBytes = int64(len(arr.ValueBytes())) + } + + switch { + case needTruncate(int64(data.Offset()), values, totalDataBytes): + // slice data buffer to include the range we need now. + var ( + beg = int64(arr.ValueOffset(0)) + len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) + ) + values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) + default: + if values != nil { + values.Retain() + } + } + p.body = append(p.body, voffsets) + p.body = append(p.body, values) + case *arrow.StringType: arr := arr.(*array.String) voffsets, err := w.getZeroBasedValueOffsets(arr) @@ -592,6 +622,36 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { p.body = append(p.body, voffsets) p.body = append(p.body, values) + case *arrow.LargeStringType: + arr := arr.(*array.LargeString) + voffsets, err := w.getZeroBasedValueOffsets(arr) + if err != nil { + return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) + } + data := arr.Data() + values := data.Buffers()[2] + + var totalDataBytes int64 + if voffsets != nil { + totalDataBytes = int64(len(arr.ValueBytes())) + } + + switch { + case needTruncate(int64(data.Offset()), values, totalDataBytes): + // slice data buffer to include the range we need now. 
+ var ( + beg = int64(arr.ValueOffset(0)) + len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) + ) + values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) + default: + if values != nil { + values.Retain() + } + } + p.body = append(p.body, voffsets) + p.body = append(p.body, values) + case *arrow.StructType: w.depth-- arr := arr.(*array.Struct) @@ -707,7 +767,8 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { func (w *recordEncoder) getZeroBasedValueOffsets(arr arrow.Array) (*memory.Buffer, error) { data := arr.Data() voffsets := data.Buffers()[1] - offsetBytesNeeded := arrow.Int32Traits.BytesRequired(data.Len() + 1) + offsetTraits := arr.DataType().(arrow.OffsetsDataType).OffsetTypeTraits() + offsetBytesNeeded := offsetTraits.BytesRequired(data.Len() + 1) if data.Offset() != 0 || offsetBytesNeeded < voffsets.Len() { // if we have a non-zero offset, then the value offsets do not start at @@ -719,13 +780,26 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr arrow.Array) (*memory.Buffe shiftedOffsets := memory.NewResizableBuffer(w.mem) shiftedOffsets.Resize(offsetBytesNeeded) - dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes()) - offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1] + switch arr.DataType().Layout().Buffers[1].ByteWidth { + case 8: + dest := arrow.Int64Traits.CastFromBytes(shiftedOffsets.Bytes()) + offsets := arrow.Int64Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1] + + startOffset := offsets[0] + for i, o := range offsets { + dest[i] = o - startOffset + } - startOffset := offsets[0] - for i, o := range offsets { - dest[i] = o - startOffset + default: + dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes()) + offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1] + + startOffset := offsets[0] + for i, o := range offsets { + dest[i] = o - startOffset + } } + voffsets = shiftedOffsets } else { voffsets.Retain() From cb2f193e6d3cecbba64cbfd0381fb68b94addbfd Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 27 Jul 2022 15:05:00 -0400 Subject: [PATCH 02/13] handle large binary and large string in cdata and scalar pkgs --- go/arrow/cdata/cdata.go | 39 +++++++++++++++++++++++++++------ go/arrow/cdata/cdata_exports.go | 4 ++++ go/arrow/cdata/cdata_test.go | 20 +++++++++++++++++ go/arrow/scalar/binary.go | 38 +++++++++++++++++++++++++++++--- go/arrow/scalar/parse.go | 8 +++++++ 5 files changed, 99 insertions(+), 10 deletions(-) diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index 5709eab796fcb..31b02ffea2f89 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -357,6 +357,10 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return imp.importStringLike() case *arrow.BinaryType: return imp.importStringLike() + case *arrow.LargeStringType: + return imp.importLargeStringLike() + case *arrow.LargeBinaryType: + return imp.importLargeStringLike() case *arrow.ListType: return imp.importListLike() case *arrow.MapType: @@ -399,6 +403,27 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return nil } +func (imp *cimporter) importLargeStringLike() error { + if err := imp.checkNoChildren(); err != nil { + return err + } + + if err := imp.checkNumBuffers(3); err != nil { + return err + } + + nulls, err := imp.importNullBitmap(0) + if err != nil { + return err + } + + offsets := imp.importOffsetsBuffer(1, 
int64(arrow.Int64SizeBytes)) + typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes()) + values := imp.importVariableValuesBuffer(2, 1, int(typedOffsets[imp.arr.offset+imp.arr.length])) + imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) + return nil +} + func (imp *cimporter) importStringLike() error { if err := imp.checkNoChildren(); err != nil { return err @@ -413,8 +438,9 @@ func (imp *cimporter) importStringLike() error { return err } - offsets := imp.importOffsetsBuffer(1) - values := imp.importVariableValuesBuffer(2, 1, arrow.Int32Traits.CastFromBytes(offsets.Bytes())) + offsets := imp.importOffsetsBuffer(1, int64(arrow.Int32SizeBytes)) + typedOffsets := arrow.Int32Traits.CastFromBytes(offsets.Bytes()) + values := imp.importVariableValuesBuffer(2, 1, int(typedOffsets[imp.arr.offset+imp.arr.length])) imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) return nil } @@ -433,7 +459,7 @@ func (imp *cimporter) importListLike() error { return err } - offsets := imp.importOffsetsBuffer(1) + offsets := imp.importOffsetsBuffer(1, int64(arrow.Int32SizeBytes)) imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) return nil } @@ -513,14 +539,13 @@ func (imp *cimporter) importFixedSizeBuffer(bufferID int, byteWidth int64) *memo return imp.importBuffer(bufferID, bufsize) } -func (imp *cimporter) importOffsetsBuffer(bufferID int) *memory.Buffer { - const offsetsize = int64(arrow.Int32SizeBytes) // go doesn't implement int64 offsets yet +func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) *memory.Buffer { bufsize := offsetsize * int64((imp.arr.length + imp.arr.offset + 1)) return imp.importBuffer(bufferID, bufsize) } -func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth int, offsets []int32) *memory.Buffer { - bufsize := byteWidth * int(offsets[imp.arr.length]) +func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth int, bytelen int) *memory.Buffer { + bufsize := byteWidth * bytelen return imp.importBuffer(bufferID, int64(bufsize)) } diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index 6badb702464da..1a6cff8fa50c9 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -151,8 +151,12 @@ func (exp *schemaExporter) exportFormat(dt arrow.DataType) string { return fmt.Sprintf("d:%d,%d", dt.Precision, dt.Scale) case *arrow.BinaryType: return "z" + case *arrow.LargeBinaryType: + return "Z" case *arrow.StringType: return "u" + case *arrow.LargeStringType: + return "U" case *arrow.Date32Type: return "tdD" case *arrow.Date64Type: diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index ed16a30044feb..b458e79187c17 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -113,7 +113,9 @@ func TestPrimitiveSchemas(t *testing.T) { {arrow.PrimitiveTypes.Float64, "g"}, {&arrow.FixedSizeBinaryType{ByteWidth: 3}, "w:3"}, {arrow.BinaryTypes.Binary, "z"}, + {arrow.BinaryTypes.LargeBinary, "Z"}, {arrow.BinaryTypes.String, "u"}, + {arrow.BinaryTypes.LargeString, "U"}, {&arrow.Decimal128Type{Precision: 16, Scale: 4}, "d:16,4"}, {&arrow.Decimal128Type{Precision: 15, Scale: 0}, "d:15,0"}, {&arrow.Decimal128Type{Precision: 15, Scale: -4}, 
"d:15,-4"}, @@ -397,6 +399,22 @@ func createTestStrArr() arrow.Array { return bld.NewStringArray() } +func createTestLargeBinaryArr() arrow.Array { + bld := array.NewBinaryBuilder(memory.DefaultAllocator, arrow.BinaryTypes.LargeBinary) + defer bld.Release() + + bld.AppendValues([][]byte{[]byte("foo"), []byte("bar"), nil}, []bool{true, true, false}) + return bld.NewLargeBinaryArray() +} + +func createTestLargeStrArr() arrow.Array { + bld := array.NewLargeStringBuilder(memory.DefaultAllocator) + defer bld.Release() + + bld.AppendValues([]string{"foo", "bar", ""}, []bool{true, true, false}) + return bld.NewLargeStringArray() +} + func createTestDecimalArr() arrow.Array { bld := array.NewDecimal128Builder(memory.DefaultAllocator, &arrow.Decimal128Type{Precision: 16, Scale: 4}) defer bld.Release() @@ -425,6 +443,8 @@ func TestPrimitiveArrs(t *testing.T) { {"fixed size binary", createTestFSBArr}, {"binary", createTestBinaryArr}, {"utf8", createTestStrArr}, + {"largebinary", createTestLargeBinaryArr}, + {"largeutf8", createTestLargeStrArr}, {"decimal128", createTestDecimalArr}, } diff --git a/go/arrow/scalar/binary.go b/go/arrow/scalar/binary.go index 7f875fce453a1..aedee064d9e8b 100644 --- a/go/arrow/scalar/binary.go +++ b/go/arrow/scalar/binary.go @@ -61,9 +61,13 @@ func (b *Binary) CastTo(to arrow.DataType) (Scalar, error) { switch to.ID() { case arrow.BINARY: - return b, nil + return NewBinaryScalar(b.Value, b.Type), nil + case arrow.LARGE_BINARY: + return NewLargeBinaryScalar(b.Value), nil case arrow.STRING: return NewStringScalarFromBuffer(b.Value), nil + case arrow.LARGE_STRING: + return NewLargeStringScalarFromBuffer(b.Value), nil case arrow.FIXED_SIZE_BINARY: if b.Value.Len() == to.(*arrow.FixedSizeBinaryType).ByteWidth { return NewFixedSizeBinaryScalar(b.Value, to), nil @@ -86,9 +90,18 @@ func (b *Binary) ValidateFull() error { } func NewBinaryScalar(val *memory.Buffer, typ arrow.DataType) *Binary { + val.Retain() return &Binary{scalar{typ, true}, val} } +type LargeBinary struct { + *Binary +} + +func NewLargeBinaryScalar(val *memory.Buffer) *LargeBinary { + return &LargeBinary{NewBinaryScalar(val, arrow.BinaryTypes.LargeBinary)} +} + type String struct { *Binary } @@ -129,10 +142,29 @@ func NewStringScalar(val string) *String { } func NewStringScalarFromBuffer(val *memory.Buffer) *String { - val.Retain() + // NewBinaryScalar will call Retain on val, so we don't have to return &String{NewBinaryScalar(val, arrow.BinaryTypes.String)} } +// alias the String struct we are embedding so it doesn't hide the +// String() function that we want to expose +type stringScalar = String + +type LargeString struct { + *stringScalar +} + +func NewLargeStringScalar(val string) *LargeString { + buf := memory.NewBufferBytes([]byte(val)) + defer buf.Release() + return NewLargeStringScalarFromBuffer(buf) +} + +func NewLargeStringScalarFromBuffer(val *memory.Buffer) *LargeString { + // NewBinaryScalar will call retain on val, so we don't have to + return &LargeString{stringScalar: &String{NewBinaryScalar(val, arrow.BinaryTypes.LargeString)}} +} + type FixedSizeBinary struct { *Binary } @@ -154,6 +186,6 @@ func (b *FixedSizeBinary) Validate() (err error) { func (b *FixedSizeBinary) ValidateFull() error { return b.Validate() } func NewFixedSizeBinaryScalar(val *memory.Buffer, typ arrow.DataType) *FixedSizeBinary { - val.Retain() + // NewBinaryScalar will call Retain on val, so we don't have to return &FixedSizeBinary{NewBinaryScalar(val, typ)} } diff --git a/go/arrow/scalar/parse.go b/go/arrow/scalar/parse.go 
index e16ed5236d870..571f741d5761a 100644 --- a/go/arrow/scalar/parse.go +++ b/go/arrow/scalar/parse.go @@ -375,8 +375,12 @@ func MakeScalarParam(val interface{}, dt arrow.DataType) (Scalar, error) { switch dt.ID() { case arrow.BINARY: return NewBinaryScalar(buf, dt), nil + case arrow.LARGE_BINARY: + return NewLargeBinaryScalar(buf), nil case arrow.STRING: return NewStringScalarFromBuffer(buf), nil + case arrow.LARGE_STRING: + return NewLargeStringScalarFromBuffer(buf), nil case arrow.FIXED_SIZE_BINARY: if buf.Len() == dt.(*arrow.FixedSizeBinaryType).ByteWidth { return NewFixedSizeBinaryScalar(buf, dt), nil @@ -387,8 +391,12 @@ func MakeScalarParam(val interface{}, dt arrow.DataType) (Scalar, error) { switch dt.ID() { case arrow.BINARY: return NewBinaryScalar(v, dt), nil + case arrow.LARGE_BINARY: + return NewLargeBinaryScalar(v), nil case arrow.STRING: return NewStringScalarFromBuffer(v), nil + case arrow.LARGE_STRING: + return NewLargeStringScalarFromBuffer(v), nil case arrow.FIXED_SIZE_BINARY: if v.Len() == dt.(*arrow.FixedSizeBinaryType).ByteWidth { return NewFixedSizeBinaryScalar(v, dt), nil From 312ed145f136d8428d0e9ce482155ba8acc83971 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 1 Aug 2022 16:00:35 -0400 Subject: [PATCH 03/13] fix missing format strings for LargeString/LargeBinary --- go/arrow/cdata/cdata.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index 31b02ffea2f89..97429429c64c1 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -75,7 +75,9 @@ var formatToSimpleType = map[string]arrow.DataType{ "f": arrow.PrimitiveTypes.Float32, "g": arrow.PrimitiveTypes.Float64, "z": arrow.BinaryTypes.Binary, + "Z": arrow.BinaryTypes.LargeBinary, "u": arrow.BinaryTypes.String, + "U": arrow.BinaryTypes.LargeString, "tdD": arrow.FixedWidthTypes.Date32, "tdm": arrow.FixedWidthTypes.Date64, "tts": arrow.FixedWidthTypes.Time32s, From 85e9f5cefdcf045dc60c68d2548e5c65f49973da Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 27 Jul 2022 17:36:17 -0400 Subject: [PATCH 04/13] implement large list handling --- dev/archery/archery/integration/datagen.py | 1 - docs/source/status.rst | 2 +- go/arrow/array/array.go | 2 +- go/arrow/array/array_test.go | 6 +- go/arrow/array/compare.go | 25 ++ go/arrow/array/concat.go | 17 ++ go/arrow/array/concat_test.go | 37 +++ go/arrow/array/list.go | 224 ++++++++++++++++-- go/arrow/array/list_test.go | 187 +++++++++++++++ go/arrow/cdata/cdata.go | 12 +- go/arrow/cdata/cdata_exports.go | 5 + go/arrow/cdata/cdata_test.go | 18 ++ go/arrow/datatype_nested.go | 53 ++++- go/arrow/internal/arrdata/arrdata.go | 11 + go/arrow/internal/arrjson/arrjson.go | 47 +++- go/arrow/internal/arrjson/arrjson_test.go | 258 ++++++++++++++++++--- go/arrow/ipc/file_reader.go | 10 +- go/arrow/ipc/metadata.go | 13 ++ go/arrow/ipc/writer.go | 37 +++ go/arrow/scalar/nested.go | 14 ++ go/arrow/scalar/numeric.gen.go | 20 ++ go/arrow/scalar/numeric.gen.go.tmpl | 2 + go/arrow/scalar/parse.go | 5 + go/arrow/scalar/scalar.go | 6 +- 24 files changed, 933 insertions(+), 79 deletions(-) diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py index 69eb374a4f7f1..9069b04a4ecec 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1633,7 +1633,6 @@ def _temp_path(): generate_nested_large_offsets_case() .skip_category('C#') - .skip_category('Go') .skip_category('JS'), generate_unions_case() diff --git a/docs/source/status.rst 
b/docs/source/status.rst index 0259538f87567..a1cac8af09791 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -77,7 +77,7 @@ Data Types +-------------------+-------+-------+-------+------------+-------+-------+-------+ | List | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ -| Large List | ✓ | ✓ | | | | ✓ | ✓ | +| Large List | ✓ | ✓ | ✓ | | | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ | Struct | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | ✓ | +-------------------+-------+-------+-------+------------+-------+-------+-------+ diff --git a/go/arrow/array/array.go b/go/arrow/array/array.go index cd534902b4117..07fa343fb69be 100644 --- a/go/arrow/array/array.go +++ b/go/arrow/array/array.go @@ -178,7 +178,7 @@ func init() { arrow.DURATION: func(data arrow.ArrayData) arrow.Array { return NewDurationData(data) }, arrow.LARGE_STRING: func(data arrow.ArrayData) arrow.Array { return NewLargeStringData(data) }, arrow.LARGE_BINARY: func(data arrow.ArrayData) arrow.Array { return NewLargeBinaryData(data) }, - arrow.LARGE_LIST: unsupportedArrayType, + arrow.LARGE_LIST: func(data arrow.ArrayData) arrow.Array { return NewLargeListData(data) }, arrow.INTERVAL: func(data arrow.ArrayData) arrow.Array { return NewIntervalData(data) }, arrow.INTERVAL_MONTH_DAY_NANO: func(data arrow.ArrayData) arrow.Array { return NewMonthDayNanoIntervalData(data) }, diff --git a/go/arrow/array/array_test.go b/go/arrow/array/array_test.go index b01a0c63fa94e..bc01bc8a89e3c 100644 --- a/go/arrow/array/array_test.go +++ b/go/arrow/array/array_test.go @@ -81,6 +81,11 @@ func TestMakeFromData(t *testing.T) { array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */), }}, + {name: "large list", d: &testDataType{arrow.LARGE_LIST}, child: []arrow.ArrayData{ + array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */), + array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */), + }}, + {name: "struct", d: &testDataType{arrow.STRUCT}}, {name: "struct", d: &testDataType{arrow.STRUCT}, child: []arrow.ArrayData{ array.NewData(&testDataType{arrow.INT64}, 0 /* length */, make([]*memory.Buffer, 2 /*null bitmap, values*/), nil /* childData */, 0 /* nulls */, 0 /* offset */), @@ -117,7 +122,6 @@ func TestMakeFromData(t *testing.T) { // unsupported types {name: "sparse union", d: &testDataType{arrow.SPARSE_UNION}, expPanic: true, expError: "unsupported data type: SPARSE_UNION"}, {name: "dense union", d: &testDataType{arrow.DENSE_UNION}, expPanic: true, expError: "unsupported data type: DENSE_UNION"}, - {name: "large list", d: &testDataType{arrow.LARGE_LIST}, expPanic: true, expError: "unsupported data type: LARGE_LIST"}, {name: "decimal256", d: &testDataType{arrow.DECIMAL256}, expPanic: true, expError: "unsupported data type: DECIMAL256"}, // invalid types diff --git a/go/arrow/array/compare.go b/go/arrow/array/compare.go index 283234b3984d0..19c06de0b2846 100644 --- a/go/arrow/array/compare.go +++ b/go/arrow/array/compare.go @@ -294,6 +294,9 @@ func Equal(left, right arrow.Array) bool { case *List: r := right.(*List) return arrayEqualList(l, r) + case *LargeList: + r := right.(*LargeList) + return arrayEqualLargeList(l, r) case 
*FixedSizeList: r := right.(*FixedSizeList) return arrayEqualFixedSizeList(l, r) @@ -535,6 +538,9 @@ func arrayApproxEqual(left, right arrow.Array, opt equalOption) bool { case *List: r := right.(*List) return arrayApproxEqualList(l, r, opt) + case *LargeList: + r := right.(*LargeList) + return arrayApproxEqualLargeList(l, r, opt) case *FixedSizeList: r := right.(*FixedSizeList) return arrayApproxEqualFixedSizeList(l, r, opt) @@ -650,6 +656,25 @@ func arrayApproxEqualList(left, right *List, opt equalOption) bool { return true } +func arrayApproxEqualLargeList(left, right *LargeList, opt equalOption) bool { + for i := 0; i < left.Len(); i++ { + if left.IsNull(i) { + continue + } + o := func() bool { + l := left.newListValue(i) + defer l.Release() + r := right.newListValue(i) + defer r.Release() + return arrayApproxEqual(l, r, opt) + }() + if !o { + return false + } + } + return true +} + func arrayApproxEqualFixedSizeList(left, right *FixedSizeList, opt equalOption) bool { for i := 0; i < left.Len(); i++ { if left.IsNull(i) { diff --git a/go/arrow/array/concat.go b/go/arrow/array/concat.go index a62fc5338c706..22885f569ab28 100644 --- a/go/arrow/array/concat.go +++ b/go/arrow/array/concat.go @@ -447,6 +447,23 @@ func concat(data []arrow.ArrayData, mem memory.Allocator) (arrow.ArrayData, erro defer c.Release() } + out.buffers[1] = offsetBuffer + out.childData = make([]arrow.ArrayData, 1) + out.childData[0], err = concat(childData, mem) + if err != nil { + return nil, err + } + case *arrow.LargeListType: + offsetWidth := dt.Layout().Buffers[1].ByteWidth + offsetBuffer, valueRanges, err := concatOffsets(gatherFixedBuffers(data, 1, offsetWidth), offsetWidth, mem) + if err != nil { + return nil, err + } + childData := gatherChildrenRanges(data, 0, valueRanges) + for _, c := range childData { + defer c.Release() + } + out.buffers[1] = offsetBuffer out.childData = make([]arrow.ArrayData, 1) out.childData[0], err = concat(childData, mem) diff --git a/go/arrow/array/concat_test.go b/go/arrow/array/concat_test.go index 745104757e452..f3c1c7ea80f28 100644 --- a/go/arrow/array/concat_test.go +++ b/go/arrow/array/concat_test.go @@ -76,6 +76,7 @@ func TestConcatenate(t *testing.T) { {arrow.BinaryTypes.String}, {arrow.BinaryTypes.LargeString}, {arrow.ListOf(arrow.PrimitiveTypes.Int8)}, + {arrow.LargeListOf(arrow.PrimitiveTypes.Int8)}, {arrow.FixedSizeListOf(3, arrow.PrimitiveTypes.Int8)}, {arrow.StructOf()}, {arrow.MapOf(arrow.PrimitiveTypes.Uint16, arrow.PrimitiveTypes.Int8)}, @@ -158,6 +159,32 @@ func (cts *ConcatTestSuite) generateArr(size int64, nullprob float64) arrow.Arra bldr := array.NewListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int8) defer bldr.Release() + valid := make([]bool, len(offsetsVector)-1) + for i := range valid { + valid[i] = true + } + bldr.AppendValues(offsetsVector, valid) + vb := bldr.ValueBuilder().(*array.Int8Builder) + for i := 0; i < values.Len(); i++ { + if values.IsValid(i) { + vb.Append(values.Value(i)) + } else { + vb.AppendNull() + } + } + return bldr.NewArray() + case arrow.LARGE_LIST: + valuesSize := size * 8 + values := cts.rng.Int8(valuesSize, 0, 127, nullprob).(*array.Int8) + defer values.Release() + offsetsVector := cts.largeoffsets(int64(valuesSize), int32(size)) + // ensure the first and last offsets encompass the whole values + offsetsVector[0] = 0 + offsetsVector[len(offsetsVector)-1] = int64(valuesSize) + + bldr := array.NewLargeListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int8) + defer bldr.Release() + valid := make([]bool, 
len(offsetsVector)-1) for i := range valid { valid[i] = true @@ -263,6 +290,16 @@ func (cts *ConcatTestSuite) offsets(length, slicecount int32) []int32 { return offsets } +func (cts *ConcatTestSuite) largeoffsets(length int64, slicecount int32) []int64 { + offsets := make([]int64, slicecount+1) + dist := rand.New(rand.NewSource(cts.seed)) + for i := range offsets { + offsets[i] = dist.Int63n(length + 1) + } + sort.Slice(offsets, func(i, j int) bool { return offsets[i] < offsets[j] }) + return offsets +} + func (cts *ConcatTestSuite) TestCheckConcat() { for _, sz := range cts.sizes { cts.Run(fmt.Sprintf("size %d", sz), func() { diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index 9f63bcf382d96..19933181b7612 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -146,28 +146,177 @@ func (a *List) Release() { a.values.Release() } -type ListBuilder struct { +// LargeList represents an immutable sequence of array values. +type LargeList struct { + array + values arrow.Array + offsets []int64 +} + +// NewLargeListData returns a new LargeList array value, from data. +func NewLargeListData(data arrow.ArrayData) *LargeList { + a := &LargeList{} + a.refCount = 1 + a.setData(data.(*Data)) + return a +} + +func (a *LargeList) ListValues() arrow.Array { return a.values } + +func (a *LargeList) String() string { + o := new(strings.Builder) + o.WriteString("[") + for i := 0; i < a.Len(); i++ { + if i > 0 { + o.WriteString(" ") + } + if !a.IsValid(i) { + o.WriteString("(null)") + continue + } + sub := a.newListValue(i) + fmt.Fprintf(o, "%v", sub) + sub.Release() + } + o.WriteString("]") + return o.String() +} + +func (a *LargeList) newListValue(i int) arrow.Array { + j := i + a.array.data.offset + beg := int64(a.offsets[j]) + end := int64(a.offsets[j+1]) + return NewSlice(a.values, beg, end) +} + +func (a *LargeList) setData(data *Data) { + a.array.setData(data) + vals := data.buffers[1] + if vals != nil { + a.offsets = arrow.Int64Traits.CastFromBytes(vals.Bytes()) + } + a.values = MakeFromData(data.childData[0]) +} + +func (a *LargeList) getOneForMarshal(i int) interface{} { + if a.IsNull(i) { + return nil + } + + slice := a.newListValue(i) + defer slice.Release() + v, err := json.Marshal(slice) + if err != nil { + panic(err) + } + return json.RawMessage(v) +} + +func (a *LargeList) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + enc := json.NewEncoder(&buf) + + buf.WriteByte('[') + for i := 0; i < a.Len(); i++ { + if i != 0 { + buf.WriteByte(',') + } + if err := enc.Encode(a.getOneForMarshal(i)); err != nil { + return nil, err + } + } + buf.WriteByte(']') + return buf.Bytes(), nil +} + +func arrayEqualLargeList(left, right *LargeList) bool { + for i := 0; i < left.Len(); i++ { + if left.IsNull(i) { + continue + } + o := func() bool { + l := left.newListValue(i) + defer l.Release() + r := right.newListValue(i) + defer r.Release() + return Equal(l, r) + }() + if !o { + return false + } + } + return true +} + +// Len returns the number of elements in the array. +func (a *LargeList) Len() int { return a.array.Len() } + +func (a *LargeList) Offsets() []int64 { return a.offsets } + +func (a *LargeList) Retain() { + a.array.Retain() + a.values.Retain() +} + +func (a *LargeList) Release() { + a.array.Release() + a.values.Release() +} + +type listBuilder struct { builder etype arrow.DataType // data type of the list's elements. values Builder // value builder for the list's elements. 
- offsets *Int32Builder + offsets Builder + + dt arrow.DataType + appendOffsetVal func(int) +} + +type ListBuilder struct { + listBuilder +} + +type LargeListBuilder struct { + listBuilder } // NewListBuilder returns a builder, using the provided memory allocator. // The created list builder will create a list whose elements will be of type etype. func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder { + offsetBldr := NewInt32Builder(mem) return &ListBuilder{ - builder: builder{refCount: 1, mem: mem}, - etype: etype, - values: NewBuilder(mem, etype), - offsets: NewInt32Builder(mem), + listBuilder{ + builder: builder{refCount: 1, mem: mem}, + etype: etype, + values: NewBuilder(mem, etype), + offsets: offsetBldr, + dt: arrow.ListOf(etype), + appendOffsetVal: func(o int) { offsetBldr.Append(int32(o)) }, + }, + } +} + +// NewLargeListBuilder returns a builder, using the provided memory allocator. +// The created list builder will create a list whose elements will be of type etype. +func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType) *LargeListBuilder { + offsetBldr := NewInt64Builder(mem) + return &LargeListBuilder{ + listBuilder{ + builder: builder{refCount: 1, mem: mem}, + etype: etype, + values: NewBuilder(mem, etype), + offsets: offsetBldr, + dt: arrow.LargeListOf(etype), + appendOffsetVal: func(o int) { offsetBldr.Append(int64(o)) }, + }, } } // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. -func (b *ListBuilder) Release() { +func (b *listBuilder) Release() { debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") if atomic.AddInt64(&b.refCount, -1) == 0 { @@ -181,17 +330,18 @@ func (b *ListBuilder) Release() { b.offsets.Release() } -func (b *ListBuilder) appendNextOffset() { - b.offsets.Append(int32(b.values.Len())) +func (b *listBuilder) appendNextOffset() { + b.appendOffsetVal(b.values.Len()) + // b.offsets.Append(int32(b.values.Len())) } -func (b *ListBuilder) Append(v bool) { +func (b *listBuilder) Append(v bool) { b.Reserve(1) b.unsafeAppendBoolToBitmap(v) b.appendNextOffset() } -func (b *ListBuilder) AppendNull() { +func (b *listBuilder) AppendNull() { b.Reserve(1) b.unsafeAppendBoolToBitmap(false) b.appendNextOffset() @@ -199,11 +349,17 @@ func (b *ListBuilder) AppendNull() { func (b *ListBuilder) AppendValues(offsets []int32, valid []bool) { b.Reserve(len(valid)) - b.offsets.AppendValues(offsets, nil) + b.offsets.(*Int32Builder).AppendValues(offsets, nil) + b.builder.unsafeAppendBoolsToBitmap(valid, len(valid)) +} + +func (b *LargeListBuilder) AppendValues(offsets []int64, valid []bool) { + b.Reserve(len(valid)) + b.offsets.(*Int64Builder).AppendValues(offsets, nil) b.builder.unsafeAppendBoolsToBitmap(valid, len(valid)) } -func (b *ListBuilder) unsafeAppendBoolToBitmap(isValid bool) { +func (b *listBuilder) unsafeAppendBoolToBitmap(isValid bool) { if isValid { bitutil.SetBit(b.nullBitmap.Bytes(), b.length) } else { @@ -212,26 +368,26 @@ func (b *ListBuilder) unsafeAppendBoolToBitmap(isValid bool) { b.length++ } -func (b *ListBuilder) init(capacity int) { +func (b *listBuilder) init(capacity int) { b.builder.init(capacity) b.offsets.init(capacity + 1) } // Reserve ensures there is enough space for appending n elements // by checking the capacity and calling Resize if necessary. 
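
The builder refactor above routes both ListBuilder and the new LargeListBuilder through a shared listBuilder, so the large variant is used exactly like the 32-bit one. A minimal sketch follows, mirroring the new tests further down in this patch; it is not part of the diff, and the expected output is shown in comments rather than captured from a run.

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/array"
	"github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
	bld := array.NewLargeListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int32)
	defer bld.Release()

	vb := bld.ValueBuilder().(*array.Int32Builder)

	bld.Append(true)
	vb.AppendValues([]int32{0, 1, 2}, nil)
	bld.AppendNull()
	bld.Append(true)
	vb.AppendValues([]int32{3, 4, 5, 6}, nil)

	arr := bld.NewLargeListArray()
	defer arr.Release()

	fmt.Println(arr)           // [[0 1 2] (null) [3 4 5 6]]
	fmt.Println(arr.Offsets()) // [0 3 3 7], stored as []int64
}
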
-func (b *ListBuilder) Reserve(n int) { +func (b *listBuilder) Reserve(n int) { b.builder.reserve(n, b.resizeHelper) b.offsets.Reserve(n) } // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may reduced. -func (b *ListBuilder) Resize(n int) { +func (b *listBuilder) Resize(n int) { b.resizeHelper(n) b.offsets.Resize(n) } -func (b *ListBuilder) resizeHelper(n int) { +func (b *listBuilder) resizeHelper(n int) { if n < minBuilderCapacity { n = minBuilderCapacity } @@ -243,7 +399,7 @@ func (b *ListBuilder) resizeHelper(n int) { } } -func (b *ListBuilder) ValueBuilder() Builder { +func (b *listBuilder) ValueBuilder() Builder { return b.values } @@ -253,6 +409,12 @@ func (b *ListBuilder) NewArray() arrow.Array { return b.NewListArray() } +// NewArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// so it can be used to build a new array. +func (b *LargeListBuilder) NewArray() arrow.Array { + return b.NewLargeListArray() +} + // NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder // so it can be used to build a new array. func (b *ListBuilder) NewListArray() (a *List) { @@ -265,19 +427,31 @@ func (b *ListBuilder) NewListArray() (a *List) { return } -func (b *ListBuilder) newData() (data *Data) { +// NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// so it can be used to build a new array. +func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) { + if b.offsets.Len() != b.length+1 { + b.appendNextOffset() + } + data := b.newData() + a = NewLargeListData(data) + data.Release() + return +} + +func (b *listBuilder) newData() (data *Data) { values := b.values.NewArray() defer values.Release() var offsets *memory.Buffer if b.offsets != nil { - arr := b.offsets.NewInt32Array() + arr := b.offsets.NewArray() defer arr.Release() offsets = arr.Data().Buffers()[1] } data = NewData( - arrow.ListOf(b.etype), b.length, + b.dt, b.length, []*memory.Buffer{ b.nullBitmap, offsets, @@ -291,7 +465,7 @@ func (b *ListBuilder) newData() (data *Data) { return } -func (b *ListBuilder) unmarshalOne(dec *json.Decoder) error { +func (b *listBuilder) unmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err @@ -318,7 +492,7 @@ func (b *ListBuilder) unmarshalOne(dec *json.Decoder) error { return nil } -func (b *ListBuilder) unmarshal(dec *json.Decoder) error { +func (b *listBuilder) unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.unmarshalOne(dec); err != nil { return err @@ -327,7 +501,7 @@ func (b *ListBuilder) unmarshal(dec *json.Decoder) error { return nil } -func (b *ListBuilder) UnmarshalJSON(data []byte) error { +func (b *listBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { @@ -343,5 +517,7 @@ func (b *ListBuilder) UnmarshalJSON(data []byte) error { var ( _ arrow.Array = (*List)(nil) + _ arrow.Array = (*LargeList)(nil) _ Builder = (*ListBuilder)(nil) + _ Builder = (*LargeListBuilder)(nil) ) diff --git a/go/arrow/array/list_test.go b/go/arrow/array/list_test.go index a0f196fb023f2..4fdbcd830ff03 100644 --- a/go/arrow/array/list_test.go +++ b/go/arrow/array/list_test.go @@ -211,3 +211,190 @@ func TestListArraySlice(t *testing.T) { t.Fatalf("got=%q, want=%q", got, want) } } + +func TestLargeListArray(t *testing.T) { + pool 
:= memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 4} + isValid = []bool{true, false, true} + offsets = []int64{0, 3, 3, 7} + ) + + lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) + defer lb.Release() + + for i := 0; i < 10; i++ { + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + pos := 0 + for i, length := range lengths { + lb.Append(isValid[i]) + for j := 0; j < length; j++ { + vb.Append(vs[pos]) + pos++ + } + } + + arr := lb.NewArray().(*array.LargeList) + defer arr.Release() + + arr.Retain() + arr.Release() + + if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { + t.Fatalf("got=%v, want=%v", got, want) + } + + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) + } + + for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), lengths[i] == 0; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } + + if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + } +} + +func TestLargeListArrayEmpty(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) + defer lb.Release() + arr := lb.NewArray().(*array.LargeList) + defer arr.Release() + if got, want := arr.Len(), 0; got != want { + t.Fatalf("got=%d, want=%d", got, want) + } +} + +func TestLargeListArrayBulkAppend(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 4} + isValid = []bool{true, false, true} + offsets = []int64{0, 3, 3, 7} + ) + + lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) + defer lb.Release() + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + lb.AppendValues(offsets, isValid) + for _, v := range vs { + vb.Append(v) + } + + arr := lb.NewArray().(*array.LargeList) + defer arr.Release() + + if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { + t.Fatalf("got=%v, want=%v", got, want) + } + + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) + } + + for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), lengths[i] == 0; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } + + if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } +} + +func TestLargeListArraySlice(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 4} + isValid = []bool{true, false, true} + offsets = []int64{0, 3, 3, 7} + ) + + lb := array.NewLargeListBuilder(pool, 
arrow.PrimitiveTypes.Int32) + defer lb.Release() + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + lb.AppendValues(offsets, isValid) + for _, v := range vs { + vb.Append(v) + } + + arr := lb.NewArray().(*array.LargeList) + defer arr.Release() + + if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { + t.Fatalf("got=%v, want=%v", got, want) + } + + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) + } + + for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), lengths[i] == 0; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } + + if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + + if got, want := arr.String(), `[[0 1 2] (null) [3 4 5 6]]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } + + sub := array.NewSlice(arr, 1, 3).(*array.LargeList) + defer sub.Release() + + if got, want := sub.String(), `[(null) [3 4 5 6]]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } +} diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index 97429429c64c1..44046c78caea3 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -228,6 +228,8 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { switch f[1] { case 'l': // list dt = arrow.ListOfField(childFields[0]) + case 'L': // large list + dt = arrow.LargeListOfField(childFields[0]) case 'w': // fixed size list is w:# where # is the list size. 
listSize, err := strconv.Atoi(strings.Split(f, ":")[1]) if err != nil { @@ -288,6 +290,11 @@ func (imp *cimporter) doImportChildren() error { if err := imp.children[0].importChild(imp, children[0]); err != nil { return err } + case arrow.LARGE_LIST: // only one child to import + imp.children[0].dt = imp.dt.(*arrow.LargeListType).Elem() + if err := imp.children[0].importChild(imp, children[0]); err != nil { + return err + } case arrow.FIXED_SIZE_LIST: // only one child to import imp.children[0].dt = imp.dt.(*arrow.FixedSizeListType).Elem() if err := imp.children[0].importChild(imp, children[0]); err != nil { @@ -365,6 +372,8 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return imp.importLargeStringLike() case *arrow.ListType: return imp.importListLike() + case *arrow.LargeListType: + return imp.importListLike() case *arrow.MapType: return imp.importListLike() case *arrow.FixedSizeListType: @@ -461,7 +470,8 @@ func (imp *cimporter) importListLike() error { return err } - offsets := imp.importOffsetsBuffer(1, int64(arrow.Int32SizeBytes)) + offsetSize := imp.dt.Layout().Buffers[1].ByteWidth + offsets := imp.importOffsetsBuffer(1, int64(offsetSize)) imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) return nil } diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index 1a6cff8fa50c9..a3da68447db22 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -216,6 +216,8 @@ func (exp *schemaExporter) exportFormat(dt arrow.DataType) string { return "tin" case *arrow.ListType: return "+l" + case *arrow.LargeListType: + return "+L" case *arrow.FixedSizeListType: return fmt.Sprintf("+w:%d", dt.Len()) case *arrow.StructType: @@ -240,6 +242,9 @@ func (exp *schemaExporter) export(field arrow.Field) { case *arrow.ListType: exp.children = make([]schemaExporter, 1) exp.children[0].export(dt.ElemField()) + case *arrow.LargeListType: + exp.children = make([]schemaExporter, 1) + exp.children[0].export(dt.ElemField()) case *arrow.StructType: exp.children = make([]schemaExporter, len(dt.Fields())) for i, f := range dt.Fields() { diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index b458e79187c17..03c01181c13ef 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -502,6 +502,23 @@ func createTestListArr() arrow.Array { return bld.NewArray() } +func createTestLargeListArr() arrow.Array { + bld := array.NewLargeListBuilder(memory.DefaultAllocator, arrow.PrimitiveTypes.Int8) + defer bld.Release() + + vb := bld.ValueBuilder().(*array.Int8Builder) + + bld.Append(true) + vb.AppendValues([]int8{1, 2}, []bool{true, true}) + + bld.Append(true) + vb.AppendValues([]int8{3, 0}, []bool{true, false}) + + bld.AppendNull() + + return bld.NewArray() +} + func createTestFixedSizeList() arrow.Array { bld := array.NewFixedSizeListBuilder(memory.DefaultAllocator, 2, arrow.PrimitiveTypes.Int64) defer bld.Release() @@ -565,6 +582,7 @@ func TestNestedArrays(t *testing.T) { fn func() arrow.Array }{ {"list", createTestListArr}, + {"large list", createTestLargeListArr}, {"fixed size list", createTestFixedSizeList}, {"struct", createTestStructArr}, {"map", createTestMapArr}, diff --git a/go/arrow/datatype_nested.go b/go/arrow/datatype_nested.go index 4b5118534fb64..bf2804c89f36f 100644 --- a/go/arrow/datatype_nested.go +++ b/go/arrow/datatype_nested.go @@ -96,6 +96,57 @@ func (ListType) Layout() 
DataTypeLayout { func (ListType) OffsetTypeTraits() OffsetTraits { return Int32Traits } +type LargeListType struct { + ListType +} + +func (LargeListType) ID() Type { return LARGE_LIST } +func (LargeListType) Name() string { return "large_list" } +func (t *LargeListType) String() string { + return "large_" + t.ListType.String() +} + +func (t *LargeListType) Fingerprint() string { + child := t.elem.Type.Fingerprint() + if len(child) > 0 { + return typeFingerprint(t) + "{" + child + "}" + } + return "" +} + +func (LargeListType) Layout() DataTypeLayout { + return DataTypeLayout{Buffers: []BufferSpec{SpecBitmap(), SpecFixedWidth(Int64SizeBytes)}} +} + +func (LargeListType) OffsetTypeTraits() OffsetTraits { return Int64Traits } + +func LargeListOfField(f Field) *LargeListType { + if f.Type == nil { + panic("arrow: nil type for list field") + } + return &LargeListType{ListType{elem: f}} +} + +// ListOf returns the list type with element type t. +// For example, if t represents int32, ListOf(t) represents []int32. +// +// ListOf panics if t is nil or invalid. NullableElem defaults to true +func LargeListOf(t DataType) *LargeListType { + if t == nil { + panic("arrow: nil DataType") + } + return &LargeListType{ListType{elem: Field{Name: "item", Type: t, Nullable: true}}} +} + +// ListOfNonNullable is like ListOf but NullableElem defaults to false, indicating +// that the child type should be marked as non-nullable. +func LargeListOfNonNullable(t DataType) *LargeListType { + if t == nil { + panic("arrow: nil DataType") + } + return &LargeListType{ListType{elem: Field{Name: "item", Type: t, Nullable: false}}} +} + // FixedSizeListType describes a nested type in which each array slot contains // a fixed-size sequence of values, all having the same relative type. 
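
As a quick illustration of the new type constructors above (a sketch only, separate from the diff; import path assumes the v10 module used in this series):

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v10/arrow"
)

func main() {
	// LargeListOf is the 64-bit-offset counterpart of ListOf.
	dt := arrow.LargeListOf(arrow.PrimitiveTypes.Int8)
	fmt.Println(dt.ID(), dt.Name()) // LARGE_LIST large_list

	// The second buffer in the layout is the offsets buffer; it is 8 bytes
	// wide here, which is what the updated IPC writer inspects when it
	// rebuilds zero-based offsets.
	fmt.Println(dt.Layout().Buffers[1].ByteWidth) // 8

	// LargeListOfField keeps the caller's field name and nullability.
	dt2 := arrow.LargeListOfField(arrow.Field{Name: "vals", Type: arrow.PrimitiveTypes.Float64, Nullable: true})
	fmt.Println(dt2.ElemField().Name) // vals
}
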
type FixedSizeListType struct { @@ -325,7 +376,7 @@ func (t *MapType) Fingerprint() string { return fingerprint + "{" + keyFingerprint + itemFingerprint + "}" } -func (t *MapType) Fields() []Field { return t.ValueType().Fields() } +func (t *MapType) Fields() []Field { return []Field{t.ValueField()} } func (t *MapType) Layout() DataTypeLayout { return t.value.Layout() diff --git a/go/arrow/internal/arrdata/arrdata.go b/go/arrow/internal/arrdata/arrdata.go index 897f66f3dbeb7..38c1774c69d45 100644 --- a/go/arrow/internal/arrdata/arrdata.go +++ b/go/arrow/internal/arrdata/arrdata.go @@ -1445,5 +1445,16 @@ func buildArray(bldr array.Builder, data arrow.Array) { bldr.AppendNull() } } + + case *array.LargeStringBuilder: + data := data.(*array.LargeString) + for i := 0; i < data.Len(); i++ { + switch { + case data.IsValid(i): + bldr.Append(data.Value(i)) + default: + bldr.AppendNull() + } + } } } diff --git a/go/arrow/internal/arrjson/arrjson.go b/go/arrow/internal/arrjson/arrjson.go index b85da410bc8fd..d8cd259e1ddce 100644 --- a/go/arrow/internal/arrjson/arrjson.go +++ b/go/arrow/internal/arrjson/arrjson.go @@ -205,6 +205,8 @@ func typeToJSON(arrowType arrow.DataType) (json.RawMessage, error) { } case *arrow.ListType: typ = nameJSON{"list"} + case *arrow.LargeListType: + typ = nameJSON{"largelist"} case *arrow.MapType: typ = mapJSON{Name: "map", KeysSorted: dt.KeysSorted} case *arrow.StructType: @@ -384,6 +386,13 @@ func typeFromJSON(typ json.RawMessage, children []FieldWrapper) (arrowType arrow Metadata: children[0].arrowMeta, Nullable: children[0].Nullable, }) + case "largelist": + arrowType = arrow.LargeListOfField(arrow.Field{ + Name: children[0].Name, + Type: children[0].arrowType, + Metadata: children[0].arrowMeta, + Nullable: children[0].Nullable, + }) case "map": t := mapJSON{} if err = json.Unmarshal(typ, &t); err != nil { @@ -646,15 +655,8 @@ func fieldsToJSON(fields []arrow.Field, parentPos dictutils.FieldPos, mapper *di } } - switch dt := typ.(type) { - case *arrow.ListType: - o[i].Children = fieldsToJSON([]arrow.Field{dt.ElemField()}, pos, mapper) - case *arrow.FixedSizeListType: - o[i].Children = fieldsToJSON([]arrow.Field{dt.ElemField()}, pos, mapper) - case *arrow.StructType: + if dt, ok := typ.(arrow.NestedType); ok { o[i].Children = fieldsToJSON(dt.Fields(), pos, mapper) - case *arrow.MapType: - o[i].Children = fieldsToJSON([]arrow.Field{dt.ValueField()}, pos, mapper) } } return o @@ -971,6 +973,19 @@ func arrayFromJSON(mem memory.Allocator, dt arrow.DataType, arr Array) arrow.Arr memory.NewBufferBytes(arrow.Int32Traits.CastToBytes(arr.Offset.([]int32)))}, []arrow.ArrayData{elems}, nulls, 0) + case *arrow.LargeListType: + valids := validsFromJSON(arr.Valids) + elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0]) + defer elems.Release() + + bitmap := validsToBitmap(valids, mem) + defer bitmap.Release() + + nulls := arr.Count - bitutil.CountSetBits(bitmap.Bytes(), 0, arr.Count) + return array.NewData(dt, arr.Count, []*memory.Buffer{bitmap, + memory.NewBufferBytes(arrow.Int64Traits.CastToBytes(arr.Offset.([]int64)))}, + []arrow.ArrayData{elems}, nulls, 0) + case *arrow.FixedSizeListType: valids := validsFromJSON(arr.Valids) elems := arrayFromJSON(mem, dt.Elem(), arr.Children[0]) @@ -1287,6 +1302,22 @@ func arrayToJSON(field arrow.Field, arr arrow.Array) Array { } return o + case *array.LargeList: + offsets := arr.Offsets() + strOffsets := make([]string, len(offsets)) + for i, o := range offsets { + strOffsets[i] = strconv.FormatInt(o, 10) + } + return Array{ + Name: 
field.Name, + Count: arr.Len(), + Valids: validsToJSON(arr), + Offset: strOffsets, + Children: []Array{ + arrayToJSON(arrow.Field{Name: "item", Type: arr.DataType().(*arrow.LargeListType).Elem()}, arr.ListValues()), + }, + } + case *array.Map: o := Array{ Name: field.Name, diff --git a/go/arrow/internal/arrjson/arrjson_test.go b/go/arrow/internal/arrjson/arrjson_test.go index 2b25fb6c99caa..eccdb9aee37ed 100644 --- a/go/arrow/internal/arrjson/arrjson_test.go +++ b/go/arrow/internal/arrjson/arrjson_test.go @@ -21,12 +21,12 @@ import ( "io" "io/ioutil" "os" - "strings" "testing" "github.com/apache/arrow/go/v10/arrow/array" "github.com/apache/arrow/go/v10/arrow/internal/arrdata" "github.com/apache/arrow/go/v10/arrow/memory" + "github.com/stretchr/testify/assert" ) func TestReadWrite(t *testing.T) { @@ -87,9 +87,7 @@ func TestReadWrite(t *testing.T) { } fileBytes, _ := ioutil.ReadFile(f.Name()) - if wantJSONs[name] != strings.TrimSpace(string(fileBytes)) { - t.Fatalf("not expected JSON pretty output for case: %v", name) - } + assert.JSONEq(t, wantJSONs[name], string(fileBytes)) _, err = f.Seek(0, io.SeekStart) if err != nil { @@ -1115,6 +1113,34 @@ func makeStructsWantJSONs() string { "", "4444", "4555" + ], + "OFFSET": [ + 0, + 3, + 3, + 3, + 6, + 9, + 13, + 13, + 13, + 17, + 21, + 25, + 25, + 25, + 29, + 33, + 37, + 37, + 37, + 41, + 45, + 49, + 49, + 49, + 53, + 57 ] } ] @@ -1269,6 +1295,34 @@ func makeStructsWantJSONs() string { "", "-4444", "-4555" + ], + "OFFSET": [ + 0, + 4, + 4, + 4, + 8, + 12, + 17, + 17, + 17, + 22, + 27, + 32, + 32, + 32, + 37, + 42, + 47, + 47, + 47, + 52, + 57, + 62, + 62, + 62, + 67, + 72 ] } ] @@ -1315,13 +1369,7 @@ func makeListsWantJSONs() string { 1, 1, 1 - ], - "OFFSET": [ - 0, - 5, - 10, - 15 - ], + ], "children": [ { "name": "item", @@ -1361,6 +1409,12 @@ func makeListsWantJSONs() string { 25 ] } + ], + "OFFSET": [ + 0, + 5, + 10, + 15 ] } ] @@ -1376,12 +1430,6 @@ func makeListsWantJSONs() string { 1, 1 ], - "OFFSET": [ - 0, - 5, - 10, - 15 - ], "children": [ { "name": "item", @@ -1421,6 +1469,12 @@ func makeListsWantJSONs() string { -25 ] } + ], + "OFFSET": [ + 0, + 5, + 10, + 15 ] } ] @@ -1436,12 +1490,6 @@ func makeListsWantJSONs() string { 0, 1 ], - "OFFSET": [ - 0, - 5, - 10, - 15 - ], "children": [ { "name": "item", @@ -1481,6 +1529,12 @@ func makeListsWantJSONs() string { -25 ] } + ], + "OFFSET": [ + 0, + 5, + 10, + 15 ] } ] @@ -1491,14 +1545,14 @@ func makeListsWantJSONs() string { { "name": "list_nullable", "count": 0, - "OFFSET": [ - 0 - ], "children": [ { "name": "item", "count": 0 } + ], + "OFFSET": [ + 0 ] } ] @@ -1706,6 +1760,14 @@ func makeStringsWantJSONs() string { "3", "4", "5" + ], + "OFFSET": [ + 0, + 3, + 4, + 5, + 6, + 7 ] }, { @@ -1755,6 +1817,14 @@ func makeStringsWantJSONs() string { "33", "44", "55" + ], + "OFFSET": [ + 0, + 2, + 4, + 6, + 8, + 10 ] }, { @@ -1804,6 +1874,14 @@ func makeStringsWantJSONs() string { "333", "444", "555" + ], + "OFFSET": [ + 0, + 3, + 6, + 9, + 12, + 15 ] }, { @@ -3408,12 +3486,7 @@ func makeMapsWantJSONs() string { "VALIDITY": [ 1, 0 - ], - "OFFSET": [ - 0, - 25, - 50 - ], + ], "children": [ { "name": "entries", @@ -3685,10 +3758,68 @@ func makeMapsWantJSONs() string { "", "-4444", "-4555" + ], + "OFFSET": [ + 0, + 3, + 3, + 3, + 6, + 9, + 13, + 13, + 13, + 17, + 21, + 25, + 25, + 25, + 29, + 33, + 37, + 37, + 37, + 41, + 45, + 49, + 49, + 49, + 53, + 57, + 61, + 61, + 61, + 65, + 69, + 74, + 74, + 74, + 79, + 84, + 89, + 89, + 89, + 94, + 99, + 104, + 104, + 104, + 109, + 114, + 119, + 119, + 119, + 
124, + 129 ] } ] } + ], + "OFFSET": [ + 0, + 25, + 50 ] } ] @@ -3703,11 +3834,6 @@ func makeMapsWantJSONs() string { 1, 0 ], - "OFFSET": [ - 0, - 25, - 50 - ], "children": [ { "name": "entries", @@ -3979,10 +4105,68 @@ func makeMapsWantJSONs() string { "", "4444", "4555" + ], + "OFFSET": [ + 0, + 4, + 4, + 4, + 8, + 12, + 17, + 17, + 17, + 22, + 27, + 32, + 32, + 32, + 37, + 42, + 47, + 47, + 47, + 52, + 57, + 62, + 62, + 62, + 67, + 72, + 75, + 75, + 75, + 78, + 81, + 85, + 85, + 85, + 89, + 93, + 97, + 97, + 97, + 101, + 105, + 109, + 109, + 109, + 113, + 117, + 121, + 121, + 121, + 125, + 129 ] } ] } + ], + "OFFSET": [ + 0, + 25, + 50 ] } ] diff --git a/go/arrow/ipc/file_reader.go b/go/arrow/ipc/file_reader.go index 7b835dacda51f..90c145648d092 100644 --- a/go/arrow/ipc/file_reader.go +++ b/go/arrow/ipc/file_reader.go @@ -478,6 +478,9 @@ func (ctx *arrayLoaderContext) loadArray(dt arrow.DataType) arrow.ArrayData { case *arrow.ListType: return ctx.loadList(dt) + case *arrow.LargeListType: + return ctx.loadList(dt) + case *arrow.FixedSizeListType: return ctx.loadFixedSizeList(dt) @@ -571,7 +574,12 @@ func (ctx *arrayLoaderContext) loadMap(dt *arrow.MapType) arrow.ArrayData { return array.NewData(dt, int(field.Length()), buffers, []arrow.ArrayData{sub}, int(field.NullCount()), 0) } -func (ctx *arrayLoaderContext) loadList(dt *arrow.ListType) arrow.ArrayData { +type listLike interface { + arrow.DataType + Elem() arrow.DataType +} + +func (ctx *arrayLoaderContext) loadList(dt listLike) arrow.ArrayData { field, buffers := ctx.loadCommon(2) buffers = append(buffers, ctx.buffer()) defer releaseBuffers(buffers) diff --git a/go/arrow/ipc/metadata.go b/go/arrow/ipc/metadata.go index c2cd3a06ba196..69f808ae25f52 100644 --- a/go/arrow/ipc/metadata.go +++ b/go/arrow/ipc/metadata.go @@ -370,6 +370,12 @@ func (fv *fieldVisitor) visit(field arrow.Field) { flatbuf.ListStart(fv.b) fv.offset = flatbuf.ListEnd(fv.b) + case *arrow.LargeListType: + fv.dtype = flatbuf.TypeLargeList + fv.kids = append(fv.kids, fieldToFB(fv.b, fv.pos.Child(0), dt.ElemField(), fv.memo)) + flatbuf.LargeListStart(fv.b) + fv.offset = flatbuf.LargeListEnd(fv.b) + case *arrow.FixedSizeListType: fv.dtype = flatbuf.TypeFixedSizeList fv.kids = append(fv.kids, fieldToFB(fv.b, fv.pos.Child(0), dt.ElemField(), fv.memo)) @@ -655,6 +661,13 @@ func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arr dt := arrow.ListOfField(children[0]) return dt, nil + case flatbuf.TypeLargeList: + if len(children) != 1 { + return nil, fmt.Errorf("arrow/ipc: LargeList must have exactly 1 child field (got=%d)", len(children)) + } + dt := arrow.LargeListOfField(children[0]) + return dt, nil + case flatbuf.TypeFixedSizeList: var dt flatbuf.FixedSizeList dt.Init(data.Bytes, data.Pos) diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 2a7b3aac920a1..9d85836cb22d0 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -733,6 +733,43 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { } err = w.visit(p, values) + if err != nil { + return fmt.Errorf("could not visit list element for array %T: %w", arr, err) + } + w.depth++ + case *arrow.LargeListType: + arr := arr.(*array.LargeList) + voffsets, err := w.getZeroBasedValueOffsets(arr) + if err != nil { + return fmt.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err) + } + p.body = append(p.body, voffsets) + + w.depth-- + var ( + values = arr.ListValues() + mustRelease = false + values_offset int64 + values_length int64 + 
) + defer func() { + if mustRelease { + values.Release() + } + }() + + if voffsets != nil { + values_offset = arr.Offsets()[0] + values_length = arr.Offsets()[arr.Len()] - values_offset + } + + if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) { + // must also slice the values + values = array.NewSlice(values, values_offset, values_length) + mustRelease = true + } + err = w.visit(p, values) + if err != nil { return fmt.Errorf("could not visit list element for array %T: %w", arr, err) } diff --git a/go/arrow/scalar/nested.go b/go/arrow/scalar/nested.go index 3d7a1ff197903..756e383f5a7b6 100644 --- a/go/arrow/scalar/nested.go +++ b/go/arrow/scalar/nested.go @@ -66,6 +66,8 @@ func (l *List) Validate() (err error) { switch dt := l.Type.(type) { case *arrow.ListType: valueType = dt.Elem() + case *arrow.LargeListType: + valueType = dt.Elem() case *arrow.FixedSizeListType: valueType = dt.Elem() case *arrow.MapType: @@ -120,6 +122,18 @@ func NewListScalarData(val arrow.ArrayData) *List { return &List{scalar{arrow.ListOf(val.DataType()), true}, array.MakeFromData(val)} } +type LargeList struct { + *List +} + +func NewLargeListScalar(val arrow.Array) *LargeList { + return &LargeList{&List{scalar{arrow.LargeListOf(val.DataType()), true}, array.MakeFromData(val.Data())}} +} + +func NewLargeListScalarData(val arrow.ArrayData) *LargeList { + return &LargeList{&List{scalar{arrow.LargeListOf(val.DataType()), true}, array.MakeFromData(val)}} +} + func makeMapType(typ *arrow.StructType) *arrow.MapType { debug.Assert(len(typ.Fields()) == 2, "must pass struct with only 2 fields for MapScalar") return arrow.MapOf(typ.Field(0).Type, typ.Field(1).Type) diff --git a/go/arrow/scalar/numeric.gen.go b/go/arrow/scalar/numeric.gen.go index 56a09ece5c5bd..0dd0aca3d42c4 100644 --- a/go/arrow/scalar/numeric.gen.go +++ b/go/arrow/scalar/numeric.gen.go @@ -81,6 +81,8 @@ func (s *Int8) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type int8 to type %s", dt) @@ -145,6 +147,8 @@ func (s *Int16) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type int16 to type %s", dt) @@ -209,6 +213,8 @@ func (s *Int32) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type int32 to type %s", dt) @@ -273,6 +279,8 @@ func (s *Int64) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type int64 to type %s", dt) @@ -337,6 +345,8 @@ func (s *Uint8) CastTo(dt arrow.DataType) (Scalar, error) { return 
NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type uint8 to type %s", dt) @@ -401,6 +411,8 @@ func (s *Uint16) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type uint16 to type %s", dt) @@ -465,6 +477,8 @@ func (s *Uint32) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type uint32 to type %s", dt) @@ -529,6 +543,8 @@ func (s *Uint64) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type uint64 to type %s", dt) @@ -593,6 +609,8 @@ func (s *Float32) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type float32 to type %s", dt) @@ -657,6 +675,8 @@ func (s *Float64) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type float64 to type %s", dt) diff --git a/go/arrow/scalar/numeric.gen.go.tmpl b/go/arrow/scalar/numeric.gen.go.tmpl index dc1a54586ab9b..07df9fc1d4b15 100644 --- a/go/arrow/scalar/numeric.gen.go.tmpl +++ b/go/arrow/scalar/numeric.gen.go.tmpl @@ -72,6 +72,8 @@ func (s *{{.Name}}) CastTo(dt arrow.DataType) (Scalar, error) { return NewMonthIntervalScalar(arrow.MonthInterval(s.Value)), nil case *arrow.StringType: return NewStringScalar(fmt.Sprintf("%v", s.Value)), nil + case *arrow.LargeStringType: + return NewLargeStringScalar(fmt.Sprintf("%v", s.Value)), nil } return nil, fmt.Errorf("invalid scalar cast from type {{.Type}} to type %s", dt) diff --git a/go/arrow/scalar/parse.go b/go/arrow/scalar/parse.go index 571f741d5761a..d22647435b1fd 100644 --- a/go/arrow/scalar/parse.go +++ b/go/arrow/scalar/parse.go @@ -416,6 +416,11 @@ func MakeScalarParam(val interface{}, dt arrow.DataType) (Scalar, error) { return nil, fmt.Errorf("inconsistent type for list scalar array and data type") } return NewListScalar(v), nil + case arrow.LARGE_LIST: + if !arrow.TypeEqual(v.DataType(), dt.(*arrow.LargeListType).Elem()) { + return nil, fmt.Errorf("inconsistent type for large list scalar array and data type") + } + return NewLargeListScalar(v), nil case 
arrow.FIXED_SIZE_LIST: if !arrow.TypeEqual(v.DataType(), dt.(*arrow.FixedSizeListType).Elem()) { return nil, fmt.Errorf("inconsistent type for list scalar array and data type") diff --git a/go/arrow/scalar/scalar.go b/go/arrow/scalar/scalar.go index 5a6709c4857f3..a96e0593c4045 100644 --- a/go/arrow/scalar/scalar.go +++ b/go/arrow/scalar/scalar.go @@ -445,9 +445,9 @@ func init() { arrow.SPARSE_UNION: unsupportedScalarType, arrow.DENSE_UNION: unsupportedScalarType, arrow.DICTIONARY: func(dt arrow.DataType) Scalar { return NewNullDictScalar(dt) }, - arrow.LARGE_STRING: unsupportedScalarType, - arrow.LARGE_BINARY: unsupportedScalarType, - arrow.LARGE_LIST: unsupportedScalarType, + arrow.LARGE_STRING: func(dt arrow.DataType) Scalar { return &LargeString{&String{&Binary{scalar: scalar{dt, false}}}} }, + arrow.LARGE_BINARY: func(dt arrow.DataType) Scalar { return &LargeBinary{&Binary{scalar: scalar{dt, false}}} }, + arrow.LARGE_LIST: func(dt arrow.DataType) Scalar { return &LargeList{&List{scalar: scalar{dt, false}}} }, arrow.DECIMAL256: unsupportedScalarType, arrow.MAP: func(dt arrow.DataType) Scalar { return &Map{&List{scalar: scalar{dt, false}}} }, arrow.EXTENSION: func(dt arrow.DataType) Scalar { return &Extension{scalar: scalar{dt, false}} }, From 51b57e3e6ae613089848a2bdb1a60301931d8f78 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 1 Aug 2022 17:03:34 -0400 Subject: [PATCH 05/13] missed a spot in test generation --- go/arrow/cdata/cdata_test_framework.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go index 96520d8efcb19..bb4db1e339be0 100644 --- a/go/arrow/cdata/cdata_test_framework.go +++ b/go/arrow/cdata/cdata_test_framework.go @@ -208,6 +208,10 @@ func createCArr(arr arrow.Array) *CArrowArray { clist := []*CArrowArray{createCArr(arr.ListValues())} children = (**CArrowArray)(unsafe.Pointer(&clist[0])) nchildren += 1 + case *array.LargeList: + clist := []*CArrowArray{createCArr(arr.ListValues())} + children = (**CArrowArray)(unsafe.Pointer(&clist[0])) + nchildren += 1 case *array.FixedSizeList: clist := []*CArrowArray{createCArr(arr.ListValues())} children = (**CArrowArray)(unsafe.Pointer(&clist[0])) From 716de4f401417ef0f2b2435d5fee99b5d7fa863a Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Mon, 1 Aug 2022 20:22:32 -0400 Subject: [PATCH 06/13] add endian swap for large types --- go/arrow/ipc/endian_swap.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/go/arrow/ipc/endian_swap.go b/go/arrow/ipc/endian_swap.go index 569e32091d99e..f6ccc0d1a6c1e 100644 --- a/go/arrow/ipc/endian_swap.go +++ b/go/arrow/ipc/endian_swap.go @@ -60,15 +60,16 @@ func swapChildren(children []arrow.ArrayData) (err error) { func swapType(dt arrow.DataType, data *array.Data) (err error) { switch dt.ID() { case arrow.BINARY, arrow.STRING: - swapOffsets(1, data) + swapOffsets(1, 32, data) + return + case arrow.LARGE_BINARY, arrow.LARGE_STRING: + swapOffsets(1, 64, data) return case arrow.NULL, arrow.BOOL, arrow.INT8, arrow.UINT8, arrow.FIXED_SIZE_BINARY, arrow.FIXED_SIZE_LIST, arrow.STRUCT: return case arrow.DENSE_UNION, arrow.SPARSE_UNION: panic("arrow endian swap not yet implemented for union types") - case arrow.LARGE_BINARY, arrow.LARGE_LIST, arrow.LARGE_STRING: - panic("arrow endian swap not yet implemented for large types") } switch dt := dt.(type) { @@ -82,9 +83,11 @@ func swapType(dt arrow.DataType, data *array.Data) (err error) { rawdata[idx+1] = tmp } case *arrow.ListType: - 
swapOffsets(1, data) + swapOffsets(1, 32, data) + case *arrow.LargeListType: + swapOffsets(1, 64, data) case *arrow.MapType: - swapOffsets(1, data) + swapOffsets(1, 32, data) case *arrow.DayTimeIntervalType: byteSwapBuffer(32, data.Buffers()[1]) case *arrow.MonthDayNanoIntervalType: @@ -133,12 +136,12 @@ func byteSwapBuffer(bw int, buf *memory.Buffer) { } } -func swapOffsets(index int, data *array.Data) { +func swapOffsets(index int, bitWidth int, data *array.Data) { if data.Buffers()[index] == nil || data.Buffers()[index].Len() == 0 { return } // other than unions, offset has one more element than the data.length // don't yet implement large types, so hardcode 32bit offsets for now - byteSwapBuffer(32, data.Buffers()[index]) + byteSwapBuffer(bitWidth, data.Buffers()[index]) } From 6ec8283ab5a1a4f33e695ff69d49a5b1355ce9c3 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 12:01:36 -0400 Subject: [PATCH 07/13] Update go/arrow/array/list.go Co-authored-by: Antoine Pitrou --- go/arrow/array/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index 19933181b7612..52d54c84ef48a 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -409,7 +409,7 @@ func (b *ListBuilder) NewArray() arrow.Array { return b.NewListArray() } -// NewArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// NewArray creates a LargeList array from the memory buffers used by the builder and resets the LargeListBuilder // so it can be used to build a new array. func (b *LargeListBuilder) NewArray() arrow.Array { return b.NewLargeListArray() From 4e07896ca44b9cdd0590532a35096033a79d7acd Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 12:04:40 -0400 Subject: [PATCH 08/13] Apply suggestions from code review Co-authored-by: Antoine Pitrou --- go/arrow/array/list.go | 2 +- go/arrow/array/list_test.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index 52d54c84ef48a..6d8a776be3152 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -427,7 +427,7 @@ func (b *ListBuilder) NewListArray() (a *List) { return } -// NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// NewLargeListArray creates a List array from the memory buffers used by the builder and resets the LargeListBuilder // so it can be used to build a new array. 
func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) { if b.offsets.Len() != b.length+1 { diff --git a/go/arrow/array/list_test.go b/go/arrow/array/list_test.go index 4fdbcd830ff03..a2c036ec5f5e6 100644 --- a/go/arrow/array/list_test.go +++ b/go/arrow/array/list_test.go @@ -218,9 +218,9 @@ func TestLargeListArray(t *testing.T) { var ( vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int64{0, 3, 3, 7} + lengths = []int{3, 0, 0, 4} + isValid = []bool{true, false, true, true} + offsets = []int64{0, 3, 3, 3, 7} ) lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) From f5478df9a707ff6b2591fae8bbbee58b067b0a4d Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 13:25:16 -0400 Subject: [PATCH 09/13] slight refactoring based on feedback to reduce duplication --- go/arrow/array/binary.go | 14 + go/arrow/array/builder.go | 2 + go/arrow/array/list.go | 63 ++-- go/arrow/array/list_test.go | 577 +++++++++++++++--------------------- go/arrow/array/string.go | 8 + go/arrow/cdata/cdata.go | 47 ++- go/arrow/ipc/writer.go | 148 +-------- 7 files changed, 335 insertions(+), 524 deletions(-) diff --git a/go/arrow/array/binary.go b/go/arrow/array/binary.go index 6be1909b513f9..0ce181e9d77c5 100644 --- a/go/arrow/array/binary.go +++ b/go/arrow/array/binary.go @@ -26,6 +26,12 @@ import ( "github.com/goccy/go-json" ) +type BinaryLike interface { + arrow.Array + ValueBytes() []byte + ValueOffset64(int) int64 +} + // A type which represents an immutable sequence of variable-length binary strings. type Binary struct { array @@ -64,6 +70,10 @@ func (a *Binary) ValueOffset(i int) int { return int(a.valueOffsets[a.array.data.offset+i]) } +func (a *Binary) ValueOffset64(i int) int64 { + return int64(a.ValueOffset(i)) +} + func (a *Binary) ValueLen(i int) int { if i < 0 || i >= a.array.data.length { panic("arrow/array: index out of range") @@ -193,6 +203,10 @@ func (a *LargeBinary) ValueOffset(i int) int64 { return a.valueOffsets[a.array.data.offset+i] } +func (a *LargeBinary) ValueOffset64(i int) int64 { + return a.ValueOffset(i) +} + func (a *LargeBinary) ValueLen(i int) int { if i < 0 || i >= a.array.data.length { panic("arrow/array: index out of range") diff --git a/go/arrow/array/builder.go b/go/arrow/array/builder.go index f68d59ac4f2af..321e8a95124f3 100644 --- a/go/arrow/array/builder.go +++ b/go/arrow/array/builder.go @@ -303,6 +303,8 @@ func NewBuilder(mem memory.Allocator, dtype arrow.DataType) Builder { typ := dtype.(*arrow.DictionaryType) return NewDictionaryBuilder(mem, typ) case arrow.LARGE_LIST: + typ := dtype.(*arrow.LargeListType) + return NewLargeListBuilder(mem, typ.Elem()) case arrow.MAP: typ := dtype.(*arrow.MapType) return NewMapBuilder(mem, typ.KeyType(), typ.ItemType(), typ.KeysSorted) diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index 6d8a776be3152..1b07a3128df52 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -29,6 +29,12 @@ import ( "github.com/goccy/go-json" ) +type ListLike interface { + arrow.Array + ListValues() arrow.Array + ValueOffsets(i int) (start, end int64) +} + // List represents an immutable sequence of array values. 
type List struct { array @@ -146,6 +152,12 @@ func (a *List) Release() { a.values.Release() } +func (a *List) ValueOffsets(i int) (start, end int64) { + debug.Assert(i >= 0 && i < a.array.data.length, "index out of range") + start, end = int64(a.offsets[i]), int64(a.offsets[i+1]) + return +} + // LargeList represents an immutable sequence of array values. type LargeList struct { array @@ -253,6 +265,12 @@ func (a *LargeList) Len() int { return a.array.Len() } func (a *LargeList) Offsets() []int64 { return a.offsets } +func (a *LargeList) ValueOffsets(i int) (start, end int64) { + debug.Assert(i >= 0 && i < a.array.data.length, "index out of range") + start, end = a.offsets[i], a.offsets[i+1] + return +} + func (a *LargeList) Retain() { a.array.Retain() a.values.Retain() @@ -263,23 +281,30 @@ func (a *LargeList) Release() { a.values.Release() } -type listBuilder struct { +type baseListBuilder struct { builder etype arrow.DataType // data type of the list's elements. values Builder // value builder for the list's elements. offsets Builder + // actual list type dt arrow.DataType appendOffsetVal func(int) } +type ListLikeBuilder interface { + Builder + ValueBuilder() Builder + Append(bool) +} + type ListBuilder struct { - listBuilder + baseListBuilder } type LargeListBuilder struct { - listBuilder + baseListBuilder } // NewListBuilder returns a builder, using the provided memory allocator. @@ -287,7 +312,7 @@ type LargeListBuilder struct { func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder { offsetBldr := NewInt32Builder(mem) return &ListBuilder{ - listBuilder{ + baseListBuilder{ builder: builder{refCount: 1, mem: mem}, etype: etype, values: NewBuilder(mem, etype), @@ -303,7 +328,7 @@ func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder { func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType) *LargeListBuilder { offsetBldr := NewInt64Builder(mem) return &LargeListBuilder{ - listBuilder{ + baseListBuilder{ builder: builder{refCount: 1, mem: mem}, etype: etype, values: NewBuilder(mem, etype), @@ -316,7 +341,7 @@ func NewLargeListBuilder(mem memory.Allocator, etype arrow.DataType) *LargeListB // Release decreases the reference count by 1. // When the reference count goes to zero, the memory is freed. 
-func (b *listBuilder) Release() { +func (b *baseListBuilder) Release() { debug.Assert(atomic.LoadInt64(&b.refCount) > 0, "too many releases") if atomic.AddInt64(&b.refCount, -1) == 0 { @@ -330,18 +355,18 @@ func (b *listBuilder) Release() { b.offsets.Release() } -func (b *listBuilder) appendNextOffset() { +func (b *baseListBuilder) appendNextOffset() { b.appendOffsetVal(b.values.Len()) // b.offsets.Append(int32(b.values.Len())) } -func (b *listBuilder) Append(v bool) { +func (b *baseListBuilder) Append(v bool) { b.Reserve(1) b.unsafeAppendBoolToBitmap(v) b.appendNextOffset() } -func (b *listBuilder) AppendNull() { +func (b *baseListBuilder) AppendNull() { b.Reserve(1) b.unsafeAppendBoolToBitmap(false) b.appendNextOffset() @@ -359,7 +384,7 @@ func (b *LargeListBuilder) AppendValues(offsets []int64, valid []bool) { b.builder.unsafeAppendBoolsToBitmap(valid, len(valid)) } -func (b *listBuilder) unsafeAppendBoolToBitmap(isValid bool) { +func (b *baseListBuilder) unsafeAppendBoolToBitmap(isValid bool) { if isValid { bitutil.SetBit(b.nullBitmap.Bytes(), b.length) } else { @@ -368,26 +393,26 @@ func (b *listBuilder) unsafeAppendBoolToBitmap(isValid bool) { b.length++ } -func (b *listBuilder) init(capacity int) { +func (b *baseListBuilder) init(capacity int) { b.builder.init(capacity) b.offsets.init(capacity + 1) } // Reserve ensures there is enough space for appending n elements // by checking the capacity and calling Resize if necessary. -func (b *listBuilder) Reserve(n int) { +func (b *baseListBuilder) Reserve(n int) { b.builder.reserve(n, b.resizeHelper) b.offsets.Reserve(n) } // Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), // additional memory will be allocated. If n is smaller, the allocated memory may reduced. 
-func (b *listBuilder) Resize(n int) { +func (b *baseListBuilder) Resize(n int) { b.resizeHelper(n) b.offsets.Resize(n) } -func (b *listBuilder) resizeHelper(n int) { +func (b *baseListBuilder) resizeHelper(n int) { if n < minBuilderCapacity { n = minBuilderCapacity } @@ -399,7 +424,7 @@ func (b *listBuilder) resizeHelper(n int) { } } -func (b *listBuilder) ValueBuilder() Builder { +func (b *baseListBuilder) ValueBuilder() Builder { return b.values } @@ -439,7 +464,7 @@ func (b *LargeListBuilder) NewLargeListArray() (a *LargeList) { return } -func (b *listBuilder) newData() (data *Data) { +func (b *baseListBuilder) newData() (data *Data) { values := b.values.NewArray() defer values.Release() @@ -465,7 +490,7 @@ func (b *listBuilder) newData() (data *Data) { return } -func (b *listBuilder) unmarshalOne(dec *json.Decoder) error { +func (b *baseListBuilder) unmarshalOne(dec *json.Decoder) error { t, err := dec.Token() if err != nil { return err @@ -492,7 +517,7 @@ func (b *listBuilder) unmarshalOne(dec *json.Decoder) error { return nil } -func (b *listBuilder) unmarshal(dec *json.Decoder) error { +func (b *baseListBuilder) unmarshal(dec *json.Decoder) error { for dec.More() { if err := b.unmarshalOne(dec); err != nil { return err @@ -501,7 +526,7 @@ func (b *listBuilder) unmarshal(dec *json.Decoder) error { return nil } -func (b *listBuilder) UnmarshalJSON(data []byte) error { +func (b *baseListBuilder) UnmarshalJSON(data []byte) error { dec := json.NewDecoder(bytes.NewReader(data)) t, err := dec.Token() if err != nil { diff --git a/go/arrow/array/list_test.go b/go/arrow/array/list_test.go index a2c036ec5f5e6..f493167f76a1c 100644 --- a/go/arrow/array/list_test.go +++ b/go/arrow/array/list_test.go @@ -26,375 +26,274 @@ import ( ) func TestListArray(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int32{0, 3, 3, 7} - ) - - lb := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - - for i := 0; i < 10; i++ { - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - pos := 0 - for i, length := range lengths { - lb.Append(isValid[i]) - for j := 0; j < length; j++ { - vb.Append(vs[pos]) - pos++ + tests := []struct { + typeID arrow.Type + offsets interface{} + dt arrow.DataType + }{ + {arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOf(arrow.PrimitiveTypes.Int32)}, + {arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)}, + } + + for _, tt := range tests { + t.Run(tt.typeID.String(), func(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 0, 4} + isValid = []bool{true, false, true, true} + ) + + lb := array.NewBuilder(pool, tt.dt).(array.ListLikeBuilder) + defer lb.Release() + + for i := 0; i < 10; i++ { + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + pos := 0 + for i, length := range lengths { + lb.Append(isValid[i]) + for j := 0; j < length; j++ { + vb.Append(vs[pos]) + pos++ + } + } + + arr := lb.NewArray().(array.ListLike) + defer arr.Release() + + arr.Retain() + arr.Release() + + if got, want := arr.DataType().ID(), tt.typeID; got != want { + t.Fatalf("got=%v, want=%v", got, want) + } + + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) + } + 
+ for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), !isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } + + var got interface{} + switch tt.typeID { + case arrow.LIST: + arr := arr.(*array.List) + got = arr.Offsets() + case arrow.LARGE_LIST: + arr := arr.(*array.LargeList) + got = arr.Offsets() + } + + if !reflect.DeepEqual(got, tt.offsets) { + t.Fatalf("got=%v, want=%v", got, tt.offsets) + } + + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } } - } - - arr := lb.NewArray().(*array.List) - defer arr.Release() - - arr.Retain() - arr.Release() - - if got, want := arr.DataType().ID(), arrow.LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } - - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } - - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - } - - if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } - - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } + }) } + } func TestListArrayEmpty(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - lb := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - arr := lb.NewArray().(*array.List) - defer arr.Release() - if got, want := arr.Len(), 0; got != want { - t.Fatalf("got=%d, want=%d", got, want) + typ := []arrow.DataType{ + arrow.ListOf(arrow.PrimitiveTypes.Int32), + arrow.LargeListOf(arrow.PrimitiveTypes.Int32), + } + + for _, dt := range typ { + t.Run(dt.String(), func(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + lb := array.NewBuilder(pool, dt) + defer lb.Release() + arr := lb.NewArray() + defer arr.Release() + if got, want := arr.Len(), 0; got != want { + t.Fatalf("got=%d, want=%d", got, want) + } + }) } } func TestListArrayBulkAppend(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int32{0, 3, 3, 7} - ) - - lb := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - lb.AppendValues(offsets, isValid) - for _, v := range vs { - vb.Append(v) - } - - arr := lb.NewArray().(*array.List) - defer arr.Release() - - if got, want := arr.DataType().ID(), arrow.LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } - - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } - - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - } - - if got, want 
:= arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } - - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } -} - -func TestListArraySlice(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int32{0, 3, 3, 7} - ) - - lb := array.NewListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - lb.AppendValues(offsets, isValid) - for _, v := range vs { - vb.Append(v) - } - - arr := lb.NewArray().(*array.List) - defer arr.Release() - - if got, want := arr.DataType().ID(), arrow.LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } - - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } - - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - } - - if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } - - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } - - if got, want := arr.String(), `[[0 1 2] (null) [3 4 5 6]]`; got != want { - t.Fatalf("got=%q, want=%q", got, want) - } - - sub := array.NewSlice(arr, 1, 3).(*array.List) - defer sub.Release() - - if got, want := sub.String(), `[(null) [3 4 5 6]]`; got != want { - t.Fatalf("got=%q, want=%q", got, want) - } -} - -func TestLargeListArray(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 0, 4} - isValid = []bool{true, false, true, true} - offsets = []int64{0, 3, 3, 3, 7} - ) - - lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - - for i := 0; i < 10; i++ { - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - pos := 0 - for i, length := range lengths { - lb.Append(isValid[i]) - for j := 0; j < length; j++ { - vb.Append(vs[pos]) - pos++ + tests := []struct { + typeID arrow.Type + offsets interface{} + dt arrow.DataType + }{ + {arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOf(arrow.PrimitiveTypes.Int32)}, + {arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)}, + } + + for _, tt := range tests { + t.Run(tt.typeID.String(), func(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 0, 4} + isValid = []bool{true, false, true, true} + ) + + lb := array.NewBuilder(pool, tt.dt).(array.ListLikeBuilder) + defer lb.Release() + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + switch tt.typeID { + case arrow.LIST: + lb.(*array.ListBuilder).AppendValues(tt.offsets.([]int32), isValid) + case arrow.LARGE_LIST: + lb.(*array.LargeListBuilder).AppendValues(tt.offsets.([]int64), isValid) + } + for _, v := range vs { + vb.Append(v) } - } - - arr := 
lb.NewArray().(*array.LargeList) - defer arr.Release() - - arr.Retain() - arr.Release() - - if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } + arr := lb.NewArray().(array.ListLike) + defer arr.Release() - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + if got, want := arr.DataType().ID(), tt.typeID; got != want { + t.Fatalf("got=%v, want=%v", got, want) } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) } - } - if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } + for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), !isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } - } -} + var got interface{} + switch tt.typeID { + case arrow.LIST: + arr := arr.(*array.List) + got = arr.Offsets() + case arrow.LARGE_LIST: + arr := arr.(*array.LargeList) + got = arr.Offsets() + } -func TestLargeListArrayEmpty(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) + if !reflect.DeepEqual(got, tt.offsets) { + t.Fatalf("got=%v, want=%v", got, tt.offsets) + } - lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - arr := lb.NewArray().(*array.LargeList) - defer arr.Release() - if got, want := arr.Len(), 0; got != want { - t.Fatalf("got=%d, want=%d", got, want) + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } + }) } } -func TestLargeListArrayBulkAppend(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int64{0, 3, 3, 7} - ) - - lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - lb.AppendValues(offsets, isValid) - for _, v := range vs { - vb.Append(v) - } - - arr := lb.NewArray().(*array.LargeList) - defer arr.Release() - - if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } - - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } - - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - } - - if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } +func TestListArraySlice(t *testing.T) { + tests := []struct { + typeID arrow.Type + offsets 
interface{} + dt arrow.DataType + }{ + {arrow.LIST, []int32{0, 3, 3, 3, 7}, arrow.ListOf(arrow.PrimitiveTypes.Int32)}, + {arrow.LARGE_LIST, []int64{0, 3, 3, 3, 7}, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)}, + } + + for _, tt := range tests { + t.Run(tt.typeID.String(), func(t *testing.T) { + pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer pool.AssertSize(t, 0) + + var ( + vs = []int32{0, 1, 2, 3, 4, 5, 6} + lengths = []int{3, 0, 0, 4} + isValid = []bool{true, false, true, true} + ) + + lb := array.NewBuilder(pool, tt.dt).(array.ListLikeBuilder) + defer lb.Release() + vb := lb.ValueBuilder().(*array.Int32Builder) + vb.Reserve(len(vs)) + + switch tt.typeID { + case arrow.LIST: + lb.(*array.ListBuilder).AppendValues(tt.offsets.([]int32), isValid) + case arrow.LARGE_LIST: + lb.(*array.LargeListBuilder).AppendValues(tt.offsets.([]int64), isValid) + } + for _, v := range vs { + vb.Append(v) + } - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } -} + arr := lb.NewArray().(array.ListLike) + defer arr.Release() -func TestLargeListArraySlice(t *testing.T) { - pool := memory.NewCheckedAllocator(memory.NewGoAllocator()) - defer pool.AssertSize(t, 0) - - var ( - vs = []int32{0, 1, 2, 3, 4, 5, 6} - lengths = []int{3, 0, 4} - isValid = []bool{true, false, true} - offsets = []int64{0, 3, 3, 7} - ) - - lb := array.NewLargeListBuilder(pool, arrow.PrimitiveTypes.Int32) - defer lb.Release() - vb := lb.ValueBuilder().(*array.Int32Builder) - vb.Reserve(len(vs)) - - lb.AppendValues(offsets, isValid) - for _, v := range vs { - vb.Append(v) - } - - arr := lb.NewArray().(*array.LargeList) - defer arr.Release() + if got, want := arr.DataType().ID(), tt.typeID; got != want { + t.Fatalf("got=%v, want=%v", got, want) + } - if got, want := arr.DataType().ID(), arrow.LARGE_LIST; got != want { - t.Fatalf("got=%v, want=%v", got, want) - } + if got, want := arr.Len(), len(isValid); got != want { + t.Fatalf("got=%d, want=%d", got, want) + } - if got, want := arr.Len(), len(isValid); got != want { - t.Fatalf("got=%d, want=%d", got, want) - } + for i := range lengths { + if got, want := arr.IsValid(i), isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + if got, want := arr.IsNull(i), !isValid[i]; got != want { + t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) + } + } - for i := range lengths { - if got, want := arr.IsValid(i), isValid[i]; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - if got, want := arr.IsNull(i), lengths[i] == 0; got != want { - t.Fatalf("got[%d]=%v, want[%d]=%v", i, got, i, want) - } - } + var got interface{} + switch tt.typeID { + case arrow.LIST: + arr := arr.(*array.List) + got = arr.Offsets() + case arrow.LARGE_LIST: + arr := arr.(*array.LargeList) + got = arr.Offsets() + } - if got, want := arr.Offsets(), offsets; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } + if !reflect.DeepEqual(got, tt.offsets) { + t.Fatalf("got=%v, want=%v", got, tt.offsets) + } - varr := arr.ListValues().(*array.Int32) - if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { - t.Fatalf("got=%v, want=%v", got, want) - } + varr := arr.ListValues().(*array.Int32) + if got, want := varr.Int32Values(), vs; !reflect.DeepEqual(got, want) { + t.Fatalf("got=%v, want=%v", got, want) + } - if got, want := arr.String(), `[[0 1 2] (null) [3 4 5 6]]`; got != want { - t.Fatalf("got=%q, want=%q", 
got, want) - } + if got, want := arr.String(), `[[0 1 2] (null) [] [3 4 5 6]]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } - sub := array.NewSlice(arr, 1, 3).(*array.LargeList) - defer sub.Release() + sub := array.NewSlice(arr, 1, 4).(array.ListLike) + defer sub.Release() - if got, want := sub.String(), `[(null) [3 4 5 6]]`; got != want { - t.Fatalf("got=%q, want=%q", got, want) + if got, want := sub.String(), `[(null) [] [3 4 5 6]]`; got != want { + t.Fatalf("got=%q, want=%q", got, want) + } + }) } } diff --git a/go/arrow/array/string.go b/go/arrow/array/string.go index 9681a19a8705c..7f8212b5cb745 100644 --- a/go/arrow/array/string.go +++ b/go/arrow/array/string.go @@ -62,6 +62,10 @@ func (a *String) ValueOffset(i int) int { return int(a.offsets[i+a.array.data.offset]) } +func (a *String) ValueOffset64(i int) int64 { + return int64(a.ValueOffset(i)) +} + func (a *String) ValueOffsets() []int32 { beg := a.array.data.offset end := beg + a.array.data.length + 1 @@ -193,6 +197,10 @@ func (a *LargeString) ValueOffset(i int) int64 { return a.offsets[i+a.array.data.offset] } +func (a *LargeString) ValueOffset64(i int) int64 { + return a.ValueOffset(i) +} + func (a *LargeString) ValueOffsets() []int64 { beg := a.array.data.offset end := beg + a.array.data.length + 1 diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index 44046c78caea3..9e1f0b2076dbc 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -363,13 +363,13 @@ func (imp *cimporter) doImport(src *CArrowArray) error { case arrow.FixedWidthDataType: return imp.importFixedSizePrimitive() case *arrow.StringType: - return imp.importStringLike() + return imp.importStringLike(int64(arrow.Int32SizeBytes)) case *arrow.BinaryType: - return imp.importStringLike() + return imp.importStringLike(int64(arrow.Int32SizeBytes)) case *arrow.LargeStringType: - return imp.importLargeStringLike() + return imp.importStringLike(int64(arrow.Int64SizeBytes)) case *arrow.LargeBinaryType: - return imp.importLargeStringLike() + return imp.importStringLike(int64(arrow.Int64SizeBytes)) case *arrow.ListType: return imp.importListLike() case *arrow.LargeListType: @@ -414,7 +414,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return nil } -func (imp *cimporter) importLargeStringLike() error { +func (imp *cimporter) importStringLike(offsetByteWidth int64) error { if err := imp.checkNoChildren(); err != nil { return err } @@ -428,30 +428,17 @@ func (imp *cimporter) importLargeStringLike() error { return err } - offsets := imp.importOffsetsBuffer(1, int64(arrow.Int64SizeBytes)) - typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes()) - values := imp.importVariableValuesBuffer(2, 1, int(typedOffsets[imp.arr.offset+imp.arr.length])) - imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) - return nil -} - -func (imp *cimporter) importStringLike() error { - if err := imp.checkNoChildren(); err != nil { - return err - } - - if err := imp.checkNumBuffers(3); err != nil { - return err - } - - nulls, err := imp.importNullBitmap(0) - if err != nil { - return err + offsets := imp.importOffsetsBuffer(1, offsetByteWidth) + var nvals int64 + switch offsetByteWidth { + case 4: + typedOffsets := arrow.Int32Traits.CastFromBytes(offsets.Bytes()) + nvals = int64(typedOffsets[imp.arr.offset+imp.arr.length]) + case 8: + typedOffsets := arrow.Int64Traits.CastFromBytes(offsets.Bytes()) + nvals = 
typedOffsets[imp.arr.offset+imp.arr.length] } - - offsets := imp.importOffsetsBuffer(1, int64(arrow.Int32SizeBytes)) - typedOffsets := arrow.Int32Traits.CastFromBytes(offsets.Bytes()) - values := imp.importVariableValuesBuffer(2, 1, int(typedOffsets[imp.arr.offset+imp.arr.length])) + values := imp.importVariableValuesBuffer(2, 1, nvals) imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) return nil } @@ -556,8 +543,8 @@ func (imp *cimporter) importOffsetsBuffer(bufferID int, offsetsize int64) *memor return imp.importBuffer(bufferID, bufsize) } -func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth int, bytelen int) *memory.Buffer { - bufsize := byteWidth * bytelen +func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals int64) *memory.Buffer { + bufsize := byteWidth * nvals return imp.importBuffer(bufferID, int64(bufsize)) } diff --git a/go/arrow/ipc/writer.go b/go/arrow/ipc/writer.go index 9d85836cb22d0..71ded3412e729 100644 --- a/go/arrow/ipc/writer.go +++ b/go/arrow/ipc/writer.go @@ -29,6 +29,7 @@ import ( "github.com/apache/arrow/go/v10/arrow" "github.com/apache/arrow/go/v10/arrow/array" "github.com/apache/arrow/go/v10/arrow/bitutil" + "github.com/apache/arrow/go/v10/arrow/internal/debug" "github.com/apache/arrow/go/v10/arrow/internal/dictutils" "github.com/apache/arrow/go/v10/arrow/internal/flatbuf" "github.com/apache/arrow/go/v10/arrow/memory" @@ -532,8 +533,8 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { } p.body = append(p.body, values) - case *arrow.BinaryType: - arr := arr.(*array.Binary) + case *arrow.BinaryType, *arrow.LargeBinaryType, *arrow.StringType, *arrow.LargeStringType: + arr := arr.(array.BinaryLike) voffsets, err := w.getZeroBasedValueOffsets(arr) if err != nil { return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) @@ -550,97 +551,7 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { case needTruncate(int64(data.Offset()), values, totalDataBytes): // slice data buffer to include the range we need now. var ( - beg = int64(arr.ValueOffset(0)) - len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) - ) - values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) - default: - if values != nil { - values.Retain() - } - } - p.body = append(p.body, voffsets) - p.body = append(p.body, values) - - case *arrow.LargeBinaryType: - arr := arr.(*array.LargeBinary) - voffsets, err := w.getZeroBasedValueOffsets(arr) - if err != nil { - return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) - } - data := arr.Data() - values := data.Buffers()[2] - - var totalDataBytes int64 - if voffsets != nil { - totalDataBytes = int64(len(arr.ValueBytes())) - } - - switch { - case needTruncate(int64(data.Offset()), values, totalDataBytes): - // slice data buffer to include the range we need now. 
- var ( - beg = int64(arr.ValueOffset(0)) - len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) - ) - values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) - default: - if values != nil { - values.Retain() - } - } - p.body = append(p.body, voffsets) - p.body = append(p.body, values) - - case *arrow.StringType: - arr := arr.(*array.String) - voffsets, err := w.getZeroBasedValueOffsets(arr) - if err != nil { - return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) - } - data := arr.Data() - values := data.Buffers()[2] - - var totalDataBytes int64 - if voffsets != nil { - totalDataBytes = int64(len(arr.ValueBytes())) - } - - switch { - case needTruncate(int64(data.Offset()), values, totalDataBytes): - // slice data buffer to include the range we need now. - var ( - beg = int64(arr.ValueOffset(0)) - len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) - ) - values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) - default: - if values != nil { - values.Retain() - } - } - p.body = append(p.body, voffsets) - p.body = append(p.body, values) - - case *arrow.LargeStringType: - arr := arr.(*array.LargeString) - voffsets, err := w.getZeroBasedValueOffsets(arr) - if err != nil { - return fmt.Errorf("could not retrieve zero-based value offsets from %T: %w", arr, err) - } - data := arr.Data() - values := data.Buffers()[2] - - var totalDataBytes int64 - if voffsets != nil { - totalDataBytes = int64(len(arr.ValueBytes())) - } - - switch { - case needTruncate(int64(data.Offset()), values, totalDataBytes): - // slice data buffer to include the range we need now. - var ( - beg = int64(arr.ValueOffset(0)) + beg = arr.ValueOffset64(0) len = minI64(paddedLength(totalDataBytes, kArrowAlignment), int64(totalDataBytes)) ) values = memory.NewBufferBytes(data.Buffers()[2].Bytes()[beg : beg+len]) @@ -700,8 +611,8 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { return fmt.Errorf("could not visit list element for array %T: %w", arr, err) } w.depth++ - case *arrow.ListType: - arr := arr.(*array.List) + case *arrow.ListType, *arrow.LargeListType: + arr := arr.(array.ListLike) voffsets, err := w.getZeroBasedValueOffsets(arr) if err != nil { return fmt.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err) @@ -721,49 +632,13 @@ func (w *recordEncoder) visit(p *Payload, arr arrow.Array) error { } }() - if voffsets != nil { - values_offset = int64(arr.Offsets()[0]) - values_length = int64(arr.Offsets()[arr.Len()]) - values_offset + if arr.Len() > 0 && voffsets != nil { + values_offset, _ = arr.ValueOffsets(0) + _, values_length = arr.ValueOffsets(arr.Len() - 1) + values_length -= values_offset } - if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) { - // must also slice the values - values = array.NewSlice(values, values_offset, values_length) - mustRelease = true - } - err = w.visit(p, values) - - if err != nil { - return fmt.Errorf("could not visit list element for array %T: %w", arr, err) - } - w.depth++ - case *arrow.LargeListType: - arr := arr.(*array.LargeList) - voffsets, err := w.getZeroBasedValueOffsets(arr) - if err != nil { - return fmt.Errorf("could not retrieve zero-based value offsets for array %T: %w", arr, err) - } - p.body = append(p.body, voffsets) - - w.depth-- - var ( - values = arr.ListValues() - mustRelease = false - values_offset int64 - values_length int64 - ) - defer func() { - if mustRelease { - 
values.Release() - } - }() - - if voffsets != nil { - values_offset = arr.Offsets()[0] - values_length = arr.Offsets()[arr.Len()] - values_offset - } - - if len(arr.Offsets()) != 0 || values_length < int64(values.Len()) { + if arr.Len() != 0 || values_length < int64(values.Len()) { // must also slice the values values = array.NewSlice(values, values_offset, values_length) mustRelease = true @@ -828,6 +703,7 @@ func (w *recordEncoder) getZeroBasedValueOffsets(arr arrow.Array) (*memory.Buffe } default: + debug.Assert(arr.DataType().Layout().Buffers[1].ByteWidth == 4, "invalid offset bytewidth") dest := arrow.Int32Traits.CastFromBytes(shiftedOffsets.Bytes()) offsets := arrow.Int32Traits.CastFromBytes(voffsets.Bytes())[data.Offset() : data.Offset()+data.Len()+1] From 9c925869fa7a6c72f661a4a20e623a4452e7fd74 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 13:37:37 -0400 Subject: [PATCH 10/13] add docstring comment to OffsetTraits interface --- go/arrow/datatype_binary.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/arrow/datatype_binary.go b/go/arrow/datatype_binary.go index e77e3df44be42..fa6513693f8bd 100644 --- a/go/arrow/datatype_binary.go +++ b/go/arrow/datatype_binary.go @@ -16,7 +16,14 @@ package arrow +// OffsetTraits is a convenient interface over the various type traits +// constants such as arrow.Int32Traits allowing types with offsets, like +// BinaryType, StringType, LargeBinaryType and LargeStringType to have +// a method to return information about their offset type and how many bytes +// would be required to allocate an offset buffer for them. type OffsetTraits interface { + // BytesRequired returns the number of bytes required to be allocated + // in order to hold the passed in number of elements of this type. 
BytesRequired(int) int } From 807cdf13cd449fd26c9dd76fce76f02fe1b7133b Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 13:41:58 -0400 Subject: [PATCH 11/13] add -asan to go tests --- ci/scripts/go_test.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh index 9923cff6330f5..7b84204cc2ae8 100755 --- a/ci/scripts/go_test.sh +++ b/ci/scripts/go_test.sh @@ -21,16 +21,16 @@ set -ex source_dir=${1}/go -testargs="-race" +testargs="-race -asan" case "$(uname)" in MINGW*) - # -race doesn't work on windows currently + # -asan and -race don't work on windows currently testargs="" ;; esac if [[ "$(go env GOHOSTARCH)" = "s390x" ]]; then - testargs="" # -race not supported on s390x + testargs="" # -race and -asan not supported on s390x fi # Go static check (skipped in MinGW) From b99f9e1a0ea85fbb8e95c33d7a6d7a14c08f53e4 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Tue, 2 Aug 2022 13:48:51 -0400 Subject: [PATCH 12/13] remove -asan for now --- ci/scripts/go_test.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ci/scripts/go_test.sh b/ci/scripts/go_test.sh index 7b84204cc2ae8..760aa149aa91b 100755 --- a/ci/scripts/go_test.sh +++ b/ci/scripts/go_test.sh @@ -21,7 +21,8 @@ set -ex source_dir=${1}/go -testargs="-race -asan" +# when we upgrade to at least go1.18, we can add the new -asan option here +testargs="-race" case "$(uname)" in MINGW*) # -asan and -race don't work on windows currently From 941aafb72a80ad8db945cf27324c08cf06320e66 Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Wed, 3 Aug 2022 15:13:23 -0400 Subject: [PATCH 13/13] style consistency --- go/arrow/array/list.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/arrow/array/list.go b/go/arrow/array/list.go index 1b07a3128df52..2c9d7e4f3d639 100644 --- a/go/arrow/array/list.go +++ b/go/arrow/array/list.go @@ -167,7 +167,7 @@ type LargeList struct { // NewLargeListData returns a new LargeList array value, from data. func NewLargeListData(data arrow.ArrayData) *LargeList { - a := &LargeList{} + a := new(LargeList) a.refCount = 1 a.setData(data.(*Data)) return a
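
A minimal usage sketch of the large-offset list support added by this series (an editorial illustration, not part of any patch above; it assumes the full series is applied to the v10 module path used in these diffs). With the new LARGE_LIST case in array.NewBuilder, building a LargeList mirrors the existing List flow, but the offsets come back as int64:

package main

import (
	"fmt"

	"github.com/apache/arrow/go/v10/arrow"
	"github.com/apache/arrow/go/v10/arrow/array"
	"github.com/apache/arrow/go/v10/arrow/memory"
)

func main() {
	mem := memory.NewGoAllocator()

	// NewBuilder now recognizes LARGE_LIST and returns a *LargeListBuilder.
	bldr := array.NewBuilder(mem, arrow.LargeListOf(arrow.PrimitiveTypes.Int32)).(*array.LargeListBuilder)
	defer bldr.Release()

	vb := bldr.ValueBuilder().(*array.Int32Builder)

	bldr.Append(true) // start list element [0 1 2]
	vb.AppendValues([]int32{0, 1, 2}, nil)
	bldr.AppendNull() // null list element
	bldr.Append(true) // start list element [3 4 5 6]
	vb.AppendValues([]int32{3, 4, 5, 6}, nil)

	arr := bldr.NewLargeListArray()
	defer arr.Release()

	fmt.Println(arr)           // [[0 1 2] (null) [3 4 5 6]]
	fmt.Println(arr.Offsets()) // 64-bit offsets: [0 3 3 7]
}

The ListLike and ListLikeBuilder interfaces introduced in the refactoring commit are what let the IPC writer and the table-driven list tests above treat the 32-bit and 64-bit offset variants through a single code path.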