From 3622b1442f94f559446318d1e53d892af1a19e19 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Wed, 20 Nov 2024 14:01:32 -0500 Subject: [PATCH 1/2] revert changes --- .../datasources/SparkParsePartitionUtil.scala | 126 +-- .../org/apache/hudi/HoodieFileIndex.scala | 74 +- .../hudi/HoodieHadoopFsRelationFactory.scala | 6 +- .../hudi/HoodieIncrementalFileIndex.scala | 3 +- .../hudi/SparkHoodieTableFileIndex.scala | 55 +- .../sql/hudi/analysis/HoodieAnalysis.scala | 112 +-- .../org/apache/hudi/TestHoodieFileIndex.scala | 78 +- .../TestSparkSqlWithCustomKeyGenerator.scala | 812 ++++++++---------- 8 files changed, 453 insertions(+), 813 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala index 26cdd5eb05ca..ecf775f715a8 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/SparkParsePartitionUtil.scala @@ -18,18 +18,12 @@ package org.apache.spark.sql.execution.datasources import org.apache.hadoop.fs.Path -import org.apache.hudi.common.table.HoodieTableConfig -import org.apache.hudi.common.util -import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType -import org.apache.hudi.keygen.{BaseKeyGenerator, CustomAvroKeyGenerator, CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} -import org.apache.hudi.util.JFunction -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} +import org.apache.spark.sql.types.DataType import java.util.TimeZone -trait SparkParsePartitionUtil extends Serializable with Logging { +trait SparkParsePartitionUtil extends Serializable { def parsePartition(path: Path, typeInference: Boolean, @@ -37,120 +31,4 @@ trait SparkParsePartitionUtil extends Serializable with Logging { userSpecifiedDataTypes: Map[String, DataType], timeZone: TimeZone, validatePartitionValues: Boolean = false): InternalRow - - /** - * This function generates the partition schema for hoodie file index. This method is used by both HoodieFileIndex and - * HoodieReaderFileIndex. For HoodieReaderFileIndex it upgrades the schema of partition columns with timestamp partition - * type to STRING whereas for HoodieFileIndex it uses the base schema type of such partition columns. This makes sure - * that with output partition format as DD/MM/YYYY, there are no incompatible schema errors while reading the table. - */ - def getPartitionSchema(tableConfig: HoodieTableConfig, schema: StructType, shouldUseStringTypeForTimestampPartitionKeyType: Boolean): StructType = { - val nameFieldMap: Map[String, StructField] = generateFieldMap(schema) - val partitionColumns = tableConfig.getPartitionFields - - def validateAndGetPartitionFieldsStruct(partitionFields: Array[StructField]) = { - if (partitionFields.length != partitionColumns.get().length) { - val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent - if (isBootstrapTable) { - // For bootstrapped tables its possible the schema does not contain partition field when source table - // is hive style partitioned. 
In this case we would like to treat the table as non-partitioned - // as opposed to failing - new StructType() - } else { - throw new IllegalArgumentException(s"Cannot find columns: " + - s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " + - s"in the schema[${nameFieldMap.keys.mkString(",")}]") - } - } else { - new StructType(partitionFields) - } - } - - def getPartitionStructFields(keyGeneratorPartitionFieldsOpt: util.Option[String], keyGeneratorClassName: String) = { - val partitionFields: Array[StructField] = if (keyGeneratorPartitionFieldsOpt.isPresent - && keyGeneratorPartitionFieldsOpt.get().contains(BaseKeyGenerator.CUSTOM_KEY_GENERATOR_SPLIT_REGEX) - && (classOf[CustomKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - || classOf[CustomAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName))) { - val keyGeneratorPartitionFields = keyGeneratorPartitionFieldsOpt.get().split(BaseKeyGenerator.FIELD_SEPARATOR) - keyGeneratorPartitionFields.map(field => CustomAvroKeyGenerator.getPartitionFieldAndKeyType(field)) - .map(pair => { - val partitionField = pair.getLeft - val partitionKeyTypeOpt = pair.getRight - partitionKeyTypeOpt.map[StructField] { - JFunction.toJavaFunction { - case PartitionKeyType.SIMPLE => nameFieldMap.getOrElse(partitionField, null) - case PartitionKeyType.TIMESTAMP => if (shouldUseStringTypeForTimestampPartitionKeyType) StructField(partitionField, StringType) else nameFieldMap.getOrElse(partitionField, null) - } - }.orElse(nameFieldMap.getOrElse(partitionField, null)) - }) - .filter(structField => structField != null) - .array - } else { - partitionColumns.get().filter(column => nameFieldMap.contains(column)) - .map(column => nameFieldMap.apply(column)) - } - partitionFields - } - - val partitionSchema = if (partitionColumns.isPresent) { - // Note that key generator class name could be null - val keyGeneratorPartitionFieldsOpt = HoodieTableConfig.getPartitionFieldPropForKeyGenerator(tableConfig) - val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName - if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) - || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { - val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType)) - StructType(partitionFields) - } else { - val partitionFields: Array[StructField] = getPartitionStructFields(keyGeneratorPartitionFieldsOpt, keyGeneratorClassName) - validateAndGetPartitionFieldsStruct(partitionFields) - } - } else { - // If the partition columns have not stored in hoodie.properties(the table that was - // created earlier), we trait it as a non-partitioned table. - logWarning("No partition columns available from hoodie.properties." + - " Partition pruning will not work") - new StructType() - } - partitionSchema - } - - /** - * This method unravels [[StructType]] into a [[Map]] of pairs of dot-path notation with corresponding - * [[StructField]] object for every field of the provided [[StructType]], recursively. - * - * For example, following struct - *
- * StructType( - * StructField("a", - * StructType( - * StructField("b", StringType), - * StructField("c", IntType) - * ) - * ) - * ) - *- * - * will be converted into following mapping: - * - *
- * "a.b" -> StructField("b", StringType), - * "a.c" -> StructField("c", IntType), - *- */ - private def generateFieldMap(structType: StructType): Map[String, StructField] = { - def traverse(structField: Either[StructField, StructType]): Map[String, StructField] = { - structField match { - case Right(struct) => struct.fields.flatMap(f => traverse(Left(f))).toMap - case Left(field) => field.dataType match { - case struct: StructType => traverse(Right(struct)).map { - case (key, structField) => (s"${field.name}.$key", structField) - } - case _ => Map(field.name -> field) - } - } - } - - traverse(Right(structType)) - } - } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala index a29002168d8d..aa33032fb3bd 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala @@ -19,17 +19,15 @@ package org.apache.hudi import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD} -import org.apache.hudi.HoodieFileIndex.{collectReferencedColumns, convertFilterForTimestampKeyGenerator, convertTimestampPartitionValues, getConfigProperties, DataSkippingFailureMode} +import org.apache.hudi.HoodieFileIndex.{DataSkippingFailureMode, collectReferencedColumns, convertFilterForTimestampKeyGenerator, getConfigProperties} import org.apache.hudi.HoodieSparkConfUtils.getConfigValue -import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.config.TimestampKeyGeneratorConfig.{TIMESTAMP_INPUT_DATE_FORMAT, TIMESTAMP_OUTPUT_DATE_FORMAT} +import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties} import org.apache.hudi.common.model.{FileSlice, HoodieBaseFile, HoodieLogFile} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.util.StringUtils import org.apache.hudi.exception.HoodieException -import org.apache.hudi.keygen.{CustomAvroKeyGenerator, KeyGenUtils, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} -import org.apache.hudi.keygen.CustomAvroKeyGenerator.PartitionKeyType -import org.apache.hudi.keygen.constant.KeyGeneratorType +import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction @@ -44,14 +42,13 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -import javax.annotation.concurrent.NotThreadSafe - import java.text.SimpleDateFormat import java.util.stream.Collectors +import javax.annotation.concurrent.NotThreadSafe import scala.collection.JavaConverters._ -import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal +import scala.util.{Failure, Success, Try} /** * A file index which support partition prune for hoodie snapshot and read-optimized query. 
@@ -91,8 +88,7 @@ case class HoodieFileIndex(spark: SparkSession, options: Map[String, String], @transient fileStatusCache: FileStatusCache = NoopCache, includeLogFiles: Boolean = false, - shouldEmbedFileSlices: Boolean = false, - shouldUseStringTypeForTimestampPartitionKeyType: Boolean = false) + shouldEmbedFileSlices: Boolean = false) extends SparkHoodieTableFileIndex( spark = spark, metaClient = metaClient, @@ -102,10 +98,7 @@ case class HoodieFileIndex(spark: SparkSession, specifiedQueryInstant = options.get(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key).map(HoodieSqlCommonUtils.formatQueryInstant), fileStatusCache = fileStatusCache, startCompletionTime = options.get(DataSourceReadOptions.START_COMMIT.key), - endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key), - shouldUseStringTypeForTimestampPartitionKeyType = shouldUseStringTypeForTimestampPartitionKeyType - ) - with FileIndex { + endCompletionTime = options.get(DataSourceReadOptions.END_COMMIT.key)) with FileIndex { @transient protected var hasPushedDownPartitionPredicates: Boolean = false @@ -171,7 +164,6 @@ case class HoodieFileIndex(spark: SparkSession, * @return list of PartitionDirectory containing partition to base files mapping */ override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = { - val timestampPartitionIndexes = HoodieFileIndex.getTimestampPartitionIndex(metaClient.getTableConfig) val prunedPartitionsAndFilteredFileSlices = filterFileSlices(dataFilters, partitionFilters).map { case (partitionOpt, fileSlices) => if (shouldEmbedFileSlices) { @@ -187,13 +179,12 @@ case class HoodieFileIndex(spark: SparkSession, .map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri))) val c = fileSlices.filter(f => f.hasLogFiles || f.hasBootstrapBase).foldLeft(Map[String, FileSlice]()) { (m, f) => m + (f.getFileId -> f) } - val convertedPartitionValues = convertTimestampPartitionValues(partitionOpt.get.values, timestampPartitionIndexes, shouldUseStringTypeForTimestampPartitionKeyType) if (c.nonEmpty) { sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( - new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(convertedPartitionValues), c), baseFileStatusesAndLogFileOnly) + new HoodiePartitionFileSliceMapping(InternalRow.fromSeq(partitionOpt.get.values), c), baseFileStatusesAndLogFileOnly) } else { sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( - InternalRow.fromSeq(convertedPartitionValues), baseFileStatusesAndLogFileOnly) + InternalRow.fromSeq(partitionOpt.get.values), baseFileStatusesAndLogFileOnly) } } else { @@ -206,9 +197,8 @@ case class HoodieFileIndex(spark: SparkSession, }) .map(fileInfo => new FileStatus(fileInfo.getLength, fileInfo.isDirectory, 0, fileInfo.getBlockSize, fileInfo.getModificationTime, new Path(fileInfo.getPath.toUri))) - val convertedPartitionValues = convertTimestampPartitionValues(partitionOpt.get.values, timestampPartitionIndexes, shouldUseStringTypeForTimestampPartitionKeyType) sparkAdapter.getSparkPartitionedFileUtils.newPartitionDirectory( - InternalRow.fromSeq(convertedPartitionValues), allCandidateFiles) + InternalRow.fromSeq(partitionOpt.get.values), allCandidateFiles) } } @@ -597,48 +587,4 @@ object HoodieFileIndex extends Logging { paths.map(new StoragePath(_)) } - - private def convertTimestampPartitionType(timestampPartitionIndexes: Set[Int], index: Int, elem: Any) = { - if 
(timestampPartitionIndexes.contains(index)) { - org.apache.spark.unsafe.types.UTF8String.fromString(String.valueOf(elem)) - } else { - elem - } - } - - private def convertTimestampPartitionValues(values: Array[Object], timestampPartitionIndexes: Set[Int], shouldUseStringTypeForTimestampPartitionKeyType: Boolean) = { - if (!shouldUseStringTypeForTimestampPartitionKeyType || timestampPartitionIndexes.isEmpty) { - values - } else { - values.zipWithIndex.map { case (elem, index) => convertTimestampPartitionType(timestampPartitionIndexes, index, elem) } - } - } - - /** - * Returns set of indices with timestamp partition type. For Timestamp based keygen, there is only one - * partition so index is 0. For custom keygen, it is the partition indices for which partition type is - * timestamp. - */ - def getTimestampPartitionIndex(tableConfig: HoodieTableConfig): Set[Int] = { - val keyGeneratorClassNameOpt = Option.apply(tableConfig.getKeyGeneratorClassName) - val recordKeyFieldOpt = common.util.Option.ofNullable(tableConfig.getRawRecordKeyFieldProp) - val keyGeneratorClassName = keyGeneratorClassNameOpt.getOrElse(KeyGenUtils.inferKeyGeneratorType(recordKeyFieldOpt, tableConfig.getPartitionFieldProp).getClassName) - if (keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP.getClassName) - || keyGeneratorClassName.equals(KeyGeneratorType.TIMESTAMP_AVRO.getClassName)) { - Set(0) - } else if (keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM.getClassName) - || keyGeneratorClassName.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName)) { - val partitionTypes = CustomAvroKeyGenerator.getPartitionTypes(tableConfig) - var partitionIndexes: Set[Int] = Set.empty - for (i <- 0 until partitionTypes.size()) { - if (partitionTypes.get(i).equals(PartitionKeyType.TIMESTAMP)) { - partitionIndexes = partitionIndexes + i - } - } - partitionIndexes - } else { - Set.empty - } - } - } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala index ceb5a3826824..46310a3706cc 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieHadoopFsRelationFactory.scala @@ -236,8 +236,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext: optParams, FileStatusCache.getOrCreate(sparkSession), includeLogFiles = true, - shouldEmbedFileSlices = true, - shouldUseStringTypeForTimestampPartitionKeyType = true) + shouldEmbedFileSlices = true) val configProperties: TypedProperties = getConfigProperties(sparkSession, options, metaClient.getTableConfig) val metadataConfig: HoodieMetadataConfig = HoodieMetadataConfig.newBuilder @@ -335,8 +334,7 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext: Some(tableStructSchema), optParams, FileStatusCache.getOrCreate(sparkSession), - shouldEmbedFileSlices = true, - shouldUseStringTypeForTimestampPartitionKeyType = true) + shouldEmbedFileSlices = true) override def buildFileFormat(): FileFormat = { if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && !isBootstrap) { diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala index 2431ed00b3a7..7b39257c4d27 
100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieIncrementalFileIndex.scala @@ -41,8 +41,7 @@ class HoodieIncrementalFileIndex(override val spark: SparkSession, override val includeLogFiles: Boolean, override val shouldEmbedFileSlices: Boolean) extends HoodieFileIndex( - spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices, - shouldUseStringTypeForTimestampPartitionKeyType = true + spark, metaClient, schemaSpec, options, fileStatusCache, includeLogFiles, shouldEmbedFileSlices ) with FileIndex { val mergeOnReadIncrementalRelation: MergeOnReadIncrementalRelation = MergeOnReadIncrementalRelation( spark.sqlContext, options, metaClient, schemaSpec, schemaSpec) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala index 5d2732712f1e..fc9c9ff94336 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala @@ -20,7 +20,7 @@ package org.apache.hudi import org.apache.hudi.BaseHoodieTableFileIndex.PartitionPath import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.HoodieConversionUtils.toJavaOption -import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, extractEqualityPredicatesLiteralValues, haveProperPartitionValues, shouldListLazily, shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns} +import org.apache.hudi.SparkHoodieTableFileIndex.{deduceQueryType, extractEqualityPredicatesLiteralValues, generateFieldMap, haveProperPartitionValues, shouldListLazily, shouldUsePartitionPathPrefixAnalysis, shouldValidatePartitionColumns} import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.{TimestampKeyGeneratorConfig, TypedProperties} import org.apache.hudi.common.model.{FileSlice, HoodieTableQueryType} @@ -31,7 +31,7 @@ import org.apache.hudi.config.HoodieBootstrapConfig.DATA_QUERIES_ONLY import org.apache.hudi.hadoop.fs.HadoopFSUtils import org.apache.hudi.internal.schema.Types.RecordType import org.apache.hudi.internal.schema.utils.Conversions -import org.apache.hudi.keygen.StringPartitionPathFormatter +import org.apache.hudi.keygen.{StringPartitionPathFormatter, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator} import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.storage.{StoragePath, StoragePathInfo} import org.apache.hudi.util.JFunction @@ -40,7 +40,7 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BoundReference, EmptyRow, EqualTo, Expression, InterpretedPredicate, Literal} import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.{FileStatusCache, NoopCache} @@ -49,7 +49,6 @@ import org.apache.spark.sql.types.{ByteType, DateType, IntegerType, LongType, Sh import 
org.apache.spark.unsafe.types.UTF8String import javax.annotation.concurrent.NotThreadSafe - import java.util.Collections import scala.collection.JavaConverters._ @@ -78,8 +77,7 @@ class SparkHoodieTableFileIndex(spark: SparkSession, specifiedQueryInstant: Option[String] = None, @transient fileStatusCache: FileStatusCache = NoopCache, startCompletionTime: Option[String] = None, - endCompletionTime: Option[String] = None, - shouldUseStringTypeForTimestampPartitionKeyType: Boolean = false) + endCompletionTime: Option[String] = None) extends BaseHoodieTableFileIndex( new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), metaClient, @@ -118,12 +116,45 @@ class SparkHoodieTableFileIndex(spark: SparkSession, /** * Get the partition schema from the hoodie.properties. */ - lazy val _partitionSchemaFromProperties: StructType = { - getPartitionSchema() - } - - def getPartitionSchema(): StructType = { - sparkParsePartitionUtil.getPartitionSchema(metaClient.getTableConfig, schema, shouldUseStringTypeForTimestampPartitionKeyType) + private lazy val _partitionSchemaFromProperties: StructType = { + val tableConfig = metaClient.getTableConfig + val partitionColumns = tableConfig.getPartitionFields + val nameFieldMap = generateFieldMap(schema) + + if (partitionColumns.isPresent) { + // Note that key generator class name could be null + val keyGeneratorClassName = tableConfig.getKeyGeneratorClassName + if (classOf[TimestampBasedKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName) + || classOf[TimestampBasedAvroKeyGenerator].getName.equalsIgnoreCase(keyGeneratorClassName)) { + val partitionFields: Array[StructField] = partitionColumns.get().map(column => StructField(column, StringType)) + StructType(partitionFields) + } else { + val partitionFields: Array[StructField] = partitionColumns.get().filter(column => nameFieldMap.contains(column)) + .map(column => nameFieldMap.apply(column)) + + if (partitionFields.length != partitionColumns.get().length) { + val isBootstrapTable = tableConfig.getBootstrapBasePath.isPresent + if (isBootstrapTable) { + // For bootstrapped tables its possible the schema does not contain partition field when source table + // is hive style partitioned. In this case we would like to treat the table as non-partitioned + // as opposed to failing + new StructType() + } else { + throw new IllegalArgumentException(s"Cannot find columns: " + + s"'${partitionColumns.get().filter(col => !nameFieldMap.contains(col)).mkString(",")}' " + + s"in the schema[${schema.fields.mkString(",")}]") + } + } else { + new StructType(partitionFields) + } + } + } else { + // If the partition columns have not stored in hoodie.properties(the table that was + // created earlier), we trait it as a non-partitioned table. + logWarning("No partition columns available from hoodie.properties." 
+ + " Partition pruning will not work") + new StructType() + } } /** diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index cc461c8c6f2a..35d9dba7fb17 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -17,25 +17,23 @@ package org.apache.spark.sql.hudi.analysis -import org.apache.hudi.SparkAdapterSupport.sparkAdapter.isHoodieTable import org.apache.hudi.common.util.ReflectionUtils.loadClass import org.apache.hudi.common.util.{ReflectionUtils, ValidationUtils} -import org.apache.hudi.{HoodieFileIndex, HoodieSchemaUtils, HoodieSparkUtils, SparkAdapterSupport} +import org.apache.hudi.{HoodieSchemaUtils, HoodieSparkUtils, SparkAdapterSupport} + import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute} +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSeq, Expression, GenericInternalRow} import org.apache.spark.sql.catalyst.optimizer.ReplaceExpressions -import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.command._ -import org.apache.spark.sql.execution.datasources.{CreateTable, HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields} -import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex, MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement, MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable, instantiateKlass, sparkAdapter} +import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex, MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement, MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable, sparkAdapter} import org.apache.spark.sql.hudi.command._ import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs} -import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.{AnalysisException, SparkSession} import java.util @@ -170,87 +168,9 @@ object HoodieAnalysis extends SparkAdapterSupport { * In Spark < 3.2 however, this is worked around by simply removing any meta-fields from the output * of the [[LogicalRelation]] resolving into Hudi table. Note that, it's a safe operation since we * actually need to ignore these values anyway - * - * This function also adds transformations to the logical plan so that the HadoopFSRelation does not use - * HoodieFileIndex with shouldUseStringTypeForTimestampPartitionKeyType set to true while performing write operations. - * This is required because shouldUseStringTypeForTimestampPartitionKeyType is set to true only for reading - * the hudi tables. 
The flag changes the reader schema to ensure timestamp partition fields can be - * read using CustomKeyGenerator. - * Only InsertIntoStatement, MergeIntoTable, UpdateTable and DeleteFromTable are modified in the funtion. */ case class AdaptIngestionTargetLogicalRelations(spark: SparkSession) extends Rule[LogicalPlan] { - /** - * The function updates the HadoopFSRelation so that it uses HoodieFileIndex with flag - * shouldUseStringTypeForTimestampPartitionKeyType set to false. Also the data type for output attributes of the plan are - * changed accordingly. shouldUseStringTypeForTimestampPartitionKeyType is set to true by HoodieBaseRelation for reading - * tables with Timestamp or custom key generator. - */ - private def transformReaderFSRelation(logicalPlan: Option[LogicalPlan]): Option[LogicalPlan] = { - def getAttributesFromTableSchema(catalogTableOpt: Option[CatalogTable], lr: LogicalRelation, attributes: Seq[AttributeReference]) = { - if (catalogTableOpt.isDefined) { - val attributesSet = attributes.toSet - var finalAttributes: List[AttributeReference] = List.empty - for (attr <- lr.output) { - val origAttr: AttributeReference = attributesSet.collectFirst({ case a if a.name.equals(attr.name) => a }).get - val catalogAttr = catalogTableOpt.get.partitionSchema.fields.collectFirst({ case a if a.name.equals(attr.name) => a }) - val newAttr: AttributeReference = if (catalogAttr.isDefined) { - origAttr.copy(dataType = catalogAttr.get.dataType)(origAttr.exprId, origAttr.qualifier) - } else { - origAttr - } - finalAttributes = finalAttributes :+ newAttr - } - finalAttributes - } else { - attributes - } - } - - def resolveHoodieTableAndHadoopFsRelation(plan: LogicalPlan): (Option[CatalogTable], Option[HadoopFsRelation]) = { - EliminateSubqueryAliases(plan) match { - // First, we need to weed out unresolved plans - case plan if !plan.resolved => (None, None) - // NOTE: When resolving Hudi table we allow [[Filter]]s and [[Project]]s be applied - // on top of it - case PhysicalOperation(_, _, LogicalRelation(relation, _, Some(table), _)) => - val fsRelationOpt = relation match { - case relation1: HadoopFsRelation => Some(relation1) - case _ => None - } - val catalogTableOpt = Some(table).filter(isHoodieTable) - (catalogTableOpt, fsRelationOpt) - case _ => (None, None) - } - } - - logicalPlan.map(relation => { - val (catalogTableOpt, fsRelationOpt) = resolveHoodieTableAndHadoopFsRelation(relation) - val needTransformation = fsRelationOpt - .filter(rel => rel.location.isInstanceOf[HoodieFileIndex]) - .exists(rel => rel.location.asInstanceOf[HoodieFileIndex].shouldUseStringTypeForTimestampPartitionKeyType) - if (catalogTableOpt.isEmpty || !needTransformation) { - // transformation is required only when HoodieFileIndex has flag shouldUseStringTypeForTimestampPartitionKeyType set to true - // This is to ensure that write operations do not change the table schema. shouldUseStringTypeForTimestampPartitionKeyType - // is primarily used for reading timestamp based partition columns and sets the schema for such columns to string type. 
- relation - } else { - relation transformUp { - case lr: LogicalRelation => - val finalAttrs: Seq[AttributeReference] = getAttributesFromTableSchema(catalogTableOpt, lr, lr.output) - val newFsRelation: BaseRelation = lr.relation match { - case fsRelation: HadoopFsRelation => - // set flag shouldUseStringTypeForTimestampPartitionKeyType to false - val fileIndex = fsRelation.location.asInstanceOf[HoodieFileIndex].copy(shouldUseStringTypeForTimestampPartitionKeyType = false) - fsRelation.copy(location = fileIndex, partitionSchema = fileIndex.partitionSchema)(spark) - case _ => lr.relation - } - lr.copy(output = finalAttrs, relation = newFsRelation) - } - } - }) - } - override def apply(plan: LogicalPlan): LogicalPlan = AnalysisHelper.allowInvokingTransformsInAnalyzer { plan transformDown { @@ -275,12 +195,10 @@ object HoodieAnalysis extends SparkAdapterSupport { case _ => None } - val targetRelation = transformReaderFSRelation(updatedTargetTable) - if (updatedTargetTable.isDefined || updatedQuery.isDefined) { mit.asInstanceOf[MergeIntoTable].copy( - targetTable = targetRelation.getOrElse(targetTable), - sourceTable = transformReaderFSRelation(updatedQuery).getOrElse(query) + targetTable = updatedTargetTable.getOrElse(targetTable), + sourceTable = updatedQuery.getOrElse(query) ) } else { mit @@ -310,23 +228,13 @@ object HoodieAnalysis extends SparkAdapterSupport { if (updatedTargetTable.isDefined || updatedQuery.isDefined) { sparkAdapter.getCatalystPlanUtils.rebaseInsertIntoStatement(iis, - transformReaderFSRelation(updatedTargetTable).getOrElse(targetTable), - transformReaderFSRelation(updatedQuery).getOrElse(query)) + updatedTargetTable.getOrElse(targetTable), updatedQuery.getOrElse(query)) } else { iis } case ut @ UpdateTable(relation @ ResolvesToHudiTable(_), _, _) => - val updatedRelation: LogicalPlan = transformReaderFSRelation(Option.apply(relation)).get - ut.copy(table = updatedRelation) - - case dft@DeleteFromTable(plan@ResolvesToHudiTable(_), _) => - val updatedPlan = transformReaderFSRelation(Option.apply(plan)) - dft.copy(table = updatedPlan.getOrElse(plan)) - - case ct @ CreateTable(_, _, queryOpt) => - val updatedQuery = transformReaderFSRelation(queryOpt) - ct.copy(query = updatedQuery.orElse(queryOpt)) + ut.copy(table = relation) case logicalPlan: LogicalPlan if logicalPlan.resolved => sparkAdapter.getCatalystPlanUtils.maybeApplyForNewFileFormat(logicalPlan) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala index 574f458872ab..7c4a836f200f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieFileIndex.scala @@ -312,7 +312,7 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS val nextBatch = for ( i <- 0 to 4 - ) yield(r.nextString(1000), i, r.nextString(1000)) + ) yield (r.nextString(1000), i, r.nextString(1000)) nextBatch.toDF("_row_key", "partition", "timestamp") .write @@ -780,13 +780,13 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS val testCases: Seq[TestCase] = TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = false) :: - TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) :: - TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) :: - 
TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) :: - TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) :: - TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) :: - TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) :: - Nil + TestCase(enableMetadata = false, enableColumnStats = false, enableDataSkipping = true) :: + TestCase(enableMetadata = true, enableColumnStats = false, enableDataSkipping = true) :: + TestCase(enableMetadata = false, enableColumnStats = true, enableDataSkipping = true) :: + TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true) :: + TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_IN_MEMORY) :: + TestCase(enableMetadata = true, enableColumnStats = true, enableDataSkipping = true, columnStatsProcessingModeOverride = HoodieMetadataConfig.COLUMN_STATS_INDEX_PROCESSING_MODE_ENGINE) :: + Nil for (testCase <- testCases) { val readMetadataOpts = Map( @@ -850,68 +850,6 @@ class TestHoodieFileIndex extends HoodieSparkClientTestBase with ScalaAssertionS partitionValues.mkString(StoragePath.SEPARATOR) } } - - @Test - def testGetPartitionSchema(): Unit = { - val tableConfig = new HoodieTableConfig() - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.CUSTOM.name()) - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1:SIMPLE,f2:TIMESTAMP") - val fields = List( - StructField.apply("f1", DataTypes.DoubleType, nullable = true), - StructField.apply("f2", DataTypes.LongType, nullable = true), - StructField.apply("f3", DataTypes.IntegerType, nullable = true), - StructField.apply("f4", DataTypes.StringType, nullable = false)) - val schema = StructType.apply(fields) - var expectedPartitionSchema = StructType.apply(List(fields(0), fields(1).copy(dataType = DataTypes.StringType))) - // With custom key generator handling, timestamp partition field f2 would have string schema type - assertEquals(expectedPartitionSchema, SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig, - schema, shouldUseStringTypeForTimestampPartitionKeyType = true)) - - expectedPartitionSchema = StructType.apply(List(fields(0), fields(1))) - // Without custom key generator handling, timestamp partition field f2 would have input schema type - assertEquals(expectedPartitionSchema, SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig, - schema, shouldUseStringTypeForTimestampPartitionKeyType = false)) - - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2") - expectedPartitionSchema = StructType.apply(List(fields(0), fields(1))) - // With custom key generator handling, timestamp partition field f2 would have input schema type with old partition format - assertEquals(expectedPartitionSchema, SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig, - schema, shouldUseStringTypeForTimestampPartitionKeyType = true)) - - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.COMPLEX.name()) - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1,f2") - 
// With other key generators, timestamp partition field f2 would have input schema type - assertEquals(expectedPartitionSchema, SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig, - schema, shouldUseStringTypeForTimestampPartitionKeyType = false)) - - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.TIMESTAMP.name()) - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f2") - expectedPartitionSchema = StructType.apply(List(fields(1).copy(dataType = DataTypes.StringType))) - // With timestamp key generator, timestamp partition field f2 would have string schema type - assertEquals(expectedPartitionSchema, SparkAdapterSupport.sparkAdapter.getSparkParsePartitionUtil.getPartitionSchema(tableConfig, - schema, shouldUseStringTypeForTimestampPartitionKeyType = true)) - } - - @Test - def testGetTimestampPartitionIndexAPI(): Unit = { - val tableConfig = new HoodieTableConfig() - tableConfig.setValue(HoodieTableConfig.RECORDKEY_FIELDS, "f3, f4") - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.CUSTOM.name()) - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1:SIMPLE,f2:TIMESTAMP") - assertEquals(Set(1), HoodieFileIndex.getTimestampPartitionIndex(tableConfig)) - - // Custom key generator with both field partition types as timestamp would return both the indices - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1:TIMESTAMP,f2:TIMESTAMP") - assertEquals(Set(0, 1), HoodieFileIndex.getTimestampPartitionIndex(tableConfig)) - - // Custom key generator with both field partition types as simple would return empty set - tableConfig.setValue(HoodieTableConfig.PARTITION_FIELDS, "f1:SIMPLE,f2:SIMPLE") - assertEquals(Set(), HoodieFileIndex.getTimestampPartitionIndex(tableConfig)) - - // Timestamp based key generators have only a single partition field - tableConfig.setValue(HoodieTableConfig.KEY_GENERATOR_TYPE, KeyGeneratorType.TIMESTAMP.name()) - assertEquals(Set(0), HoodieFileIndex.getTimestampPartitionIndex(tableConfig)) - } } object TestHoodieFileIndex { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala index 46da43264fc2..85ff5c5c228a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala @@ -20,8 +20,6 @@ package org.apache.hudi.functional import org.apache.avro.Schema -import org.apache.hudi.DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH -import org.apache.hudi.HoodieSparkUtils import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} import org.apache.hudi.common.util.StringUtils @@ -44,451 +42,407 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { private val LOG = LoggerFactory.getLogger(getClass) test("Test Spark SQL DML with custom key generator") { - for (extractPartition <- Seq(true, false)) { - withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { - withTempDir { tmp => - Seq( - Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", 
"202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - TS_FORMATTER_FUNC, - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), - Seq("MERGE_ON_READ", "segment:simple", - "(segment='cat3')", "cat3", - Seq("cat1", "cat2", "cat4", "cat5"), - TS_TO_STRING_FUNC, - (_: Integer, segment: String) => segment, false), - Seq("MERGE_ON_READ", "ts:timestamp", - "(ts=202312)", "202312", - Seq("202401", "202402"), - TS_FORMATTER_FUNC, - (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false), - Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - TS_FORMATTER_FUNC, - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), - Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - TS_FORMATTER_FUNC, - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, true) - ).foreach { testParams => - withTable(generateTableName) { tableName => - LOG.warn("Testing with parameters: " + testParams) - val tableType = testParams(0).asInstanceOf[String] - val writePartitionFields = testParams(1).asInstanceOf[String] - val dropPartitionStatement = testParams(2).asInstanceOf[String] - val droppedPartition = testParams(3).asInstanceOf[String] - val expectedPartitions = testParams(4).asInstanceOf[Seq[String]] - val tsGenFunc = testParams(5).asInstanceOf[Integer => String] - val partitionGenFunc = testParams(6).asInstanceOf[(Integer, String) => String] - val tablePath = tmp.getCanonicalPath + "/" + tableName - val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) { - TS_KEY_GEN_CONFIGS - } else { - Map[String, String]() - } - val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) { - ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ") - } else { - "" - } - val useOlderPartitionFieldFormat = testParams(7).asInstanceOf[Boolean] - - prepareTableWithKeyGenerator( - tableName, tablePath, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig) - - if (useOlderPartitionFieldFormat) { - var metaClient = createMetaClient(spark, tablePath) - val props = new TypedProperties() - props.put(HoodieTableConfig.PARTITION_FIELDS.key(), metaClient.getTableConfig.getPartitionFieldProp) - HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, props) - metaClient = createMetaClient(spark, tablePath) - assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) - } - - // SQL CTAS with table properties containing key generator write configs - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps) - - // Prepare source and test SQL INSERT INTO - val sourceTableName = tableName + "_source" - prepareParquetSource(sourceTableName, Seq( - "(7, 'a7', 1399.0, 1706800227, 'cat1')", - "(8, 'a8', 26.9, 1706800227, 'cat3')", - "(9, 'a9', 299.0, 1701443427, 'cat4')")) - spark.sql( - s""" - | INSERT INTO $tableName - | SELECT * from ${tableName}_source - | """.stripMargin) - val sqlStr = s"SELECT id, name, cast(price as string), cast(ts as 
string), segment from $tableName" - validateResults( - tableName, - sqlStr, - extractPartition, - tsGenFunc, - partitionGenFunc, - Seq(), - Seq(1, "a1", "1.6", 1704121827, "cat1"), - Seq(2, "a2", "10.8", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "103.4", 1701443427, "cat2"), - Seq(5, "a5", "1999.0", 1704121827, "cat2"), - Seq(6, "a6", "80.0", 1704121827, "cat3"), - Seq(7, "a7", "1399.0", 1706800227, "cat1"), - Seq(8, "a8", "26.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4") - ) - - // Test SQL UPDATE - spark.sql( - s""" - | UPDATE $tableName - | SET price = price + 10.0 - | WHERE id between 4 and 7 - | """.stripMargin) - validateResults( - tableName, - sqlStr, - extractPartition, - tsGenFunc, - partitionGenFunc, - Seq(), - Seq(1, "a1", "1.6", 1704121827, "cat1"), - Seq(2, "a2", "10.8", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "90.0", 1704121827, "cat3"), - Seq(7, "a7", "1409.0", 1706800227, "cat1"), - Seq(8, "a8", "26.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4") - ) - - // Test SQL MERGE INTO - spark.sql( - s""" - | MERGE INTO $tableName as target - | USING ( - | SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 as ts, 'cat1' as segment, 'delete' as flag - | UNION - | SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 as ts, 'cat1' as segment, '' as flag - | UNION - | SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 as ts, 'cat3' as segment, '' as flag - | UNION - | SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 as ts, 'cat3' as segment, '' as flag - | UNION - | SELECT 10 as id, 'a10' as name, 888.8 as price, 1706800227 as ts, 'cat5' as segment, '' as flag - | ) source - | on target.id = source.id - | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET - | id = source.id, name = source.name, price = source.price, ts = source.ts, segment = source.segment - | WHEN MATCHED AND flag = 'delete' THEN DELETE - | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, segment) - | values (source.id, source.name, source.price, source.ts, source.segment) - | """.stripMargin) - validateResults( - tableName, - sqlStr, - extractPartition, - tsGenFunc, - partitionGenFunc, - Seq(), - Seq(2, "a2", "11.9", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "99.0", 1704121827, "cat3"), - Seq(7, "a7", "1409.0", 1706800227, "cat1"), - Seq(8, "a8", "24.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4"), - Seq(10, "a10", "888.8", 1706800227, "cat5") - ) - - // Test SQL DELETE - spark.sql( - s""" - | DELETE FROM $tableName - | WHERE id = 7 - | """.stripMargin) - validateResults( - tableName, - sqlStr, - extractPartition, - tsGenFunc, - partitionGenFunc, - Seq(), - Seq(2, "a2", "11.9", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "99.0", 1704121827, "cat3"), - Seq(8, "a8", "24.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4"), - Seq(10, "a10", "888.8", 1706800227, "cat5") - ) - - // Test DROP PARTITION - assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition)) - spark.sql( - s""" - | ALTER TABLE $tableName DROP PARTITION $dropPartitionStatement - |""".stripMargin) - 
validatePartitions(tableName, Seq(droppedPartition), expectedPartitions) - - if (HoodieSparkUtils.isSpark3) { - // Test INSERT OVERWRITE, only supported in Spark 3.x - spark.sql( - s""" - | INSERT OVERWRITE $tableName - | SELECT 100 as id, 'a100' as name, 299.0 as price, 1706800227 as ts, 'cat10' as segment - | """.stripMargin) - validateResults( - tableName, - sqlStr, - extractPartition, - tsGenFunc, - partitionGenFunc, - Seq(), - Seq(100, "a100", "299.0", 1706800227, "cat10") - ) - } - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) - if (useOlderPartitionFieldFormat) { - val metaClient = createMetaClient(spark, tablePath) - assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) - } - } + withTempDir { tmp => + Seq( + Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), + Seq("MERGE_ON_READ", "segment:simple", + "(segment='cat3')", "cat3", + Seq("cat1", "cat2", "cat4", "cat5"), + (_: Integer, segment: String) => segment, false), + Seq("MERGE_ON_READ", "ts:timestamp", + "(ts=202312)", "202312", + Seq("202401", "202402"), + (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false), + Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), + Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, true) + ).foreach { testParams => + withTable(generateTableName) { tableName => + LOG.warn("Testing with parameters: " + testParams) + val tableType = testParams(0).asInstanceOf[String] + val writePartitionFields = testParams(1).asInstanceOf[String] + val dropPartitionStatement = testParams(2).asInstanceOf[String] + val droppedPartition = testParams(3).asInstanceOf[String] + val expectedPartitions = testParams(4).asInstanceOf[Seq[String]] + val partitionGenFunc = testParams(5).asInstanceOf[(Integer, String) => String] + val tablePath = tmp.getCanonicalPath + "/" + tableName + val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) { + TS_KEY_GEN_CONFIGS + } else { + Map[String, String]() + } + val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) { + ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ") + } else { + "" + } + val useOlderPartitionFieldFormat = testParams(6).asInstanceOf[Boolean] + + prepareTableWithKeyGenerator( + tableName, tablePath, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig) + + if (useOlderPartitionFieldFormat) { + var metaClient = createMetaClient(spark, tablePath) + val props = new TypedProperties() + props.put(HoodieTableConfig.PARTITION_FIELDS.key(), metaClient.getTableConfig.getPartitionFieldProp) + HoodieTableConfig.update(metaClient.getStorage, 
metaClient.getMetaPath, props) + metaClient = createMetaClient(spark, tablePath) + assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) + } + + // SQL CTAS with table properties containing key generator write configs + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps) + + // Prepare source and test SQL INSERT INTO + val sourceTableName = tableName + "_source" + prepareParquetSource(sourceTableName, Seq( + "(7, 'a7', 1399.0, 1706800227, 'cat1')", + "(8, 'a8', 26.9, 1706800227, 'cat3')", + "(9, 'a9', 299.0, 1701443427, 'cat4')")) + spark.sql( + s""" + | INSERT INTO $tableName + | SELECT * from ${tableName}_source + | """.stripMargin) + val sqlStr = s"SELECT id, name, cast(price as string), ts, segment from $tableName" + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(1, "a1", "1.6", 1704121827, "cat1"), + Seq(2, "a2", "10.8", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "103.4", 1701443427, "cat2"), + Seq(5, "a5", "1999.0", 1704121827, "cat2"), + Seq(6, "a6", "80.0", 1704121827, "cat3"), + Seq(7, "a7", "1399.0", 1706800227, "cat1"), + Seq(8, "a8", "26.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4") + ) + + // Test SQL UPDATE + spark.sql( + s""" + | UPDATE $tableName + | SET price = price + 10.0 + | WHERE id between 4 and 7 + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(1, "a1", "1.6", 1704121827, "cat1"), + Seq(2, "a2", "10.8", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "90.0", 1704121827, "cat3"), + Seq(7, "a7", "1409.0", 1706800227, "cat1"), + Seq(8, "a8", "26.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4") + ) + + // Test SQL MERGE INTO + spark.sql( + s""" + | MERGE INTO $tableName as target + | USING ( + | SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 as ts, 'cat1' as segment, 'delete' as flag + | UNION + | SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 as ts, 'cat1' as segment, '' as flag + | UNION + | SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 as ts, 'cat3' as segment, '' as flag + | UNION + | SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 as ts, 'cat3' as segment, '' as flag + | UNION + | SELECT 10 as id, 'a10' as name, 888.8 as price, 1706800227 as ts, 'cat5' as segment, '' as flag + | ) source + | on target.id = source.id + | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET + | id = source.id, name = source.name, price = source.price, ts = source.ts, segment = source.segment + | WHEN MATCHED AND flag = 'delete' THEN DELETE + | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, segment) + | values (source.id, source.name, source.price, source.ts, source.segment) + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(2, "a2", "11.9", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "99.0", 1704121827, "cat3"), + Seq(7, "a7", "1409.0", 1706800227, "cat1"), + Seq(8, "a8", "24.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4"), + Seq(10, "a10", "888.8", 1706800227, "cat5") + ) + + // Test SQL DELETE + 
spark.sql( + s""" + | DELETE FROM $tableName + | WHERE id = 7 + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(2, "a2", "11.9", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "99.0", 1704121827, "cat3"), + Seq(8, "a8", "24.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4"), + Seq(10, "a10", "888.8", 1706800227, "cat5") + ) + + // Test DROP PARTITION + assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition)) + spark.sql( + s""" + | ALTER TABLE $tableName DROP PARTITION $dropPartitionStatement + |""".stripMargin) + validatePartitions(tableName, Seq(droppedPartition), expectedPartitions) + + spark.sql( + s""" + | INSERT OVERWRITE $tableName + | SELECT 100 as id, 'a100' as name, 299.0 as price, 1706800227 as ts, 'cat10' as segment + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(100, "a100", "299.0", 1706800227, "cat10") + ) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + if (useOlderPartitionFieldFormat) { + val metaClient = createMetaClient(spark, tablePath) + assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) } } } } } - test("Test table property isolation for partition path field config " - + "with custom key generator for Spark 3.3 and above") { - // Only testing Spark 3.3 and above as lower Spark versions do not support - // ALTER TABLE .. SET TBLPROPERTIES .. to store table-level properties in Hudi Catalog - if (HoodieSparkUtils.gteqSpark3_3) { - for (extractPartition <- Seq(true, false)) { - withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { - withTempDir { tmp => { - val tableNameNonPartitioned = generateTableName - val tableNameSimpleKey = generateTableName - val tableNameCustom1 = generateTableName - val tableNameCustom2 = generateTableName - - val tablePathNonPartitioned = tmp.getCanonicalPath + "/" + tableNameNonPartitioned - val tablePathSimpleKey = tmp.getCanonicalPath + "/" + tableNameSimpleKey - val tablePathCustom1 = tmp.getCanonicalPath + "/" + tableNameCustom1 - val tablePathCustom2 = tmp.getCanonicalPath + "/" + tableNameCustom2 - - val tableType = "MERGE_ON_READ" - val writePartitionFields1 = "segment:simple" - val writePartitionFields2 = "ts:timestamp,segment:simple" - - prepareTableWithKeyGenerator( - tableNameNonPartitioned, tablePathNonPartitioned, tableType, - NONPARTITIONED_KEY_GEN_CLASS_NAME, "", Map()) - prepareTableWithKeyGenerator( - tableNameSimpleKey, tablePathSimpleKey, tableType, - SIMPLE_KEY_GEN_CLASS_NAME, "segment", Map()) - prepareTableWithKeyGenerator( - tableNameCustom1, tablePathCustom1, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields1, Map()) - prepareTableWithKeyGenerator( - tableNameCustom2, tablePathCustom2, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields2, TS_KEY_GEN_CONFIGS) - - // Non-partitioned table does not require additional partition path field write config - createTableWithSql(tableNameNonPartitioned, tablePathNonPartitioned, "") - // Partitioned table with simple key generator does not require additional partition path field write config - createTableWithSql(tableNameSimpleKey, tablePathSimpleKey, "") - // Partitioned table with custom key 
generator requires additional partition path field write config - // Without that, right now the SQL DML fails - createTableWithSql(tableNameCustom1, tablePathCustom1, "") - createTableWithSql(tableNameCustom2, tablePathCustom2, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields2', " - + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - - val segmentPartitionFunc = (_: Integer, segment: String) => segment - val customPartitionFunc = (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment - - testFirstRoundInserts(tableNameNonPartitioned, extractPartition, TS_TO_STRING_FUNC, (_, _) => "") - testFirstRoundInserts(tableNameSimpleKey, extractPartition, TS_TO_STRING_FUNC, segmentPartitionFunc) - testFirstRoundInserts(tableNameCustom2, extractPartition, TS_FORMATTER_FUNC, customPartitionFunc) - - // INSERT INTO should succeed for tableNameCustom1 even if write partition path field config is not set - // It should pick up the partition fields from table config - val sourceTableName = tableNameCustom1 + "_source" - prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) - testFirstRoundInserts(tableNameCustom1, extractPartition, TS_TO_STRING_FUNC, segmentPartitionFunc) - - // All tables should be able to do INSERT INTO without any problem, - // since the scope of the added write config is at the catalog table level - testSecondRoundInserts(tableNameNonPartitioned, extractPartition, TS_TO_STRING_FUNC, (_, _) => "") - testSecondRoundInserts(tableNameSimpleKey, extractPartition, TS_TO_STRING_FUNC, segmentPartitionFunc) - testSecondRoundInserts(tableNameCustom2, extractPartition, TS_FORMATTER_FUNC, customPartitionFunc) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePathCustom1, "ts", Schema.Type.INT) - validateTsFieldSchema(tablePathCustom2, "ts", Schema.Type.INT) - } - } - } + test("Test table property isolation for partition path field config with custom key generator") { + withTempDir { + tmp => { + val tableNameNonPartitioned = generateTableName + val tableNameSimpleKey = generateTableName + val tableNameCustom1 = generateTableName + val tableNameCustom2 = generateTableName + + val tablePathNonPartitioned = tmp.getCanonicalPath + "/" + tableNameNonPartitioned + val tablePathSimpleKey = tmp.getCanonicalPath + "/" + tableNameSimpleKey + val tablePathCustom1 = tmp.getCanonicalPath + "/" + tableNameCustom1 + val tablePathCustom2 = tmp.getCanonicalPath + "/" + tableNameCustom2 + + val tableType = "MERGE_ON_READ" + val writePartitionFields1 = "segment:simple" + val writePartitionFields2 = "ts:timestamp,segment:simple" + + prepareTableWithKeyGenerator( + tableNameNonPartitioned, tablePathNonPartitioned, tableType, + NONPARTITIONED_KEY_GEN_CLASS_NAME, "", Map()) + prepareTableWithKeyGenerator( + tableNameSimpleKey, tablePathSimpleKey, tableType, + SIMPLE_KEY_GEN_CLASS_NAME, "segment", Map()) + prepareTableWithKeyGenerator( + tableNameCustom1, tablePathCustom1, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields1, Map()) + prepareTableWithKeyGenerator( + tableNameCustom2, tablePathCustom2, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields2, TS_KEY_GEN_CONFIGS) + + // Non-partitioned table does not require additional partition path field write config + createTableWithSql(tableNameNonPartitioned, tablePathNonPartitioned, "") + // Partitioned table with simple key generator does not require additional partition path field write config + 
createTableWithSql(tableNameSimpleKey, tablePathSimpleKey, "") + // Partitioned table with custom key generator requires additional partition path field write config + // Without that, right now the SQL DML fails + createTableWithSql(tableNameCustom1, tablePathCustom1, "") + createTableWithSql(tableNameCustom2, tablePathCustom2, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields2', " + + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + + val segmentPartitionFunc = (_: Integer, segment: String) => segment + val customPartitionFunc = (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment + + testFirstRoundInserts(tableNameNonPartitioned, (_, _) => "") + testFirstRoundInserts(tableNameSimpleKey, segmentPartitionFunc) + testFirstRoundInserts(tableNameCustom2, customPartitionFunc) + + // INSERT INTO should succeed for tableNameCustom1 even if write partition path field config is not set + // It should pick up the partition fields from table config + val sourceTableName = tableNameCustom1 + "_source" + prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) + testFirstRoundInserts(tableNameCustom1, segmentPartitionFunc) + + // All tables should be able to do INSERT INTO without any problem, + // since the scope of the added write config is at the catalog table level + testSecondRoundInserts(tableNameNonPartitioned, (_, _) => "") + testSecondRoundInserts(tableNameSimpleKey, segmentPartitionFunc) + testSecondRoundInserts(tableNameCustom2, customPartitionFunc) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePathCustom1, "ts", Schema.Type.INT) + validateTsFieldSchema(tablePathCustom2, "ts", Schema.Type.INT) } } } test("Test wrong partition path field write config with custom key generator") { - for (extractPartition <- Seq(true, false)) { - withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { - withTempDir { tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val tableType = "MERGE_ON_READ" - val writePartitionFields = "segment:simple,ts:timestamp" - val wrongWritePartitionFields = "segment:simple" - val customPartitionFunc = (ts: Integer, segment: String) => segment + "/" + TS_FORMATTER_FUNC.apply(ts) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, TS_KEY_GEN_CONFIGS) - - // CREATE TABLE should fail due to config conflict - assertThrows[HoodieException] { - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields', " - + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - } - + withTempDir { + tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val tableType = "MERGE_ON_READ" + val writePartitionFields = "segment:simple,ts:timestamp" + val wrongWritePartitionFields = "segment:simple" + val customPartitionFunc = (ts: Integer, segment: String) => segment + "/" + TS_FORMATTER_FUNC.apply(ts) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, TS_KEY_GEN_CONFIGS) + + // CREATE TABLE should fail due to config conflict + assertThrows[HoodieException] { createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " + 
s"hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields', " + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - // Set wrong write config + } + + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " + + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + // Set wrong write config + spark.sql( + s"""ALTER TABLE $tableName + | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields') + | """.stripMargin) + + // INSERT INTO should fail due to conflict between write and table config of partition path fields + val sourceTableName = tableName + "_source" + prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) + assertThrows[HoodieException] { spark.sql( - s"""ALTER TABLE $tableName - | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields') + s""" + | INSERT INTO $tableName + | SELECT * from $sourceTableName | """.stripMargin) + } - // INSERT INTO should fail due to conflict between write and table config of partition path fields - val sourceTableName = tableName + "_source" - prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) - assertThrows[HoodieException] { - spark.sql( - s""" - | INSERT INTO $tableName - | SELECT * from $sourceTableName - | """.stripMargin) - } + // Now fix the partition path field write config for tableName + spark.sql( + s"""ALTER TABLE $tableName + | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$writePartitionFields') + | """.stripMargin) - // Only testing Spark 3.3 and above as lower Spark versions do not support - // ALTER TABLE .. SET TBLPROPERTIES .. 
to store table-level properties in Hudi Catalog - if (HoodieSparkUtils.gteqSpark3_3) { - // Now fix the partition path field write config for tableName - spark.sql( - s"""ALTER TABLE $tableName - | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$writePartitionFields') - | """.stripMargin) - - // INSERT INTO should succeed now - testFirstRoundInserts(tableName, extractPartition, TS_FORMATTER_FUNC, customPartitionFunc) - } + // INSERT INTO should succeed now + testFirstRoundInserts(tableName, customPartitionFunc) - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) - } - } + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) } } } test("Test query with custom key generator") { - for (extractPartition <- Seq(true, false)) { - withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { - withTempDir { tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val writePartitionFields = "ts:timestamp" - val dateFormat = "yyyy/MM/dd" - val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) - val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) - val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) - - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " - + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - testFirstRoundInserts(tableName, extractPartition, tsGenFunc, customPartitionFunc) - assertEquals(7, spark.sql( - s""" - | SELECT * from $tableName - | """.stripMargin).count()) - val incrementalDF = spark.read.format("hudi"). - option("hoodie.datasource.query.type", "incremental"). - option("hoodie.datasource.read.begin.instanttime", 0). - load(tablePath) - incrementalDF.createOrReplaceTempView("tbl_incremental") - assertEquals(7, spark.sql( - s""" - | SELECT * from tbl_incremental - | """.stripMargin).count()) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) - } - } + withTempDir { + tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val writePartitionFields = "ts:timestamp" + val dateFormat = "yyyy/MM/dd" + val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) + val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) + val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) + + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " + + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + testFirstRoundInserts(tableName, customPartitionFunc) + assertEquals(7, spark.sql( + s""" + | SELECT * from $tableName + | """.stripMargin).count()) + val incrementalDF = spark.read.format("hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", 0). 
+ load(tablePath) + incrementalDF.createOrReplaceTempView("tbl_incremental") + assertEquals(7, spark.sql( + s""" + | SELECT * from tbl_incremental + | """.stripMargin).count()) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) } } } test("Test query with custom key generator without partition path field config") { - for (extractPartition <- Seq(true, false)) { - withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { - withTempDir { tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val writePartitionFields = "ts:timestamp" - val dateFormat = "yyyy/MM/dd" - val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) - val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) - val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) - - // We are not specifying config hoodie.datasource.write.partitionpath.field while creating - // table - createTableWithSql(tableName, tablePath, - keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - testFirstRoundInserts(tableName, extractPartition, tsGenFunc, customPartitionFunc) - assertEquals(7, spark.sql( - s""" - | SELECT * from $tableName - | """.stripMargin).count()) - val incrementalDF = spark.read.format("hudi"). - option("hoodie.datasource.query.type", "incremental"). - option("hoodie.datasource.read.begin.instanttime", 0). - load(tablePath) - incrementalDF.createOrReplaceTempView("tbl_incremental") - assertEquals(7, spark.sql( - s""" - | SELECT * from tbl_incremental - | """.stripMargin).count()) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) - } - } + withTempDir { + tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val writePartitionFields = "ts:timestamp" + val dateFormat = "yyyy/MM/dd" + val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) + val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) + val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) + + // We are not specifying config hoodie.datasource.write.partitionpath.field while creating + // table + createTableWithSql(tableName, tablePath, + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + testFirstRoundInserts(tableName, customPartitionFunc) + assertEquals(7, spark.sql( + s""" + | SELECT * from $tableName + | """.stripMargin).count()) + val incrementalDF = spark.read.format("hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", 0). 
+ load(tablePath) + incrementalDF.createOrReplaceTempView("tbl_incremental") + assertEquals(7, spark.sql( + s""" + | SELECT * from tbl_incremental + | """.stripMargin).count()) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) } } } @@ -564,9 +518,7 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { | """.stripMargin) validateResults( tableName, - s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName", - false, - tsGenFunc, + s"SELECT id, name, cast(price as string), ts, segment from $tableName", partitionGenFunc, Seq(), Seq(1, "a1", "1.6", 1704121827, "cat1"), @@ -580,8 +532,6 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { } private def testFirstRoundInserts(tableName: String, - extractPartition: Boolean, - tsGenFunc: Integer => String, partitionGenFunc: (Integer, String) => String): Unit = { val sourceTableName = tableName + "_source1" prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) @@ -590,12 +540,10 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { | INSERT INTO $tableName | SELECT * from $sourceTableName | """.stripMargin) - val sqlStr = s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName" + val sqlStr = s"SELECT id, name, cast(price as string), ts, segment from $tableName" validateResults( tableName, sqlStr, - extractPartition, - tsGenFunc, partitionGenFunc, Seq(), Seq(1, "a1", "1.6", 1704121827, "cat1"), @@ -609,8 +557,6 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { } private def testSecondRoundInserts(tableName: String, - extractPartition: Boolean, - tsGenFunc: Integer => String, partitionGenFunc: (Integer, String) => String): Unit = { val sourceTableName = tableName + "_source2" prepareParquetSource(sourceTableName, Seq("(8, 'a8', 26.9, 1706800227, 'cat3')")) @@ -619,12 +565,10 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { | INSERT INTO $tableName | SELECT * from $sourceTableName | """.stripMargin) - val sqlStr = s"SELECT id, name, cast(price as string), cast(ts as string), segment from $tableName" + val sqlStr = s"SELECT id, name, cast(price as string), ts, segment from $tableName" validateResults( tableName, sqlStr, - extractPartition, - tsGenFunc, partitionGenFunc, Seq(), Seq(1, "a1", "1.6", 1704121827, "cat1"), @@ -726,13 +670,11 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { private def validateResults(tableName: String, sql: String, - extractPartition: Boolean, - tsGenFunc: Integer => String, partitionGenFunc: (Integer, String) => String, droppedPartitions: Seq[String], expects: Seq[Any]*): Unit = { checkAnswer(sql)( - expects.map(e => Seq(e(0), e(1), e(2), if (extractPartition) tsGenFunc.apply(e(3).asInstanceOf[Integer]) else e(3).toString, e(4))): _* + expects.map(e => Seq(e(0), e(1), e(2), e(3), e(4))): _* ) val expectedPartitions: Seq[String] = expects .map(e => partitionGenFunc.apply(e(3).asInstanceOf[Integer], e(4).asInstanceOf[String])) From b917b0c5f16200390d9bbeec429b877bc63d23e7 Mon Sep 17 00:00:00 2001 From: Jonathan Vexler <=> Date: Tue, 26 Nov 2024 15:55:04 -0500 Subject: [PATCH 2/2] revert as many changes as possible in the test --- .../TestSparkSqlWithCustomKeyGenerator.scala | 778 +++++++++--------- 1 file changed, 405 insertions(+), 373 deletions(-) diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala index 85ff5c5c228a..d8d2fa9853ac 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkSqlWithCustomKeyGenerator.scala @@ -19,6 +19,9 @@ package org.apache.hudi.functional +import org.apache.hudi.DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH +import org.apache.hudi.HoodieSparkUtils + import org.apache.avro.Schema import org.apache.hudi.common.config.TypedProperties import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} @@ -28,6 +31,7 @@ import org.apache.hudi.functional.TestSparkSqlWithCustomKeyGenerator._ import org.apache.hudi.keygen.constant.KeyGeneratorType import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient import org.apache.hudi.util.SparkKeyGenUtils + import org.apache.spark.sql.SaveMode import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase import org.joda.time.DateTime @@ -42,407 +46,435 @@ class TestSparkSqlWithCustomKeyGenerator extends HoodieSparkSqlTestBase { private val LOG = LoggerFactory.getLogger(getClass) test("Test Spark SQL DML with custom key generator") { - withTempDir { tmp => - Seq( - Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), - Seq("MERGE_ON_READ", "segment:simple", - "(segment='cat3')", "cat3", - Seq("cat1", "cat2", "cat4", "cat5"), - (_: Integer, segment: String) => segment, false), - Seq("MERGE_ON_READ", "ts:timestamp", - "(ts=202312)", "202312", - Seq("202401", "202402"), - (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false), - Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), - Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", - "(ts=202401, segment='cat2')", "202401/cat2", - Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), - (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, true) - ).foreach { testParams => - withTable(generateTableName) { tableName => - LOG.warn("Testing with parameters: " + testParams) - val tableType = testParams(0).asInstanceOf[String] - val writePartitionFields = testParams(1).asInstanceOf[String] - val dropPartitionStatement = testParams(2).asInstanceOf[String] - val droppedPartition = testParams(3).asInstanceOf[String] - val expectedPartitions = testParams(4).asInstanceOf[Seq[String]] - val partitionGenFunc = testParams(5).asInstanceOf[(Integer, String) => String] - val tablePath = tmp.getCanonicalPath + "/" + tableName - val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) { - TS_KEY_GEN_CONFIGS - } else { - Map[String, String]() - } - val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) { - ", " + 
timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ") - } else { - "" - } - val useOlderPartitionFieldFormat = testParams(6).asInstanceOf[Boolean] - - prepareTableWithKeyGenerator( - tableName, tablePath, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig) - - if (useOlderPartitionFieldFormat) { - var metaClient = createMetaClient(spark, tablePath) - val props = new TypedProperties() - props.put(HoodieTableConfig.PARTITION_FIELDS.key(), metaClient.getTableConfig.getPartitionFieldProp) - HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, props) - metaClient = createMetaClient(spark, tablePath) - assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) - } - - // SQL CTAS with table properties containing key generator write configs - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps) - - // Prepare source and test SQL INSERT INTO - val sourceTableName = tableName + "_source" - prepareParquetSource(sourceTableName, Seq( - "(7, 'a7', 1399.0, 1706800227, 'cat1')", - "(8, 'a8', 26.9, 1706800227, 'cat3')", - "(9, 'a9', 299.0, 1701443427, 'cat4')")) - spark.sql( - s""" - | INSERT INTO $tableName - | SELECT * from ${tableName}_source - | """.stripMargin) - val sqlStr = s"SELECT id, name, cast(price as string), ts, segment from $tableName" - validateResults( - tableName, - sqlStr, - partitionGenFunc, - Seq(), - Seq(1, "a1", "1.6", 1704121827, "cat1"), - Seq(2, "a2", "10.8", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "103.4", 1701443427, "cat2"), - Seq(5, "a5", "1999.0", 1704121827, "cat2"), - Seq(6, "a6", "80.0", 1704121827, "cat3"), - Seq(7, "a7", "1399.0", 1706800227, "cat1"), - Seq(8, "a8", "26.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4") - ) - - // Test SQL UPDATE - spark.sql( - s""" - | UPDATE $tableName - | SET price = price + 10.0 - | WHERE id between 4 and 7 - | """.stripMargin) - validateResults( - tableName, - sqlStr, - partitionGenFunc, - Seq(), - Seq(1, "a1", "1.6", 1704121827, "cat1"), - Seq(2, "a2", "10.8", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "90.0", 1704121827, "cat3"), - Seq(7, "a7", "1409.0", 1706800227, "cat1"), - Seq(8, "a8", "26.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4") - ) - - // Test SQL MERGE INTO - spark.sql( - s""" - | MERGE INTO $tableName as target - | USING ( - | SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 as ts, 'cat1' as segment, 'delete' as flag - | UNION - | SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 as ts, 'cat1' as segment, '' as flag - | UNION - | SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 as ts, 'cat3' as segment, '' as flag - | UNION - | SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 as ts, 'cat3' as segment, '' as flag - | UNION - | SELECT 10 as id, 'a10' as name, 888.8 as price, 1706800227 as ts, 'cat5' as segment, '' as flag - | ) source - | on target.id = source.id - | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET - | id = source.id, name = source.name, price = source.price, ts = source.ts, segment = source.segment - | WHEN MATCHED AND flag = 'delete' THEN DELETE - | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, 
segment) - | values (source.id, source.name, source.price, source.ts, source.segment) - | """.stripMargin) - validateResults( - tableName, - sqlStr, - partitionGenFunc, - Seq(), - Seq(2, "a2", "11.9", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "99.0", 1704121827, "cat3"), - Seq(7, "a7", "1409.0", 1706800227, "cat1"), - Seq(8, "a8", "24.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4"), - Seq(10, "a10", "888.8", 1706800227, "cat5") - ) - - // Test SQL DELETE - spark.sql( - s""" - | DELETE FROM $tableName - | WHERE id = 7 - | """.stripMargin) - validateResults( - tableName, - sqlStr, - partitionGenFunc, - Seq(), - Seq(2, "a2", "11.9", 1704121827, "cat1"), - Seq(3, "a3", "30.0", 1706800227, "cat1"), - Seq(4, "a4", "113.4", 1701443427, "cat2"), - Seq(5, "a5", "2009.0", 1704121827, "cat2"), - Seq(6, "a6", "99.0", 1704121827, "cat3"), - Seq(8, "a8", "24.9", 1706800227, "cat3"), - Seq(9, "a9", "299.0", 1701443427, "cat4"), - Seq(10, "a10", "888.8", 1706800227, "cat5") - ) - - // Test DROP PARTITION - assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition)) - spark.sql( - s""" - | ALTER TABLE $tableName DROP PARTITION $dropPartitionStatement - |""".stripMargin) - validatePartitions(tableName, Seq(droppedPartition), expectedPartitions) - - spark.sql( - s""" - | INSERT OVERWRITE $tableName - | SELECT 100 as id, 'a100' as name, 299.0 as price, 1706800227 as ts, 'cat10' as segment - | """.stripMargin) - validateResults( - tableName, - sqlStr, - partitionGenFunc, - Seq(), - Seq(100, "a100", "299.0", 1706800227, "cat10") - ) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) - if (useOlderPartitionFieldFormat) { - val metaClient = createMetaClient(spark, tablePath) - assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) + for (extractPartition <- Seq(false)) { + withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { + withTempDir { tmp => + Seq( + Seq("COPY_ON_WRITE", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), + Seq("MERGE_ON_READ", "segment:simple", + "(segment='cat3')", "cat3", + Seq("cat1", "cat2", "cat4", "cat5"), + (_: Integer, segment: String) => segment, false), + Seq("MERGE_ON_READ", "ts:timestamp", + "(ts=202312)", "202312", + Seq("202401", "202402"), + (ts: Integer, _: String) => TS_FORMATTER_FUNC.apply(ts), false), + Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, false), + Seq("MERGE_ON_READ", "ts:timestamp,segment:simple", + "(ts=202401, segment='cat2')", "202401/cat2", + Seq("202312/cat2", "202312/cat4", "202401/cat1", "202401/cat3", "202402/cat1", "202402/cat3", "202402/cat5"), + (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment, true) + ).foreach { testParams => + withTable(generateTableName) { tableName => + 
LOG.warn("Testing with parameters: " + testParams) + val tableType = testParams(0).asInstanceOf[String] + val writePartitionFields = testParams(1).asInstanceOf[String] + val dropPartitionStatement = testParams(2).asInstanceOf[String] + val droppedPartition = testParams(3).asInstanceOf[String] + val expectedPartitions = testParams(4).asInstanceOf[Seq[String]] + val partitionGenFunc = testParams(5).asInstanceOf[(Integer, String) => String] + val tablePath = tmp.getCanonicalPath + "/" + tableName + val timestampKeyGeneratorConfig = if (writePartitionFields.contains("timestamp")) { + TS_KEY_GEN_CONFIGS + } else { + Map[String, String]() + } + val timestampKeyGenProps = if (timestampKeyGeneratorConfig.nonEmpty) { + ", " + timestampKeyGeneratorConfig.map(e => e._1 + " = '" + e._2 + "'").mkString(", ") + } else { + "" + } + val useOlderPartitionFieldFormat = testParams(6).asInstanceOf[Boolean] + + prepareTableWithKeyGenerator( + tableName, tablePath, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, timestampKeyGeneratorConfig) + + if (useOlderPartitionFieldFormat) { + var metaClient = createMetaClient(spark, tablePath) + val props = new TypedProperties() + props.put(HoodieTableConfig.PARTITION_FIELDS.key(), metaClient.getTableConfig.getPartitionFieldProp) + HoodieTableConfig.update(metaClient.getStorage, metaClient.getMetaPath, props) + metaClient = createMetaClient(spark, tablePath) + assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) + } + + // SQL CTAS with table properties containing key generator write configs + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields'" + timestampKeyGenProps) + + // Prepare source and test SQL INSERT INTO + val sourceTableName = tableName + "_source" + prepareParquetSource(sourceTableName, Seq( + "(7, 'a7', 1399.0, 1706800227, 'cat1')", + "(8, 'a8', 26.9, 1706800227, 'cat3')", + "(9, 'a9', 299.0, 1701443427, 'cat4')")) + spark.sql( + s""" + | INSERT INTO $tableName + | SELECT * from ${tableName}_source + | """.stripMargin) + val sqlStr = s"SELECT id, name, cast(price as string), ts, segment from $tableName" + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(1, "a1", "1.6", 1704121827, "cat1"), + Seq(2, "a2", "10.8", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "103.4", 1701443427, "cat2"), + Seq(5, "a5", "1999.0", 1704121827, "cat2"), + Seq(6, "a6", "80.0", 1704121827, "cat3"), + Seq(7, "a7", "1399.0", 1706800227, "cat1"), + Seq(8, "a8", "26.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4") + ) + + // Test SQL UPDATE + spark.sql( + s""" + | UPDATE $tableName + | SET price = price + 10.0 + | WHERE id between 4 and 7 + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(1, "a1", "1.6", 1704121827, "cat1"), + Seq(2, "a2", "10.8", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "90.0", 1704121827, "cat3"), + Seq(7, "a7", "1409.0", 1706800227, "cat1"), + Seq(8, "a8", "26.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4") + ) + + // Test SQL MERGE INTO + spark.sql( + s""" + | MERGE INTO $tableName as target + | USING ( + | SELECT 1 as id, 'a1' as name, 1.6 as price, 1704121827 as ts, 'cat1' as segment, 'delete' as flag + | UNION + 
| SELECT 2 as id, 'a2' as name, 11.9 as price, 1704121827 as ts, 'cat1' as segment, '' as flag + | UNION + | SELECT 6 as id, 'a6' as name, 99.0 as price, 1704121827 as ts, 'cat3' as segment, '' as flag + | UNION + | SELECT 8 as id, 'a8' as name, 24.9 as price, 1706800227 as ts, 'cat3' as segment, '' as flag + | UNION + | SELECT 10 as id, 'a10' as name, 888.8 as price, 1706800227 as ts, 'cat5' as segment, '' as flag + | ) source + | on target.id = source.id + | WHEN MATCHED AND flag != 'delete' THEN UPDATE SET + | id = source.id, name = source.name, price = source.price, ts = source.ts, segment = source.segment + | WHEN MATCHED AND flag = 'delete' THEN DELETE + | WHEN NOT MATCHED THEN INSERT (id, name, price, ts, segment) + | values (source.id, source.name, source.price, source.ts, source.segment) + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(2, "a2", "11.9", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "99.0", 1704121827, "cat3"), + Seq(7, "a7", "1409.0", 1706800227, "cat1"), + Seq(8, "a8", "24.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4"), + Seq(10, "a10", "888.8", 1706800227, "cat5") + ) + + // Test SQL DELETE + spark.sql( + s""" + | DELETE FROM $tableName + | WHERE id = 7 + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(2, "a2", "11.9", 1704121827, "cat1"), + Seq(3, "a3", "30.0", 1706800227, "cat1"), + Seq(4, "a4", "113.4", 1701443427, "cat2"), + Seq(5, "a5", "2009.0", 1704121827, "cat2"), + Seq(6, "a6", "99.0", 1704121827, "cat3"), + Seq(8, "a8", "24.9", 1706800227, "cat3"), + Seq(9, "a9", "299.0", 1701443427, "cat4"), + Seq(10, "a10", "888.8", 1706800227, "cat5") + ) + + // Test DROP PARTITION + assertTrue(getSortedTablePartitions(tableName).contains(droppedPartition)) + spark.sql( + s""" + | ALTER TABLE $tableName DROP PARTITION $dropPartitionStatement + |""".stripMargin) + validatePartitions(tableName, Seq(droppedPartition), expectedPartitions) + + if (HoodieSparkUtils.isSpark3) { + // Test INSERT OVERWRITE, only supported in Spark 3.x + spark.sql( + s""" + | INSERT OVERWRITE $tableName + | SELECT 100 as id, 'a100' as name, 299.0 as price, 1706800227 as ts, 'cat10' as segment + | """.stripMargin) + validateResults( + tableName, + sqlStr, + partitionGenFunc, + Seq(), + Seq(100, "a100", "299.0", 1706800227, "cat10") + ) + } + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + if (useOlderPartitionFieldFormat) { + val metaClient = createMetaClient(spark, tablePath) + assertEquals(metaClient.getTableConfig.getPartitionFieldProp, HoodieTableConfig.getPartitionFieldPropForKeyGenerator(metaClient.getTableConfig).orElse("")) + } + } } } } } } - test("Test table property isolation for partition path field config with custom key generator") { - withTempDir { - tmp => { - val tableNameNonPartitioned = generateTableName - val tableNameSimpleKey = generateTableName - val tableNameCustom1 = generateTableName - val tableNameCustom2 = generateTableName - - val tablePathNonPartitioned = tmp.getCanonicalPath + "/" + tableNameNonPartitioned - val tablePathSimpleKey = tmp.getCanonicalPath + "/" + tableNameSimpleKey - val tablePathCustom1 = tmp.getCanonicalPath + "/" + tableNameCustom1 - val tablePathCustom2 = tmp.getCanonicalPath + "/" + tableNameCustom2 - - val tableType = "MERGE_ON_READ" - 
val writePartitionFields1 = "segment:simple" - val writePartitionFields2 = "ts:timestamp,segment:simple" - - prepareTableWithKeyGenerator( - tableNameNonPartitioned, tablePathNonPartitioned, tableType, - NONPARTITIONED_KEY_GEN_CLASS_NAME, "", Map()) - prepareTableWithKeyGenerator( - tableNameSimpleKey, tablePathSimpleKey, tableType, - SIMPLE_KEY_GEN_CLASS_NAME, "segment", Map()) - prepareTableWithKeyGenerator( - tableNameCustom1, tablePathCustom1, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields1, Map()) - prepareTableWithKeyGenerator( - tableNameCustom2, tablePathCustom2, tableType, - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields2, TS_KEY_GEN_CONFIGS) - - // Non-partitioned table does not require additional partition path field write config - createTableWithSql(tableNameNonPartitioned, tablePathNonPartitioned, "") - // Partitioned table with simple key generator does not require additional partition path field write config - createTableWithSql(tableNameSimpleKey, tablePathSimpleKey, "") - // Partitioned table with custom key generator requires additional partition path field write config - // Without that, right now the SQL DML fails - createTableWithSql(tableNameCustom1, tablePathCustom1, "") - createTableWithSql(tableNameCustom2, tablePathCustom2, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields2', " - + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - - val segmentPartitionFunc = (_: Integer, segment: String) => segment - val customPartitionFunc = (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment - - testFirstRoundInserts(tableNameNonPartitioned, (_, _) => "") - testFirstRoundInserts(tableNameSimpleKey, segmentPartitionFunc) - testFirstRoundInserts(tableNameCustom2, customPartitionFunc) - - // INSERT INTO should succeed for tableNameCustom1 even if write partition path field config is not set - // It should pick up the partition fields from table config - val sourceTableName = tableNameCustom1 + "_source" - prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) - testFirstRoundInserts(tableNameCustom1, segmentPartitionFunc) - - // All tables should be able to do INSERT INTO without any problem, - // since the scope of the added write config is at the catalog table level - testSecondRoundInserts(tableNameNonPartitioned, (_, _) => "") - testSecondRoundInserts(tableNameSimpleKey, segmentPartitionFunc) - testSecondRoundInserts(tableNameCustom2, customPartitionFunc) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePathCustom1, "ts", Schema.Type.INT) - validateTsFieldSchema(tablePathCustom2, "ts", Schema.Type.INT) + test("Test table property isolation for partition path field config " + + "with custom key generator for Spark 3.3 and above") { + // Only testing Spark 3.3 and above as lower Spark versions do not support + // ALTER TABLE .. SET TBLPROPERTIES .. 
to store table-level properties in Hudi Catalog + if (HoodieSparkUtils.gteqSpark3_3) { + for (extractPartition <- Seq(false)) { + withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { + withTempDir { tmp => { + val tableNameNonPartitioned = generateTableName + val tableNameSimpleKey = generateTableName + val tableNameCustom1 = generateTableName + val tableNameCustom2 = generateTableName + + val tablePathNonPartitioned = tmp.getCanonicalPath + "/" + tableNameNonPartitioned + val tablePathSimpleKey = tmp.getCanonicalPath + "/" + tableNameSimpleKey + val tablePathCustom1 = tmp.getCanonicalPath + "/" + tableNameCustom1 + val tablePathCustom2 = tmp.getCanonicalPath + "/" + tableNameCustom2 + + val tableType = "MERGE_ON_READ" + val writePartitionFields1 = "segment:simple" + val writePartitionFields2 = "ts:timestamp,segment:simple" + + prepareTableWithKeyGenerator( + tableNameNonPartitioned, tablePathNonPartitioned, tableType, + NONPARTITIONED_KEY_GEN_CLASS_NAME, "", Map()) + prepareTableWithKeyGenerator( + tableNameSimpleKey, tablePathSimpleKey, tableType, + SIMPLE_KEY_GEN_CLASS_NAME, "segment", Map()) + prepareTableWithKeyGenerator( + tableNameCustom1, tablePathCustom1, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields1, Map()) + prepareTableWithKeyGenerator( + tableNameCustom2, tablePathCustom2, tableType, + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields2, TS_KEY_GEN_CONFIGS) + + // Non-partitioned table does not require additional partition path field write config + createTableWithSql(tableNameNonPartitioned, tablePathNonPartitioned, "") + // Partitioned table with simple key generator does not require additional partition path field write config + createTableWithSql(tableNameSimpleKey, tablePathSimpleKey, "") + // Partitioned table with custom key generator requires additional partition path field write config + // Without that, right now the SQL DML fails + createTableWithSql(tableNameCustom1, tablePathCustom1, "") + createTableWithSql(tableNameCustom2, tablePathCustom2, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields2', " + + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + + val segmentPartitionFunc = (_: Integer, segment: String) => segment + val customPartitionFunc = (ts: Integer, segment: String) => TS_FORMATTER_FUNC.apply(ts) + "/" + segment + + testFirstRoundInserts(tableNameNonPartitioned, (_, _) => "") + testFirstRoundInserts(tableNameSimpleKey, segmentPartitionFunc) + testFirstRoundInserts(tableNameCustom2, customPartitionFunc) + + // INSERT INTO should succeed for tableNameCustom1 even if write partition path field config is not set + // It should pick up the partition fields from table config + val sourceTableName = tableNameCustom1 + "_source" + prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) + testFirstRoundInserts(tableNameCustom1, segmentPartitionFunc) + + // All tables should be able to do INSERT INTO without any problem, + // since the scope of the added write config is at the catalog table level + testSecondRoundInserts(tableNameNonPartitioned, (_, _) => "") + testSecondRoundInserts(tableNameSimpleKey, segmentPartitionFunc) + testSecondRoundInserts(tableNameCustom2, customPartitionFunc) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePathCustom1, "ts", Schema.Type.INT) + validateTsFieldSchema(tablePathCustom2, "ts", Schema.Type.INT) + } + } + } } } } test("Test wrong partition path field write 
config with custom key generator") { - withTempDir { - tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val tableType = "MERGE_ON_READ" - val writePartitionFields = "segment:simple,ts:timestamp" - val wrongWritePartitionFields = "segment:simple" - val customPartitionFunc = (ts: Integer, segment: String) => segment + "/" + TS_FORMATTER_FUNC.apply(ts) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, TS_KEY_GEN_CONFIGS) - - // CREATE TABLE should fail due to config conflict - assertThrows[HoodieException] { + for (extractPartition <- Seq(false)) { + withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { + withTempDir { tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val tableType = "MERGE_ON_READ" + val writePartitionFields = "segment:simple,ts:timestamp" + val wrongWritePartitionFields = "segment:simple" + val customPartitionFunc = (ts: Integer, segment: String) => segment + "/" + TS_FORMATTER_FUNC.apply(ts) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, TS_KEY_GEN_CONFIGS) + + // CREATE TABLE should fail due to config conflict + assertThrows[HoodieException] { + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields', " + + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + } + createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields', " + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - } - - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " - + TS_KEY_GEN_CONFIGS.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - // Set wrong write config - spark.sql( - s"""ALTER TABLE $tableName - | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields') - | """.stripMargin) - - // INSERT INTO should fail due to conflict between write and table config of partition path fields - val sourceTableName = tableName + "_source" - prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) - assertThrows[HoodieException] { + // Set wrong write config spark.sql( - s""" - | INSERT INTO $tableName - | SELECT * from $sourceTableName + s"""ALTER TABLE $tableName + | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$wrongWritePartitionFields') | """.stripMargin) - } - // Now fix the partition path field write config for tableName - spark.sql( - s"""ALTER TABLE $tableName - | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$writePartitionFields') - | """.stripMargin) + // INSERT INTO should fail due to conflict between write and table config of partition path fields + val sourceTableName = tableName + "_source" + prepareParquetSource(sourceTableName, Seq("(7, 'a7', 1399.0, 1706800227, 'cat1')")) + assertThrows[HoodieException] { + spark.sql( + s""" + | INSERT INTO $tableName + | SELECT * from $sourceTableName + | """.stripMargin) + } - // INSERT INTO should succeed now - testFirstRoundInserts(tableName, customPartitionFunc) + // Only testing Spark 3.3 and above as lower Spark versions do not support + // 
ALTER TABLE .. SET TBLPROPERTIES .. to store table-level properties in Hudi Catalog + if (HoodieSparkUtils.gteqSpark3_3) { + // Now fix the partition path field write config for tableName + spark.sql( + s"""ALTER TABLE $tableName + | SET TBLPROPERTIES (hoodie.datasource.write.partitionpath.field = '$writePartitionFields') + | """.stripMargin) + + // INSERT INTO should succeed now + testFirstRoundInserts(tableName, customPartitionFunc) + } - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + } + } } } } test("Test query with custom key generator") { - withTempDir { - tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val writePartitionFields = "ts:timestamp" - val dateFormat = "yyyy/MM/dd" - val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) - val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) - val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) - - createTableWithSql(tableName, tablePath, - s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " - + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - testFirstRoundInserts(tableName, customPartitionFunc) - assertEquals(7, spark.sql( - s""" - | SELECT * from $tableName - | """.stripMargin).count()) - val incrementalDF = spark.read.format("hudi"). - option("hoodie.datasource.query.type", "incremental"). - option("hoodie.datasource.read.begin.instanttime", 0). - load(tablePath) - incrementalDF.createOrReplaceTempView("tbl_incremental") - assertEquals(7, spark.sql( - s""" - | SELECT * from tbl_incremental - | """.stripMargin).count()) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + for (extractPartition <- Seq(false)) { + withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { + withTempDir { tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val writePartitionFields = "ts:timestamp" + val dateFormat = "yyyy/MM/dd" + val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) + val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) + val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) + + createTableWithSql(tableName, tablePath, + s"hoodie.datasource.write.partitionpath.field = '$writePartitionFields', " + + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + testFirstRoundInserts(tableName, customPartitionFunc) + assertEquals(7, spark.sql( + s""" + | SELECT * from $tableName + | """.stripMargin).count()) + val incrementalDF = spark.read.format("hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", 0). 
+ load(tablePath) + incrementalDF.createOrReplaceTempView("tbl_incremental") + assertEquals(7, spark.sql( + s""" + | SELECT * from tbl_incremental + | """.stripMargin).count()) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + } + } } } } test("Test query with custom key generator without partition path field config") { - withTempDir { - tmp => { - val tableName = generateTableName - val tablePath = tmp.getCanonicalPath + "/" + tableName - val writePartitionFields = "ts:timestamp" - val dateFormat = "yyyy/MM/dd" - val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) - val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) - val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) - - prepareTableWithKeyGenerator( - tableName, tablePath, "MERGE_ON_READ", - CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) - - // We are not specifying config hoodie.datasource.write.partitionpath.field while creating - // table - createTableWithSql(tableName, tablePath, - keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) - testFirstRoundInserts(tableName, customPartitionFunc) - assertEquals(7, spark.sql( - s""" - | SELECT * from $tableName - | """.stripMargin).count()) - val incrementalDF = spark.read.format("hudi"). - option("hoodie.datasource.query.type", "incremental"). - option("hoodie.datasource.read.begin.instanttime", 0). - load(tablePath) - incrementalDF.createOrReplaceTempView("tbl_incremental") - assertEquals(7, spark.sql( - s""" - | SELECT * from tbl_incremental - | """.stripMargin).count()) - - // Validate ts field is still of type int in the table - validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + for (extractPartition <- Seq(false)) { + withSQLConf(EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key() -> extractPartition.toString) { + withTempDir { tmp => { + val tableName = generateTableName + val tablePath = tmp.getCanonicalPath + "/" + tableName + val writePartitionFields = "ts:timestamp" + val dateFormat = "yyyy/MM/dd" + val tsGenFunc = (ts: Integer) => TS_FORMATTER_FUNC_WITH_FORMAT.apply(ts, dateFormat) + val customPartitionFunc = (ts: Integer, _: String) => tsGenFunc.apply(ts) + val keyGenConfigs = TS_KEY_GEN_CONFIGS + ("hoodie.keygen.timebased.output.dateformat" -> dateFormat) + + prepareTableWithKeyGenerator( + tableName, tablePath, "MERGE_ON_READ", + CUSTOM_KEY_GEN_CLASS_NAME, writePartitionFields, keyGenConfigs) + + // We are not specifying config hoodie.datasource.write.partitionpath.field while creating + // table + createTableWithSql(tableName, tablePath, + keyGenConfigs.map(e => e._1 + " = '" + e._2 + "'").mkString(", ")) + testFirstRoundInserts(tableName, customPartitionFunc) + assertEquals(7, spark.sql( + s""" + | SELECT * from $tableName + | """.stripMargin).count()) + val incrementalDF = spark.read.format("hudi"). + option("hoodie.datasource.query.type", "incremental"). + option("hoodie.datasource.read.begin.instanttime", 0). + load(tablePath) + incrementalDF.createOrReplaceTempView("tbl_incremental") + assertEquals(7, spark.sql( + s""" + | SELECT * from tbl_incremental + | """.stripMargin).count()) + + // Validate ts field is still of type int in the table + validateTsFieldSchema(tablePath, "ts", Schema.Type.INT) + } + } } } }
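
Note on the partition paths asserted above: the reverted tests drive Hudi's CustomKeyGenerator with specs such as `ts:timestamp,segment:simple`, and the expected partitions (`202312/cat2`, `202401/cat1`, `202402/cat5`, ...) come from formatting the epoch-second `ts` column and appending the `segment` value. The following is only a minimal standalone sketch of that mapping for reference, assuming the test's TS_KEY_GEN_CONFIGS emit a `yyyyMM` output format in UTC; `PartitionPathSketch`, `formatter`, and `partitionPath` are illustrative names and not Hudi APIs.

import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object PartitionPathSketch {
  // Assumed output format of the timestamp key generator configs used in the tests.
  private val formatter = DateTimeFormatter.ofPattern("yyyyMM").withZone(ZoneOffset.UTC)

  // "ts:timestamp,segment:simple" => "<yyyyMM(ts)>/<segment>"
  def partitionPath(ts: Long, segment: String): String =
    s"${formatter.format(Instant.ofEpochSecond(ts))}/$segment"

  def main(args: Array[String]): Unit = {
    // Mirrors rows in the test data, e.g. (4, 'a4', 103.4, 1701443427, 'cat2') lands in "202312/cat2".
    println(partitionPath(1701443427L, "cat2")) // 202312/cat2
    println(partitionPath(1704121827L, "cat1")) // 202401/cat1
    println(partitionPath(1706800227L, "cat5")) // 202402/cat5
  }
}

The single-field specs in the test matrix degenerate to one side of this mapping: `segment:simple` yields just the segment value, and `ts:timestamp` yields just the formatted timestamp, which is why the expected partition lists for those cases contain values like `cat3` and `202312` respectively.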