diff --git a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala index 86a1eece0..e664ab919 100644 --- a/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala +++ b/core/src/main/scala/com/intel/oap/ColumnarPlugin.scala @@ -220,19 +220,25 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] { case plan: WindowExec => if (columnarConf.enableColumnarWindow) { - val child = plan.child match { + val sortRemoved = plan.child match { case sort: SortExec => // remove ordering requirements replaceWithColumnarPlan(sort.child) case _ => replaceWithColumnarPlan(plan.child) } + // disable CoalesceBatchesExec to reduce Netty direct memory usage + val coalesceBatchRemoved = sortRemoved match { + case s: CoalesceBatchesExec => + s.child + case _ => sortRemoved + } logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.") try { return new ColumnarWindowExec( plan.windowExpression, plan.partitionSpec, plan.orderSpec, - child) + coalesceBatchRemoved) } catch { case _: Throwable => logInfo("Columnar Window: Falling back to regular Window...") diff --git a/core/src/test/scala/com/intel/oap/tpc/TableGen.scala b/core/src/test/scala/com/intel/oap/tpc/TableGen.scala new file mode 100644 index 000000000..38e8e154c --- /dev/null +++ b/core/src/test/scala/com/intel/oap/tpc/TableGen.scala @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.intel.oap.tpc + +trait TableGen { + def gen(): Unit + def createTables(): Unit +} diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala index cca9be0df..feedd53d6 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala @@ -57,7 +57,9 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { override def beforeAll(): Unit = { super.beforeAll() LogManager.getRootLogger.setLevel(Level.WARN) - new TPCDSTableGen(spark, 0.1D, TPCDS_WRITE_PATH).gen() + val tGen = new TPCDSTableGen(spark, 0.01D, TPCDS_WRITE_PATH) + tGen.gen() + tGen.createTables() runner = new TPCRunner(spark, TPCDS_QUERIES_RESOURCE) } @@ -82,6 +84,10 @@ class TPCDSSuite extends QueryTest with SharedSparkSession { runner.runTPCQuery("q89", 1, true) runner.runTPCQuery("q98", 1, true) } + + test("window query") { + runner.runTPCQuery("q67", 1, true) + } } object TPCDSSuite { diff --git a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala index 695ee43be..e2bdaa264 100644 --- a/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala +++ b/core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala @@ -19,6 +19,7 @@ package com.intel.oap.tpc.ds import java.io.{File, IOException} +import com.intel.oap.tpc.TableGen import com.intel.oap.tpc.ds.TPCDSTableGen._ import io.trino.tpcds.Results.constructResults import io.trino.tpcds._ @@ -29,7 +30,8 @@ import org.apache.spark.sql.{Column, Row, SparkSession} import scala.collection.JavaConverters._ import scala.collection.mutable.ListBuffer -class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extends Serializable { +class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) + extends Serializable with TableGen { def writeParquetTable(name: String, rows: List[Row]): Unit = { if (name.equals("dbgen_version")) { @@ -91,9 +93,9 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extend .save(path + File.separator + tableName) } - def gen(): Unit = { + override def gen(): Unit = { val options = new Options() - options.scale = 0.01D + options.scale = scale val session = options.toSession val tableGenerator = new Gen(session) Table.getBaseTables.forEach { t => @@ -103,7 +105,9 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extend writeParquetTable(t.getChild.getName, c) } } + } + override def createTables(): Unit = { val files = new File(path).listFiles() files.foreach(file => { println("Creating catalog table: " + file.getName) diff --git a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala index 38619329d..7776a2b34 100644 --- a/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala +++ b/core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala @@ -72,7 +72,9 @@ class TPCHSuite extends QueryTest with SharedSparkSession { override def beforeAll(): Unit = { super.beforeAll() LogManager.getRootLogger.setLevel(Level.WARN) - new TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH).gen() + val tGen = new TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH) + tGen.gen() + tGen.createTables() runner = new TPCRunner(spark, TPCH_QUERIES_RESOURCE) } diff --git a/core/src/test/scala/com/intel/oap/tpc/h/TPCHTableGen.scala b/core/src/test/scala/com/intel/oap/tpc/h/TPCHTableGen.scala index ac1f7da2d..517b0fcf5 100644 --- a/core/src/test/scala/com/intel/oap/tpc/h/TPCHTableGen.scala +++ b/core/src/test/scala/com/intel/oap/tpc/h/TPCHTableGen.scala @@ -20,12 +20,14 @@ package com.intel.oap.tpc.h import java.io.File import java.sql.Date +import com.intel.oap.tpc.TableGen import io.trino.tpch._ import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SaveMode, SparkSession} -class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends Serializable { +class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) + extends Serializable with TableGen { // lineitem private def lineItemGenerator = { () => @@ -297,7 +299,7 @@ class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends .parquet(path + File.separator + tableName) } - def gen(): Unit = { + override def gen(): Unit = { generate(path, "lineitem", lineItemSchema, lineItemGenerator, lineItemParser) generate(path, "customer", customerSchema, customerGenerator, customerParser) generate(path, "orders", orderSchema, orderGenerator, orderParser) @@ -306,6 +308,9 @@ class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends generate(path, "nation", nationSchema, nationGenerator, nationParser) generate(path, "part", partSchema, partGenerator, partParser) generate(path, "region", regionSchema, regionGenerator, regionParser) + } + + override def createTables(): Unit = { val files = new File(path).listFiles() files.foreach(file => { println("Creating catalog table: " + file.getName) diff --git a/cpp/src/codegen/arrow_compute/expr_visitor_impl.h b/cpp/src/codegen/arrow_compute/expr_visitor_impl.h index fd2d30a36..428be06a2 100644 --- a/cpp/src/codegen/arrow_compute/expr_visitor_impl.h +++ b/cpp/src/codegen/arrow_compute/expr_visitor_impl.h @@ -211,11 +211,23 @@ class WindowVisitorImpl : public ExprVisitorImpl { auto col = p_->in_record_batch_->column(col_id); in1.push_back(col); } +#ifdef DEBUG + std::cout << "[window kernel] Calling concat_kernel_->Evaluate(in1, &out1) on batch... " << std::endl; +#endif RETURN_NOT_OK(concat_kernel_->Evaluate(in1, &out1)); +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif } std::shared_ptr in2 = out1; +#ifdef DEBUG + std::cout << "[window kernel] Calling partition_kernel_->Evaluate(in2, &out2) on batch... " << std::endl; +#endif RETURN_NOT_OK(partition_kernel_->Evaluate(in2, &out2)); +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif } for (int func_id = 0; func_id < window_function_names_.size(); func_id++) { @@ -230,7 +242,13 @@ class WindowVisitorImpl : public ExprVisitorImpl { in3.push_back(col); } in3.push_back(out2); +#ifdef DEBUG + std::cout << "[window kernel] Calling function_kernels_.at(func_id)->Evaluate(in3) on batch... " << std::endl; +#endif RETURN_NOT_OK(function_kernels_.at(func_id)->Evaluate(in3)); +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif } return arrow::Status::OK(); } diff --git a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h index 3f5c6b9d1..7c9485f74 100644 --- a/cpp/src/codegen/arrow_compute/ext/kernels_ext.h +++ b/cpp/src/codegen/arrow_compute/ext/kernels_ext.h @@ -290,7 +290,7 @@ class WindowRankKernel : public KernalBase { std::vector>* offsets); template - arrow::Status AreTheSameValue(std::vector values, int column, + arrow::Status AreTheSameValue(const std::vector& values, int column, std::shared_ptr i, std::shared_ptr j, bool* out); diff --git a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc index cb719c2bb..f0d8edb1a 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_kernel.cc +++ b/cpp/src/codegen/arrow_compute/ext/window_kernel.cc @@ -250,6 +250,12 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { std::vector values; std::vector> group_ids; +#ifdef DEBUG + std::cout << "[window kernel] Entering Rank Kernel's finish method... " << std::endl; +#endif +#ifdef DEBUG + std::cout << "[window kernel] Splitting all input batches to key/value batches... " << std::endl; +#endif for (auto batch : input_cache_) { ArrayList values_batch; for (int i = 0; i < type_list_.size() + 1; i++) { @@ -263,7 +269,13 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { } values.push_back(values_batch); } +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif +#ifdef DEBUG + std::cout << "[window kernel] Calculating max group ID... " << std::endl; +#endif int32_t max_group_id = 0; for (int i = 0; i < group_ids.size(); i++) { auto slice = group_ids.at(i); @@ -276,12 +288,19 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { } } } +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif + // initialize partitions to be sorted std::vector>> partitions_to_sort; for (int i = 0; i <= max_group_id; i++) { partitions_to_sort.emplace_back(); } +#ifdef DEBUG + std::cout << "[window kernel] Creating indexed array based on group IDs... " << std::endl; +#endif for (int i = 0; i < group_ids.size(); i++) { auto slice = group_ids.at(i); for (int j = 0; j < slice->length(); j++) { @@ -292,19 +311,32 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { partitions_to_sort.at(partition_id).push_back(std::make_shared(i, j)); } } +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif + std::vector>> sorted_partitions; RETURN_NOT_OK(SortToIndicesPrepare(values)); for (int i = 0; i <= max_group_id; i++) { std::vector> partition = partitions_to_sort.at(i); std::vector> sorted_partition; +#ifdef DEBUG + std::cout << "[window kernel] Sorting a single partition... " << std::endl; +#endif RETURN_NOT_OK(SortToIndicesFinish(partition, &sorted_partition)); - sorted_partitions.push_back(sorted_partition); +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif + sorted_partitions.push_back(std::move(sorted_partition)); } int32_t **rank_array = new int32_t*[group_ids.size()]; for (int i = 0; i < group_ids.size(); i++) { *(rank_array + i) = new int32_t[group_ids.at(i)->length()]; } for (int i = 0; i <= max_group_id; i++) { +#ifdef DEBUG + std::cout << "[window kernel] Generating rank result on a single partition... " << std::endl; +#endif std::vector> sorted_partition = sorted_partitions.at(i); int assumed_rank = 0; for (int j = 0; j < sorted_partition.size(); j++) { @@ -344,7 +376,14 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { } rank_array[index->array_id][index->id] = assumed_rank; } +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif } + +#ifdef DEBUG + std::cout << "[window kernel] Building overall associated rank results... " << std::endl; +#endif for (int i = 0; i < input_cache_.size(); i++) { auto batch = input_cache_.at(i); auto group_id_column_slice = batch.at(type_list_.size()); @@ -357,8 +396,11 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) { RETURN_NOT_OK(rank_builder->Finish(&rank_slice)); out->push_back(rank_slice); } +#ifdef DEBUG + std::cout << "[window kernel] Finished. " << std::endl; +#endif for (int i = 0; i < group_ids.size(); i++) { - delete *(rank_array + i); + delete[] *(rank_array + i); } delete[] rank_array; return arrow::Status::OK(); @@ -404,15 +446,12 @@ arrow::Status WindowRankKernel::SortToIndicesFinish(std::vector> decoded_out; RETURN_NOT_OK(DecodeIndices(out, &decoded_out)); *offsets = decoded_out; -#ifdef DEBUG - std::cout << "RANK: partition sorted: " << out->ToString() << std::endl; -#endif return arrow::Status::OK(); // todo sort algorithm } template -arrow::Status WindowRankKernel::AreTheSameValue(std::vector values, int column, std::shared_ptr i, std::shared_ptr j, bool* out) { +arrow::Status WindowRankKernel::AreTheSameValue(const std::vector& values, int column, std::shared_ptr i, std::shared_ptr j, bool* out) { auto typed_array_i = std::dynamic_pointer_cast(values.at(i->array_id).at(column)); auto typed_array_j = std::dynamic_pointer_cast(values.at(j->array_id).at(column)); *out = (typed_array_i->GetView(i->id) == typed_array_j->GetView(j->id)); diff --git a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h index 6ab7ceba5..29c895776 100644 --- a/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h +++ b/cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h @@ -425,7 +425,7 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx, } std::string GetCompFunction(std::vector sort_key_index_list) { std::stringstream ss; - ss << "auto comp = [this](ArrayItemIndex x, ArrayItemIndex y) {" + ss << "auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {" << GetCompFunction_(0, sort_key_index_list) << "};"; return ss.str(); } @@ -655,8 +655,9 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl { [this](auto& x) -> decltype(auto){ return cached_key_[x.array_id]->GetView(x.id); }); } } else { - auto comp = [this](ArrayItemIndex x, ArrayItemIndex y) { - return cached_key_[x.array_id]->GetView(x.id) > cached_key_[y.array_id]->GetView(y.id);}; + auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) { + return cached_key_[x.array_id]->GetView(x.id) > cached_key_[y.array_id]->GetView(y.id); + }; if (nulls_first_) { std::sort(indices_begin + nulls_total, indices_begin + items_total, comp); } else {