Enhance UUID Compatibility in Iceberg to Hudi Conversion #543

Open
1 of 2 tasks
danielhumanmod opened this issue Sep 18, 2024 · 2 comments

@danielhumanmod (Contributor)

Feature Request / Improvement

Context

In #112, we are adding UUID support to XTable. Currently, only Iceberg has a UUID type, which is stored as a fixed-size (16-byte) array in the underlying Parquet files. To facilitate conversion between Iceberg and formats like Delta or Hudi, we introduced an internal UUID type.
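
For reference, this is roughly how a UUID column is declared with the Iceberg Java API (a minimal sketch; the field names and IDs are illustrative, not taken from XTable's code):

```java
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class UuidSchemaSketch {
    public static void main(String[] args) {
        // A schema with a UUID column. When Iceberg writes this column to
        // Parquet, it is stored as FIXED_LEN_BYTE_ARRAY(16) annotated with
        // the UUID logical type.
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.UUIDType.get()),
            Types.NestedField.optional(2, "name", Types.StringType.get()));
        System.out.println(schema);
    }
}
```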

Problem Statement

While converting UUIDs from Iceberg to Hudi, we ran into a limitation in Spark's ParquetSchemaConverter: Spark cannot read FIXED_LEN_BYTE_ARRAY fields annotated with the UUID logical type in Parquet. You can refer to the relevant source code here: ParquetSchemaConverter.
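
A minimal way to reproduce this (a sketch, assuming a local Spark session; the file path is a placeholder) is to point Spark directly at an Iceberg-written Parquet file containing a UUID column:

```java
import org.apache.spark.sql.SparkSession;

public class UuidReadRepro {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("uuid-read-repro")
            .master("local[*]")
            .getOrCreate();
        // Reading a Parquet file whose schema contains a FIXED_LEN_BYTE_ARRAY
        // column annotated with the UUID logical type fails in
        // ParquetToSparkSchemaConverter with the AnalysisException shown
        // in the error log below.
        spark.read().parquet("/path/to/iceberg-data-file.parquet").show();
        spark.stop();
    }
}
```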

As of now, we have successfully implemented UUID conversion from Iceberg to Delta. Iceberg to Hudi, however, remains blocked by this issue, which we hope to address in the future.
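
For context, the byte layout itself is simple to handle outside of Spark. Here is a plain-Java sketch of the 16-byte round trip (standard library only, not XTable code):

```java
import java.nio.ByteBuffer;
import java.util.UUID;

public final class UuidBytes {
    // Decode the 16-byte big-endian value stored in a Parquet fixed(16)
    // field into a java.util.UUID.
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes);
        return new UUID(bb.getLong(), bb.getLong());
    }

    // Encode a UUID back into its 16-byte representation.
    static byte[] toBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .array();
    }
}
```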

Error Log

```
Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: FIXED_LEN_BYTE_ARRAY (UUID).
	at org.apache.spark.sql.errors.QueryCompilationErrors$.illegalParquetTypeError(QueryCompilationErrors.scala:1762)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.illegalType$1(ParquetSchemaConverter.scala:206)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertPrimitiveField$2(ParquetSchemaConverter.scala:310)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertPrimitiveField(ParquetSchemaConverter.scala:224)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertField(ParquetSchemaConverter.scala:187)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3(ParquetSchemaConverter.scala:147)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.$anonfun$convertInternal$3$adapted(ParquetSchemaConverter.scala:117)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convertInternal(ParquetSchemaConverter.scala:117)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetToSparkSchemaConverter.convert(ParquetSchemaConverter.scala:87)
	at org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormatHelper$.buildImplicitSchemaChangeInfo(HoodieParquetFileFormatHelper.scala:31)
	at org.apache.spark.sql.execution.datasources.parquet.Spark34LegacyHoodieParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(Spark34LegacyHoodieParquetFileFormat.scala:254)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

@vinishjail97 (Contributor) commented:

@danielhumanmod Can we close this issue?

@danielhumanmod (Contributor, Author) commented Sep 26, 2024:

Maybe not for now. This is a problem we can't resolve in issue #112. After discussing it with @the-other-tim-brown, we decided to leave this issue open as a reminder, in case we find a way to make it work on the Hudi or Spark side in the future.
