Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inconsistent arrow timestamp type breaks datafusion query #2341

Closed
qinix opened this issue Mar 25, 2024 · 12 comments
Closed

Inconsistent arrow timestamp type breaks datafusion query #2341

qinix opened this issue Mar 25, 2024 · 12 comments
Labels
bug Something isn't working

Comments

@qinix
Copy link
Contributor

qinix commented Mar 25, 2024

Environment

Delta-rs version: current main(abafd2d)

Binding: rust, python


Bug

What happened:

Prior versions of this library serialize PrimitiveType::Timestamp as ArrowDataType::Timestamp(TimeUnit::Microsecond, None), while PR #2236 changed this behavior to serialize as ArrowDataType::Timestamp(TimeUnit::Microsecond, Some("UTC")), leads to different timestamp schema in different parquet file.

When querying delta lake with Arrow Datafusion and filtering by timestamp column, datafusion throws Invalid comparison operation: Timestamp(Microsecond, Some("UTC")) < Timestamp(Microsecond, None). After changing the filter type to Timestamp(Microsecond, Some("UTC")), datafusion throws Invalid comparison operation: Timestamp(Microsecond, None) < Timestamp(Microsecond, Some("UTC")). The left side of the comparison operation is a timestamp column in delta lake.

@qinix qinix added the bug Something isn't working label Mar 25, 2024
@ion-elgreco
Copy link
Collaborator

ion-elgreco commented Mar 25, 2024

This actually had breaking effects since the prior behavior was incorrect. I suggest you rewrite your tables so that the parquet timestamp is properly encoded.

It's also strange that the parquet encoded type takes precedence over the arrow schema that's coming from delta shema. This doesn't seem right

@ion-elgreco
Copy link
Collaborator

@qinix I assume you have this issue only on tables written with older versions and then read with the latest main?

@qinix
Copy link
Contributor Author

qinix commented Mar 25, 2024

@qinix I assume you have this issue only on tables written with older versions and then read with the latest main?

Yes, it is

@cmettler
Copy link

i have a similar issue when trying to delete records via a UTC timestamp field. I was not able to create a UTC timestamp for the right side of the predicate:

pa = duckdb.sql("select now()::timestamptz at time zone 'CET' at time zone 'UTC' as ts" ).to_arrow_table()
write_deltalake(target_table, pa,engine="rust",storage_options=storage_options,mode='overwrite')
dt = DeltaTable(target_table,storage_options=storage_options)
dt.delete("ts >= to_timestamp_micros('2024-03-27 00:00:00Z','%Y-%m-%d %H:%M:%S%#z')")

fails with:
ValueError: Invalid comparison operation: Timestamp(Microsecond, Some("UTC")) >= Timestamp(Microsecond, None)

@ravid08
Copy link

ravid08 commented Apr 8, 2024

I am facing similar issue when trying to write to a delta table previously written to by a Glue PySpark job. I chose to use Glue for one time full load of the source data and then use Lambda with deltalake python package to load on-going streaming data. The Glue job loads the data and then Lambda fails with the following error:
"Schema error: Fail to merge schema because the from data_type = Timestamp(Microsecond, Some(\"UTC\")) does not equal Timestamp(Nanosecond, None)
I have tried various options like:

  • trying to set SparkSession.config("spark.sql.timestampType", "TIMESTAMP_LTZ") \
  • using to_utc_timestamp to convert the incoming value to UTC
  • read schema of the delta table loaded by Lambda first and then pass it to spark.write.option("schema", myschema)

but unable to make these two schemas agree on a common format for timestamp!!

@ion-elgreco
Copy link
Collaborator

@ravid08 I am quite sure you wrote with spark without changing the default spark timestamp parquet type from int96 to timestamp_micros.

Try restoring the table prior to full load, then do load with spark again with timestamp_micros as the default timestamp parquet type

@ravid08
Copy link

ravid08 commented Apr 8, 2024

@ion-elgreco can you please clarify, I am using Glue 4.0 which supports Spark 3.3. From what I see pyspark.sql.functions.timestamp_micros is a new feature in Spark 3.5.
Also, I am not using delta table, all I am doing is using write_deltalake from writer.py because the table will be created in an external hive metastore.

@ion-elgreco
Copy link
Collaborator

@ravid08 you mentioned you did a full load with spark, so i assume it wrote some parquet files. Those parquet files without timestam_micros setting will have timestamps in int96. At the moment the parquet crate interprets this as timestamp nanoseconds

@ravid08
Copy link

ravid08 commented Apr 8, 2024

@ion-elgreco
The original value from the source is a string 2024-08-06T16:34:16.000574Z
I did a full load of that data using spark to s3 in delta format:
df = df.withColumn(col, fn.to_timestamp(fn.col(col)))
I created a table in hive metastore on these delta files and I could see the datatype is timestamp, not int.

@ion-elgreco
Copy link
Collaborator

@ravid08 im meaning the logical type in the parquet file is represented as INT96 when you don't set this spark conf setting

@ravid08
Copy link

ravid08 commented Apr 8, 2024

@ion-elgreco OK. sounds like spark 3.5 is required, which doesn't exist in Glue (yet).

@ravid08
Copy link

ravid08 commented Apr 8, 2024

Thanks for the pointer @ion-elgreco the following config worked:
SparkSession.config("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")

ion-elgreco pushed a commit that referenced this issue Jul 21, 2024
# Description

By casting the read record batch to the delta schema datafusion can read
tables where the underlying parquet files can be cast to the desired
schema. Fixes:

- Errors querying data where some of the parquet files may not have
columns that were added later because of schema migration. This includes
nested columns for structs that are in Maps, Lists, or children of other
structs
- maps and lists written with different different element names
- timestamps of different units.
- Any other cast supported by arrow-cast.

This can be done now since data-fusion exposes a SchemaAdapter which can
be overwritten.

We should note that this makes all times being read by delta-rs as
having microsecond precision to match the Delta protocol.

# Related Issue(s)
- This makes solving #2478 and #2341 just a matter of adding code to
delta-rs cast.

---------

Co-authored-by: Alex Wilcoxson <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

4 participants