[HUDI-8560] Revert HUDI-8036 #12303

Merged: 3 commits, Nov 27, 2024
Changes to trait SparkParsePartitionUtil (package org.apache.spark.sql.execution.datasources):

@@ -18,139 +18,17 @@
package org.apache.spark.sql.execution.datasources

import org.apache.hadoop.fs.Path
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.util
import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.util.JFunction
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType}
import org.apache.spark.sql.types.DataType

import java.util.TimeZone

trait SparkParsePartitionUtil extends Serializable with Logging {
trait SparkParsePartitionUtil extends Serializable {

def parsePartition(path: Path,
typeInference: Boolean,
basePaths: Set[Path],
userSpecifiedDataTypes: Map[String, DataType],
timeZone: TimeZone,
validatePartitionValues: Boolean = false): InternalRow

/**
* This function generates the partition schema for the hoodie file index. It is used by both HoodieFileIndex and
* HoodieReaderFileIndex. For HoodieReaderFileIndex it upgrades the schema of partition columns with the timestamp partition
* type to STRING, whereas for HoodieFileIndex it uses the base schema type of such partition columns. This ensures that,
* with an output partition format such as DD/MM/YYYY, no incompatible-schema errors occur while reading the table.
*/
def getPartitionSchema(tableConfig: HoodieTableConfig, schema: StructType, shouldUseStringTypeForTimestampPartitionKeyType: Boolean): StructType = {
val nameFieldMap: Map[String, StructField] = generateFieldMap(schema)
val partitionColumns = tableConfig.getPartitionFields

def validateAndGetPartitionFieldsStruct(partitionFields: Array[StructField]) = {
if (partitionFields.length != partitionColumns.get().length) {
val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent
if (isBootstrapTable) {
// For bootstrapped tables it is possible that the schema does not contain the partition field when the source table
// is Hive-style partitioned. In this case we would like to treat the table as non-partitioned
// as opposed to failing
new StructType()
} else {
throw new IllegalArgumentException(s"Cannot find columns: " +
s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " +
s"in the schema[${nameFieldMap.keys.mkString(",")}]")
}
} else {
new StructType(partitionFields)
}
}

def getPartitionStructFields(keyGeneratorPartitionFieldsOpt: util.Option[String], keyGeneratorClassName: String) = {
val partitionFields: Array[StructField] = if (keyGeneratorPartitionFieldsOpt.isPresent
&& keyGeneratorPartitionFieldsOpt.get().contains(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX)
&& (classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))) {
val keyGeneratorPartitionFields = keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR)
keyGeneratorPartitionFields.map(field => CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field))
.map(pair => {
val partitionField = pair.getLeft
val partitionKeyTypeOpt = pair.getRight
partitionKeyTypeOpt.map[StructField] {
JFunction.toJavaFunction {
case PartitionKeyType.SIMPLE => nameFieldMap.getOrElse(partitionField, null)
case PartitionKeyType.TIMESTAMP => if (shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField, StringType) else nameFieldMap.getOrElse(partitionField, null)
}
}.orElse(nameFieldMap.getOrElse(partitionField, null))
})
.filter(structField => structField != null)
.array
} else {
partitionColumns.get().filter(column => nameFieldMap.contains(column))
.map(column => nameFieldMap.apply(column))
}
partitionFields
}

val partitionSchema = if (partitionColumns.isPresent) {
// Note that key generator class name could be null
val keyGeneratorPartitionFieldsOpt = HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig)
val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName
if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)
|| classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) {
val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType))
StructType(partitionFields)
} else {
val partitionFields: Array[StructField] = getPartitionStructFields(keyGeneratorPartitionFieldsOpt, keyGeneratorClassName)
validateAndGetPartitionFieldsStruct(partitionFields)
}
} else {
// If the partition columns have not been stored in hoodie.properties (e.g. for a table that was
// created earlier), we treat it as a non-partitioned table.
logWarning("No partition columns available from hoodie.properties." +
" Partition pruning will not work")
new StructType()
}
partitionSchema
}

/**
* This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding
* [[StructField]] object for every field of the provided [[StructType]], recursively.
*
* For example, the following struct
* <pre>
* StructType(
*   StructField("a",
*     StructType(
*       StructField("b", StringType),
*       StructField("c", IntegerType)
*     )
*   )
* )
* </pre>
*
* will be converted into the following mapping:
*
* <pre>
* "a.b" -> StructField("b", StringType),
* "a.c" -> StructField("c", IntegerType),
* </pre>
*/
private def generateFieldMap(structType: StructType): Map[String, StructField] = {
def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = {
structField match {
case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap
case Left(field) => field.dataType match {
case struct: StructType => traverse(Right(struct)).map {
case (key, structField) => (s"${field.name}.$key", structField)
}
case _ => Map(field.name -> field)
}
}
}

traverse(Right(structType))
}

}
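
To make the behaviour described in the getPartitionSchema scaladoc above concrete, here is a minimal standalone Scala sketch (an illustration, not the Hudi code being reverted); the column name "ts" and its LongType base type are assumptions for the example. The revert removes the shouldUseStringTypeForTimestampPartitionKeyType flag and this STRING upgrade path along with it.

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField}

// Minimal sketch, not the Hudi implementation: choose the reader-facing type for one partition column.
def partitionFieldFor(field: StructField,
                      isTimestampPartitionKey: Boolean,
                      useStringTypeForTimestamp: Boolean): StructField =
  if (isTimestampPartitionKey && useStringTypeForTimestamp) StructField(field.name, StringType)
  else field

// Hypothetical partition column "ts", stored as LongType but written out as DD/MM/YYYY partition paths.
val ts = StructField("ts", LongType)
partitionFieldFor(ts, isTimestampPartitionKey = true, useStringTypeForTimestamp = true)  // StructField("ts", StringType)
partitionFieldFor(ts, isTimestampPartitionKey = true, useStringTypeForTimestamp = false) // StructField("ts", LongType)
```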
Changes to HoodieFileIndex (package org.apache.hudi):

@@ -19,17 +19,15 @@ package org.apache.hudi

import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath
import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
import org.apache.hudi.HoodieFileIndex.{collectReferencedColumns, convertFilterForTimestampKeyGenerator, convertTimestampPartitionValues, getConfigProperties, DataSkippingFailureMode}
import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, convertFilterForTimestampKeyGenerator, getConfigProperties}
import org.apache.hudi.HoodieSparkConfUtils.getConfigValue
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT}
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile}
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.exception.HoodieException
import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType
import org.apache.hudi.keygen.constant.KeyGeneratorType
import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.storage.{StoragePath, StoragePathInfo}
import org.apache.hudi.util.JFunction

@@ -44,14 +42,13 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

import javax.annotation.concurrent.NotThreadSafe

import java.text.SimpleDateFormat
import java.util.stream.Collectors
import javax.annotation.concurrent.NotThreadSafe

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import scala.util.{Failure, Success, Try}

/**
* A file index which supports partition pruning for Hudi snapshot and read-optimized queries.
@@ -91,8 +88,7 @@ case class HoodieFileIndex(spark: SparkSession,
options: Map[String, String],
@transient fileStatusCache: FileStatusCache = NoopCache,
includeLogFiles: Boolean = false,
shouldEmbedFileSlices: Boolean = false,
shouldUseStringTypeForTimestampPartitionKeyType: Boolean = false)
shouldEmbedFileSlices: Boolean = false)
extends SparkHoodieTableFileIndex(
spark = spark,
metaClient = metaClient,
@@ -102,10 +98,7 @@
specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant),
fileStatusCache = fileStatusCache,
startCompletionTime = options.get(DataSourceReadOptions.START_COMMIT.key),
endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key),
shouldUseStringTypeForTimestampPartitionKeyType = shouldUseStringTypeForTimestampPartitionKeyType
)
with FileIndex {
endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key)) with FileIndex {

@transient protected var hasPushedDownPartitionPredicates: Boolean = false

@@ -171,7 +164,6 @@
* @return list of PartitionDirectory containing partition to base files mapping
*/
override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
val timestampPartitionIndexes = HoodieFileIndex.getTimestampPartitionIndex(metaClient.getTableConfig)
val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map {
case (partitionOpt, fileSlices) =>
if (shouldEmbedFileSlices) {
@@ -187,13 +179,12 @@
.map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize,
fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
val c = fileSlices.filter(f => f.hasLogFiles || f.hasBootstrapBase).foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) }
val convertedPartitionValues = convertTimestampPartitionValues(partitionOpt.get.values, timestampPartitionIndexes, shouldUseStringTypeForTimestampPartitionKeyType)
if (c.nonEmpty) {
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(convertedPartitionValues), c), baseFileStatusesAndLogFileOnly)
new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly)
} else {
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
InternalRow.fromSeq(convertedPartitionValues), baseFileStatusesAndLogFileOnly)
InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly)
}

} else {
@@ -206,9 +197,8 @@
})
.map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize,
fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri)))
val convertedPartitionValues = convertTimestampPartitionValues(partitionOpt.get.values, timestampPartitionIndexes, shouldUseStringTypeForTimestampPartitionKeyType)
sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory(
InternalRow.fromSeq(convertedPartitionValues), allCandidateFiles)
InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles)
}
}

@@ -597,48 +587,4 @@

paths.map(new StoragePath(_))
}

private def convertTimestampPartitionType(timestampPartitionIndexes: Set[Int], index: Int, elem: Any) = {
if (timestampPartitionIndexes.contains(index)) {
org.apache.spark.unsafe.types.UTF8String.fromString(String.valueOf(elem))
} else {
elem
}
}

private def convertTimestampPartitionValues(values: Array[Object], timestampPartitionIndexes: Set[Int], shouldUseStringTypeForTimestampPartitionKeyType: Boolean) = {
if (!shouldUseStringTypeForTimestampPartitionKeyType || timestampPartitionIndexes.isEmpty) {
values
} else {
values.zipWithIndex.map { case (elem, index) => convertTimestampPartitionType(timestampPartitionIndexes, index, elem) }
}
}

/**
* Returns the set of partition indices with the timestamp partition type. For the timestamp-based key generator there is
* only one partition field, so the index is 0. For the custom key generator, it is the set of partition indices whose
* partition type is TIMESTAMP.
*/
def getTimestampPartitionIndex(tableConfig: HoodieTableConfig): Set[Int] = {
val keyGeneratorClassNameOpt = Option.apply(tableConfig.getKeyGeneratorClassName)
val recordKeyFieldOpt = common.util.Option.ofNullable(tableConfig.getRawRecordKeyFieldProp)
val keyGeneratorClassName = keyGeneratorClassNameOpt.getOrElse(KeyGenUtils.inferKeyGeneratorType(recordKeyFieldOpt, tableConfig.getPartitionFieldProp).getClassName)
if (keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName)
|| keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP_AVRO.getClassName)) {
Set(0)
} else if (keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName)
|| keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) {
val partitionTypes = CustomAvroKeyGenerator.getPartitionTypes(tableConfig)
var partitionIndexes: Set[Int] = Set.empty
for (i <- 0 until partitionTypes.size()) {
if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) {
partitionIndexes = partitionIndexes + i
}
}
partitionIndexes
} else {
Set.empty
}
}

}
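
For context on the code being removed here, the following is a minimal standalone sketch of the conversion that convertTimestampPartitionValues performed before this revert (not the Hudi implementation itself; the partition values and index set in the example are illustrative assumptions): values at timestamp positions are rendered as UTF8String so the formatted partition path can be read as a STRING column. Combined with getTimestampPartitionIndex above, this is what listFiles used to apply to partitionOpt.get.values.

```scala
import org.apache.spark.unsafe.types.UTF8String

// Standalone sketch of the reverted conversion, not the Hudi code itself.
def convertForTimestampPartitions(values: Array[AnyRef],
                                  timestampPartitionIndexes: Set[Int],
                                  useStringType: Boolean): Array[AnyRef] =
  if (!useStringType || timestampPartitionIndexes.isEmpty) values
  else values.zipWithIndex.map { case (elem, idx) =>
    if (timestampPartitionIndexes.contains(idx)) UTF8String.fromString(String.valueOf(elem)) else elem
  }

// Hypothetical example: the second partition field (index 1) has the TIMESTAMP partition key type,
// e.g. Set(1) from a custom key generator; a timestamp-based key generator would yield Set(0).
val converted = convertForTimestampPartitions(
  Array[AnyRef]("US", Integer.valueOf(20241127)), timestampPartitionIndexes = Set(1), useStringType = true)
// converted(1) is now the UTF8String "20241127"; converted(0) is unchanged.
```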
Changes to the HadoopFsRelation factories (HoodieMergeOnReadSnapshotHadoopFsRelationFactory, HoodieCopyOnWriteSnapshotHadoopFsRelationFactory):

@@ -236,8 +236,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
optParams,
FileStatusCache.getOrCreate(sparkSession),
includeLogFiles = true,
shouldEmbedFileSlices = true,
shouldUseStringTypeForTimestampPartitionKeyType = true)
shouldEmbedFileSlices = true)

val configProperties: TypedProperties = getConfigProperties(sparkSession, options, metaClient.getTableConfig)
val metadataConfig: HoodieMetadataConfig = HoodieMetadataConfig.newBuilder
@@ -335,8 +334,7 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
Some(tableStructSchema),
optParams,
FileStatusCache.getOrCreate(sparkSession),
shouldEmbedFileSlices = true,
shouldUseStringTypeForTimestampPartitionKeyType = true)
shouldEmbedFileSlices = true)

override def buildFileFormat(): FileFormat = {
if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) {
Changes to HoodieIncrementalFileIndex:

@@ -41,8 +41,7 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession,
override val includeLogFiles: Boolean,
override val shouldEmbedFileSlices: Boolean)
extends HoodieFileIndex(
spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices,
shouldUseStringTypeForTimestampPartitionKeyType = true
spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices
) with FileIndex {
val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation = MergeOnReadIncrementalRelation(
spark.sqlContext, options, metaClient, schemaSpec, schemaSpec)
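
Taken together, the revert removes shouldUseStringTypeForTimestampPartitionKeyType from every construction site shown in this PR. A hedged caller-side sketch of the post-revert constructor call, assuming spark, metaClient, tableStructSchema, and optParams are already in scope (these names are taken from the surrounding diff, not introduced by the PR):

```scala
import org.apache.hudi.HoodieFileIndex
import org.apache.spark.sql.execution.datasources.FileStatusCache

// Post-revert call shape: no shouldUseStringTypeForTimestampPartitionKeyType parameter.
val fileIndex = HoodieFileIndex(
  spark,
  metaClient,
  schemaSpec = Some(tableStructSchema),
  options = optParams,
  fileStatusCache = FileStatusCache.getOrCreate(spark),
  includeLogFiles = true,
  shouldEmbedFileSlices = true)
```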