From 365abee3db315f6d5196f4344db517b15379f6f5 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 19 Sep 2023 15:42:07 +0200 Subject: [PATCH] GH-37789: [Integration][Go] Go C Data Interface integration testing --- ci/scripts/integration_arrow.sh | 30 ++- dev/archery/archery/integration/tester_go.py | 135 +++++++++++- go/arrow/cdata/cdata.go | 2 +- go/arrow/cdata/cdata_exports.go | 39 ++-- go/arrow/cdata/cdata_test.go | 12 +- go/arrow/internal/arrjson/reader.go | 10 + .../internal/cdata_integration/entrypoints.go | 193 ++++++++++++++++++ 7 files changed, 392 insertions(+), 29 deletions(-) create mode 100644 go/arrow/internal/cdata_integration/entrypoints.go diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index a165f8027bf8f..3f8043af9ac0b 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -24,15 +24,35 @@ gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration pip install -e $arrow_dir/dev/archery[integration] +# --run-ipc \ +# --run-flight \ + +# XXX Can we better integrate this with the rest of the Go build tooling? +pushd ${arrow_dir}/go/arrow/internal/cdata_integration + +case "$(uname)" in + Linux) + go_lib="arrow_go_integration.so" + ;; + Darwin) + go_lib="arrow_go_integration.so" + ;; + MINGW*) + go_lib="arrow_go_integration.dll" + ;; +esac + +go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} . + +popd + # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1 time archery integration \ --run-c-data \ - --run-ipc \ - --run-flight \ --with-cpp=1 \ - --with-csharp=1 \ - --with-java=1 \ - --with-js=1 \ + --with-csharp=0 \ + --with-java=0 \ + --with-js=0 \ --with-go=1 \ --gold-dirs=$gold_dir/0.14.1 \ --gold-dirs=$gold_dir/0.17.1 \ diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py index fea33cd0ac6c1..88c108c63f978 100644 --- a/dev/archery/archery/integration/tester_go.py +++ b/dev/archery/archery/integration/tester_go.py @@ -16,11 +16,14 @@ # under the License. import contextlib +import functools import os import subprocess -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log +from ..utils.source import ARROW_ROOT_DEFAULT # FIXME(sbinet): revisit for Go modules @@ -39,12 +42,21 @@ "localhost", ] +_dll_suffix = ".dll" if os.name == "nt" else ".so" + +_DLL_PATH = os.path.join( + ARROW_ROOT_DEFAULT, + "go/arrow/internal/cdata_integration") +_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix) + class GoTester(Tester): PRODUCER = True CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True + C_DATA_EXPORTER = True + C_DATA_IMPORTER = True name = 'Go' @@ -119,3 +131,124 @@ def flight_request(self, port, json_path=None, scenario_name=None): if self.debug: log(' '.join(cmd)) run_cmd(cmd) + + def make_c_data_exporter(self): + return GoCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return GoCDataImporter(self.debug, self.args) + + +_go_c_data_entrypoints = """ + const char* ArrowGo_ExportSchemaFromJson(const char* json_path, + uintptr_t out); + const char* ArrowGo_ImportSchemaAndCompareToJson( + const char* json_path, uintptr_t c_schema); + + const char* ArrowGo_ExportBatchFromJson(const char* json_path, + int num_batch, + uintptr_t out); + const char* ArrowGo_ImportBatchAndCompareToJson( + const char* json_path, int num_batch, uintptr_t c_array); + + int64_t ArrowGo_BytesAllocated(); + void ArrowGo_RunGC(); + void ArrowGo_FreeError(const char*); + """ + + +@functools.lru_cache +def _load_ffi(ffi, lib_path=_INTEGRATION_DLL): + ffi.cdef(_go_c_data_entrypoints) + dll = ffi.dlopen(lib_path) + dll.ArrowGo_ExportSchemaFromJson + return dll + + +class _CDataBase: + + def __init__(self, debug, args): + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + self.dll = _load_ffi(self.ffi) + + def _pointer_to_int(self, c_ptr): + return self.ffi.cast('uintptr_t', c_ptr) + + def _check_go_error(self, go_error): + """ + Check a `const char*` error return from an integration entrypoint. + + A null means success, a non-empty string is an error message. + The string is dynamically allocated on the Go side. + """ + assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*") + if go_error != self.ffi.NULL: + try: + error = self.ffi.string(go_error).decode('utf8', + errors='replace') + raise RuntimeError( + f"Go C Data Integration call failed: {error}") + finally: + self.dll.ArrowGo_FreeError(go_error) + + def _run_gc(self): + self.dll.ArrowGo_RunGC() + + +class GoCDataExporter(CDataExporter, _CDataBase): + # Note: the Arrow Go C Data export functions expect their output + # ArrowStream or ArrowArray argument to be zero-initialized. + # This is currently ensured through the use of `ffi.new`. + + def export_schema_from_json(self, json_path, c_schema_ptr): + go_error = self.dll.ArrowGo_ExportSchemaFromJson( + str(json_path).encode(), self._pointer_to_int(c_schema_ptr)) + self._check_go_error(go_error) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + go_error = self.dll.ArrowGo_ExportBatchFromJson( + str(json_path).encode(), num_batch, + self._pointer_to_int(c_array_ptr)) + self._check_go_error(go_error) + + @property + def supports_releasing_memory(self): + return True + + def record_allocation_state(self): + self._run_gc() + return self.dll.ArrowGo_BytesAllocated() + + def compare_allocation_state(self, recorded, gc_until): + def pred(): + return self.record_allocation_state() == recorded + + return gc_until(pred) + + +class GoCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson( + str(json_path).encode(), self._pointer_to_int(c_schema_ptr)) + self._check_go_error(go_error) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson( + str(json_path).encode(), num_batch, + self._pointer_to_int(c_array_ptr)) + self._check_go_error(go_error) + + @property + def supports_releasing_memory(self): + return True + + def gc_until(self, predicate): + for i in range(10): + if predicate(): + return True + self._run_gc() + return False diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index bc8fc6e987b93..dc8825a7edb67 100644 --- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { // handle types with params via colon typs := strings.Split(f, ":") - defaulttz := "UTC" + defaulttz := "" switch typs[0] { case "tss": tz := typs[1] diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index ae6247494b100..ca6aa2fc21d74 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -368,34 +368,37 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { exportField(arrow.Field{Type: arr.DataType()}, outSchema) } + nbuffers := len(arr.Data().Buffers()) + buf_offset := 0 + // Some types don't have validity bitmaps, but we keep them shifted + // to make processing easier in other contexts. This means that + // we have to adjust when exporting. + has_validity_bitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID()) + if nbuffers > 0 && !has_validity_bitmap { + nbuffers-- + buf_offset++ + } + out.dictionary = nil out.null_count = C.int64_t(arr.NullN()) out.length = C.int64_t(arr.Len()) out.offset = C.int64_t(arr.Data().Offset()) - out.n_buffers = C.int64_t(len(arr.Data().Buffers())) - - if out.n_buffers > 0 { - var ( - nbuffers = len(arr.Data().Buffers()) - bufs = arr.Data().Buffers() - ) - // unions don't have validity bitmaps, but we keep them shifted - // to make processing easier in other contexts. This means that - // we have to adjust for union arrays - if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) { - out.n_buffers-- - nbuffers-- - bufs = bufs[1:] - } + out.n_buffers = C.int64_t(nbuffers) + out.buffers = nil + + if nbuffers > 0 { + bufs := arr.Data().Buffers() buffers := allocateBufferPtrArr(nbuffers) - for i := range bufs { - buf := bufs[i] + for i := 0; i < nbuffers; i++ { + buf := bufs[i + buf_offset] if buf == nil || buf.Len() == 0 { - if i > 0 || !internal.DefaultHasValidityBitmap(arr.DataType().ID()) { + if i > 0 || !has_validity_bitmap { // apache/arrow#33936: export a dummy buffer to be friendly to // implementations that don't import NULL properly buffers[i] = (*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion)) } else { + // null pointer permitted for the validity bitmap + // (assuming null count is 0) buffers[i] = nil } continue diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index a0c2f25496a6b..af05649b1c541 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) { {arrow.FixedWidthTypes.MonthInterval, "tiM"}, {arrow.FixedWidthTypes.DayTimeInterval, "tiD"}, {arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"}, - {arrow.FixedWidthTypes.Timestamp_s, "tss:"}, + {arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"}, + {&arrow.TimestampType{Unit: arrow.Second}, "tss:"}, {&arrow.TimestampType{Unit: arrow.Second, TimeZone: "Europe/Paris"}, "tss:Europe/Paris"}, - {arrow.FixedWidthTypes.Timestamp_ms, "tsm:"}, + {arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"}, + {&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"}, {&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "Europe/Paris"}, "tsm:Europe/Paris"}, - {arrow.FixedWidthTypes.Timestamp_us, "tsu:"}, + {arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"}, + {&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"}, {&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "Europe/Paris"}, "tsu:Europe/Paris"}, - {arrow.FixedWidthTypes.Timestamp_ns, "tsn:"}, + {arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"}, + {&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"}, {&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "Europe/Paris"}, "tsn:Europe/Paris"}, } diff --git a/go/arrow/internal/arrjson/reader.go b/go/arrow/internal/arrjson/reader.go index 34b9b6e10ec4a..c8056ef1dc744 100644 --- a/go/arrow/internal/arrjson/reader.go +++ b/go/arrow/internal/arrjson/reader.go @@ -82,6 +82,8 @@ func (r *Reader) Release() { r.recs[i] = nil } } + r.memo.Clear() + r.memo = nil } } func (r *Reader) Schema() *arrow.Schema { return r.schema } @@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) { return rec, nil } +func (r *Reader) ReadAt(index int) (arrow.Record, error) { + if index >= r.NumRecords() { + return nil, io.EOF + } + rec := r.recs[index] + return rec, nil +} + var ( _ arrio.Reader = (*Reader)(nil) ) diff --git a/go/arrow/internal/cdata_integration/entrypoints.go b/go/arrow/internal/cdata_integration/entrypoints.go new file mode 100644 index 0000000000000..827334ea034b6 --- /dev/null +++ b/go/arrow/internal/cdata_integration/entrypoints.go @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build cdata_integration +// +build cdata_integration + +package main + +import ( + "C" + "fmt" + "os" + "runtime" + "unsafe" + + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/cdata" + "github.com/apache/arrow/go/v14/arrow/internal/arrjson" + "github.com/apache/arrow/go/v14/arrow/memory" +) + +// #include +// #include +import "C" + +var alloc = memory.NewCheckedAllocator(memory.NewGoAllocator()) + +//export ArrowGo_BytesAllocated +func ArrowGo_BytesAllocated() int64 { + return int64(alloc.CurrentAlloc()) +} + +//export ArrowGo_RunGC +func ArrowGo_RunGC() { + runtime.GC() +} + +//export ArrowGo_FreeError +func ArrowGo_FreeError(cError *C.char) { + C.free(unsafe.Pointer(cError)) +} + +// When used in a defer() statement, this functions catches an incoming +// panic and converts it into a regular error. This avoids crashing the +// archery integration process and lets other tests proceed. +// Not all panics may be caught and some will still crash the process, though. +func handlePanic(err *error) { + if e := recover(); e != nil { + *err = e.(error) + } +} + +func newJsonReader(cJsonPath *C.char) (*arrjson.Reader, error) { + jsonPath := C.GoString(cJsonPath) + + f, err := os.Open(jsonPath) + if err != nil { + return nil, fmt.Errorf("could not open JSON file %q: %w", jsonPath, err) + } + defer f.Close() + + jsonReader, err := arrjson.NewReader(f, arrjson.WithAllocator(alloc)) + if err != nil { + return nil, fmt.Errorf("could not open JSON file reader from file %q: %w", jsonPath, err) + } + return jsonReader, nil +} + +func exportSchemaFromJson(cJsonPath *C.char, out *cdata.CArrowSchema) (err error) { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + defer handlePanic(&err) + cdata.ExportArrowSchema(schema, out) + return nil +} + +func importSchemaAndCompareToJson(cJsonPath *C.char, cSchema *cdata.CArrowSchema) (err error) { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + importedSchema, err := cdata.ImportCArrowSchema(cSchema) + if err != nil { + return err + } + if !schema.Equal(importedSchema) || !schema.Metadata().Equal(importedSchema.Metadata()) { + return fmt.Errorf( + "Schemas are different:\n- Json Schema: %s\n- Imported Schema: %s", + schema.String(), + importedSchema.String()); + } + return nil +} + +func exportBatchFromJson(cJsonPath *C.char, num_batch int, out *cdata.CArrowArray) (err error) { + // XXX this function exports a single batch at a time, but the JSON reader + // reads all batches at construction. + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + batch, err := jsonReader.ReadAt(num_batch) + if err != nil { + return err + } + defer handlePanic(&err) + cdata.ExportArrowRecordBatch(batch, out, nil) + return nil +} + +func importBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray *cdata.CArrowArray) (err error) { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + batch, err := jsonReader.ReadAt(num_batch) + if err != nil { + return err + } + + importedBatch, err := cdata.ImportCRecordBatchWithSchema(cArray, schema) + if err != nil { + return err + } + defer importedBatch.Release() + if !array.RecordEqual(batch, importedBatch) { + return fmt.Errorf( + "Batches are different:\n- Json Batch: %s\n- Imported Batch: %s", + "XXX", + "XXX"); + } + return nil +} + +//export ArrowGo_ExportSchemaFromJson +func ArrowGo_ExportSchemaFromJson(cJsonPath *C.char, out uintptr) *C.char { + err := exportSchemaFromJson(cJsonPath, cdata.SchemaFromPtr(out)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ExportBatchFromJson +func ArrowGo_ExportBatchFromJson(cJsonPath *C.char, num_batch int, out uintptr) *C.char { + err := exportBatchFromJson(cJsonPath, num_batch, cdata.ArrayFromPtr(out)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ImportSchemaAndCompareToJson +func ArrowGo_ImportSchemaAndCompareToJson(cJsonPath *C.char, cSchema uintptr) *C.char { + err := importSchemaAndCompareToJson(cJsonPath, cdata.SchemaFromPtr(cSchema)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ImportBatchAndCompareToJson +func ArrowGo_ImportBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray uintptr) *C.char { + err := importBatchAndCompareToJson(cJsonPath, num_batch, cdata.ArrayFromPtr(cArray)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +func main() {}