From 6058031f4db83a1c6fc148b01aa7293b66bf1f83 Mon Sep 17 00:00:00 2001 From: Dane Pitkin Date: Fri, 28 Jul 2023 16:37:29 -0400 Subject: [PATCH] GH-36069: [Java] Ensure S3 is finalized on shutdown --- java/dataset/src/main/cpp/jni_wrapper.cc | 13 +++++++++++++ .../dataset/file/FileSystemDatasetFactory.java | 12 ++++++++++++ .../org/apache/arrow/dataset/file/JniWrapper.java | 6 ++++++ 3 files changed, 31 insertions(+) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 871a2e95b94ec..1cfea1e2aed4a 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -27,6 +27,7 @@ #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/s3fs.h" #include "arrow/engine/substrait/util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" @@ -678,6 +679,18 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JNI_METHOD_END() } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: ensureS3Finalized + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_ensureS3Finalized( + JNIEnv* env, jobject) { + JNI_METHOD_START + JniAssertOkOrThrow(arrow::fs::EnsureS3Finalized()); + JNI_METHOD_END() +} + /* * Class: org_apache_arrow_dataset_substrait_JniWrapper * Method: executeSerializedPlan diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index aa315690592ee..cd9bb751252ec 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -17,6 +17,8 @@ package org.apache.arrow.dataset.file; +import java.util.concurrent.atomic.AtomicBoolean; + import 
org.apache.arrow.dataset.jni.NativeDatasetFactory; import org.apache.arrow.dataset.jni.NativeMemoryPool; import org.apache.arrow.memory.BufferAllocator; @@ -26,14 +28,18 @@ */ public class FileSystemDatasetFactory extends NativeDatasetFactory { + private static final AtomicBoolean addedS3ShutdownHook = new AtomicBoolean(false); + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) { super(allocator, memoryPool, createNative(format, uri)); + ensureS3FinalizedOnShutdown(); } public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) { super(allocator, memoryPool, createNative(format, uris)); + ensureS3FinalizedOnShutdown(); } private static long createNative(FileFormat format, String uri) { @@ -44,4 +50,10 @@ private static long createNative(FileFormat format, String[] uris) { return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); } + private static void ensureS3FinalizedOnShutdown() { + if (addedS3ShutdownHook.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { JniWrapper.get().ensureS3Finalized(); })); + } + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index c3a1a4e58a140..c59a9d3fe8210 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -75,4 +75,10 @@ public native void writeFromScannerToFile(long streamAddress, int maxPartitions, String baseNameTemplate); + /** + * Ensure the S3 APIs are shutdown, but only if not already done. If the S3 APIs are uninitialized, + * then this is a noop. + */ + public native void ensureS3Finalized(); + }