diff --git a/.asf.yaml b/.asf.yaml
index 4693631be38b3..d2522ecae0b43 100644
--- a/.asf.yaml
+++ b/.asf.yaml
@@ -25,6 +25,7 @@ github:
- davisusanibar
- felipecrv
- js8544
+ - amoeba
notifications:
commits: commits@arrow.apache.org
diff --git a/.env b/.env
index a551e2120a6fb..6746892fd4ed8 100644
--- a/.env
+++ b/.env
@@ -49,7 +49,7 @@ ULIMIT_CORE=-1
ALMALINUX=8
ALPINE_LINUX=3.16
DEBIAN=11
-FEDORA=35
+FEDORA=38
UBUNTU=20.04
# Default versions for various dependencies
diff --git a/.gitattributes b/.gitattributes
index 69f4139c4e4f4..70007c26c8b9b 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -3,6 +3,9 @@ cpp/src/generated/*.cpp linguist-generated=true
cpp/src/generated/*.h linguist-generated=true
go/**/*.s linguist-generated=true
go/arrow/unionmode_string.go linguist-generated=true
+go/arrow/internal/flatbuf/*.go linguist-generated=true
+go/**/*.pb.go linguist-generated=true
+go/parquet/internal/gen-go/parquet/*.go linguist-generated=true
r/R/RcppExports.R linguist-generated=true
r/R/arrowExports.R linguist-generated=true
r/src/RcppExports.cpp linguist-generated=true
diff --git a/.github/workflows/comment_bot.yml b/.github/workflows/comment_bot.yml
index cc9e02d955afd..f27d95c4e8cd7 100644
--- a/.github/workflows/comment_bot.yml
+++ b/.github/workflows/comment_bot.yml
@@ -35,13 +35,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
path: arrow
# fetch the tags for version number generation
fetch-depth: 0
- name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Install Archery and Crossbow dependencies
@@ -60,8 +60,8 @@ jobs:
if: startsWith(github.event.comment.body, '@github-actions autotune')
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v4
- - uses: r-lib/actions/pr-fetch@v2
+ - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
+ - uses: r-lib/actions/pr-fetch@11a22a908006c25fe054c4ef0ac0436b1de3edbe # v2.6.4
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: See what is different
@@ -121,7 +121,7 @@ jobs:
--clang_format_binary=clang-format-${CLANG_TOOLS} \
--exclude_glob=cpp/build-support/lint_exclusions.txt \
--source_dir=r/src --quiet --fix
- - uses: r-lib/actions/setup-r@v2
+ - uses: r-lib/actions/setup-r@11a22a908006c25fe054c4ef0ac0436b1de3edbe # v2.6.4
if: env.R_DOCS == 'true' || env.R_CODE == 'true' || endsWith(github.event.comment.body, 'everything')
- name: Update R docs
if: env.R_DOCS == 'true' || endsWith(github.event.comment.body, 'everything')
@@ -149,7 +149,7 @@ jobs:
git config user.name "$(git log -1 --pretty=format:%an)"
git config user.email "$(git log -1 --pretty=format:%ae)"
git commit -a -m 'Autoformat/render all the things [automated commit]' || echo "No changes to commit"
- - uses: r-lib/actions/pr-push@v2
+ - uses: r-lib/actions/pr-push@11a22a908006c25fe054c4ef0ac0436b1de3edbe # v2.6.4
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
@@ -158,8 +158,8 @@ jobs:
if: startsWith(github.event.comment.body, '@github-actions rebase')
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v4
- - uses: r-lib/actions/pr-fetch@v2
+ - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
+ - uses: r-lib/actions/pr-fetch@11a22a908006c25fe054c4ef0ac0436b1de3edbe # v2.6.4
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Rebase on ${{ github.repository }} default branch
@@ -170,7 +170,7 @@ jobs:
git remote add upstream https://github.com/${{ github.repository }}
git fetch --unshallow upstream ${{ github.event.repository.default_branch }}
git rebase upstream/${{ github.event.repository.default_branch }}
- - uses: r-lib/actions/pr-push@v2
+ - uses: r-lib/actions/pr-push@11a22a908006c25fe054c4ef0ac0436b1de3edbe # v2.6.4
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
args: "--force"
@@ -182,7 +182,7 @@ jobs:
if: github.event.comment.body == 'take'
runs-on: ubuntu-latest
steps:
- - uses: actions/github-script@v6
+ - uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
diff --git a/.github/workflows/cpp.yml b/.github/workflows/cpp.yml
index a9361f9f51378..e6ae6c60b0f4c 100644
--- a/.github/workflows/cpp.yml
+++ b/.github/workflows/cpp.yml
@@ -96,12 +96,12 @@ jobs:
UBUNTU: ${{ matrix.ubuntu }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: ${{ matrix.image }}-${{ hashFiles('cpp/**') }}
diff --git a/.github/workflows/dev.yml b/.github/workflows/dev.yml
index cfa9ffb49d7ad..df2b20a9e3c77 100644
--- a/.github/workflows/dev.yml
+++ b/.github/workflows/dev.yml
@@ -37,11 +37,11 @@ jobs:
if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -84,19 +84,19 @@ jobs:
GIT_COMMITTER_EMAIL: "github-actions[bot]@users.noreply.github.com"
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Install Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: '3.8'
- name: Install Ruby
- uses: ruby/setup-ruby@v1
+ uses: ruby/setup-ruby@250fcd6a742febb1123a77a841497ccaa8b9e939 # v1.152.0
with:
ruby-version: '2.7'
- name: Install .NET
- uses: actions/setup-dotnet@v3
+ uses: actions/setup-dotnet@3447fd6a9f9e57506b15f895c5b76d3b197dc7c2 # v3.2.0
with:
dotnet-version: '7.0.x'
- name: Install Dependencies
diff --git a/.github/workflows/dev_pr.yml b/.github/workflows/dev_pr.yml
index e5d2a77c5a8a2..78b01b561f3cb 100644
--- a/.github/workflows/dev_pr.yml
+++ b/.github/workflows/dev_pr.yml
@@ -43,7 +43,7 @@ jobs:
name: Process
runs-on: ubuntu-latest
steps:
- - uses: actions/checkout@v4
+ - uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
repository: apache/arrow
ref: main
@@ -53,7 +53,7 @@ jobs:
if: |
(github.event.action == 'opened' ||
github.event.action == 'edited')
- uses: actions/github-script@v6
+ uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
@@ -64,7 +64,7 @@ jobs:
if: |
(github.event.action == 'opened' ||
github.event.action == 'edited')
- uses: actions/github-script@v6
+ uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
@@ -75,7 +75,7 @@ jobs:
if: |
(github.event.action == 'opened' ||
github.event.action == 'edited')
- uses: actions/github-script@v6
+ uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
debug: true
github-token: ${{ secrets.GITHUB_TOKEN }}
@@ -87,7 +87,7 @@ jobs:
if: |
(github.event.action == 'opened' ||
github.event.action == 'synchronize')
- uses: actions/labeler@v4
+ uses: actions/labeler@ac9175f8a1f3625fd0d4fb234536d26811351594 # v4.3.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
configuration-path: .github/workflows/dev_pr/labeler.yml
diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml
index a1ac4c3067dae..b30e1eb8809db 100644
--- a/.github/workflows/docs.yml
+++ b/.github/workflows/docs.yml
@@ -38,20 +38,20 @@ jobs:
UBUNTU: "22.04"
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Free up disk space
run: |
ci/scripts/util_free_space.sh
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: ubuntu-docs-${{ hashFiles('cpp/**') }}
restore-keys: ubuntu-docs-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/docs_light.yml b/.github/workflows/docs_light.yml
index 74e6eabe24795..e96ccecdff598 100644
--- a/.github/workflows/docs_light.yml
+++ b/.github/workflows/docs_light.yml
@@ -47,17 +47,17 @@ jobs:
PYTHON: "3.9"
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: conda-docs-${{ hashFiles('cpp/**') }}
restore-keys: conda-docs-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index a0dfb9fea1673..11668aaf1b301 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -73,7 +73,7 @@ jobs:
GO: ${{ matrix.go }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
@@ -106,7 +106,7 @@ jobs:
github.event_name == 'push' &&
github.repository == 'apache/arrow' &&
github.ref_name == 'main'
- uses: actions/setup-go@v4
+ uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0
with:
go-version: ${{ matrix.go }}
cache: true
@@ -162,12 +162,12 @@ jobs:
GO: ${{ matrix.go }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -203,11 +203,11 @@ jobs:
GO: ${{ matrix.go }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -240,12 +240,12 @@ jobs:
go: [1.19, '1.20']
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Install go
- uses: actions/setup-go@v4
+ uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0
with:
go-version: ${{ matrix.go }}
cache: true
@@ -273,12 +273,12 @@ jobs:
go: [1.19, '1.20']
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Install go
- uses: actions/setup-go@v4
+ uses: actions/setup-go@93397bea11091df50f3d7e59dc26a7711a8bcfbe # v4.1.0
with:
go-version: ${{ matrix.go }}
cache: true
@@ -299,7 +299,7 @@ jobs:
github.event_name == 'push' &&
github.repository == 'apache/arrow' &&
github.ref_name == 'main'
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: '3.10'
- name: Run Benchmarks
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 430b0bb2822e7..bd99b62a2fe02 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -62,12 +62,12 @@ jobs:
timeout-minutes: 60
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Checkout Arrow Rust
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
repository: apache/arrow-rs
path: rust
@@ -75,13 +75,13 @@ jobs:
run: |
ci/scripts/util_free_space.sh
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: conda-${{ hashFiles('cpp/**') }}
restore-keys: conda-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/issue_bot.yml b/.github/workflows/issue_bot.yml
index ae344a4c1eba9..86d1858c8c596 100644
--- a/.github/workflows/issue_bot.yml
+++ b/.github/workflows/issue_bot.yml
@@ -33,7 +33,7 @@ jobs:
if: github.event.issue.pull_request == null
runs-on: ubuntu-latest
steps:
- - uses: actions/github-script@v6
+ - uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
script: |
let split_body = context.payload.issue.body.split('### Component(s)');
diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml
index 76bc57a6c712c..69adc184b7fe7 100644
--- a/.github/workflows/java.yml
+++ b/.github/workflows/java.yml
@@ -49,8 +49,8 @@ env:
jobs:
- debian:
- name: ${{ matrix.title }}
+ ubuntu:
+ name: AMD64 Ubuntu 22.04 Java JDK ${{ matrix.jdk }} Maven ${{ matrix.maven }}
runs-on: ubuntu-latest
if: ${{ !contains(github.event.pull_request.title, 'WIP') }}
timeout-minutes: 30
@@ -58,40 +58,25 @@ jobs:
fail-fast: false
matrix:
jdk: [8, 11, 17, 21]
- include:
- - jdk: 8
- title: AMD64 Debian 9 Java JDK 8 Maven 3.5.4
- maven: 3.5.4
- image: debian-java
- - jdk: 11
- title: AMD64 Debian 9 Java JDK 11 Maven 3.6.2
- maven: 3.6.2
- image: debian-java
- - jdk: 17
- title: AMD64 Ubuntu 22.04 Java JDK 17 Maven 3.9.4
- maven: 3.9.4
- image: eclipse-java
- - jdk: 21
- title: AMD64 Ubuntu 22.04 Java JDK 21 Maven 3.9.4
- maven: 3.9.4
- image: eclipse-java
+ maven: [3.9.5]
+ image: [java]
env:
JDK: ${{ matrix.jdk }}
MAVEN: ${{ matrix.maven }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: maven-${{ hashFiles('java/**') }}
restore-keys: maven-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/java_jni.yml b/.github/workflows/java_jni.yml
index 467e8a88af5d3..76b10b828ee49 100644
--- a/.github/workflows/java_jni.yml
+++ b/.github/workflows/java_jni.yml
@@ -56,7 +56,7 @@ jobs:
timeout-minutes: 500
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
@@ -64,13 +64,13 @@ jobs:
run: |
ci/scripts/util_free_space.sh
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: java-jni-manylinux-2014-${{ hashFiles('cpp/**', 'java/**') }}
restore-keys: java-jni-manylinux-2014-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -99,18 +99,18 @@ jobs:
timeout-minutes: 90
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: maven-${{ hashFiles('java/**') }}
restore-keys: maven-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/java_nightly.yml b/.github/workflows/java_nightly.yml
index 41843d663051a..11aa4e59beefd 100644
--- a/.github/workflows/java_nightly.yml
+++ b/.github/workflows/java_nightly.yml
@@ -43,7 +43,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 1
path: arrow
@@ -51,14 +51,14 @@ jobs:
ref: main
submodules: recursive
- name: Checkout Crossbow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
path: crossbow
repository: ursacomputing/crossbow
ref: main
- name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
cache: 'pip'
python-version: 3.8
diff --git a/.github/workflows/js.yml b/.github/workflows/js.yml
index 781b2023e2f42..b2040a76dec48 100644
--- a/.github/workflows/js.yml
+++ b/.github/workflows/js.yml
@@ -47,11 +47,11 @@ jobs:
timeout-minutes: 60
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/pr_bot.yml b/.github/workflows/pr_bot.yml
index 617f3f2e017a3..596d3511a543d 100644
--- a/.github/workflows/pr_bot.yml
+++ b/.github/workflows/pr_bot.yml
@@ -40,7 +40,7 @@ jobs:
- name: 'Download PR review payload'
id: 'download'
if: github.event_name == 'workflow_run'
- uses: actions/github-script@v6
+ uses: actions/github-script@d7906e4ad0b1822421a7e6a35d5ca353c962f410 # v6.4.1
with:
script: |
const run_id = "${{ github.event.workflow_run.id }}";
@@ -73,7 +73,7 @@ jobs:
curl -sL -o committers.yml $url
echo "committers_path=$(pwd)/committers.yml" >> $GITHUB_OUTPUT
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
path: arrow
repository: apache/arrow
@@ -82,7 +82,7 @@ jobs:
# fetch the tags for version number generation
fetch-depth: 0
- name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Install Archery and Crossbow dependencies
diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml
index 7a8fd8d10c235..d201f90101de8 100644
--- a/.github/workflows/python.yml
+++ b/.github/workflows/python.yml
@@ -89,18 +89,18 @@ jobs:
NUMPY: ${{ matrix.numpy || 'latest' }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: ${{ matrix.cache }}-${{ hashFiles('cpp/**') }}
restore-keys: ${{ matrix.cache }}-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/r.yml b/.github/workflows/r.yml
index a8680aea56d48..db10e6f28ce1c 100644
--- a/.github/workflows/r.yml
+++ b/.github/workflows/r.yml
@@ -68,12 +68,12 @@ jobs:
UBUNTU: ${{ matrix.ubuntu }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
# As this key is identical on both matrix builds only one will be able to successfully cache,
@@ -83,7 +83,7 @@ jobs:
ubuntu-${{ matrix.ubuntu }}-r-${{ matrix.r }}-${{ hashFiles('cpp/src/**/*.cc','cpp/src/**/*.h)') }}-
ubuntu-${{ matrix.ubuntu }}-r-${{ matrix.r }}-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -106,7 +106,7 @@ jobs:
if: always()
- name: Save the test output
if: always()
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2
with:
name: test-output
path: r/check/arrow.Rcheck/tests/testthat.Rout*
@@ -139,12 +139,12 @@ jobs:
DEVTOOLSET_VERSION: ${{ matrix.config.devtoolset }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
@@ -168,7 +168,7 @@ jobs:
if: always()
- name: Save the test output
if: always()
- uses: actions/upload-artifact@v3
+ uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2
with:
name: test-output
path: r/check/arrow.Rcheck/tests/testthat.Rout*
diff --git a/.github/workflows/r_nightly.yml b/.github/workflows/r_nightly.yml
index 7f21d4658e007..5a34239721392 100644
--- a/.github/workflows/r_nightly.yml
+++ b/.github/workflows/r_nightly.yml
@@ -45,7 +45,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 1
path: arrow
@@ -53,14 +53,14 @@ jobs:
ref: main
submodules: recursive
- name: Checkout Crossbow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
path: crossbow
repository: ursacomputing/crossbow
ref: main
- name: Set up Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
cache: 'pip'
python-version: 3.8
@@ -86,7 +86,7 @@ jobs:
exit 1
fi
- name: Cache Repo
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: repo
key: r-nightly-${{ github.run_id }}
diff --git a/.github/workflows/ruby.yml b/.github/workflows/ruby.yml
index 2e4b98c2428e9..b9a4ac03b6108 100644
--- a/.github/workflows/ruby.yml
+++ b/.github/workflows/ruby.yml
@@ -71,18 +71,18 @@ jobs:
UBUNTU: ${{ matrix.ubuntu }}
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
- name: Cache Docker Volumes
- uses: actions/cache@v3
+ uses: actions/cache@88522ab9f39a2ea568f7027eddc7d8d8bc9d59c8 # v3.3.1
with:
path: .docker
key: ubuntu-${{ matrix.ubuntu }}-ruby-${{ hashFiles('cpp/**') }}
restore-keys: ubuntu-${{ matrix.ubuntu }}-ruby-
- name: Setup Python
- uses: actions/setup-python@v4
+ uses: actions/setup-python@61a6322f88396a6271a6ee3565807d608ecaddd1 # v4.7.0
with:
python-version: 3.8
- name: Setup Archery
diff --git a/.github/workflows/swift.yml b/.github/workflows/swift.yml
index 825921ac6fa24..f55e9e77503c0 100644
--- a/.github/workflows/swift.yml
+++ b/.github/workflows/swift.yml
@@ -51,7 +51,7 @@ jobs:
timeout-minutes: 15
steps:
- name: Checkout Arrow
- uses: actions/checkout@v4
+ uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
with:
fetch-depth: 0
submodules: recursive
diff --git a/c_glib/README.md b/c_glib/README.md
index 23e3bd91b8a2a..d571053c3dce8 100644
--- a/c_glib/README.md
+++ b/c_glib/README.md
@@ -67,7 +67,8 @@ GLib (replace the version number in the following commands with the
one you use):
```console
-$ wget 'https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-12.0.0/apache-arrow-12.0.0.tar.gz'
+$ wget 'https://www.apache.org/dyn/closer.lua?action=download&filename=arrow/arrow-12.0.0/apache-arrow-12.0.0.tar.gz' \
+ --output-document apache-arrow-12.0.0.tar.gz
$ tar xf apache-arrow-12.0.0.tar.gz
$ cd apache-arrow-12.0.0
```
@@ -81,7 +82,7 @@ required packages.
macOS:
```console
-$ brew bundle
+$ brew bundle --file=c_glib/Brewfile
$ meson setup c_glib.build c_glib --buildtype=release
$ meson compile -C c_glib.build
$ sudo meson install -C c_glib.build
@@ -127,7 +128,7 @@ $ sudo pip3 install meson
On macOS with [Homebrew](https://brew.sh/):
```console
-$ brew bundle
+$ brew bundle --file=c_glib/Brewfile
```
You can build and install Arrow GLib with the following commands:
diff --git a/c_glib/arrow-glib/version.h.in b/c_glib/arrow-glib/version.h.in
index 60c02936193bc..abb8ba08708de 100644
--- a/c_glib/arrow-glib/version.h.in
+++ b/c_glib/arrow-glib/version.h.in
@@ -110,6 +110,15 @@
# define GARROW_UNAVAILABLE(major, minor) G_UNAVAILABLE(major, minor)
#endif
+/**
+ * GARROW_VERSION_15_0:
+ *
+ * You can use this macro value for compile time API version check.
+ *
+ * Since: 15.0.0
+ */
+#define GARROW_VERSION_15_0 G_ENCODE_VERSION(15, 0)
+
/**
* GARROW_VERSION_14_0:
*
@@ -346,6 +355,20 @@
#define GARROW_AVAILABLE_IN_ALL
+#if GARROW_VERSION_MIN_REQUIRED >= GARROW_VERSION_15_0
+# define GARROW_DEPRECATED_IN_15_0 GARROW_DEPRECATED
+# define GARROW_DEPRECATED_IN_15_0_FOR(function) GARROW_DEPRECATED_FOR(function)
+#else
+# define GARROW_DEPRECATED_IN_15_0
+# define GARROW_DEPRECATED_IN_15_0_FOR(function)
+#endif
+
+#if GARROW_VERSION_MAX_ALLOWED < GARROW_VERSION_15_0
+# define GARROW_AVAILABLE_IN_15_0 GARROW_UNAVAILABLE(15, 0)
+#else
+# define GARROW_AVAILABLE_IN_15_0
+#endif
+
#if GARROW_VERSION_MIN_REQUIRED >= GARROW_VERSION_14_0
# define GARROW_DEPRECATED_IN_14_0 GARROW_DEPRECATED
# define GARROW_DEPRECATED_IN_14_0_FOR(function) GARROW_DEPRECATED_FOR(function)
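The new `GARROW_VERSION_15_0` value plugs into arrow-glib's existing compile-time version-check scheme: a consumer pins `GARROW_VERSION_MIN_REQUIRED`/`GARROW_VERSION_MAX_ALLOWED`, and the `GARROW_AVAILABLE_IN_15_0`/`GARROW_DEPRECATED_IN_15_0` annotations then produce compiler warnings for symbols outside that range. A minimal sketch of consumer code (the macro names come from this diff; the pinning pattern follows the usual GLib convention and is illustrative):

```c
/* Sketch: restrict a consumer to the arrow-glib 15.0 API surface.
 * The *_REQUIRED/*_ALLOWED macros are defined before the include; they are
 * expanded lazily, after version.h has defined GARROW_VERSION_15_0, so
 * symbols annotated GARROW_AVAILABLE_IN_15_0 or GARROW_DEPRECATED_IN_15_0
 * outside the pinned range trigger warnings at compile time. */
#define GARROW_VERSION_MIN_REQUIRED GARROW_VERSION_15_0
#define GARROW_VERSION_MAX_ALLOWED GARROW_VERSION_15_0
#include <arrow-glib/arrow-glib.h>
```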
diff --git a/c_glib/doc/gandiva-glib/gandiva-glib-docs.xml b/c_glib/doc/gandiva-glib/gandiva-glib-docs.xml
index 182bbfb527eb2..a5c32f11337e8 100644
--- a/c_glib/doc/gandiva-glib/gandiva-glib-docs.xml
+++ b/c_glib/doc/gandiva-glib/gandiva-glib-docs.xml
@@ -100,6 +100,10 @@
Index of deprecated API
+
+ Index of new symbols in 15.0.0
+
+
Index of new symbols in 4.0.0
diff --git a/c_glib/gandiva-glib/function-registry.cpp b/c_glib/gandiva-glib/function-registry.cpp
index a95019bd62c2b..f47262986db82 100644
--- a/c_glib/gandiva-glib/function-registry.cpp
+++ b/c_glib/gandiva-glib/function-registry.cpp
@@ -18,8 +18,8 @@
*/
#include
-#include
+#include
#include
#include
@@ -34,18 +34,86 @@ G_BEGIN_DECLS
* Since: 0.14.0
*/
-G_DEFINE_TYPE(GGandivaFunctionRegistry,
- ggandiva_function_registry,
- G_TYPE_OBJECT)
+struct GGandivaFunctionRegistryPrivate {
+ std::shared_ptr<gandiva::FunctionRegistry> function_registry;
+};
+
+enum {
+ PROP_FUNCTION_REGISTRY = 1,
+};
+
+G_DEFINE_TYPE_WITH_PRIVATE(GGandivaFunctionRegistry,
+ ggandiva_function_registry,
+ G_TYPE_OBJECT)
+
+#define GGANDIVA_FUNCTION_REGISTRY_GET_PRIVATE(object) \
+ static_cast<GGandivaFunctionRegistryPrivate *>( \
+ ggandiva_function_registry_get_instance_private( \
+ GGANDIVA_FUNCTION_REGISTRY(object)))
+
+static void
+ggandiva_function_registry_finalize(GObject *object)
+{
+ auto priv = GGANDIVA_FUNCTION_REGISTRY_GET_PRIVATE(object);
+ priv->function_registry.~shared_ptr();
+ G_OBJECT_CLASS(ggandiva_function_registry_parent_class)->finalize(object);
+}
+
+static void
+ggandiva_function_registry_set_property(GObject *object,
+ guint prop_id,
+ const GValue *value,
+ GParamSpec *pspec)
+{
+ auto priv = GGANDIVA_FUNCTION_REGISTRY_GET_PRIVATE(object);
+
+ switch (prop_id) {
+ case PROP_FUNCTION_REGISTRY:
+ priv->function_registry =
+ *static_cast<std::shared_ptr<gandiva::FunctionRegistry> *>(
+ g_value_get_pointer(value));
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID(object, prop_id, pspec);
+ break;
+ }
+}
static void
ggandiva_function_registry_init(GGandivaFunctionRegistry *object)
{
+ auto priv = GGANDIVA_FUNCTION_REGISTRY_GET_PRIVATE(object);
+ new(&priv->function_registry) std::shared_ptr<gandiva::FunctionRegistry>;
}
static void
ggandiva_function_registry_class_init(GGandivaFunctionRegistryClass *klass)
{
+ auto gobject_class = G_OBJECT_CLASS(klass);
+ gobject_class->finalize = ggandiva_function_registry_finalize;
+ gobject_class->set_property = ggandiva_function_registry_set_property;
+
+ GParamSpec *spec;
+ spec = g_param_spec_pointer("function-registry",
+ "Function registry",
+ "The raw std::shared_ptr *",
+ static_cast(G_PARAM_WRITABLE |
+ G_PARAM_CONSTRUCT_ONLY));
+ g_object_class_install_property(gobject_class, PROP_FUNCTION_REGISTRY, spec);
+}
+
+/**
+ * ggandiva_function_registry_default:
+ *
+ * Returns: (transfer full): The process-wide default function registry.
+ *
+ * Since: 15.0.0
+ */
+GGandivaFunctionRegistry *
+ggandiva_function_registry_default(void)
+{
+ auto gandiva_function_registry = gandiva::default_function_registry();
+ return ggandiva_function_registry_new_raw(&gandiva_function_registry);
}
/**
@@ -58,7 +126,8 @@ ggandiva_function_registry_class_init(GGandivaFunctionRegistryClass *klass)
GGandivaFunctionRegistry *
ggandiva_function_registry_new(void)
{
- return GGANDIVA_FUNCTION_REGISTRY(g_object_new(GGANDIVA_TYPE_FUNCTION_REGISTRY, NULL));
+ auto gandiva_function_registry = std::make_shared<gandiva::FunctionRegistry>();
+ return ggandiva_function_registry_new_raw(&gandiva_function_registry);
}
/**
@@ -75,15 +144,16 @@ GGandivaNativeFunction *
ggandiva_function_registry_lookup(GGandivaFunctionRegistry *function_registry,
GGandivaFunctionSignature *function_signature)
{
- gandiva::FunctionRegistry gandiva_function_registry;
+ auto gandiva_function_registry =
+ ggandiva_function_registry_get_raw(function_registry);
auto gandiva_function_signature =
ggandiva_function_signature_get_raw(function_signature);
auto gandiva_native_function =
- gandiva_function_registry.LookupSignature(*gandiva_function_signature);
+ gandiva_function_registry->LookupSignature(*gandiva_function_signature);
if (gandiva_native_function) {
return ggandiva_native_function_new_raw(gandiva_native_function);
} else {
- return NULL;
+ return nullptr;
}
}
@@ -99,18 +169,32 @@ ggandiva_function_registry_lookup(GGandivaFunctionRegistry *function_registry,
GList *
ggandiva_function_registry_get_native_functions(GGandivaFunctionRegistry *function_registry)
{
- gandiva::FunctionRegistry gandiva_function_registry;
-
+ auto gandiva_function_registry =
+ ggandiva_function_registry_get_raw(function_registry);
GList *native_functions = nullptr;
- for (auto gandiva_native_function = gandiva_function_registry.begin();
- gandiva_native_function != gandiva_function_registry.end();
- ++gandiva_native_function) {
- auto native_function = ggandiva_native_function_new_raw(gandiva_native_function);
+ for (const auto &gandiva_native_function : *gandiva_function_registry) {
+ auto native_function = ggandiva_native_function_new_raw(&gandiva_native_function);
native_functions = g_list_prepend(native_functions, native_function);
}
- native_functions = g_list_reverse(native_functions);
-
- return native_functions;
+ return g_list_reverse(native_functions);
}
G_END_DECLS
+
+GGandivaFunctionRegistry *
+ggandiva_function_registry_new_raw(
+ std::shared_ptr<gandiva::FunctionRegistry> *gandiva_function_registry)
+{
+ return GGANDIVA_FUNCTION_REGISTRY(
+ g_object_new(GGANDIVA_TYPE_FUNCTION_REGISTRY,
+ "function-registry", gandiva_function_registry,
+ nullptr));
+}
+
+std::shared_ptr<gandiva::FunctionRegistry>
+ggandiva_function_registry_get_raw(GGandivaFunctionRegistry *function_registry)
+{
+ auto priv = GGANDIVA_FUNCTION_REGISTRY_GET_PRIVATE(function_registry);
+ return priv->function_registry;
+}
+
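With this refactor a `GGandivaFunctionRegistry` wraps a `std::shared_ptr` to a Gandiva function registry (constructed in the private struct via placement new and released in `finalize()`), and the new `ggandiva_function_registry_default()` exposes Gandiva's process-wide default registry instead of building a fresh one per object. A hedged sketch of how a C consumer might exercise the new entry point, using only functions touched by this diff (the program itself is illustrative):

```c
/* Minimal sketch, assuming gandiva-glib is installed and Gandiva is enabled:
 * count the native functions known to the process-wide default registry. */
#include <gandiva-glib/gandiva-glib.h>

int
main(void)
{
  GGandivaFunctionRegistry *registry = ggandiva_function_registry_default();
  GList *functions =
    ggandiva_function_registry_get_native_functions(registry);
  g_print("%u native functions\n", g_list_length(functions));
  g_list_free_full(functions, g_object_unref); /* list elements are (transfer full) */
  g_object_unref(registry);                    /* registry is (transfer full) */
  return 0;
}
```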
diff --git a/c_glib/gandiva-glib/function-registry.h b/c_glib/gandiva-glib/function-registry.h
index 1a0d767d45354..8ff6027cf1734 100644
--- a/c_glib/gandiva-glib/function-registry.h
+++ b/c_glib/gandiva-glib/function-registry.h
@@ -35,6 +35,8 @@ struct _GGandivaFunctionRegistryClass
GObjectClass parent_class;
};
+GARROW_AVAILABLE_IN_15_0
+GGandivaFunctionRegistry *ggandiva_function_registry_default(void);
GGandivaFunctionRegistry *ggandiva_function_registry_new(void);
GGandivaNativeFunction *
ggandiva_function_registry_lookup(GGandivaFunctionRegistry *function_registry,
diff --git a/c_glib/gandiva-glib/function-registry.hpp b/c_glib/gandiva-glib/function-registry.hpp
new file mode 100644
index 0000000000000..0430fc57dead2
--- /dev/null
+++ b/c_glib/gandiva-glib/function-registry.hpp
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+
+#include
+
+GGandivaFunctionRegistry *
+ggandiva_function_registry_new_raw(
+ std::shared_ptr<gandiva::FunctionRegistry> *gandiva_function_registry);
+std::shared_ptr<gandiva::FunctionRegistry>
+ggandiva_function_registry_get_raw(GGandivaFunctionRegistry *function_registry);
diff --git a/c_glib/meson.build b/c_glib/meson.build
index d9de6fcbf7e24..7c495d2567d72 100644
--- a/c_glib/meson.build
+++ b/c_glib/meson.build
@@ -24,7 +24,7 @@ project('arrow-glib', 'c', 'cpp',
'cpp_std=c++17',
])
-version = '14.0.0-SNAPSHOT'
+version = '15.0.0-SNAPSHOT'
if version.endswith('-SNAPSHOT')
version_numbers = version.split('-')[0].split('.')
version_tag = version.split('-')[1]
diff --git a/c_glib/test/gandiva/test-function-registry.rb b/c_glib/test/gandiva/test-function-registry.rb
index 25bac6673105e..d0f959a1c5f5f 100644
--- a/c_glib/test/gandiva/test-function-registry.rb
+++ b/c_glib/test/gandiva/test-function-registry.rb
@@ -20,7 +20,7 @@ class TestGandivaFunctionRegistry < Test::Unit::TestCase
def setup
omit("Gandiva is required") unless defined?(::Gandiva)
- @registry = Gandiva::FunctionRegistry.new
+ @registry = Gandiva::FunctionRegistry.default
end
sub_test_case("lookup") do
diff --git a/c_glib/test/gandiva/test-native-function.rb b/c_glib/test/gandiva/test-native-function.rb
index 7888f96b678b7..630a1f7c32d2a 100644
--- a/c_glib/test/gandiva/test-native-function.rb
+++ b/c_glib/test/gandiva/test-native-function.rb
@@ -20,7 +20,7 @@ class TestGandivaNativeFunction < Test::Unit::TestCase
def setup
omit("Gandiva is required") unless defined?(::Gandiva)
- @registry = Gandiva::FunctionRegistry.new
+ @registry = Gandiva::FunctionRegistry.default
@not = lookup("not", [boolean_data_type], boolean_data_type)
@isnull = lookup("isnull", [int8_data_type], boolean_data_type)
end
diff --git a/ci/conda_env_python.txt b/ci/conda_env_python.txt
index da52b5ea689be..97203442129c4 100644
--- a/ci/conda_env_python.txt
+++ b/ci/conda_env_python.txt
@@ -26,6 +26,6 @@ numpy>=1.16.6
pytest
pytest-faulthandler
pytest-lazy-fixture
-s3fs>=2021.8.0
+s3fs>=2023.10.0
setuptools
setuptools_scm<8.0.0
diff --git a/ci/docker/fedora-35-cpp.dockerfile b/ci/docker/fedora-38-cpp.dockerfile
similarity index 95%
rename from ci/docker/fedora-35-cpp.dockerfile
rename to ci/docker/fedora-38-cpp.dockerfile
index aefa25663ba14..2dcc094ee20c5 100644
--- a/ci/docker/fedora-35-cpp.dockerfile
+++ b/ci/docker/fedora-38-cpp.dockerfile
@@ -16,7 +16,7 @@
# under the License.
ARG arch
-FROM ${arch}/fedora:35
+FROM ${arch}/fedora:38
ARG arch
# install dependencies
@@ -46,9 +46,9 @@ RUN dnf update -y && \
java-latest-openjdk-devel \
java-latest-openjdk-headless \
json-devel \
+ liborc-devel \
libzstd-devel \
llvm-devel \
- llvm-static \
lz4-devel \
make \
ninja-build \
@@ -64,6 +64,7 @@ RUN dnf update -y && \
utf8proc-devel \
wget \
which \
+ xsimd-devel \
zlib-devel
COPY ci/scripts/install_minio.sh /arrow/ci/scripts/
@@ -100,8 +101,6 @@ ENV absl_SOURCE=BUNDLED \
CC=gcc \
CXX=g++ \
google_cloud_cpp_storage_SOURCE=BUNDLED \
- ORC_SOURCE=BUNDLED \
PARQUET_BUILD_EXAMPLES=ON \
PARQUET_BUILD_EXECUTABLES=ON \
- PATH=/usr/lib/ccache/:$PATH \
- xsimd_SOURCE=BUNDLED
+ PATH=/usr/lib/ccache/:$PATH
diff --git a/ci/docker/ubuntu-swift.dockerfile b/ci/docker/ubuntu-swift.dockerfile
index 5ef6bc433df38..4789c9188c226 100644
--- a/ci/docker/ubuntu-swift.dockerfile
+++ b/ci/docker/ubuntu-swift.dockerfile
@@ -17,8 +17,18 @@
FROM swift:5.7.3
-# Install golang
+# Go is needed for generating test data
RUN apt-get update -y -q && \
apt-get install -y -q --no-install-recommends \
- golang-go && \
- apt-get clean
\ No newline at end of file
+ golang-go \
+ unzip \
+ wget && \
+ apt-get clean
+
+ARG swift_lint=0.53.0
+RUN wget https://github.com/realm/SwiftLint/releases/download/${swift_lint}/swiftlint_linux.zip && \
+ unzip swiftlint_linux.zip && \
+ mv swiftlint /usr/local/bin/ && \
+ mkdir -p /usr/local/share/doc/swiftlint/ && \
+ mv LICENSE /usr/local/share/doc/swiftlint/ && \
+ rm -rf swiftlint_linux.zip
diff --git a/ci/scripts/PKGBUILD b/ci/scripts/PKGBUILD
index dcd313087e966..95029d98f7a01 100644
--- a/ci/scripts/PKGBUILD
+++ b/ci/scripts/PKGBUILD
@@ -18,7 +18,7 @@
_realname=arrow
pkgbase=mingw-w64-${_realname}
pkgname="${MINGW_PACKAGE_PREFIX}-${_realname}"
-pkgver=13.0.0.9000
+pkgver=14.0.0.9000
pkgrel=8000
pkgdesc="Apache Arrow is a cross-language development platform for in-memory data (mingw-w64)"
arch=("any")
diff --git a/ci/scripts/cpp_test.sh b/ci/scripts/cpp_test.sh
index 3acf56bae0fe4..0c6e1c6ef7057 100755
--- a/ci/scripts/cpp_test.sh
+++ b/ci/scripts/cpp_test.sh
@@ -86,7 +86,7 @@ ctest \
--parallel ${n_jobs} \
--timeout ${ARROW_CTEST_TIMEOUT:-300} \
"${ctest_options[@]}" \
- $@
+ "$@"
if [ "${ARROW_BUILD_EXAMPLES}" == "ON" ]; then
examples=$(find ${binary_output_dir} -executable -name "*example")
diff --git a/ci/scripts/go_build.sh b/ci/scripts/go_build.sh
index 7c5ca3230c96e..94f75e501ea0b 100755
--- a/ci/scripts/go_build.sh
+++ b/ci/scripts/go_build.sh
@@ -42,7 +42,9 @@ go install -v ./...
popd
-if [[ -n "${ARROW_GO_INTEGRATION}" ]]; then
+: ${ARROW_INTEGRATION_GO:=ON}
+
+if [ "${ARROW_INTEGRATION_GO}" == "ON" ]; then
pushd ${source_dir}/arrow/internal/cdata_integration
case "$(uname)" in
diff --git a/ci/scripts/integration_arrow.sh b/ci/scripts/integration_arrow.sh
index 2861b1c09d479..b5a38f01412d4 100755
--- a/ci/scripts/integration_arrow.sh
+++ b/ci/scripts/integration_arrow.sh
@@ -20,11 +20,25 @@
set -ex
arrow_dir=${1}
+build_dir=${2}
+
gold_dir=$arrow_dir/testing/data/arrow-ipc-stream/integration
+: ${ARROW_INTEGRATION_CPP:=ON}
+: ${ARROW_INTEGRATION_CSHARP:=ON}
+: ${ARROW_INTEGRATION_GO:=ON}
+: ${ARROW_INTEGRATION_JAVA:=ON}
+: ${ARROW_INTEGRATION_JS:=ON}
+
pip install -e $arrow_dir/dev/archery[integration]
+
# For C Data Interface testing
-pip install jpype1 pythonnet
+if [ "${ARROW_INTEGRATION_CSHARP}" == "ON" ]; then
+ pip install pythonnet
+fi
+if [ "${ARROW_INTEGRATION_JAVA}" == "ON" ]; then
+ pip install jpype1
+fi
# Get more detailed context on crashes
export PYTHONFAULTHANDLER=1
@@ -34,11 +48,11 @@ time archery integration \
--run-c-data \
--run-ipc \
--run-flight \
- --with-cpp=1 \
- --with-csharp=1 \
- --with-java=1 \
- --with-js=1 \
- --with-go=1 \
+ --with-cpp=$([ "$ARROW_INTEGRATION_CPP" == "ON" ] && echo "1" || echo "0") \
+ --with-csharp=$([ "$ARROW_INTEGRATION_CSHARP" == "ON" ] && echo "1" || echo "0") \
+ --with-go=$([ "$ARROW_INTEGRATION_GO" == "ON" ] && echo "1" || echo "0") \
+ --with-java=$([ "$ARROW_INTEGRATION_JAVA" == "ON" ] && echo "1" || echo "0") \
+ --with-js=$([ "$ARROW_INTEGRATION_JS" == "ON" ] && echo "1" || echo "0") \
--gold-dirs=$gold_dir/0.14.1 \
--gold-dirs=$gold_dir/0.17.1 \
--gold-dirs=$gold_dir/1.0.0-bigendian \
diff --git a/ci/scripts/integration_arrow_build.sh b/ci/scripts/integration_arrow_build.sh
new file mode 100755
index 0000000000000..02f593bf77b23
--- /dev/null
+++ b/ci/scripts/integration_arrow_build.sh
@@ -0,0 +1,55 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+set -ex
+
+arrow_dir=${1}
+build_dir=${2}
+
+: ${ARROW_INTEGRATION_CPP:=ON}
+: ${ARROW_INTEGRATION_CSHARP:=ON}
+: ${ARROW_INTEGRATION_GO:=ON}
+: ${ARROW_INTEGRATION_JAVA:=ON}
+: ${ARROW_INTEGRATION_JS:=ON}
+
+${arrow_dir}/ci/scripts/rust_build.sh ${arrow_dir} ${build_dir}
+
+if [ "${ARROW_INTEGRATION_CPP}" == "ON" ]; then
+ ${arrow_dir}/ci/scripts/cpp_build.sh ${arrow_dir} ${build_dir}
+fi
+
+if [ "${ARROW_INTEGRATION_CSHARP}" == "ON" ]; then
+ ${arrow_dir}/ci/scripts/csharp_build.sh ${arrow_dir} ${build_dir}
+fi
+
+if [ "${ARROW_INTEGRATION_GO}" == "ON" ]; then
+ ${arrow_dir}/ci/scripts/go_build.sh ${arrow_dir} ${build_dir}
+fi
+
+if [ "${ARROW_INTEGRATION_JAVA}" == "ON" ]; then
+ export ARROW_JAVA_CDATA="ON"
+ export JAVA_JNI_CMAKE_ARGS="-DARROW_JAVA_JNI_ENABLE_DEFAULT=OFF -DARROW_JAVA_JNI_ENABLE_C=ON"
+
+ ${arrow_dir}/ci/scripts/java_jni_build.sh ${arrow_dir} ${ARROW_HOME} ${build_dir} /tmp/dist/java/$(arch)
+ ${arrow_dir}/ci/scripts/java_build.sh ${arrow_dir} ${build_dir} /tmp/dist/java
+fi
+
+if [ "${ARROW_INTEGRATION_JS}" == "ON" ]; then
+ ${arrow_dir}/ci/scripts/js_build.sh ${arrow_dir} ${build_dir}
+fi
diff --git a/ci/scripts/python_wheel_macos_build.sh b/ci/scripts/python_wheel_macos_build.sh
index 5a3c6fb6d1f6f..fd845c512dcdb 100755
--- a/ci/scripts/python_wheel_macos_build.sh
+++ b/ci/scripts/python_wheel_macos_build.sh
@@ -34,7 +34,7 @@ rm -rf ${source_dir}/python/pyarrow/*.so.*
echo "=== (${PYTHON_VERSION}) Set SDK, C++ and Wheel flags ==="
export _PYTHON_HOST_PLATFORM="macosx-${MACOSX_DEPLOYMENT_TARGET}-${arch}"
-export MACOSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET:-10.14}
+export MACOSX_DEPLOYMENT_TARGET=${MACOSX_DEPLOYMENT_TARGET:-10.15}
export SDKROOT=${SDKROOT:-$(xcrun --sdk macosx --show-sdk-path)}
if [ $arch = "arm64" ]; then
diff --git a/ci/scripts/r_test.sh b/ci/scripts/r_test.sh
index e0c2ce9efedd8..22ec551edb9fa 100755
--- a/ci/scripts/r_test.sh
+++ b/ci/scripts/r_test.sh
@@ -27,7 +27,7 @@ pushd ${source_dir}
printenv
# Run the nixlibs.R test suite, which is not included in the installed package
-${R_BIN} -e 'setwd("tools"); testthat::test_dir(".")'
+${R_BIN} -e 'setwd("tools"); testthat::test_dir(".", stop_on_warning = TRUE)'
# Before release, we always copy the relevant parts of the cpp source into the
# package. In some CI checks, we will use this version of the source:
diff --git a/ci/scripts/swift_test.sh b/ci/scripts/swift_test.sh
index b7ab37fd489c9..b523e3891d93c 100755
--- a/ci/scripts/swift_test.sh
+++ b/ci/scripts/swift_test.sh
@@ -20,12 +20,18 @@
set -ex
data_gen_dir=${1}/swift/data-generator/swift-datagen
+export GOPATH=/
pushd ${data_gen_dir}
go get -d ./...
-go run main.go
+go run .
cp *.arrow ../../Arrow
popd
+source_dir=${1}/swift
+pushd ${source_dir}
+swiftlint --strict
+popd
+
source_dir=${1}/swift/Arrow
pushd ${source_dir}
swift test
diff --git a/ci/scripts/util_free_space.sh b/ci/scripts/util_free_space.sh
index 0518869d06993..dd6ba2c4600a9 100755
--- a/ci/scripts/util_free_space.sh
+++ b/ci/scripts/util_free_space.sh
@@ -25,7 +25,6 @@ du -hsc /usr/local/*
echo "::endgroup::"
# ~1GB
sudo rm -rf \
- /usr/local/aws-cli \
/usr/local/aws-sam-cil \
/usr/local/julia* || :
echo "::group::/usr/local/bin/*"
@@ -34,8 +33,6 @@ echo "::endgroup::"
# ~1GB (From 1.2GB to 214MB)
sudo rm -rf \
/usr/local/bin/aliyun \
- /usr/local/bin/aws \
- /usr/local/bin/aws_completer \
/usr/local/bin/azcopy \
/usr/local/bin/bicep \
/usr/local/bin/cmake-gui \
diff --git a/ci/vcpkg/universal2-osx-static-debug.cmake b/ci/vcpkg/universal2-osx-static-debug.cmake
index 580b4604d522f..8abc1ebf838f1 100644
--- a/ci/vcpkg/universal2-osx-static-debug.cmake
+++ b/ci/vcpkg/universal2-osx-static-debug.cmake
@@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static)
set(VCPKG_CMAKE_SYSTEM_NAME Darwin)
set(VCPKG_OSX_ARCHITECTURES "x86_64;arm64")
-set(VCPKG_OSX_DEPLOYMENT_TARGET "10.14")
+set(VCPKG_OSX_DEPLOYMENT_TARGET "10.15")
set(VCPKG_BUILD_TYPE debug)
diff --git a/ci/vcpkg/universal2-osx-static-release.cmake b/ci/vcpkg/universal2-osx-static-release.cmake
index 7247d0af351c5..2eb36c15175b2 100644
--- a/ci/vcpkg/universal2-osx-static-release.cmake
+++ b/ci/vcpkg/universal2-osx-static-release.cmake
@@ -21,6 +21,6 @@ set(VCPKG_LIBRARY_LINKAGE static)
set(VCPKG_CMAKE_SYSTEM_NAME Darwin)
set(VCPKG_OSX_ARCHITECTURES "x86_64;arm64")
-set(VCPKG_OSX_DEPLOYMENT_TARGET "10.14")
+set(VCPKG_OSX_DEPLOYMENT_TARGET "10.15")
set(VCPKG_BUILD_TYPE release)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 8566508406bd4..bcb298407bd8b 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -71,7 +71,7 @@ if(POLICY CMP0135)
cmake_policy(SET CMP0135 NEW)
endif()
-set(ARROW_VERSION "14.0.0-SNAPSHOT")
+set(ARROW_VERSION "15.0.0-SNAPSHOT")
string(REGEX MATCH "^[0-9]+\\.[0-9]+\\.[0-9]+" ARROW_BASE_VERSION "${ARROW_VERSION}")
@@ -770,10 +770,10 @@ if(ARROW_WITH_ZSTD)
endif()
if(ARROW_ORC)
- list(APPEND ARROW_SHARED_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
- list(APPEND ARROW_STATIC_LINK_LIBS orc::liborc ${ARROW_PROTOBUF_LIBPROTOBUF})
+ list(APPEND ARROW_SHARED_LINK_LIBS orc::orc ${ARROW_PROTOBUF_LIBPROTOBUF})
+ list(APPEND ARROW_STATIC_LINK_LIBS orc::orc ${ARROW_PROTOBUF_LIBPROTOBUF})
if(ORC_SOURCE STREQUAL "SYSTEM")
- list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS orc::liborc
+ list(APPEND ARROW_STATIC_INSTALL_INTERFACE_LIBS orc::orc
${ARROW_PROTOBUF_LIBPROTOBUF})
endif()
endif()
@@ -889,8 +889,8 @@ if(NOT MSVC_TOOLCHAIN)
list(APPEND ARROW_SHARED_LINK_LIBS ${CMAKE_DL_LIBS})
endif()
-set(ARROW_TEST_LINK_TOOLCHAIN arrow::flatbuffers ${ARROW_GTEST_GTEST_MAIN}
- ${ARROW_GTEST_GTEST} ${ARROW_GTEST_GMOCK})
+set(ARROW_TEST_LINK_TOOLCHAIN arrow::flatbuffers ${ARROW_GTEST_GMOCK}
+ ${ARROW_GTEST_GTEST_MAIN})
if(ARROW_BUILD_TESTS)
add_dependencies(arrow_test_dependencies ${ARROW_TEST_LINK_TOOLCHAIN})
@@ -909,7 +909,7 @@ set(ARROW_TEST_SHARED_LINK_LIBS arrow_testing_shared arrow_shared
${ARROW_SHARED_LINK_LIBS} ${ARROW_TEST_LINK_TOOLCHAIN})
if(NOT MSVC)
- set(ARROW_TEST_SHARED_LINK_LIBS ${ARROW_TEST_SHARED_LINK_LIBS} ${CMAKE_DL_LIBS})
+ list(APPEND ARROW_TEST_SHARED_LINK_LIBS ${CMAKE_DL_LIBS})
endif()
if("${ARROW_TEST_LINKAGE}" STREQUAL "shared")
diff --git a/cpp/cmake_modules/FindGTestAlt.cmake b/cpp/cmake_modules/FindGTestAlt.cmake
index 77d4f39d9e0bf..d1873d138e6c7 100644
--- a/cpp/cmake_modules/FindGTestAlt.cmake
+++ b/cpp/cmake_modules/FindGTestAlt.cmake
@@ -63,4 +63,7 @@ TEST(CXX_STANDARD, MatcherStringView) {
find_package_handle_standard_args(GTestAlt
REQUIRED_VARS GTestAlt_CXX_STANDARD_AVAILABLE)
endif()
+
+ target_link_libraries(GTest::gmock INTERFACE GTest::gtest)
+ target_link_libraries(GTest::gtest_main INTERFACE GTest::gtest)
endif()
diff --git a/cpp/cmake_modules/FindORC.cmake b/cpp/cmake_modules/FindorcAlt.cmake
similarity index 68%
rename from cpp/cmake_modules/FindORC.cmake
rename to cpp/cmake_modules/FindorcAlt.cmake
index aca915acc13d0..dc3b978cf4037 100644
--- a/cpp/cmake_modules/FindORC.cmake
+++ b/cpp/cmake_modules/FindorcAlt.cmake
@@ -15,13 +15,20 @@
# specific language governing permissions and limitations
# under the License.
-# - Find Apache ORC C++ (orc/orc-config.h, liborc.a)
-# This module defines
-# ORC_INCLUDE_DIR, directory containing headers
-# ORC_STATIC_LIB, path to liborc.a
-# ORC_FOUND, whether orc has been found
+if(orcAlt_FOUND)
+ return()
+endif()
-if(ORC_FOUND)
+set(find_package_args)
+if(orcAlt_FIND_VERSION)
+ list(APPEND find_package_args ${orcAlt_FIND_VERSION})
+endif()
+if(orcAlt_FIND_QUIETLY)
+ list(APPEND find_package_args QUIET)
+endif()
+find_package(orc ${find_package_args})
+if(orc_FOUND)
+ set(orcAlt_FOUND TRUE)
return()
endif()
@@ -45,15 +52,13 @@ else()
PATH_SUFFIXES ${ARROW_INCLUDE_PATH_SUFFIXES})
endif()
-if(ORC_STATIC_LIB AND ORC_INCLUDE_DIR)
- set(ORC_FOUND TRUE)
- add_library(orc::liborc STATIC IMPORTED)
- set_target_properties(orc::liborc
- PROPERTIES IMPORTED_LOCATION "${ORC_STATIC_LIB}"
- INTERFACE_INCLUDE_DIRECTORIES "${ORC_INCLUDE_DIR}")
-else()
- if(ORC_FIND_REQUIRED)
- message(FATAL_ERROR "ORC library was required in toolchain and unable to locate")
+find_package_handle_standard_args(orcAlt REQUIRED_VARS ORC_STATIC_LIB ORC_INCLUDE_DIR)
+
+if(orcAlt_FOUND)
+ if(NOT TARGET orc::orc)
+ add_library(orc::orc STATIC IMPORTED)
+ set_target_properties(orc::orc
+ PROPERTIES IMPORTED_LOCATION "${ORC_STATIC_LIB}"
+ INTERFACE_INCLUDE_DIRECTORIES "${ORC_INCLUDE_DIR}")
endif()
- set(ORC_FOUND FALSE)
endif()
diff --git a/cpp/cmake_modules/GandivaAddBitcode.cmake b/cpp/cmake_modules/GandivaAddBitcode.cmake
new file mode 100644
index 0000000000000..98847f8a186fe
--- /dev/null
+++ b/cpp/cmake_modules/GandivaAddBitcode.cmake
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Create bitcode for the given source file.
+function(gandiva_add_bitcode SOURCE)
+ set(CLANG_OPTIONS -std=c++17)
+ if(MSVC)
+ # "19.20" means that it's compatible with Visual Studio 16 2019.
+ # We can update this to "19.30" when we drop support for Visual
+ # Studio 16 2019.
+ #
+ # See https://cmake.org/cmake/help/latest/variable/MSVC_VERSION.html
+ # for MSVC_VERSION and Visual Studio version.
+ set(FMS_COMPATIBILITY 19.20)
+ list(APPEND CLANG_OPTIONS -fms-compatibility
+ -fms-compatibility-version=${FMS_COMPATIBILITY})
+ endif()
+
+ get_filename_component(SOURCE_BASE ${SOURCE} NAME_WE)
+ get_filename_component(ABSOLUTE_SOURCE ${SOURCE} ABSOLUTE)
+ set(BC_FILE ${CMAKE_CURRENT_BINARY_DIR}/${SOURCE_BASE}.bc)
+ set(PRECOMPILE_COMMAND)
+ if(CMAKE_OSX_SYSROOT)
+ list(APPEND
+ PRECOMPILE_COMMAND
+ ${CMAKE_COMMAND}
+ -E
+ env
+ SDKROOT=${CMAKE_OSX_SYSROOT})
+ endif()
+ list(APPEND
+ PRECOMPILE_COMMAND
+ ${CLANG_EXECUTABLE}
+ ${CLANG_OPTIONS}
+ -DGANDIVA_IR
+ -DNDEBUG # DCHECK macros not implemented in precompiled code
+ -DARROW_STATIC # Do not set __declspec(dllimport) on MSVC on Arrow symbols
+ -DGANDIVA_STATIC # Do not set __declspec(dllimport) on MSVC on Gandiva symbols
+ -fno-use-cxa-atexit # Workaround for unresolved __dso_handle
+ -emit-llvm
+ -O3
+ -c
+ ${ABSOLUTE_SOURCE}
+ -o
+ ${BC_FILE}
+ ${ARROW_GANDIVA_PC_CXX_FLAGS})
+ if(ARROW_BINARY_DIR)
+ list(APPEND PRECOMPILE_COMMAND -I${ARROW_BINARY_DIR}/src)
+ endif()
+ if(ARROW_SOURCE_DIR)
+ list(APPEND PRECOMPILE_COMMAND -I${ARROW_SOURCE_DIR}/src)
+ endif()
+ if(NOT ARROW_USE_NATIVE_INT128)
+ foreach(boost_include_dir ${Boost_INCLUDE_DIRS})
+ list(APPEND PRECOMPILE_COMMAND -I${boost_include_dir})
+ endforeach()
+ endif()
+ add_custom_command(OUTPUT ${BC_FILE}
+ COMMAND ${PRECOMPILE_COMMAND}
+ DEPENDS ${SOURCE_FILE})
+endfunction()
diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake
index 559ddf14f6a91..52632d554aafb 100644
--- a/cpp/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake
@@ -65,7 +65,7 @@ set(ARROW_THIRDPARTY_DEPENDENCIES
lz4
nlohmann_json
opentelemetry-cpp
- ORC
+ orc
re2
Protobuf
RapidJSON
@@ -94,6 +94,14 @@ if("${re2_SOURCE}" STREQUAL "" AND NOT "${RE2_SOURCE}" STREQUAL "")
set(re2_SOURCE ${RE2_SOURCE})
endif()
+# For backward compatibility. We use "ORC_SOURCE" if "orc_SOURCE"
+# isn't specified and "ORC_SOURCE" is specified.
+# We renamed "ORC" dependency name to "orc" in 15.0.0 because
+# upstream uses "orc" not "ORC" as package name.
+if("${orc_SOURCE}" STREQUAL "" AND NOT "${ORC_SOURCE}" STREQUAL "")
+ set(orc_SOURCE ${ORC_SOURCE})
+endif()
+
# For backward compatibility. We use "RE2_ROOT" if "re2_ROOT"
# isn't specified and "RE2_ROOT" is specified.
if("${re2_ROOT}" STREQUAL "" AND NOT "${RE2_ROOT}" STREQUAL "")
@@ -193,7 +201,7 @@ macro(build_dependency DEPENDENCY_NAME)
build_nlohmann_json()
elseif("${DEPENDENCY_NAME}" STREQUAL "opentelemetry-cpp")
build_opentelemetry()
- elseif("${DEPENDENCY_NAME}" STREQUAL "ORC")
+ elseif("${DEPENDENCY_NAME}" STREQUAL "orc")
build_orc()
elseif("${DEPENDENCY_NAME}" STREQUAL "Protobuf")
build_protobuf()
@@ -222,18 +230,21 @@ macro(build_dependency DEPENDENCY_NAME)
endif()
endmacro()
-# Find modules are needed by the consumer in case of a static build, or if the
-# linkage is PUBLIC or INTERFACE.
-macro(provide_find_module PACKAGE_NAME ARROW_CMAKE_PACKAGE_NAME)
- set(module_ "${CMAKE_SOURCE_DIR}/cmake_modules/Find${PACKAGE_NAME}.cmake")
- if(EXISTS "${module_}")
- message(STATUS "Providing CMake module for ${PACKAGE_NAME} as part of ${ARROW_CMAKE_PACKAGE_NAME} CMake package"
+function(provide_cmake_module MODULE_NAME ARROW_CMAKE_PACKAGE_NAME)
+ set(module "${CMAKE_SOURCE_DIR}/cmake_modules/${MODULE_NAME}.cmake")
+ if(EXISTS "${module}")
+ message(STATUS "Providing CMake module for ${MODULE_NAME} as part of ${ARROW_CMAKE_PACKAGE_NAME} CMake package"
)
- install(FILES "${module_}"
+ install(FILES "${module}"
DESTINATION "${ARROW_CMAKE_DIR}/${ARROW_CMAKE_PACKAGE_NAME}")
endif()
- unset(module_)
-endmacro()
+endfunction()
+
+# Find modules are needed by the consumer in case of a static build, or if the
+# linkage is PUBLIC or INTERFACE.
+function(provide_find_module PACKAGE_NAME ARROW_CMAKE_PACKAGE_NAME)
+ provide_cmake_module("Find${PACKAGE_NAME}" ${ARROW_CMAKE_PACKAGE_NAME})
+endfunction()
macro(resolve_dependency DEPENDENCY_NAME)
set(options)
@@ -4423,31 +4434,31 @@ macro(build_orc)
set(ORC_VENDORED 1)
- add_library(orc::liborc STATIC IMPORTED)
- set_target_properties(orc::liborc PROPERTIES IMPORTED_LOCATION "${ORC_STATIC_LIB}")
- target_include_directories(orc::liborc BEFORE INTERFACE "${ORC_INCLUDE_DIR}")
- set(ORC_LINK_LIBRARIES LZ4::lz4 ZLIB::ZLIB ${ARROW_ZSTD_LIBZSTD} ${Snappy_TARGET})
+ add_library(orc::orc STATIC IMPORTED)
+ set_target_properties(orc::orc PROPERTIES IMPORTED_LOCATION "${ORC_STATIC_LIB}")
+ target_include_directories(orc::orc BEFORE INTERFACE "${ORC_INCLUDE_DIR}")
+ target_link_libraries(orc::orc INTERFACE LZ4::lz4 ZLIB::ZLIB ${ARROW_ZSTD_LIBZSTD}
+ ${Snappy_TARGET})
# Protobuf generated files may use ABSL_DCHECK*() and
# absl::log_internal_check_op is needed for them.
if(TARGET absl::log_internal_check_op)
- list(APPEND ORC_LINK_LIBRARIES absl::log_internal_check_op)
+ target_link_libraries(orc::orc INTERFACE absl::log_internal_check_op)
endif()
if(NOT MSVC)
if(NOT APPLE AND ARROW_ENABLE_THREADING)
- list(APPEND ORC_LINK_LIBRARIES Threads::Threads)
+ target_link_libraries(orc::orc INTERFACE Threads::Threads)
endif()
- list(APPEND ORC_LINK_LIBRARIES ${CMAKE_DL_LIBS})
+ target_link_libraries(orc::orc INTERFACE ${CMAKE_DL_LIBS})
endif()
- target_link_libraries(orc::liborc INTERFACE ${ORC_LINK_LIBRARIES})
add_dependencies(toolchain orc_ep)
- add_dependencies(orc::liborc orc_ep)
+ add_dependencies(orc::orc orc_ep)
- list(APPEND ARROW_BUNDLED_STATIC_LIBS orc::liborc)
+ list(APPEND ARROW_BUNDLED_STATIC_LIBS orc::orc)
endmacro()
if(ARROW_ORC)
- resolve_dependency(ORC)
+ resolve_dependency(orc HAVE_ALT TRUE)
message(STATUS "Found ORC static library: ${ORC_STATIC_LIB}")
message(STATUS "Found ORC headers: ${ORC_INCLUDE_DIR}")
endif()
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 9a6117011535e..24e8eefad1523 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -223,6 +223,7 @@ set(ARROW_SRCS
util/debug.cc
util/decimal.cc
util/delimiting.cc
+ util/float16.cc
util/formatting.cc
util/future.cc
util/hashing.cc
@@ -502,8 +503,8 @@ if(ARROW_FILESYSTEM)
filesystem/util_internal.cc)
if(ARROW_AZURE)
- list(APPEND ARROW_SRCS filesystem/azurefs.cc)
- set_source_files_properties(filesystem/azurefs.cc
+ list(APPEND ARROW_SRCS filesystem/azurefs.cc filesystem/azurefs_internal.cc)
+ set_source_files_properties(filesystem/azurefs.cc filesystem/azurefs_internal.cc
PROPERTIES SKIP_PRECOMPILE_HEADERS ON
SKIP_UNITY_BUILD_INCLUSION ON)
endif()
diff --git a/cpp/src/arrow/acero/CMakeLists.txt b/cpp/src/arrow/acero/CMakeLists.txt
index 44fbb26f0814d..b77d52a23eedb 100644
--- a/cpp/src/arrow/acero/CMakeLists.txt
+++ b/cpp/src/arrow/acero/CMakeLists.txt
@@ -49,9 +49,11 @@ set(ARROW_ACERO_SRCS
project_node.cc
query_context.cc
sink_node.cc
+ sorted_merge_node.cc
source_node.cc
swiss_join.cc
task_util.cc
+ time_series_util.cc
tpch_node.cc
union_node.cc
util.cc)
@@ -123,8 +125,7 @@ if(ARROW_TESTING)
add_library(arrow_acero_testing OBJECT test_util_internal.cc)
# Even though this is still just an object library we still need to "link" our
# dependencies so that include paths are configured correctly
- target_link_libraries(arrow_acero_testing ${ARROW_ACERO_TEST_LINK_LIBS})
- target_link_libraries(arrow_acero_testing ${ARROW_GTEST_GTEST})
+ target_link_libraries(arrow_acero_testing PRIVATE ${ARROW_ACERO_TEST_LINK_LIBS})
list(APPEND ARROW_ACERO_TEST_LINK_LIBS arrow_acero_testing)
endif()
@@ -174,11 +175,13 @@ add_arrow_acero_test(hash_join_node_test SOURCES hash_join_node_test.cc
add_arrow_acero_test(pivot_longer_node_test SOURCES pivot_longer_node_test.cc
test_nodes.cc)
-# asof_join_node uses std::thread internally
-# and doesn't use ThreadPool so it will
+# asof_join_node and sorted_merge_node use std::thread internally
+# and don't use ThreadPool, so they will
# be broken if threading is turned off
if(ARROW_ENABLE_THREADING)
add_arrow_acero_test(asof_join_node_test SOURCES asof_join_node_test.cc test_nodes.cc)
+ add_arrow_acero_test(sorted_merge_node_test SOURCES sorted_merge_node_test.cc
+ test_nodes.cc)
endif()
add_arrow_acero_test(tpch_node_test SOURCES tpch_node_test.cc)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc b/cpp/src/arrow/acero/asof_join_node.cc
index d19d2db299cba..4a3b6b199c4c0 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -16,6 +16,8 @@
// under the License.
#include "arrow/acero/asof_join_node.h"
+#include "arrow/acero/backpressure_handler.h"
+#include "arrow/acero/concurrent_queue_internal.h"
#include
#include
@@ -30,6 +32,7 @@
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
+#include "arrow/acero/unmaterialized_table.h"
#ifndef NDEBUG
#include "arrow/acero/options_internal.h"
#endif
@@ -41,6 +44,7 @@
#ifndef NDEBUG
#include "arrow/compute/function_internal.h"
#endif
+#include "arrow/acero/time_series_util.h"
#include "arrow/compute/key_hash.h"
#include "arrow/compute/light_array.h"
#include "arrow/record_batch.h"
@@ -122,92 +126,12 @@ struct TolType {
typedef uint64_t row_index_t;
typedef int col_index_t;
-// normalize the value to 64-bits while preserving ordering of values
-template ::value, bool> = true>
-static inline uint64_t time_value(T t) {
- uint64_t bias = std::is_signed::value ? (uint64_t)1 << (8 * sizeof(T) - 1) : 0;
- return t < 0 ? static_cast(t + bias) : static_cast(t);
-}
-
// indicates normalization of a key value
template ::value, bool> = true>
static inline uint64_t key_value(T t) {
return static_cast(t);
}
-/**
- * Simple implementation for an unbound concurrent queue
- */
-template
-class ConcurrentQueue {
- public:
- T Pop() {
- std::unique_lock lock(mutex_);
- cond_.wait(lock, [&] { return !queue_.empty(); });
- return PopUnlocked();
- }
-
- T PopUnlocked() {
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
-
- void Push(const T& item) {
- std::unique_lock lock(mutex_);
- return PushUnlocked(item);
- }
-
- void PushUnlocked(const T& item) {
- queue_.push(item);
- cond_.notify_one();
- }
-
- void Clear() {
- std::unique_lock lock(mutex_);
- ClearUnlocked();
- }
-
- void ClearUnlocked() { queue_ = std::queue(); }
-
- std::optional TryPop() {
- std::unique_lock lock(mutex_);
- return TryPopUnlocked();
- }
-
- std::optional TryPopUnlocked() {
- // Try to pop the oldest value from the queue (or return nullopt if none)
- if (queue_.empty()) {
- return std::nullopt;
- } else {
- auto item = queue_.front();
- queue_.pop();
- return item;
- }
- }
-
- bool Empty() const {
- std::unique_lock lock(mutex_);
- return queue_.empty();
- }
-
- // Un-synchronized access to front
- // For this to be "safe":
- // 1) the caller logically guarantees that queue is not empty
- // 2) pop/try_pop cannot be called concurrently with this
- const T& UnsyncFront() const { return queue_.front(); }
-
- size_t UnsyncSize() const { return queue_.size(); }
-
- protected:
- std::mutex& GetMutex() { return mutex_; }
-
- private:
- std::queue queue_;
- mutable std::mutex mutex_;
- std::condition_variable cond_;
-};
-
class AsofJoinNode;
#ifndef NDEBUG
@@ -547,104 +471,6 @@ class BackpressureController : public BackpressureControl {
std::atomic& backpressure_counter_;
};
-class BackpressureHandler {
- private:
- BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
- std::unique_ptr backpressure_control)
- : input_(input),
- low_threshold_(low_threshold),
- high_threshold_(high_threshold),
- backpressure_control_(std::move(backpressure_control)) {}
-
- public:
- static Result Make(
- ExecNode* input, size_t low_threshold, size_t high_threshold,
- std::unique_ptr backpressure_control) {
- if (low_threshold >= high_threshold) {
- return Status::Invalid("low threshold (", low_threshold,
- ") must be less than high threshold (", high_threshold, ")");
- }
- if (backpressure_control == NULLPTR) {
- return Status::Invalid("null backpressure control parameter");
- }
- BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
- std::move(backpressure_control));
- return std::move(backpressure_handler);
- }
-
- void Handle(size_t start_level, size_t end_level) {
- if (start_level < high_threshold_ && end_level >= high_threshold_) {
- backpressure_control_->Pause();
- } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
- backpressure_control_->Resume();
- }
- }
-
- Status ForceShutdown() {
- // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
- // Since acero's executor won't terminate if any one node is paused, we need to
- // force resume the node before stopping production.
- backpressure_control_->Resume();
- return input_->StopProducing();
- }
-
- private:
- ExecNode* input_;
- size_t low_threshold_;
- size_t high_threshold_;
- std::unique_ptr backpressure_control_;
-};
-
-template
-class BackpressureConcurrentQueue : public ConcurrentQueue {
- private:
- struct DoHandle {
- explicit DoHandle(BackpressureConcurrentQueue& queue)
- : queue_(queue), start_size_(queue_.UnsyncSize()) {}
-
- ~DoHandle() {
- size_t end_size = queue_.UnsyncSize();
- queue_.handler_.Handle(start_size_, end_size);
- }
-
- BackpressureConcurrentQueue& queue_;
- size_t start_size_;
- };
-
- public:
- explicit BackpressureConcurrentQueue(BackpressureHandler handler)
- : handler_(std::move(handler)) {}
-
- T Pop() {
- std::unique_lock lock(ConcurrentQueue::GetMutex());
- DoHandle do_handle(*this);
- return ConcurrentQueue::PopUnlocked();
- }
-
- void Push(const T& item) {
- std::unique_lock lock(ConcurrentQueue::GetMutex());
- DoHandle do_handle(*this);
- ConcurrentQueue::PushUnlocked(item);
- }
-
- void Clear() {
- std::unique_lock lock(ConcurrentQueue::GetMutex());
- DoHandle do_handle(*this);
- ConcurrentQueue::ClearUnlocked();
- }
-
- std::optional TryPop() {
- std::unique_lock lock(ConcurrentQueue::GetMutex());
- DoHandle do_handle(*this);
- return ConcurrentQueue::TryPopUnlocked();
- }
-
- Status ForceShutdown() { return handler_.ForceShutdown(); }
-
- private:
- BackpressureHandler handler_;
-};
-
class InputState {
// InputState correponds to an input
// Input record batches are queued up in InputState until processed and
@@ -783,29 +609,8 @@ class InputState {
}
inline OnType GetLatestTime() const {
- return GetTime(GetLatestBatch().get(), latest_ref_row_);
- }
-
- inline ByType GetTime(const RecordBatch* batch, row_index_t row) const {
- auto data = batch->column_data(time_col_index_);
- switch (time_type_id_) {
- LATEST_VAL_CASE(INT8, time_value)
- LATEST_VAL_CASE(INT16, time_value)
- LATEST_VAL_CASE(INT32, time_value)
- LATEST_VAL_CASE(INT64, time_value)
- LATEST_VAL_CASE(UINT8, time_value)
- LATEST_VAL_CASE(UINT16, time_value)
- LATEST_VAL_CASE(UINT32, time_value)
- LATEST_VAL_CASE(UINT64, time_value)
- LATEST_VAL_CASE(DATE32, time_value)
- LATEST_VAL_CASE(DATE64, time_value)
- LATEST_VAL_CASE(TIME32, time_value)
- LATEST_VAL_CASE(TIME64, time_value)
- LATEST_VAL_CASE(TIMESTAMP, time_value)
- default:
- DCHECK(false);
- return 0; // cannot happen
- }
+ return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+ latest_ref_row_);
}
#undef LATEST_VAL_CASE
@@ -832,7 +637,9 @@ class InputState {
have_active_batch &= !queue_.TryPop();
if (have_active_batch) {
DCHECK_GT(queue_.UnsyncFront()->num_rows(), 0); // empty batches disallowed
- memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), 0)); // time changed
+ memo_.UpdateTime(GetTime(queue_.UnsyncFront().get(), time_type_id_,
+ time_col_index_,
+ 0)); // time changed
}
}
}
@@ -988,35 +795,25 @@ class InputState {
std::vector> src_to_dst_;
};
+/// Wrapper around UnmaterializedCompositeTable that knows how to emplace
+/// the join row-by-row
template
-struct CompositeReferenceRow {
- struct Entry {
- arrow::RecordBatch* batch; // can be NULL if there's no value
- row_index_t row;
- };
- Entry refs[MAX_TABLES];
-};
+class CompositeTableBuilder {
+ using SliceBuilder = UnmaterializedSliceBuilder;
+ using CompositeTable = UnmaterializedCompositeTable;
-// A table of composite reference rows. Rows maintain pointers to the
-// constituent record batches, but the overall table retains shared_ptr
-// references to ensure memory remains resident while the table is live.
-//
-// The main reason for this is that, especially for wide tables, joins
-// are effectively row-oriented, rather than column-oriented. Separating
-// the join part from the columnar materialization part simplifies the
-// logic around data types and increases efficiency.
-//
-// We don't put the shared_ptr's into the rows for efficiency reasons.
-template
-class CompositeReferenceTable {
public:
- NDEBUG_EXPLICIT CompositeReferenceTable(DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
- : DEBUG_ADD(n_tables_(n_tables), node_(node)) {
+ NDEBUG_EXPLICIT CompositeTableBuilder(
+ const std::vector>& inputs,
+ const std::shared_ptr& schema, arrow::MemoryPool* pool,
+ DEBUG_ADD(size_t n_tables, AsofJoinNode* node))
+ : unmaterialized_table(InitUnmaterializedTable(schema, inputs, pool)),
+ DEBUG_ADD(n_tables_(n_tables), node_(node)) {
DCHECK_GE(n_tables_, 1);
DCHECK_LE(n_tables_, MAX_TABLES);
}
- size_t n_rows() const { return rows_.size(); }
+ size_t n_rows() const { return unmaterialized_table.Size(); }
// Adds the latest row from the input state as a new composite reference row
// - LHS must have a valid key,timestep,and latest rows
@@ -1037,14 +834,16 @@ class CompositeReferenceTable {
// On the first row of the batch, we resize the destination.
// The destination size is dictated by the size of the LHS batch.
row_index_t new_batch_size = lhs_latest_batch->num_rows();
- row_index_t new_capacity = rows_.size() + new_batch_size;
- if (rows_.capacity() < new_capacity) rows_.reserve(new_capacity);
+ row_index_t new_capacity = unmaterialized_table.Size() + new_batch_size;
+ if (unmaterialized_table.capacity() < new_capacity) {
+ unmaterialized_table.reserve(new_capacity);
+ }
}
- rows_.resize(rows_.size() + 1);
- auto& row = rows_.back();
- row.refs[0].batch = lhs_latest_batch.get();
- row.refs[0].row = lhs_latest_row;
- AddRecordBatchRef(lhs_latest_batch);
+
+ SliceBuilder new_row{&unmaterialized_table};
+
+ // Each item represents a portion of the columns of the output table
+ new_row.AddEntry(lhs_latest_batch, lhs_latest_row, lhs_latest_row + 1);
DEBUG_SYNC(node_, "Emplace: key=", key, " lhs_latest_row=", lhs_latest_row,
" lhs_latest_time=", lhs_latest_time, DEBUG_MANIP(std::endl));
@@ -1068,100 +867,25 @@ class CompositeReferenceTable {
if (tolerance.Accepts(lhs_latest_time, (*opt_entry)->time)) {
// Have a valid entry
const MemoStore::Entry* entry = *opt_entry;
- row.refs[i].batch = entry->batch.get();
- row.refs[i].row = entry->row;
- AddRecordBatchRef(entry->batch);
+ new_row.AddEntry(entry->batch, entry->row, entry->row + 1);
continue;
}
}
- row.refs[i].batch = NULL;
- row.refs[i].row = 0;
+ new_row.AddEntry(nullptr, 0, 1);
}
+ new_row.Finalize();
}
// Materializes the current reference table into a target record batch
- Result> Materialize(
- MemoryPool* memory_pool, const std::shared_ptr& output_schema,
- const std::vector>& state) {
- DCHECK_EQ(state.size(), n_tables_);
-
- // Don't build empty batches
- size_t n_rows = rows_.size();
- if (!n_rows) return NULLPTR;
-
- // Build the arrays column-by-column from the rows
- std::vector> arrays(output_schema->num_fields());
- for (size_t i_table = 0; i_table < n_tables_; ++i_table) {
- int n_src_cols = state.at(i_table)->get_schema()->num_fields();
- {
- for (col_index_t i_src_col = 0; i_src_col < n_src_cols; ++i_src_col) {
- std::optional i_dst_col_opt =
- state[i_table]->MapSrcToDst(i_src_col);
- if (!i_dst_col_opt) continue;
- col_index_t i_dst_col = *i_dst_col_opt;
- const auto& src_field = state[i_table]->get_schema()->field(i_src_col);
- const auto& dst_field = output_schema->field(i_dst_col);
- DCHECK(src_field->type()->Equals(dst_field->type()));
- DCHECK_EQ(src_field->name(), dst_field->name());
- const auto& field_type = src_field->type();
-
-#define ASOFJOIN_MATERIALIZE_CASE(id) \
- case Type::id: { \
- using T = typename TypeIdTraits::Type; \
- ARROW_ASSIGN_OR_RAISE( \
- arrays.at(i_dst_col), \
- MaterializeColumn(memory_pool, field_type, i_table, i_src_col)); \
- break; \
- }
-
- switch (field_type->id()) {
- ASOFJOIN_MATERIALIZE_CASE(BOOL)
- ASOFJOIN_MATERIALIZE_CASE(INT8)
- ASOFJOIN_MATERIALIZE_CASE(INT16)
- ASOFJOIN_MATERIALIZE_CASE(INT32)
- ASOFJOIN_MATERIALIZE_CASE(INT64)
- ASOFJOIN_MATERIALIZE_CASE(UINT8)
- ASOFJOIN_MATERIALIZE_CASE(UINT16)
- ASOFJOIN_MATERIALIZE_CASE(UINT32)
- ASOFJOIN_MATERIALIZE_CASE(UINT64)
- ASOFJOIN_MATERIALIZE_CASE(FLOAT)
- ASOFJOIN_MATERIALIZE_CASE(DOUBLE)
- ASOFJOIN_MATERIALIZE_CASE(DATE32)
- ASOFJOIN_MATERIALIZE_CASE(DATE64)
- ASOFJOIN_MATERIALIZE_CASE(TIME32)
- ASOFJOIN_MATERIALIZE_CASE(TIME64)
- ASOFJOIN_MATERIALIZE_CASE(TIMESTAMP)
- ASOFJOIN_MATERIALIZE_CASE(STRING)
- ASOFJOIN_MATERIALIZE_CASE(LARGE_STRING)
- ASOFJOIN_MATERIALIZE_CASE(BINARY)
- ASOFJOIN_MATERIALIZE_CASE(LARGE_BINARY)
- default:
- return Status::Invalid("Unsupported data type ",
- src_field->type()->ToString(), " for field ",
- src_field->name());
- }
-
-#undef ASOFJOIN_MATERIALIZE_CASE
- }
- }
- }
-
- // Build the result
- DCHECK_LE(n_rows, (uint64_t)std::numeric_limits::max());
- std::shared_ptr r =
- arrow::RecordBatch::Make(output_schema, (int64_t)n_rows, arrays);
- return r;
+ Result>> Materialize() {
+ return unmaterialized_table.Materialize();
}
// Returns true if there are no rows
- bool empty() const { return rows_.empty(); }
+ bool empty() const { return unmaterialized_table.Empty(); }
private:
- // Contains shared_ptr refs for all RecordBatches referred to by the contents of rows_
- std::unordered_map> _ptr2ref;
-
- // Row table references
- std::vector> rows_;
+ CompositeTable unmaterialized_table;
// Total number of tables in the composite table
size_t n_tables_;
@@ -1171,70 +895,20 @@ class CompositeReferenceTable {
AsofJoinNode* node_;
#endif
- // Adds a RecordBatch ref to the mapping, if needed
- void AddRecordBatchRef(const std::shared_ptr& ref) {
- if (!_ptr2ref.count((uintptr_t)ref.get())) _ptr2ref[(uintptr_t)ref.get()] = ref;
- }
-
- template ::BuilderType>
- enable_if_boolean static BuilderAppend(
- Builder& builder, const std::shared_ptr& source, row_index_t row) {
- if (source->IsNull(row)) {
- builder.UnsafeAppendNull();
- return Status::OK();
- }
- builder.UnsafeAppend(bit_util::GetBit(source->template GetValues(1), row));
- return Status::OK();
- }
-
- template ::BuilderType>
- enable_if_t::value && !is_boolean_type::value,
- Status> static BuilderAppend(Builder& builder,
- const std::shared_ptr& source,
- row_index_t row) {
- if (source->IsNull(row)) {
- builder.UnsafeAppendNull();
- return Status::OK();
- }
- using CType = typename TypeTraits::CType;
- builder.UnsafeAppend(source->template GetValues(1)[row]);
- return Status::OK();
- }
-
- template ::BuilderType>
- enable_if_base_binary static BuilderAppend(
- Builder& builder, const std::shared_ptr& source, row_index_t row) {
- if (source->IsNull(row)) {
- return builder.AppendNull();
- }
- using offset_type = typename Type::offset_type;
- const uint8_t* data = source->buffers[2]->data();
- const offset_type* offsets = source->GetValues(1);
- const offset_type offset0 = offsets[row];
- const offset_type offset1 = offsets[row + 1];
- return builder.Append(data + offset0, offset1 - offset0);
- }
-
- template ::BuilderType>
- Result> MaterializeColumn(MemoryPool* memory_pool,
- const std::shared_ptr& type,
- size_t i_table, col_index_t i_col) {
- ARROW_ASSIGN_OR_RAISE(auto a_builder, MakeBuilder(type, memory_pool));
- Builder& builder = *checked_cast(a_builder.get());
- ARROW_RETURN_NOT_OK(builder.Reserve(rows_.size()));
- for (row_index_t i_row = 0; i_row < rows_.size(); ++i_row) {
- const auto& ref = rows_[i_row].refs[i_table];
- if (ref.batch) {
- Status st =
- BuilderAppend(builder, ref.batch->column_data(i_col), ref.row);
- ARROW_RETURN_NOT_OK(st);
- } else {
- builder.UnsafeAppendNull();
+ static CompositeTable InitUnmaterializedTable(
+ const std::shared_ptr& schema,
+ const std::vector>& inputs, arrow::MemoryPool* pool) {
+ std::unordered_map> dst_to_src;
+ for (size_t i = 0; i < inputs.size(); i++) {
+ auto& input = inputs[i];
+ for (int src = 0; src < input->get_schema()->num_fields(); src++) {
+ auto dst = input->MapSrcToDst(src);
+ if (dst.has_value()) {
+ dst_to_src[dst.value()] = std::make_pair(static_cast(i), src);
+ }
}
}
- std::shared_ptr result;
- ARROW_RETURN_NOT_OK(builder.Finish(&result));
- return result;
+ return CompositeTable{schema, inputs.size(), dst_to_src, pool};
}
};
@@ -1279,7 +953,9 @@ class AsofJoinNode : public ExecNode {
auto& lhs = *state_.at(0);
// Construct new target table if needed
- CompositeReferenceTable dst(DEBUG_ADD(state_.size(), this));
+ CompositeTableBuilder dst(state_, output_schema_,
+ plan()->query_context()->memory_pool(),
+ DEBUG_ADD(state_.size(), this));
// Generate rows into the dst table until we either run out of data or hit the row
// limit, or run out of input
@@ -1318,8 +994,8 @@ class AsofJoinNode : public ExecNode {
if (dst.empty()) {
return NULLPTR;
} else {
- return dst.Materialize(plan()->query_context()->memory_pool(), output_schema(),
- state_);
+ ARROW_ASSIGN_OR_RAISE(auto out, dst.Materialize());
+ return out.has_value() ? out.value() : NULLPTR;
}
}
diff --git a/cpp/src/arrow/acero/backpressure_handler.h b/cpp/src/arrow/acero/backpressure_handler.h
new file mode 100644
index 0000000000000..178272315d7fb
--- /dev/null
+++ b/cpp/src/arrow/acero/backpressure_handler.h
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+
+#include
+
+namespace arrow::acero {
+
+class BackpressureHandler {
+ private:
+ BackpressureHandler(ExecNode* input, size_t low_threshold, size_t high_threshold,
+ std::unique_ptr backpressure_control)
+ : input_(input),
+ low_threshold_(low_threshold),
+ high_threshold_(high_threshold),
+ backpressure_control_(std::move(backpressure_control)) {}
+
+ public:
+ static Result Make(
+ ExecNode* input, size_t low_threshold, size_t high_threshold,
+ std::unique_ptr backpressure_control) {
+ if (low_threshold >= high_threshold) {
+ return Status::Invalid("low threshold (", low_threshold,
+ ") must be less than high threshold (", high_threshold, ")");
+ }
+ if (backpressure_control == NULLPTR) {
+ return Status::Invalid("null backpressure control parameter");
+ }
+ BackpressureHandler backpressure_handler(input, low_threshold, high_threshold,
+ std::move(backpressure_control));
+ return std::move(backpressure_handler);
+ }
+
+ void Handle(size_t start_level, size_t end_level) {
+ if (start_level < high_threshold_ && end_level >= high_threshold_) {
+ backpressure_control_->Pause();
+ } else if (start_level > low_threshold_ && end_level <= low_threshold_) {
+ backpressure_control_->Resume();
+ }
+ }
+
+ Status ForceShutdown() {
+ // It may be unintuitive to call Resume() here, but this is to avoid a deadlock.
+ // Since acero's executor won't terminate if any one node is paused, we need to
+ // force resume the node before stopping production.
+ backpressure_control_->Resume();
+ return input_->StopProducing();
+ }
+
+ private:
+ ExecNode* input_;
+ size_t low_threshold_;
+ size_t high_threshold_;
+ std::unique_ptr backpressure_control_;
+};
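+
+// A minimal usage sketch (illustrative only, not part of this header): an exec
+// node wires a BackpressureHandler to one of its inputs with low/high queue
+// thresholds and reports queue-size transitions to it. `MyBackpressureControl`
+// and `input` are hypothetical placeholders.
+//
+//   std::unique_ptr<BackpressureControl> control =
+//       std::make_unique<MyBackpressureControl>();
+//   ARROW_ASSIGN_OR_RAISE(auto handler,
+//                         BackpressureHandler::Make(input, /*low_threshold=*/4,
+//                                                   /*high_threshold=*/8,
+//                                                   std::move(control)));
+//   handler.Handle(/*start_level=*/7, /*end_level=*/8);  // crosses high -> Pause()
+//   handler.Handle(/*start_level=*/5, /*end_level=*/4);  // crosses low  -> Resume()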
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/concurrent_queue_internal.h b/cpp/src/arrow/acero/concurrent_queue_internal.h
new file mode 100644
index 0000000000000..f530394187299
--- /dev/null
+++ b/cpp/src/arrow/acero/concurrent_queue_internal.h
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include "arrow/acero/backpressure_handler.h"
+
+namespace arrow::acero {
+
+/**
+ * Simple implementation of a thread-safe, blocking, unbounded multi-consumer /
+ * multi-producer concurrent queue
+ */
+template
+class ConcurrentQueue {
+ public:
+ // Pops the oldest item from the queue, blocking until an item is available
+ //
+ T Pop() {
+ std::unique_lock lock(mutex_);
+ cond_.wait(lock, [&] { return !queue_.empty(); });
+ return PopUnlocked();
+ }
+
+ // Pops the oldest item from the queue, or returns std::nullopt if empty
+ //
+ std::optional TryPop() {
+ std::unique_lock lock(mutex_);
+ return TryPopUnlocked();
+ }
+
+ // Pushes an item to the queue
+ //
+ void Push(const T& item) {
+ std::unique_lock lock(mutex_);
+ return PushUnlocked(item);
+ }
+
+ // Clears the queue
+ //
+ void Clear() {
+ std::unique_lock lock(mutex_);
+ ClearUnlocked();
+ }
+
+ bool Empty() const {
+ std::unique_lock lock(mutex_);
+ return queue_.empty();
+ }
+
+ // Un-synchronized access to front
+ // For this to be "safe":
+ // 1) the caller logically guarantees that queue is not empty
+ // 2) pop/try_pop cannot be called concurrently with this
+ const T& UnsyncFront() const { return queue_.front(); }
+
+ size_t UnsyncSize() const { return queue_.size(); }
+
+ protected:
+ std::mutex& GetMutex() { return mutex_; }
+
+ T PopUnlocked() {
+ auto item = queue_.front();
+ queue_.pop();
+ return item;
+ }
+
+ void PushUnlocked(const T& item) {
+ queue_.push(item);
+ cond_.notify_one();
+ }
+
+ void ClearUnlocked() { queue_ = std::queue(); }
+
+ std::optional TryPopUnlocked() {
+ // Try to pop the oldest value from the queue (or return nullopt if none)
+ if (queue_.empty()) {
+ return std::nullopt;
+ } else {
+ auto item = queue_.front();
+ queue_.pop();
+ return item;
+ }
+ }
+ std::queue queue_;
+
+ private:
+ mutable std::mutex mutex_;
+ std::condition_variable cond_;
+};
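+
+// A minimal usage sketch (illustrative only): one thread produces items while
+// another consumes them; Pop() blocks until an item is available, so no extra
+// synchronization is needed on the consumer side.
+//
+//   ConcurrentQueue<int> queue;
+//   std::thread producer([&] {
+//     for (int i = 0; i < 100; ++i) queue.Push(i);
+//   });
+//   std::thread consumer([&] {
+//     for (int i = 0; i < 100; ++i) ARROW_CHECK(queue.Pop() == i);
+//   });
+//   producer.join();
+//   consumer.join();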
+
+template
+class BackpressureConcurrentQueue : public ConcurrentQueue {
+ private:
+ struct DoHandle {
+ explicit DoHandle(BackpressureConcurrentQueue& queue)
+ : queue_(queue), start_size_(queue_.UnsyncSize()) {}
+
+ ~DoHandle() {
+ // unsynced access is safe since DoHandle is internally only used when the
+ // lock is held
+ size_t end_size = queue_.UnsyncSize();
+ queue_.handler_.Handle(start_size_, end_size);
+ }
+
+ BackpressureConcurrentQueue& queue_;
+ size_t start_size_;
+ };
+
+ public:
+ explicit BackpressureConcurrentQueue(BackpressureHandler handler)
+ : handler_(std::move(handler)) {}
+
+ T Pop() {
+ std::unique_lock lock(ConcurrentQueue::GetMutex());
+ DoHandle do_handle(*this);
+ return ConcurrentQueue::PopUnlocked();
+ }
+
+ void Push(const T& item) {
+ std::unique_lock lock(ConcurrentQueue::GetMutex());
+ DoHandle do_handle(*this);
+ ConcurrentQueue::PushUnlocked(item);
+ }
+
+ void Clear() {
+ std::unique_lock lock(ConcurrentQueue::GetMutex());
+ DoHandle do_handle(*this);
+ ConcurrentQueue::ClearUnlocked();
+ }
+
+ std::optional TryPop() {
+ std::unique_lock lock(ConcurrentQueue::GetMutex());
+ DoHandle do_handle(*this);
+ return ConcurrentQueue::TryPopUnlocked();
+ }
+
+ Status ForceShutdown() { return handler_.ForceShutdown(); }
+
+ private:
+ BackpressureHandler handler_;
+};
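+
+// Worked example of the backpressure behaviour (a sketch; `input` and `control`
+// are placeholders as above and the thresholds come from the BackpressureHandler
+// passed to the constructor): with low_threshold = 4 and high_threshold = 8, the
+// Push() that grows the queue from 7 to 8 items calls Pause() on the producing
+// input, and the Pop()/TryPop() that shrinks it from 5 to 4 items calls Resume().
+// The RAII DoHandle measures the size before and after each operation while the
+// lock is held.
+//
+//   ARROW_ASSIGN_OR_RAISE(auto handler,
+//                         BackpressureHandler::Make(input, /*low_threshold=*/4,
+//                                                   /*high_threshold=*/8,
+//                                                   std::move(control)));
+//   BackpressureConcurrentQueue<std::shared_ptr<RecordBatch>> queue(std::move(handler));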
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/exec_plan.cc b/cpp/src/arrow/acero/exec_plan.cc
index 541e5fed6206b..97119726d4b17 100644
--- a/cpp/src/arrow/acero/exec_plan.cc
+++ b/cpp/src/arrow/acero/exec_plan.cc
@@ -1114,6 +1114,7 @@ void RegisterAggregateNode(ExecFactoryRegistry*);
void RegisterSinkNode(ExecFactoryRegistry*);
void RegisterHashJoinNode(ExecFactoryRegistry*);
void RegisterAsofJoinNode(ExecFactoryRegistry*);
+void RegisterSortedMergeNode(ExecFactoryRegistry*);
} // namespace internal
@@ -1132,6 +1133,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
internal::RegisterSinkNode(this);
internal::RegisterHashJoinNode(this);
internal::RegisterAsofJoinNode(this);
+ internal::RegisterSortedMergeNode(this);
}
Result GetFactory(const std::string& factory_name) override {
diff --git a/cpp/src/arrow/acero/sorted_merge_node.cc b/cpp/src/arrow/acero/sorted_merge_node.cc
new file mode 100644
index 0000000000000..f3b934eda186b
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node.cc
@@ -0,0 +1,609 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include "arrow/acero/concurrent_queue_internal.h"
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/query_context.h"
+#include "arrow/acero/time_series_util.h"
+#include "arrow/acero/unmaterialized_table.h"
+#include "arrow/acero/util.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/result.h"
+#include "arrow/type_fwd.h"
+#include "arrow/util/logging.h"
+
+namespace {
+template
+struct Defer {
+ Callable callable;
+ explicit Defer(Callable callable_) : callable(std::move(callable_)) {}
+ ~Defer() noexcept { callable(); }
+};
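+
+// A minimal usage sketch (illustrative only): Defer runs its callable when the
+// enclosing scope exits, which is how EndFromProcessThread below marks the
+// external task finished even when returning early.
+//
+//   {
+//     Defer cleanup([&] { process_task.MarkFinished(status); });
+//     // ... any return path out of this scope still runs the callable
+//   }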
+
+std::vector GetInputLabels(
+ const arrow::acero::ExecNode::NodeVector& inputs) {
+ std::vector labels(inputs.size());
+ for (size_t i = 0; i < inputs.size(); i++) {
+ labels[i] = "input_" + std::to_string(i) + "_label";
+ }
+ return labels;
+}
+
+template
+inline typename T::const_iterator std_find(const T& container, const V& val) {
+ return std::find(container.begin(), container.end(), val);
+}
+
+template
+inline bool std_has(const T& container, const V& val) {
+ return container.end() != std_find(container, val);
+}
+
+} // namespace
+
+namespace arrow::acero {
+
+namespace {
+
+// Each slice is associated with a single input source, so we only need 1 record
+// batch per slice
+using SingleRecordBatchSliceBuilder = arrow::acero::UnmaterializedSliceBuilder<1>;
+using SingleRecordBatchCompositeTable = arrow::acero::UnmaterializedCompositeTable<1>;
+
+using row_index_t = uint64_t;
+using time_unit_t = uint64_t;
+using col_index_t = int;
+
+constexpr bool kNewTask = true;
+constexpr bool kPoisonPill = false;
+
+class BackpressureController : public BackpressureControl {
+ public:
+ BackpressureController(ExecNode* node, ExecNode* output,
+ std::atomic& backpressure_counter)
+ : node_(node), output_(output), backpressure_counter_(backpressure_counter) {}
+
+ void Pause() override { node_->PauseProducing(output_, ++backpressure_counter_); }
+ void Resume() override { node_->ResumeProducing(output_, ++backpressure_counter_); }
+
+ private:
+ ExecNode* node_;
+ ExecNode* output_;
+ std::atomic& backpressure_counter_;
+};
+
+/// InputState corresponds to an input. Input record batches are queued up in InputState
+/// until processed and turned into output record batches.
+class InputState {
+ public:
+ InputState(size_t index, BackpressureHandler handler,
+ const std::shared_ptr& schema, const int time_col_index)
+ : index_(index),
+ queue_(std::move(handler)),
+ schema_(schema),
+ time_col_index_(time_col_index),
+ time_type_id_(schema_->fields()[time_col_index_]->type()->id()) {}
+
+ template
+ static arrow::Result Make(size_t index, arrow::acero::ExecNode* input,
+ arrow::acero::ExecNode* output,
+ std::atomic& backpressure_counter,
+ const std::shared_ptr& schema,
+ const col_index_t time_col_index) {
+ constexpr size_t low_threshold = 4, high_threshold = 8;
+ std::unique_ptr backpressure_control =
+ std::make_unique(input, output, backpressure_counter);
+ ARROW_ASSIGN_OR_RAISE(auto handler,
+ BackpressureHandler::Make(input, low_threshold, high_threshold,
+ std::move(backpressure_control)));
+ return PtrType(new InputState(index, std::move(handler), schema, time_col_index));
+ }
+
+ bool IsTimeColumn(col_index_t i) const {
+ DCHECK_LT(i, schema_->num_fields());
+ return (i == time_col_index_);
+ }
+
+ // Gets the latest row index, assuming the queue isn't empty
+ row_index_t GetLatestRow() const { return latest_ref_row_; }
+
+ bool Empty() const {
+ // cannot be empty if ref row is >0 -- can avoid slow queue lock
+ // below
+ if (latest_ref_row_ > 0) {
+ return false;
+ }
+ return queue_.Empty();
+ }
+
+ size_t index() const { return index_; }
+
+ int total_batches() const { return total_batches_; }
+
+ // Gets latest batch (precondition: must not be empty)
+ const std::shared_ptr& GetLatestBatch() const {
+ return queue_.UnsyncFront();
+ }
+
+#define LATEST_VAL_CASE(id, val) \
+ case arrow::Type::id: { \
+ using T = typename arrow::TypeIdTraits::Type; \
+ using CType = typename arrow::TypeTraits::CType; \
+ return val(data->GetValues(1)[row]); \
+ }
+
+ inline time_unit_t GetLatestTime() const {
+ return GetTime(GetLatestBatch().get(), time_type_id_, time_col_index_,
+ latest_ref_row_);
+ }
+
+#undef LATEST_VAL_CASE
+
+ bool Finished() const { return batches_processed_ == total_batches_; }
+
+ void Advance(SingleRecordBatchSliceBuilder& builder) {
+ // Advance the row until a new time is encountered or the record batch
+ // ends, adding the covered range to the builder. Does nothing if there
+ // is no input
+ bool active =
+ (latest_ref_row_ > 0 /*short circuit the lock on the queue*/) || !queue_.Empty();
+
+ if (!active) {
+ return;
+ }
+
+ row_index_t start = latest_ref_row_;
+ row_index_t end = latest_ref_row_;
+ time_unit_t startTime = GetLatestTime();
+ std::shared_ptr batch = queue_.UnsyncFront();
+ auto rows_in_batch = (row_index_t)batch->num_rows();
+
+ while (GetLatestTime() == startTime) {
+ end = ++latest_ref_row_;
+ if (latest_ref_row_ >= rows_in_batch) {
+ // hit the end of the batch, need to get the next batch if
+ // possible.
+ ++batches_processed_;
+ latest_ref_row_ = 0;
+ active &= !queue_.TryPop();
+ if (active) {
+ DCHECK_GT(queue_.UnsyncFront()->num_rows(),
+ 0); // empty batches disallowed, sanity check
+ }
+ break;
+ }
+ }
+ builder.AddEntry(batch, start, end);
+ }
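+
+ // Worked example for Advance() above (illustrative): if the front batch has
+ // timestamps [5, 5, 5, 7, 9] and latest_ref_row_ is 0, Advance() adds the
+ // entry {batch, 0, 3} to the builder and leaves latest_ref_row_ at 3, so the
+ // next call starts at the first row whose time is 7.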
+
+ arrow::Status Push(const std::shared_ptr& rb) {
+ if (rb->num_rows() > 0) {
+ queue_.Push(rb);
+ } else {
+ ++batches_processed_; // don't enqueue empty batches, just record
+ // as processed
+ }
+ return arrow::Status::OK();
+ }
+
+ const std::shared_ptr& get_schema() const { return schema_; }
+
+ void set_total_batches(int n) { total_batches_ = n; }
+
+ private:
+ size_t index_;
+ // Pending record batches. The latest is the front. Batches cannot be empty.
+ BackpressureConcurrentQueue> queue_;
+ // Schema associated with the input
+ std::shared_ptr schema_;
+ // Total number of batches (only int because InputFinished uses int)
+ std::atomic total_batches_{-1};
+ // Number of batches processed so far (only int because InputFinished uses
+ // int)
+ std::atomic batches_processed_{0};
+ // Index of the time col
+ col_index_t time_col_index_;
+ // Type id of the time column
+ arrow::Type::type time_type_id_;
+ // Index of the latest row reference within; if >0 then queue_ cannot be
+ // empty. Must be < queue_.front()->num_rows() if queue_ is non-empty
+ row_index_t latest_ref_row_ = 0;
+ // Time of latest row
+ time_unit_t latest_time_ = std::numeric_limits::lowest();
+};
+
+struct InputStateComparator {
+ bool operator()(const std::shared_ptr& lhs,
+ const std::shared_ptr& rhs) const {
+ // True if lhs is ahead of time of rhs
+ if (lhs->Finished()) {
+ return false;
+ }
+ if (rhs->Finished()) {
+ return false;
+ }
+ time_unit_t lFirst = lhs->GetLatestTime();
+ time_unit_t rFirst = rhs->GetLatestTime();
+ return lFirst > rFirst;
+ }
+};
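+
+// Clarifying note (sketch): because operator() returns "lhs time > rhs time",
+// std::make_heap/std::pop_heap order the heap so that the input whose next row
+// has the smallest time surfaces first, i.e. a min-heap on time. getNextBatch()
+// below relies on this to always advance the input that is furthest behind.
+//
+//   std::vector<std::shared_ptr<InputState>> heap = state;
+//   std::make_heap(heap.begin(), heap.end(), InputStateComparator());
+//   std::pop_heap(heap.begin(), heap.end(), InputStateComparator());
+//   // heap.back() now refers to the input with the smallest next timestamp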
+
+class SortedMergeNode : public ExecNode {
+ static constexpr int64_t kTargetOutputBatchSize = 1024 * 1024;
+
+ public:
+ SortedMergeNode(arrow::acero::ExecPlan* plan,
+ std::vector inputs,
+ std::shared_ptr output_schema,
+ arrow::Ordering new_ordering)
+ : ExecNode(plan, inputs, GetInputLabels(inputs), std::move(output_schema)),
+ ordering_(std::move(new_ordering)),
+ input_counter(inputs_.size()),
+ output_counter(inputs_.size()),
+ process_thread() {
+ SetLabel("sorted_merge");
+ }
+
+ ~SortedMergeNode() override {
+ process_queue.Push(
+ kPoisonPill); // unblock the process thread so it can exit
+
+ // We might create a temporary node (such as to inspect the output
+ // schema), in which case there is nothing to merge
+ process_thread.join();
+ }
+ }
+
+ static arrow::Result Make(
+ arrow::acero::ExecPlan* plan, std::vector inputs,
+ const arrow::acero::ExecNodeOptions& options) {
+ RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, static_cast(inputs.size()),
+ "SortedMergeNode"));
+
+ if (inputs.size() < 1) {
+ return Status::Invalid("Constructing a `SortedMergeNode` with < 1 inputs");
+ }
+
+ const auto schema = inputs.at(0)->output_schema();
+ for (const auto& input : inputs) {
+ if (!input->output_schema()->Equals(schema)) {
+ return Status::Invalid(
+ "SortedMergeNode input schemas must all "
+ "match, first schema "
+ "was: ",
+ schema->ToString(), " got schema: ", input->output_schema()->ToString());
+ }
+ }
+
+ const auto& order_options =
+ arrow::internal::checked_cast(options);
+
+ if (order_options.ordering.is_implicit() || order_options.ordering.is_unordered()) {
+ return Status::Invalid("`ordering` must be an explicit non-empty ordering");
+ }
+
+ std::shared_ptr output_schema = inputs[0]->output_schema();
+ return plan->EmplaceNode(
+ plan, std::move(inputs), std::move(output_schema), order_options.ordering);
+ }
+
+ const char* kind_name() const override { return "SortedMergeNode"; }
+
+ const arrow::Ordering& ordering() const override { return ordering_; }
+
+ arrow::Status Init() override {
+ ARROW_CHECK(ordering_.sort_keys().size() == 1) << "Only one sort key supported";
+
+ auto inputs = this->inputs();
+ for (size_t i = 0; i < inputs.size(); i++) {
+ ExecNode* input = inputs[i];
+ const auto& schema = input->output_schema();
+
+ const auto& sort_key = ordering_.sort_keys()[0];
+ if (sort_key.order != arrow::compute::SortOrder::Ascending) {
+ return Status::NotImplemented("Only ascending sort order is supported");
+ }
+
+ const FieldRef& ref = sort_key.target;
+ auto match_res = ref.FindOne(*schema);
+ if (!match_res.ok()) {
+ return Status::Invalid("Bad sort key : ", match_res.status().message());
+ }
+ ARROW_ASSIGN_OR_RAISE(auto match, match_res);
+ ARROW_DCHECK(match.indices().size() == 1);
+
+ ARROW_ASSIGN_OR_RAISE(auto input_state,
+ InputState::Make>(
+ i, input, this, backpressure_counter, schema,
+ std::move(match.indices()[0])));
+ state.push_back(std::move(input_state));
+ }
+ return Status::OK();
+ }
+
+ arrow::Status InputReceived(arrow::acero::ExecNode* input,
+ arrow::ExecBatch batch) override {
+ ARROW_DCHECK(std_has(inputs_, input));
+ const size_t index = std_find(inputs_, input) - inputs_.begin();
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr rb,
+ batch.ToRecordBatch(output_schema_));
+
+ // Push into the queue. Note that we don't need to lock since
+ // InputState's ConcurrentQueue manages locking
+ input_counter[index] += rb->num_rows();
+ ARROW_RETURN_NOT_OK(state[index]->Push(rb));
+ process_queue.Push(kNewTask);
+ return Status::OK();
+ }
+
+ arrow::Status InputFinished(arrow::acero::ExecNode* input, int total_batches) override {
+ ARROW_DCHECK(std_has(inputs_, input));
+ {
+ std::lock_guard guard(gate);
+ ARROW_DCHECK(std_has(inputs_, input));
+ size_t k = std_find(inputs_, input) - inputs_.begin();
+ state.at(k)->set_total_batches(total_batches);
+ }
+ // Trigger a final process call for stragglers
+ process_queue.Push(kNewTask);
+ return Status::OK();
+ }
+
+ arrow::Status StartProducing() override {
+ ARROW_ASSIGN_OR_RAISE(process_task, plan_->query_context()->BeginExternalTask(
+ "SortedMergeNode::ProcessThread"));
+ if (!process_task.is_valid()) {
+ // Plan has already aborted. Do not start process thread
+ return Status::OK();
+ }
+ process_thread = std::thread(&SortedMergeNode::StartPoller, this);
+ return Status::OK();
+ }
+
+ arrow::Status StopProducingImpl() override {
+ process_queue.Clear();
+ process_queue.Push(kPoisonPill);
+ return Status::OK();
+ }
+
+ // handled by the backpressure controller
+ void PauseProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+ void ResumeProducing(arrow::acero::ExecNode* output, int32_t counter) override {}
+
+ protected:
+ std::string ToStringExtra(int indent) const override {
+ std::stringstream ss;
+ ss << "ordering=" << ordering_.ToString();
+ return ss.str();
+ }
+
+ private:
+ void EndFromProcessThread(arrow::Status st = arrow::Status::OK()) {
+ ARROW_CHECK(!cleanup_started);
+ for (size_t i = 0; i < input_counter.size(); ++i) {
+ ARROW_CHECK(input_counter[i] == output_counter[i])
+ << input_counter[i] << " != " << output_counter[i];
+ }
+
+ ARROW_UNUSED(
+ plan_->query_context()->executor()->Spawn([this, st = std::move(st)]() mutable {
+ Defer cleanup([this, &st]() { process_task.MarkFinished(st); });
+ if (st.ok()) {
+ st = output_->InputFinished(this, batches_produced);
+ }
+ }));
+ }
+
+ bool CheckEnded() {
+ bool all_finished = true;
+ for (const auto& s : state) {
+ all_finished &= s->Finished();
+ }
+ if (all_finished) {
+ EndFromProcessThread();
+ return false;
+ }
+ return true;
+ }
+
+ /// Streams the input states in sorted order until we run out of input
+ arrow::Result> getNextBatch() {
+ DCHECK(!state.empty());
+ for (const auto& s : state) {
+ if (s->Empty() && !s->Finished()) {
+ return nullptr; // not enough data, wait
+ }
+ }
+
+ std::vector> heap = state;
+ // filter out finished states
+ heap.erase(std::remove_if(
+ heap.begin(), heap.end(),
+ [](const std::shared_ptr& s) { return s->Finished(); }),
+ heap.end());
+
+ // If any are Empty(), then return early since we don't have enough data
+ if (std::any_of(heap.begin(), heap.end(),
+ [](const std::shared_ptr& s) { return s->Empty(); })) {
+ return nullptr;
+ }
+
+ // Currently we only support one sort key
+ const auto sort_col = *ordering_.sort_keys().at(0).target.name();
+ const auto comp = InputStateComparator();
+ std::make_heap(heap.begin(), heap.end(), comp);
+
+ // Each slice only has one record batch with the same schema as the output
+ std::unordered_map> output_col_to_src;
+ for (int i = 0; i < output_schema_->num_fields(); i++) {
+ output_col_to_src[i] = std::make_pair(0, i);
+ }
+ SingleRecordBatchCompositeTable output(output_schema(), 1,
+ std::move(output_col_to_src),
+ plan()->query_context()->memory_pool());
+
+ // Generate rows until we run out of data or we exceed the target output
+ // size
+ bool waiting_for_more_data = false;
+ while (!waiting_for_more_data && !heap.empty() &&
+ output.Size() < kTargetOutputBatchSize) {
+ std::pop_heap(heap.begin(), heap.end(), comp);
+
+ auto& next_item = heap.back();
+ time_unit_t latest_time = std::numeric_limits::min();
+ time_unit_t new_time = next_item->GetLatestTime();
+ ARROW_CHECK(new_time >= latest_time)
+ << "Input state " << next_item->index()
+ << " has out of order data. newTime=" << new_time
+ << " latestTime=" << latest_time;
+
+ latest_time = new_time;
+ SingleRecordBatchSliceBuilder builder{&output};
+ next_item->Advance(builder);
+
+ if (builder.Size() > 0) {
+ output_counter[next_item->index()] += builder.Size();
+ builder.Finalize();
+ }
+ if (next_item->Finished()) {
+ heap.pop_back();
+ } else if (next_item->Empty()) {
+ // We've run out of data on one of the inputs
+ waiting_for_more_data = true;
+ continue; // skip the unnecessary make_heap
+ }
+ std::make_heap(heap.begin(), heap.end(), comp);
+ }
+
+ // Emit the batch
+ if (output.Size() == 0) {
+ return nullptr;
+ }
+
+ ARROW_ASSIGN_OR_RAISE(auto maybe_rb, output.Materialize());
+ return maybe_rb.value_or(nullptr);
+ }
+ /// Gets a batch. Returns true if there is more data to process, false if we
+ /// are done or an error occurred
+ bool PollOnce() {
+ std::lock_guard guard(gate);
+ if (!CheckEnded()) {
+ return false;
+ }
+
+ // Process batches while we have data
+ for (;;) {
+ Result> result = getNextBatch();
+
+ if (result.ok()) {
+ auto out_rb = *result;
+ if (!out_rb) {
+ break;
+ }
+ ExecBatch out_b(*out_rb);
+ out_b.index = batches_produced++;
+ Status st = output_->InputReceived(this, std::move(out_b));
+ if (!st.ok()) {
+ ARROW_LOG(FATAL) << "Error in output_::InputReceived: " << st.ToString();
+ EndFromProcessThread(std::move(st));
+ }
+ } else {
+ EndFromProcessThread(result.status());
+ return false;
+ }
+ }
+
+ // Report to the output the total batch count, if we've already
+ // finished everything (there are two places where this can happen:
+ // here and InputFinished)
+ //
+ // It may happen here in cases where InputFinished was called before
+ // we were finished producing results (so we didn't know the output
+ // size at that time)
+ if (!CheckEnded()) {
+ return false;
+ }
+
+ // There is no more we can do now but there is still work remaining
+ // for later when more data arrives.
+ return true;
+ }
+
+ void EmitBatches() {
+ while (true) {
+ // Implementation note: If the queue is empty, we will block here
+ if (process_queue.Pop() == kPoisonPill) {
+ EndFromProcessThread();
+ }
+ // Either we're out of data or something went wrong
+ if (!PollOnce()) {
+ return;
+ }
+ }
+ }
+
+ /// The entry point for processThread
+ static void StartPoller(SortedMergeNode* node) { node->EmitBatches(); }
+
+ arrow::Ordering ordering_;
+
+ // Each input state corresponds to an input (e.g. a parquet data file)
+ std::vector> state;
+ std::vector input_counter;
+ std::vector output_counter;
+ std::mutex gate;
+
+ std::atomic cleanup_started{false};
+
+ // Backpressure counter common to all input states
+ std::atomic backpressure_counter;
+
+ std::atomic batches_produced{0};
+
+ // Queue to trigger processing of a given input. False acts as a poison pill
+ ConcurrentQueue process_queue;
+ // Once StartProducing is called, we initialize this thread to poll the
+ // input states and emit batches
+ std::thread process_thread;
+ arrow::Future<> process_task;
+
+ // Map arg index --> completion counter
+ std::vector counter_;
+ // Map arg index --> data
+ std::vector accumulation_queue_;
+ std::mutex mutex_;
+ std::atomic total_batches_{0};
+};
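+
+// A minimal construction sketch (mirrors the unit test; table1/table2 stand for
+// pre-sorted inputs that share a schema with a "timestamp" column):
+//
+//   Declaration merge{
+//       "sorted_merge",
+//       {Declaration("table_source", TableSourceNodeOptions(table1)),
+//        Declaration("table_source", TableSourceNodeOptions(table2))},
+//       OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}))};
+//   ARROW_ASSIGN_OR_RAISE(auto merged,
+//                         DeclarationToTable(merge, /*use_threads=*/false));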
+
+} // namespace
+
+namespace internal {
+void RegisterSortedMergeNode(ExecFactoryRegistry* registry) {
+ DCHECK_OK(registry->AddFactory("sorted_merge", SortedMergeNode::Make));
+}
+} // namespace internal
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/sorted_merge_node_test.cc b/cpp/src/arrow/acero/sorted_merge_node_test.cc
new file mode 100644
index 0000000000000..55446d631d90c
--- /dev/null
+++ b/cpp/src/arrow/acero/sorted_merge_node_test.cc
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include
+
+#include "arrow/acero/exec_plan.h"
+#include "arrow/acero/map_node.h"
+#include "arrow/acero/options.h"
+#include "arrow/acero/test_nodes.h"
+#include "arrow/array/builder_base.h"
+#include "arrow/array/concatenate.h"
+#include "arrow/compute/ordering.h"
+#include "arrow/result.h"
+#include "arrow/scalar.h"
+#include "arrow/table.h"
+#include "arrow/testing/generator.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/type.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::acero {
+
+std::shared_ptr TestTable(int start, int step, int rows_per_batch,
+ int num_batches) {
+ return gen::Gen({{"timestamp", gen::Step(start, step, /*signed_int=*/true)},
+ {"str", gen::Random(utf8())}})
+ ->FailOnError()
+ ->Table(rows_per_batch, num_batches);
+}
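+
+// For reference (assuming gen::Step produces an arithmetic sequence starting at
+// `start` with increment `step`): the three tables built in the test below hold
+// timestamps {0,2,4,6,8,10}, {1,3,5,7,9,11} and {3,6,9,12,15,18}, whose sorted
+// merge is the 18-value sequence asserted at the end of the test.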
+
+TEST(SortedMergeNode, Basic) {
+ auto table1 = TestTable(
+ /*start=*/0,
+ /*step=*/2,
+ /*rows_per_batch=*/2,
+ /*num_batches=*/3);
+ auto table2 = TestTable(
+ /*start=*/1,
+ /*step=*/2,
+ /*rows_per_batch=*/3,
+ /*num_batches=*/2);
+ auto table3 = TestTable(
+ /*start=*/3,
+ /*step=*/3,
+ /*rows_per_batch=*/6,
+ /*num_batches=*/1);
+ std::vector src_decls;
+ src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table1)));
+ src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table2)));
+ src_decls.emplace_back(Declaration("table_source", TableSourceNodeOptions(table3)));
+
+ auto ops = OrderByNodeOptions(compute::Ordering({compute::SortKey("timestamp")}));
+
+ Declaration sorted_merge{"sorted_merge", src_decls, ops};
+ // We can't use threads for sorted merging since it relies on
+ // ascending deterministic order of timestamps
+ ASSERT_OK_AND_ASSIGN(auto output,
+ DeclarationToTable(sorted_merge, /*use_threads=*/false));
+ ASSERT_EQ(output->num_rows(), 18);
+
+ ASSERT_OK_AND_ASSIGN(auto expected_ts_builder,
+ MakeBuilder(int32(), default_memory_pool()));
+ for (auto i : {0, 1, 2, 3, 3, 4, 5, 6, 6, 7, 8, 9, 9, 10, 11, 12, 15, 18}) {
+ ASSERT_OK(expected_ts_builder->AppendScalar(*MakeScalar(i)));
+ }
+ ASSERT_OK_AND_ASSIGN(auto expected_ts, expected_ts_builder->Finish());
+ auto output_col = output->column(0);
+ ASSERT_OK_AND_ASSIGN(auto output_ts, Concatenate(output_col->chunks()));
+
+ AssertArraysEqual(*expected_ts, *output_ts);
+}
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/time_series_util.cc b/cpp/src/arrow/acero/time_series_util.cc
new file mode 100644
index 0000000000000..71133fef47306
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.cc
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/array/data.h"
+
+#include "arrow/acero/time_series_util.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+template ::value, bool>>
+inline uint64_t NormalizeTime(T t) {
+ uint64_t bias =
+ std::is_signed::value ? static_cast(1) << (8 * sizeof(T) - 1) : 0;
+ return t < 0 ? static_cast(t + bias) : static_cast(t);
+}
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row) {
+#define LATEST_VAL_CASE(id, val) \
+ case Type::id: { \
+ using T = typename TypeIdTraits::Type; \
+ using CType = typename TypeTraits::CType; \
+ return val(data->GetValues(1)[row]); \
+ }
+
+ auto data = batch->column_data(col);
+ switch (time_type) {
+ LATEST_VAL_CASE(INT8, NormalizeTime)
+ LATEST_VAL_CASE(INT16, NormalizeTime)
+ LATEST_VAL_CASE(INT32, NormalizeTime)
+ LATEST_VAL_CASE(INT64, NormalizeTime)
+ LATEST_VAL_CASE(UINT8, NormalizeTime)
+ LATEST_VAL_CASE(UINT16, NormalizeTime)
+ LATEST_VAL_CASE(UINT32, NormalizeTime)
+ LATEST_VAL_CASE(UINT64, NormalizeTime)
+ LATEST_VAL_CASE(DATE32, NormalizeTime)
+ LATEST_VAL_CASE(DATE64, NormalizeTime)
+ LATEST_VAL_CASE(TIME32, NormalizeTime)
+ LATEST_VAL_CASE(TIME64, NormalizeTime)
+ LATEST_VAL_CASE(TIMESTAMP, NormalizeTime)
+ default:
+ DCHECK(false);
+ return 0; // cannot happen
+ }
+
+#undef LATEST_VAL_CASE
+}
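+
+// A minimal usage sketch (illustrative only; `batch` is a placeholder): read the
+// normalized time of row 5 of column 0 of a batch whose time column is a
+// timestamp.
+//
+//   uint64_t t = GetTime(batch.get(), Type::TIMESTAMP, /*col=*/0, /*row=*/5);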
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/time_series_util.h b/cpp/src/arrow/acero/time_series_util.h
new file mode 100644
index 0000000000000..97707f43bf20b
--- /dev/null
+++ b/cpp/src/arrow/acero/time_series_util.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+
+namespace arrow::acero {
+
+// normalize the value to unsigned 64-bits while preserving ordering of values
+template ::value, bool> = true>
+uint64_t NormalizeTime(T t);
+
+uint64_t GetTime(const RecordBatch* batch, Type::type time_type, int col, uint64_t row);
+
+} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/unmaterialized_table.h b/cpp/src/arrow/acero/unmaterialized_table.h
new file mode 100644
index 0000000000000..05d6c866936e0
--- /dev/null
+++ b/cpp/src/arrow/acero/unmaterialized_table.h
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include "arrow/array/builder_base.h"
+#include "arrow/array/builder_binary.h"
+#include "arrow/array/builder_primitive.h"
+#include "arrow/memory_pool.h"
+#include "arrow/record_batch.h"
+#include "arrow/type_traits.h"
+#include "arrow/util/logging.h"
+
+namespace arrow::acero {
+
+/// Lightweight representation of a cell of an unmaterialized table.
+///
+struct CompositeEntry {
+ RecordBatch* batch;
+ uint64_t start;
+ uint64_t end;
+};
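+
+// Clarifying example: the range is half-open, so an entry {batch, 3, 7}
+// references rows 3, 4, 5 and 6 of `batch`, and its length is end - start.
+// An entry whose batch is null stands for a run of missing values of the
+// same length, materialized as nulls (cf. AddEntry(nullptr, ...) in
+// asof_join_node.cc).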
+
+// Forward declare the builder
+template
+class UnmaterializedSliceBuilder;
+
+/// A table of composite reference rows. Rows maintain pointers to the
+/// constituent record batches, but the overall table retains shared_ptr
+/// references to ensure memory remains resident while the table is live.
+///
+/// The main reason for this is that, especially for wide tables, some operations
+/// such as sorted_merge or asof_join are effectively row-oriented, rather than
+/// column-oriented. Separating the join part from the columnar materialization
+/// part simplifies the logic around data types and increases efficiency.
+///
+/// We don't put the shared_ptr's into the rows for efficiency reasons. Use
+/// UnmaterializedSliceBuilder to add ranges of record batches to this table
+template
+class UnmaterializedCompositeTable {
+ public:
+ UnmaterializedCompositeTable(
+ const std::shared_ptr& output_schema, size_t num_composite_tables,
+ std::unordered_map> output_col_to_src_,
+ arrow::MemoryPool* pool_ = arrow::default_memory_pool())
+ : schema(output_schema),
+ num_composite_tables(num_composite_tables),
+ output_col_to_src(std::move(output_col_to_src_)),
+ pool{pool_} {}
+
+ // Shallow wrappers around std::vector for performance
+ inline size_t capacity() { return slices.capacity(); }
+ inline void reserve(size_t num_slices) { slices.reserve(num_slices); }
+
+ inline size_t Size() const { return num_rows; }
+ inline size_t Empty() const { return num_rows == 0; }
+
+ Result>> Materialize() {
+ // Don't build empty batches
+ if (Empty()) {
+ return std::nullopt;
+ }
+ DCHECK_LE(Size(), (uint64_t)std::numeric_limits::max());
+ std::vector> arrays(schema->num_fields());
+
+#define MATERIALIZE_CASE(id) \
+ case arrow::Type::id: { \
+ using T = typename arrow::TypeIdTraits<arrow::Type::id>::Type; \
+ ARROW_ASSIGN_OR_RAISE(arrays.at(i_col), materializeColumn<T>(field_type, i_col)); \
+ break; \
+ }
+
+ // Build the arrays column-by-column from the rows
+ for (int i_col = 0; i_col < schema->num_fields(); ++i_col) {
+ const std::shared_ptr<arrow::Field>& field = schema->field(i_col);
+ const auto& field_type = field->type();
+
+ switch (field_type->id()) {
+ MATERIALIZE_CASE(BOOL)
+ MATERIALIZE_CASE(INT8)
+ MATERIALIZE_CASE(INT16)
+ MATERIALIZE_CASE(INT32)
+ MATERIALIZE_CASE(INT64)
+ MATERIALIZE_CASE(UINT8)
+ MATERIALIZE_CASE(UINT16)
+ MATERIALIZE_CASE(UINT32)
+ MATERIALIZE_CASE(UINT64)
+ MATERIALIZE_CASE(FLOAT)
+ MATERIALIZE_CASE(DOUBLE)
+ MATERIALIZE_CASE(DATE32)
+ MATERIALIZE_CASE(DATE64)
+ MATERIALIZE_CASE(TIME32)
+ MATERIALIZE_CASE(TIME64)
+ MATERIALIZE_CASE(TIMESTAMP)
+ MATERIALIZE_CASE(STRING)
+ MATERIALIZE_CASE(LARGE_STRING)
+ MATERIALIZE_CASE(BINARY)
+ MATERIALIZE_CASE(LARGE_BINARY)
+ default:
+ return arrow::Status::Invalid("Unsupported data type ",
+ field->type()->ToString(), " for field ",
+ field->name());
+ }
+ }
+
+#undef MATERIALIZE_CASE
+
+ std::shared_ptr<arrow::RecordBatch> r =
+ arrow::RecordBatch::Make(schema, (int64_t)num_rows, arrays);
+ return r;
+ }
+
+ private:
+ struct UnmaterializedSlice {
+ CompositeEntry components[MAX_COMPOSITE_TABLES];
+ size_t num_components;
+
+ inline int64_t Size() const {
+ if (num_components == 0) {
+ return 0;
+ }
+ return components[0].end - components[0].start;
+ }
+ };
+
+ // Mapping from an output column ID to a source table ID and column ID
+ std::shared_ptr<arrow::Schema> schema;
+ size_t num_composite_tables;
+ std::unordered_map<int, std::pair<int, int>> output_col_to_src;
+
+ arrow::MemoryPool* pool;
+
+ /// A map from the address of a record batch to the record batch itself. Used to
+ /// keep the record batch alive in case it would otherwise go out of scope on the
+ /// main exec node thread
+ std::unordered_map<uintptr_t, std::shared_ptr<arrow::RecordBatch>> ptr2Ref = {};
+ std::vector<UnmaterializedSlice> slices;
+
+ size_t num_rows = 0;
+
+ // for AddRecordBatchRef/AddSlice and access to UnmaterializedSlice
+ friend class UnmaterializedSliceBuilder<MAX_COMPOSITE_TABLES>;
+
+ void AddRecordBatchRef(const std::shared_ptr<arrow::RecordBatch>& ref) {
+ ptr2Ref[(uintptr_t)ref.get()] = ref;
+ }
+ void AddSlice(const UnmaterializedSlice& slice) {
+ slices.push_back(slice);
+ num_rows += slice.Size();
+ }
+
+ template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+ enable_if_boolean<Type, Status> static BuilderAppend(
+ Builder& builder, const std::shared_ptr<arrow::ArrayData>& source, uint64_t row) {
+ if (source->IsNull(row)) {
+ builder.UnsafeAppendNull();
+ return Status::OK();
+ }
+ builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
+ return Status::OK();
+ }
+
+ template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+ enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
+ Status> static BuilderAppend(Builder& builder,
+ const std::shared_ptr<arrow::ArrayData>& source,
+ uint64_t row) {
+ if (source->IsNull(row)) {
+ builder.UnsafeAppendNull();
+ return Status::OK();
+ }
+ using CType = typename TypeTraits<Type>::CType;
+ builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
+ return Status::OK();
+ }
+
+ template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+ enable_if_base_binary<Type, Status> static BuilderAppend(
+ Builder& builder, const std::shared_ptr<arrow::ArrayData>& source, uint64_t row) {
+ if (source->IsNull(row)) {
+ return builder.AppendNull();
+ }
+ using offset_type = typename Type::offset_type;
+ const uint8_t* data = source->buffers[2]->data();
+ const offset_type* offsets = source->GetValues<offset_type>(1);
+ const offset_type offset0 = offsets[row];
+ const offset_type offset1 = offsets[row + 1];
+ return builder.Append(data + offset0, offset1 - offset0);
+ }
+
+ template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
+ arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
+ const std::shared_ptr<arrow::DataType>& type, int i_col) {
+ ARROW_ASSIGN_OR_RAISE(auto builderPtr, arrow::MakeBuilder(type, pool));
+ Builder& builder = *arrow::internal::checked_cast<Builder*>(builderPtr.get());
+ ARROW_RETURN_NOT_OK(builder.Reserve(num_rows));
+
+ const auto& [table_index, column_index] = output_col_to_src[i_col];
+
+ for (const auto& unmaterialized_slice : slices) {
+ const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
+ if (batch) {
+ for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+ arrow::Status st = BuilderAppend<Type, Builder>(
+ builder, batch->column_data(column_index), rowNum);
+ ARROW_RETURN_NOT_OK(st);
+ }
+ } else {
+ for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
+ ARROW_RETURN_NOT_OK(builder.AppendNull());
+ }
+ }
+ }
+ std::shared_ptr<arrow::Array> result;
+ ARROW_RETURN_NOT_OK(builder.Finish(&result));
+ return Result<std::shared_ptr<arrow::Array>>{std::move(result)};
+ }
+};
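
As a reading aid, here is a minimal sketch of how the output-column mapping above is meant to be populated, assuming two source tables; output_schema and the concrete column indices are hypothetical, not taken from this patch:

// Hypothetical usage sketch for UnmaterializedCompositeTable<2>: output column i
// is materialized from source table mapping[i].first, column mapping[i].second.
std::unordered_map<int, std::pair<int, int>> mapping = {
    {0, {0, 0}},  // output col 0 <- table 0 (e.g. left input), col 0
    {1, {0, 1}},  // output col 1 <- table 0, col 1
    {2, {1, 1}},  // output col 2 <- table 1 (e.g. right input), col 1
};
UnmaterializedCompositeTable<2> table(output_schema, 2, mapping);
// Rows are appended through UnmaterializedSliceBuilder<2> (declared above and
// defined below); Materialize() then produces a single RecordBatch, or
// std::nullopt if no rows were added.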
+
+/// A builder class that can append blocks of data to a row. A "slice"
+/// is built by horizontally concatenating record batches.
+template <size_t MAX_COMPOSITE_TABLES>