diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 3c695891b48d6..ad8fedb9bd9e4 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -232,7 +232,7 @@ jobs: name: AMD64 Windows 2019 Go ${{ matrix.go }} runs-on: windows-2019 if: ${{ !contains(github.event.pull_request.title, 'WIP') }} - timeout-minutes: 15 + timeout-minutes: 25 strategy: fail-fast: false matrix: diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh index 3c8cc0f4ee2e2..2a38901337c56 100755 --- a/ci/scripts/go_build.sh +++ b/ci/scripts/go_build.sh @@ -41,3 +41,22 @@ pushd ${source_dir}/parquet go install -v ./... popd + +if [[ -n "${ARROW_GO_INTEGRATION}" ]]; then + pushd ${source_dir}/arrow/internal/cdata_integration + + case "$(uname)" in + Linux) + go_lib="arrow_go_integration.so" + ;; + Darwin) + go_lib="arrow_go_integration.so" + ;; + MINGW*) + go_lib="arrow_go_integration.dll" + ;; + esac + go build -tags cdata_integration,assert -buildmode=c-shared -o ${go_lib} . + + popd +fi diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 2fd1d2d7f0c44..a780d33cbf323 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -70,6 +70,7 @@ def __init__(self, json_files, self.serial = serial self.gold_dirs = gold_dirs self.failures: List[Outcome] = [] + self.skips: List[Outcome] = [] self.match = match if self.match is not None: @@ -207,6 +208,8 @@ def case_wrapper(test_case): self.failures.append(outcome.failure) if self.stop_on_error: break + elif outcome.skipped: + self.skips.append(outcome) else: with ThreadPoolExecutor() as executor: @@ -215,6 +218,8 @@ def case_wrapper(test_case): self.failures.append(outcome.failure) if self.stop_on_error: break + elif outcome.skipped: + self.skips.append(outcome) def _compare_ipc_implementations( self, @@ -638,7 +643,7 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True, log(f'{exc_type}: {exc_value}') log() - 
log(fail_count, "failures") + log(f"{fail_count} failures, {len(runner.skips)} skips") if fail_count > 0: sys.exit(1) diff --git a/dev/archery/archery/integration/tester_go.py b/dev/archery/archery/integration/tester_go.py index fea33cd0ac6c1..6fa26ea02b8e7 100644 --- a/dev/archery/archery/integration/tester_go.py +++ b/dev/archery/archery/integration/tester_go.py @@ -16,11 +16,14 @@ # under the License. import contextlib +import functools import os import subprocess -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log +from ..utils.source import ARROW_ROOT_DEFAULT # FIXME(sbinet): revisit for Go modules @@ -39,12 +42,21 @@ "localhost", ] +_dll_suffix = ".dll" if os.name == "nt" else ".so" + +_DLL_PATH = os.path.join( + ARROW_ROOT_DEFAULT, + "go/arrow/internal/cdata_integration") +_INTEGRATION_DLL = os.path.join(_DLL_PATH, "arrow_go_integration" + _dll_suffix) + class GoTester(Tester): PRODUCER = True CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True + C_DATA_EXPORTER = True + C_DATA_IMPORTER = True name = 'Go' @@ -119,3 +131,123 @@ def flight_request(self, port, json_path=None, scenario_name=None): if self.debug: log(' '.join(cmd)) run_cmd(cmd) + + def make_c_data_exporter(self): + return GoCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return GoCDataImporter(self.debug, self.args) + + +_go_c_data_entrypoints = """ + const char* ArrowGo_ExportSchemaFromJson(const char* json_path, + uintptr_t out); + const char* ArrowGo_ImportSchemaAndCompareToJson( + const char* json_path, uintptr_t c_schema); + + const char* ArrowGo_ExportBatchFromJson(const char* json_path, + int num_batch, + uintptr_t out); + const char* ArrowGo_ImportBatchAndCompareToJson( + const char* json_path, int num_batch, uintptr_t c_array); + + int64_t ArrowGo_BytesAllocated(); + void ArrowGo_RunGC(); + void ArrowGo_FreeError(const char*); + """ + + +@functools.lru_cache +def 
_load_ffi(ffi, lib_path=_INTEGRATION_DLL): + ffi.cdef(_go_c_data_entrypoints) + dll = ffi.dlopen(lib_path) + return dll + + +class _CDataBase: + + def __init__(self, debug, args): + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + self.dll = _load_ffi(self.ffi) + + def _pointer_to_int(self, c_ptr): + return self.ffi.cast('uintptr_t', c_ptr) + + def _check_go_error(self, go_error): + """ + Check a `const char*` error return from an integration entrypoint. + + A null means success, a non-empty string is an error message. + The string is dynamically allocated on the Go side. + """ + assert self.ffi.typeof(go_error) is self.ffi.typeof("const char*") + if go_error != self.ffi.NULL: + try: + error = self.ffi.string(go_error).decode('utf8', + errors='replace') + raise RuntimeError( + f"Go C Data Integration call failed: {error}") + finally: + self.dll.ArrowGo_FreeError(go_error) + + def _run_gc(self): + self.dll.ArrowGo_RunGC() + + +class GoCDataExporter(CDataExporter, _CDataBase): + # Note: the Arrow Go C Data export functions expect their output + # ArrowStream or ArrowArray argument to be zero-initialized. + # This is currently ensured through the use of `ffi.new`. 
+ + def export_schema_from_json(self, json_path, c_schema_ptr): + go_error = self.dll.ArrowGo_ExportSchemaFromJson( + str(json_path).encode(), self._pointer_to_int(c_schema_ptr)) + self._check_go_error(go_error) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + go_error = self.dll.ArrowGo_ExportBatchFromJson( + str(json_path).encode(), num_batch, + self._pointer_to_int(c_array_ptr)) + self._check_go_error(go_error) + + @property + def supports_releasing_memory(self): + return True + + def record_allocation_state(self): + self._run_gc() + return self.dll.ArrowGo_BytesAllocated() + + def compare_allocation_state(self, recorded, gc_until): + def pred(): + return self.record_allocation_state() == recorded + + return gc_until(pred) + + +class GoCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + go_error = self.dll.ArrowGo_ImportSchemaAndCompareToJson( + str(json_path).encode(), self._pointer_to_int(c_schema_ptr)) + self._check_go_error(go_error) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + go_error = self.dll.ArrowGo_ImportBatchAndCompareToJson( + str(json_path).encode(), num_batch, + self._pointer_to_int(c_array_ptr)) + self._check_go_error(go_error) + + @property + def supports_releasing_memory(self): + return True + + def gc_until(self, predicate): + for i in range(10): + if predicate(): + return True + self._run_gc() + return False diff --git a/docker-compose.yml b/docker-compose.yml index 8ae06900c57f9..62e5aee0a841c 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1732,6 +1732,7 @@ services: <<: [*common, *ccache] # tell archery where the arrow binaries are located ARROW_CPP_EXE_PATH: /build/cpp/debug + ARROW_GO_INTEGRATION: 1 ARCHERY_INTEGRATION_WITH_RUST: 0 command: ["/arrow/ci/scripts/rust_build.sh /arrow /build && diff --git a/go/arrow/cdata/cdata.go b/go/arrow/cdata/cdata.go index bc8fc6e987b93..dc8825a7edb67 100644 
--- a/go/arrow/cdata/cdata.go +++ b/go/arrow/cdata/cdata.go @@ -197,7 +197,7 @@ func importSchema(schema *CArrowSchema) (ret arrow.Field, err error) { // handle types with params via colon typs := strings.Split(f, ":") - defaulttz := "UTC" + defaulttz := "" switch typs[0] { case "tss": tz := typs[1] diff --git a/go/arrow/cdata/cdata_exports.go b/go/arrow/cdata/cdata_exports.go index ae6247494b100..187c2deb9755f 100644 --- a/go/arrow/cdata/cdata_exports.go +++ b/go/arrow/cdata/cdata_exports.go @@ -368,34 +368,36 @@ func exportArray(arr arrow.Array, out *CArrowArray, outSchema *CArrowSchema) { exportField(arrow.Field{Type: arr.DataType()}, outSchema) } + nbuffers := len(arr.Data().Buffers()) + buf_offset := 0 + // Some types don't have validity bitmaps, but we keep them shifted + // to make processing easier in other contexts. This means that + // we have to adjust when exporting. + has_validity_bitmap := internal.DefaultHasValidityBitmap(arr.DataType().ID()) + if nbuffers > 0 && !has_validity_bitmap { + nbuffers-- + buf_offset++ + } + out.dictionary = nil out.null_count = C.int64_t(arr.NullN()) out.length = C.int64_t(arr.Len()) out.offset = C.int64_t(arr.Data().Offset()) - out.n_buffers = C.int64_t(len(arr.Data().Buffers())) - - if out.n_buffers > 0 { - var ( - nbuffers = len(arr.Data().Buffers()) - bufs = arr.Data().Buffers() - ) - // unions don't have validity bitmaps, but we keep them shifted - // to make processing easier in other contexts. 
This means that - // we have to adjust for union arrays - if !internal.DefaultHasValidityBitmap(arr.DataType().ID()) { - out.n_buffers-- - nbuffers-- - bufs = bufs[1:] - } + out.n_buffers = C.int64_t(nbuffers) + out.buffers = nil + + if nbuffers > 0 { + bufs := arr.Data().Buffers() buffers := allocateBufferPtrArr(nbuffers) - for i := range bufs { - buf := bufs[i] + for i, buf := range bufs[buf_offset:] { if buf == nil || buf.Len() == 0 { - if i > 0 || !internal.DefaultHasValidityBitmap(arr.DataType().ID()) { + if i > 0 || !has_validity_bitmap { // apache/arrow#33936: export a dummy buffer to be friendly to // implementations that don't import NULL properly buffers[i] = (*C.void)(unsafe.Pointer(&C.kGoCdataZeroRegion)) } else { + // null pointer permitted for the validity bitmap + // (assuming null count is 0) buffers[i] = nil } continue diff --git a/go/arrow/cdata/cdata_test.go b/go/arrow/cdata/cdata_test.go index a0c2f25496a6b..af05649b1c541 100644 --- a/go/arrow/cdata/cdata_test.go +++ b/go/arrow/cdata/cdata_test.go @@ -184,13 +184,17 @@ func TestImportTemporalSchema(t *testing.T) { {arrow.FixedWidthTypes.MonthInterval, "tiM"}, {arrow.FixedWidthTypes.DayTimeInterval, "tiD"}, {arrow.FixedWidthTypes.MonthDayNanoInterval, "tin"}, - {arrow.FixedWidthTypes.Timestamp_s, "tss:"}, + {arrow.FixedWidthTypes.Timestamp_s, "tss:UTC"}, + {&arrow.TimestampType{Unit: arrow.Second}, "tss:"}, {&arrow.TimestampType{Unit: arrow.Second, TimeZone: "Europe/Paris"}, "tss:Europe/Paris"}, - {arrow.FixedWidthTypes.Timestamp_ms, "tsm:"}, + {arrow.FixedWidthTypes.Timestamp_ms, "tsm:UTC"}, + {&arrow.TimestampType{Unit: arrow.Millisecond}, "tsm:"}, {&arrow.TimestampType{Unit: arrow.Millisecond, TimeZone: "Europe/Paris"}, "tsm:Europe/Paris"}, - {arrow.FixedWidthTypes.Timestamp_us, "tsu:"}, + {arrow.FixedWidthTypes.Timestamp_us, "tsu:UTC"}, + {&arrow.TimestampType{Unit: arrow.Microsecond}, "tsu:"}, {&arrow.TimestampType{Unit: arrow.Microsecond, TimeZone: "Europe/Paris"}, "tsu:Europe/Paris"}, - 
{arrow.FixedWidthTypes.Timestamp_ns, "tsn:"}, + {arrow.FixedWidthTypes.Timestamp_ns, "tsn:UTC"}, + {&arrow.TimestampType{Unit: arrow.Nanosecond}, "tsn:"}, {&arrow.TimestampType{Unit: arrow.Nanosecond, TimeZone: "Europe/Paris"}, "tsn:Europe/Paris"}, } diff --git a/go/arrow/internal/arrjson/reader.go b/go/arrow/internal/arrjson/reader.go index 34b9b6e10ec4a..c8056ef1dc744 100644 --- a/go/arrow/internal/arrjson/reader.go +++ b/go/arrow/internal/arrjson/reader.go @@ -82,6 +82,8 @@ func (r *Reader) Release() { r.recs[i] = nil } } + r.memo.Clear() + r.memo = nil } } func (r *Reader) Schema() *arrow.Schema { return r.schema } @@ -96,6 +98,14 @@ func (r *Reader) Read() (arrow.Record, error) { return rec, nil } +func (r *Reader) ReadAt(index int) (arrow.Record, error) { + if index >= r.NumRecords() { + return nil, io.EOF + } + rec := r.recs[index] + return rec, nil +} + var ( _ arrio.Reader = (*Reader)(nil) ) diff --git a/go/arrow/internal/cdata_integration/entrypoints.go b/go/arrow/internal/cdata_integration/entrypoints.go new file mode 100644 index 0000000000000..629b8a762a689 --- /dev/null +++ b/go/arrow/internal/cdata_integration/entrypoints.go @@ -0,0 +1,192 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +//go:build cdata_integration +// +build cdata_integration + +package main + +import ( + "fmt" + "os" + "runtime" + "unsafe" + + "github.com/apache/arrow/go/v14/arrow/array" + "github.com/apache/arrow/go/v14/arrow/cdata" + "github.com/apache/arrow/go/v14/arrow/internal/arrjson" + "github.com/apache/arrow/go/v14/arrow/memory" +) + +// #include <stdint.h> +// #include <stdlib.h> +import "C" + +var alloc = memory.NewCheckedAllocator(memory.NewGoAllocator()) + +//export ArrowGo_BytesAllocated +func ArrowGo_BytesAllocated() int64 { + return int64(alloc.CurrentAlloc()) +} + +//export ArrowGo_RunGC +func ArrowGo_RunGC() { + runtime.GC() +} + +//export ArrowGo_FreeError +func ArrowGo_FreeError(cError *C.char) { + C.free(unsafe.Pointer(cError)) +} + +// When used in a defer() statement, this function catches an incoming +// panic and converts it into a regular error. This avoids crashing the +// archery integration process and lets other tests proceed. +// Not all panics may be caught and some will still crash the process, though. 
+func handlePanic(err *error) { + if e := recover(); e != nil { + // Add a prefix while wrapping the panic-error + *err = fmt.Errorf("panic: %w", e.(error)) + } +} + +func newJsonReader(cJsonPath *C.char) (*arrjson.Reader, error) { + jsonPath := C.GoString(cJsonPath) + + f, err := os.Open(jsonPath) + if err != nil { + return nil, fmt.Errorf("could not open JSON file %q: %w", jsonPath, err) + } + defer f.Close() + + jsonReader, err := arrjson.NewReader(f, arrjson.WithAllocator(alloc)) + if err != nil { + return nil, fmt.Errorf("could not open JSON file reader from file %q: %w", jsonPath, err) + } + return jsonReader, nil +} + +func exportSchemaFromJson(cJsonPath *C.char, out *cdata.CArrowSchema) error { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + defer handlePanic(&err) + cdata.ExportArrowSchema(schema, out) + return err +} + +func importSchemaAndCompareToJson(cJsonPath *C.char, cSchema *cdata.CArrowSchema) error { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + importedSchema, err := cdata.ImportCArrowSchema(cSchema) + if err != nil { + return err + } + if !schema.Equal(importedSchema) || !schema.Metadata().Equal(importedSchema.Metadata()) { + return fmt.Errorf( + "Schemas are different:\n- Json Schema: %s\n- Imported Schema: %s", + schema.String(), + importedSchema.String()) + } + return nil +} + +func exportBatchFromJson(cJsonPath *C.char, num_batch int, out *cdata.CArrowArray) error { + // XXX this function exports a single batch at a time, but the JSON reader + // reads all batches at construction. 
+ jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + batch, err := jsonReader.ReadAt(num_batch) + if err != nil { + return err + } + defer handlePanic(&err) + cdata.ExportArrowRecordBatch(batch, out, nil) + return err +} + +func importBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray *cdata.CArrowArray) error { + jsonReader, err := newJsonReader(cJsonPath) + if err != nil { + return err + } + defer jsonReader.Release() + schema := jsonReader.Schema() + batch, err := jsonReader.ReadAt(num_batch) + if err != nil { + return err + } + + importedBatch, err := cdata.ImportCRecordBatchWithSchema(cArray, schema) + if err != nil { + return err + } + defer importedBatch.Release() + if !array.RecordEqual(batch, importedBatch) { + return fmt.Errorf( + "Batches are different:\n- Json Batch: %v\n- Imported Batch: %v", + batch, importedBatch) + } + return nil +} + +//export ArrowGo_ExportSchemaFromJson +func ArrowGo_ExportSchemaFromJson(cJsonPath *C.char, out uintptr) *C.char { + err := exportSchemaFromJson(cJsonPath, cdata.SchemaFromPtr(out)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ExportBatchFromJson +func ArrowGo_ExportBatchFromJson(cJsonPath *C.char, num_batch int, out uintptr) *C.char { + err := exportBatchFromJson(cJsonPath, num_batch, cdata.ArrayFromPtr(out)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ImportSchemaAndCompareToJson +func ArrowGo_ImportSchemaAndCompareToJson(cJsonPath *C.char, cSchema uintptr) *C.char { + err := importSchemaAndCompareToJson(cJsonPath, cdata.SchemaFromPtr(cSchema)) + if err != nil { + return C.CString(err.Error()) + } + return nil +} + +//export ArrowGo_ImportBatchAndCompareToJson +func ArrowGo_ImportBatchAndCompareToJson(cJsonPath *C.char, num_batch int, cArray uintptr) *C.char { + err := importBatchAndCompareToJson(cJsonPath, num_batch, cdata.ArrayFromPtr(cArray)) + if err != 
nil { + return C.CString(err.Error()) + } + return nil +} + +func main() {}