diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index dc8825a7edb67..1d454c57f722e 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -40,7 +40,6 @@ import ( "errors" "fmt" "io" - "reflect" "runtime" "strconv" "strings" @@ -152,13 +151,8 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { var childFields []arrow.Field if schema.n_children > 0 { // call ourselves recursively if there are children. - var schemaChildren []*CArrowSchema // set up a slice to reference safely - s := (*reflect.SliceHeader)(unsafe.Pointer(&schemaChildren)) - s.Data = uintptr(unsafe.Pointer(schema.children)) - s.Len = int(schema.n_children) - s.Cap = int(schema.n_children) - + schemaChildren := unsafe.Slice(schema.children, schema.n_children) childFields = make([]arrow.Field, schema.n_children) for i, c := range schemaChildren { childFields[i], err = importSchema((*CArrowSchema)(c)) @@ -341,21 +335,18 @@ type cimporter struct { parent *cimporter children []cimporter cbuffers []*C.void + + alloc *importAllocator } func (imp *cimporter) importChild(parent *cimporter, src *CArrowArray) error { - imp.parent = parent - return imp.doImport(src) + imp.parent, imp.arr, imp.alloc = parent, src, parent.alloc + return imp.doImport() } // import any child arrays for lists, structs, and so on. func (imp *cimporter) doImportChildren() error { - var children []*CArrowArray - // create a proper slice for our children - s := (*reflect.SliceHeader)(unsafe.Pointer(&children)) - s.Data = uintptr(unsafe.Pointer(imp.arr.children)) - s.Len = int(imp.arr.n_children) - s.Cap = int(imp.arr.n_children) + children := unsafe.Slice(imp.arr.children, imp.arr.n_children) if len(children) > 0 { imp.children = make([]cimporter, len(children)) @@ -418,26 +409,44 @@ func (imp *cimporter) doImportChildren() error { func (imp *cimporter) initarr() { imp.arr = C.get_arr() + if imp.alloc == nil { + imp.alloc = &importAllocator{arr: imp.arr} + } +} + +func (imp *cimporter) doImportArr(src *CArrowArray) error { + imp.arr = C.get_arr() + C.ArrowArrayMove(src, imp.arr) + if imp.alloc == nil { + imp.alloc = &importAllocator{arr: imp.arr} + } + + // we tie the releasing of the array to when the buffers are + // cleaned up, so if there are no buffers that we've imported + // such as for a null array or a nested array with no bitmap + // and only null columns, then we can release the CArrowArray + // struct immediately after import, since we have no imported + // memory that we have to track the lifetime of. + defer func() { + if imp.alloc.bufCount == 0 { + C.ArrowArrayRelease(imp.arr) + } + }() + + return imp.doImport() } // import is called recursively as needed for importing an array and its children // in order to generate array.Data objects -func (imp *cimporter) doImport(src *CArrowArray) error { - imp.initarr() +func (imp *cimporter) doImport() error { // move the array from the src object passed in to the one referenced by // this importer. That way we can set up a finalizer on the created // arrow.ArrayData object so we clean up our Array's memory when garbage collected. - C.ArrowArrayMove(src, imp.arr) defer func(arr *CArrowArray) { - if imp.data != nil { - runtime.SetFinalizer(imp.data, func(arrow.ArrayData) { - defer C.free(unsafe.Pointer(arr)) - C.ArrowArrayRelease(arr) - if C.ArrowArrayIsReleased(arr) != 1 { - panic("did not release C mem") - } - }) - } else { + // this should only occur in the case of an error happening + // during import, at which point we need to clean up the + // ArrowArray struct we allocated. + if imp.data == nil { C.free(unsafe.Pointer(arr)) } }(imp.arr) @@ -447,6 +456,12 @@ func (imp *cimporter) doImport(src *CArrowArray) error { return err } + for _, c := range imp.children { + if c.data != nil { + defer c.data.Release() + } + } + if imp.arr.n_buffers > 0 { // get a view of the buffers, zero-copy. we're just looking at the pointers imp.cbuffers = unsafe.Slice((**C.void)(unsafe.Pointer(imp.arr.buffers)), imp.arr.n_buffers) @@ -458,6 +473,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if err := imp.checkNoChildren(); err != nil { return err } + imp.data = array.NewData(dt, int(imp.arr.length), nil, nil, int(imp.arr.null_count), int(imp.arr.offset)) case arrow.FixedWidthDataType: return imp.importFixedSizePrimitive() @@ -488,6 +504,9 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if err != nil { return err } + if nulls != nil { + defer nulls.Release() + } imp.data = array.NewData(dt, int(imp.arr.length), []*memory.Buffer{nulls}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) case *arrow.StructType: @@ -499,6 +518,9 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if err != nil { return err } + if nulls != nil { + defer nulls.Release() + } children := make([]arrow.ArrayData, len(imp.children)) for i := range imp.children { @@ -529,9 +551,11 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if bufs[1], err = imp.importFixedSizeBuffer(1, 1); err != nil { return err } + defer bufs[1].Release() if bufs[2], err = imp.importFixedSizeBuffer(2, int64(arrow.Int32SizeBytes)); err != nil { return err } + defer bufs[2].Release() } else { if err := imp.checkNumBuffers(2); err != nil { return err @@ -540,9 +564,11 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if bufs[1], err = imp.importFixedSizeBuffer(0, 1); err != nil { return err } + defer bufs[1].Release() if bufs[2], err = imp.importFixedSizeBuffer(1, int64(arrow.Int32SizeBytes)); err != nil { return err } + defer bufs[2].Release() } children := make([]arrow.ArrayData, len(imp.children)) @@ -562,6 +588,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if buf, err = imp.importFixedSizeBuffer(1, 1); err != nil { return err } + defer buf.Release() } else { if err := imp.checkNumBuffers(1); err != nil { return err @@ -570,6 +597,7 @@ func (imp *cimporter) doImport(src *CArrowArray) error { if buf, err = imp.importFixedSizeBuffer(0, 1); err != nil { return err } + defer buf.Release() } children := make([]arrow.ArrayData, len(imp.children)) @@ -596,14 +624,17 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) { var ( nulls, offsets, values *memory.Buffer ) - if nulls, err = imp.importNullBitmap(0); err != nil { return } + if nulls != nil { + defer nulls.Release() + } if offsets, err = imp.importOffsetsBuffer(1, offsetByteWidth); err != nil { return } + defer offsets.Release() var nvals int64 switch offsetByteWidth { @@ -617,6 +648,8 @@ func (imp *cimporter) importStringLike(offsetByteWidth int64) (err error) { if values, err = imp.importVariableValuesBuffer(2, 1, nvals); err != nil { return } + defer values.Release() + imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets, values}, nil, int(imp.arr.null_count), int(imp.arr.offset)) return } @@ -634,11 +667,17 @@ func (imp *cimporter) importListLike() (err error) { if nulls, err = imp.importNullBitmap(0); err != nil { return } + if nulls != nil { + defer nulls.Release() + } offsetSize := imp.dt.Layout().Buffers[1].ByteWidth if offsets, err = imp.importOffsetsBuffer(1, int64(offsetSize)); err != nil { return } + if offsets != nil { + defer offsets.Release() + } imp.data = array.NewData(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, offsets}, []arrow.ArrayData{imp.children[0].data}, int(imp.arr.null_count), int(imp.arr.offset)) return @@ -677,7 +716,7 @@ func (imp *cimporter) importFixedSizePrimitive() error { var dict *array.Data if dt, ok := imp.dt.(*arrow.DictionaryType); ok { dictImp := &cimporter{dt: dt.ValueType} - if err := dictImp.doImport(imp.arr.dictionary); err != nil { + if err := dictImp.importChild(imp, imp.arr.dictionary); err != nil { return err } defer dictImp.data.Release() @@ -685,6 +724,13 @@ func (imp *cimporter) importFixedSizePrimitive() error { dict = dictImp.data.(*array.Data) } + if nulls != nil { + defer nulls.Release() + } + if values != nil { + defer values.Release() + } + imp.data = array.NewDataWithDictionary(imp.dt, int(imp.arr.length), []*memory.Buffer{nulls, values}, int(imp.arr.null_count), int(imp.arr.offset), dict) return nil } @@ -721,9 +767,9 @@ func (imp *cimporter) importBuffer(bufferID int, sz int64) (*memory.Buffer, erro } return memory.NewBufferBytes([]byte{}), nil } - const maxLen = 0x7fffffff - data := (*[maxLen]byte)(unsafe.Pointer(imp.cbuffers[bufferID]))[:sz:sz] - return memory.NewBufferBytes(data), nil + data := unsafe.Slice((*byte)(unsafe.Pointer(imp.cbuffers[bufferID])), sz) + imp.alloc.addBuffer() + return memory.NewBufferWithAllocator(data, imp.alloc), nil } func (imp *cimporter) importBitsBuffer(bufferID int) (*memory.Buffer, error) { @@ -760,7 +806,7 @@ func (imp *cimporter) importVariableValuesBuffer(bufferID int, byteWidth, nvals func importCArrayAsType(arr *CArrowArray, dt arrow.DataType) (imp *cimporter, err error) { imp = &cimporter{dt: dt} - err = imp.doImport(arr) + err = imp.doImportArr(arr) return } diff --git a/go/arrow/cdata/cdata_fulltest.c b/go/arrow/cdata/cdata_fulltest.c index 7aed597942b51..4291cfff865b5 100644 --- a/go/arrow/cdata/cdata_fulltest.c +++ b/go/arrow/cdata/cdata_fulltest.c @@ -27,8 +27,20 @@ #include "arrow/c/helpers.h" #include "utils.h" +int is_little_endian() +{ + unsigned int x = 1; + char *c = (char*) &x; + return (int)*c; +} + static const int64_t kDefaultFlags = ARROW_FLAG_NULLABLE; +extern void releaseTestArr(struct ArrowArray* array); +void goReleaseTestArray(struct ArrowArray* array) { + releaseTestArr(array); +} + static void release_int32_type(struct ArrowSchema* schema) { // mark released schema->release = NULL; diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index af05649b1c541..f09fa3ff2f6fa 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -40,6 +40,7 @@ import ( "github.com/apache/arrow/go/v14/arrow/decimal128" "github.com/apache/arrow/go/v14/arrow/internal/arrdata" "github.com/apache/arrow/go/v14/arrow/memory" + "github.com/apache/arrow/go/v14/arrow/memory/mallocator" "github.com/stretchr/testify/assert" ) @@ -488,8 +489,11 @@ func TestPrimitiveArrs(t *testing.T) { arr := tt.fn() defer arr.Release() - carr := createCArr(arr) - defer freeTestArr(carr) + mem := mallocator.NewMallocator() + defer mem.AssertSize(t, 0) + + carr := createCArr(arr, mem) + defer freeTestMallocatorArr(carr, mem) imported, err := ImportCArrayWithType(carr, arr.DataType()) assert.NoError(t, err) @@ -508,8 +512,11 @@ func TestPrimitiveSliced(t *testing.T) { sl := array.NewSlice(arr, 1, 2) defer sl.Release() - carr := createCArr(sl) - defer freeTestArr(carr) + mem := mallocator.NewMallocator() + defer mem.AssertSize(t, 0) + + carr := createCArr(sl, mem) + defer freeTestMallocatorArr(carr, mem) imported, err := ImportCArrayWithType(carr, arr.DataType()) assert.NoError(t, err) @@ -687,8 +694,11 @@ func TestNestedArrays(t *testing.T) { arr := tt.fn() defer arr.Release() - carr := createCArr(arr) - defer freeTestArr(carr) + mem := mallocator.NewMallocator() + defer mem.AssertSize(t, 0) + + carr := createCArr(arr, mem) + defer freeTestMallocatorArr(carr, mem) imported, err := ImportCArrayWithType(carr, arr.DataType()) assert.NoError(t, err) @@ -701,11 +711,14 @@ func TestNestedArrays(t *testing.T) { } func TestRecordBatch(t *testing.T) { + mem := mallocator.NewMallocator() + defer mem.AssertSize(t, 0) + arr := createTestStructArr() defer arr.Release() - carr := createCArr(arr) - defer freeTestArr(carr) + carr := createCArr(arr, mem) + defer freeTestMallocatorArr(carr, mem) sc := testStruct([]string{"+s", "c", "u"}, []string{"", "a", "b"}, []int64{0, flagIsNullable, flagIsNullable}) defer freeMallocedSchemas(sc) diff --git a/go/arrow/cdata/cdata_test_framework.go b/go/arrow/cdata/cdata_test_framework.go index 7dd23926a607a..2df52dcb3cd40 100644 --- a/go/arrow/cdata/cdata_test_framework.go +++ b/go/arrow/cdata/cdata_test_framework.go @@ -26,25 +26,20 @@ package cdata // #include "arrow/c/helpers.h" // // void setup_array_stream_test(const int n_batches, struct ArrowArrayStream* out); -// struct ArrowArray* get_test_arr() { +// static struct ArrowArray* get_test_arr() { // struct ArrowArray* array = (struct ArrowArray*)malloc(sizeof(struct ArrowArray)); // memset(array, 0, sizeof(*array)); // return array; // } -// struct ArrowArrayStream* get_test_stream() { +// static struct ArrowArrayStream* get_test_stream() { // struct ArrowArrayStream* out = (struct ArrowArrayStream*)malloc(sizeof(struct ArrowArrayStream)); // memset(out, 0, sizeof(struct ArrowArrayStream)); // return out; // } // -// void release_test_arr(struct ArrowArray* arr) { -// for (int i = 0; i < arr->n_buffers; ++i) { -// free((void*)arr->buffers[i]); -// } -// ArrowArrayMarkReleased(arr); -// } +// void release_test_arr(struct ArrowArray* arr); // -// int32_t* get_data() { +// static int32_t* get_data() { // int32_t* data = malloc(sizeof(int32_t)*10); // for (int i = 0; i < 10; ++i) { data[i] = i+1; } // return data; @@ -62,16 +57,22 @@ package cdata // int test_exported_stream(struct ArrowArrayStream* stream); // void test_stream_schema_fallible(struct ArrowArrayStream* stream); // int confuse_go_gc(struct ArrowArrayStream* stream, unsigned int seed); +// extern void releaseTestArr(struct ArrowArray* array); +// extern void goReleaseTestArray(struct ArrowArray* array); import "C" + import ( "errors" "fmt" "io" "math/rand" + "runtime/cgo" "unsafe" "github.com/apache/arrow/go/v14/arrow" "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/internal" + "github.com/apache/arrow/go/v14/arrow/memory/mallocator" ) const ( @@ -227,50 +228,103 @@ func testSchema(fmts, names []string, flags []int64) **CArrowSchema { return C.test_schema((**C.char)(unsafe.Pointer(&cfmts[0])), (**C.char)(unsafe.Pointer(&cnames[0])), (*C.int64_t)(unsafe.Pointer(&cflags[0])), C.int(len(fmts))) } -func freeTestArr(carr *CArrowArray) { - C.free(unsafe.Pointer(carr)) +func freeAny[T any](alloc *mallocator.Mallocator, p *T, n int) { + raw := unsafe.Slice((*byte)(unsafe.Pointer(p)), int(unsafe.Sizeof(*p))*n) + alloc.Free(raw) +} + +func freeTestMallocatorArr(carr *CArrowArray, alloc *mallocator.Mallocator) { + freeAny(alloc, carr, 1) +} + +func getTestArr(alloc *mallocator.Mallocator) *CArrowArray { + raw := alloc.Allocate(C.sizeof_struct_ArrowArray) + return (*CArrowArray)(unsafe.Pointer(&raw[0])) +} + +type testReleaser struct { + alloc *mallocator.Mallocator + bufs [][]byte +} + +//export releaseTestArr +func releaseTestArr(arr *CArrowArray) { + if C.ArrowArrayIsReleased(arr) == 1 { + return + } + defer C.ArrowArrayMarkReleased(arr) + + h := getHandle(arr.private_data) + tr := h.Value().(*testReleaser) + + alloc := tr.alloc + for _, b := range tr.bufs { + alloc.Free(b) + } + + if arr.n_buffers > 0 { + freeAny(alloc, arr.buffers, int(arr.n_buffers)) + } + + if arr.dictionary != nil { + C.ArrowArrayRelease(arr.dictionary) + freeAny(alloc, arr.dictionary, 1) + } + + if arr.n_children > 0 { + children := unsafe.Slice(arr.children, arr.n_children) + for _, c := range children { + C.ArrowArrayRelease(c) + freeTestMallocatorArr(c, alloc) + } + + freeAny(alloc, arr.children, int(arr.n_children)) + } + + h.Delete() + C.free(unsafe.Pointer(arr.private_data)) +} + +func allocateBufferMallocatorPtrArr(alloc *mallocator.Mallocator, n int) []*C.void { + raw := alloc.Allocate(int(unsafe.Sizeof((*C.void)(nil))) * n) + return unsafe.Slice((**C.void)(unsafe.Pointer(&raw[0])), n) +} + +func allocateChildrenPtrArr(alloc *mallocator.Mallocator, n int) []*CArrowArray { + raw := alloc.Allocate(int(unsafe.Sizeof((*CArrowArray)(nil))) * n) + return unsafe.Slice((**CArrowArray)(unsafe.Pointer(&raw[0])), n) } -func createCArr(arr arrow.Array) *CArrowArray { +func createCArr(arr arrow.Array, alloc *mallocator.Mallocator) *CArrowArray { var ( - carr = C.get_test_arr() + carr = getTestArr(alloc) children = (**CArrowArray)(nil) nchildren = C.int64_t(0) ) switch arr := arr.(type) { - case *array.List: - clist := []*CArrowArray{createCArr(arr.ListValues())} - children = (**CArrowArray)(unsafe.Pointer(&clist[0])) - nchildren += 1 - case *array.LargeList: - clist := []*CArrowArray{createCArr(arr.ListValues())} - children = (**CArrowArray)(unsafe.Pointer(&clist[0])) - nchildren += 1 - case *array.FixedSizeList: - clist := []*CArrowArray{createCArr(arr.ListValues())} + case array.ListLike: + clist := allocateChildrenPtrArr(alloc, 1) + clist[0] = createCArr(arr.ListValues(), alloc) children = (**CArrowArray)(unsafe.Pointer(&clist[0])) nchildren += 1 case *array.Struct: - clist := []*CArrowArray{} + clist := allocateChildrenPtrArr(alloc, arr.NumField()) for i := 0; i < arr.NumField(); i++ { - clist = append(clist, createCArr(arr.Field(i))) + clist[i] = createCArr(arr.Field(i), alloc) nchildren += 1 } children = (**CArrowArray)(unsafe.Pointer(&clist[0])) - case *array.Map: - clist := []*CArrowArray{createCArr(arr.ListValues())} - children = (**CArrowArray)(unsafe.Pointer(&clist[0])) - nchildren += 1 case *array.RunEndEncoded: - clist := []*CArrowArray{createCArr(arr.RunEndsArr()), - createCArr(arr.Values())} + clist := allocateChildrenPtrArr(alloc, 2) + clist[0] = createCArr(arr.RunEndsArr(), alloc) + clist[1] = createCArr(arr.Values(), alloc) children = (**CArrowArray)(unsafe.Pointer(&clist[0])) nchildren += 2 case array.Union: - clist := []*CArrowArray{} + clist := allocateChildrenPtrArr(alloc, arr.NumFields()) for i := 0; i < arr.NumFields(); i++ { - clist = append(clist, createCArr(arr.Field(i))) + clist[i] = createCArr(arr.Field(i), alloc) nchildren += 1 } children = (**CArrowArray)(unsafe.Pointer(&clist[0])) @@ -282,21 +336,36 @@ func createCArr(arr arrow.Array) *CArrowArray { carr.length = C.int64_t(arr.Len()) carr.null_count = C.int64_t(arr.NullN()) carr.offset = C.int64_t(arr.Data().Offset()) - carr.release = (*[0]byte)(C.release_test_arr) + carr.release = (*[0]byte)(C.goReleaseTestArray) + tr := &testReleaser{alloc: alloc} + h := cgo.NewHandle(tr) + carr.private_data = createHandle(h) buffers := arr.Data().Buffers() - if len(buffers) == 0 { + bufOffset, nbuffers := 0, len(buffers) + hasValidityBitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID()) + if nbuffers > 0 && !hasValidityBitmap { + nbuffers-- + bufOffset++ + } + + if nbuffers == 0 { return carr } - cbufs := allocateBufferPtrArr(len(buffers)) - for i, b := range buffers { + tr.bufs = make([][]byte, 0, nbuffers) + cbufs := allocateBufferMallocatorPtrArr(alloc, nbuffers) + for i, b := range buffers[bufOffset:] { if b != nil { - cbufs[i] = (*C.void)(C.CBytes(b.Bytes())) + raw := alloc.Allocate(b.Len()) + copy(raw, b.Bytes()) + tr.bufs = append(tr.bufs, raw) + cbufs[i] = (*C.void)(unsafe.Pointer(&raw[0])) } else { cbufs[i] = nil } } + carr.n_buffers = C.int64_t(len(cbufs)) if len(cbufs) > 0 { carr.buffers = (*unsafe.Pointer)(unsafe.Pointer(&cbufs[0])) diff --git a/go/arrow/cdata/exports.go b/go/arrow/cdata/exports.go index 4dc2364da2913..9d2576818e31c 100644 --- a/go/arrow/cdata/exports.go +++ b/go/arrow/cdata/exports.go @@ -17,7 +17,6 @@ package cdata import ( - "reflect" "runtime/cgo" "unsafe" @@ -59,12 +58,7 @@ func releaseExportedSchema(schema *CArrowSchema) { C.free(unsafe.Pointer(schema.dictionary)) } - var children []*CArrowSchema - s := (*reflect.SliceHeader)(unsafe.Pointer(&children)) - s.Data = uintptr(unsafe.Pointer(schema.children)) - s.Len = int(schema.n_children) - s.Cap = int(schema.n_children) - + children := unsafe.Slice(schema.children, schema.n_children) for _, c := range children { C.ArrowSchemaRelease(c) } @@ -106,11 +100,7 @@ func releaseExportedArray(arr *CArrowArray) { } if arr.n_children > 0 { - var children []*CArrowArray - s := (*reflect.SliceHeader)(unsafe.Pointer(&children)) - s.Data = uintptr(unsafe.Pointer(arr.children)) - s.Len = int(arr.n_children) - s.Cap = int(arr.n_children) + children := unsafe.Slice(arr.children, arr.n_children) for _, c := range children { C.ArrowArrayRelease(c) diff --git a/go/arrow/cdata/import_allocator.go b/go/arrow/cdata/import_allocator.go new file mode 100644 index 0000000000000..eff8c7517caef --- /dev/null +++ b/go/arrow/cdata/import_allocator.go @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cdata + +import ( + "sync/atomic" + "unsafe" + + "github.com/apache/arrow/go/v14/arrow/internal/debug" +) + +// #include "arrow/c/helpers.h" +// #include +import "C" + +type importAllocator struct { + bufCount int64 + + arr *CArrowArray +} + +func (i *importAllocator) addBuffer() { + atomic.AddInt64(&i.bufCount, 1) +} + +func (*importAllocator) Allocate(int) []byte { + panic("cannot allocate from importAllocator") +} + +func (*importAllocator) Reallocate(int, []byte) []byte { + panic("cannot reallocate from importAllocator") +} + +func (i *importAllocator) Free([]byte) { + debug.Assert(atomic.LoadInt64(&i.bufCount) > 0, "too many releases") + + if atomic.AddInt64(&i.bufCount, -1) == 0 { + defer C.free(unsafe.Pointer(i.arr)) + C.ArrowArrayRelease(i.arr) + if C.ArrowArrayIsReleased(i.arr) != 1 { + panic("did not release C mem") + } + } +} diff --git a/go/arrow/cdata/interface.go b/go/arrow/cdata/interface.go index c7e81dde9f063..bf5c5270bae2f 100644 --- a/go/arrow/cdata/interface.go +++ b/go/arrow/cdata/interface.go @@ -116,6 +116,7 @@ func ImportCRecordBatchWithSchema(arr *CArrowArray, sc *arrow.Schema) (arrow.Rec if err != nil { return nil, err } + defer imp.data.Release() st := array.NewStructData(imp.data) defer st.Release() diff --git a/go/arrow/cdata/utils.h b/go/arrow/cdata/utils.h index f38281057b495..dda46b72b728b 100644 --- a/go/arrow/cdata/utils.h +++ b/go/arrow/cdata/utils.h @@ -17,17 +17,6 @@ // +build cgo // +build test -/* - Function check_for_endianness() returns 1, if architecture - is little endian, 0 in case of big endian. -*/ -inline int is_little_endian() -{ - unsigned int x = 1; - char *c = (char*) &x; - return (int)*c; -} - // metadata keys 1: {"key1", "key2"} // metadata values 1: {"", "bar"} static const char kEncodedMeta1LE[] = { diff --git a/go/arrow/memory/buffer.go b/go/arrow/memory/buffer.go index 2ddb3f829c833..5a2b4297031f0 100644 --- a/go/arrow/memory/buffer.go +++ b/go/arrow/memory/buffer.go @@ -33,6 +33,18 @@ type Buffer struct { parent *Buffer } +// NewBufferWithAllocator returns a buffer with the mutable flag set +// as false. The intention here is to allow wrapping a byte slice along +// with an allocator as a buffer to track the lifetime via refcounts +// in order to call Free when the refcount goes to zero. +// +// The primary example this is used for, is currently importing data +// through the c data interface and tracking the lifetime of the +// imported buffers. +func NewBufferWithAllocator(data []byte, mem Allocator) *Buffer { + return &Buffer{refCount: 1, buf: data, length: len(data), mem: mem} +} + // NewBufferBytes creates a fixed-size buffer from the specified data. func NewBufferBytes(data []byte) *Buffer { return &Buffer{refCount: 0, buf: data, length: len(data)}