
Support partition value string deserialization for timestamp/binary #371

Merged
merged 5 commits on Sep 15, 2021

Conversation

zijie0 (Contributor) commented Aug 10, 2021

Description

support partition value string deserialization for timestamp/binary


zijie0 (Contributor, Author) commented Aug 10, 2021

I'm not sure the binary format test case and implementation are correct.

According to the delta spec, binary data is encoded as a string of escaped binary values, for example "\u0001\u0002\u0003".

I generated a delta table partitioned on a binary column with the values "\u0001" and "\u0002". In the _delta_log file, the partitionValues entry seems consistent with the document:

{
    "add": {
        "path": "biz_date=%2501/part-00000-bdbac3d9-1594-48eb-b83c-883efd820c3a.c000.snappy.parquet",
        "partitionValues": {
            "biz_date": "\u0001"
        },
        "size": 472,
        "modificationTime": 1628584740000,
        "dataChange": true
    }
}

Then I tried to debug the new implementation with the following test case:

use deltalake::checkpoints::CheckPointWriter;
use deltalake::storage;

#[tokio::test]
async fn write_checkpoint_partitioned_on_binary() {
    let table_location = "./tests/data/partitioned_data/test_partition_on_binary";
    let log_path = "./tests/data/partitioned_data/test_partition_on_binary/_delta_log";

    // Load the delta table with binary partition
    let table = deltalake::open_table(table_location).await.unwrap();

    // Write a checkpoint
    let storage_backend = storage::get_backend_for_uri(log_path).unwrap();
    let checkpoint_writer = CheckPointWriter::new(log_path, storage_backend);
    let _ = checkpoint_writer
        .create_checkpoint_from_state(table.version, table.get_state())
        .await
        .unwrap();
}

Watching the input parameter of typed_partition_value_from_string, the incoming string_value is not "\u0001" but "�". So I also used the original string value in the test case.

Although typed_partition_value_from_string_test passed, the write_checkpoint_partitioned_on_binary test failed. The error message is:

called `Result::unwrap()` on an `Err` value: ArrowError { source: JsonError("Binary type is not supported") }

I'm not sure if I've missed something important here...

houqp (Member) commented Aug 10, 2021

Thanks @zijie0, I will take a closer look at the PR later today or tomorrow. The timestamp handling part is going to be tricky due to ambiguities in the delta spec; see delta-io/delta#643. The nanosecond type we write out is a logical timestamp type stored in int64, which is not supported by Spark. Spark only reads nanosecond timestamps stored in int96, which is deprecated in newer versions of the parquet format. The Rust parquet crate supports reading int96 timestamps but, following the parquet developers' recommendation, does not write them. So we need to look into this more to see what's the best way to handle timestamps.
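
For reference, a small sketch of the distinction in Arrow terms; the field name and the snippet itself are illustrative, not code from this PR:

use arrow::datatypes::{DataType, Field, TimeUnit};

fn main() {
    // The logical int64 timestamp currently written out: nanosecond unit.
    let nanos = Field::new("ts", DataType::Timestamp(TimeUnit::Nanosecond, None), true);
    // The unit Spark can both read and write: microseconds.
    let micros = Field::new("ts", DataType::Timestamp(TimeUnit::Microsecond, None), true);
    println!("{:?} vs {:?}", nanos.data_type(), micros.data_type());
}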

zijie0 (Contributor, Author) commented Aug 11, 2021

Hmm... it reminds me of a similar issue we encountered some time ago: dask/fastparquet#646

houqp (Member) commented Aug 13, 2021

Yeah, it's similar to the one you posted. Basically Spark writes out timestamps in the deprecated int96 type by default. When reading nanosecond timestamps, it only supports int96, not the newer logical nanosecond type (int64), which is itself not recommended due to the limited precision an int64 can hold. So the best type to use going forward is the logical microsecond type (int64), which Spark supports for both read and write. My hunch is that we should probably use microsecond timestamps here as well instead of nanoseconds.
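
A minimal sketch of what a microsecond conversion for a partition timestamp string could look like, assuming chrono and a "%Y-%m-%d %H:%M:%S" partition format (both are assumptions for illustration, not this PR's code):

use chrono::NaiveDateTime;

// Hypothetical helper: parse a Delta partition timestamp string into
// microseconds since the epoch, the precision Spark reads and writes.
fn partition_timestamp_micros(s: &str) -> Option<i64> {
    let dt = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S").ok()?;
    // Whole seconds scaled to microseconds, plus the sub-second part.
    Some(dt.timestamp() * 1_000_000 + i64::from(dt.timestamp_subsec_micros()))
}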

zijie0 (Contributor, Author) commented Aug 13, 2021

It seems that in #375 we will change the time unit back to microseconds. Maybe I should update my PR after that one gets merged?

houqp (Member) commented Aug 13, 2021

Yeah, I will send an email to the dev list later this week to get more clarification from the delta authors on the expected way to handle timestamp semantics.

zijie0 (Contributor, Author) commented Aug 13, 2021

Another (entry-level) question: when I'm debugging the binary format case, the call stack only shows that the error is thrown from the unwrap call. Do you have any best practices for debugging async methods to locate where an error was originally generated?

houqp (Member) commented Aug 13, 2021

Async stack traces are unfortunately a pain point for me as well, so I don't really have a good solution either. I usually try running the code with RUST_BACKTRACE=1 first; if that doesn't give me enough information, I then use the backtrace crate to explicitly print out the stack trace right before where the crash happens.
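
A minimal sketch of that second approach, assuming the backtrace crate is added as a dependency (the function and error value here are made up for illustration):

use backtrace::Backtrace;

async fn checkpoint_with_trace() {
    // Stand-in for the fallible call whose unwrap() hides the error origin.
    let result: Result<(), String> = Err("Binary type is not supported".to_string());
    if let Err(e) = result {
        // Capture and print the stack at the failure site, which is far more
        // informative than the trace a plain unwrap() panic gives in async code.
        eprintln!("error: {}\n{:?}", e, Backtrace::new());
    }
}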

houqp (Member) commented Aug 18, 2021

Sorry @zijie0 for the late reply; I finally got some time to take a closer look at the binary type test error you reported.

I believe it's coming from https://github.com/apache/arrow-rs/blob/3fa6fcdbc09542343fb42a2b5b6c5bbe2e56c125/arrow/src/json/reader.rs#L1300. This means arrow-rs itself currently doesn't support reading a JSON string into the binary data type. It should be fairly straightforward to add, though. So if you are interested in getting a commit into the Arrow Rust implementation, please let me know; I am more than happy to guide you through the process. If not, I can add that support for you this weekend to unblock this PR.
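
For context, a rough sketch of the missing piece, assuming arrow-rs conventions; this only illustrates building a BinaryArray from already-parsed JSON strings, it is not the eventual patch:

use arrow::array::{ArrayRef, BinaryArray};
use std::sync::Arc;

// The UTF-8 bytes of each parsed JSON string become one binary value,
// mirroring how the JSON reader already builds a StringArray.
fn binary_array_from_json_strings(values: Vec<Option<&str>>) -> ArrayRef {
    let bytes: Vec<Option<&[u8]>> = values
        .into_iter()
        .map(|v| v.map(str::as_bytes))
        .collect();
    Arc::new(BinaryArray::from_opt_vec(bytes))
}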

> the input string_value is not "\u0001" but "�". So I also used the original string value in the test case.

This is because when we load the JSON file in Rust using serde_json, the escape sequence "\u0001" is already decoded by the JSON parser into a unicode string. Here is the relevant quote from the JSON RFC (https://www.ietf.org/rfc/rfc4627.txt):

> Any character may be escaped. If the character is in the Basic Multilingual Plane (U+0000 through U+FFFF), then it may be represented as a six-character sequence: a reverse solidus, followed by the lowercase letter u, followed by four hexadecimal digits that encode the character's code point.

Given that typed_partition_value_from_option_string works with Add.partition_value, which is already a Rust HashMap parsed from the JSON commit log entry, what you are doing in this PR is the correct behavior, I believe: passing the parsed string value as-is to the downstream parquet serialization code path.
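
A small self-contained check of that behavior (the key name is just for illustration):

use std::collections::HashMap;

fn main() {
    // The commit log stores the partition value as the escape "\u0001";
    // serde_json decodes it into the actual one-byte control character.
    let raw = r#"{"biz_date": "\u0001"}"#;
    let values: HashMap<String, String> = serde_json::from_str(raw).unwrap();
    assert_eq!(values["biz_date"].as_bytes(), b"\x01");
}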

houqp (Member) commented Aug 18, 2021

Also cc @xianwill @mosyp in case you have noticed any issues with binary column type handling in production with kafka delta ingest ;)

zijie0 (Contributor, Author) commented Aug 18, 2021

@houqp Cool, let me take a look at the arrow-rs project first.

zijie0 marked this pull request as draft on August 18, 2021
zijie0 (Contributor, Author) commented Aug 19, 2021

Hi @houqp, I drafted a commit to support the binary type in arrow-rs. I wasn't sure where to find something like an "arrow spec", so I just read the source code to guide the implementation. BinaryArray seems very similar to StringArray, so I made a similar implementation for it.

The commit is: zijie0/arrow-rs@926ea32
Could you please take a look? Thanks in advance!

houqp (Member) commented Aug 19, 2021

@zijie0 that looks good to me; please feel free to send an upstream arrow-rs PR :) The Arrow spec is documented at https://arrow.apache.org/docs/format/Columnar.html.

zijie0 marked this pull request as ready for review on September 15, 2021
zijie0 (Contributor, Author) commented Sep 15, 2021

@houqp I've changed the timestamp type to microseconds. Please take a look when you get some time.

houqp (Member) left a comment

Thanks @zijie0, sorry I forgot to send the timestamp clarification email to the dev list. Will try to get it out this week.

rust/src/delta_arrow.rs (review comment on an outdated diff, resolved)
houqp merged commit 58dcb1e into delta-io:main on Sep 15, 2021
houqp (Member) commented Sep 15, 2021

Thank you @zijie0! We can tackle the timestamp precision problem later :)
