From be24bcdebf410eb44b29cb89d8ae15d656d89552 Mon Sep 17 00:00:00 2001 From: Chendi Xue Date: Mon, 4 Jan 2021 17:32:59 +0800 Subject: [PATCH] Add a new config to enable/disable time breakdown for wscg Signed-off-by: Chendi Xue --- .../oap/vectorized/ExpressionEvaluator.java | 1 + .../ExpressionEvaluatorJniWrapper.java | 9 ++++++++ .../com/intel/oap/ColumnarPluginConfig.scala | 9 ++++++++ .../arrow_compute/ext/codegen_common.cc | 10 +++++++++ .../arrow_compute/ext/codegen_common.h | 1 + .../ext/whole_stage_codegen_kernel.cc | 22 ++++++++++++------- cpp/src/jni/jni_wrapper.cc | 6 +++++ 7 files changed, 50 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java index 8b86c92ea..0dc9fde23 100644 --- a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java +++ b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluator.java @@ -55,6 +55,7 @@ public ExpressionEvaluator(List listJars) throws IOException, IllegalAcc jniWrapper = new ExpressionEvaluatorJniWrapper(tmp_dir, listJars); jniWrapper.nativeSetJavaTmpDir(jniWrapper.tmp_dir_path); jniWrapper.nativeSetBatchSize(ColumnarPluginConfig.getBatchSize()); + jniWrapper.nativeSetMetricsTime(ColumnarPluginConfig.getEnableMetricsTime()); ColumnarPluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path); } diff --git a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java index 763883972..bd79e1425 100644 --- a/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java +++ b/core/src/main/java/com/intel/oap/vectorized/ExpressionEvaluatorJniWrapper.java @@ -54,6 +54,15 @@ public ExpressionEvaluatorJniWrapper(String tmp_dir, List listJars) */ native void nativeSetBatchSize(int batch_size); + /** + * Set the native environment variable NATIVESQL_METRICS_TIME + * + * @param 
is_enable true to emit timing code in whole stage codegen, taken from + * spark.oap.sql.columnar.wholestagecodegen.breakdownTime + */ + native void nativeSetMetricsTime(boolean is_enable); + + /** * Generates the projector module to evaluate the expressions with custom * configuration. diff --git a/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala b/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala index 40d8a61b7..aa22d7468 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala @@ -50,6 +50,8 @@ class ColumnarPluginConfig(conf: SparkConf) { .equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager") val batchSize: Int = conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", defaultValue = 10000) + val enableMetricsTime: Boolean = + conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false) val tmpFile: String = conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null) val broadcastCacheTimeout: Int = @@ -107,6 +109,13 @@ object ColumnarPluginConfig { ins.batchSize } } + def getEnableMetricsTime: Boolean = synchronized{ + if (ins == null) { + false + } else { + ins.enableMetricsTime + } + } def getTempFile: String = synchronized { if (ins != null && ins.tmpFile != null) { ins.tmpFile diff --git a/cpp/src/codegen/arrow_compute/ext/codegen_common.cc b/cpp/src/codegen/arrow_compute/ext/codegen_common.cc index 7711379d0..eb8478b18 100644 --- a/cpp/src/codegen/arrow_compute/ext/codegen_common.cc +++ b/cpp/src/codegen/arrow_compute/ext/codegen_common.cc @@ -449,6 +449,16 @@ std::string GetTempPath() { return tmp_dir_; } +bool GetEnableTimeMetrics() { + bool is_enable = false; + const char* env_enable_time_metrics = std::getenv("NATIVESQL_METRICS_TIME"); + if (env_enable_time_metrics != nullptr) { + auto is_enable_str = std::string(env_enable_time_metrics); + if (is_enable_str.compare("true") == 0) is_enable = true; + } + return is_enable; +} + int 
GetBatchSize() { int batch_size; const char* env_batch_size = std::getenv("NATIVESQL_BATCH_SIZE"); diff --git a/cpp/src/codegen/arrow_compute/ext/codegen_common.h b/cpp/src/codegen/arrow_compute/ext/codegen_common.h index cb8f028d7..c8ec15676 100644 --- a/cpp/src/codegen/arrow_compute/ext/codegen_common.h +++ b/cpp/src/codegen/arrow_compute/ext/codegen_common.h @@ -38,6 +38,7 @@ int FileSpinLock(); void FileSpinUnLock(int fd); int GetBatchSize(); +bool GetEnableTimeMetrics(); std::string exec(const char* cmd); std::string GetTempPath(); std::string GetArrowTypeDefString(std::shared_ptr type); diff --git a/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc b/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc index 14fab7743..e8d6d87e8 100644 --- a/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/whole_stage_codegen_kernel.cc @@ -53,6 +53,7 @@ class WholeStageCodeGenKernel::Impl { const std::vector>& output_field_list) : ctx_(ctx) { int hash_relation_idx = 0; + enable_time_metrics_ = GetEnableTimeMetrics(); THROW_NOT_OK(ParseNodeTree(root_node, &hash_relation_idx, &kernel_list_)); THROW_NOT_OK(LoadJITFunction(input_field_list, output_field_list, kernel_list_, &wscg_kernel_)); @@ -73,6 +74,7 @@ class WholeStageCodeGenKernel::Impl { std::shared_ptr wscg_kernel_; std::string signature_; bool is_smj_ = false; + bool enable_time_metrics_; arrow::Status GetArguments(std::shared_ptr node, int i, gandiva::NodeVector* node_list) { @@ -399,10 +401,12 @@ class TypedWholeStageCodeGenImpl : public CodeGenBase { for (auto codegen_ctx : codegen_ctx_list) { auto tmp_idx = codegen_ctx_idx; codegen_ctx_idx++; - codes_ss << "struct timespec start_" << tmp_idx << ", end_" << tmp_idx << ";" - << std::endl; - codes_ss << "clock_gettime(CLOCK_MONOTONIC_COARSE, &start_" << tmp_idx << ");" - << std::endl; + if (enable_time_metrics_) { + codes_ss << "struct timespec start_" << tmp_idx << ", end_" << tmp_idx << 
";" + << std::endl; + codes_ss << "clock_gettime(CLOCK_MONOTONIC_COARSE, &start_" << tmp_idx << ");" + << std::endl; + } codes_ss << codegen_ctx->prepare_codes << std::endl; if (codegen_ctx_idx < codegen_ctx_list.size()) { codes_ss << codegen_ctx_list[codegen_ctx_idx]->unsafe_row_prepare_codes @@ -417,10 +421,12 @@ class TypedWholeStageCodeGenImpl : public CodeGenBase { for (int ctx_idx = codegen_ctx_list.size() - 1; ctx_idx >= 0; ctx_idx--) { auto codegen_ctx = codegen_ctx_list[ctx_idx]; codes_ss << codegen_ctx->finish_codes << std::endl; - codes_ss << "clock_gettime(CLOCK_MONOTONIC_COARSE, &end_" << ctx_idx << ");" - << std::endl; - codes_ss << "process_time_" << ctx_idx << " += TIME_NANO_DIFF(end_" << ctx_idx - << ", start_" << ctx_idx << ");" << std::endl; + if (enable_time_metrics_) { + codes_ss << "clock_gettime(CLOCK_MONOTONIC_COARSE, &end_" << ctx_idx << ");" + << std::endl; + codes_ss << "process_time_" << ctx_idx << " += TIME_NANO_DIFF(end_" << ctx_idx + << ", start_" << ctx_idx << ");" << std::endl; + } } codes_ss << "} // end of for loop" << std::endl; codes_ss << GetProcessFinishCodes(output_field_list) << std::endl; diff --git a/cpp/src/jni/jni_wrapper.cc b/cpp/src/jni/jni_wrapper.cc index 739c4385e..f27414c4f 100644 --- a/cpp/src/jni/jni_wrapper.cc +++ b/cpp/src/jni/jni_wrapper.cc @@ -263,6 +263,12 @@ Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeSetBatchSize( setenv("NATIVESQL_BATCH_SIZE", std::to_string(batch_size).c_str(), 1); } +JNIEXPORT void JNICALL +Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeSetMetricsTime( + JNIEnv* env, jobject obj, jboolean is_enable) { + setenv("NATIVESQL_METRICS_TIME", (is_enable ? "true" : "false"), 1); +} + JNIEXPORT jlong JNICALL Java_com_intel_oap_vectorized_ExpressionEvaluatorJniWrapper_nativeBuild( JNIEnv* env, jobject obj, jlong memory_pool_id, jbyteArray schema_arr,