
[Python] write_dataset does not preserve non-nullable columns in schema #35730

Closed
ildipo opened this issue May 24, 2023 · 22 comments · Fixed by #35860
Labels: Component: Python · Critical Fix (Bugfixes for security vulnerabilities, crashes, or invalid data) · Type: bug

Comments
@ildipo (Contributor) commented May 24, 2023

When writing a table whose schema has non-nullable columns using write_dataset, the non-nullable information is not saved.

To reproduce

import pyarrow as pa
import pyarrow.parquet as pq
import datetime as dt
import pyarrow.dataset as ds

table = pa.Table.from_arrays([[1,2,3],[None,5,None], [dt.date(2023,1,1), dt.date(2023,1,2), dt.date(2023,1,3)]],
    schema=pa.schema([pa.field("x", pa.int64(), nullable=False), pa.field("y", pa.int64(), nullable=True), pa.field("date", pa.date32(), nullable=True)]))
print(table.schema)
# schema shows  column 'x' as not nullable

pq.write_to_dataset(table, "parquet_test1", partitioning=['date'], partitioning_flavor='hive')
dataset = ds.dataset("parquet_test1", format="parquet", partitioning="hive")
dataset.to_table().schema
# column 'x' is nullable

pa.dataset.write_dataset(table, "parquet_test2", partitioning=['date'], partitioning_flavor='hive', format='parquet')
dataset = ds.dataset("parquet_test2", format="parquet", partitioning="hive")
dataset.to_table().schema
# column 'x' is nullable
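
A quick way to confirm where the flag is dropped is to look at the physical Parquet schema of one of the written files (a minimal sketch, assuming the reproducer above has already been run; the exact file paths depend on the partitioning):

import pyarrow.dataset as ds
import pyarrow.parquet as pq

# Pick any file produced by write_dataset and print its Parquet schema.
# If nullability were preserved, column 'x' would be listed as "required";
# with this bug it shows up as "optional".
dataset = ds.dataset("parquet_test2", format="parquet", partitioning="hive")
first_file = dataset.files[0]
print(pq.read_metadata(first_file).schema)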

Component(s)

Python

@jorisvandenbossche (Member) commented

@ildipo Thanks for the report!

The Parquet format doesn't have such a flag directly, but it stores nulls through repetition levels and lets you mark a field as "required". It seems that when writing individual tables to Parquet files we translate "not null" into required Parquet types, and when reading we convert a required field back to "not null":

>>> pq.write_table(table, "test_nullability.parquet")
>>> pq.read_metadata("test_nullability.parquet").schema
<pyarrow._parquet.ParquetSchema object at 0x7f21b778fec0>
required group field_id=-1 schema {
  required int64 field_id=-1 x;
  optional int64 field_id=-1 y;
  optional int32 field_id=-1 date (Date);
}
>>> pq.read_table("test_nullability.parquet").schema
Out[28]: 
x: int64 not null
y: int64
date: date32[day]

So it seems this is supported by the Parquet module itself, and it must be something in the dataset API that loses this information. A quick guess is that it has to do with partitioning:

>>> pq.write_to_dataset(table, "test_dataset_nullability")
# reading directory -> lost "not null"
>>> ds.dataset("test_dataset_nullability/", format="parquet").schema
x: int64
y: int64
date: date32[day]

# reading single file -> preserved "not null"
>>> ds.dataset("test_nullability.parquet", format="parquet").schema
Out[37]: 
x: int64 not null
y: int64
date: date32[day]

@westonpace (Member) commented

Yes, write_dataset is a bit tricky when it comes to schema information. If the input is multiple tables, then write_dataset will probably combine them into a single output table, so which metadata do we use? What the write node does today is allow a custom_metadata to be supplied, in addition to the dataset, which it will attach to all written batches.

Then we have a bit of a hack in place today for "If the input is a single table then preserve the metadata". This is in FileSystemDataset::Write which is what pyarrow is using today:

  // The projected_schema is currently used by pyarrow to preserve the custom metadata
  // when reading from a single input file.
  const auto& custom_metadata = scanner->options()->projected_schema->metadata();

This custom_metadata is not currently exposed to pyarrow. So I think we probably want to add it.

That being said, custom_metadata may not be sufficient here. It only allows you to specify the key/value metadata for the schema, and not individual field metadata. So we'd need to change that too. All put together we have:

  • Change WriteNodeOptions::custom_metadata to WriteNodeOptions::schema
  • Do one of the following:
    • Add custom_schema to FileSystemDataset::Write
    • Change pyarrow to use Acero (and WriteNodeOptions) directly instead of FileSystemDataset::Write
  • Add pyarrow bindings for whichever approach we did in the previous step
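
As a stopgap on the user side, one can re-apply the original schema to the table read back from the dataset. This is only a sketch (the helper name restore_nullability is illustrative, not an existing API) and assumes the original schema is known at read time; note that the nullable flag is schema-level information, so cast() does not check the data for nulls, and it will raise if a column's stored type cannot be converted to the target type (e.g. a partition column inferred with a different type).

import pyarrow as pa

def restore_nullability(table: pa.Table, schema: pa.Schema) -> pa.Table:
    # Reorder columns to match the target schema, then cast; the resulting
    # table carries the target schema, including the non-nullable fields.
    return table.select(schema.names).cast(schema)

# e.g. restore_nullability(dataset.to_table(), original_table.schema)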

@ildipo (Contributor, Author) commented May 24, 2023

The behavior changed sometime between Arrow 7 and 12; it used to work with Arrow 7.

@ildipo (Contributor, Author) commented May 24, 2023

I think we want the solution that is easier to backport to arrow 12

@daniel-shields commented

Note that this is also broken when the 'schema' parameter is passed explicitly.

@westonpace (Member) commented

I think we want the solution that is easier to backport to arrow 12

Does it work if you set use_legacy_dataset=True?

@westonpace (Member) commented

Nvm, I see this is write_dataset and not write_to_dataset. Were you using write_dataset in 7.0.0?

@jorisvandenbossche (Member) commented

@westonpace note that this is not (AFAIU) about custom metadata, but just about how the Arrow schema gets translated to a Parquet schema (or how the Arrow schema gets changed throughout dataset writing).

If we write a single file (directly using the Parquet file writer, not going through datasets), then a pyarrow field with nullable=False gets translated into a "required" parquet field:

>>> schema = pa.schema([pa.field("col1", "int64", nullable=True), pa.field("col2", "int64", nullable=False)])
>>> table = pa.table({"col1": [1, 2, 3], "col2": [2, 3, 4]}, schema=schema)
>>> table.schema
col1: int64
col2: int64 not null

>>> pq.write_table(table, "test_nullability.parquet")
>>> pq.read_metadata("test_nullability.parquet").schema
<pyarrow._parquet.ParquetSchema object at 0x7f21957c9700>
required group field_id=-1 schema {
  optional int64 field_id=-1 col1;
  required int64 field_id=-1 col2;       # <--- this is "required" instead of "optional"
}

But if we write this as a single file (in a directory) through the dataset API (so not even using a partitioning column), the non-nullable column is no longer "required" in the parquet field:

>>> ds.write_dataset(table, "test_dataset_nullability/", format="parquet")
>>> pq.read_metadata("test_dataset_nullability/part-0.parquet").schema
Out[68]: 
<pyarrow._parquet.ParquetSchema object at 0x7f219d16cfc0>
required group field_id=-1 schema {
  optional int64 field_id=-1 col1;
  optional int64 field_id=-1 col2;        # <--- no longer "required" !
}

So I suppose that somewhere in the dataset writing code path, the schema loses the field nullability information.

The behavior changed sometime between arrow 7 and 12 since it used to work with arrow 7

I suppose this is because we now use pyarrow.dataset.write_dataset under the hood in pq.write_to_dataset, i.e. going through the dataset API, while the "legacy" implementation of pq.write_to_dataset used a custom implementation using the direct parquet file writer (and then it comes down to the difference between those two as illustrated above).

@jorisvandenbossche (Member) commented

Digging a bit further, this nullable field information is lost in acero's ProjectNode (the FileSystemDataset::Write call is essentially a combination of source+project+filter+write nodes).

Small reproducer in python:

import pyarrow as pa
from pyarrow.acero import Declaration, TableSourceNodeOptions, ProjectNodeOptions, field

schema = pa.schema([pa.field("col1", "int64", nullable=True), pa.field("col2", "int64", nullable=False)])
table = pa.table({"col1": [1, 2, 3], "col2": [2, 3, 4]}, schema=schema)
table_source = Declaration("table_source", options=TableSourceNodeOptions(table))
project = Declaration("project", ProjectNodeOptions([field("col1"), field("col2")]))
decl = Declaration.from_sequence([table_source, project])

>>> table.schema
col1: int64
col2: int64 not null
>>> decl.to_table().schema
col1: int64
col2: int64

This happens because the ProjectNode naively recreates the schema from the names/exprs, ignoring the field information of the original input schema:

FieldVector fields(exprs.size());
int i = 0;
for (auto& expr : exprs) {
  if (!expr.IsBound()) {
    ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*inputs[0]->output_schema(),
                                          plan->query_context()->exec_context()));
  }
  fields[i] = field(std::move(names[i]), expr.type()->GetSharedPtr());
  ++i;
}
return plan->EmplaceNode<ProjectNode>(plan, std::move(inputs),
                                      schema(std::move(fields)), std::move(exprs));

So this only preserves the types of the original input schema, but ignores any nullable flag or field metadata (and then there is only some special code to preserve the custom metadata of the full schema).

@westonpace rereading your original comment, while your explanation first focused on the schema metadata, you actually also already said essentially the above:

That being said, custom_metadata may not be sufficient here. It only allows you to specify the key/value metadata for the schema, and not individual field metadata.

But for what we need to do about this: shouldn't the ProjectNode just try to preserve this information for trivial field ref expressions?

@ildipo (Contributor, Author) commented May 25, 2023

In 7.0 we were using write_dataset and it was working. With 12 we tried write_dataset, write_to_dataset, as well as use_legacy_dataset=True, and none of them works.

@ildipo (Contributor, Author) commented May 25, 2023

But for what we need to do about this: shouldn't the ProjectNode just try to preserve this information for trivial field ref expressions?

If this is enough it should be pretty quick

@westonpace (Member) commented

So here is the change that introduced this: #31452

Before the change we used to require the schema to be specified on the write node options. This was an unnecessary burden when you didn't care about any custom field information (since we've already calculated the schema).

But for what we need to do about this: shouldn't the ProjectNode just try to preserve this information for trivial field ref expressions?

I think there is still the problem that we largely ignore nullability. We can't usually assume that all batches will have the same nullability. For example, imagine a scan node where we are scanning two different Parquet files. One of the Parquet files marks a column as nullable and the other does not. I suppose the correct answer, if Acero were nullability-aware and once evolution is a little more robust, would be to "evolve" the schema of the file with a nullable type to a non-nullable type so that we have a common input schema.

In the meantime, the quickest simple fix to this regression is to allow the user to specify an output schema instead of just key / value metadata.
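
For reference, here is a hedged sketch (not from this thread) of the round trip one would expect once a fix along those lines is available, assuming an installed pyarrow that includes it and the default part-{i}.parquet basename template; the schema argument of ds.write_dataset is passed explicitly so the write node has the field-level information:

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq

schema = pa.schema([pa.field("col1", pa.int64(), nullable=True),
                    pa.field("col2", pa.int64(), nullable=False)])
table = pa.table({"col1": [1, 2, 3], "col2": [2, 3, 4]}, schema=schema)

ds.write_dataset(table, "test_fixed_nullability", format="parquet", schema=schema)

# Expected with the fix: "required int64 ... col2" in the Parquet schema and
# "col2: int64 not null" in the Arrow schema read back.
print(pq.read_metadata("test_fixed_nullability/part-0.parquet").schema)
print(ds.dataset("test_fixed_nullability", format="parquet").to_table().schema)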

@raulcd raulcd added this to the 12.0.1 milestone Jun 1, 2023
westonpace added a commit that referenced this issue Jun 1, 2023
… write (#35860)

### Rationale for this change

The dataset write node previously allowed you to specify custom key/value metadata on a write node.  This was added to support saving schema metadata.  However, it doesn't capture field metadata or field nullability.  This PR replaces that capability with the ability to specify a custom schema instead.  The custom schema must have the same number of fields as the input to the write node and each field must have the same type.

### What changes are included in this PR?

Added `custom_schema` to `WriteNodeOptions` and removed `custom_metadata`.

### Are these changes tested?

Yes, I added a new C++ unit test to verify that the custom info is applied to written files.

### Are there any user-facing changes?

No.  Only new functionality (which is user facing)

* Closes: #35730

Lead-authored-by: Weston Pace <[email protected]>
Co-authored-by: Nic Crane <[email protected]>
Co-authored-by: Joris Van den Bossche <[email protected]>
Co-authored-by: anjakefala <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Weston Pace <[email protected]>
@westonpace westonpace modified the milestones: 12.0.1, 13.0.0 Jun 1, 2023
raulcd pushed a commit that referenced this issue Jun 1, 2023
… write (#35860)
@raulcd (Member) commented Jun 1, 2023

@github-actions crossbow submit test-r-ubuntu-22.04

@raulcd (Member) commented Jun 1, 2023

@github-actions crossbow submit test-r-versions

@raulcd (Member) commented Jun 1, 2023

The two jobs above have failed on the maintenance branch (https://github.com/ursacomputing/crossbow/actions/runs/5149898329/jobs/9273436127 and https://github.com/ursacomputing/crossbow/actions/runs/5149898179/jobs/9273436054). I am checking the status here as they seem related to this change.

@anjakefala (Collaborator) commented

compute-exec.cpp: In function ‘void ExecPlan_Write(const std::shared_ptr<arrow::acero::ExecPlan>&, const std::shared_ptr<arrow::acero::ExecNode>&, const std::shared_ptr<arrow::Schema>&, const std::shared_ptr<arrow::dataset::FileWriteOptions>&, const std::shared_ptr<arrow::fs::FileSystem>&, std::string, const std::shared_ptr<arrow::dataset::Partitioning>&, std::string, arrow::dataset::ExistingDataBehavior, int, uint32_t, uint64_t, uint64_t, uint64_t)’:
compute-exec.cpp:340:11: error: ‘class arrow::dataset::WriteNodeOptions’ has no member named ‘custom_schema’
  340 |   options.custom_schema = std::move(schema);
      |           ^~~~~~~~~~~~~
make: *** [/usr/lib/R/etc/Makeconf:200: compute-exec.o] Error 1
make: *** Waiting for unfinished jobs....
ERROR: compilation failed for package ‘arrow’
* removing ‘/arrow/r/check/arrow.Rcheck/arrow’

Error: R CMD check found ERRORs

@anjakefala (Collaborator) commented

I do see custom_schema being declared in the header for WriteNodeOptions: https://github.com/apache/arrow/pull/35860/files#diff-16fb00f643139995febaccf5315f70d5173f2c378b883cb3c2d25b614f6f4493R482

@anjakefala (Collaborator) commented

@thisisnic Would you be able to take a look?

@westonpace (Member) commented

@raulcd

The error is a bit of a red herring. It is not building Arrow-C++. Instead it is downloading Arrow-C++. If you look at a passing build (e.g. from the nightly tests) you can see:

2023-05-30T01:07:19.3429074Z * installing *source* package ‘arrow’ ...
2023-05-30T01:07:19.3429654Z ** using staged installation
2023-05-30T01:07:19.3429994Z *** Found libcurl and OpenSSL >= 1.1
2023-05-30T01:07:19.3430691Z trying URL 'https://nightlies.apache.org/arrow/r/libarrow/bin/linux-openssl-1.1/arrow-12.0.0.9000.zip'
2023-05-30T01:07:19.3431226Z Error in download.file(from_url, to_file, quiet = hush) : 
2023-05-30T01:07:19.3431942Z   cannot open URL 'https://nightlies.apache.org/arrow/r/libarrow/bin/linux-openssl-1.1/arrow-12.0.0.9000.zip'
2023-05-30T01:07:19.3432612Z *** Downloading libarrow binary failed for version 12.0.0.9000 (linux-openssl-1.1)
2023-05-30T01:07:19.3433276Z     at https://nightlies.apache.org/arrow/r/libarrow/bin/linux-openssl-1.1/arrow-12.0.0.9000.zip
2023-05-30T01:07:19.3433789Z *** Found local C++ source: '/arrow/cpp'
2023-05-30T01:07:19.3434126Z *** Building libarrow from source
2023-05-30T01:07:19.3434552Z     For build options and troubleshooting, see the install guide:
2023-05-30T01:07:19.3435014Z     https://arrow.apache.org/docs/r/articles/install.html

On the other hand, if you look at these failing builds, you see:

2023-06-01T22:45:52.2820480Z * installing *source* package ‘arrow’ ...
2023-06-01T22:45:52.2820835Z ** using staged installation
2023-06-01T22:45:52.2826960Z **** pkg-config not installed, setting ARROW_DEPENDENCY_SOURCE=BUNDLED
2023-06-01T22:45:52.2827523Z *** Found libcurl and OpenSSL >= 1.1
2023-06-01T22:45:52.2830096Z trying URL 'https://apache.jfrog.io/artifactory/arrow/r/12.0.0/libarrow/bin/linux-openssl-1.1/arrow-12.0.0.zip'
2023-06-01T22:45:52.2830790Z Content type 'application/zip' length 40016664 bytes (38.2 MB)
2023-06-01T22:45:52.2831184Z ==================================================
2023-06-01T22:45:52.2835569Z downloaded 38.2 MB
2023-06-01T22:45:52.2835774Z 
2023-06-01T22:45:52.2836129Z *** Successfully retrieved C++ binaries (linux-openssl-1.1)

So the nightly test looks for 12.0.0.9000 which, of course, doesn't exist. Then it falls back to building from source. This is what we want.

The test build you've shared is looking for 12.0.0 (shouldn't this be 12.0.1?). It finds it, and then it doesn't build Arrow-C++ from source.

@raulcd (Member) commented Jun 2, 2023

@github-actions crossbow submit test-r-ubuntu-22.04

@raulcd (Member) commented Jun 2, 2023

ok, I've finally realised this is the issue, not the PR :)

dgreiss pushed a commit to dgreiss/arrow that referenced this issue Jun 4, 2023
…ataset write (apache#35860)
@westonpace westonpace added the Critical Fix label (Bugfixes for security vulnerabilities, crashes, or invalid data) Jun 23, 2023