Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow writing pa.Table that are either a subset of table schema or in arbitrary order, and support type promotion on write #921

Merged
merged 13 commits into from
Jul 17, 2024

Conversation

sungwy
Copy link
Collaborator

@sungwy sungwy commented Jul 12, 2024

Another attempt at fixing #674

This PR is motivated by work started off by @kevinjqliu in #829

  • it enables support for writing dataframes that are a subset of the Iceberg Table's schema
  • it enables support for writing dataframes that have fields in any arbitrary order
  • we convert the pyarrow schema to Iceberg Schema using the name mapping of the Table, and checks for nullability and type consistency.

Here we introduce terminologies requested_schema to refer to the Iceberg Table schema we are checking against, and the provided_schema which represents the schema of the Dataframe we are attempting to write.

A field is considered compatible if satisfies the following checks:

  • An optional field in the requested_schema can be required, optional or missing in the provided_schema
  • A required field in the requested_schema must also be a required field in the provided_schema
  • A primitive type in the requested_schema must be represented as its own type, or a smaller type that can be promote-ed to the said primitive type (e.g. LongType in requested_schema and IntegerType in provided_schema)
  • MapType, ListType, or StructType must be of the same type, but the validity of its child fields will be checked independently from the validity of the parent field.

EDIT:
This PR also includes changes to support type promotion on write.
For instance, a pa.int32() column from the pyarrow construct can be written into an Iceberg Table field of LongType().

@sungwy sungwy changed the title Update _check_compatible_schema to support subset of schema Update _check_compatible_schema to support subset of schema Jul 12, 2024
@sungwy sungwy changed the title Update _check_compatible_schema to support subset of schema Allow writing dataframes that are either a subset of table schema or in arbitrary order Jul 12, 2024
return partner_struct.column(name)
else:
raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
except KeyError:
Copy link
Collaborator Author

@sungwy sungwy Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary to support writing dataframes / recordbatches with a subset of the schema. Otherwise, the ArrowAccessor throws a KeyError. This way, we return a None and the ArrowProjectionVisitor is responsible for checking if the field is nullable, and can be filled in with a null array.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change responsible for schema projection / writing a subset of the schema? Do you mind expanding on the mechanism behind how this works? I'm curious

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's right - the ArrowProjectionVisitor is responsible for detecting that the field_partner is None and then checking if the table field is also optional before filling it in with a null array. This change is necessary so that the ArrowAccessor doesn't throw an exception if the field can't be found in the arrow component, and enables ArrowProjectionVisitor to make use of a code pathway it wasn't able to make use of before:

if field_array is not None:
array = self._cast_if_needed(field, field_array)
field_arrays.append(array)
fields.append(self._construct_field(field, array.type))
elif field.optional:
arrow_type = schema_to_pyarrow(field.field_type, include_field_ids=False)
field_arrays.append(pa.nulls(len(struct_array), type=arrow_type))
fields.append(self._construct_field(field, arrow_type))

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above we have the file_schema that should correspond with the partner_struct. I expect that when looking up the field-id, it should already return None.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I as I pointed out in this comment: #921 (comment) I think write_parquet is using the Table Schema, instead of the Schema corresponding to the data types of the PyArrow construct.

I will take that to mean that this isn't intended and making sure that we use the Schema corresponding to the data types of the PyArrow construct is what we intend to do here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the context. This isn't intended, the schema should align with the data. I checked against the last commit, and it doesn't throw the KeyError anymore because of your fix. Thanks 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion - I've removed this try exception block in the latest update.

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
assert len(arrow_table_without_some_columns.columns) < len(arrow_table_with_null.columns)
tbl.overwrite(arrow_table_without_some_columns)
tbl.append(arrow_table_without_some_columns)
# overwrite and then append should produce twice the data
assert len(tbl.scan().to_arrow()) == len(arrow_table_without_some_columns) * 2


@pytest.mark.integration
@pytest.mark.parametrize("format_version", [1, 2])
def test_table_write_out_of_order_schema(session_catalog: Catalog, arrow_table_with_null: pa.Table, format_version: int) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @kevinjqliu for writing up these tests! Ported them over from your PR

