From cd2336697d267356b1b33920fd203f91fb92f0fe Mon Sep 17 00:00:00 2001 From: xuedongluan Date: Thu, 7 Nov 2024 21:47:09 +0800 Subject: [PATCH] Add isDriver_ member variables in VeloxBackend class --- .../gluten/backendsapi/velox/VeloxListenerApi.scala | 8 ++++---- cpp/velox/compute/VeloxBackend.cc | 6 +++--- cpp/velox/compute/VeloxBackend.h | 6 ++++-- cpp/velox/jni/VeloxJniWrapper.cc | 5 +++-- .../apache/gluten/init/NativeBackendInitializer.java | 10 +++++----- 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 850509db3e91..b0bee270e361 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -89,7 +89,7 @@ class VeloxListenerApi extends ListenerApi with Logging { SparkDirectoryUtil.init(conf) UDFResolver.resolveUdfConf(conf, isDriver = true) - initialize(conf) + initialize(conf, true) UdfJniWrapper.registerFunctionSignatures() } @@ -116,12 +116,12 @@ class VeloxListenerApi extends ListenerApi with Logging { SparkDirectoryUtil.init(conf) UDFResolver.resolveUdfConf(conf, isDriver = false) - initialize(conf) + initialize(conf, false) } override def onExecutorShutdown(): Unit = shutdown() - private def initialize(conf: SparkConf): Unit = { + private def initialize(conf: SparkConf, isDriver: Boolean): Unit = { // Force batch type initializations. VeloxBatch.getClass ArrowJavaBatch.getClass @@ -158,7 +158,7 @@ class VeloxListenerApi extends ListenerApi with Logging { // Initial native backend with configurations. val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap) - NativeBackendInitializer.initializeBackend(parsed) + NativeBackendInitializer.initializeBackend(parsed, isDriver) // Inject backend-specific implementations to override spark classes. GlutenFormatFactory.register(new VeloxParquetWriterInjects, new VeloxOrcWriterInjects) diff --git a/cpp/velox/compute/VeloxBackend.cc b/cpp/velox/compute/VeloxBackend.cc index d39b0902c250..5a88121130d3 100644 --- a/cpp/velox/compute/VeloxBackend.cc +++ b/cpp/velox/compute/VeloxBackend.cc @@ -195,7 +195,7 @@ void VeloxBackend::initJolFilesystem() { } void VeloxBackend::initCache() { - if (backendConf_->get(kVeloxCacheEnabled, false)) { + if (backendConf_->get(kVeloxCacheEnabled, false) && !isDriver_) { FLAGS_ssd_odirect = true; FLAGS_ssd_odirect = backendConf_->get(kVeloxSsdODirectEnabled, false); @@ -308,8 +308,8 @@ void VeloxBackend::initUdf() { std::unique_ptr VeloxBackend::instance_ = nullptr; -void VeloxBackend::create(const std::unordered_map& conf) { - instance_ = std::unique_ptr(new VeloxBackend(conf)); +void VeloxBackend::create(const std::unordered_map& conf, bool isDriver) { + instance_ = std::unique_ptr(new VeloxBackend(conf, isDriver)); } VeloxBackend* VeloxBackend::get() { diff --git a/cpp/velox/compute/VeloxBackend.h b/cpp/velox/compute/VeloxBackend.h index e00a8868d15b..285f6fbced26 100644 --- a/cpp/velox/compute/VeloxBackend.h +++ b/cpp/velox/compute/VeloxBackend.h @@ -49,7 +49,7 @@ class VeloxBackend { } } - static void create(const std::unordered_map& conf); + static void create(const std::unordered_map& conf, bool isDriver); static VeloxBackend* get(); @@ -67,7 +67,7 @@ class VeloxBackend { } private: - explicit VeloxBackend(const std::unordered_map& conf) { + explicit VeloxBackend(const std::unordered_map& conf, bool isDriver) : isDriver_(isDriver) { init(conf); } @@ -95,6 +95,8 @@ class VeloxBackend { std::string cacheFilePrefix_; std::shared_ptr backendConf_; + + bool isDriver_ = false; }; } // namespace gluten diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc index b8d2b0c3c2ff..f886a7e8b9f0 100644 --- a/cpp/velox/jni/VeloxJniWrapper.cc +++ b/cpp/velox/jni/VeloxJniWrapper.cc @@ -84,11 +84,12 @@ void JNI_OnUnload(JavaVM* vm, void*) { JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_initialize( // NOLINT JNIEnv* env, jclass, - jbyteArray conf) { + jbyteArray conf, + jboolean isDriver) { JNI_METHOD_START auto safeArray = getByteArrayElementsSafe(env, conf); auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length()); - VeloxBackend::create(sparkConf); + VeloxBackend::create(sparkConf, isDriver); JNI_METHOD_END() } diff --git a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java index f85187fae695..b77c525daf9b 100644 --- a/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java +++ b/gluten-arrow/src/main/java/org/apache/gluten/init/NativeBackendInitializer.java @@ -41,12 +41,12 @@ public final class NativeBackendInitializer { // In local mode, NativeBackendInitializer#initializeBackend will be invoked twice in same // thread, driver first then executor, initialized flag ensure only invoke initializeBackend once, // so there are no race condition here. - public static void initializeBackend(scala.collection.Map conf) { + public static void initializeBackend(scala.collection.Map conf, boolean isDriver) { if (!initialized.compareAndSet(false, true)) { // Already called. return; } - initialize0(conf); + initialize0(conf, isDriver); SparkShutdownManagerUtil.addHook( () -> { shutdown(); @@ -54,18 +54,18 @@ public static void initializeBackend(scala.collection.Map conf) }); } - private static void initialize0(scala.collection.Map conf) { + private static void initialize0(scala.collection.Map conf, boolean isDriver) { try { Map nativeConfMap = GlutenConfig.getNativeBackendConf(Backend.get().name(), conf); - initialize(ConfigUtil.serialize(nativeConfMap)); + initialize(ConfigUtil.serialize(nativeConfMap), isDriver); } catch (Exception e) { LOG.error("Failed to call native backend's initialize method", e); throw e; } } - private static native void initialize(byte[] configPlan); + private static native void initialize(byte[] configPlan, boolean isDriver); private static native void shutdown();