Skip to content

Commit

Permalink
[NSE-120] Columnar window: Reduce peak memory usage and fix performan…
Browse files Browse the repository at this point in the history
…ce issues (oap-project#121)

* fix

* fix test
  • Loading branch information
zhztheplayer authored Feb 24, 2021
1 parent 97bb720 commit 14409f1
Show file tree
Hide file tree
Showing 10 changed files with 123 additions and 19 deletions.
10 changes: 8 additions & 2 deletions core/src/main/scala/com/intel/oap/ColumnarPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -220,19 +220,25 @@ case class ColumnarPreOverrides(conf: SparkConf) extends Rule[SparkPlan] {

case plan: WindowExec =>
if (columnarConf.enableColumnarWindow) {
val child = plan.child match {
val sortRemoved = plan.child match {
case sort: SortExec => // remove ordering requirements
replaceWithColumnarPlan(sort.child)
case _ =>
replaceWithColumnarPlan(plan.child)
}
// disable CoalesceBatchesExec to reduce Netty direct memory usage
val coalesceBatchRemoved = sortRemoved match {
case s: CoalesceBatchesExec =>
s.child
case _ => sortRemoved
}
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
try {
return new ColumnarWindowExec(
plan.windowExpression,
plan.partitionSpec,
plan.orderSpec,
child)
coalesceBatchRemoved)
} catch {
case _: Throwable =>
logInfo("Columnar Window: Falling back to regular Window...")
Expand Down
23 changes: 23 additions & 0 deletions core/src/test/scala/com/intel/oap/tpc/TableGen.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.intel.oap.tpc

trait TableGen {
def gen(): Unit
def createTables(): Unit
}
8 changes: 7 additions & 1 deletion core/src/test/scala/com/intel/oap/tpc/ds/TPCDSSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
override def beforeAll(): Unit = {
super.beforeAll()
LogManager.getRootLogger.setLevel(Level.WARN)
new TPCDSTableGen(spark, 0.1D, TPCDS_WRITE_PATH).gen()
val tGen = new TPCDSTableGen(spark, 0.01D, TPCDS_WRITE_PATH)
tGen.gen()
tGen.createTables()
runner = new TPCRunner(spark, TPCDS_QUERIES_RESOURCE)
}

Expand All @@ -82,6 +84,10 @@ class TPCDSSuite extends QueryTest with SharedSparkSession {
runner.runTPCQuery("q89", 1, true)
runner.runTPCQuery("q98", 1, true)
}

test("window query") {
runner.runTPCQuery("q67", 1, true)
}
}

object TPCDSSuite {
Expand Down
10 changes: 7 additions & 3 deletions core/src/test/scala/com/intel/oap/tpc/ds/TPCDSTableGen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package com.intel.oap.tpc.ds

import java.io.{File, IOException}

import com.intel.oap.tpc.TableGen
import com.intel.oap.tpc.ds.TPCDSTableGen._
import io.trino.tpcds.Results.constructResults
import io.trino.tpcds._
Expand All @@ -29,7 +30,8 @@ import org.apache.spark.sql.{Column, Row, SparkSession}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extends Serializable {
class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String)
extends Serializable with TableGen {

def writeParquetTable(name: String, rows: List[Row]): Unit = {
if (name.equals("dbgen_version")) {
Expand Down Expand Up @@ -91,9 +93,9 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extend
.save(path + File.separator + tableName)
}

def gen(): Unit = {
override def gen(): Unit = {
val options = new Options()
options.scale = 0.01D
options.scale = scale
val session = options.toSession
val tableGenerator = new Gen(session)
Table.getBaseTables.forEach { t =>
Expand All @@ -103,7 +105,9 @@ class TPCDSTableGen(val spark: SparkSession, scale: Double, path: String) extend
writeParquetTable(t.getChild.getName, c)
}
}
}

override def createTables(): Unit = {
val files = new File(path).listFiles()
files.foreach(file => {
println("Creating catalog table: " + file.getName)
Expand Down
4 changes: 3 additions & 1 deletion core/src/test/scala/com/intel/oap/tpc/h/TPCHSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ class TPCHSuite extends QueryTest with SharedSparkSession {
override def beforeAll(): Unit = {
super.beforeAll()
LogManager.getRootLogger.setLevel(Level.WARN)
new TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH).gen()
val tGen = new TPCHTableGen(spark, 0.1D, TPCH_WRITE_PATH)
tGen.gen()
tGen.createTables()
runner = new TPCRunner(spark, TPCH_QUERIES_RESOURCE)
}

Expand Down
9 changes: 7 additions & 2 deletions core/src/test/scala/com/intel/oap/tpc/h/TPCHTableGen.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ package com.intel.oap.tpc.h
import java.io.File
import java.sql.Date

import com.intel.oap.tpc.TableGen
import io.trino.tpch._
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends Serializable {
class TPCHTableGen(val spark: SparkSession, scale: Double, path: String)
extends Serializable with TableGen {

// lineitem
private def lineItemGenerator = { () =>
Expand Down Expand Up @@ -297,7 +299,7 @@ class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends
.parquet(path + File.separator + tableName)
}

def gen(): Unit = {
override def gen(): Unit = {
generate(path, "lineitem", lineItemSchema, lineItemGenerator, lineItemParser)
generate(path, "customer", customerSchema, customerGenerator, customerParser)
generate(path, "orders", orderSchema, orderGenerator, orderParser)
Expand All @@ -306,6 +308,9 @@ class TPCHTableGen(val spark: SparkSession, scale: Double, path: String) extends
generate(path, "nation", nationSchema, nationGenerator, nationParser)
generate(path, "part", partSchema, partGenerator, partParser)
generate(path, "region", regionSchema, regionGenerator, regionParser)
}

override def createTables(): Unit = {
val files = new File(path).listFiles()
files.foreach(file => {
println("Creating catalog table: " + file.getName)
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/codegen/arrow_compute/expr_visitor_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,23 @@ class WindowVisitorImpl : public ExprVisitorImpl {
auto col = p_->in_record_batch_->column(col_id);
in1.push_back(col);
}
#ifdef DEBUG
std::cout << "[window kernel] Calling concat_kernel_->Evaluate(in1, &out1) on batch... " << std::endl;
#endif
RETURN_NOT_OK(concat_kernel_->Evaluate(in1, &out1));
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
}

std::shared_ptr<arrow::Array> in2 = out1;
#ifdef DEBUG
std::cout << "[window kernel] Calling partition_kernel_->Evaluate(in2, &out2) on batch... " << std::endl;
#endif
RETURN_NOT_OK(partition_kernel_->Evaluate(in2, &out2));
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
}

for (int func_id = 0; func_id < window_function_names_.size(); func_id++) {
Expand All @@ -230,7 +242,13 @@ class WindowVisitorImpl : public ExprVisitorImpl {
in3.push_back(col);
}
in3.push_back(out2);
#ifdef DEBUG
std::cout << "[window kernel] Calling function_kernels_.at(func_id)->Evaluate(in3) on batch... " << std::endl;
#endif
RETURN_NOT_OK(function_kernels_.at(func_id)->Evaluate(in3));
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
}
return arrow::Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/codegen/arrow_compute/ext/kernels_ext.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ class WindowRankKernel : public KernalBase {
std::vector<std::shared_ptr<ArrayItemIndex>>* offsets);

template <typename ArrayType>
arrow::Status AreTheSameValue(std::vector<ArrayList> values, int column,
arrow::Status AreTheSameValue(const std::vector<ArrayList>& values, int column,
std::shared_ptr<ArrayItemIndex> i,
std::shared_ptr<ArrayItemIndex> j, bool* out);

Expand Down
51 changes: 45 additions & 6 deletions cpp/src/codegen/arrow_compute/ext/window_kernel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,12 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
std::vector<ArrayList> values;
std::vector<std::shared_ptr<arrow::Int32Array>> group_ids;

#ifdef DEBUG
std::cout << "[window kernel] Entering Rank Kernel's finish method... " << std::endl;
#endif
#ifdef DEBUG
std::cout << "[window kernel] Splitting all input batches to key/value batches... " << std::endl;
#endif
for (auto batch : input_cache_) {
ArrayList values_batch;
for (int i = 0; i < type_list_.size() + 1; i++) {
Expand All @@ -263,7 +269,13 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
}
values.push_back(values_batch);
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif

#ifdef DEBUG
std::cout << "[window kernel] Calculating max group ID... " << std::endl;
#endif
int32_t max_group_id = 0;
for (int i = 0; i < group_ids.size(); i++) {
auto slice = group_ids.at(i);
Expand All @@ -276,12 +288,19 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
}
}
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif

// initialize partitions to be sorted
std::vector<std::vector<std::shared_ptr<ArrayItemIndex>>> partitions_to_sort;
for (int i = 0; i <= max_group_id; i++) {
partitions_to_sort.emplace_back();
}

#ifdef DEBUG
std::cout << "[window kernel] Creating indexed array based on group IDs... " << std::endl;
#endif
for (int i = 0; i < group_ids.size(); i++) {
auto slice = group_ids.at(i);
for (int j = 0; j < slice->length(); j++) {
Expand All @@ -292,19 +311,32 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
partitions_to_sort.at(partition_id).push_back(std::make_shared<ArrayItemIndex>(i, j));
}
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif

std::vector<std::vector<std::shared_ptr<ArrayItemIndex>>> sorted_partitions;
RETURN_NOT_OK(SortToIndicesPrepare(values));
for (int i = 0; i <= max_group_id; i++) {
std::vector<std::shared_ptr<ArrayItemIndex>> partition = partitions_to_sort.at(i);
std::vector<std::shared_ptr<ArrayItemIndex>> sorted_partition;
#ifdef DEBUG
std::cout << "[window kernel] Sorting a single partition... " << std::endl;
#endif
RETURN_NOT_OK(SortToIndicesFinish(partition, &sorted_partition));
sorted_partitions.push_back(sorted_partition);
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
sorted_partitions.push_back(std::move(sorted_partition));
}
int32_t **rank_array = new int32_t*[group_ids.size()];
for (int i = 0; i < group_ids.size(); i++) {
*(rank_array + i) = new int32_t[group_ids.at(i)->length()];
}
for (int i = 0; i <= max_group_id; i++) {
#ifdef DEBUG
std::cout << "[window kernel] Generating rank result on a single partition... " << std::endl;
#endif
std::vector<std::shared_ptr<ArrayItemIndex>> sorted_partition = sorted_partitions.at(i);
int assumed_rank = 0;
for (int j = 0; j < sorted_partition.size(); j++) {
Expand Down Expand Up @@ -344,7 +376,14 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
}
rank_array[index->array_id][index->id] = assumed_rank;
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
}

#ifdef DEBUG
std::cout << "[window kernel] Building overall associated rank results... " << std::endl;
#endif
for (int i = 0; i < input_cache_.size(); i++) {
auto batch = input_cache_.at(i);
auto group_id_column_slice = batch.at(type_list_.size());
Expand All @@ -357,8 +396,11 @@ arrow::Status WindowRankKernel::Finish(ArrayList *out) {
RETURN_NOT_OK(rank_builder->Finish(&rank_slice));
out->push_back(rank_slice);
}
#ifdef DEBUG
std::cout << "[window kernel] Finished. " << std::endl;
#endif
for (int i = 0; i < group_ids.size(); i++) {
delete *(rank_array + i);
delete[] *(rank_array + i);
}
delete[] rank_array;
return arrow::Status::OK();
Expand Down Expand Up @@ -404,15 +446,12 @@ arrow::Status WindowRankKernel::SortToIndicesFinish(std::vector<std::shared_ptr<
std::vector<std::shared_ptr<ArrayItemIndex>> decoded_out;
RETURN_NOT_OK(DecodeIndices(out, &decoded_out));
*offsets = decoded_out;
#ifdef DEBUG
std::cout << "RANK: partition sorted: " << out->ToString() << std::endl;
#endif
return arrow::Status::OK();
// todo sort algorithm
}

template<typename ArrayType>
arrow::Status WindowRankKernel::AreTheSameValue(std::vector<ArrayList> values, int column, std::shared_ptr<ArrayItemIndex> i, std::shared_ptr<ArrayItemIndex> j, bool* out) {
arrow::Status WindowRankKernel::AreTheSameValue(const std::vector<ArrayList>& values, int column, std::shared_ptr<ArrayItemIndex> i, std::shared_ptr<ArrayItemIndex> j, bool* out) {
auto typed_array_i = std::dynamic_pointer_cast<ArrayType>(values.at(i->array_id).at(column));
auto typed_array_j = std::dynamic_pointer_cast<ArrayType>(values.at(j->array_id).at(column));
*out = (typed_array_i->GetView(i->id) == typed_array_j->GetView(j->id));
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/codegen/arrow_compute/ext/window_sort_kernel.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ extern "C" void MakeCodeGen(arrow::compute::FunctionContext* ctx,
}
std::string GetCompFunction(std::vector<int> sort_key_index_list) {
std::stringstream ss;
ss << "auto comp = [this](ArrayItemIndex x, ArrayItemIndex y) {"
ss << "auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {"
<< GetCompFunction_(0, sort_key_index_list) << "};";
return ss.str();
}
Expand Down Expand Up @@ -655,8 +655,9 @@ class WindowSortOnekeyKernel : public WindowSortKernel::Impl {
[this](auto& x) -> decltype(auto){ return cached_key_[x.array_id]->GetView(x.id); });
}
} else {
auto comp = [this](ArrayItemIndex x, ArrayItemIndex y) {
return cached_key_[x.array_id]->GetView(x.id) > cached_key_[y.array_id]->GetView(y.id);};
auto comp = [this](const ArrayItemIndex& x, const ArrayItemIndex& y) {
return cached_key_[x.array_id]->GetView(x.id) > cached_key_[y.array_id]->GetView(y.id);
};
if (nulls_first_) {
std::sort(indices_begin + nulls_total, indices_begin + items_total, comp);
} else {
Expand Down

0 comments on commit 14409f1

Please sign in to comment.