Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-17] smjwscg optimization: #18

Merged
merged 2 commits into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public ExpressionEvaluator(List<String> listJars) throws IOException, IllegalAcc
jniWrapper = new ExpressionEvaluatorJniWrapper(tmp_dir, listJars);
jniWrapper.nativeSetJavaTmpDir(jniWrapper.tmp_dir_path);
jniWrapper.nativeSetBatchSize(ColumnarPluginConfig.getBatchSize());
jniWrapper.nativeSetMetricsTime(ColumnarPluginConfig.getEnableMetricsTime());
ColumnarPluginConfig.setRandomTempDir(jniWrapper.tmp_dir_path);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,15 @@ public ExpressionEvaluatorJniWrapper(String tmp_dir, List<String> listJars)
*/
native void nativeSetBatchSize(int batch_size);

/**
* Set native env variables NATIVESQL_METRICS_TIME
*
* @param batch_size numRows of one batch, use
* spark.sql.execution.arrow.maxRecordsPerBatch
*/
native void nativeSetMetricsTime(boolean is_enable);


/**
* Generates the projector module to evaluate the expressions with custom
* configuration.
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/com/intel/oap/ColumnarPluginConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class ColumnarPluginConfig(conf: SparkConf) {
.equals("org.apache.spark.shuffle.sort.ColumnarShuffleManager")
val batchSize: Int =
conf.getInt("spark.sql.execution.arrow.maxRecordsPerBatch", defaultValue = 10000)
val enableMetricsTime: Boolean =
conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false)
val tmpFile: String =
conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
val broadcastCacheTimeout: Int =
Expand Down Expand Up @@ -107,6 +109,13 @@ object ColumnarPluginConfig {
ins.batchSize
}
}
def getEnableMetricsTime: Boolean = synchronized{
if (ins == null) {
false
} else {
ins.enableMetricsTime
}
}
def getTempFile: String = synchronized {
if (ins != null && ins.tmpFile != null) {
ins.tmpFile
Expand Down
10 changes: 3 additions & 7 deletions cpp/src/codegen/arrow_compute/ext/array_item_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,18 +25,14 @@ namespace extra {
struct ArrayItemIndex {
uint16_t id = 0;
uint16_t array_id = 0;
bool valid = true;
ArrayItemIndex() : array_id(0), id(0), valid(true) {}
ArrayItemIndex(bool valid) : array_id(0), id(0), valid(valid) {}
ArrayItemIndex(uint16_t array_id, uint16_t id)
: array_id(array_id), id(id), valid(true) {}
ArrayItemIndex() : array_id(0), id(0) {}
ArrayItemIndex(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {}
};
struct ArrayItemIndexS {
uint16_t id = 0;
uint16_t array_id = 0;
ArrayItemIndexS() : array_id(0), id(0) {}
ArrayItemIndexS(uint16_t array_id, uint16_t id)
: array_id(array_id), id(id) {}
ArrayItemIndexS(uint16_t array_id, uint16_t id) : array_id(array_id), id(id) {}
};

} // namespace extra
Expand Down
78 changes: 36 additions & 42 deletions cpp/src/codegen/arrow_compute/ext/basic_physical_kernels.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,20 @@ class ProjectKernel::Impl {

std::string GetSignature() { return signature_; }

arrow::Status DoCodeGen(int level, const std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
arrow::Status DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
auto codegen_ctx = std::make_shared<CodeGenContext>();
int idx = 0;
for (auto project : project_list_) {
std::shared_ptr<ExpressionCodegenVisitor> project_node_visitor;
std::vector<std::string> input_list;
std::vector<int> indices_list;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(project, input, {input_field_list_}, -1,
var_id, &input_list,
auto is_local = false;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(project, &input, {input_field_list_}, -1,
var_id, is_local, &input_list,
&project_node_visitor));
codegen_ctx->process_codes += project_node_visitor->GetPrepare();
auto name = project_node_visitor->GetResult();
Expand All @@ -83,26 +87,19 @@ class ProjectKernel::Impl {
auto output_name =
"project_" + std::to_string(level) + "_output_col_" + std::to_string(idx++);
auto output_validity = output_name + "_validity";
codegen_ctx->output_list.push_back(
std::make_pair(output_name, project->return_type()));
std::stringstream output_get_ss;
output_get_ss << "auto " << output_name << " = " << name << ";" << std::endl;
output_get_ss << "auto " << output_validity << " = " << validity << ";"
<< std::endl;

codegen_ctx->output_list.push_back(std::make_pair(
std::make_pair(output_name, output_get_ss.str()), project->return_type()));
for (auto header : project_node_visitor->GetHeaders()) {
if (std::find(codegen_ctx->header_codes.begin(), codegen_ctx->header_codes.end(),
header) == codegen_ctx->header_codes.end()) {
codegen_ctx->header_codes.push_back(header);
}
}

std::stringstream process_ss;
std::stringstream define_ss;

process_ss << output_name << " = " << name << ";" << std::endl;
process_ss << output_validity << " = " << validity << ";" << std::endl;
codegen_ctx->process_codes += process_ss.str();

define_ss << GetCTypeString(project->return_type()) << " " << output_name << ";"
<< std::endl;
define_ss << "bool " << output_validity << ";" << std::endl;
codegen_ctx->definition_codes += define_ss.str();
}
*codegen_ctx_out = codegen_ctx;
return arrow::Status::OK();
Expand Down Expand Up @@ -139,9 +136,11 @@ arrow::Status ProjectKernel::MakeResultIterator(

std::string ProjectKernel::GetSignature() { return impl_->GetSignature(); }

arrow::Status ProjectKernel::DoCodeGen(int level, std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx,
int* var_id) {
arrow::Status ProjectKernel::DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx, int* var_id) {
return impl_->DoCodeGen(level, input, codegen_ctx, var_id);
}

Expand All @@ -166,14 +165,18 @@ class FilterKernel::Impl {

std::string GetSignature() { return signature_; }

arrow::Status DoCodeGen(int level, const std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
arrow::Status DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx_out, int* var_id) {
auto codegen_ctx = std::make_shared<CodeGenContext>();
std::shared_ptr<ExpressionCodegenVisitor> condition_node_visitor;
std::vector<std::string> input_list;
std::vector<int> indices_list;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(condition_, input, {input_field_list_}, -1,
var_id, &input_list,
auto is_local = false;
RETURN_NOT_OK(MakeExpressionCodegenVisitor(condition_, &input, {input_field_list_},
-1, var_id, is_local, &input_list,
&condition_node_visitor));
codegen_ctx->process_codes += condition_node_visitor->GetPrepare();
for (auto header : condition_node_visitor->GetHeaders()) {
Expand All @@ -185,27 +188,16 @@ class FilterKernel::Impl {

auto condition_codes = condition_node_visitor->GetResult();
std::stringstream process_ss;
std::stringstream define_ss;
process_ss << "if (!(" << condition_codes << ")) {" << std::endl;
process_ss << "continue;" << std::endl;
process_ss << "}" << std::endl;
int idx = 0;
for (auto field : input_field_list_) {
auto output_name =
"filter_" + std::to_string(level) + "_output_col_" + std::to_string(idx);
auto output_validity = output_name + "_validity";
codegen_ctx->output_list.push_back(std::make_pair(output_name, field->type()));

define_ss << GetCTypeString(field->type()) << " " << output_name << ";"
<< std::endl;
define_ss << "bool " << output_validity << ";" << std::endl;

process_ss << output_name << " = " << input[idx] << ";" << std::endl;
process_ss << output_validity << " = " << input[idx] << "_validity"
<< ";" << std::endl;
codegen_ctx->output_list.push_back(
std::make_pair(std::make_pair(input[idx].first.first, input[idx].first.second),
field->type()));
idx++;
}
codegen_ctx->definition_codes += define_ss.str();
codegen_ctx->process_codes += process_ss.str();

*codegen_ctx_out = codegen_ctx;
Expand Down Expand Up @@ -244,9 +236,11 @@ arrow::Status FilterKernel::MakeResultIterator(

std::string FilterKernel::GetSignature() { return impl_->GetSignature(); }

arrow::Status FilterKernel::DoCodeGen(int level, std::vector<std::string> input,
std::shared_ptr<CodeGenContext>* codegen_ctx,
int* var_id) {
arrow::Status FilterKernel::DoCodeGen(
int level,
std::vector<std::pair<std::pair<std::string, std::string>, gandiva::DataTypePtr>>
input,
std::shared_ptr<CodeGenContext>* codegen_ctx, int* var_id) {
return impl_->DoCodeGen(level, input, codegen_ctx, var_id);
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/codegen/arrow_compute/ext/codegen_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,16 @@ std::string GetTempPath() {
return tmp_dir_;
}

bool GetEnableTimeMetrics() {
bool is_enable = false;
const char* env_enable_time_metrics = std::getenv("NATIVESQL_METRICS_TIME");
if (env_enable_time_metrics != nullptr) {
auto is_enable_str = std::string(env_enable_time_metrics);
if (is_enable_str.compare("true") == 0) is_enable = true;
}
return is_enable;
}

int GetBatchSize() {
int batch_size;
const char* env_batch_size = std::getenv("NATIVESQL_BATCH_SIZE");
Expand Down
1 change: 1 addition & 0 deletions cpp/src/codegen/arrow_compute/ext/codegen_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ int FileSpinLock();
void FileSpinUnLock(int fd);

int GetBatchSize();
bool GetEnableTimeMetrics();
std::string exec(const char* cmd);
std::string GetTempPath();
std::string GetArrowTypeDefString(std::shared_ptr<arrow::DataType> type);
Expand Down
4 changes: 3 additions & 1 deletion cpp/src/codegen/arrow_compute/ext/codegen_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@ struct CodeGenContext {
std::string finish_codes;
std::string definition_codes;
std::vector<std::string> function_list;
std::vector<std::pair<std::string, std::shared_ptr<arrow::DataType>>> output_list;
std::vector<
std::pair<std::pair<std::string, std::string>, std::shared_ptr<arrow::DataType>>>
output_list;
};
Loading