@sungwy sungwy changed the title Allow writing dataframes that are either a subset of table schema or in arbitrary order Allow writing pa.Table that are either a subset of table schema or in arbitrary order Jul 12, 2024
@sungwy sungwy requested review from kevinjqliu, Fokko and HonahX July 12, 2024 21:24
Copy link
Contributor

@HonahX HonahX left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@syun64 Thanks for the quick fix. Also thanks @kevinjqliu for the great work in another PR.

Comment on lines 2125 to 2131
elif lhs.field_type != rhs.field_type:
rich_table.add_row(
field_name,
"Type",
f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
)
Copy link
Contributor

@HonahX HonahX Jul 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking if we can be less restrictive on type. If the rhs's type can be promoted to lhs's type, the case may still be considered as compatible:

 elif lhs.field_type != rhs.field_type:
            try:
                promote(rhs.field_type, lhs.field_type)
            except ResolveError:
                rich_table.add_row(
                    field_name,
                    "Type",
                    f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
                    f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
                )
Example Test case
def test_schema_uuid() -> None:
    table_schema = Schema(
        NestedField(field_id=1, name="foo", field_type=StringType(), required=False),
        NestedField(field_id=2, name="bar", field_type=IntegerType(), required=True),
        NestedField(field_id=3, name="baz", field_type=UUIDType(), required=False),
        schema_id=1,
        identifier_field_ids=[2],
    )
    other_schema = pa.schema((
        pa.field("foo", pa.large_string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=False),
        pa.field("baz", pa.binary(16), nullable=True),
    ))

    _check_schema_compatible(table_schema, other_schema)

    other_schema_fail = pa.schema((
        pa.field("foo", pa.large_string(), nullable=True),
        pa.field("bar", pa.int32(), nullable=False),
        pa.field("baz", pa.binary(15), nullable=True),
    ))

    with pytest.raises(ValueError):
        _check_schema_compatible(table_schema, other_schema_fail)

This could be a possible solution for #855, and should also cover writing pa.int32() (IntegerType) to LongType

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @HonahX I think that's a great suggestion! Thank you for pointing that out. I think it'll actually be a very simple change that addresses my question above:

Question: is it correct to compare both as Iceberg Schema? or do we want to allow a more permissive range of pyarrow types to be supported for writes?

For example, do we want to support writing pa.int32() into LongType()? - maybe we could support this in a subsequent PR?

Copy link
Collaborator Author

@sungwy sungwy Jul 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @HonahX - I tried this out, and I think we may benefit from scoping this out from this PR and investing some more time to figure out the correct way to support type promotions on write. The exception I'm getting is as follows:

tests/integration/test_writes/utils.py:79: in _create_table
    tbl.append(d)
pyiceberg/table/__init__.py:1557: in append
    tx.append(df=df, snapshot_properties=snapshot_properties)
pyiceberg/table/__init__.py:503: in append
    for data_file in data_files:
