Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-17] smjwscg optimization: (#18)
Browse files Browse the repository at this point in the history
* Add a new config to enable/disable time breakdown for wscg

Signed-off-by: Chendi Xue <[email protected]>

* ColumnarWSCG optimization: only GetValue when field is used

Signed-off-by: Chendi Xue <[email protected]>
  • Loading branch information
xuechendi authored Jan 8, 2021
1 parent ec28760 commit 567ada1
Show file tree
Hide file tree
Showing 15 changed files with 445 additions and 337 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public ExpressionEvaluator(List<String> listJars) throws IOException, IllegalAcc
jniWrapper = new ExpressionEvaluatorJniWrapper(tmp_dir, listJars);
jniWrapper.nativeSetJavaTmpDir(jniWrapper.tmp_dir_path);
jniWrapper.nativeSetBatchSize(ColumnarPluginConfig.getBatchSize());
jniWrapper.nativeSetMetricsTime(ColumnarPluginConfig.getEnableMetricsTime());
ColumnarPluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public ExpressionEvaluatorJniWrapper(String tmp_dir, List<String> listJars)
*/
native void nativeSetBatchSize(int batch_size);

/**
* Set native env variables NATIVESQL_METRICS_TIME
*
* @param batch_size numRows of one batch, use
* spark.sql.execution.arrow.maxRecordsPerBatch
*/
native void nativeSetMetricsTime(boolean is_enable);


/**
* Generates the projector module to evaluate the expressions with custom
* configuration.
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ColumnarPluginConfig(conf: SparkConf) {
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager")
val batchSize: Int =
conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", defaultValue = 10000)
val enableMetricsTime: Boolean =
conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false)
val tmpFile: String =
conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
val broadcastCacheTimeout: Int =
Expand Down Expand Up @@ -107,6 +109,13 @@ object ColumnarPluginConfig {
ins.batchSize
}
}
def getEnableMetricsTime: Boolean = synchronized{
if (ins == null) {
false
} else {
ins.enableMetricsTime
}
}
def getTempFile: String = synchronized {
if (ins != null && ins.tmpFile != null) {
ins.tmpFile
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/codegen/arrow_compute/ext/array_item_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@ namespace extra {
struct ArrayItemIndex {
uint16_t id = 0;
uint16_t array_id = 0;
bool valid = true;
ArrayItemIndex() : array_id(0), id(0), valid(true) {}
ArrayItemIndex(bool valid) : array_id(0), id(0), valid(valid) {}
ArrayItemIndex(uint16_t array_id, uint16_t id)
: array_id(array_id), id(id), valid(true) {}
ArrayItemIndex() : array_id(0), id(0) {}
ArrayItemIndex(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {}
};
struct ArrayItemIndexS {
uint16_t id = 0;
uint16_t array_id = 0;
ArrayItemIndexS() : array_id(0), id(0) {}
ArrayItemIndexS(uint16_t array_id, uint16_t id)
: array_id(array_id), id(id) {}
ArrayItemIndexS(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {}
};

} // namespace extra
Expand Down
78 changes: 36 additions & 42 deletions cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,20 @@ class ProjectKernel::Impl {

std::string GetSignature() { return signature_; }

arrow::Status DoCodeGen(int level, const std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
arrow::Status DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
auto codegen_ctx = std::make_shared<CodeGenContext>();
int idx = 0;
for (auto project : project_list_) {
std::shared_ptr<ExpressionCodegenVisitor> project_node_visitor;
std::vector<std::string> input_list;
std::vector<int> indices_list;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(project, input, {input_field_list_}, -1,
var_id, &input_list,
auto is_local = false;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(project, &input, {input_field_list_}, -1,
var_id, is_local, &input_list,
&project_node_visitor));
codegen_ctx->process_codes += project_node_visitor->GetPrepare();
auto name = project_node_visitor->GetResult();
Expand All @@ -83,26 +87,19 @@ class ProjectKernel::Impl {
auto output_name =
"project_" + std::to_string(level) + "_output_col_" + std::to_string(idx++);
auto output_validity = output_name + "_validity";
codegen_ctx->output_list.push_back(
std::make_pair(output_name, project->return_type()));
std::stringstream output_get_ss;
output_get_ss << "auto " << output_name << " = " << name << ";" << std::endl;
output_get_ss << "auto " << output_validity << " = " << validity << ";"
<< std::endl;

codegen_ctx->output_list.push_back(std::make_pair(
std::make_pair(output_name, output_get_ss.str()), project->return_type()));
for (auto header : project_node_visitor->GetHeaders()) {
if (std::find(codegen_ctx->header_codes.begin(), codegen_ctx->header_codes.end(),
header) == codegen_ctx->header_codes.end()) {
codegen_ctx->header_codes.push_back(header);
}
}

std::stringstream process_ss;
std::stringstream define_ss;

process_ss << output_name << " = " << name << ";" << std::endl;
process_ss << output_validity << " = " << validity << ";" << std::endl;
codegen_ctx->process_codes += process_ss.str();

define_ss << GetCTypeString(project->return_type()) << " " << output_name << ";"
<< std::endl;
define_ss << "bool " << output_validity << ";" << std::endl;
codegen_ctx->definition_codes += define_ss.str();
}
*codegen_ctx_out = codegen_ctx;
return arrow::Status::OK();
Expand Down Expand Up @@ -139,9 +136,11 @@ arrow::Status ProjectKernel::MakeResultIterator(

std::string ProjectKernel::GetSignature() { return impl_->GetSignature(); }

arrow::Status ProjectKernel::DoCodeGen(int level, std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx,
int* var_id) {
arrow::Status ProjectKernel::DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx, int* var_id) {
return impl_->DoCodeGen(level, input, codegen_ctx, var_id);
}

Expand All @@ -166,14 +165,18 @@ class FilterKernel::Impl {

std::string GetSignature() { return signature_; }

arrow::Status DoCodeGen(int level, const std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
arrow::Status DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
auto codegen_ctx = std::make_shared<CodeGenContext>();
std::shared_ptr<ExpressionCodegenVisitor> condition_node_visitor;
std::vector<std::string> input_list;
std::vector<int> indices_list;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(condition_, input, {input_field_list_}, -1,
var_id, &input_list,
auto is_local = false;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(condition_, &input, {input_field_list_},
-1, var_id, is_local, &input_list,
&condition_node_visitor));
codegen_ctx->process_codes += condition_node_visitor->GetPrepare();
for (auto header : condition_node_visitor->GetHeaders()) {
Expand All @@ -185,27 +188,16 @@ class FilterKernel::Impl {

auto condition_codes = condition_node_visitor->GetResult();
std::stringstream process_ss;
std::stringstream define_ss;
process_ss << "if (!(" << condition_codes << ")) {" << std::endl;
process_ss << "continue;" << std::endl;
process_ss << "}" << std::endl;
int idx = 0;
for (auto field : input_field_list_) {
auto output_name =
"filter_" + std::to_string(level) + "_output_col_" + std::to_string(idx);
auto output_validity = output_name + "_validity";
codegen_ctx->output_list.push_back(std::make_pair(output_name, field->type()));

define_ss << GetCTypeString(field->type()) << " " << output_name << ";"
<< std::endl;
define_ss << "bool " << output_validity << ";" << std::endl;

process_ss << output_name << " = " << input[idx] << ";" << std::endl;
process_ss << output_validity << " = " << input[idx] << "_validity"
<< ";" << std::endl;
codegen_ctx->output_list.push_back(
std::make_pair(std::make_pair(input[idx].first.first, input[idx].first.second),
field->type()));
idx++;
}
codegen_ctx->definition_codes += define_ss.str();
codegen_ctx->process_codes += process_ss.str();

*codegen_ctx_out = codegen_ctx;
Expand Down Expand Up @@ -244,9 +236,11 @@ arrow::Status FilterKernel::MakeResultIterator(

std::string FilterKernel::GetSignature() { return impl_->GetSignature(); }

arrow::Status FilterKernel::DoCodeGen(int level, std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx,
int* var_id) {
arrow::Status FilterKernel::DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx, int* var_id) {
return impl_->DoCodeGen(level, input, codegen_ctx, var_id);
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/codegen/arrow_compute/ext/codegen_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ std::string GetTempPath() {
return tmp_dir_;
}

bool GetEnableTimeMetrics() {
bool is_enable = false;
const char* env_enable_time_metrics = std::getenv("NATIVESQL_METRICS_TIME");
if (env_enable_time_metrics != nullptr) {
auto is_enable_str = std::string(env_enable_time_metrics);
if (is_enable_str.compare("true") == 0) is_enable = true;
}
return is_enable;
}

int GetBatchSize() {
int batch_size;
const char* env_batch_size = std::getenv("NATIVESQL_BATCH_SIZE");
Expand Down
1 change: 1 addition & 0 deletions cpp/src/codegen/arrow_compute/ext/codegen_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ int FileSpinLock();
void FileSpinUnLock(int fd);

int GetBatchSize();
bool GetEnableTimeMetrics();
std::string exec(const char* cmd);
std::string GetTempPath();
std::string GetArrowTypeDefString(std::shared_ptr<arrow::DataType> type);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/codegen/arrow_compute/ext/codegen_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ struct CodeGenContext {
std::string finish_codes;
std::string definition_codes;
std::vector<std::string> function_list;
std::vector<std::pair<std::string, std::shared_ptr<arrow::DataType>>> output_list;
std::vector<
std::pair<std::pair<std::string, std::string>, std::shared_ptr<arrow::DataType>>>
output_list;
};
Loading

0 comments on commit 567ada1

Please sign in to comment.