diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh index 30cbb2d63791c..a165f8027bf8f 100755 --- a/ci/scripts/integration_arrow.sh +++ b/ci/scripts/integration_arrow.sh @@ -22,10 +22,12 @@ set -ex arrow_dir=${1} gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration -pip install -e $arrow_dir/dev/archery +pip install -e $arrow_dir/dev/archery[integration] # Rust can be enabled by exporting ARCHERY_INTEGRATION_WITH_RUST=1 -archery integration \ +time archery integration \ + --run-c-data \ + --run-ipc \ --run-flight \ --with-cpp=1 \ --with-csharp=1 \ diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index f474d0c517fa0..9a6117011535e 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -383,7 +383,11 @@ endif() # if(ARROW_BUILD_INTEGRATION OR ARROW_BUILD_TESTS) - list(APPEND ARROW_SRCS integration/json_integration.cc integration/json_internal.cc) + list(APPEND + ARROW_SRCS + integration/c_data_integration_internal.cc + integration/json_integration.cc + integration/json_internal.cc) endif() if(ARROW_CSV) diff --git a/cpp/src/arrow/integration/c_data_integration_internal.cc b/cpp/src/arrow/integration/c_data_integration_internal.cc new file mode 100644 index 0000000000000..79e09eaf91a39 --- /dev/null +++ b/cpp/src/arrow/integration/c_data_integration_internal.cc @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "arrow/integration/c_data_integration_internal.h"
+
+#include <sstream>
+#include <string>
+
+#include "arrow/c/bridge.h"
+#include "arrow/integration/json_integration.h"
+#include "arrow/io/file.h"
+#include "arrow/memory_pool.h"
+#include "arrow/pretty_print.h"
+#include "arrow/record_batch.h"
+#include "arrow/result.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::internal::integration {
+namespace {
+
+template <typename Func>
+const char* StatusToErrorString(Func&& func) {
+  static std::string error;
+
+  Status st = func();
+  if (st.ok()) {
+    return nullptr;
+  }
+  error = st.ToString();
+  ARROW_CHECK_GT(error.length(), 0);
+  return error.c_str();
+}
+
+Result<std::shared_ptr<Schema>> ReadSchemaFromJson(const std::string& json_path,
+                                                   MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+  return reader->schema();
+}
+
+Result<std::shared_ptr<RecordBatch>> ReadBatchFromJson(const std::string& json_path,
+                                                       int num_batch, MemoryPool* pool) {
+  ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto reader, IntegrationJsonReader::Open(pool, file));
+  return reader->ReadRecordBatch(num_batch);
+}
+
+// XXX ideally, we should allow use of a custom memory pool in the C bridge API,
+// but that requires a non-trivial refactor
+
+Status ExportSchemaFromJson(std::string json_path, ArrowSchema* out) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto schema, ReadSchemaFromJson(json_path, pool));
+  return ExportSchema(*schema, out);
+}
+
+Status ImportSchemaAndCompareToJson(std::string json_path, ArrowSchema* c_schema) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto json_schema, ReadSchemaFromJson(json_path, pool));
+  ARROW_ASSIGN_OR_RAISE(auto imported_schema, ImportSchema(c_schema));
+  if (!imported_schema->Equals(json_schema, /*check_metadata=*/true)) {
+    return Status::Invalid("Schemas are different:", "\n- Json Schema: ", *json_schema,
+                           "\n- Imported Schema: ", *imported_schema);
+  }
+  return Status::OK();
+}
+
+Status ExportBatchFromJson(std::string json_path, int num_batch, ArrowArray* out) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch, pool));
+  return ExportRecordBatch(*batch, out);
+}
+
+Status ImportBatchAndCompareToJson(std::string json_path, int num_batch,
+                                   ArrowArray* c_batch) {
+  auto pool = default_memory_pool();
+  ARROW_ASSIGN_OR_RAISE(auto batch, ReadBatchFromJson(json_path, num_batch, pool));
+  ARROW_ASSIGN_OR_RAISE(auto imported_batch, ImportRecordBatch(c_batch, batch->schema()));
+  RETURN_NOT_OK(imported_batch->ValidateFull());
+  if (!imported_batch->Equals(*batch, /*check_metadata=*/true)) {
+    std::stringstream pp_expected;
+    std::stringstream pp_actual;
+    PrettyPrintOptions options(/*indent=*/2);
+    options.window = 50;
+    ARROW_CHECK_OK(PrettyPrint(*batch, options, &pp_expected));
+    ARROW_CHECK_OK(PrettyPrint(*imported_batch, options, &pp_actual));
+    return Status::Invalid("Record Batches are different:", "\n- Json Batch: ",
+                           pp_expected.str(), "\n- Imported Batch: ", pp_actual.str());
+  }
+  return Status::OK();
+}
+
+}  // namespace
+}  // namespace arrow::internal::integration
+
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char* json_path,
+                                                           ArrowSchema* out) {
+  using namespace arrow::internal::integration;  // NOLINT(build/namespaces)
+  return StatusToErrorString([=]() {
return ExportSchemaFromJson(json_path, out); }); +} + +const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char* json_path, + ArrowSchema* schema) { + using namespace arrow::internal::integration; // NOLINT(build/namespaces) + return StatusToErrorString( + [=]() { return ImportSchemaAndCompareToJson(json_path, schema); }); +} + +const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char* json_path, + int num_batch, + ArrowArray* out) { + using namespace arrow::internal::integration; // NOLINT(build/namespaces) + return StatusToErrorString( + [=]() { return ExportBatchFromJson(json_path, num_batch, out); }); +} + +const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char* json_path, + int num_batch, + ArrowArray* batch) { + using namespace arrow::internal::integration; // NOLINT(build/namespaces) + return StatusToErrorString( + [=]() { return ImportBatchAndCompareToJson(json_path, num_batch, batch); }); +} + +int64_t ArrowCpp_BytesAllocated() { + auto pool = arrow::default_memory_pool(); + return pool->bytes_allocated(); +} diff --git a/cpp/src/arrow/integration/c_data_integration_internal.h b/cpp/src/arrow/integration/c_data_integration_internal.h new file mode 100644 index 0000000000000..0a62363dffab3 --- /dev/null +++ b/cpp/src/arrow/integration/c_data_integration_internal.h @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "arrow/c/abi.h" +#include "arrow/util/visibility.h" + +// This file only serves as documentation for the C Data Interface integration +// entrypoints. The actual functions are called by Archery through DLL symbol lookup. 
+
+extern "C" {
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson(const char* json_path,
+                                                           ArrowSchema* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(const char* json_path,
+                                                                   ArrowSchema* schema);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ExportBatchFromJson(const char* json_path,
+                                                          int num_batch, ArrowArray* out);
+
+ARROW_EXPORT
+const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson(const char* json_path,
+                                                                  int num_batch,
+                                                                  ArrowArray* batch);
+
+ARROW_EXPORT
+int64_t ArrowCpp_BytesAllocated();
+
+}  // extern "C"
diff --git a/cpp/src/arrow/integration/json_integration.cc b/cpp/src/arrow/integration/json_integration.cc
index 178abe5e8b687..590f6eddd7c24 100644
--- a/cpp/src/arrow/integration/json_integration.cc
+++ b/cpp/src/arrow/integration/json_integration.cc
@@ -144,10 +144,9 @@ class IntegrationJsonReader::Impl {
   }
 
   Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) {
-    DCHECK_GE(i, 0) << "i out of bounds";
-    DCHECK_LT(i, static_cast<int>(record_batches_->GetArray().Size()))
-        << "i out of bounds";
-
+    if (i < 0 || i >= static_cast<int>(record_batches_->GetArray().Size())) {
+      return Status::IndexError("record batch index ", i, " out of bounds");
+    }
     return json::ReadRecordBatch(record_batches_->GetArray()[i], schema_,
                                  &dictionary_memo_, pool_);
   }
diff --git a/cpp/src/arrow/symbols.map b/cpp/src/arrow/symbols.map
index 9ef0e404bc091..0144e6116554b 100644
--- a/cpp/src/arrow/symbols.map
+++ b/cpp/src/arrow/symbols.map
@@ -32,6 +32,7 @@
     };
     # Also export C-level helpers
    arrow_*;
+    Arrow*;
    # ARROW-14771: export Protobuf symbol table
    descriptor_table_Flight_2eproto;
    descriptor_table_FlightSql_2eproto;
diff --git a/dev/archery/archery/cli.py b/dev/archery/archery/cli.py
index 70f865cc2fa70..7a3b45f9788e6 100644
--- a/dev/archery/archery/cli.py
+++ b/dev/archery/archery/cli.py
@@ -723,8 +723,12 @@ def _set_default(opt, default):
               envvar="ARCHERY_INTEGRATION_WITH_RUST")
 @click.option('--write_generated_json', default="",
               help='Generate test JSON to indicated path')
+@click.option('--run-ipc', is_flag=True, default=False,
+              help='Run IPC integration tests')
 @click.option('--run-flight', is_flag=True, default=False,
               help='Run Flight integration tests')
+@click.option('--run-c-data', is_flag=True, default=False,
+              help='Run C Data Interface integration tests')
 @click.option('--debug', is_flag=True, default=False,
               help='Run executables in debug mode as relevant')
 @click.option('--serial', is_flag=True, default=False,
@@ -753,15 +757,19 @@ def integration(with_all=False, random_seed=12345, **args):
     gen_path = args['write_generated_json']
 
     languages = ['cpp', 'csharp', 'java', 'js', 'go', 'rust']
+    formats = ['ipc', 'flight', 'c_data']
 
     enabled_languages = 0
     for lang in languages:
-        param = 'with_{}'.format(lang)
+        param = f'with_{lang}'
         if with_all:
             args[param] = with_all
+        enabled_languages += args[param]
 
-        if args[param]:
-            enabled_languages += 1
+    enabled_formats = 0
+    for fmt in formats:
+        param = f'run_{fmt}'
+        enabled_formats += args[param]
 
     if gen_path:
         # XXX See GH-37575: this option is only used by the JS test suite
@@ -769,8 +777,13 @@ def integration(with_all=False, random_seed=12345, **args):
         os.makedirs(gen_path, exist_ok=True)
         write_js_test_json(gen_path)
     else:
+        if enabled_formats == 0:
+            raise click.UsageError(
+                "Need to enable at least one format to test "
+                "(IPC, Flight, C Data Interface); try --help")
         if enabled_languages == 0:
-            raise Exception("Must enable at least 1 language to test")
+            raise click.UsageError(
+                "Need to enable at least one language to test; try --help")
         run_all_tests(**args)
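The format gate runs before the language gate, so a bare `archery integration` invocation now exits with a usage message instead of silently testing nothing. One way to exercise that path without any Arrow builds is click's test runner; this is a hypothetical smoke test, not part of the patch:

```python
# Hypothetical check of the new gating (assumes the `archery` click
# group from archery/cli.py is importable in the current environment).
from click.testing import CliRunner

from archery.cli import archery

result = CliRunner().invoke(archery, ["integration", "--with-cpp=1"])
# No --run-* flag was given, so the command should refuse to run.
assert result.exit_code != 0
assert "at least one format" in result.output
```

Switching from a bare `Exception` to `click.UsageError` also buys the standard usage output and exit code 2.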
"Need to enable at least one language to test; try --help") run_all_tests(**args) diff --git a/dev/archery/archery/integration/cdata.py b/dev/archery/archery/integration/cdata.py new file mode 100644 index 0000000000000..c201f5f867f8f --- /dev/null +++ b/dev/archery/archery/integration/cdata.py @@ -0,0 +1,107 @@ +# licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import cffi +from contextlib import contextmanager +import functools + +from .tester import CDataExporter, CDataImporter + + +_c_data_decls = """ + struct ArrowSchema { + // Array type description + const char* format; + const char* name; + const char* metadata; + int64_t flags; + int64_t n_children; + struct ArrowSchema** children; + struct ArrowSchema* dictionary; + + // Release callback + void (*release)(struct ArrowSchema*); + // Opaque producer-specific data + void* private_data; + }; + + struct ArrowArray { + // Array data description + int64_t length; + int64_t null_count; + int64_t offset; + int64_t n_buffers; + int64_t n_children; + const void** buffers; + struct ArrowArray** children; + struct ArrowArray* dictionary; + + // Release callback + void (*release)(struct ArrowArray*); + // Opaque producer-specific data + void* private_data; + }; + + struct ArrowArrayStream { + int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out); + int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out); + + const char* (*get_last_error)(struct ArrowArrayStream*); + + // Release callback + void (*release)(struct ArrowArrayStream*); + // Opaque producer-specific data + void* private_data; + }; + """ + + +@functools.lru_cache +def ffi() -> cffi.FFI: + """ + Return a FFI object supporting C Data Interface types. + """ + ffi = cffi.FFI() + ffi.cdef(_c_data_decls) + return ffi + + +@contextmanager +def check_memory_released(exporter: CDataExporter, importer: CDataImporter): + """ + A context manager for memory release checks. + + The context manager arranges cooperation between the exporter and importer + to try and release memory at the end of the enclosed block. + + However, if either the exporter or importer doesn't support deterministic + memory release, no memory check is performed. + """ + do_check = (exporter.supports_releasing_memory and + importer.supports_releasing_memory) + if do_check: + before = exporter.record_allocation_state() + yield + # We don't use a `finally` clause: if the enclosed block raised an + # exception, no need to add another one. 
diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py
index f924c8a73cb8a..53f7ba58bff99 100644
--- a/dev/archery/archery/integration/datagen.py
+++ b/dev/archery/archery/integration/datagen.py
@@ -25,6 +25,7 @@
 import numpy as np
 
 from .util import frombytes, tobytes, random_bytes, random_utf8
+from .util import SKIP_C_SCHEMA, SKIP_C_ARRAY
 
 
 def metadata_key_values(pairs):
@@ -1224,15 +1225,16 @@ def get_json(self):
 class File(object):
 
     def __init__(self, name, schema, batches, dictionaries=None,
-                 skip=None, path=None, quirks=None):
+                 skip_testers=None, path=None, quirks=None):
         self.name = name
         self.schema = schema
         self.dictionaries = dictionaries or []
         self.batches = batches
-        self.skip = set()
+        self.skipped_testers = set()
+        self.skipped_formats = {}
         self.path = path
-        if skip:
-            self.skip.update(skip)
+        if skip_testers:
+            self.skipped_testers.update(skip_testers)
         # For tracking flags like whether to validate decimal values
         # fit into the given precision (ARROW-13558).
         self.quirks = set()
@@ -1258,14 +1260,39 @@ def write(self, path):
             f.write(json.dumps(self.get_json(), indent=2).encode('utf-8'))
         self.path = path
 
-    def skip_category(self, category):
-        """Skip this test for the given category.
+    def skip_tester(self, tester):
+        """Skip this test for the given tester (such as 'C#').
+        """
+        self.skipped_testers.add(tester)
+        return self
 
-        Category should be SKIP_ARROW or SKIP_FLIGHT.
+    def skip_format(self, format, tester='all'):
+        """Skip this test for the given format, and optionally tester.
         """
-        self.skip.add(category)
+        self.skipped_formats.setdefault(format, set()).add(tester)
         return self
 
+    def add_skips_from(self, other_file):
+        """Add skips from another File object.
+        """
+        self.skipped_testers.update(other_file.skipped_testers)
+        for format, testers in other_file.skipped_formats.items():
+            self.skipped_formats.setdefault(format, set()).update(testers)
+
+    def should_skip(self, tester, format):
+        """Whether this (tester, format) combination should be skipped.
+        """
+        if tester in self.skipped_testers:
+            return True
+        testers = self.skipped_formats.get(format, ())
+        return 'all' in testers or tester in testers
+
+    @property
+    def num_batches(self):
+        """The number of record batches in this file.
+ """ + return len(self.batches) + def get_field(name, type_, **kwargs): if type_ == 'binary': @@ -1295,8 +1322,8 @@ def get_field(name, type_, **kwargs): raise TypeError(dtype) -def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None, - metadata=None): +def _generate_file(name, fields, batch_sizes, *, + dictionaries=None, metadata=None): schema = Schema(fields, metadata=metadata) batches = [] for size in batch_sizes: @@ -1307,7 +1334,7 @@ def _generate_file(name, fields, batch_sizes, dictionaries=None, skip=None, batches.append(RecordBatch(size, columns)) - return File(name, schema, batches, dictionaries, skip=skip) + return File(name, schema, batches, dictionaries) def generate_custom_metadata_case(): @@ -1666,8 +1693,8 @@ def _temp_path(): generate_primitive_case([0, 0, 0], name='primitive_zerolength'), generate_primitive_large_offsets_case([17, 20]) - .skip_category('C#') - .skip_category('JS'), + .skip_tester('C#') + .skip_tester('JS'), generate_null_case([10, 0]), @@ -1676,66 +1703,71 @@ def _temp_path(): generate_decimal128_case(), generate_decimal256_case() - .skip_category('JS'), + .skip_tester('JS'), generate_datetime_case(), generate_duration_case() - .skip_category('C#') - .skip_category('JS'), # TODO(ARROW-5239): Intervals + JS + .skip_tester('C#') + .skip_tester('JS'), # TODO(ARROW-5239): Intervals + JS generate_interval_case() - .skip_category('C#') - .skip_category('JS'), # TODO(ARROW-5239): Intervals + JS + .skip_tester('C#') + .skip_tester('JS'), # TODO(ARROW-5239): Intervals + JS generate_month_day_nano_interval_case() - .skip_category('C#') - .skip_category('JS'), + .skip_tester('C#') + .skip_tester('JS'), generate_map_case() - .skip_category('C#'), + .skip_tester('C#'), generate_non_canonical_map_case() - .skip_category('C#') - .skip_category('Java'), # TODO(ARROW-8715) + .skip_tester('C#') + .skip_tester('Java') # TODO(ARROW-8715) + # Canonical map names are restored on import, so the schemas are unequal + .skip_format(SKIP_C_SCHEMA, 'C++'), generate_nested_case(), generate_recursive_nested_case(), generate_nested_large_offsets_case() - .skip_category('C#') - .skip_category('JS'), + .skip_tester('C#') + .skip_tester('JS'), generate_unions_case() - .skip_category('C#'), + .skip_tester('C#'), generate_custom_metadata_case() - .skip_category('C#'), + .skip_tester('C#'), generate_duplicate_fieldnames_case() - .skip_category('C#') - .skip_category('JS'), + .skip_tester('C#') + .skip_tester('JS'), generate_dictionary_case() - .skip_category('C#'), + .skip_tester('C#'), generate_dictionary_unsigned_case() - .skip_category('C#') - .skip_category('Java'), # TODO(ARROW-9377) + .skip_tester('C#') + .skip_tester('Java'), # TODO(ARROW-9377) generate_nested_dictionary_case() - .skip_category('C#') - .skip_category('Java'), # TODO(ARROW-7779) + .skip_tester('C#') + .skip_tester('Java'), # TODO(ARROW-7779) generate_run_end_encoded_case() - .skip_category('C#') - .skip_category('Java') - .skip_category('JS') - .skip_category('Rust'), + .skip_tester('C#') + .skip_tester('Java') + .skip_tester('JS') + .skip_tester('Rust'), generate_extension_case() - .skip_category('C#'), + .skip_tester('C#') + # TODO: ensure the extension is registered in the C++ entrypoint + .skip_format(SKIP_C_SCHEMA, 'C++') + .skip_format(SKIP_C_ARRAY, 'C++'), ] generated_paths = [] diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 0ee9ab814e5e6..88d2d1d364182 100644 --- a/dev/archery/archery/integration/runner.py +++ 
b/dev/archery/archery/integration/runner.py
@@ -25,17 +25,19 @@
 import sys
 import tempfile
 import traceback
-from typing import Callable, List
+from typing import Callable, List, Optional
 
+from . import cdata
 from .scenario import Scenario
-from .tester import Tester
-from .tester_cpp import CPPTester
+from .tester import Tester, CDataExporter, CDataImporter
+from .tester_cpp import CppTester
 from .tester_go import GoTester
 from .tester_rust import RustTester
 from .tester_java import JavaTester
 from .tester_js import JSTester
 from .tester_csharp import CSharpTester
-from .util import guid, SKIP_ARROW, SKIP_FLIGHT, printer
+from .util import guid, printer
+from .util import SKIP_C_ARRAY, SKIP_C_SCHEMA, SKIP_FLIGHT, SKIP_IPC
 from ..utils.source import ARROW_ROOT_DEFAULT
 from . import datagen
 
@@ -76,7 +78,7 @@ def __init__(self, json_files,
             self.json_files = [json_file for json_file in self.json_files
                                if self.match in json_file.name]
 
-    def run(self):
+    def run_ipc(self):
         """
         Run Arrow IPC integration tests for the matrix of enabled
         implementations.
@@ -84,23 +86,24 @@ def run(self):
         for producer, consumer in itertools.product(
                 filter(lambda t: t.PRODUCER, self.testers),
                 filter(lambda t: t.CONSUMER, self.testers)):
-            self._compare_implementations(
+            self._compare_ipc_implementations(
                 producer, consumer, self._produce_consume,
                 self.json_files)
 
         if self.gold_dirs:
             for gold_dir, consumer in itertools.product(
                     self.gold_dirs,
                     filter(lambda t: t.CONSUMER, self.testers)):
-                log('\n\n\n\n')
+                log('\n')
                 log('******************************************************')
                 log('Tests against golden files in {}'.format(gold_dir))
                 log('******************************************************')
 
                 def run_gold(_, consumer, test_case: datagen.File):
                     return self._run_gold(gold_dir, consumer, test_case)
-                self._compare_implementations(
+                self._compare_ipc_implementations(
                     consumer, consumer, run_gold,
                     self._gold_tests(gold_dir))
+        log('\n')
 
     def run_flight(self):
         """
@@ -112,6 +115,18 @@ def run_flight(self):
                           self.testers)
         for server, client in itertools.product(servers, clients):
             self._compare_flight_implementations(server, client)
+        log('\n')
+
+    def run_c_data(self):
+        """
+        Run Arrow C Data Interface integration tests for the matrix of
+        enabled implementations.
+ """ + for producer, consumer in itertools.product( + filter(lambda t: t.C_DATA_EXPORTER, self.testers), + filter(lambda t: t.C_DATA_IMPORTER, self.testers)): + self._compare_c_data_implementations(producer, consumer) + log('\n') def _gold_tests(self, gold_dir): prefix = os.path.basename(os.path.normpath(gold_dir)) @@ -125,28 +140,31 @@ def _gold_tests(self, gold_dir): with open(out_path, "wb") as out: out.write(i.read()) + # Find the generated file with the same name as this gold file try: - skip = next(f for f in self.json_files - if f.name == name).skip + equiv_json_file = next(f for f in self.json_files + if f.name == name) except StopIteration: - skip = set() + equiv_json_file = None + + skip_testers = set() if name == 'union' and prefix == '0.17.1': - skip.add("Java") - skip.add("JS") + skip_testers.add("Java") + skip_testers.add("JS") if prefix == '1.0.0-bigendian' or prefix == '1.0.0-littleendian': - skip.add("C#") - skip.add("Java") - skip.add("JS") - skip.add("Rust") + skip_testers.add("C#") + skip_testers.add("Java") + skip_testers.add("JS") + skip_testers.add("Rust") if prefix == '2.0.0-compression': - skip.add("C#") - skip.add("JS") + skip_testers.add("C#") + skip_testers.add("JS") # See https://github.com/apache/arrow/pull/9822 for how to # disable specific compression type tests. if prefix == '4.0.0-shareddict': - skip.add("C#") + skip_testers.add("C#") quirks = set() if prefix in {'0.14.1', '0.17.1', @@ -157,12 +175,18 @@ def _gold_tests(self, gold_dir): quirks.add("no_date64_validate") quirks.add("no_times_validate") - yield datagen.File(name, None, None, skip=skip, path=out_path, - quirks=quirks) + json_file = datagen.File(name, schema=None, batches=None, + path=out_path, + skip_testers=skip_testers, + quirks=quirks) + if equiv_json_file is not None: + json_file.add_skips_from(equiv_json_file) + yield json_file def _run_test_cases(self, case_runner: Callable[[datagen.File], Outcome], - test_cases: List[datagen.File]) -> None: + test_cases: List[datagen.File], + *, serial: Optional[bool] = None) -> None: """ Populate self.failures with the outcomes of the ``case_runner`` ran against ``test_cases`` @@ -171,10 +195,13 @@ def case_wrapper(test_case): with printer.cork(): return case_runner(test_case) + if serial is None: + serial = self.serial + if self.failures and self.stop_on_error: return - if self.serial: + if serial: for outcome in map(case_wrapper, test_cases): if outcome.failure is not None: self.failures.append(outcome.failure) @@ -189,7 +216,7 @@ def case_wrapper(test_case): if self.stop_on_error: break - def _compare_implementations( + def _compare_ipc_implementations( self, producer: Tester, consumer: Tester, @@ -221,22 +248,17 @@ def _run_ipc_test_case( outcome = Outcome() json_path = test_case.path - log('==========================================================') + log('=' * 70) log('Testing file {0}'.format(json_path)) - log('==========================================================') - - if producer.name in test_case.skip: - log('-- Skipping test because producer {0} does ' - 'not support'.format(producer.name)) - outcome.skipped = True - elif consumer.name in test_case.skip: - log('-- Skipping test because consumer {0} does ' - 'not support'.format(consumer.name)) + if test_case.should_skip(producer.name, SKIP_IPC): + log(f'-- Skipping test because producer {producer.name} does ' + f'not support IPC') outcome.skipped = True - elif SKIP_ARROW in test_case.skip: - log('-- Skipping test') + elif test_case.should_skip(consumer.name, SKIP_IPC): + log(f'-- 
Skipping test because consumer {consumer.name} does '
+                f'not support IPC')
             outcome.skipped = True
 
         else:
@@ -247,6 +269,8 @@ def _run_ipc_test_case(
                 outcome.failure = Failure(test_case, producer, consumer,
                                           sys.exc_info())
 
+        log('=' * 70)
+
         return outcome
 
     def _produce_consume(self,
@@ -344,22 +368,17 @@ def _run_flight_test_case(self,
         """
         outcome = Outcome()
 
-        log('=' * 58)
+        log('=' * 70)
         log('Testing file {0}'.format(test_case.name))
-        log('=' * 58)
-
-        if producer.name in test_case.skip:
-            log('-- Skipping test because producer {0} does '
-                'not support'.format(producer.name))
-            outcome.skipped = True
-        elif consumer.name in test_case.skip:
-            log('-- Skipping test because consumer {0} does '
-                'not support'.format(consumer.name))
+        if test_case.should_skip(producer.name, SKIP_FLIGHT):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support Flight')
             outcome.skipped = True
-        elif SKIP_FLIGHT in test_case.skip:
-            log('-- Skipping test')
+        elif test_case.should_skip(consumer.name, SKIP_FLIGHT):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support Flight')
             outcome.skipped = True
 
         else:
@@ -380,6 +399,125 @@ def _run_flight_test_case(self,
                 outcome.failure = Failure(test_case, producer, consumer,
                                           sys.exc_info())
 
+        log('=' * 70)
+
         return outcome
 
+    def _compare_c_data_implementations(
+        self,
+        producer: Tester,
+        consumer: Tester
+    ):
+        log('##########################################################')
+        log(f'C Data Interface: '
+            f'{producer.name} exporting, {consumer.name} importing')
+        log('##########################################################')
+
+        # Serial execution is required for proper memory accounting
+        serial = True
+
+        exporter = producer.make_c_data_exporter()
+        importer = consumer.make_c_data_importer()
+
+        case_runner = partial(self._run_c_schema_test_case, producer, consumer,
+                              exporter, importer)
+        self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+        case_runner = partial(self._run_c_array_test_cases, producer, consumer,
+                              exporter, importer)
+        self._run_test_cases(case_runner, self.json_files, serial=serial)
+
+    def _run_c_schema_test_case(self,
+                                producer: Tester, consumer: Tester,
+                                exporter: CDataExporter,
+                                importer: CDataImporter,
+                                test_case: datagen.File) -> Outcome:
+        """
+        Run one C ArrowSchema test case.
+        """
+        outcome = Outcome()
+
+        def do_run():
+            json_path = test_case.path
+            ffi = cdata.ffi()
+            c_schema_ptr = ffi.new("struct ArrowSchema*")
+            with cdata.check_memory_released(exporter, importer):
+                exporter.export_schema_from_json(json_path, c_schema_ptr)
+                importer.import_schema_and_compare_to_json(json_path, c_schema_ptr)
+
+        log('=' * 70)
+        log(f'Testing C ArrowSchema from file {test_case.name!r}')
+
+        if test_case.should_skip(producer.name, SKIP_C_SCHEMA):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support C ArrowSchema')
+            outcome.skipped = True
+
+        elif test_case.should_skip(consumer.name, SKIP_C_SCHEMA):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support C ArrowSchema')
+            outcome.skipped = True
+
+        else:
+            try:
+                do_run()
+            except Exception:
+                traceback.print_exc(file=printer.stdout)
+                outcome.failure = Failure(test_case, producer, consumer,
+                                          sys.exc_info())
+
+        log('=' * 70)
+
+        return outcome
+
+    def _run_c_array_test_cases(self,
+                                producer: Tester, consumer: Tester,
+                                exporter: CDataExporter,
+                                importer: CDataImporter,
+                                test_case: datagen.File) -> Outcome:
+        """
+        Run one set of C ArrowArray test cases.
+        """
+        outcome = Outcome()
+
+        def do_run():
+            json_path = test_case.path
+            ffi = cdata.ffi()
+            c_array_ptr = ffi.new("struct ArrowArray*")
+            for num_batch in range(test_case.num_batches):
+                log(f'... with record batch #{num_batch}')
+                with cdata.check_memory_released(exporter, importer):
+                    exporter.export_batch_from_json(json_path,
+                                                    num_batch,
+                                                    c_array_ptr)
+                    importer.import_batch_and_compare_to_json(json_path,
+                                                              num_batch,
+                                                              c_array_ptr)
+
+        log('=' * 70)
+        log(f'Testing C ArrowArray '
+            f'from file {test_case.name!r}')
+
+        if test_case.should_skip(producer.name, SKIP_C_ARRAY):
+            log(f'-- Skipping test because producer {producer.name} does '
+                f'not support C ArrowArray')
+            outcome.skipped = True
+
+        elif test_case.should_skip(consumer.name, SKIP_C_ARRAY):
+            log(f'-- Skipping test because consumer {consumer.name} does '
+                f'not support C ArrowArray')
+            outcome.skipped = True
+
+        else:
+            try:
+                do_run()
+            except Exception:
+                traceback.print_exc(file=printer.stdout)
+                outcome.failure = Failure(test_case, producer, consumer,
+                                          sys.exc_info())
+
+        log('=' * 70)
+
+        return outcome
 
@@ -387,7 +525,7 @@ def get_static_json_files():
     glob_pattern = os.path.join(ARROW_ROOT_DEFAULT,
                                 'integration', 'data', '*.json')
     return [
-        datagen.File(name=os.path.basename(p), path=p, skip=set(),
+        datagen.File(name=os.path.basename(p), path=p,
                      schema=None, batches=None)
         for p in glob.glob(glob_pattern)
     ]
@@ -395,13 +533,14 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
                   with_csharp=True, with_go=True, with_rust=False,
-                  run_flight=False, tempdir=None, **kwargs):
+                  run_ipc=False, run_flight=False, run_c_data=False,
+                  tempdir=None, **kwargs):
     tempdir = tempdir or tempfile.mkdtemp(prefix='arrow-integration-')
 
     testers: List[Tester] = []
 
     if with_cpp:
-        testers.append(CPPTester(**kwargs))
+        testers.append(CppTester(**kwargs))
 
     if with_java:
         testers.append(JavaTester(**kwargs))
@@ -434,54 +573,57 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
         Scenario(
             "ordered",
             description="Ensure FlightInfo.ordered is supported.",
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
             "expiration_time:do_get",
             description=("Ensure FlightEndpoint.expiration_time with "
                          "DoGet is working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
         ),
         Scenario(
            "expiration_time:list_actions",
            description=("Ensure FlightEndpoint.expiration_time related "
                         "pre-defined actions are working with ListActions "
                         "as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
        ),
        Scenario(
            "expiration_time:cancel_flight_info",
            description=("Ensure FlightEndpoint.expiration_time and "
                         "CancelFlightInfo are working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
        ),
        Scenario(
            "expiration_time:renew_flight_endpoint",
            description=("Ensure FlightEndpoint.expiration_time and "
                         "RenewFlightEndpoint are working as expected."),
-            skip={"JS", "C#", "Rust"},
+            skip_testers={"JS", "C#", "Rust"},
        ),
        Scenario(
            "poll_flight_info",
            description="Ensure PollFlightInfo is supported.",
-            skip={"JS", "C#", "Rust"}
+            skip_testers={"JS", "C#", "Rust"}
        ),
        Scenario(
            "flight_sql",
            description="Ensure Flight SQL protocol is working as expected.",
-            skip={"Rust"}
+            skip_testers={"Rust"}
        ),
        Scenario(
            "flight_sql:extension",
            description="Ensure Flight SQL extensions work as expected.",
-            skip={"Rust"}
+            skip_testers={"Rust"}
        ),
    ]
 
     runner = IntegrationRunner(json_files, flight_scenarios, testers, **kwargs)
-    runner.run()
+    if run_ipc:
+        runner.run_ipc()
     if run_flight:
         runner.run_flight()
+    if run_c_data:
+        runner.run_c_data()
 
     fail_count = 0
     if runner.failures:
@@ -492,7 +634,8 @@ def run_all_tests(with_cpp=True, with_java=True, with_js=True,
             log(test_case.name, producer.name, "producing, ",
                 consumer.name, "consuming")
             if exc_info:
-                traceback.print_exception(*exc_info)
+                exc_type, exc_value, exc_tb = exc_info
+                log(f'{exc_type}: {exc_value}')
             log()
 
     log(fail_count, "failures")
diff --git a/dev/archery/archery/integration/scenario.py b/dev/archery/archery/integration/scenario.py
index 1fcbca64e6a1f..89c64452e5fc5 100644
--- a/dev/archery/archery/integration/scenario.py
+++ b/dev/archery/archery/integration/scenario.py
@@ -23,7 +23,10 @@ class Scenario:
     Does not correspond to a particular IPC JSON file.
     """
 
-    def __init__(self, name, description, skip=None):
+    def __init__(self, name, description, skip_testers=None):
         self.name = name
         self.description = description
-        self.skip = skip or set()
+        self.skipped_testers = skip_testers or set()
+
+    def should_skip(self, tester, format):
+        return tester in self.skipped_testers
diff --git a/dev/archery/archery/integration/tester.py b/dev/archery/archery/integration/tester.py
index 54bfe621efd92..8c469c160250b 100644
--- a/dev/archery/archery/integration/tester.py
+++ b/dev/archery/archery/integration/tester.py
@@ -17,12 +17,180 @@
 
 # Base class for language-specific integration test harnesses
 
+from abc import ABC, abstractmethod
+import os
 import subprocess
+import typing
 
 from .util import log
 
 
-class Tester(object):
+_Predicate = typing.Callable[[], bool]
+
+
+class CDataExporter(ABC):
+
+    @abstractmethod
+    def export_schema_from_json(self, json_path: os.PathLike,
+                                c_schema_ptr: object):
+        """
+        Read a JSON integration file and export its schema.
+
+        Parameters
+        ----------
+        json_path : Path
+            Path to the JSON file
+        c_schema_ptr : cffi pointer value
+            Pointer to the ``ArrowSchema`` struct to export to.
+        """
+
+    @abstractmethod
+    def export_batch_from_json(self, json_path: os.PathLike,
+                               num_batch: int,
+                               c_array_ptr: object):
+        """
+        Read a JSON integration file and export one of its batches.
+
+        Parameters
+        ----------
+        json_path : Path
+            Path to the JSON file
+        num_batch : int
+            Number of the record batch in the JSON file
+        c_array_ptr : cffi pointer value
+            Pointer to the ``ArrowArray`` struct to export to.
+        """
+
+    @property
+    @abstractmethod
+    def supports_releasing_memory(self) -> bool:
+        """
+        Whether the implementation is able to release memory deterministically.
+
+        Here, "release memory" means that, after the `release` callback of
+        a C Data Interface export is called, `compare_allocation_state` is
+        able to trigger the deallocation of the memory underlying the export
+        (for example buffer data).
+
+        If false, then `record_allocation_state` and `compare_allocation_state`
+        are allowed to raise NotImplementedError.
+        """
+
+    def record_allocation_state(self) -> object:
+        """
+        Record the current memory allocation state.
+
+        Returns
+        -------
+        state : object
+            Opaque object representing the allocation state,
+            for example the number of allocated bytes.
+        """
+        raise NotImplementedError
+
+    def compare_allocation_state(self, recorded: object,
+                                 gc_until: typing.Callable[[_Predicate], bool]
+                                 ) -> bool:
+        """
+        Compare the current memory allocation state with the recorded one.
+
+        Parameters
+        ----------
+        recorded : object
+            The previous allocation state returned by
+            `record_allocation_state()`
+        gc_until : callable
+            A callable itself accepting a callable predicate, and
+            returning a boolean.
+            `gc_until` should try to release memory until the predicate
+            becomes true, or until it decides to give up.  The final value
+            of the predicate should be returned.
+            `gc_until` is typically provided by the C Data Interface importer.
+
+        Returns
+        -------
+        success : bool
+            Whether memory allocation state finally reached its previously
+            recorded value.
+        """
+        raise NotImplementedError
+
+
+class CDataImporter(ABC):
+
+    @abstractmethod
+    def import_schema_and_compare_to_json(self, json_path: os.PathLike,
+                                          c_schema_ptr: object):
+        """
+        Import schema and compare it to the schema of a JSON integration file.
+
+        An error is raised if importing fails or the schemas differ.
+
+        Parameters
+        ----------
+        json_path : Path
+            The path to the JSON file
+        c_schema_ptr : cffi pointer value
+            Pointer to the ``ArrowSchema`` struct to import from.
+        """
+
+    @abstractmethod
+    def import_batch_and_compare_to_json(self, json_path: os.PathLike,
+                                         num_batch: int,
+                                         c_array_ptr: object):
+        """
+        Import record batch and compare it to one of the batches
+        from a JSON integration file.
+
+        The schema used for importing the record batch is the one from
+        the JSON file.
+
+        An error is raised if importing fails or the batches differ.
+
+        Parameters
+        ----------
+        json_path : Path
+            The path to the JSON file
+        num_batch : int
+            Number of the record batch in the JSON file
+        c_array_ptr : cffi pointer value
+            Pointer to the ``ArrowArray`` struct to import from.
+        """
+
+    @property
+    @abstractmethod
+    def supports_releasing_memory(self) -> bool:
+        """
+        Whether the implementation is able to release memory deterministically.
+
+        Here, "release memory" means calling the `release` callback of
+        a C Data Interface export (which should then trigger a deallocation
+        mechanism on the exporter).
+
+        If false, then `gc_until` is allowed to raise NotImplementedError.
+        """
+
+    def gc_until(self, predicate: _Predicate):
+        """
+        Try to release memory until the predicate becomes true, or fail.
+
+        Depending on the CDataImporter implementation, this may for example
+        try once, or run a garbage collector a given number of times, or
+        any other implementation-specific strategy for releasing memory.
+
+        The running time should be kept reasonable and compatible with
+        execution of multiple C Data integration tests.
+
+        This should not raise if `supports_releasing_memory` is true.
+
+        Returns
+        -------
+        success : bool
+            The final value of the predicate.
+        """
+        raise NotImplementedError
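For importers on a garbage-collected runtime, `gc_until` is where the collector gets plugged in; the exporter's `compare_allocation_state` calls it with a predicate such as "allocated bytes are back to the baseline". A hedged sketch of one possible override (hypothetical, not taken from any of the language testers):

```python
import gc


class GcRuntimeImporterMixin:
    """Hypothetical mixin illustrating the gc_until contract."""

    @property
    def supports_releasing_memory(self):
        return True

    def gc_until(self, predicate):
        # Run the collector a bounded number of times, then report the
        # final value of the predicate, as the docstring requires.
        for _ in range(3):
            if predicate():
                return True
            gc.collect()
        return predicate()
```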
""" @@ -34,8 +202,12 @@ class Tester(object): FLIGHT_SERVER = False # whether the language supports receiving Flight FLIGHT_CLIENT = False + # whether the language supports the C Data Interface as an exporter + C_DATA_EXPORTER = False + # whether the language supports the C Data Interface as an importer + C_DATA_IMPORTER = False - # the name shown in the logs + # the name used for skipping and shown in the logs name = "unknown" def __init__(self, debug=False, **args): @@ -85,3 +257,9 @@ def flight_server(self, scenario_name=None): def flight_request(self, port, json_path=None, scenario_name=None): raise NotImplementedError + + def make_c_data_exporter(self) -> CDataExporter: + raise NotImplementedError + + def make_c_data_importer(self) -> CDataImporter: + raise NotImplementedError diff --git a/dev/archery/archery/integration/tester_cpp.py b/dev/archery/archery/integration/tester_cpp.py index 52cc565dc00a3..9ddc3c480002a 100644 --- a/dev/archery/archery/integration/tester_cpp.py +++ b/dev/archery/archery/integration/tester_cpp.py @@ -16,10 +16,12 @@ # under the License. import contextlib +import functools import os import subprocess -from .tester import Tester +from . import cdata +from .tester import Tester, CDataExporter, CDataImporter from .util import run_cmd, log from ..utils.source import ARROW_ROOT_DEFAULT @@ -39,12 +41,19 @@ "localhost", ] +_dll_suffix = ".dll" if os.name == "nt" else ".so" -class CPPTester(Tester): +_DLL_PATH = _EXE_PATH +_ARROW_DLL = os.path.join(_DLL_PATH, "libarrow" + _dll_suffix) + + +class CppTester(Tester): PRODUCER = True CONSUMER = True FLIGHT_SERVER = True FLIGHT_CLIENT = True + C_DATA_EXPORTER = True + C_DATA_IMPORTER = True name = 'C++' @@ -133,3 +142,104 @@ def flight_request(self, port, json_path=None, scenario_name=None): if self.debug: log(' '.join(cmd)) run_cmd(cmd) + + def make_c_data_exporter(self): + return CppCDataExporter(self.debug, self.args) + + def make_c_data_importer(self): + return CppCDataImporter(self.debug, self.args) + + +_cpp_c_data_entrypoints = """ + const char* ArrowCpp_CDataIntegration_ExportSchemaFromJson( + const char* json_path, struct ArrowSchema* out); + const char* ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson( + const char* json_path, struct ArrowSchema* schema); + + const char* ArrowCpp_CDataIntegration_ExportBatchFromJson( + const char* json_path, int num_batch, struct ArrowArray* out); + const char* ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson( + const char* json_path, int num_batch, struct ArrowArray* batch); + + int64_t ArrowCpp_BytesAllocated(); + """ + + +@functools.lru_cache +def _load_ffi(ffi, lib_path=_ARROW_DLL): + ffi.cdef(_cpp_c_data_entrypoints) + dll = ffi.dlopen(lib_path) + dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson + return dll + + +class _CDataBase: + + def __init__(self, debug, args): + self.debug = debug + self.args = args + self.ffi = cdata.ffi() + self.dll = _load_ffi(self.ffi) + + def _check_c_error(self, c_error): + """ + Check a `const char*` error return from an integration entrypoint. + + A null means success, a non-empty string is an error message. + The string is statically allocated on the C++ side. 
+ """ + assert self.ffi.typeof(c_error) is self.ffi.typeof("const char*") + if c_error != self.ffi.NULL: + error = self.ffi.string(c_error).decode('utf8', + errors='replace') + raise RuntimeError( + f"C++ C Data Integration call failed: {error}") + + +class CppCDataExporter(CDataExporter, _CDataBase): + + def export_schema_from_json(self, json_path, c_schema_ptr): + c_error = self.dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson( + str(json_path).encode(), c_schema_ptr) + self._check_c_error(c_error) + + def export_batch_from_json(self, json_path, num_batch, c_array_ptr): + c_error = self.dll.ArrowCpp_CDataIntegration_ExportBatchFromJson( + str(json_path).encode(), num_batch, c_array_ptr) + self._check_c_error(c_error) + + @property + def supports_releasing_memory(self): + return True + + def record_allocation_state(self): + return self.dll.ArrowCpp_BytesAllocated() + + def compare_allocation_state(self, recorded, gc_until): + def pred(): + # No GC on our side, so just compare allocation state + return self.record_allocation_state() == recorded + + return gc_until(pred) + + +class CppCDataImporter(CDataImporter, _CDataBase): + + def import_schema_and_compare_to_json(self, json_path, c_schema_ptr): + c_error = self.dll.ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson( + str(json_path).encode(), c_schema_ptr) + self._check_c_error(c_error) + + def import_batch_and_compare_to_json(self, json_path, num_batch, + c_array_ptr): + c_error = self.dll.ArrowCpp_CDataIntegration_ImportBatchAndCompareToJson( + str(json_path).encode(), num_batch, c_array_ptr) + self._check_c_error(c_error) + + @property + def supports_releasing_memory(self): + return True + + def gc_until(self, predicate): + # No GC on our side, so can evaluate predicate immediately + return predicate() diff --git a/dev/archery/archery/integration/util.py b/dev/archery/archery/integration/util.py index 80ba30052e4da..afef7d5eb13b9 100644 --- a/dev/archery/archery/integration/util.py +++ b/dev/archery/archery/integration/util.py @@ -32,8 +32,10 @@ def guid(): # SKIP categories -SKIP_ARROW = 'arrow' +SKIP_C_ARRAY = 'c_array' +SKIP_C_SCHEMA = 'c_schema' SKIP_FLIGHT = 'flight' +SKIP_IPC = 'ipc' class _Printer: diff --git a/dev/archery/setup.py b/dev/archery/setup.py index 627e576fb6f59..08e41225f673a 100755 --- a/dev/archery/setup.py +++ b/dev/archery/setup.py @@ -28,16 +28,17 @@ jinja_req = 'jinja2>=2.11' extras = { - 'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint', - 'cmake_format==0.6.13'], 'benchmark': ['pandas'], - 'docker': ['ruamel.yaml', 'python-dotenv'], - 'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'], 'crossbow': ['github3.py', jinja_req, 'pygit2>=1.6.0', 'requests', 'ruamel.yaml', 'setuptools_scm'], 'crossbow-upload': ['github3.py', jinja_req, 'ruamel.yaml', 'setuptools_scm'], - 'numpydoc': ['numpydoc==1.1.0'] + 'docker': ['ruamel.yaml', 'python-dotenv'], + 'integration': ['cffi'], + 'lint': ['numpydoc==1.1.0', 'autopep8', 'flake8==6.1.0', 'cython-lint', + 'cmake_format==0.6.13'], + 'numpydoc': ['numpydoc==1.1.0'], + 'release': ['pygithub', jinja_req, 'jira', 'semver', 'gitpython'], } extras['bot'] = extras['crossbow'] + ['pygithub', 'jira'] extras['all'] = list(set(functools.reduce(operator.add, extras.values())))
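Taken together, the C++ side needs no separate integration executable for this path: Archery `dlopen`s `libarrow` itself and drives the `ArrowCpp_*` entrypoints through cffi. A standalone sketch of that flow with the tester plumbing stripped away (library and JSON paths below are placeholders for whatever your build produced):

```python
# Standalone sketch of the raw entrypoint flow; paths are placeholders.
from archery.integration import cdata
from archery.integration.tester_cpp import _cpp_c_data_entrypoints

ffi = cdata.ffi()                  # already knows the ArrowSchema/ArrowArray structs
ffi.cdef(_cpp_c_data_entrypoints)  # add the ArrowCpp_* signatures
dll = ffi.dlopen("cpp/build/debug/libarrow.so")

json_path = b"/tmp/arrow-integration/generated_primitive.json"
c_schema = ffi.new("struct ArrowSchema*")

# Every entrypoint returns NULL on success or a statically allocated
# error string on failure, mirroring _CDataBase._check_c_error above.
err = dll.ArrowCpp_CDataIntegration_ExportSchemaFromJson(json_path, c_schema)
assert err == ffi.NULL, ffi.string(err).decode()

err = dll.ArrowCpp_CDataIntegration_ImportSchemaAndCompareToJson(
    json_path, c_schema)
assert err == ffi.NULL, ffi.string(err).decode()

print("bytes still allocated:", dll.ArrowCpp_BytesAllocated())
```

Because the error buffer is a single static `std::string` on the C++ side, the entrypoints are not thread-safe; the runner's forced serial execution (see `_compare_c_data_implementations`) is what makes that acceptable.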