Merge branch 'main' into velox-merge-agg
yikf authored Nov 4, 2024
2 parents ce97dd0 + eca5103 commit 3f084f8
Showing 57 changed files with 653 additions and 490 deletions.
@@ -19,11 +19,12 @@
 import java.util.Set;

 public class CHNativeCacheManager {
-  public static String cacheParts(String table, Set<String> columns) {
-    return nativeCacheParts(table, String.join(",", columns));
+  public static String cacheParts(String table, Set<String> columns, boolean onlyMetaCache) {
+    return nativeCacheParts(table, String.join(",", columns), onlyMetaCache);
   }

-  private static native String nativeCacheParts(String table, String columns);
+  private static native String nativeCacheParts(
+      String table, String columns, boolean onlyMetaCache);

   public static CacheResult getCacheStatus(String jobId) {
     return nativeGetCacheStatus(jobId);
@@ -142,10 +142,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       .toLowerCase(Locale.getDefault)
   }

-  override def validateScan(
+  override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
-      rootPaths: Seq[String]): ValidationResult = {
+      rootPaths: Seq[String],
+      properties: Map[String, String]): ValidationResult = {

     // Validate if all types are supported.
     def hasComplexType: Boolean = {
@@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator

 import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
@@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logging
   }

   override def doColumnarShuffleExchangeExecValidate(
+      outputAttributes: Seq[Attribute],
       outputPartitioning: Partitioning,
       child: SparkPlan): Option[String] = {
     val outputAttributes = child.output
@@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf)
   }

   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) =>
+    case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) =>
       try {
-        val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns)
+        val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache)
         context.reply(CacheJobInfo(status = true, jobId))
       } catch {
         case e: Exception =>
@@ -36,7 +36,10 @@ object GlutenRpcMessages {
     extends GlutenRpcMessage

   // for mergetree cache
-  case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String])
+  case class GlutenMergeTreeCacheLoad(
+      mergeTreeTable: String,
+      columns: util.Set[String],
+      onlyMetaCache: Boolean)
     extends GlutenRpcMessage

   case class GlutenCacheLoadStatus(jobId: String)
@@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand(
             (
               executorId,
               executor.executorEndpointRef.ask[CacheJobInfo](
-                GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava)
+                GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache)
               )))
         })
     } else {
@@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand(
             (
               value._1,
               executorData.executorEndpointRef.ask[CacheJobInfo](
-                GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava)
+                GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache)
               )))
         })
     }
@@ -232,7 +232,14 @@ object MergeTreePartsPartitionsUtil extends Logging {
     val splitFiles = selectRanges
       .map {
         part =>
-          MergeTreePartSplit(part.name, part.dirName, part.targetNode, 0, part.marks, part.size)
+          MergeTreePartSplit(
+            part.name,
+            part.dirName,
+            part.targetNode,
+            part.start,
+            part.marks,
+            part.size
+          )
       }
     genInputPartitionSeqByFileCnt(
       engine,
@@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite
     spark.createDataFrame(data, schema).toDF().write.parquet(fileName)
     fileName
   }
+
+  /** TODO: fix the issue and test in spark 3.5 */
+  testSparkVersionLE33("write into hdfs") {
+
+    /**
+     * There is a bug in the pipeline write to HDFS: when a pipeline returns a column batch, it
+     * doesn't close the HDFS file, so the file is not flushed. The HDFS file is closed when the
+     * LocalExecutor is destroyed, but before that the file is moved by the Spark committer.
+     */
+    val tableName = "write_into_hdfs"
+    val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/"
+    val format = "parquet"
+    val sql =
+      s"""
+         | select *
+         | from $format.`$tablePath`
+         | where long_field > 30
+         |""".stripMargin
+    withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) {
+      testFileFormatBase(tablePath, format, sql, df => {})
+    }
+  }
 }
@@ -563,5 +563,12 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite {
     compareResultsAgainstVanillaSpark(sql, true, { _ => })
     spark.sql("drop table t1")
   }
+
+  test("GLUTEN-7780 fix split diff") {
+    val sql = "select split(concat('a|b|c', cast(id as string)), '\\|')" +
+      ", split(concat('a|b|c', cast(id as string)), '\\\\|')" +
+      ", split(concat('a|b|c', cast(id as string)), '|') from range(10)"
+    compareResultsAgainstVanillaSpark(sql, true, { _ => })
+  }
 }
 // scalastyle:off line.size.limit
@@ -35,26 +35,26 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
   val DBL_RELAX_EPSILON: Double = Math.pow(10, -11)
   val FLT_EPSILON = 1.19209290e-07f

-  protected val sparkVersion: String = {
+  private val sparkVersion: String = {
     val version = SPARK_VERSION_SHORT.split("\\.")
     version(0) + "." + version(1)
   }
+  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")

-  val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/"
-  val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/"
+  val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/"
+  val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/"
   val S3_ENDPOINT = "s3://127.0.0.1:9000/"
   val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http")
-  val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-")
   val BUCKET_NAME: String = SPARK_DIR_NAME
   val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/"

-  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/"
-  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/"
+  val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/"
+  val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/"
   val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020"
-  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion"
+  val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME"

-  val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4"
-  val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E"
+  val S3_ACCESS_KEY = "minioadmin"
+  val S3_SECRET_KEY = "minioadmin"

   val CH_DEFAULT_STORAGE_DIR = "/data"

@@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
         | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
         | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     eventually(timeout(60.seconds), interval(2.seconds)) {
       assertResult(22)(metaPath.list().length)
@@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
         | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
     val dataPath = new File(HDFS_CACHE_PATH)
     val initial_cache_files = countFiles(dataPath)

-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect()
     assertResult(true)(res1(0).getBoolean(0))
     assertResult(1)(metaPath.list().length)
@@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite
         | aaa='ccc')""".stripMargin)
       .collect()
     assertResult(true)(res(0).getBoolean(0))
-    val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs")
+    val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs")
     assertResult(true)(metaPath.exists() && metaPath.isDirectory)
     assertResult(22)(metaPath.list().length)
     assert(countFiles(dataPath) > initial_cache_files)
@@ -1805,6 +1805,29 @@ class GlutenClickHouseMergeTreeWriteSuite
         }
       }
     })
+
+    // GLUTEN-7670: fix enable 'files.per.partition.threshold'
+    withSQLConf(
+      CHConf.runtimeSettings("enabled_driver_filter_mergetree_index") -> "true",
+      CHConf.prefixOf("files.per.partition.threshold") -> "10"
+    ) {
+      runTPCHQueryBySQL(6, sqlStr) {
+        df =>
+          val scanExec = collect(df.queryExecution.executedPlan) {
+            case f: FileSourceScanExecTransformer => f
+          }
+          assertResult(1)(scanExec.size)
+
+          val mergetreeScan = scanExec.head
+          assert(mergetreeScan.nodeName.startsWith("Scan mergetree"))
+
+          val plans = collect(df.queryExecution.executedPlan) {
+            case scanExec: BasicScanExecTransformer => scanExec
+          }
+          assertResult(1)(plans.size)
+          assertResult(1)(plans.head.getSplitInfos(null).size)
+      }
+    }
   }

   test("test mergetree with primary keys filter pruning by driver with bucket") {