[SUPPORT] Hudi Record Level Index says item not found with Complex keys #12234

Open
dataproblems opened this issue Nov 11, 2024 · 5 comments
Labels: index, priority:critical (production down; pipelines stalled; need help asap), schema-and-data-types

Comments

@dataproblems

Describe the problem you faced

Record lookup in a table with a record level index results in a None.get exception.

To Reproduce

Steps to reproduce the behavior:

  1. Create a table with two fields in the record key, using the ComplexKeyGenerator
  2. Read the table and perform a lookup for a particular key

Expected behavior

I should be able to read the data without any exceptions, just as I can for a table created with the SimpleKeyGenerator.

Environment Description

  • Hudi version : 0.14.0

  • Spark version : 3.4

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

Read Options I used:

val ReadOptions: Map[String, String] = Map(
  "hoodie.enable.data.skipping" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.metadata.index.column.stats.enable" -> "true",
  "hoodie.metadata.record.index.enable" -> "true")

Config I used to create the table with a complex key:

val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id,partition",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "100")

Config I used to create the table with a simple key:

val insertOptions: Map[String, String] = Map(
  DataSourceWriteOptions.OPERATION.key() -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
  DataSourceWriteOptions.TABLE_TYPE.key() -> DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL,
  HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key() -> "snappy",
  HoodieStorageConfig.PARQUET_MAX_FILE_SIZE.key() -> "2147483648",
  "hoodie.parquet.small.file.limit" -> "1073741824",
  HoodieTableConfig.POPULATE_META_FIELDS.key() -> "true",
  HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true",
  HoodieIndexConfig.INDEX_TYPE.key() -> "RECORD_INDEX",
  "hoodie.metadata.record.index.enable" -> "true",
  "hoodie.metadata.enable" -> "true",
  "hoodie.datasource.write.hive_style_partitioning" -> "true",
  "hoodie.datasource.write.partitionpath.field" -> "partition",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.name" -> tableName,
  DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> classOf[SimpleKeyGenerator].getName,
  "hoodie.write.markers.type" -> "DIRECT",
  "hoodie.embed.timeline.server" -> "true",
  "hoodie.metadata.record.index.min.filegroup.count" -> "1000")

Code I used to generate the data:

import java.util.UUID
import scala.util.Random
// In the spark-shell the session implicits are already in scope; outside the shell
// this import is needed for the Dataset encoder of the case class below.
import spark.implicits._

case class RandomData(id: Long, uuid: String, ts: Long = 28800000L, partition: String)

val partitions = List("One", "Two", "Three", "Four")

// Roughly 100M rows, each assigned to one of the four partitions at random.
val randomData = spark.range(1, 10 * 10000000L).map(f => RandomData(id = f, uuid = UUID.randomUUID.toString, partition = Random.shuffle(partitions).head))
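
I then write this dataset with the complex-key insertOptions above and run the lookup sketched under the read options. A rough sketch of the write, with a placeholder path:

// Sketch of the write that creates the complex-key table from randomData.
randomData.toDF()
  .write
  .format("hudi")
  .options(insertOptions)
  .mode("overwrite")
  .save("s3://my-bucket/path/to/hudi-table")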

Stacktrace

java.util.NoSuchElementException: None.get
        at scala.None$.get(Option.scala:529) ~[scala-library-2.12.15.jar:?]
        at scala.None$.get(Option.scala:527) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.RecordLevelIndexSupport.attributeMatchesRecordKey(RecordLevelIndexSupport.scala:89) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey(RecordLevelIndexSupport.scala:155) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.$anonfun$filterQueriesWithRecordKey$1(RecordLevelIndexSupport.scala:133) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.RecordLevelIndexSupport.$anonfun$filterQueriesWithRecordKey$1$adapted(RecordLevelIndexSupport.scala:132) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at scala.collection.immutable.List.foreach(List.scala:431) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.RecordLevelIndexSupport.filterQueriesWithRecordKey(RecordLevelIndexSupport.scala:132) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.recordKeys$lzycompute$1(HoodieFileIndex.scala:334) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.recordKeys$1(HoodieFileIndex.scala:334) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.$anonfun$lookupCandidateFilesInMetadataTable$1(HoodieFileIndex.scala:338) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at scala.util.Try$.apply(Try.scala:213) ~[scala-library-2.12.15.jar:?]
        at org.apache.hudi.HoodieFileIndex.lookupCandidateFilesInMetadataTable(HoodieFileIndex.scala:321) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.filterFileSlices(HoodieFileIndex.scala:222) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.hudi.HoodieFileIndex.listFiles(HoodieFileIndex.scala:149) ~[hudi-spark3-bundle_2.12-0.14.0-amzn-0.jar:0.14.0-amzn-0]
        at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions(DataSourceScanExec.scala:274) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.selectedPartitions$(DataSourceScanExec.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions$lzycompute(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.selectedPartitions(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions(DataSourceScanExec.scala:312) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanLike.dynamicallySelectedPartitions$(DataSourceScanExec.scala:285) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions$lzycompute(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.dynamicallySelectedPartitions(DataSourceScanExec.scala:543) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.isDataPrefetchSupportedForAllFiles(DataSourceScanExec.scala:697) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.shouldPrefetchData$lzycompute(DataSourceScanExec.scala:599) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.shouldPrefetchData(DataSourceScanExec.scala:595) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:628) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:603) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FileSourceScanExec.doExecuteColumnar(DataSourceScanExec.scala:753) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:241) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:262) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:237) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.InputAdapter.doExecuteColumnar(WholeStageCodegenExec.scala:678) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:241) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:262) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:237) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.ColumnarToRowExec.inputRDDs(Columnar.scala:399) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:304) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:53) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:950) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:214) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:265) ~[spark-sql_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) ~[spark-core_2.12-3.4.1-amzn-2.jar:3.4.1-amzn-2]
@danny0405
Contributor

Related fix: #12160

@dataproblems
Author

Thank you @danny0405. Do you have an estimate of which versions will get this fix, or will it only be in the latest one?

@ad1happy2go
Collaborator

@dataproblems Can you also please test this PR on your dataset?

@dataproblems
Author

@ad1happy2go - are you suggesting that I build the Hudi jar with these changes and use that jar instead of the packages available through Maven Central? If I have some extra bandwidth I will give that a try, but you can also reproduce the error using the random dataset for which I provided the data generation code.

@ad1happy2go
Collaborator

@dataproblems Correct, that's what I meant. But I will give it a try if you don't have the bandwidth. Thanks.

@ad1happy2go ad1happy2go added the priority:critical production down; pipelines stalled; Need help asap. label Nov 15, 2024
@ad1happy2go ad1happy2go moved this from ⏳ Awaiting Triage to 🚧 Needs Repro in Hudi Issue Support Nov 15, 2024