diff --git a/.asf.yaml b/.asf.yaml index ba325c2abf231..f3a8ed9fee90f 100644 --- a/.asf.yaml +++ b/.asf.yaml @@ -23,7 +23,6 @@ github: - benibus - jbonofre - js8544 - - laurentgo - vibhatha - ZhangHuiGui diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml index 5ccefa32725f3..7dfe987d2eaff 100644 --- a/.github/workflows/cpp.yml +++ b/.github/workflows/cpp.yml @@ -69,65 +69,37 @@ env: DOCKER_VOLUME_PREFIX: ".docker/" jobs: - docker-targets: - name: Docker targets - runs-on: ubuntu-latest - outputs: - targets: ${{ steps.detect-targets.outputs.targets }} - steps: - - name: Detect targets - id: detect-targets - run: | - echo "targets<<JSON" >> "$GITHUB_OUTPUT" - echo "[" >> "$GITHUB_OUTPUT" - cat <<JSON >> "$GITHUB_OUTPUT" - { - "arch": "amd64", - "clang-tools": "14", - "image": "conda-cpp", - "llvm": "14", - "runs-on": "ubuntu-latest", - "simd-level": "AVX2", - "title": "AMD64 Conda C++ AVX2", - "ubuntu": "22.04" - }, - { - "arch": "amd64", - "clang-tools": "14", - "image": "ubuntu-cpp-sanitizer", - "llvm": "14", - "runs-on": "ubuntu-latest", - "title": "AMD64 Ubuntu 22.04 C++ ASAN UBSAN", - "ubuntu": "22.04" - } - JSON - if [ "$GITHUB_REPOSITORY_OWNER" = "apache" ]; then - echo "," >> "$GITHUB_OUTPUT" - cat <<JSON >> "$GITHUB_OUTPUT" - { - "arch": "arm64v8", - "clang-tools": "10", - "image": "ubuntu-cpp", - "llvm": "10", - "runs-on": ["self-hosted", "arm", "linux"], - "title": "ARM64 Ubuntu 20.04 C++", - "ubuntu": "20.04" - } - JSON - fi - echo "]" >> "$GITHUB_OUTPUT" - echo "JSON" >> "$GITHUB_OUTPUT" - docker: name: ${{ matrix.title }} - needs: docker-targets runs-on: ${{ matrix.runs-on }} if: ${{ !contains(github.event.pull_request.title, 'WIP') }} timeout-minutes: 75 strategy: fail-fast: false matrix: - include: ${{ fromJson(needs.docker-targets.outputs.targets) }} + include: + - arch: amd64 + clang-tools: 14 + image: conda-cpp + llvm: 14 + runs-on: ubuntu-latest + simd-level: AVX2 + title: AMD64 Conda C++ AVX2 + ubuntu: 22.04 + - arch: amd64 + clang-tools: 14 + image: ubuntu-cpp-sanitizer + llvm: 14 + runs-on: ubuntu-latest + title: AMD64 Ubuntu 22.04 C++ ASAN UBSAN + ubuntu: 22.04 + - arch: arm64v8 + clang-tools: 10 + image: ubuntu-cpp + llvm: 10 + runs-on: ubuntu-24.04-arm + title: ARM64 Ubuntu 20.04 C++ + ubuntu: 20.04 env: ARCH: ${{ matrix.arch }} ARROW_SIMD_LEVEL: ${{ matrix.simd-level }} diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml index d59da447612a6..f9718cbf7bb18 100644 --- a/.github/workflows/dev.yml +++ b/.github/workflows/dev.yml @@ -124,7 +124,7 @@ jobs: shell: bash run: | gem install test-unit - pip install "cython>=0.29.31" setuptools pytest requests setuptools-scm + pip install "cython>=3" setuptools pytest requests setuptools-scm - name: Run Release Test env: ARROW_GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/pr_review_trigger.yml b/.github/workflows/pr_review_trigger.yml index 2c840e95c8db6..a6dd5f1275331 100644 --- a/.github/workflows/pr_review_trigger.yml +++ b/.github/workflows/pr_review_trigger.yml @@ -29,7 +29,7 @@ jobs: runs-on: ubuntu-latest steps: - name: "Upload PR review Payload" - uses: actions/upload-artifact@6f51ac03b9356f520e9adb1b1b7802705f340c2b # v4.5.0 + uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 with: path: "${{ github.event_path }}" name: "pr_review_payload" diff --git a/.github/workflows/r.yml b/.github/workflows/r.yml index bc7db519b64f7..cb000f8b95c1b 100644 --- a/.github/workflows/r.yml +++ b/.github/workflows/r.yml @@ -177,7 +177,7 @@ jobs: if: always() - name: 
Save the test output if: always() - uses: actions/upload-artifact@6f51ac03b9356f520e9adb1b1b7802705f340c2b # v4.5.0 + uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 with: name: test-output-${{ matrix.ubuntu }}-${{ matrix.r }} path: r/check/arrow.Rcheck/tests/testthat.Rout* @@ -237,7 +237,7 @@ jobs: if: always() - name: Save the test output if: always() - uses: actions/upload-artifact@6f51ac03b9356f520e9adb1b1b7802705f340c2b # v4.5.0 + uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 with: name: test-output-bundled path: r/check/arrow.Rcheck/tests/testthat.Rout* @@ -299,7 +299,7 @@ jobs: # So that they're unique when multiple are downloaded in the next step shell: bash run: mv libarrow.zip libarrow-rtools${{ matrix.config.rtools }}-${{ matrix.config.arch }}.zip - - uses: actions/upload-artifact@6f51ac03b9356f520e9adb1b1b7802705f340c2b # v4.5.0 + - uses: actions/upload-artifact@65c4c4a1ddee5b72f698fdd19549f0f0fb45cf08 # v4.6.0 with: name: libarrow-rtools${{ matrix.config.rtools }}-${{ matrix.config.arch }}.zip path: libarrow-rtools${{ matrix.config.rtools }}-${{ matrix.config.arch }}.zip diff --git a/c_glib/meson.build b/c_glib/meson.build index bd7843d8bc362..017765cd14626 100644 --- a/c_glib/meson.build +++ b/c_glib/meson.build @@ -35,7 +35,7 @@ project('arrow-glib', 'c', 'cpp', # * 22.04: 0.61.2 meson_version: '>=0.53.2') -version = '19.0.0-SNAPSHOT' +version = '20.0.0-SNAPSHOT' if version.endswith('-SNAPSHOT') version_numbers = version.split('-')[0].split('.') version_tag = version.split('-')[1] diff --git a/c_glib/parquet-glib/arrow-file-writer.cpp b/c_glib/parquet-glib/arrow-file-writer.cpp index 2b8e2bdeac026..738fb4fd824c8 100644 --- a/c_glib/parquet-glib/arrow-file-writer.cpp +++ b/c_glib/parquet-glib/arrow-file-writer.cpp @@ -574,7 +574,6 @@ gparquet_arrow_file_writer_write_table(GParquetArrowFileWriter *writer, /** * gparquet_arrow_file_writer_new_row_group: * @writer: A #GParquetArrowFileWriter. - * @chunk_size: The max number of rows in a row group. * @error: (nullable): Return location for a #GError or %NULL. * * Start a new row group. 
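/* Reviewer sketch, not part of this patch: with @chunk_size gone, a caller
 * starts a row group and controls its size through what it writes next.
 * `writer` and `chunked_array` are assumed to be constructed elsewhere; the
 * write_chunked_array signature is inferred from the Ruby test further down. */
static gboolean
write_one_row_group(GParquetArrowFileWriter *writer,
                    GArrowChunkedArray *chunked_array,
                    GError **error)
{
  return gparquet_arrow_file_writer_new_row_group(writer, error) &&
         gparquet_arrow_file_writer_write_chunked_array(writer,
                                                        chunked_array,
                                                        error);
}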
@@ -584,13 +583,11 @@ gparquet_arrow_file_writer_write_table(GParquetArrowFileWriter *writer, * Since: 18.0.0 */ gboolean -gparquet_arrow_file_writer_new_row_group(GParquetArrowFileWriter *writer, - gsize chunk_size, - GError **error) +gparquet_arrow_file_writer_new_row_group(GParquetArrowFileWriter *writer, GError **error) { auto parquet_arrow_file_writer = gparquet_arrow_file_writer_get_raw(writer); return garrow::check(error, - parquet_arrow_file_writer->NewRowGroup(chunk_size), + parquet_arrow_file_writer->NewRowGroup(), "[parquet][arrow][file-writer][new-row-group]"); } diff --git a/c_glib/parquet-glib/arrow-file-writer.h b/c_glib/parquet-glib/arrow-file-writer.h index 2c82f7c1f87de..4986430c951d0 100644 --- a/c_glib/parquet-glib/arrow-file-writer.h +++ b/c_glib/parquet-glib/arrow-file-writer.h @@ -135,9 +135,7 @@ gparquet_arrow_file_writer_write_table(GParquetArrowFileWriter *writer, GPARQUET_AVAILABLE_IN_18_0 gboolean -gparquet_arrow_file_writer_new_row_group(GParquetArrowFileWriter *writer, - gsize chunk_size, - GError **error); +gparquet_arrow_file_writer_new_row_group(GParquetArrowFileWriter *writer, GError **error); GPARQUET_AVAILABLE_IN_18_0 gboolean diff --git a/c_glib/test/parquet/test-arrow-file-writer.rb b/c_glib/test/parquet/test-arrow-file-writer.rb index d8344bf1c50b0..418de4782d0b0 100644 --- a/c_glib/test/parquet/test-arrow-file-writer.rb +++ b/c_glib/test/parquet/test-arrow-file-writer.rb @@ -89,10 +89,10 @@ def test_write_table def test_write_chunked_array schema = build_schema("enabled" => :boolean) writer = Parquet::ArrowFileWriter.new(schema, @file.path) - writer.new_row_group(2) + writer.new_row_group chunked_array = Arrow::ChunkedArray.new([build_boolean_array([true, nil])]) writer.write_chunked_array(chunked_array) - writer.new_row_group(1) + writer.new_row_group chunked_array = Arrow::ChunkedArray.new([build_boolean_array([false])]) writer.write_chunked_array(chunked_array) writer.close diff --git a/c_glib/tool/generate-version-header.py b/c_glib/tool/generate-version-header.py index 4995ce570aeb0..6a8976204c05a 100755 --- a/c_glib/tool/generate-version-header.py +++ b/c_glib/tool/generate-version-header.py @@ -140,6 +140,7 @@ def generate_availability_macros(library: str) -> str: ALL_VERSIONS = [ + (20, 0), (19, 0), (18, 0), (17, 0), diff --git a/c_glib/vcpkg.json b/c_glib/vcpkg.json index f2717f7e27cf2..5873fd9f28ec2 100644 --- a/c_glib/vcpkg.json +++ b/c_glib/vcpkg.json @@ -1,6 +1,6 @@ { "name": "arrow-glib", - "version-string": "19.0.0-SNAPSHOT", + "version-string": "20.0.0-SNAPSHOT", "dependencies": [ "glib", "gobject-introspection", diff --git a/ci/appveyor-cpp-setup.bat b/ci/appveyor-cpp-setup.bat index 912b130acff45..ff159bd0b4b59 100644 --- a/ci/appveyor-cpp-setup.bat +++ b/ci/appveyor-cpp-setup.bat @@ -70,7 +70,6 @@ conda create -n arrow ^ "ninja" ^ "nomkl" ^ "pandas" ^ - "fsspec" ^ "python=%PYTHON%" ^ || exit /B conda list -n arrow @@ -86,7 +85,7 @@ set CXX=cl.exe @rem Download Minio somewhere on PATH, for unit tests @rem if "%ARROW_S3%" == "ON" ( - appveyor DownloadFile https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2024-09-13T20-26-02Z -FileName C:\Windows\Minio.exe || exit /B + appveyor DownloadFile https://dl.min.io/server/minio/release/windows-amd64/archive/minio.RELEASE.2025-01-20T14-49-07Z -FileName C:\Windows\Minio.exe || exit /B ) @rem diff --git a/ci/conda_env_python.txt b/ci/conda_env_python.txt index bf915493de302..9a48f26b79c6e 100644 --- a/ci/conda_env_python.txt +++ b/ci/conda_env_python.txt @@ -15,10 
+15,12 @@ # specific language governing permissions and limitations # under the License. -# don't add pandas here, because it is not a mandatory test dependency -boto3 # not a direct dependency of s3fs, but needed for our s3fs fixture +# Don't add pandas here, because it is not a mandatory test dependency + +# Not a direct dependency of s3fs, but needed for our s3fs fixture +boto3 cffi -cython>=0.29.31 +cython>=3 cloudpickle fsspec hypothesis diff --git a/ci/conda_env_sphinx.txt b/ci/conda_env_sphinx.txt index 4665a32e24bbe..751df9b2f3c01 100644 --- a/ci/conda_env_sphinx.txt +++ b/ci/conda_env_sphinx.txt @@ -30,9 +30,5 @@ sphinx-lint sphinxcontrib-jquery sphinxcontrib-mermaid sphinx==6.2 -# Requirement for doctest-cython -# Needs upper pin of 0.3.0, see: -# https://github.com/lgpage/pytest-cython/issues/67 -# With 0.3.* bug fix release, the pin can be removed -pytest-cython==0.2.2 +pytest-cython pandas diff --git a/ci/docker/conda-python-cython2.dockerfile b/ci/docker/conda-python-cython2.dockerfile deleted file mode 100644 index 859ad868b0c71..0000000000000 --- a/ci/docker/conda-python-cython2.dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. 
- -ARG repo -ARG arch -ARG python=3.9 -FROM ${repo}:${arch}-conda-python-${python} - -RUN mamba install -q -y "cython<3" && \ - mamba clean --all diff --git a/ci/docker/conda.dockerfile b/ci/docker/conda.dockerfile index fbd81903b0a3a..0d48fb3ef83d0 100644 --- a/ci/docker/conda.dockerfile +++ b/ci/docker/conda.dockerfile @@ -21,9 +21,15 @@ FROM ${arch}/ubuntu:22.04 # install build essentials RUN export DEBIAN_FRONTEND=noninteractive && \ apt-get update -y -q && \ - apt-get install -y -q curl wget tzdata libc6-dbg gdb \ - && apt-get clean \ - && rm -rf /var/lib/apt/lists/* + apt-get install -y -q \ + curl \ + gdb \ + libc6-dbg \ + patch \ + tzdata \ + wget && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* # install conda and mamba via miniforge COPY ci/scripts/install_conda.sh /arrow/ci/scripts/ diff --git a/ci/docker/debian-12-cpp.dockerfile b/ci/docker/debian-12-cpp.dockerfile index f486d07ff8894..fe3976248cc86 100644 --- a/ci/docker/debian-12-cpp.dockerfile +++ b/ci/docker/debian-12-cpp.dockerfile @@ -84,6 +84,7 @@ RUN apt-get update -y -q && \ ninja-build \ nlohmann-json3-dev \ npm \ + patch \ pkg-config \ protobuf-compiler-grpc \ python3-dev \ diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile index 8dc778d544a6d..259c5fb77fa41 100644 --- a/ci/docker/ubuntu-20.04-cpp.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp.dockerfile @@ -106,6 +106,7 @@ RUN apt-get update -y -q && \ ninja-build \ nlohmann-json3-dev \ npm \ + patch \ pkg-config \ protobuf-compiler \ python3-dev \ diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index 28cef2946385c..721b37dcae842 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -111,6 +111,7 @@ RUN apt-get update -y -q && \ ninja-build \ nlohmann-json3-dev \ npm \ + patch \ pkg-config \ protobuf-compiler \ protobuf-compiler-grpc \ diff --git a/ci/docker/ubuntu-24.04-cpp.dockerfile b/ci/docker/ubuntu-24.04-cpp.dockerfile index 3f486b09f95ff..592a9a6a232e5 100644 --- a/ci/docker/ubuntu-24.04-cpp.dockerfile +++ b/ci/docker/ubuntu-24.04-cpp.dockerfile @@ -111,6 +111,7 @@ RUN apt-get update -y -q && \ ninja-build \ nlohmann-json3-dev \ npm \ + patch \ pkg-config \ protobuf-compiler \ protobuf-compiler-grpc \ diff --git a/ci/scripts/PKGBUILD b/ci/scripts/PKGBUILD index 4c567d550b92a..efeed954006c1 100644 --- a/ci/scripts/PKGBUILD +++ b/ci/scripts/PKGBUILD @@ -18,7 +18,7 @@ _realname=arrow pkgbase=mingw-w64-${_realname} pkgname="${MINGW_PACKAGE_PREFIX}-${_realname}" -pkgver=18.1.0.9000 +pkgver=19.0.0.9000 pkgrel=8000 pkgdesc="Apache Arrow is a cross-language development platform for in-memory data (mingw-w64)" arch=("any") diff --git a/ci/scripts/install_minio.sh b/ci/scripts/install_minio.sh index 6f9701ab5a150..8685ced0bd1ab 100755 --- a/ci/scripts/install_minio.sh +++ b/ci/scripts/install_minio.sh @@ -63,7 +63,7 @@ if [ "${version}" != "latest" ]; then fi # Use specific versions for minio server and client to avoid CI failures on new releases. 
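# Reviewer sketch, not part of this patch: what the pinned download amounts
# to. A dated RELEASE tag fetches an immutable archive build instead of the
# moving "latest" channel; the platform string and install path below are
# illustrative assumptions, not values taken from this script.
minio_version="minio.RELEASE.2025-01-20T14-49-07Z"
curl -fsSL "https://dl.min.io/server/minio/release/linux-amd64/archive/${minio_version}" \
  -o /usr/local/bin/minio
chmod +x /usr/local/bin/minio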
-minio_version="minio.RELEASE.2024-09-13T20-26-02Z" +minio_version="minio.RELEASE.2025-01-20T14-49-07Z" mc_version="mc.RELEASE.2024-09-16T17-43-14Z" download() diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 103e0f08445d9..a7d80c2e96c23 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -79,7 +79,7 @@ if(POLICY CMP0170) cmake_policy(SET CMP0170 NEW) endif() -set(ARROW_VERSION "19.0.0-SNAPSHOT") +set(ARROW_VERSION "20.0.0-SNAPSHOT") string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}") diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index abfe6d274f7b8..f9459f4175c83 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -4573,11 +4573,16 @@ target_include_directories(arrow::hadoop INTERFACE "${HADOOP_HOME}/include") function(build_orc) message(STATUS "Building Apache ORC from source") + # Remove this and "patch" in "ci/docker/{debian,ubuntu}-*.dockerfile" once we have a patch for ORC 2.1.1 + find_program(PATCH patch REQUIRED) + set(ORC_PATCH_COMMAND ${PATCH} -p1 -i ${CMAKE_CURRENT_LIST_DIR}/orc.diff) + if(CMAKE_VERSION VERSION_GREATER_EQUAL 3.29) fetchcontent_declare(orc ${FC_DECLARE_COMMON_OPTIONS} URL ${ORC_SOURCE_URL} - URL_HASH "SHA256=${ARROW_ORC_BUILD_SHA256_CHECKSUM}") + URL_HASH "SHA256=${ARROW_ORC_BUILD_SHA256_CHECKSUM}" + PATCH_COMMAND ${ORC_PATCH_COMMAND}) prepare_fetchcontent() set(CMAKE_UNITY_BUILD FALSE) @@ -4667,16 +4672,10 @@ function(build_orc) OFF CACHE BOOL "" FORCE) - # We can remove this with ORC 2.0.2 or later. - list(PREPEND CMAKE_MODULE_PATH - ${CMAKE_CURRENT_BINARY_DIR}/_deps/orc-src/cmake_modules) - fetchcontent_makeavailable(orc) add_library(orc::orc INTERFACE IMPORTED) target_link_libraries(orc::orc INTERFACE orc) - target_include_directories(orc::orc INTERFACE "${orc_BINARY_DIR}/c++/include" - "${orc_SOURCE_DIR}/c++/include") list(APPEND ARROW_BUNDLED_STATIC_LIBS orc) else() @@ -4701,6 +4700,9 @@ function(build_orc) get_target_property(ORC_ZSTD_ROOT ${ARROW_ZSTD_LIBZSTD} INTERFACE_INCLUDE_DIRECTORIES) get_filename_component(ORC_ZSTD_ROOT "${ORC_ZSTD_ROOT}" DIRECTORY) + get_target_property(ORC_ZLIB_ROOT ZLIB::ZLIB INTERFACE_INCLUDE_DIRECTORIES) + get_filename_component(ORC_ZLIB_ROOT "${ORC_ZLIB_ROOT}" DIRECTORY) + set(ORC_CMAKE_ARGS ${EP_COMMON_CMAKE_ARGS} "-DCMAKE_INSTALL_PREFIX=${ORC_PREFIX}" @@ -4710,7 +4712,6 @@ -DBUILD_TOOLS=OFF -DBUILD_CPP_TESTS=OFF -DINSTALL_VENDORED_LIBS=OFF - "-DLZ4_HOME=${ORC_LZ4_ROOT}" "-DPROTOBUF_EXECUTABLE=$<TARGET_FILE:${ARROW_PROTOBUF_PROTOC}>" "-DPROTOBUF_HOME=${ORC_PROTOBUF_ROOT}" "-DPROTOBUF_INCLUDE_DIR=$<TARGET_PROPERTY:${ARROW_PROTOBUF_LIBPROTOBUF},INTERFACE_INCLUDE_DIRECTORIES>" @@ -4718,16 +4719,17 @@ "-DPROTOC_LIBRARY=$<TARGET_FILE:${ARROW_PROTOBUF_LIBPROTOC}>" "-DSNAPPY_HOME=${ORC_SNAPPY_ROOT}" "-DSNAPPY_LIBRARY=$<TARGET_FILE:${Snappy_TARGET}>" + "-DLZ4_HOME=${ORC_LZ4_ROOT}" "-DLZ4_LIBRARY=$<TARGET_FILE:LZ4::lz4>" "-DLZ4_STATIC_LIB=$<TARGET_FILE:LZ4::lz4>" "-DLZ4_INCLUDE_DIR=${ORC_LZ4_ROOT}/include" "-DSNAPPY_INCLUDE_DIR=${ORC_SNAPPY_INCLUDE_DIR}" "-DZSTD_HOME=${ORC_ZSTD_ROOT}" "-DZSTD_INCLUDE_DIR=$<TARGET_PROPERTY:${ARROW_ZSTD_LIBZSTD},INTERFACE_INCLUDE_DIRECTORIES>" - "-DZSTD_LIBRARY=$<TARGET_FILE:${ARROW_ZSTD_LIBZSTD}>") - if(ZLIB_ROOT) - set(ORC_CMAKE_ARGS ${ORC_CMAKE_ARGS} "-DZLIB_HOME=${ZLIB_ROOT}") - endif() + "-DZSTD_LIBRARY=$<TARGET_FILE:${ARROW_ZSTD_LIBZSTD}>" + "-DZLIB_HOME=${ORC_ZLIB_ROOT}" + "-DZLIB_INCLUDE_DIR=$<TARGET_PROPERTY:ZLIB::ZLIB,INTERFACE_INCLUDE_DIRECTORIES>" + "-DZLIB_LIBRARY=$<TARGET_FILE:ZLIB::ZLIB>") # Work around CMake bug file(MAKE_DIRECTORY ${ORC_INCLUDE_DIR}) @@ -4743,7 +4745,8 @@ ${ARROW_ZSTD_LIBZSTD} ${Snappy_TARGET} LZ4::lz4 - ZLIB::ZLIB) + ZLIB::ZLIB + PATCH_COMMAND ${ORC_PATCH_COMMAND}) add_library(orc::orc STATIC IMPORTED) set_target_properties(orc::orc PROPERTIES IMPORTED_LOCATION "${ORC_STATIC_LIB}") 
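# Reviewer sketch, not part of this patch: both the FetchContent and
# ExternalProject paths above now run the same ORC_PATCH_COMMAND. A
# standalone reproduction of that hook looks like this; the dependency
# name, URL, and diff file are placeholders, not Arrow's values.
include(FetchContent)
find_program(PATCH patch REQUIRED)
fetchcontent_declare(example_dep
                     URL https://example.com/example_dep-1.0.tar.gz
                     PATCH_COMMAND ${PATCH} -p1 -i
                                   ${CMAKE_CURRENT_LIST_DIR}/example_dep.diff)
fetchcontent_makeavailable(example_dep)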
target_include_directories(orc::orc BEFORE INTERFACE "${ORC_INCLUDE_DIR}") diff --git a/cpp/cmake_modules/orc.diff b/cpp/cmake_modules/orc.diff new file mode 100644 index 0000000000000..7bdbfa1cf5d33 --- /dev/null +++ b/cpp/cmake_modules/orc.diff @@ -0,0 +1,289 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +diff --git a/CMakeLists.txt b/CMakeLists.txt +index 1f8931508..f8e57bf5f 100644 +--- a/CMakeLists.txt ++++ b/CMakeLists.txt +@@ -30,8 +30,8 @@ SET(CPACK_PACKAGE_VERSION_MAJOR "2") + SET(CPACK_PACKAGE_VERSION_MINOR "1") + SET(CPACK_PACKAGE_VERSION_PATCH "0") + SET(ORC_VERSION "${CPACK_PACKAGE_VERSION_MAJOR}.${CPACK_PACKAGE_VERSION_MINOR}.${CPACK_PACKAGE_VERSION_PATCH}") +-set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules") + set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # For clang-tidy. ++list(PREPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake_modules") + + option (BUILD_JAVA + "Include ORC Java library in the build process" +@@ -225,5 +225,3 @@ if (BUILD_CPP_TESTS) + ) + endif () + endif () +- +-INCLUDE(CheckFormat) +diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt +index 694667c06..af13a94aa 100644 +--- a/c++/src/CMakeLists.txt ++++ b/c++/src/CMakeLists.txt +@@ -218,8 +218,8 @@ target_include_directories (orc + INTERFACE + $ + PUBLIC +- $ +- $ ++ $ ++ $ + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR} + ${CMAKE_CURRENT_BINARY_DIR} +diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake +index 017e6c5b8..fe376ed16 100644 +--- a/cmake_modules/ThirdpartyToolchain.cmake ++++ b/cmake_modules/ThirdpartyToolchain.cmake +@@ -103,13 +103,13 @@ endif () + + # ---------------------------------------------------------------------- + # Macros for adding third-party libraries +-macro (add_resolved_library target_name link_lib include_dir) +- add_library (${target_name} INTERFACE IMPORTED) ++macro (orc_add_resolved_library target_name link_lib include_dir) ++ add_library (${target_name} INTERFACE IMPORTED GLOBAL) + target_link_libraries (${target_name} INTERFACE ${link_lib}) + target_include_directories (${target_name} SYSTEM INTERFACE ${include_dir}) + endmacro () + +-macro (add_built_library external_project_name target_name link_lib include_dir) ++macro (orc_add_built_library external_project_name target_name link_lib include_dir) + file (MAKE_DIRECTORY "${include_dir}") + + add_library (${target_name} STATIC IMPORTED) +@@ -122,7 +122,7 @@ macro (add_built_library external_project_name target_name link_lib include_dir) + endif () + endmacro () + +-function(provide_cmake_module MODULE_NAME) ++function(orc_provide_cmake_module MODULE_NAME) + set(module "${CMAKE_SOURCE_DIR}/cmake_modules/${MODULE_NAME}.cmake") + if(EXISTS "${module}") + message(STATUS "Providing CMake module for 
${MODULE_NAME} as part of CMake package") +@@ -130,8 +130,8 @@ function(provide_cmake_module MODULE_NAME) + endif() + endfunction() + +-function(provide_find_module PACKAGE_NAME) +- provide_cmake_module("Find${PACKAGE_NAME}") ++function(orc_provide_find_module PACKAGE_NAME) ++ orc_provide_cmake_module("Find${PACKAGE_NAME}") + endfunction() + + # ---------------------------------------------------------------------- +@@ -156,7 +156,7 @@ ExternalProject_Add (orc-format_ep + # Snappy + if (ORC_PACKAGE_KIND STREQUAL "conan") + find_package (Snappy REQUIRED CONFIG) +- add_resolved_library (orc_snappy ${Snappy_LIBRARIES} ${Snappy_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_snappy ${Snappy_LIBRARIES} ${Snappy_INCLUDE_DIR}) + list (APPEND ORC_SYSTEM_DEPENDENCIES Snappy) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") + elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") +@@ -168,13 +168,13 @@ elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") + elseif (NOT "${SNAPPY_HOME}" STREQUAL "") + find_package (Snappy REQUIRED) + if (ORC_PREFER_STATIC_SNAPPY AND SNAPPY_STATIC_LIB) +- add_resolved_library (orc_snappy ${SNAPPY_STATIC_LIB} ${SNAPPY_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_snappy ${SNAPPY_STATIC_LIB} ${SNAPPY_INCLUDE_DIR}) + else () +- add_resolved_library (orc_snappy ${SNAPPY_LIBRARY} ${SNAPPY_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_snappy ${SNAPPY_LIBRARY} ${SNAPPY_INCLUDE_DIR}) + endif () + list (APPEND ORC_SYSTEM_DEPENDENCIES Snappy) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +- provide_find_module (Snappy) ++ orc_provide_find_module (Snappy) + else () + set(SNAPPY_HOME "${THIRDPARTY_DIR}/snappy_ep-install") + set(SNAPPY_INCLUDE_DIR "${SNAPPY_HOME}/include") +@@ -194,7 +194,7 @@ else () + ${THIRDPARTY_LOG_OPTIONS} + BUILD_BYPRODUCTS "${SNAPPY_STATIC_LIB}") + +- add_built_library (snappy_ep orc_snappy ${SNAPPY_STATIC_LIB} ${SNAPPY_INCLUDE_DIR}) ++ orc_add_built_library (snappy_ep orc_snappy ${SNAPPY_STATIC_LIB} ${SNAPPY_INCLUDE_DIR}) + + list (APPEND ORC_VENDOR_DEPENDENCIES "orc::vendored_snappy|${SNAPPY_STATIC_LIB_NAME}") + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +@@ -207,7 +207,7 @@ add_library (orc::snappy ALIAS orc_snappy) + + if (ORC_PACKAGE_KIND STREQUAL "conan") + find_package (ZLIB REQUIRED CONFIG) +- add_resolved_library (orc_zlib ${ZLIB_LIBRARIES} ${ZLIB_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zlib ${ZLIB_LIBRARIES} ${ZLIB_INCLUDE_DIR}) + list (APPEND ORC_SYSTEM_DEPENDENCIES ZLIB) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") + elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") +@@ -219,13 +219,13 @@ elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") + elseif (NOT "${ZLIB_HOME}" STREQUAL "") + find_package (ZLIB REQUIRED) + if (ORC_PREFER_STATIC_ZLIB AND ZLIB_STATIC_LIB) +- add_resolved_library (orc_zlib ${ZLIB_STATIC_LIB} ${ZLIB_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zlib ${ZLIB_STATIC_LIB} ${ZLIB_INCLUDE_DIR}) + else () +- add_resolved_library (orc_zlib ${ZLIB_LIBRARY} ${ZLIB_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zlib ${ZLIB_LIBRARY} ${ZLIB_INCLUDE_DIR}) + endif () + list (APPEND ORC_SYSTEM_DEPENDENCIES ZLIB) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +- provide_find_module (ZLIB) ++ orc_provide_find_module (ZLIB) + else () + set(ZLIB_PREFIX "${THIRDPARTY_DIR}/zlib_ep-install") + set(ZLIB_INCLUDE_DIR "${ZLIB_PREFIX}/include") +@@ -252,7 +252,7 @@ else () + ${THIRDPARTY_LOG_OPTIONS} + BUILD_BYPRODUCTS "${ZLIB_STATIC_LIB}") + +- add_built_library (zlib_ep orc_zlib ${ZLIB_STATIC_LIB} ${ZLIB_INCLUDE_DIR}) ++ orc_add_built_library (zlib_ep orc_zlib 
${ZLIB_STATIC_LIB} ${ZLIB_INCLUDE_DIR}) + + list (APPEND ORC_VENDOR_DEPENDENCIES "orc::vendored_zlib|${ZLIB_STATIC_LIB_NAME}") + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +@@ -265,7 +265,7 @@ add_library (orc::zlib ALIAS orc_zlib) + + if (ORC_PACKAGE_KIND STREQUAL "conan") + find_package (ZSTD REQUIRED CONFIG) +- add_resolved_library (orc_zstd ${zstd_LIBRARIES} ${zstd_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zstd ${zstd_LIBRARIES} ${zstd_INCLUDE_DIR}) + list (APPEND ORC_SYSTEM_DEPENDENCIES ZSTD) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$,zstd::libzstd_shared,zstd::libzstd_static>>") + elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") +@@ -277,14 +277,14 @@ elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") + elseif (NOT "${ZSTD_HOME}" STREQUAL "") + find_package (ZSTD REQUIRED) + if (ORC_PREFER_STATIC_ZSTD AND ZSTD_STATIC_LIB) +- add_resolved_library (orc_zstd ${ZSTD_STATIC_LIB} ${ZSTD_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zstd ${ZSTD_STATIC_LIB} ${ZSTD_INCLUDE_DIR}) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") + else () +- add_resolved_library (orc_zstd ${ZSTD_LIBRARY} ${ZSTD_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_zstd ${ZSTD_LIBRARY} ${ZSTD_INCLUDE_DIR}) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$,zstd::libzstd_shared,zstd::libzstd_static>>") + endif () + list (APPEND ORC_SYSTEM_DEPENDENCIES ZSTD) +- provide_find_module (ZSTD) ++ orc_provide_find_module (ZSTD) + else () + set(ZSTD_HOME "${THIRDPARTY_DIR}/zstd_ep-install") + set(ZSTD_INCLUDE_DIR "${ZSTD_HOME}/include") +@@ -318,7 +318,7 @@ else () + ${THIRDPARTY_LOG_OPTIONS} + BUILD_BYPRODUCTS ${ZSTD_STATIC_LIB}) + +- add_built_library (zstd_ep orc_zstd ${ZSTD_STATIC_LIB} ${ZSTD_INCLUDE_DIR}) ++ orc_add_built_library (zstd_ep orc_zstd ${ZSTD_STATIC_LIB} ${ZSTD_INCLUDE_DIR}) + + list (APPEND ORC_VENDOR_DEPENDENCIES "orc::vendored_zstd|${ZSTD_STATIC_LIB_NAME}") + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +@@ -330,7 +330,7 @@ add_library (orc::zstd ALIAS orc_zstd) + # LZ4 + if (ORC_PACKAGE_KIND STREQUAL "conan") + find_package (LZ4 REQUIRED CONFIG) +- add_resolved_library (orc_lz4 ${lz4_LIBRARIES} ${lz4_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_lz4 ${lz4_LIBRARIES} ${lz4_INCLUDE_DIR}) + list (APPEND ORC_SYSTEM_DEPENDENCIES LZ4) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") + elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") +@@ -342,13 +342,13 @@ elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") + elseif (NOT "${LZ4_HOME}" STREQUAL "") + find_package (LZ4 REQUIRED) + if (ORC_PREFER_STATIC_LZ4 AND LZ4_STATIC_LIB) +- add_resolved_library (orc_lz4 ${LZ4_STATIC_LIB} ${LZ4_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_lz4 ${LZ4_STATIC_LIB} ${LZ4_INCLUDE_DIR}) + else () +- add_resolved_library (orc_lz4 ${LZ4_LIBRARY} ${LZ4_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_lz4 ${LZ4_LIBRARY} ${LZ4_INCLUDE_DIR}) + endif () + list (APPEND ORC_SYSTEM_DEPENDENCIES LZ4) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +- provide_find_module (LZ4) ++ orc_provide_find_module (LZ4) + else () + set(LZ4_PREFIX "${THIRDPARTY_DIR}/lz4_ep-install") + set(LZ4_INCLUDE_DIR "${LZ4_PREFIX}/include") +@@ -375,7 +375,7 @@ else () + ${THIRDPARTY_LOG_OPTIONS} + BUILD_BYPRODUCTS ${LZ4_STATIC_LIB}) + +- add_built_library (lz4_ep orc_lz4 ${LZ4_STATIC_LIB} ${LZ4_INCLUDE_DIR}) ++ orc_add_built_library (lz4_ep orc_lz4 ${LZ4_STATIC_LIB} ${LZ4_INCLUDE_DIR}) + + list (APPEND ORC_VENDOR_DEPENDENCIES "orc::vendored_lz4|${LZ4_STATIC_LIB_NAME}") + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +@@ -491,7 +491,7 @@ endif () + + if (ORC_PACKAGE_KIND 
STREQUAL "conan") + find_package (Protobuf REQUIRED CONFIG) +- add_resolved_library (orc_protobuf ${protobuf_LIBRARIES} ${protobuf_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_protobuf ${protobuf_LIBRARIES} ${protobuf_INCLUDE_DIR}) + list (APPEND ORC_SYSTEM_DEPENDENCIES Protobuf) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") + elseif (ORC_PACKAGE_KIND STREQUAL "vcpkg") +@@ -505,20 +505,20 @@ elseif (NOT "${PROTOBUF_HOME}" STREQUAL "") + find_package (Protobuf REQUIRED) + + if (ORC_PREFER_STATIC_PROTOBUF AND PROTOBUF_STATIC_LIB) +- add_resolved_library (orc_protobuf ${PROTOBUF_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_protobuf ${PROTOBUF_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) + else () +- add_resolved_library (orc_protobuf ${PROTOBUF_LIBRARY} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_protobuf ${PROTOBUF_LIBRARY} ${PROTOBUF_INCLUDE_DIR}) + endif () + + if (ORC_PREFER_STATIC_PROTOBUF AND PROTOC_STATIC_LIB) +- add_resolved_library (orc_protoc ${PROTOC_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_protoc ${PROTOC_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) + else () +- add_resolved_library (orc_protoc ${PROTOC_LIBRARY} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_resolved_library (orc_protoc ${PROTOC_LIBRARY} ${PROTOBUF_INCLUDE_DIR}) + endif () + + list (APPEND ORC_SYSTEM_DEPENDENCIES Protobuf) + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +- provide_find_module (Protobuf) ++ orc_provide_find_module (Protobuf) + else () + set(PROTOBUF_PREFIX "${THIRDPARTY_DIR}/protobuf_ep-install") + set(PROTOBUF_INCLUDE_DIR "${PROTOBUF_PREFIX}/include") +@@ -556,8 +556,8 @@ else () + ${THIRDPARTY_LOG_OPTIONS} + BUILD_BYPRODUCTS "${PROTOBUF_STATIC_LIB}" "${PROTOC_STATIC_LIB}") + +- add_built_library (protobuf_ep orc_protobuf ${PROTOBUF_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) +- add_built_library (protobuf_ep orc_protoc ${PROTOC_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_built_library (protobuf_ep orc_protobuf ${PROTOBUF_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) ++ orc_add_built_library (protobuf_ep orc_protoc ${PROTOC_STATIC_LIB} ${PROTOBUF_INCLUDE_DIR}) + + list (APPEND ORC_VENDOR_DEPENDENCIES "orc::vendored_protobuf|${PROTOBUF_STATIC_LIB_NAME}") + list (APPEND ORC_INSTALL_INTERFACE_TARGETS "$") +@@ -610,7 +610,7 @@ if(BUILD_LIBHDFSPP) + BUILD_BYPRODUCTS "${LIBHDFSPP_STATIC_LIB}" + CMAKE_ARGS ${LIBHDFSPP_CMAKE_ARGS}) + +- add_built_library(libhdfspp_ep libhdfspp ${LIBHDFSPP_STATIC_LIB} ${LIBHDFSPP_INCLUDE_DIR}) ++ orc_add_built_library(libhdfspp_ep libhdfspp ${LIBHDFSPP_STATIC_LIB} ${LIBHDFSPP_INCLUDE_DIR}) + + set (LIBHDFSPP_LIBRARIES + libhdfspp diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 6e2294371e7a6..eb9860b240f16 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -771,13 +771,14 @@ if(ARROW_COMPUTE) compute/kernels/scalar_validity.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_cumulative_ops.cc - compute/kernels/vector_pairwise.cc compute/kernels/vector_nested.cc + compute/kernels/vector_pairwise.cc compute/kernels/vector_rank.cc compute/kernels/vector_replace.cc compute/kernels/vector_run_end_encode.cc compute/kernels/vector_select_k.cc compute/kernels/vector_sort.cc + compute/kernels/vector_swizzle.cc compute/key_hash_internal.cc compute/key_map_internal.cc compute/light_array_internal.cc diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt index 0a2536b11e33c..54269f1df0eb6 100644 --- a/cpp/src/arrow/acero/CMakeLists.txt +++ 
b/cpp/src/arrow/acero/CMakeLists.txt @@ -117,7 +117,7 @@ if(ARROW_TESTING) if(ARROW_WITH_OPENTELEMETRY) target_link_libraries(arrow_acero_testing PRIVATE ${ARROW_OPENTELEMETRY_LIBS}) endif() - list(APPEND ARROW_ACERO_TEST_LINK_LIBS arrow_acero_testing) + list(APPEND ARROW_ACERO_TEST_LINK_LIBS arrow_acero_testing arrow_compute_testing) endif() # Only for hash_aggregate_test.cc. if(ARROW_USE_BOOST) diff --git a/cpp/src/arrow/acero/accumulation_queue.h b/cpp/src/arrow/acero/accumulation_queue.h index a173f9840388f..92d62d5d99d16 100644 --- a/cpp/src/arrow/acero/accumulation_queue.h +++ b/cpp/src/arrow/acero/accumulation_queue.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/acero/visibility.h" #include "arrow/compute/exec.h" #include "arrow/result.h" @@ -70,7 +71,7 @@ class AccumulationQueue { /// For example, in a top-n node, the process callback should determine how many /// rows need to be delivered for the given batch, and then return a task to actually /// deliver those rows. -class SequencingQueue { +class ARROW_ACERO_EXPORT SequencingQueue { public: using Task = std::function; @@ -123,7 +124,7 @@ class SequencingQueue { /// /// It can be helpful to think of this as if a dedicated thread is running Process as /// batches arrive -class SerialSequencingQueue { +class ARROW_ACERO_EXPORT SerialSequencingQueue { public: /// Strategy that describes how to handle items class Processor { diff --git a/cpp/src/arrow/acero/aggregate_node.h b/cpp/src/arrow/acero/aggregate_node.h index 790264b208305..0c6fea16a8acc 100644 --- a/cpp/src/arrow/acero/aggregate_node.h +++ b/cpp/src/arrow/acero/aggregate_node.h @@ -24,6 +24,7 @@ #include "arrow/acero/visibility.h" #include "arrow/compute/api_aggregate.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/compute/type_fwd.h" #include "arrow/result.h" #include "arrow/type_fwd.h" diff --git a/cpp/src/arrow/acero/aggregate_node_test.cc b/cpp/src/arrow/acero/aggregate_node_test.cc index c623271db9fb4..f980496d527d1 100644 --- a/cpp/src/arrow/acero/aggregate_node_test.cc +++ b/cpp/src/arrow/acero/aggregate_node_test.cc @@ -24,6 +24,7 @@ #include "arrow/acero/test_util_internal.h" #include "arrow/compute/api_aggregate.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/result.h" #include "arrow/table.h" #include "arrow/testing/gtest_util.h" @@ -32,6 +33,8 @@ namespace arrow { +using compute::ExecBatchFromJSON; + namespace acero { Result> TableGroupBy( diff --git a/cpp/src/arrow/acero/asof_join_node_test.cc b/cpp/src/arrow/acero/asof_join_node_test.cc index 64d41ccb1ab20..c726ac7c821a7 100644 --- a/cpp/src/arrow/acero/asof_join_node_test.cc +++ b/cpp/src/arrow/acero/asof_join_node_test.cc @@ -41,8 +41,9 @@ #include "arrow/acero/util.h" #include "arrow/api.h" #include "arrow/compute/api_scalar.h" -#include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/cast.h" #include "arrow/compute/row/row_encoder_internal.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" #include "arrow/testing/random.h" @@ -67,6 +68,7 @@ namespace arrow { using compute::Cast; using compute::Divide; +using compute::ExecBatchFromJSON; using compute::Multiply; using compute::Subtract; diff --git a/cpp/src/arrow/acero/hash_aggregate_test.cc b/cpp/src/arrow/acero/hash_aggregate_test.cc index 1e2975afc91b3..7f4b6dd75272f 100644 --- a/cpp/src/arrow/acero/hash_aggregate_test.cc +++ b/cpp/src/arrow/acero/hash_aggregate_test.cc @@ -42,7 +42,6 @@ #include 
"arrow/compute/kernels/codegen_internal.h" #include "arrow/compute/registry.h" #include "arrow/compute/row/grouper.h" -#include "arrow/compute/row/grouper_internal.h" #include "arrow/table.h" #include "arrow/testing/generator.h" #include "arrow/testing/gtest_util.h" @@ -70,9 +69,11 @@ using internal::checked_cast; using internal::checked_pointer_cast; using internal::ToChars; +using compute::ArgShape; using compute::CallFunction; using compute::CountOptions; using compute::default_exec_context; +using compute::ExecBatchFromJSON; using compute::ExecSpan; using compute::FunctionOptions; using compute::Grouper; @@ -84,6 +85,7 @@ using compute::SortKey; using compute::SortOrder; using compute::Take; using compute::TDigestOptions; +using compute::ValidateOutput; using compute::VarianceOptions; namespace acero { @@ -159,8 +161,6 @@ TEST(AggregateSchema, SingleKeyAndSegmentKey) { output_schema); } -namespace { - using GroupByFunction = std::function( const std::vector&, const std::vector&, const std::vector&, const std::vector&, bool, bool)>; @@ -538,930 +538,6 @@ Result GroupByTest(GroupByFunction group_by, const std::vector& ar return GroupByTest(group_by, arguments, keys, {}, aggregates, use_threads); } -template -void TestGroupClassSupportedKeys( - std::function>(const std::vector&)> - make_func) { - ASSERT_OK(make_func({boolean()})); - - ASSERT_OK(make_func({int8(), uint16(), int32(), uint64()})); - - ASSERT_OK(make_func({dictionary(int64(), utf8())})); - - ASSERT_OK(make_func({float16(), float32(), float64()})); - - ASSERT_OK(make_func({utf8(), binary(), large_utf8(), large_binary()})); - - ASSERT_OK(make_func({fixed_size_binary(16), fixed_size_binary(32)})); - - ASSERT_OK(make_func({decimal128(32, 10), decimal256(76, 20)})); - - ASSERT_OK(make_func({date32(), date64()})); - - for (auto unit : { - TimeUnit::SECOND, - TimeUnit::MILLI, - TimeUnit::MICRO, - TimeUnit::NANO, - }) { - ASSERT_OK(make_func({timestamp(unit), duration(unit)})); - } - - ASSERT_OK( - make_func({day_time_interval(), month_interval(), month_day_nano_interval()})); - - ASSERT_OK(make_func({null()})); - - ASSERT_RAISES(NotImplemented, make_func({struct_({field("", int64())})})); - - ASSERT_RAISES(NotImplemented, make_func({struct_({})})); - - ASSERT_RAISES(NotImplemented, make_func({list(int32())})); - - ASSERT_RAISES(NotImplemented, make_func({fixed_size_list(int32(), 5)})); - - ASSERT_RAISES(NotImplemented, make_func({dense_union({field("", int32())})})); -} - -void TestSegments(std::unique_ptr& segmenter, const ExecSpan& batch, - std::vector expected_segments) { - ASSERT_OK_AND_ASSIGN(auto actual_segments, segmenter->GetSegments(batch)); - ASSERT_EQ(actual_segments.size(), expected_segments.size()); - for (size_t i = 0; i < actual_segments.size(); ++i) { - SCOPED_TRACE("segment #" + ToChars(i)); - ASSERT_EQ(actual_segments[i], expected_segments[i]); - } -} - -Result> MakeGrouper(const std::vector& key_types) { - return Grouper::Make(key_types, default_exec_context()); -} - -Result> MakeRowSegmenter( - const std::vector& key_types) { - return RowSegmenter::Make(key_types, /*nullable_leys=*/false, default_exec_context()); -} - -Result> MakeGenericSegmenter( - const std::vector& key_types) { - return MakeAnyKeysSegmenter(key_types, default_exec_context()); -} - -} // namespace - -TEST(RowSegmenter, SupportedKeys) { - TestGroupClassSupportedKeys(MakeRowSegmenter); -} - -TEST(RowSegmenter, Basics) { - std::vector bad_types2 = {int32(), float32()}; - std::vector types2 = {int32(), int32()}; - std::vector bad_types1 = 
{float32()}; - std::vector types1 = {int32()}; - std::vector types0 = {}; - auto batch2 = ExecBatchFromJSON(types2, "[[1, 1], [1, 2], [2, 2]]"); - auto batch1 = ExecBatchFromJSON(types1, "[[1], [1], [2]]"); - ExecBatch batch0({}, 3); - { - SCOPED_TRACE("types0 segmenting of batch2"); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types0)); - ExecSpan span2(batch2); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 0 "), - segmenter->GetSegments(span2)); - ExecSpan span0(batch0); - TestSegments(segmenter, span0, {{0, 3, true, true}}); - } - { - SCOPED_TRACE("bad_types1 segmenting of batch1"); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types1)); - ExecSpan span1(batch1); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 0 of type "), - segmenter->GetSegments(span1)); - } - { - SCOPED_TRACE("types1 segmenting of batch2"); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types1)); - ExecSpan span2(batch2); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 1 "), - segmenter->GetSegments(span2)); - ExecSpan span1(batch1); - TestSegments(segmenter, span1, {{0, 2, false, true}, {2, 1, true, false}}); - } - { - SCOPED_TRACE("bad_types2 segmenting of batch2"); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(bad_types2)); - ExecSpan span2(batch2); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch value 1 of type "), - segmenter->GetSegments(span2)); - } - { - SCOPED_TRACE("types2 segmenting of batch1"); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types2)); - ExecSpan span1(batch1); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("expected batch size 2 "), - segmenter->GetSegments(span1)); - ExecSpan span2(batch2); - TestSegments(segmenter, span2, - {{0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}}); - } -} - -TEST(RowSegmenter, NonOrdered) { - for (int num_keys = 1; num_keys <= 2; ++num_keys) { - SCOPED_TRACE("non-ordered " + ToChars(num_keys) + " int32(s)"); - std::vector types(num_keys, int32()); - std::vector values(num_keys, ArrayFromJSON(int32(), "[1, 1, 2, 1, 2]")); - ExecBatch batch(std::move(values), 5); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batch), - {{0, 2, false, true}, - {2, 1, false, false}, - {3, 1, false, false}, - {4, 1, true, false}}); - } -} - -TEST(RowSegmenter, EmptyBatches) { - { - SCOPED_TRACE("empty batches {int32}"); - std::vector types = {int32()}; - std::vector batches = { - ExecBatchFromJSON(types, "[]"), ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[2], [2]]"), ExecBatchFromJSON(types, "[]"), - }; - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batches[0]), {}); - TestSegments(segmenter, ExecSpan(batches[1]), {}); - TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[3]), {}); - TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[5]), {}); - TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}}); - TestSegments(segmenter, ExecSpan(batches[7]), {}); - } - { - SCOPED_TRACE("empty batches {int32, int32}"); - std::vector types = {int32(), int32()}; - std::vector batches = { - ExecBatchFromJSON(types, "[]"), - 
ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[1, 1]]"), - ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[1, 1]]"), - ExecBatchFromJSON(types, "[]"), - ExecBatchFromJSON(types, "[[2, 2], [2, 2]]"), - ExecBatchFromJSON(types, "[]"), - }; - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batches[0]), {}); - TestSegments(segmenter, ExecSpan(batches[1]), {}); - TestSegments(segmenter, ExecSpan(batches[2]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[3]), {}); - TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[5]), {}); - TestSegments(segmenter, ExecSpan(batches[6]), {{0, 2, true, false}}); - TestSegments(segmenter, ExecSpan(batches[7]), {}); - } -} - -TEST(RowSegmenter, MultipleSegments) { - auto test_with_keys = [](int num_keys, const std::shared_ptr& key) { - SCOPED_TRACE("multiple segments " + ToChars(num_keys) + " " + - key->type()->ToString()); - std::vector types(num_keys, key->type()); - std::vector values(num_keys, key); - ExecBatch batch(std::move(values), key->length()); - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batch), - {{0, 2, false, true}, - {2, 1, false, false}, - {3, 1, false, false}, - {4, 2, false, false}, - {6, 2, false, false}, - {8, 1, true, false}}); - }; - for (int num_keys = 1; num_keys <= 2; ++num_keys) { - test_with_keys(num_keys, ArrayFromJSON(int32(), "[1, 1, 2, 5, 3, 3, 5, 5, 4]")); - test_with_keys( - num_keys, - ArrayFromJSON(fixed_size_binary(2), - R"(["aa", "aa", "bb", "ee", "cc", "cc", "ee", "ee", "dd"])")); - test_with_keys(num_keys, DictArrayFromJSON(dictionary(int8(), utf8()), - "[0, 0, 1, 4, 2, 2, 4, 4, 3]", - R"(["a", "b", "c", "d", "e"])")); - } -} - -TEST(RowSegmenter, MultipleSegmentsMultipleBatches) { - { - SCOPED_TRACE("multiple segments multiple batches {int32}"); - std::vector types = {int32()}; - std::vector batches = { - ExecBatchFromJSON(types, "[[1]]"), ExecBatchFromJSON(types, "[[1], [2]]"), - ExecBatchFromJSON(types, "[[5], [3]]"), - ExecBatchFromJSON(types, "[[3], [5], [5]]"), ExecBatchFromJSON(types, "[[4]]")}; - - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[1]), - {{0, 1, false, true}, {1, 1, true, false}}); - TestSegments(segmenter, ExecSpan(batches[2]), - {{0, 1, false, false}, {1, 1, true, false}}); - TestSegments(segmenter, ExecSpan(batches[3]), - {{0, 1, false, true}, {1, 2, true, false}}); - TestSegments(segmenter, ExecSpan(batches[4]), {{0, 1, true, false}}); - } - { - SCOPED_TRACE("multiple segments multiple batches {int32, int32}"); - std::vector types = {int32(), int32()}; - std::vector batches = { - ExecBatchFromJSON(types, "[[1, 1]]"), - ExecBatchFromJSON(types, "[[1, 1], [2, 2]]"), - ExecBatchFromJSON(types, "[[5, 5], [3, 3]]"), - ExecBatchFromJSON(types, "[[3, 3], [5, 5], [5, 5]]"), - ExecBatchFromJSON(types, "[[4, 4]]")}; - - ASSERT_OK_AND_ASSIGN(auto segmenter, MakeRowSegmenter(types)); - TestSegments(segmenter, ExecSpan(batches[0]), {{0, 1, true, true}}); - TestSegments(segmenter, ExecSpan(batches[1]), - {{0, 1, false, true}, {1, 1, true, false}}); - TestSegments(segmenter, ExecSpan(batches[2]), - {{0, 1, false, false}, {1, 1, true, false}}); - TestSegments(segmenter, ExecSpan(batches[3]), - {{0, 1, false, true}, {1, 2, true, false}}); - TestSegments(segmenter, 
ExecSpan(batches[4]), {{0, 1, true, false}}); - } -} - -namespace { - -void TestRowSegmenterConstantBatch( - const std::shared_ptr& type, - std::function shape_func, - std::function>(int64_t key)> value_func, - std::function>(const std::vector&)> - make_segmenter) { - constexpr int64_t n_keys = 3, n_rows = 3, repetitions = 3; - std::vector types(n_keys, type); - std::vector full_values(n_keys); - for (int64_t i = 0; i < n_keys; i++) { - auto shape = shape_func(i); - ASSERT_OK_AND_ASSIGN(auto scalar, value_func(i)); - if (shape == ArgShape::SCALAR) { - full_values[i] = std::move(scalar); - } else { - ASSERT_OK_AND_ASSIGN(full_values[i], MakeArrayFromScalar(*scalar, n_rows)); - } - } - auto test_with_keys = [&](int64_t keys) -> Status { - SCOPED_TRACE("constant-batch with " + ToChars(keys) + " key(s)"); - std::vector values(full_values.begin(), full_values.begin() + keys); - ExecBatch batch(values, n_rows); - std::vector key_types(types.begin(), types.begin() + keys); - ARROW_ASSIGN_OR_RAISE(auto segmenter, make_segmenter(key_types)); - for (int64_t i = 0; i < repetitions; i++) { - TestSegments(segmenter, ExecSpan(batch), {{0, n_rows, true, true}}); - ARROW_RETURN_NOT_OK(segmenter->Reset()); - } - return Status::OK(); - }; - for (int64_t i = 0; i <= n_keys; i++) { - ASSERT_OK(test_with_keys(i)); - } -} - -} // namespace - -TEST(RowSegmenter, ConstantArrayBatch) { - TestRowSegmenterConstantBatch( - int32(), [](int64_t key) { return ArgShape::ARRAY; }, - [](int64_t key) { return MakeScalar(1); }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantScalarBatch) { - TestRowSegmenterConstantBatch( - int32(), [](int64_t key) { return ArgShape::SCALAR; }, - [](int64_t key) { return MakeScalar(1); }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantMixedBatch) { - TestRowSegmenterConstantBatch( - int32(), - [](int64_t key) { return key % 2 == 0 ? ArgShape::SCALAR : ArgShape::ARRAY; }, - [](int64_t key) { return MakeScalar(1); }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantArrayBatchWithAnyKeysSegmenter) { - TestRowSegmenterConstantBatch( - int32(), [](int64_t key) { return ArgShape::ARRAY; }, - [](int64_t key) { return MakeScalar(1); }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantScalarBatchWithAnyKeysSegmenter) { - TestRowSegmenterConstantBatch( - int32(), [](int64_t key) { return ArgShape::SCALAR; }, - [](int64_t key) { return MakeScalar(1); }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantMixedBatchWithAnyKeysSegmenter) { - TestRowSegmenterConstantBatch( - int32(), - [](int64_t key) { return key % 2 == 0 ? 
ArgShape::SCALAR : ArgShape::ARRAY; }, - [](int64_t key) { return MakeScalar(1); }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryArrayBatch) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - type, [](int64_t key) { return ArgShape::ARRAY; }, - [&](int64_t key) { return value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryScalarBatch) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - fixed_size_binary(8), [](int64_t key) { return ArgShape::SCALAR; }, - [&](int64_t key) { return value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryMixedBatch) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - fixed_size_binary(8), - [](int64_t key) { return key % 2 == 0 ? ArgShape::SCALAR : ArgShape::ARRAY; }, - [&](int64_t key) { return value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryArrayBatchWithAnyKeysSegmenter) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - type, [](int64_t key) { return ArgShape::ARRAY; }, - [&](int64_t key) { return value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryScalarBatchWithAnyKeysSegmenter) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - fixed_size_binary(8), [](int64_t key) { return ArgShape::SCALAR; }, - [&](int64_t key) { return value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantFixedSizeBinaryMixedBatchWithAnyKeysSegmenter) { - constexpr int fsb = 8; - auto type = fixed_size_binary(fsb); - ASSERT_OK_AND_ASSIGN(auto value, MakeScalar(type, std::string(fsb, 'X'))); - TestRowSegmenterConstantBatch( - fixed_size_binary(8), - [](int64_t key) { return key % 2 == 0 ? 
ArgShape::SCALAR : ArgShape::ARRAY; }, - [&](int64_t key) { return value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryArrayBatch) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, [](int64_t key) { return ArgShape::ARRAY; }, - [&](int64_t key) { return dict_value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryScalarBatch) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, [](int64_t key) { return ArgShape::SCALAR; }, - [&](int64_t key) { return dict_value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryMixedBatch) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, - [](int64_t key) { return key % 2 == 0 ? ArgShape::SCALAR : ArgShape::ARRAY; }, - [&](int64_t key) { return dict_value; }, MakeRowSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryArrayBatchWithAnyKeysSegmenter) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, [](int64_t key) { return ArgShape::ARRAY; }, - [&](int64_t key) { return dict_value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryScalarBatchWithAnyKeysSegmenter) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, [](int64_t key) { return ArgShape::SCALAR; }, - [&](int64_t key) { return dict_value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, ConstantDictionaryMixedBatchWithAnyKeysSegmenter) { - auto index_type = int32(); - auto value_type = utf8(); - auto dict_type = dictionary(index_type, value_type); - auto dict = ArrayFromJSON(value_type, R"(["alpha", null, "gamma"])"); - ASSERT_OK_AND_ASSIGN(auto index_value, MakeScalar(index_type, 0)); - auto dict_value = DictionaryScalar::Make(std::move(index_value), dict); - TestRowSegmenterConstantBatch( - dict_type, - [](int64_t key) { return key % 2 == 0 ? 
ArgShape::SCALAR : ArgShape::ARRAY; }, - [&](int64_t key) { return dict_value; }, MakeGenericSegmenter); -} - -TEST(RowSegmenter, RowConstantBatch) { - constexpr size_t n = 3; - std::vector types = {int32(), int32(), int32()}; - auto full_batch = ExecBatchFromJSON(types, "[[1, 1, 1], [2, 2, 2], [3, 3, 3]]"); - std::vector expected_segments_for_size_0 = {{0, 3, true, true}}; - std::vector expected_segments = { - {0, 1, false, true}, {1, 1, false, false}, {2, 1, true, false}}; - auto test_by_size = [&](size_t size) -> Status { - SCOPED_TRACE("constant-batch with " + ToChars(size) + " key(s)"); - std::vector values(full_batch.values.begin(), - full_batch.values.begin() + size); - ExecBatch batch(values, full_batch.length); - std::vector key_types(types.begin(), types.begin() + size); - ARROW_ASSIGN_OR_RAISE(auto segmenter, MakeRowSegmenter(key_types)); - TestSegments(segmenter, ExecSpan(batch), - size == 0 ? expected_segments_for_size_0 : expected_segments); - return Status::OK(); - }; - for (size_t i = 0; i <= n; i++) { - ASSERT_OK(test_by_size(i)); - } -} - -TEST(Grouper, SupportedKeys) { TestGroupClassSupportedKeys(MakeGrouper); } - -struct TestGrouper { - explicit TestGrouper(std::vector types, std::vector shapes = {}) - : types_(std::move(types)), shapes_(std::move(shapes)) { - grouper_ = Grouper::Make(types_).ValueOrDie(); - - FieldVector fields; - for (const auto& type : types_) { - fields.push_back(field("", type.GetSharedPtr())); - } - key_schema_ = schema(std::move(fields)); - } - - void ExpectConsume(const std::string& key_json, const std::string& expected) { - auto expected_arr = ArrayFromJSON(uint32(), expected); - if (shapes_.size() > 0) { - ExpectConsume(ExecBatchFromJSON(types_, shapes_, key_json), expected_arr); - } else { - ExpectConsume(ExecBatchFromJSON(types_, key_json), expected_arr); - } - } - - void ExpectConsume(const std::vector& key_values, Datum expected) { - ASSERT_OK_AND_ASSIGN(auto key_batch, ExecBatch::Make(key_values)); - ExpectConsume(key_batch, expected); - } - - void ExpectConsume(const ExecBatch& key_batch, Datum expected) { - Datum ids; - ConsumeAndValidate(key_batch, &ids); - AssertEquivalentIds(expected, ids); - } - - void ExpectUniques(const ExecBatch& uniques) { - EXPECT_THAT(grouper_->GetUniques(), ResultWith(Eq(uniques))); - } - - void ExpectUniques(const std::string& uniques_json) { - if (shapes_.size() > 0) { - ExpectUniques(ExecBatchFromJSON(types_, shapes_, uniques_json)); - } else { - ExpectUniques(ExecBatchFromJSON(types_, uniques_json)); - } - } - - void AssertEquivalentIds(const Datum& expected, const Datum& actual) { - auto left = expected.make_array(); - auto right = actual.make_array(); - ASSERT_EQ(left->length(), right->length()) << "#ids unequal"; - int64_t num_ids = left->length(); - auto left_data = left->data(); - auto right_data = right->data(); - auto left_ids = reinterpret_cast(left_data->buffers[1]->data()); - auto right_ids = reinterpret_cast(right_data->buffers[1]->data()); - uint32_t max_left_id = 0; - uint32_t max_right_id = 0; - for (int64_t i = 0; i < num_ids; ++i) { - if (left_ids[i] > max_left_id) { - max_left_id = left_ids[i]; - } - if (right_ids[i] > max_right_id) { - max_right_id = right_ids[i]; - } - } - std::vector right_to_left_present(max_right_id + 1, false); - std::vector left_to_right_present(max_left_id + 1, false); - std::vector right_to_left(max_right_id + 1); - std::vector left_to_right(max_left_id + 1); - for (int64_t i = 0; i < num_ids; ++i) { - uint32_t left_id = left_ids[i]; - uint32_t right_id = 
right_ids[i]; - if (!left_to_right_present[left_id]) { - left_to_right[left_id] = right_id; - left_to_right_present[left_id] = true; - } - if (!right_to_left_present[right_id]) { - right_to_left[right_id] = left_id; - right_to_left_present[right_id] = true; - } - ASSERT_EQ(left_id, right_to_left[right_id]); - ASSERT_EQ(right_id, left_to_right[left_id]); - } - } - - void ConsumeAndValidate(const ExecBatch& key_batch, Datum* ids = nullptr) { - ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(ExecSpan(key_batch))); - - ValidateConsume(key_batch, id_batch); - - if (ids) { - *ids = std::move(id_batch); - } - } - - void ValidateConsume(const ExecBatch& key_batch, const Datum& id_batch) { - if (uniques_.length == -1) { - ASSERT_OK_AND_ASSIGN(uniques_, grouper_->GetUniques()); - } else if (static_cast(grouper_->num_groups()) > uniques_.length) { - ASSERT_OK_AND_ASSIGN(ExecBatch new_uniques, grouper_->GetUniques()); - - // check that uniques_ are prefixes of new_uniques - for (int i = 0; i < uniques_.num_values(); ++i) { - auto new_unique = new_uniques[i].make_array(); - ValidateOutput(*new_unique); - - AssertDatumsEqual(uniques_[i], new_unique->Slice(0, uniques_.length), - /*verbose=*/true); - } - - uniques_ = std::move(new_uniques); - } - - // check that the ids encode an equivalent key sequence - auto ids = id_batch.make_array(); - ValidateOutput(*ids); - - for (int i = 0; i < key_batch.num_values(); ++i) { - SCOPED_TRACE(ToChars(i) + "th key array"); - auto original = - key_batch[i].is_array() - ? key_batch[i].make_array() - : *MakeArrayFromScalar(*key_batch[i].scalar(), key_batch.length); - ASSERT_OK_AND_ASSIGN(auto encoded, Take(*uniques_[i].make_array(), *ids)); - AssertArraysEqual(*original, *encoded, /*verbose=*/true, - EqualOptions().nans_equal(true)); - } - } - - std::vector types_; - std::vector shapes_; - std::shared_ptr key_schema_; - std::unique_ptr grouper_; - ExecBatch uniques_ = ExecBatch({}, -1); -}; - -TEST(Grouper, BooleanKey) { - TestGrouper g({boolean()}); - - g.ExpectConsume("[[true], [true]]", "[0, 0]"); - - g.ExpectConsume("[[true], [true]]", "[0, 0]"); - - g.ExpectConsume("[[false], [null]]", "[1, 2]"); - - g.ExpectConsume("[[true], [false], [true], [false], [null], [false], [null]]", - "[0, 1, 0, 1, 2, 1, 2]"); -} - -TEST(Grouper, NumericKey) { - for (auto ty : { - uint8(), - int8(), - uint16(), - int16(), - uint32(), - int32(), - uint64(), - int64(), - float16(), - float32(), - float64(), - }) { - SCOPED_TRACE("key type: " + ty->ToString()); - - TestGrouper g({ty}); - - g.ExpectConsume("[[3], [3]]", "[0, 0]"); - g.ExpectUniques("[[3]]"); - - g.ExpectConsume("[[3], [3]]", "[0, 0]"); - g.ExpectUniques("[[3]]"); - - g.ExpectConsume("[[27], [81], [81]]", "[1, 2, 2]"); - g.ExpectUniques("[[3], [27], [81]]"); - - g.ExpectConsume("[[3], [27], [3], [27], [null], [81], [27], [81]]", - "[0, 1, 0, 1, 3, 2, 1, 2]"); - g.ExpectUniques("[[3], [27], [81], [null]]"); - } -} - -TEST(Grouper, FloatingPointKey) { - TestGrouper g({float32()}); - - // -0.0 hashes differently from 0.0 - g.ExpectConsume("[[0.0], [-0.0]]", "[0, 1]"); - - g.ExpectConsume("[[Inf], [-Inf]]", "[2, 3]"); - - // assert(!(NaN == NaN)) does not cause spurious new groups - g.ExpectConsume("[[NaN], [NaN]]", "[4, 4]"); - - // TODO(bkietz) test denormal numbers, more NaNs -} - -TEST(Grouper, StringKey) { - for (auto ty : {utf8(), large_utf8(), fixed_size_binary(2)}) { - SCOPED_TRACE("key type: " + ty->ToString()); - - TestGrouper g({ty}); - - g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]"); - - 
g.ExpectConsume(R"([["eh"], ["eh"]])", "[0, 0]"); - - g.ExpectConsume(R"([["be"], [null]])", "[1, 2]"); - } -} - -TEST(Grouper, DictKey) { - TestGrouper g({dictionary(int32(), utf8())}); - - // For dictionary keys, all batches must share a single dictionary. - // Eventually, differing dictionaries will be unified and indices transposed - // during encoding to relieve this restriction. - const auto dict = ArrayFromJSON(utf8(), R"(["ex", "why", "zee", null])"); - - auto WithIndices = [&](const std::string& indices) { - return Datum(*DictionaryArray::FromArrays(ArrayFromJSON(int32(), indices), dict)); - }; - - // NB: null index is not considered equivalent to index=3 (which encodes null in dict) - g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")}, - ArrayFromJSON(uint32(), "[0, 1, 2, 3, 4]")); - - g = TestGrouper({dictionary(int32(), utf8())}); - - g.ExpectConsume({WithIndices(" [0, 1, 2, 3, null]")}, - ArrayFromJSON(uint32(), "[0, 1, 2, 3, 4]")); - - g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")}, - ArrayFromJSON(uint32(), "[3, 1, 4, 0, 2]")); - - auto dict_arr = *DictionaryArray::FromArrays( - ArrayFromJSON(int32(), "[0, 1]"), - ArrayFromJSON(utf8(), R"(["different", "dictionary"])")); - ExecSpan dict_span({*dict_arr->data()}, 2); - EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, - HasSubstr("Unifying differing dictionaries"), - g.grouper_->Consume(dict_span)); -} - -TEST(Grouper, StringInt64Key) { - TestGrouper g({utf8(), int64()}); - - g.ExpectConsume(R"([["eh", 0], ["eh", 0]])", "[0, 0]"); - - g.ExpectConsume(R"([["eh", 0], ["eh", null]])", "[0, 1]"); - - g.ExpectConsume(R"([["eh", 1], ["bee", 1]])", "[2, 3]"); - - g.ExpectConsume(R"([["eh", null], ["bee", 1]])", "[1, 3]"); - - g = TestGrouper({utf8(), int64()}); - - g.ExpectConsume(R"([ - ["ex", 0], - ["ex", 0], - ["why", 0], - ["ex", 1], - ["why", 0], - ["ex", 1], - ["ex", 0], - ["why", 1] - ])", - "[0, 0, 1, 2, 1, 2, 0, 3]"); - - g.ExpectConsume(R"([ - ["ex", 0], - [null, 0], - [null, 0], - ["ex", 1], - [null, null], - ["ex", 1], - ["ex", 0], - ["why", null] - ])", - "[0, 4, 4, 2, 5, 2, 0, 6]"); -} - -TEST(Grouper, DoubleStringInt64Key) { - TestGrouper g({float64(), utf8(), int64()}); - - g.ExpectConsume(R"([[1.5, "eh", 0], [1.5, "eh", 0]])", "[0, 0]"); - - g.ExpectConsume(R"([[1.5, "eh", 0], [1.5, "eh", 0]])", "[0, 0]"); - - g.ExpectConsume(R"([[1.0, "eh", 0], [1.0, "be", null]])", "[1, 2]"); - - // note: -0 and +0 hash differently - g.ExpectConsume(R"([[-0.0, "be", 7], [0.0, "be", 7]])", "[3, 4]"); -} - -TEST(Grouper, RandomInt64Keys) { - TestGrouper g({int64()}); - for (int i = 0; i < 4; ++i) { - SCOPED_TRACE(ToChars(i) + "th key batch"); - - ExecBatch key_batch{ - *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; - g.ConsumeAndValidate(key_batch); - } -} - -TEST(Grouper, RandomStringInt64Keys) { - TestGrouper g({utf8(), int64()}); - for (int i = 0; i < 4; ++i) { - SCOPED_TRACE(ToChars(i) + "th key batch"); - - ExecBatch key_batch{ - *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; - g.ConsumeAndValidate(key_batch); - } -} - -TEST(Grouper, RandomStringInt64DoubleInt32Keys) { - TestGrouper g({utf8(), int64(), float64(), int32()}); - for (int i = 0; i < 4; ++i) { - SCOPED_TRACE(ToChars(i) + "th key batch"); - - ExecBatch key_batch{ - *random::GenerateBatch(g.key_schema_->fields(), 1 << 12, 0xDEADBEEF)}; - g.ConsumeAndValidate(key_batch); - } -} - -TEST(Grouper, NullKeys) { - TestGrouper g({null()}); - g.ExpectConsume("[[null], [null]]", "[0, 0]"); -} - -TEST(Grouper, 
MultipleNullKeys) { - TestGrouper g({null(), null(), null(), null()}); - g.ExpectConsume("[[null, null, null, null], [null, null, null, null]]", "[0, 0]"); -} - -TEST(Grouper, Int64NullKeys) { - TestGrouper g({int64(), null()}); - g.ExpectConsume("[[1, null], [2, null], [1, null]]", "[0, 1, 0]"); -} - -TEST(Grouper, StringNullKeys) { - TestGrouper g({utf8(), null()}); - g.ExpectConsume(R"([["be", null], ["eh", null]])", "[0, 1]"); -} - -TEST(Grouper, DoubleNullStringKey) { - TestGrouper g({float64(), null(), utf8()}); - - g.ExpectConsume(R"([[1.5, null, "eh"], [1.5, null, "eh"]])", "[0, 0]"); - g.ExpectConsume(R"([[null, null, "eh"], [1.0, null, null]])", "[1, 2]"); - g.ExpectConsume(R"([ - [1.0, null, "wh"], - [4.4, null, null], - [5.2, null, "eh"], - [6.5, null, "be"], - [7.3, null, null], - [1.0, null, "wh"], - [9.1, null, "eh"], - [10.2, null, "be"], - [1.0, null, null] - ])", - "[3, 4, 5, 6, 7, 3, 8, 9, 2]"); -} - -TEST(Grouper, EmptyNullKeys) { - TestGrouper g({null()}); - g.ExpectConsume("[]", "[]"); -} - -TEST(Grouper, MakeGroupings) { - auto ExpectGroupings = [](std::string ids_json, std::string expected_json) { - auto ids = checked_pointer_cast(ArrayFromJSON(uint32(), ids_json)); - auto expected = ArrayFromJSON(list(int32()), expected_json); - - auto num_groups = static_cast(expected->length()); - ASSERT_OK_AND_ASSIGN(auto actual, Grouper::MakeGroupings(*ids, num_groups)); - AssertArraysEqual(*expected, *actual, /*verbose=*/true); - - // validate ApplyGroupings - ASSERT_OK_AND_ASSIGN(auto grouped_ids, Grouper::ApplyGroupings(*actual, *ids)); - - for (uint32_t group = 0; group < num_groups; ++group) { - auto ids_slice = checked_pointer_cast(grouped_ids->value_slice(group)); - for (auto slot : *ids_slice) { - EXPECT_EQ(slot, group); - } - } - }; - - ExpectGroupings("[]", "[[]]"); - - ExpectGroupings("[0, 0, 0]", "[[0, 1, 2]]"); - - ExpectGroupings("[0, 0, 0, 1, 1, 2]", "[[0, 1, 2], [3, 4], [5], []]"); - - ExpectGroupings("[2, 1, 2, 1, 1, 2]", "[[], [1, 3, 4], [0, 2, 5], [], []]"); - - ExpectGroupings("[2, 2, 5, 5, 2, 3]", "[[], [], [0, 1, 4], [5], [], [2, 3], [], []]"); - - auto ids = checked_pointer_cast(ArrayFromJSON(uint32(), "[0, null, 1]")); - EXPECT_RAISES_WITH_MESSAGE_THAT(Invalid, HasSubstr("MakeGroupings with null ids"), - Grouper::MakeGroupings(*ids, 5)); -} - -TEST(Grouper, ScalarValues) { - // large_utf8 forces GrouperImpl over GrouperFastImpl - for (const auto& str_type : {utf8(), large_utf8()}) { - { - TestGrouper g( - {boolean(), int32(), decimal128(3, 2), decimal256(3, 2), fixed_size_binary(2), - str_type, int32()}, - {ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::SCALAR, - ArgShape::SCALAR, ArgShape::SCALAR, ArgShape::ARRAY}); - g.ExpectConsume( - R"([ -[true, 1, "1.00", "2.00", "ab", "foo", 2], -[true, 1, "1.00", "2.00", "ab", "foo", 2], -[true, 1, "1.00", "2.00", "ab", "foo", 3] -])", - "[0, 0, 1]"); - } - { - auto dict_type = dictionary(int32(), utf8()); - TestGrouper g({dict_type, str_type}, {ArgShape::SCALAR, ArgShape::SCALAR}); - const auto dict = R"(["foo", null])"; - g.ExpectConsume( - {DictScalarFromJSON(dict_type, "0", dict), ScalarFromJSON(str_type, R"("")")}, - ArrayFromJSON(uint32(), "[0]")); - g.ExpectConsume( - {DictScalarFromJSON(dict_type, "1", dict), ScalarFromJSON(str_type, R"("")")}, - ArrayFromJSON(uint32(), "[1]")); - } - } -} - void TestSegmentKey(GroupByFunction group_by, const std::shared_ptr& table, Datum output, const std::vector& segment_keys); diff --git a/cpp/src/arrow/acero/hash_join_node_test.cc 
b/cpp/src/arrow/acero/hash_join_node_test.cc index 76ad9c7d650eb..654fd59c45d5a 100644 --- a/cpp/src/arrow/acero/hash_join_node_test.cc +++ b/cpp/src/arrow/acero/hash_join_node_test.cc @@ -26,9 +26,9 @@ #include "arrow/acero/test_util_internal.h" #include "arrow/acero/util.h" #include "arrow/api.h" -#include "arrow/compute/kernels/test_util.h" #include "arrow/compute/light_array_internal.h" #include "arrow/compute/row/row_encoder_internal.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/extension/uuid.h" #include "arrow/testing/extension_type.h" #include "arrow/testing/generator.h" @@ -49,6 +49,7 @@ using compute::and_; using compute::call; using compute::default_exec_context; using compute::ExecBatchBuilder; +using compute::ExecBatchFromJSON; using compute::ExecSpan; using compute::field_ref; using compute::SortIndices; @@ -2350,7 +2351,7 @@ TEST(HashJoin, FineGrainedResidualFilter) { auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ [null, null, "r_payload"], [null, 0, "r_payload"], - [null, 42, "r_payload"], + [null, 42, "r_payload"], ["both1", null, "r_payload"], ["both2", null, "r_payload"], ["right_only", null, "r_payload"], @@ -2519,7 +2520,7 @@ TEST(HashJoin, FineGrainedResidualFilter) { auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ [null, null, "r_payload"], [null, 0, "r_payload"], - [null, 42, "r_payload"], + [null, 42, "r_payload"], ["both1", null, "r_payload"], ["both1", 0, "r_payload"], ["both1", 42, "r_payload"], @@ -2704,7 +2705,7 @@ TEST(HashJoin, FineGrainedResidualFilter) { auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ [null, null, "r_payload"], [null, 0, "r_payload"], - [null, 42, "r_payload"], + [null, 42, "r_payload"], ["right_only", null, "r_payload"], ["right_only", 0, "r_payload"], ["right_only", 42, "r_payload"]])"); @@ -2879,7 +2880,7 @@ TEST(HashJoin, FineGrainedResidualFilter) { auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ [null, null, "r_payload"], [null, 0, "r_payload"], - [null, 42, "r_payload"], + [null, 42, "r_payload"], ["both1", null, "r_payload"], ["both1", 0, "r_payload"], ["both2", null, "r_payload"], @@ -3054,7 +3055,7 @@ TEST(HashJoin, FineGrainedResidualFilter) { auto expected = ExecBatchFromJSON({utf8(), int32(), utf8()}, R"([ [null, null, "r_payload"], [null, 0, "r_payload"], - [null, 42, "r_payload"], + [null, 42, "r_payload"], ["both1", null, "r_payload"], ["both2", null, "r_payload"], ["right_only", null, "r_payload"], @@ -3370,8 +3371,10 @@ TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBVarLength)) { constexpr int value_no_match_length_min = 128; constexpr int value_no_match_length_max = 129; constexpr int value_match_length = 130; + // The value "DDD..." will be hashed to the partition beyond 4GB of the hash table. + // Matching in this area gives us more coverage. const auto value_match = - std::make_shared(std::string(value_match_length, 'X')); + std::make_shared(std::string(value_match_length, 'D')); constexpr int16_t num_rows_per_batch_left = 128; constexpr int16_t num_rows_per_batch_right = 4096; const int64_t num_batches_left = 8; @@ -3446,5 +3449,104 @@ TEST(HashJoin, LARGE_MEMORY_TEST(BuildSideOver4GBVarLength)) { num_batches_left * num_rows_per_batch_left * num_batches_right); } +// GH-45334: The row ids of the matching rows on the right side (the build side) are very +// large, causing the index calculation to overflow.
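+// (Back-of-envelope sketch, assuming each build-side row occupies 16 bytes of +// fixed-length data, i.e. one int64 key plus one int64 payload: ~7.2e8 rows * 16 B is +// roughly 11.5 GB of row data, past the 4 GB a 32-bit byte offset can address, so the +// row_id-to-offset computation must be carried out in 64 bits.)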
+TEST(HashJoin, BuildSideLargeRowIds) { + GTEST_SKIP() << "Test disabled because it is excessively time and resource consuming; " + "for local debugging only."; + + // A fair number of match rows to trigger both SIMD and non-SIMD code paths. + const int64_t num_match_rows = 35; + const int64_t num_rows_per_match_batch = 35; + const int64_t num_match_batches = num_match_rows / num_rows_per_match_batch; + + const int64_t num_unmatch_rows_large = 720898048; + const int64_t num_rows_per_unmatch_batch_large = 352001; + const int64_t num_unmatch_batches_large = + num_unmatch_rows_large / num_rows_per_unmatch_batch_large; + + auto schema_small = + schema({field("small_key", int64()), field("small_payload", int64())}); + auto schema_large = + schema({field("large_key", int64()), field("large_payload", int64())}); + + // A carefully chosen key value which hashes to 0xFFFFFFFE, causing the match rows to + // be placed at the high end of the row table. + const int64_t match_key = 289339070; + const int64_t match_payload = 42; + + // Match arrays of length num_rows_per_match_batch. + ASSERT_OK_AND_ASSIGN( + auto match_key_arr, + Constant(MakeScalar(match_key))->Generate(num_rows_per_match_batch)); + ASSERT_OK_AND_ASSIGN( + auto match_payload_arr, + Constant(MakeScalar(match_payload))->Generate(num_rows_per_match_batch)); + // Append 1 null row to trigger null processing code paths. + ASSERT_OK_AND_ASSIGN(auto null_arr, MakeArrayOfNull(int64(), 1)); + ASSERT_OK_AND_ASSIGN(match_key_arr, Concatenate({match_key_arr, null_arr})); + ASSERT_OK_AND_ASSIGN(match_payload_arr, Concatenate({match_payload_arr, null_arr})); + // Match batch. + ExecBatch match_batch({match_key_arr, match_payload_arr}, num_rows_per_match_batch + 1); + + // Small batch. + ExecBatch batch_small = match_batch; + + // Large unmatch batches. + const int64_t seed = 42; + std::vector<ExecBatch> unmatch_batches_large; + unmatch_batches_large.reserve(num_unmatch_batches_large); + ASSERT_OK_AND_ASSIGN(auto unmatch_payload_arr_large, + MakeArrayOfNull(int64(), num_rows_per_unmatch_batch_large)); + int64_t unmatch_range_per_batch = + (std::numeric_limits<int64_t>::max() - match_key) / num_unmatch_batches_large; + for (int i = 0; i < num_unmatch_batches_large; ++i) { + auto unmatch_key_arr_large = RandomArrayGenerator(seed).Int64( + num_rows_per_unmatch_batch_large, + /*min=*/match_key + 1 + i * unmatch_range_per_batch, + /*max=*/match_key + 1 + (i + 1) * unmatch_range_per_batch); + unmatch_batches_large.push_back( + ExecBatch({unmatch_key_arr_large, unmatch_payload_arr_large}, + num_rows_per_unmatch_batch_large)); + } + // Large match batch. + ExecBatch match_batch_large = match_batch; + + // Batches with schemas. + auto batches_small = BatchesWithSchema{ + std::vector<ExecBatch>(num_match_batches, batch_small), schema_small}; + auto batches_large = BatchesWithSchema{std::move(unmatch_batches_large), schema_large}; + for (int i = 0; i < num_match_batches; i++) { + batches_large.batches.push_back(match_batch_large); + } + + Declaration source_small{ + "exec_batch_source", + ExecBatchSourceNodeOptions(batches_small.schema, batches_small.batches)}; + Declaration source_large{ + "exec_batch_source", + ExecBatchSourceNodeOptions(batches_large.schema, batches_large.batches)}; + + HashJoinNodeOptions join_opts(JoinType::INNER, /*left_keys=*/{"small_key"}, + /*right_keys=*/{"large_key"}); + Declaration join{ + "hashjoin", {std::move(source_small), std::move(source_large)}, join_opts}; + + // Join should emit num_match_rows * num_match_rows rows.
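+ // (The materialized result is re-wrapped in an exec_batch_source below so that the + // same batches can feed both the row-count assertion and the filter-based content + // assertion.)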
+ ASSERT_OK_AND_ASSIGN(auto batches_result, DeclarationToExecBatches(std::move(join))); + Declaration result{"exec_batch_source", + ExecBatchSourceNodeOptions(std::move(batches_result.schema), + std::move(batches_result.batches))}; + AssertRowCountEq(result, num_match_rows * num_match_rows); + + // All rows should be match_key/payload. + auto predicate = and_({equal(field_ref("small_key"), literal(match_key)), + equal(field_ref("small_payload"), literal(match_payload)), + equal(field_ref("large_key"), literal(match_key)), + equal(field_ref("large_payload"), literal(match_payload))}); + Declaration filter{"filter", {result}, FilterNodeOptions{std::move(predicate)}}; + AssertRowCountEq(std::move(filter), num_match_rows * num_match_rows); +} + } // namespace acero } // namespace arrow diff --git a/cpp/src/arrow/acero/plan_test.cc b/cpp/src/arrow/acero/plan_test.cc index e74ad6a6665a4..61ab09f6674d9 100644 --- a/cpp/src/arrow/acero/plan_test.cc +++ b/cpp/src/arrow/acero/plan_test.cc @@ -27,6 +27,7 @@ #include "arrow/acero/util.h" #include "arrow/compute/exec.h" #include "arrow/compute/expression.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/io/util_internal.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -51,8 +52,10 @@ using testing::UnorderedElementsAreArray; namespace arrow { +using compute::ArgShape; using compute::call; using compute::CountOptions; +using compute::ExecBatchFromJSON; using compute::field_ref; using compute::ScalarAggregateOptions; using compute::SortKey; diff --git a/cpp/src/arrow/acero/swiss_join.cc b/cpp/src/arrow/acero/swiss_join.cc index 200a75d1dcc6c..0ef014c6ff540 100644 --- a/cpp/src/arrow/acero/swiss_join.cc +++ b/cpp/src/arrow/acero/swiss_join.cc @@ -439,11 +439,11 @@ Status RowArrayMerge::PrepareForMerge(RowArray* target, num_rows = 0; num_bytes = 0; for (size_t i = 0; i < sources.size(); ++i) { - target->rows_.mutable_offsets()[num_rows] = static_cast(num_bytes); + target->rows_.mutable_offsets()[num_rows] = num_bytes; num_rows += sources[i]->rows_.length(); num_bytes += sources[i]->rows_.offsets()[sources[i]->rows_.length()]; } - target->rows_.mutable_offsets()[num_rows] = static_cast(num_bytes); + target->rows_.mutable_offsets()[num_rows] = num_bytes; } return Status::OK(); @@ -477,14 +477,15 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so const int64_t* source_rows_permutation) { int64_t num_source_rows = source.length(); - int64_t fixed_length = target->metadata().fixed_length; + uint32_t fixed_length = target->metadata().fixed_length; // Permutation of source rows is optional. Without permutation all that is // needed is memcpy. // if (!source_rows_permutation) { - memcpy(target->mutable_data(1) + fixed_length * first_target_row_id, source.data(1), - fixed_length * num_source_rows); + DCHECK_LE(first_target_row_id, std::numeric_limits::max()); + memcpy(target->mutable_fixed_length_rows(static_cast(first_target_row_id)), + source.fixed_length_rows(/*row_id=*/0), fixed_length * num_source_rows); } else { // Row length must be a multiple of 64-bits due to enforced alignment. // Loop for each output row copying a fixed number of 64-bit words. 
@@ -494,10 +495,13 @@ void RowArrayMerge::CopyFixedLength(RowTableImpl* target, const RowTableImpl& so int64_t num_words_per_row = fixed_length / sizeof(uint64_t); for (int64_t i = 0; i < num_source_rows; ++i) { int64_t source_row_id = source_rows_permutation[i]; + DCHECK_LE(source_row_id, std::numeric_limits::max()); const uint64_t* source_row_ptr = reinterpret_cast( - source.data(1) + fixed_length * source_row_id); + source.fixed_length_rows(static_cast(source_row_id))); + int64_t target_row_id = first_target_row_id + i; + DCHECK_LE(target_row_id, std::numeric_limits::max()); uint64_t* target_row_ptr = reinterpret_cast( - target->mutable_data(1) + fixed_length * (first_target_row_id + i)); + target->mutable_fixed_length_rows(static_cast(target_row_id))); for (int64_t word = 0; word < num_words_per_row; ++word) { target_row_ptr[word] = source_row_ptr[word]; @@ -529,16 +533,16 @@ void RowArrayMerge::CopyVaryingLength(RowTableImpl* target, const RowTableImpl& // We can simply memcpy bytes of rows if their order has not changed. // - memcpy(target->mutable_data(2) + target_offsets[first_target_row_id], source.data(2), - source_offsets[num_source_rows] - source_offsets[0]); + memcpy(target->mutable_var_length_rows() + target_offsets[first_target_row_id], + source.var_length_rows(), source_offsets[num_source_rows] - source_offsets[0]); } else { int64_t target_row_offset = first_target_row_offset; - uint64_t* target_row_ptr = - reinterpret_cast(target->mutable_data(2) + target_row_offset); + uint64_t* target_row_ptr = reinterpret_cast( + target->mutable_var_length_rows() + target_row_offset); for (int64_t i = 0; i < num_source_rows; ++i) { int64_t source_row_id = source_rows_permutation[i]; const uint64_t* source_row_ptr = reinterpret_cast( - source.data(2) + source_offsets[source_row_id]); + source.var_length_rows() + source_offsets[source_row_id]); int64_t length = source_offsets[source_row_id + 1] - source_offsets[source_row_id]; // Though the row offset is 64-bit, the length of a single row must be 32-bit as // required by current row table implementation. @@ -564,14 +568,18 @@ void RowArrayMerge::CopyNulls(RowTableImpl* target, const RowTableImpl& source, const int64_t* source_rows_permutation) { int64_t num_source_rows = source.length(); int num_bytes_per_row = target->metadata().null_masks_bytes_per_row; - uint8_t* target_nulls = target->null_masks() + num_bytes_per_row * first_target_row_id; + DCHECK_LE(first_target_row_id, std::numeric_limits::max()); + uint8_t* target_nulls = + target->mutable_null_masks(static_cast(first_target_row_id)); if (!source_rows_permutation) { - memcpy(target_nulls, source.null_masks(), num_bytes_per_row * num_source_rows); + memcpy(target_nulls, source.null_masks(/*row_id=*/0), + num_bytes_per_row * num_source_rows); } else { - for (int64_t i = 0; i < num_source_rows; ++i) { + for (uint32_t i = 0; i < num_source_rows; ++i) { int64_t source_row_id = source_rows_permutation[i]; + DCHECK_LE(source_row_id, std::numeric_limits::max()); const uint8_t* source_nulls = - source.null_masks() + num_bytes_per_row * source_row_id; + source.null_masks(static_cast(source_row_id)); for (int64_t byte = 0; byte < num_bytes_per_row; ++byte) { *target_nulls++ = *source_nulls++; } diff --git a/cpp/src/arrow/acero/swiss_join_avx2.cc b/cpp/src/arrow/acero/swiss_join_avx2.cc index 1d6b7eda6e6a0..86d08870e58d8 100644 --- a/cpp/src/arrow/acero/swiss_join_avx2.cc +++ b/cpp/src/arrow/acero/swiss_join_avx2.cc @@ -16,6 +16,7 @@ // under the License. 
#include "arrow/acero/swiss_join_internal.h" +#include "arrow/compute/row/row_util_avx2_internal.h" #include "arrow/util/bit_util.h" #include "arrow/util/simd.h" @@ -46,7 +47,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu if (!is_fixed_length_column) { int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id); - const uint8_t* row_ptr_base = rows.data(2); + const uint8_t* row_ptr_base = rows.var_length_rows(); const RowTableImpl::offset_type* row_offsets = rows.offsets(); auto row_offsets_i64 = reinterpret_cast(row_offsets); @@ -172,7 +173,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu if (is_fixed_length_row) { // Case 3: This is a fixed length column in fixed length row // - const uint8_t* row_ptr_base = rows.data(1); + const uint8_t* row_ptr_base = rows.fixed_length_rows(/*row_id=*/0); for (int i = 0; i < num_rows / kUnroll; ++i) { // Load 8 32-bit row ids. __m256i row_id = @@ -197,7 +198,7 @@ int RowArrayAccessor::Visit_avx2(const RowTableImpl& rows, int column_id, int nu } else { // Case 4: This is a fixed length column in varying length row // - const uint8_t* row_ptr_base = rows.data(2); + const uint8_t* row_ptr_base = rows.var_length_rows(); const RowTableImpl::offset_type* row_offsets = rows.offsets(); auto row_offsets_i64 = reinterpret_cast(row_offsets); @@ -237,31 +238,16 @@ int RowArrayAccessor::VisitNulls_avx2(const RowTableImpl& rows, int column_id, // constexpr int kUnroll = 8; - const uint8_t* null_masks = rows.null_masks(); - __m256i null_bits_per_row = - _mm256_set1_epi32(8 * rows.metadata().null_masks_bytes_per_row); - __m256i pos_after_encoding = - _mm256_set1_epi32(rows.metadata().pos_after_encoding(column_id)); + uint32_t pos_after_encoding = rows.metadata().pos_after_encoding(column_id); for (int i = 0; i < num_rows / kUnroll; ++i) { __m256i row_id = _mm256_loadu_si256(reinterpret_cast(row_ids) + i); - __m256i bit_id = _mm256_mullo_epi32(row_id, null_bits_per_row); - bit_id = _mm256_add_epi32(bit_id, pos_after_encoding); - __m256i bytes = _mm256_i32gather_epi32(reinterpret_cast(null_masks), - _mm256_srli_epi32(bit_id, 3), 1); - __m256i bit_in_word = _mm256_sllv_epi32( - _mm256_set1_epi32(1), _mm256_and_si256(bit_id, _mm256_set1_epi32(7))); - // `result` will contain one 32-bit word per tested null bit, either 0xffffffff if the - // null bit was set or 0 if it was unset. - __m256i result = - _mm256_cmpeq_epi32(_mm256_and_si256(bytes, bit_in_word), bit_in_word); - // NB: Be careful about sign-extension when casting the return value of - // _mm256_movemask_epi8 (signed 32-bit) to unsigned 64-bit, which will pollute the - // higher bits of the following OR. 
- uint32_t null_bytes_lo = static_cast( - _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(result)))); - uint64_t null_bytes_hi = - _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(result, 1))); - uint64_t null_bytes = null_bytes_lo | (null_bytes_hi << 32); + __m256i null32 = GetNullBitInt32(rows, pos_after_encoding, row_id); + null32 = _mm256_cmpeq_epi32(null32, _mm256_set1_epi32(1)); + uint32_t null32_lo = + _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_castsi256_si128(null32))); + uint32_t null32_hi = + _mm256_movemask_epi8(_mm256_cvtepi32_epi64(_mm256_extracti128_si256(null32, 1))); + uint64_t null_bytes = null32_lo | (static_cast(null32_hi) << 32); process_8_values_fn(i * kUnroll, null_bytes); } diff --git a/cpp/src/arrow/acero/swiss_join_internal.h b/cpp/src/arrow/acero/swiss_join_internal.h index 6d48a35ecd0eb..d0d97aa1cc0fe 100644 --- a/cpp/src/arrow/acero/swiss_join_internal.h +++ b/cpp/src/arrow/acero/swiss_join_internal.h @@ -72,7 +72,7 @@ class RowArrayAccessor { if (!is_fixed_length_column) { int varbinary_column_id = VarbinaryColumnId(rows.metadata(), column_id); - const uint8_t* row_ptr_base = rows.data(2); + const uint8_t* row_ptr_base = rows.var_length_rows(); const RowTableImpl::offset_type* row_offsets = rows.offsets(); uint32_t field_offset_within_row, field_length; @@ -108,22 +108,21 @@ class RowArrayAccessor { if (field_length == 0) { field_length = 1; } - uint32_t row_length = rows.metadata().fixed_length; bool is_fixed_length_row = rows.metadata().is_fixed_length; if (is_fixed_length_row) { // Case 3: This is a fixed length column in a fixed length row // - const uint8_t* row_ptr_base = rows.data(1) + field_offset_within_row; for (int i = 0; i < num_rows; ++i) { uint32_t row_id = row_ids[i]; - const uint8_t* row_ptr = row_ptr_base + row_length * row_id; + const uint8_t* row_ptr = + rows.fixed_length_rows(row_id) + field_offset_within_row; process_value_fn(i, row_ptr, field_length); } } else { // Case 4: This is a fixed length column in a varying length row // - const uint8_t* row_ptr_base = rows.data(2) + field_offset_within_row; + const uint8_t* row_ptr_base = rows.var_length_rows() + field_offset_within_row; const RowTableImpl::offset_type* row_offsets = rows.offsets(); for (int i = 0; i < num_rows; ++i) { uint32_t row_id = row_ids[i]; @@ -142,13 +141,10 @@ class RowArrayAccessor { template static void VisitNulls(const RowTableImpl& rows, int column_id, int num_rows, const uint32_t* row_ids, PROCESS_VALUE_FN process_value_fn) { - const uint8_t* null_masks = rows.null_masks(); - uint32_t null_mask_num_bytes = rows.metadata().null_masks_bytes_per_row; uint32_t pos_after_encoding = rows.metadata().pos_after_encoding(column_id); for (int i = 0; i < num_rows; ++i) { uint32_t row_id = row_ids[i]; - int64_t bit_id = row_id * null_mask_num_bytes * 8 + pos_after_encoding; - process_value_fn(i, bit_util::GetBit(null_masks, bit_id) ? 0xff : 0); + process_value_fn(i, rows.is_null(row_id, pos_after_encoding) ? 
0xff : 0); } } diff --git a/cpp/src/arrow/acero/test_util_internal.cc b/cpp/src/arrow/acero/test_util_internal.cc index 107a20354c0e7..2748d4107ed36 100644 --- a/cpp/src/arrow/acero/test_util_internal.cc +++ b/cpp/src/arrow/acero/test_util_internal.cc @@ -38,6 +38,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/exec.h" #include "arrow/compute/function_internal.h" +#include "arrow/compute/test_util_internal.h" #include "arrow/datum.h" #include "arrow/io/interfaces.h" #include "arrow/record_batch.h" @@ -59,67 +60,12 @@ namespace arrow { using arrow::internal::CpuInfo; using arrow::internal::Executor; +using compute::ExecBatchFromJSON; using compute::SortKey; using compute::Take; namespace acero { -namespace { - -void ValidateOutputImpl(const ArrayData& output) { - ASSERT_OK(::arrow::internal::ValidateArrayFull(output)); - TestInitialized(output); -} - -void ValidateOutputImpl(const ChunkedArray& output) { - ASSERT_OK(output.ValidateFull()); - for (const auto& chunk : output.chunks()) { - TestInitialized(*chunk); - } -} - -void ValidateOutputImpl(const RecordBatch& output) { - ASSERT_OK(output.ValidateFull()); - for (const auto& column : output.column_data()) { - TestInitialized(*column); - } -} - -void ValidateOutputImpl(const Table& output) { - ASSERT_OK(output.ValidateFull()); - for (const auto& column : output.columns()) { - for (const auto& chunk : column->chunks()) { - TestInitialized(*chunk); - } - } -} - -void ValidateOutputImpl(const Scalar& output) { ASSERT_OK(output.ValidateFull()); } - -} // namespace - -void ValidateOutput(const Datum& output) { - switch (output.kind()) { - case Datum::ARRAY: - ValidateOutputImpl(*output.array()); - break; - case Datum::CHUNKED_ARRAY: - ValidateOutputImpl(*output.chunked_array()); - break; - case Datum::RECORD_BATCH: - ValidateOutputImpl(*output.record_batch()); - break; - case Datum::TABLE: - ValidateOutputImpl(*output.table()); - break; - case Datum::SCALAR: - ValidateOutputImpl(*output.scalar()); - break; - default: - break; - } -} - std::vector HardwareFlagsForTesting() { // Acero currently only has AVX2 optimizations return arrow::GetSupportedHardwareFlags({CpuInfo::AVX2}); @@ -199,36 +145,6 @@ ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, std::vector& types, std::string_view json) { - auto fields = ::arrow::internal::MapVector( - [](const TypeHolder& th) { return field("", th.GetSharedPtr()); }, types); - - ExecBatch batch{*RecordBatchFromJSON(schema(std::move(fields)), json)}; - - return batch; -} - -ExecBatch ExecBatchFromJSON(const std::vector& types, - const std::vector& shapes, std::string_view json) { - DCHECK_EQ(types.size(), shapes.size()); - - ExecBatch batch = ExecBatchFromJSON(types, json); - - auto value_it = batch.values.begin(); - for (ArgShape shape : shapes) { - if (shape == ArgShape::SCALAR) { - if (batch.length == 0) { - *value_it = MakeNullScalar(value_it->type()); - } else { - *value_it = value_it->make_array()->GetScalar(0).ValueOrDie(); - } - } - ++value_it; - } - - return batch; -} - Future<> StartAndFinish(ExecPlan* plan) { RETURN_NOT_OK(plan->Validate()); plan->StartProducing(); diff --git a/cpp/src/arrow/acero/test_util_internal.h b/cpp/src/arrow/acero/test_util_internal.h index 569fb1254db4a..2367524a5600c 100644 --- a/cpp/src/arrow/acero/test_util_internal.h +++ b/cpp/src/arrow/acero/test_util_internal.h @@ -36,8 +36,6 @@ namespace arrow::acero { -void ValidateOutput(const Datum& output); - // Enumerate all hardware flags that can be tested on this platform // and would lead to 
different code paths being tested in Acero. std::vector HardwareFlagsForTesting(); @@ -50,16 +48,6 @@ ExecNode* MakeDummyNode(ExecPlan* plan, std::string label, std::vector& types, std::string_view json); - -/// \brief Shape qualifier for value types. In certain instances -/// (e.g. "map_lookup" kernel), an argument may only be a scalar, where in -/// other kernels arguments can be arrays or scalars -enum class ArgShape { ANY, ARRAY, SCALAR }; - -ExecBatch ExecBatchFromJSON(const std::vector& types, - const std::vector& shapes, std::string_view json); - struct BatchesWithSchema { std::vector batches; std::shared_ptr schema; diff --git a/cpp/src/arrow/acero/tpch_node_test.cc b/cpp/src/arrow/acero/tpch_node_test.cc index 17fb43452bc58..f484d6c9d523e 100644 --- a/cpp/src/arrow/acero/tpch_node_test.cc +++ b/cpp/src/arrow/acero/tpch_node_test.cc @@ -27,7 +27,6 @@ #include "arrow/acero/test_util_internal.h" #include "arrow/acero/tpch_node.h" #include "arrow/acero/util.h" -#include "arrow/compute/kernels/test_util.h" #include "arrow/compute/row/row_encoder_internal.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/matchers.h" diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index b9d6c53215b41..b3c314fccc0b3 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -235,7 +235,7 @@ void AssertTableWriteReadEqual(const std::vector>& input_ write_options.compression = Compression::UNCOMPRESSED; #endif write_options.file_version = adapters::orc::FileVersion(0, 11); - write_options.compression_block_size = 32768; + write_options.compression_block_size = 64 * 1024; write_options.row_index_stride = 5000; EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open( buffer_output_stream.get(), write_options)); @@ -272,7 +272,7 @@ void AssertBatchWriteReadEqual( write_options.compression = Compression::UNCOMPRESSED; #endif write_options.file_version = adapters::orc::FileVersion(0, 11); - write_options.compression_block_size = 32768; + write_options.compression_block_size = 64 * 1024; write_options.row_index_stride = 5000; EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open( buffer_output_stream.get(), write_options)); @@ -330,7 +330,7 @@ std::unique_ptr CreateWriter(uint64_t stripe_size, liborc::OutputStream* stream) { liborc::WriterOptions options; options.setStripeSize(stripe_size); - options.setCompressionBlockSize(1024); + options.setCompressionBlockSize(64 * 1024); options.setMemoryPool(liborc::getDefaultPool()); options.setRowIndexStride(0); return liborc::createWriter(type, stream, options); @@ -668,7 +668,7 @@ TEST_F(TestORCWriterTrivialNoWrite, noWrite) { write_options.compression = Compression::UNCOMPRESSED; #endif write_options.file_version = adapters::orc::FileVersion(0, 11); - write_options.compression_block_size = 32768; + write_options.compression_block_size = 64 * 1024; write_options.row_index_stride = 5000; EXPECT_OK_AND_ASSIGN(auto writer, adapters::orc::ORCFileWriter::Open( buffer_output_stream.get(), write_options)); diff --git a/cpp/src/arrow/compute/CMakeLists.txt b/cpp/src/arrow/compute/CMakeLists.txt index ca811dac041fe..6deb2cbad8cb3 100644 --- a/cpp/src/arrow/compute/CMakeLists.txt +++ b/cpp/src/arrow/compute/CMakeLists.txt @@ -28,6 +28,14 @@ endif() # Unit tests # +# Define arrow_compute_testing object library for common test files +if(ARROW_TESTING) + add_library(arrow_compute_testing OBJECT test_util_internal.cc) + # Even though this is 
still just an object library we still need to "link" our + # dependencies so that include paths are configured correctly + target_link_libraries(arrow_compute_testing PUBLIC ${ARROW_GTEST_GMOCK}) +endif() + set(ARROW_COMPUTE_TEST_PREFIX "arrow-compute") set(ARROW_COMPUTE_TEST_LABELS "arrow-compute-tests") set(ARROW_COMPUTE_TEST_ARGS PREFIX ${ARROW_COMPUTE_TEST_PREFIX} LABELS @@ -87,9 +95,16 @@ add_arrow_test(internals_test function_test.cc exec_test.cc kernel_test.cc - registry_test.cc) + registry_test.cc + EXTRA_LINK_LIBS + arrow_compute_testing) + +add_arrow_compute_test(expression_test + SOURCES + expression_test.cc + EXTRA_LINK_LIBS + arrow_compute_testing) -add_arrow_compute_test(expression_test SOURCES expression_test.cc) add_arrow_compute_test(row_test SOURCES key_hash_test.cc @@ -98,7 +113,9 @@ add_arrow_compute_test(row_test row/grouper_test.cc row/row_encoder_internal_test.cc row/row_test.cc - util_internal_test.cc) + util_internal_test.cc + EXTRA_LINK_LIBS + arrow_compute_testing) add_arrow_benchmark(function_benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index f0d5c0fcc3d72..22ecf1cc87844 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -155,6 +155,12 @@ static auto kPairwiseOptionsType = GetFunctionOptionsType<PairwiseOptions>( DataMember("periods", &PairwiseOptions::periods)); static auto kListFlattenOptionsType = GetFunctionOptionsType<ListFlattenOptions>( DataMember("recursive", &ListFlattenOptions::recursive)); +static auto kInversePermutationOptionsType = + GetFunctionOptionsType<InversePermutationOptions>( + DataMember("max_index", &InversePermutationOptions::max_index), + DataMember("output_type", &InversePermutationOptions::output_type)); +static auto kScatterOptionsType = GetFunctionOptionsType<ScatterOptions>( + DataMember("max_index", &ScatterOptions::max_index)); } // namespace } // namespace internal @@ -230,6 +236,17 @@ ListFlattenOptions::ListFlattenOptions(bool recursive) : FunctionOptions(internal::kListFlattenOptionsType), recursive(recursive) {} constexpr char ListFlattenOptions::kTypeName[]; +InversePermutationOptions::InversePermutationOptions( + int64_t max_index, std::shared_ptr<DataType> output_type) + : FunctionOptions(internal::kInversePermutationOptionsType), + max_index(max_index), + output_type(std::move(output_type)) {} +constexpr char InversePermutationOptions::kTypeName[]; + +ScatterOptions::ScatterOptions(int64_t max_index) + : FunctionOptions(internal::kScatterOptionsType), max_index(max_index) {} +constexpr char ScatterOptions::kTypeName[]; + namespace internal { void RegisterVectorOptions(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunctionOptionsType(kFilterOptionsType)); @@ -244,6 +261,8 @@ void RegisterVectorOptions(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunctionOptionsType(kRankOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kPairwiseOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kListFlattenOptionsType)); + DCHECK_OK(registry->AddFunctionOptionsType(kInversePermutationOptionsType)); + DCHECK_OK(registry->AddFunctionOptionsType(kScatterOptionsType)); } } // namespace internal @@ -429,5 +448,19 @@ Result<Datum> CumulativeMean(const Datum& values, const CumulativeOptions& optio return CallFunction("cumulative_mean", {Datum(values)}, &options, ctx); } +// ---------------------------------------------------------------------- +// Swizzle functions + +Result<Datum> InversePermutation(const Datum& indices, + const InversePermutationOptions& options, + ExecContext* ctx) { + return
CallFunction("inverse_permutation", {indices}, &options, ctx); +} + +Result Scatter(const Datum& values, const Datum& indices, + const ScatterOptions& options, ExecContext* ctx) { + return CallFunction("scatter", {values, indices}, &options, ctx); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index e5bcc37329661..ada1665b3ec7c 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -257,6 +257,40 @@ class ARROW_EXPORT ListFlattenOptions : public FunctionOptions { bool recursive = false; }; +/// \brief Options for inverse_permutation function +class ARROW_EXPORT InversePermutationOptions : public FunctionOptions { + public: + explicit InversePermutationOptions(int64_t max_index = -1, + std::shared_ptr output_type = NULLPTR); + static constexpr char const kTypeName[] = "InversePermutationOptions"; + static InversePermutationOptions Defaults() { return InversePermutationOptions(); } + + /// \brief The max value in the input indices to allow. The length of the function's + /// output will be this value plus 1. If negative, this value will be set to the length + /// of the input indices minus 1 and the length of the function's output will be the + /// length of the input indices. + int64_t max_index = -1; + /// \brief The type of the output inverse permutation. If null, the output will be of + /// the same type as the input indices, otherwise must be signed integer type. An + /// invalid error will be reported if this type is not able to store the length of the + /// input indices. + std::shared_ptr output_type = NULLPTR; +}; + +/// \brief Options for scatter function +class ARROW_EXPORT ScatterOptions : public FunctionOptions { + public: + explicit ScatterOptions(int64_t max_index = -1); + static constexpr char const kTypeName[] = "ScatterOptions"; + static ScatterOptions Defaults() { return ScatterOptions(); } + + /// \brief The max value in the input indices to allow. The length of the function's + /// output will be this value plus 1. If negative, this value will be set to the length + /// of the input indices minus 1 and the length of the function's output will be the + /// length of the input indices. + int64_t max_index = -1; +}; + /// @} /// \brief Filter with a boolean selection filter @@ -705,5 +739,58 @@ Result> PairwiseDiff(const Array& array, bool check_overflow = false, ExecContext* ctx = NULLPTR); +/// \brief Return the inverse permutation of the given indices. +/// +/// For indices[i] = x, inverse_permutation[x] = i. And inverse_permutation[x] = null if x +/// does not appear in the input indices. Indices must be in the range of [0, max_index], +/// or null, which will be ignored. If multiple indices point to the same value, the last +/// one is used. +/// +/// For example, with +/// indices = [null, 0, null, 2, 4, 1, 1] +/// the inverse permutation is +/// [1, 6, 3, null, 4, null, null] +/// if max_index = 6. +/// +/// \param[in] indices array-like indices +/// \param[in] options configures the max index and the output type +/// \param[in] ctx the function execution context, optional +/// \return the resulting inverse permutation +/// +/// \since 20.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result InversePermutation( + const Datum& indices, + const InversePermutationOptions& options = InversePermutationOptions::Defaults(), + ExecContext* ctx = NULLPTR); + +/// \brief Scatter the values into specified positions according to the indices. 
+/// +/// For indices[i] = x, output[x] = values[i], and output[x] = null if x does not appear +/// in the input indices. Indices must be in the range [0, max_index] or null, in which +/// case the corresponding value will be ignored. If multiple indices share the same +/// value, the last one is used. +/// +/// For example, with +/// values = [a, b, c, d, e, f, g] +/// indices = [null, 0, null, 2, 4, 1, 1] +/// the output is +/// [b, g, d, null, e, null, null] +/// if max_index = 6. +/// +/// \param[in] values datum to scatter +/// \param[in] indices array-like indices +/// \param[in] options configures the max index to scatter to +/// \param[in] ctx the function execution context, optional +/// \return the resulting datum +/// +/// \since 20.0.0 +/// \note API not yet finalized +ARROW_EXPORT +Result<Datum> Scatter(const Datum& values, const Datum& indices, + const ScatterOptions& options = ScatterOptions::Defaults(), + ExecContext* ctx = NULLPTR); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index c269de0763217..b7d017d482013 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -136,6 +136,10 @@ TEST(FunctionOptions, Equality) { options.emplace_back(new SelectKOptions(5, {{SortKey("key", SortOrder::Ascending)}})); options.emplace_back(new Utf8NormalizeOptions()); options.emplace_back(new Utf8NormalizeOptions(Utf8NormalizeOptions::NFD)); + options.emplace_back( + new InversePermutationOptions(/*max_index=*/42, /*output_type=*/int32())); + options.emplace_back(new ScatterOptions()); + options.emplace_back(new ScatterOptions(/*max_index=*/42)); for (size_t i = 0; i < options.size(); i++) { const size_t prev_i = i == 0 ?
options.size() - 1 : i - 1; diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 7c7b9c8b68d45..4dedd1f23e090 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -18,9 +18,9 @@ # ---------------------------------------------------------------------- # Tests that don't require the full kernel library -# Define arrow_compute_testing object library for common test files +# Define arrow_compute_kernels_testing object library for common test files if(ARROW_TESTING) - add_library(arrow_compute_kernels_testing OBJECT test_util.cc) + add_library(arrow_compute_kernels_testing OBJECT test_util_internal.cc) # Even though this is still just an object library we still need to "link" our # dependencies so that include paths are configured correctly target_link_libraries(arrow_compute_kernels_testing PUBLIC ${ARROW_GTEST_GMOCK}) @@ -31,12 +31,14 @@ add_arrow_test(scalar_cast_test SOURCES scalar_cast_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) # ---------------------------------------------------------------------- # Scalar kernels -set(ARROW_COMPUTE_SCALAR_TYPE_TEST_LINK_LIBS arrow_compute_kernels_testing) +set(ARROW_COMPUTE_SCALAR_TYPE_TEST_LINK_LIBS arrow_compute_kernels_testing + arrow_compute_testing) if(ARROW_WITH_UTF8PROC) list(APPEND ARROW_COMPUTE_SCALAR_TYPE_TEST_LINK_LIBS utf8proc::utf8proc) endif() @@ -52,13 +54,15 @@ add_arrow_compute_test(scalar_if_else_test SOURCES scalar_if_else_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_compute_test(scalar_temporal_test SOURCES scalar_temporal_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_compute_test(scalar_math_test SOURCES @@ -66,7 +70,8 @@ add_arrow_compute_test(scalar_math_test scalar_compare_test.cc scalar_round_arithmetic_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_compute_test(scalar_utility_test SOURCES @@ -74,7 +79,8 @@ add_arrow_compute_test(scalar_utility_test scalar_set_lookup_test.cc scalar_validity_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_benchmark(scalar_arithmetic_benchmark PREFIX "arrow-compute") add_arrow_benchmark(scalar_boolean_benchmark PREFIX "arrow-compute") @@ -101,19 +107,29 @@ add_arrow_compute_test(vector_test vector_run_end_encode_test.cc select_k_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_compute_test(vector_sort_test SOURCES vector_sort_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_compute_test(vector_selection_test SOURCES vector_selection_test.cc EXTRA_LINK_LIBS - arrow_compute_kernels_testing) + arrow_compute_kernels_testing + arrow_compute_testing) + +add_arrow_compute_test(vector_swizzle_test + SOURCES + vector_swizzle_test.cc + EXTRA_LINK_LIBS + arrow_compute_kernels_testing + arrow_compute_testing) add_arrow_benchmark(vector_hash_benchmark PREFIX "arrow-compute") add_arrow_benchmark(vector_sort_benchmark PREFIX "arrow-compute") @@ -132,6 +148,7 @@ add_arrow_compute_test(aggregate_test aggregate_test.cc EXTRA_LINK_LIBS arrow_compute_kernels_testing + 
arrow_compute_testing Boost::headers) # ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compute/kernels/aggregate_test.cc b/cpp/src/arrow/compute/kernels/aggregate_test.cc index 65439af2748b5..e6ad915fd5667 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_test.cc @@ -32,7 +32,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/cast.h" #include "arrow/compute/kernels/aggregate_internal.h" -#include "arrow/compute/kernels/test_util.h" +#include "arrow/compute/kernels/test_util_internal.h" #include "arrow/compute/registry.h" #include "arrow/type.h" #include "arrow/type_traits.h" diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index 594bd1fce0b84..2a492f581f53b 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -1037,8 +1037,9 @@ ArrayKernelExec GenerateFloatingPoint(detail::GetTypeId get_id) { // Generate a kernel given a templated functor for integer types // // See "Numeric" above for description of the generator functor -template