Skip to content

Commit

Permalink
Add isDriver_ member variables in VeloxBackend class
Browse files Browse the repository at this point in the history
  • Loading branch information
leoluan2009 committed Nov 7, 2024
1 parent e204dc5 commit cd23366
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class VeloxListenerApi extends ListenerApi with Logging {

SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf)
initialize(conf, true)
UdfJniWrapper.registerFunctionSignatures()
}

Expand All @@ -116,12 +116,12 @@ class VeloxListenerApi extends ListenerApi with Logging {

SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf)
initialize(conf, false)
}

override def onExecutorShutdown(): Unit = shutdown()

private def initialize(conf: SparkConf): Unit = {
private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
// Force batch type initializations.
VeloxBatch.getClass
ArrowJavaBatch.getClass
Expand Down Expand Up @@ -158,7 +158,7 @@ class VeloxListenerApi extends ListenerApi with Logging {

// Initial native backend with configurations.
val parsed = GlutenConfigUtil.parseConfig(conf.getAll.toMap)
NativeBackendInitializer.initializeBackend(parsed)
NativeBackendInitializer.initializeBackend(parsed, isDriver)

// Inject backend-specific implementations to override spark classes.
GlutenFormatFactory.register(new VeloxParquetWriterInjects, new VeloxOrcWriterInjects)
Expand Down
6 changes: 3 additions & 3 deletions cpp/velox/compute/VeloxBackend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ void VeloxBackend::initJolFilesystem() {
}

void VeloxBackend::initCache() {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false)) {
if (backendConf_->get<bool>(kVeloxCacheEnabled, false) && !isDriver_) {
FLAGS_ssd_odirect = true;

FLAGS_ssd_odirect = backendConf_->get<bool>(kVeloxSsdODirectEnabled, false);
Expand Down Expand Up @@ -308,8 +308,8 @@ void VeloxBackend::initUdf() {

std::unique_ptr<VeloxBackend> VeloxBackend::instance_ = nullptr;

void VeloxBackend::create(const std::unordered_map<std::string, std::string>& conf) {
instance_ = std::unique_ptr<VeloxBackend>(new VeloxBackend(conf));
void VeloxBackend::create(const std::unordered_map<std::string, std::string>& conf, bool isDriver) {
instance_ = std::unique_ptr<VeloxBackend>(new VeloxBackend(conf, isDriver));
}

VeloxBackend* VeloxBackend::get() {
Expand Down
6 changes: 4 additions & 2 deletions cpp/velox/compute/VeloxBackend.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class VeloxBackend {
}
}

static void create(const std::unordered_map<std::string, std::string>& conf);
static void create(const std::unordered_map<std::string, std::string>& conf, bool isDriver);

static VeloxBackend* get();

Expand All @@ -67,7 +67,7 @@ class VeloxBackend {
}

private:
explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf) {
explicit VeloxBackend(const std::unordered_map<std::string, std::string>& conf, bool isDriver) : isDriver_(isDriver) {
init(conf);
}

Expand Down Expand Up @@ -95,6 +95,8 @@ class VeloxBackend {
std::string cacheFilePrefix_;

std::shared_ptr<facebook::velox::config::ConfigBase> backendConf_;

bool isDriver_ = false;
};

} // namespace gluten
5 changes: 3 additions & 2 deletions cpp/velox/jni/VeloxJniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ void JNI_OnUnload(JavaVM* vm, void*) {
JNIEXPORT void JNICALL Java_org_apache_gluten_init_NativeBackendInitializer_initialize( // NOLINT
JNIEnv* env,
jclass,
jbyteArray conf) {
jbyteArray conf,
jboolean isDriver) {
JNI_METHOD_START
auto safeArray = getByteArrayElementsSafe(env, conf);
auto sparkConf = parseConfMap(env, safeArray.elems(), safeArray.length());
VeloxBackend::create(sparkConf);
VeloxBackend::create(sparkConf, isDriver);
JNI_METHOD_END()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,31 @@ public final class NativeBackendInitializer {
// In local mode, NativeBackendInitializer#initializeBackend will be invoked twice in same
// thread, driver first then executor, initialized flag ensure only invoke initializeBackend once,
// so there are no race condition here.
public static void initializeBackend(scala.collection.Map<String, String> conf) {
public static void initializeBackend(scala.collection.Map<String, String> conf, boolean isDriver) {
if (!initialized.compareAndSet(false, true)) {
// Already called.
return;
}
initialize0(conf);
initialize0(conf, isDriver);
SparkShutdownManagerUtil.addHook(
() -> {
shutdown();
return BoxedUnit.UNIT;
});
}

private static void initialize0(scala.collection.Map<String, String> conf) {
private static void initialize0(scala.collection.Map<String, String> conf, boolean isDriver) {
try {
Map<String, String> nativeConfMap =
GlutenConfig.getNativeBackendConf(Backend.get().name(), conf);
initialize(ConfigUtil.serialize(nativeConfMap));
initialize(ConfigUtil.serialize(nativeConfMap), isDriver);
} catch (Exception e) {
LOG.error("Failed to call native backend's initialize method", e);
throw e;
}
}

private static native void initialize(byte[] configPlan);
private static native void initialize(byte[] configPlan, boolean isDriver);

private static native void shutdown();

Expand Down

0 comments on commit cd23366

Please sign in to comment.