pyiceberg/io/pyarrow.py:2252: in _dataframe_to_data_files
    yield from write_file(
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:621: in result_iterator
    yield _result_or_cancel(fs.pop())
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:319: in _result_or_cancel
    return fut.result(timeout)
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:458: in result
    return self.__get_result()
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/_base.py:403: in __get_result
    raise self._exception
/usr/local/python/3.10.13/lib/python3.10/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
pyiceberg/io/pyarrow.py:2030: in write_parquet
    statistics = data_file_statistics_from_parquet_metadata(
pyiceberg/io/pyarrow.py:1963: in data_file_statistics_from_parquet_metadata
    col_aggs[field_id] = StatsAggregator(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <pyiceberg.io.pyarrow.StatsAggregator object at 0x787561b422c0>, iceberg_type = LongType(), physical_type_string = 'INT32', trunc_length = None

    def __init__(self, iceberg_type: PrimitiveType, physical_type_string: str, trunc_length: Optional[int] = None) -> None:
        self.current_min = None
        self.current_max = None
        self.trunc_length = trunc_length
    
        expected_physical_type = _primitive_to_physical(iceberg_type)
        if expected_physical_type != physical_type_string:
>           raise ValueError(
                f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
            )
E           ValueError: Unexpected physical type INT32 for long, expected INT64

pyiceberg/io/pyarrow.py:1556: ValueError

And this is because the file_schema that's passed to _to_requested_schema in write_parquet function is just the IcebergTable schema instead of being a Schema representation of the pyarrow Table's data type itself. So when the types of the file_schema and the requested_schema are compared, they are both comparing the Iceberg table type (e.g. LongType) instead of the smaller pyarrow type in the dataframe (e.g. IntegerType).

I think this is going to take a bit of work to ensure that we are using the schema that actually represents the datatype of the types within the Arrow dataframe, because we also have to create a Schema representation of the PyArrow schema that has field_ids consistent with the Iceberg Table schema, because the ArrowProjectionVisitor uses field_ids for lookup against the file_schema.

I'd like to continue this discussion out of scope of this release, but I think we will have to decide on one of the following two approaches:

  1. We decide to write with the compatible smaller parquet types (write using INT32 for a LongType) and fix the StatsAggregator to handle different physical types
  2. Update the file_schema input to _to_requested_schema in write_parquet so that we upcast the arrow data type and write using the larger expected physical types into the parquet file.

Long story short, our underlying piping currently doesn't yet support promote on write and there's still some work left for us in order to do so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to get this to work as well. It should be pretty easy by just first upcasting the buffers before writing.

tests/integration/test_writes/test_writes.py Outdated Show resolved Hide resolved
sungwy and others added 2 commits July 12, 2024 19:42
@HonahX HonahX added this to the PyIceberg 0.7.0 release milestone Jul 13, 2024
@sungwy sungwy requested a review from HonahX July 13, 2024 16:10
Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on this. This is awesome, love the new schema compatibility check table! The categories are very helpful for debugging as well.

Porting over Fokko's comment from the other PR
#829 (review)
Do you know if the current change supports "re-aligning" the table?

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
def print_nullability(required: bool) -> str:
return "required" if required else "optional"

for field_name in fields_in_both:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just want to check my understanding, this works for nested fields because nested fields are "flattened" by . field_names and then fetched by .find_field.

For example: a df schema like

task_schema = pa.field(
    "person",
    pa.struct([
        pa.field("name", pa.string(), nullable=True),
    ]),
    nullable=True,
)

task_schema.field_names will produce {"person", "person.name"}.
task_schema.find_field("person") and task_schema.find_field("person.name") will fetch the corresponding fields

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's consistent with my understanding

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point, this could be solved using the SchemaWithPartnerVisitor

return partner_struct.column(name)
else:
raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
except KeyError:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change responsible for schema projection / writing a subset of the schema? Do you mind expanding on the mechanism behind how this works? I'm curious

@kevinjqliu
Copy link
Contributor

@pdpark does this solution resolve your usecase?

#829 (comment)

@sungwy
Copy link
Collaborator Author

sungwy commented Jul 13, 2024

Thank you for the review comments @kevinjqliu 🙏

Porting over Fokko's comment from the other PR #829 (review) Do you know if the current change supports "re-aligning" the table?

I think this test from your PR covers that case where the fields are re-aligned according to the Iceberg Table spec: https://github.com/apache/iceberg-python/pull/921/files#diff-7f3dd1244d08ce27c003cd091da10aa049f7bb0c7d5397acb4ec69767036accdR982

@kevinjqliu
Copy link
Contributor

My understanding of the "re-aligning" problem is the alignment between parquet and iceberg.

similar to what Fokko mentioned here
#829 (comment)

I think this PR currently only addresses schema compatibility and it is still possible to write parquet files with out-of-order schema.
The test_table_write_out_of_order_schema test shows we are allowing out-of-order schema on write, but we want to sort the schema order before writing.

Can probably punt this to a subsequent PR as an optimization

Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up @syun64. I left some comments, but I think it would be better to move the logic to a visitor. WDYT?

│ ❌ │ 3: baz: optional int │ 3: baz: optional string │
│ ✅ │ 4: qux: optional date │ 4: qux: optional date │
└────┴──────────────────────────┴──────────────────────────┘
┏━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━┓
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it just me, or is the left easier to read? 😅

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opted for this approach because I wanted to group the Extra Fields in the dataframe also into the table. But if we are taking the approach of using the name_mapping to generate the Iceberg Schema with consistent IDs after first checking that there are no extra fields, I think we can go back to the old way

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the new way since it tells me exactly which field to focus on and the reason its not compatible

return partner_struct.column(name)
else:
raise ValueError(f"Cannot find {name} in expected partner_struct type {type(partner_struct)}")
except KeyError:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Above we have the file_schema that should correspond with the partner_struct. I expect that when looking up the field-id, it should already return None.

pyiceberg/io/pyarrow.py Show resolved Hide resolved
if table_schema.as_struct() != task_schema.as_struct():
from rich.console import Console
from rich.table import Table as RichTable
task_schema = assign_fresh_schema_ids(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: This naming is still from when we only used it at the read-path, probably we make it more generic. Maybe provided_schema and requested_schema? Open for suggestions!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm debating myself if the API is the most extensible here. I think we should re-use _check_schema_compatible(requested_schema: Schema, provided_schema: Schema) instead of reimplementing the logic in Arrow here. This nicely splits pyarrow_to_schema and _check_schema_compatible.

def print_nullability(required: bool) -> str:
return "required" if required else "optional"

for field_name in fields_in_both:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point, this could be solved using the SchemaWithPartnerVisitor

pyiceberg/io/pyarrow.py Outdated Show resolved Hide resolved
Comment on lines 2125 to 2131
elif lhs.field_type != rhs.field_type:
rich_table.add_row(
field_name,
"Type",
f"{print_nullability(lhs.required)} {str(lhs.field_type)}",
f"{print_nullability(rhs.required)} {str(rhs.field_type)}",
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be good to get this to work as well. It should be pretty easy by just first upcasting the buffers before writing.

@Fokko
Copy link
Contributor

Fokko commented Jul 13, 2024

Just to expand on top of #921 (comment)

This is the situation where you already have two Iceberg schemas with the IDs assigned. I think an easier approach to solve this problem is to first convert the Arrow schema to an Iceberg schema (similar to the logic with the name-mapping), and then compare the two Iceberg schemas.

The process would look like:

  • Convert the Arrow schema into Iceberg using name-mapping, similar to here
  • Compare the two schemas using a SchemaWithPartnerVisitor if they are compatible. If not, we should generate a pretty error.
  • If they are compatible, we should be able to just push the schema through to_requested_schema to align/cast the fields.

@sungwy sungwy marked this pull request as draft July 14, 2024 16:52
# raise ValueError(f"Mismatch in fields:\n{console.export_text()}")


class _SchemaCompatibilityVisitor(SchemaVisitor[bool]):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @Fokko here's my take on using a Visitor as we've discussed - unfortunately the richTable doesn't print in order with the nested fields because a Visitor doesn't traverse in pre-order sequence.

Should we use a PreOrderSchemaVisitor here instead to ensure that the fields are logged in order of IDs?

https://github.com/apache/iceberg-python/pull/921/files#diff-7e699af26a81bbae741fb990c9369ef2b966ac473577d0a8e2b82319e9932dc3R1833-R1862

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think pre-order makes sense here 👍

@@ -2214,13 +2200,16 @@ def _dataframe_to_data_files(
property_name=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
)
name_mapping = table_metadata.schema().name_mapping
downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
task_schema = pyarrow_to_schema(df.schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is necessary to ensure that we are comparing the Schema that matches that arrow table's schema versus the Table Schema in order to properly invoke promote on write

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch 👍

pyiceberg/schema.py Outdated Show resolved Hide resolved
@sungwy sungwy changed the title Allow writing pa.Table that are either a subset of table schema or in arbitrary order Allow writing pa.Table that are either a subset of table schema or in arbitrary order, and support type promotion on write Jul 15, 2024
Copy link
Contributor

@Fokko Fokko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some small comments, but this looks great 🤩

sungwy and others added 2 commits July 15, 2024 11:31
Co-authored-by: Fokko Driesprong <[email protected]>
try:
# If type can be promoted to the requested schema
# it is considered compatible
promote(rhs.field_type, lhs.field_type)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This succeeds for append or overwrite, but will cause the add_files operation to fail even though it passes the schema checks. This is because the StatsAggregator will fail to collect stats because it makes an assertion that the physical type of the file be the same as the expected physical type of the IcebergType.

INT32 != physical_type(LongType())

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have a test to reproduce this? This is interesting since for Python int and long are both a int in Python.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll write up a test 👍

The comparison isn't between python types, but between parquet physical types: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L1503-L1507

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could get away with just removing this check, since we are running a comprehensive type compatibility check already?

expected_physical_type = _primitive_to_physical(iceberg_type)
if expected_physical_type != physical_type_string:
raise ValueError(
f"Unexpected physical type {physical_type_string} for {iceberg_type}, expected {expected_physical_type}"
)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confirmed that this would work @Fokko let me know if we are good to move forward with this change!

# Allow promotable physical types
# INT32 -> INT64 and FLOAT -> DOUBLE are safe type casts
if (physical_type_string == "INT32" and expected_physical_type == "INT64") or (
physical_type_string == "FLOAT" and expected_physical_type == "DOUBLE"
Copy link
Collaborator Author

@sungwy sungwy Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've put in this logic to allow StatsAggregator to collect stats for files that are added through add_files that have file field types that map to broader Iceberg Schema types. This feels overly specific, and I feel as though I am duplicating the type promote mappings in a different format. I'm open to other ideas if we want to keep this check on the parquet physical types.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once we get more promotions (which is expected in V3 with the upcoming variant type, this might need some reworking), but I think we're good for now 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good! There’s unfortunately one more case where we are failing for add_files so will that test case added shortly

@pdpark
Copy link

pdpark commented Jul 16, 2024

@pdpark does this solution resolve your usecase?

#829 (comment)

Not sure - I have a hack in place so I'm not blocked at the moment. I will probably wait for the release to try it in my environment. Looking forward to this release - a lot of great work. Thanks!

@Fokko
Copy link
Contributor

Fokko commented Jul 16, 2024

@kevinjqliu any comments? I think we're good moving this forward

Comment on lines -1899 to -1908
if parquet_metadata.num_columns != len(stats_columns):
raise ValueError(
f"Number of columns in statistics configuration ({len(stats_columns)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
)

if parquet_metadata.num_columns != len(parquet_column_mapping):
raise ValueError(
f"Number of columns in column mapping ({len(parquet_column_mapping)}) is different from the number of columns in pyarrow table ({parquet_metadata.num_columns})"
)

Copy link
Collaborator Author

@sungwy sungwy Jul 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed this check now that we have a comprehensive schema check in the write APIs.

Removal of these checks is necessary in order to allow add_files to add tables with subset schema to the Iceberg Table.

FAILED tests/integration/test_add_files.py::test_add_files_subset_of_schema[1] - ValueError: Number of columns in statistics configuration (4) is different from the number of columns in pyarrow table (3)

Since we are using the field IDs to aggregate into the stats_columns this should be a safe change. We will leave the work of flagging column incompatibilities to the updated _check_schema_compatible function

Copy link
Contributor

@kevinjqliu kevinjqliu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! Thanks @syun64 for this comprehensive fix!!

@sungwy
Copy link
Collaborator Author

sungwy commented Jul 16, 2024

LGTM! Thanks @syun64 for this comprehensive fix!!

Thank you @kevinjqliu for the help in gathering the requirements and taking the first crack in resolving the issue! Appreciate all your help! 🎉

@HonahX HonahX merged commit 1ed3abd into apache:main Jul 17, 2024
7 checks passed
@HonahX
Copy link
Contributor

HonahX commented Jul 17, 2024

Merged! Thanks for the great fix @syun64. Thanks @Fokko @kevinjqliu for the review!

felixscherz added a commit to felixscherz/iceberg-python that referenced this pull request Jul 17, 2024
commit 1ed3abd
Author: Sung Yun <[email protected]>
Date:   Wed Jul 17 02:04:52 2024 -0400

    Allow writing `pa.Table` that are either a subset of table schema or in arbitrary order, and support type promotion on write (apache#921)

    * merge

    * thanks @HonahX :)

    Co-authored-by: Honah J. <[email protected]>

    * support promote

    * revert promote

    * use a visitor

    * support promotion on write

    * fix

    * Thank you @Fokko !

    Co-authored-by: Fokko Driesprong <[email protected]>

    * revert

    * add-files promotiontest

    * support promote for add_files

    * add tests for uuid

    * add_files subset schema test

    ---------

    Co-authored-by: Honah J. <[email protected]>
    Co-authored-by: Fokko Driesprong <[email protected]>

commit 0f2e19e
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 15 23:25:08 2024 -0700

    Bump zstandard from 0.22.0 to 0.23.0 (apache#934)

    Bumps [zstandard](https://github.com/indygreg/python-zstandard) from 0.22.0 to 0.23.0.
    - [Release notes](https://github.com/indygreg/python-zstandard/releases)
    - [Changelog](https://github.com/indygreg/python-zstandard/blob/main/docs/news.rst)
    - [Commits](indygreg/python-zstandard@0.22.0...0.23.0)

    ---
    updated-dependencies:
    - dependency-name: zstandard
      dependency-type: direct:production
      update-type: version-update:semver-minor
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit ec73d97
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 15 23:24:47 2024 -0700

    Bump griffe from 0.47.0 to 0.48.0 (apache#933)

    Bumps [griffe](https://github.com/mkdocstrings/griffe) from 0.47.0 to 0.48.0.
    - [Release notes](https://github.com/mkdocstrings/griffe/releases)
    - [Changelog](https://github.com/mkdocstrings/griffe/blob/main/CHANGELOG.md)
    - [Commits](mkdocstrings/griffe@0.47.0...0.48.0)

    ---
    updated-dependencies:
    - dependency-name: griffe
      dependency-type: direct:production
      update-type: version-update:semver-minor
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit d05a423
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 15 23:24:16 2024 -0700

    Bump mkdocs-material from 9.5.28 to 9.5.29 (apache#932)

    Bumps [mkdocs-material](https://github.com/squidfunk/mkdocs-material) from 9.5.28 to 9.5.29.
    - [Release notes](https://github.com/squidfunk/mkdocs-material/releases)
    - [Changelog](https://github.com/squidfunk/mkdocs-material/blob/master/CHANGELOG)
    - [Commits](squidfunk/mkdocs-material@9.5.28...9.5.29)

    ---
    updated-dependencies:
    - dependency-name: mkdocs-material
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit e27cd90
Author: Yair Halevi (Spock) <[email protected]>
Date:   Sun Jul 14 22:11:04 2024 +0300

    Allow empty `names` in mapped field of Name Mapping (apache#927)

    * Remove check_at_least_one field validator

    Iceberg spec permits an emtpy list of names in the default name mapping. check_at_least_one is therefore unnecessary.

    * Remove irrelevant test case

    * Fixing pydantic model

    No longer requiring minimum length of names list to be 1.

    * Added test case for empty names in name mapping

    * Fixed formatting error

commit 3f44dfe
Author: Soumya Ghosh <[email protected]>
Date:   Sun Jul 14 00:35:38 2024 +0530

    Lowercase bool values in table properties (apache#924)

commit b11cdb5
Author: Sung Yun <[email protected]>
Date:   Fri Jul 12 16:45:04 2024 -0400

    Deprecate to_requested_schema (apache#918)

    * deprecate to_requested_schema

    * prep for release

commit a3dd531
Author: Honah J <[email protected]>
Date:   Fri Jul 12 13:14:40 2024 -0700

    Glue endpoint config variable, continue apache#530 (apache#920)

    Co-authored-by: Seb Pretzer <[email protected]>

commit 32e8f88
Author: Sung Yun <[email protected]>
Date:   Fri Jul 12 15:26:00 2024 -0400

    support PyArrow timestamptz with Etc/UTC (apache#910)

    Co-authored-by: Fokko Driesprong <[email protected]>

commit f6d56e9
Author: Sung Yun <[email protected]>
Date:   Fri Jul 12 05:31:06 2024 -0400

    fix invalidation logic (apache#911)

commit 6488ad8
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Thu Jul 11 22:56:48 2024 -0700

    Bump coverage from 7.5.4 to 7.6.0 (apache#917)

    Bumps [coverage](https://github.com/nedbat/coveragepy) from 7.5.4 to 7.6.0.
    - [Release notes](https://github.com/nedbat/coveragepy/releases)
    - [Changelog](https://github.com/nedbat/coveragepy/blob/master/CHANGES.rst)
    - [Commits](nedbat/coveragepy@7.5.4...7.6.0)

    ---
    updated-dependencies:
    - dependency-name: coverage
      dependency-type: direct:development
      update-type: version-update:semver-minor
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit dceedfa
Author: Sung Yun <[email protected]>
Date:   Thu Jul 11 20:32:14 2024 -0400

    Check if schema is compatible in `add_files` API (apache#907)

    Co-authored-by: Fokko Driesprong <[email protected]>

commit aceed2a
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Thu Jul 11 15:52:06 2024 +0200

    Bump mypy-boto3-glue from 1.34.136 to 1.34.143 (apache#912)

    Bumps [mypy-boto3-glue](https://github.com/youtype/mypy_boto3_builder) from 1.34.136 to 1.34.143.
    - [Release notes](https://github.com/youtype/mypy_boto3_builder/releases)
    - [Commits](https://github.com/youtype/mypy_boto3_builder/commits)

    ---
    updated-dependencies:
    - dependency-name: mypy-boto3-glue
      dependency-type: direct:production
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit 1b9b884
Author: Fokko Driesprong <[email protected]>
Date:   Thu Jul 11 12:45:20 2024 +0200

    PyArrow: Don't enforce the schema when reading/writing (apache#902)

    * PyArrow: Don't enforce the schema

    PyIceberg struggled with the different type of arrow, such as
    the `string` and `large_string`. They represent the same, but are
    different under the hood.

    My take is that we should hide these kind of details from the user
    as much as possible. Now we went down the road of passing in the
    Iceberg schema into Arrow, but when doing this, Iceberg has to
    decide if it is a large or non-large type.

    This PR removes passing down the schema in order to let Arrow decide
    unless:

     - The type should be evolved
     - In case of re-ordering, we reorder the original types

    * WIP

    * Reuse Table schema

    * Make linter happy

    * Squash some bugs

    * Thanks Sung!

    Co-authored-by: Sung Yun <[email protected]>

    * Moar code moar bugs

    * Remove the variables wrt file sizes

    * Linting

    * Go with large ones for now

    * Missed one there!

    ---------

    Co-authored-by: Sung Yun <[email protected]>

commit 8f47dfd
Author: Soumya Ghosh <[email protected]>
Date:   Thu Jul 11 11:52:55 2024 +0530

    Move determine_partitions and helper methods to io.pyarrow (apache#906)

commit 5aa451d
Author: Soumya Ghosh <[email protected]>
Date:   Thu Jul 11 07:57:05 2024 +0530

    Rename data_sequence_number to sequence_number in ManifestEntry (apache#900)

commit 77a07c9
Author: Honah J <[email protected]>
Date:   Wed Jul 10 03:56:13 2024 -0700

    Support MergeAppend operations (apache#363)

    * add ListPacker + tests

    * add merge append

    * add merge_append

    * fix snapshot inheritance

    * test manifest file and entries

    * add doc

    * fix lint

    * change test name

    * address review comments

    * rename _MergingSnapshotProducer to _SnapshotProducer

    * fix a serious bug

    * update the doc

    * remove merge_append as public API

    * make default to false

    * add test description

    * fix merge conflict

    * fix snapshot_id issue

commit 66b92ff
Author: Fokko Driesprong <[email protected]>
Date:   Wed Jul 10 10:09:20 2024 +0200

    GCS: Fix incorrect token description (apache#909)

commit c25e080
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Tue Jul 9 20:50:29 2024 -0700

    Bump zipp from 3.17.0 to 3.19.1 (apache#905)

    Bumps [zipp](https://github.com/jaraco/zipp) from 3.17.0 to 3.19.1.
    - [Release notes](https://github.com/jaraco/zipp/releases)
    - [Changelog](https://github.com/jaraco/zipp/blob/main/NEWS.rst)
    - [Commits](jaraco/zipp@v3.17.0...v3.19.1)

    ---
    updated-dependencies:
    - dependency-name: zipp
      dependency-type: indirect
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit 301e336
Author: Sung Yun <[email protected]>
Date:   Tue Jul 9 23:35:11 2024 -0400

    Cast 's', 'ms' and 'ns' PyArrow timestamp to 'us' precision on write (apache#848)

commit 3f574d3
Author: Fokko Driesprong <[email protected]>
Date:   Tue Jul 9 11:36:43 2024 +0200

    Support partial deletes (apache#569)

    * Add option to delete datafiles

    This is done through the Iceberg metadata, resulting
    in efficient deletes if the data is partitioned correctly

    * Pull in main

    * WIP

    * Change DataScan to accept Metadata and io

    For the partial deletes I want to do a scan on in
    memory metadata. Changing this API allows this.

    * fix name-mapping issue

    * WIP

    * WIP

    * Moar tests

    * Oops

    * Cleanup

    * WIP

    * WIP

    * Fix summary generation

    * Last few bits

    * Fix the requirement

    * Make ruff happy

    * Comments, thanks Kevin!

    * Comments

    * Append rather than truncate

    * Fix merge conflicts

    * Make the tests pass

    * Add another test

    * Conflicts

    * Add docs (apache#33)

    * docs

    * docs

    * Add a partitioned overwrite test

    * Fix comment

    * Skip empty manifests

    ---------

    Co-authored-by: HonahX <[email protected]>
    Co-authored-by: Sung Yun <[email protected]>

commit cdc3e54
Author: Fokko Driesprong <[email protected]>
Date:   Tue Jul 9 08:28:27 2024 +0200

    Disallow writing empty Manifest files (apache#876)

    * Disallow writing empty Avro files/blocks

    Raising an exception when doing this might look extreme, but
    there is no real good reason to allow this.

    * Relax the constaints a bit

commit b68e109
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 8 22:16:23 2024 -0700

    Bump fastavro from 1.9.4 to 1.9.5 (apache#904)

    Bumps [fastavro](https://github.com/fastavro/fastavro) from 1.9.4 to 1.9.5.
    - [Release notes](https://github.com/fastavro/fastavro/releases)
    - [Changelog](https://github.com/fastavro/fastavro/blob/master/ChangeLog)
    - [Commits](fastavro/fastavro@1.9.4...1.9.5)

    ---
    updated-dependencies:
    - dependency-name: fastavro
      dependency-type: direct:development
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit 90547bb
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Mon Jul 8 22:15:39 2024 -0700

    Bump moto from 5.0.10 to 5.0.11 (apache#903)

    Bumps [moto](https://github.com/getmoto/moto) from 5.0.10 to 5.0.11.
    - [Release notes](https://github.com/getmoto/moto/releases)
    - [Changelog](https://github.com/getmoto/moto/blob/master/CHANGELOG.md)
    - [Commits](getmoto/moto@5.0.10...5.0.11)

    ---
    updated-dependencies:
    - dependency-name: moto
      dependency-type: direct:development
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit 7dff359
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Sun Jul 7 07:50:19 2024 +0200

    Bump tenacity from 8.4.2 to 8.5.0 (apache#898)

commit 4aa469e
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Sat Jul 6 22:30:59 2024 +0200

    Bump certifi from 2024.2.2 to 2024.7.4 (apache#899)

    Bumps [certifi](https://github.com/certifi/python-certifi) from 2024.2.2 to 2024.7.4.
    - [Commits](certifi/python-certifi@2024.02.02...2024.07.04)

    ---
    updated-dependencies:
    - dependency-name: certifi
      dependency-type: indirect
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

commit aa7ad78
Author: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Date:   Sat Jul 6 20:37:51 2024 +0200

    Bump deptry from 0.16.1 to 0.16.2 (apache#897)

    Bumps [deptry](https://github.com/fpgmaas/deptry) from 0.16.1 to 0.16.2.
    - [Release notes](https://github.com/fpgmaas/deptry/releases)
    - [Changelog](https://github.com/fpgmaas/deptry/blob/main/CHANGELOG.md)
    - [Commits](fpgmaas/deptry@0.16.1...0.16.2)

    ---
    updated-dependencies:
    - dependency-name: deptry
      dependency-type: direct:development
      update-type: version-update:semver-patch
    ...

    Signed-off-by: dependabot[bot] <[email protected]>
    Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
@pdpark
Copy link

pdpark commented Jul 17, 2024

@pdpark does this solution resolve your usecase?

#829 (comment)

I installed pyiceberg from main and it's working - thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants