[GLUTEN-3582] Support PageIndex (#4634)
* Fix typo

(cherry picked from commit c3fbf13)

* 1. Use FutureSetFromTuple instead of FutureSetFromStorage: FutureSetFromTuple can buildOrderedSetInplace automatically, while FutureSetFromStorage needs SizeLimits set manually.

2. Support PageIndex; set spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format to true again (a hedged config sketch follows this list).

3. Remove skipped test
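
A minimal sketch (not part of this commit; the session setup is assumed) of how a user would enable the local-format path that carries the PageIndex support:

```scala
import org.apache.spark.sql.SparkSession

// Route Parquet scans through the ClickHouse backend's native reader, which
// after this commit prunes pages via the Parquet PageIndex.
val spark = SparkSession
  .builder()
  .config("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
  .getOrCreate()
```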

* refactor gtest

* fix build due to #4664

* v2 for finding the performance issue

* Refactor:
add ParquetFileReaderExtBase
add readColumnChunkPageBase
simplify build read
remove redundant code
remove current_row_group_
std::vector<int32_t> row_groups_ => std::deque<int32_t> row_groups_
std::vector<std::unique_ptr<RowRanges>> row_group_row_ranges_ => std::unordered_map<int32_t, std::unique_ptr<RowRanges>> row_group_row_ranges_
std::vector<std::unique_ptr<ColumnIndexStore>> row_group_column_index_stores_ => std::unordered_map<int32_t, std::unique_ptr<ColumnIndexStore>> row_group_column_index_stores_
remove std::vector<std::unique_ptr<parquet::RowGroupMetaData>> row_group_metas_
remove std::vector<std::shared_ptr<parquet::RowGroupPageIndexReader>> row_group_index_readers_
(a hedged sketch of the resulting member layout follows this list)
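
For orientation, a minimal C++ sketch of the member layout the list above ends up with; field and type names come from the list, while the surrounding class body is a placeholder rather than the real local-engine code:

```cpp
#include <cstdint>
#include <deque>
#include <memory>
#include <unordered_map>

struct RowRanges;        // placeholder: rows surviving page-index filtering
struct ColumnIndexStore; // placeholder: per-row-group column index data

class ParquetFileReaderExtBase
{
protected:
    // Row groups still to read; popping the front replaces current_row_group_.
    std::deque<int32_t> row_groups_;

    // Keyed by row-group index rather than densely indexed, so state is only
    // materialized for row groups that are actually visited.
    std::unordered_map<int32_t, std::unique_ptr<RowRanges>> row_group_row_ranges_;
    std::unordered_map<int32_t, std::unique_ptr<ColumnIndexStore>> row_group_column_index_stores_;
};
```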

* new loop

* Cleanup

* Cleanup

* Revert: fix build due to #4664

* support case_insensitive_column_matching for Parquet

(cherry picked from commit bce0c6668d7bb397127eefeac1943d4c02cf79dc)

* fix case_insensitive_column_matching issue
fix a stupid bug!
add testDataPath
getTpcdsDataPath() => tpcdsDataPath
getClickHouseLibPath() => clickHouseLibPath
(a hedged read example follows this list)
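
A hedged illustration of the behavior being fixed, reusing the upper_case test data added later in this diff (the surrounding session is assumed): a file whose physical Parquet schema stores upper-case names must still answer lower-case column references under Spark's default spark.sql.caseSensitive=false.

```scala
// Physical column may be C_COMMENT while the query says c_comment; the
// native reader's column matching has to reconcile the two.
val df = spark.read.parquet(s"${UTSystemParameters.testDataPath}/parquet/index/tpch/upper_case")
df.filter("c_comment = '! requests wake. (...)ructions. furiousl'").count()
```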

* add benchmark

(cherry picked from commit bb0267135243ff8ad980b0521d8302e150a2c4e4)

* lowercase first letter of function name

(cherry picked from commit 98dc9a79bf4f372ecabcac9b47aa06cd328f1aa4)

* add comments

(cherry picked from commit 2fb41831f4e338503ff620ce5eac9917bdb68f6a)

* Remove Camel case member variable

(cherry picked from commit 1ace73205a033e14ca1659f063eb1df65c3e9969)

* Use Int32 instead of int32_t

(cherry picked from commit e7d8fbe701fcd92fb6cb167686602561adc26ec4)

* Camel case for function name

(cherry picked from commit 1ee0516e2eadf045b4aec63de67cf5cb97810217)

* add ColumnIndexFilterPtr alias

(cherry picked from commit 1e9cdd3b08eb4e026a739ee558e9c2dd0c4c88fb)

* using RowRangesMap = absl::flat_hash_map<Int32, std::unique_ptr<RowRanges>>;
using ColumnIndexStoreMap = absl::flat_hash_map<Int32, std::unique_ptr<ColumnIndexStore>>;

(cherry picked from commit 610fcd038d24d54fa30bcc40ab0d4d39f60dd0c4)
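
For context, a hedged sketch of how such maps serve as lazy per-row-group caches; only the two aliases come from the commit, while the helper and placeholder types are hypothetical:

```cpp
#include <cstdint>
#include <memory>

#include <absl/container/flat_hash_map.h>

using Int32 = int32_t; // assumed ClickHouse-style alias

struct RowRanges {};        // placeholder for the real page-index type
struct ColumnIndexStore {}; // placeholder

using RowRangesMap = absl::flat_hash_map<Int32, std::unique_ptr<RowRanges>>;
using ColumnIndexStoreMap = absl::flat_hash_map<Int32, std::unique_ptr<ColumnIndexStore>>;

// Hypothetical helper: build the row ranges for a row group at most once.
const RowRanges & rowRangesFor(RowRangesMap & cache, Int32 rowGroup)
{
    auto it = cache.find(rowGroup);
    if (it == cache.end())
        it = cache.emplace(rowGroup, std::make_unique<RowRanges>()).first;
    return *it->second;
}
```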

* fix style

(cherry picked from commit 8d85db48fe1c93dbc05404aa580b3f11de94c51d)

* fix benchmark due to #4995

* fix build due to ClickHouse/ClickHouse#61502

* fix assertion failure in Debug build
baibaichen authored Mar 20, 2024
1 parent e977d79 commit 85c2d9d
Showing 50 changed files with 4,341 additions and 567 deletions.
@@ -108,7 +108,7 @@ class GlutenClickHouseHiveTableSuite
       .set("spark.sql.files.minPartitionNum", "1")
       .set("spark.gluten.sql.columnar.columnartorow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -26,7 +26,7 @@ class GlutenClickHouseNativeExceptionSuite extends GlutenClickHouseWholeStageTra

   override protected def sparkConf: SparkConf = {
     super.sparkConf
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
   }

   test("native exception caught by jvm") {

@@ -53,7 +53,7 @@ class GlutenClickHouseNativeWriteTableSuite
       .set("spark.databricks.delta.stalenessLimit", "3600000")
       .set("spark.gluten.sql.columnar.columnartorow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -52,7 +52,7 @@ class GlutenClickHouseSyntheticDataSuite
       .set("spark.databricks.delta.stalenessLimit", "3600000")
       .set("spark.gluten.sql.columnar.columnarToRow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -40,7 +40,7 @@ abstract class GlutenClickHouseTPCDSAbstractSuite

   override protected def spark: SparkSession = _spark

-  protected val tablesPath: String = UTSystemParameters.getTpcdsDataPath() + "/"
+  protected val tablesPath: String = UTSystemParameters.tpcdsDataPath + "/"
   protected val tpcdsQueries: String
   protected val queriesResults: String

@@ -153,7 +153,7 @@ abstract class GlutenClickHouseTPCDSAbstractSuite
       .set("spark.databricks.delta.stalenessLimit", "3600000")
       .set("spark.gluten.sql.columnar.columnarToRow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -568,7 +568,7 @@ abstract class GlutenClickHouseTPCHAbstractSuite
       .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
       .set("spark.gluten.sql.columnar.columnarToRow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -43,7 +43,7 @@ class GlutenClickHouseTPCHParquetAQESuite
       .set("spark.sql.shuffle.partitions", "5")
       .set("spark.sql.autoBroadcastJoinThreshold", "10MB")
       .set("spark.sql.adaptive.enabled", "true")
-      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "false")
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
       .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "sparkMurmurHash3_32")
   }

@@ -45,7 +45,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu

   override protected def sparkConf: SparkConf =
     super.sparkConf
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set(
         "spark.gluten.sql.columnar.backend.ch.use.v2",
         ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2)

@@ -59,7 +59,7 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       .set("spark.databricks.delta.stalenessLimit", "3600000")
       .set("spark.gluten.sql.columnar.columnartorow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -60,7 +60,7 @@ class GlutenFunctionValidateSuite extends GlutenClickHouseWholeStageTransformerS
       .set("spark.databricks.delta.stalenessLimit", "3600000")
       .set("spark.gluten.sql.columnar.columnartorow", "true")
       .set("spark.gluten.sql.columnar.backend.ch.worker.id", "1")
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .set("spark.gluten.sql.columnar.iterator", "true")
       .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
       .set("spark.gluten.sql.enable.native.validation", "false")

@@ -18,21 +18,34 @@ package io.glutenproject.utils

 object UTSystemParameters {

-  val CLICKHOUSE_LIB_PATH_KEY = "clickhouse.lib.path"
-  val CLICKHOUSE_LIB_PATH_DEFAULT_VALUE = "/usr/local/clickhouse/lib/libch.so"
+  private val CLICKHOUSE_LIB_PATH_KEY = "clickhouse.lib.path"
+  private val CLICKHOUSE_LIB_PATH_DEFAULT_VALUE = "/usr/local/clickhouse/lib/libch.so"

-  val TPCDS_DATA_PATH_KEY = "tpcds.data.path"
-  val TPCDS_DATA_PATH_DEFAULT_VALUE = "/data/tpcds-data-sf1"
-
-  def getClickHouseLibPath(): String = {
+  def clickHouseLibPath: String = {
     System.getProperty(
       UTSystemParameters.CLICKHOUSE_LIB_PATH_KEY,
       UTSystemParameters.CLICKHOUSE_LIB_PATH_DEFAULT_VALUE)
   }

-  def getTpcdsDataPath(): String = {
+  private val TEST_DATA_PATH_KEY = "gluten.test.data.path"
+  private val TEST_DATA_PATH_DEFAULT_VALUE = "/data"
+
+  def testDataPath: String = {
     System.getProperty(
-      UTSystemParameters.TPCDS_DATA_PATH_KEY,
-      UTSystemParameters.TPCDS_DATA_PATH_DEFAULT_VALUE)
+      UTSystemParameters.TEST_DATA_PATH_KEY,
+      UTSystemParameters.TEST_DATA_PATH_DEFAULT_VALUE)
   }

+  private val TPCDS_DATA_PATH_KEY = "tpcds.data.path"
+  private val TPCDS_RELATIVE_DATA_PATH = "tpcds-data-sf1"
+
+  def tpcdsDataPath: String = {
+    val result = System.getProperty(UTSystemParameters.TPCDS_DATA_PATH_KEY, null)
+    if (result == null) {
+      s"$testDataPath/$TPCDS_RELATIVE_DATA_PATH"
+    } else {
+      result
+    }
+  }
+
 }
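
For reference, a hedged sketch of the resolution order the new accessors implement (property keys from the code above; the paths are invented examples):

```scala
// With nothing set: testDataPath == "/data" and
// tpcdsDataPath == "/data/tpcds-data-sf1".
System.setProperty("gluten.test.data.path", "/mnt/testdata") // example value

// tpcds.data.path wins when present; otherwise derive from the shared root,
// mirroring UTSystemParameters.tpcdsDataPath.
val tpcds = sys.props
  .get("tpcds.data.path")
  .getOrElse(s"${System.getProperty("gluten.test.data.path", "/data")}/tpcds-data-sf1")
println(tpcds) // /mnt/testdata/tpcds-data-sf1
```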
@@ -65,7 +65,7 @@ object CHAggAndShuffleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchma

   override def getSparkSession: SparkSession = {
     beforeAll()
-    val conf = getSparkcConf
+    val conf = getSparkConf
       .set("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "false")
       .setIfMissing("spark.sql.shuffle.partitions", shufflePartition)
       .setIfMissing("spark.shuffle.manager", "sort")

@@ -40,7 +40,7 @@ object CHHashBuildBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark w

   override def getSparkSession: SparkSession = {

-    val conf = getSparkcConf
+    val conf = getSparkConf
       .set("spark.driver.maxResultSize", "0")
     SparkSession.builder.config(conf).getOrCreate()
   }

@@ -67,7 +67,7 @@ object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark

   override def getSparkSession: SparkSession = {
     beforeAll()
-    val conf = getSparkcConf
+    val conf = getSparkConf
       .setIfMissing("spark.sql.columnVector.offheap.enabled", "true")
       .set("spark.gluten.sql.columnar.separate.scan.rdd.for.ch", "true")

@@ -29,10 +29,10 @@ trait CHSqlBasedBenchmark extends SqlBasedBenchmark {
   protected val thrdNum: String
   protected val memorySize: String
   protected val offheapSize: String
-  def getSparkcConf: SparkConf = {
+  def getSparkConf: SparkConf = {
     val conf = new SparkConf()
       .setAppName(appName)
-      .setIfMissing(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
+      .setIfMissing(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
       .setIfMissing("spark.master", s"local[$thrdNum]")
       .set("spark.plugins", "io.glutenproject.GlutenPlugin")
       .set(
@@ -58,10 +58,7 @@ trait CHSqlBasedBenchmark extends SqlBasedBenchmark {

   override def afterAll(): Unit = {
     DeltaLog.clearCache()
-    val libPath = spark.conf.get(
-      GlutenConfig.GLUTEN_LIB_PATH,
-      UTSystemParameters
-        .getClickHouseLibPath())
+    val libPath = spark.conf.get(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath)
     JniLibLoader.unloadFromPath(libPath)
     // Wait for Ctrl+C, convenient for seeing Spark UI
     // Thread.sleep(600000)

@@ -43,7 +43,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark

   override def getSparkSession: SparkSession = {

-    val conf = getSparkcConf
+    val conf = getSparkConf
       .set("spark.driver.maxResultSize", "0")
     SparkSession.builder.config(conf).getOrCreate()
   }

@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.gluten.parquet
+
+import io.glutenproject.execution.{FileSourceScanExecTransformer, GlutenClickHouseWholeStageTransformerSuite}
+import io.glutenproject.utils.UTSystemParameters
+
+import org.apache.spark.SparkConf
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.gluten.test.GlutenSQLTestUtils
+import org.apache.spark.sql.internal.SQLConf
+
+case class ParquetData(parquetDir: String, filter: String, scanOutput: Long)
+
+class GlutenParquetColumnIndexSuite
+  extends GlutenClickHouseWholeStageTransformerSuite
+  with GlutenSQLTestUtils
+  with Logging {
+
+  override protected val fileFormat: String = "parquet"
+  private val testPath: String = s"${UTSystemParameters.testDataPath}/$fileFormat"
+
+  // TODO: we need refactor compareResultsAgainstVanillaSpark to make customCheck accept
+  // both gluten and vanilla spark dataframe
+  private val parquetData = Seq(
+    ParquetData(
+      "index/tpch/20003",
+      "`27` <> '1-URGENT' and `9` >= '1995-01-01' and `9` < '1996-01-01' ",
+      140000),
+    ParquetData(
+      "index/tpch/upper_case",
+      "c_comment = '! requests wake. (...)ructions. furiousl'",
+      12853)
+  )
+
+  parquetData.foreach {
+    data =>
+      test(s"${data.parquetDir}") {
+        val parquetDir = s"$testPath/${data.parquetDir}"
+        val sql1 = s"""|select count(*) from $fileFormat.`$parquetDir`
+                       |where ${data.filter}
+                       |""".stripMargin
+        compareResultsAgainstVanillaSpark(
+          sql1,
+          compareResult = true,
+          checkScanOutput(data.scanOutput, _))
+      }
+  }
+
+  private def checkScanOutput(scanOutput: Long, df: DataFrame): Unit = {
+    val chScanPlan = df.queryExecution.executedPlan.collect {
+      case scan: FileSourceScanExecTransformer => scan
+    }
+    assertResult(1)(chScanPlan.length)
+    val chFileScan = chScanPlan.head
+    assertResult(scanOutput)(chFileScan.longMetric("numOutputRows").value)
+  }
+  override protected def sparkConf: SparkConf =
+    super.sparkConf
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, false)
+      .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true")
+}
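
A hypothetical way to run the new suite locally; the scalatest wildcardSuites flag and the property values are assumptions, not taken from this commit:

```
mvn test \
  -DwildcardSuites=org.apache.spark.sql.gluten.parquet.GlutenParquetColumnIndexSuite \
  -Dclickhouse.lib.path=/usr/local/clickhouse/lib/libch.so \
  -Dgluten.test.data.path=/data
```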
@@ -17,47 +17,12 @@
 package org.apache.spark.sql.gluten.test

 import io.glutenproject.GlutenConfig
-import io.glutenproject.utils.UTSystemParameters

-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
-import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.test.SharedSparkSession

-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
 trait GlutenSQLTestUtils extends SparkFunSuite with SharedSparkSession {
-  protected val rootPath: String = getClass.getResource("/").getPath
-  protected val basePath: String = rootPath + "unit-tests-working-home"
-  protected val tablesPath: String = basePath + "/unit-tests-data"
-
-  protected val warehouse: String = basePath + "/spark-warehouse"
-  protected val metaStorePathAbsolute: String = basePath + "/meta"
-
-  override protected def sparkConf: SparkConf =
-    super.sparkConf
-      .set(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.getClickHouseLibPath())
-      .set(
-        "spark.gluten.sql.columnar.backend.ch.use.v2",
-        ClickHouseConfig.DEFAULT_USE_DATASOURCE_V2)
-      .set(GlutenConfig.NATIVE_VALIDATION_ENABLED, false)
-      .set(StaticSQLConf.WAREHOUSE_PATH, warehouse)
-
-  override def beforeAll(): Unit = {
-    // prepare working paths
-    val basePathDir = new File(basePath)
-    if (basePathDir.exists()) {
-      FileUtils.forceDelete(basePathDir)
-    }
-    FileUtils.forceMkdir(basePathDir)
-    FileUtils.forceMkdir(new File(warehouse))
-    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
-    super.beforeAll()
-  }
-
   override protected def afterAll(): Unit = {
     DeltaLog.clearCache()
     super.afterAll()

4 changes: 2 additions & 2 deletions cpp-ch/local-engine/CMakeLists.txt
@@ -74,7 +74,7 @@ include_directories(
     ${ClickHouse_SOURCE_DIR}/contrib/azure/sdk/storage/azure-storage-common/inc
 )

-add_subdirectory(Storages/ch_parquet)
+add_subdirectory(Storages/Parquet)
 add_subdirectory(Storages/SubstraitSource)
 add_subdirectory(Functions)

@@ -138,7 +138,7 @@ PRIVATE
     substrait
 )

-#TODO: target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet)
+target_link_libraries(${LOCALENGINE_SHARED_LIB} PUBLIC ch_parquet)

 if (ENABLE_JEMALLOC)
     target_link_options(${LOCALENGINE_SHARED_LIB} PRIVATE