From 361d0661fd85a39a646037a0cf609c3c083e1fe0 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 8 Jul 2024 15:40:46 +0200 Subject: [PATCH 01/11] PyArrow: Don't enforce the schema PyIceberg struggled with the different type of arrow, such as the `string` and `large_string`. They represent the same, but are different under the hood. My take is that we should hide these kind of details from the user as much as possible. Now we went down the road of passing in the Iceberg schema into Arrow, but when doing this, Iceberg has to decide if it is a large or non-large type. This PR removes passing down the schema in order to let Arrow decide unless: - The type should be evolved - In case of re-ordering, we reorder the original types --- pyiceberg/io/pyarrow.py | 27 +++------- tests/integration/test_add_files.py | 81 ++++++++++++++++++++++++++--- tests/io/test_pyarrow.py | 14 ++--- 3 files changed, 86 insertions(+), 36 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index e6490ae156..a90733e392 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1025,9 +1025,6 @@ def _task_to_record_batches( fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, - # We always use large types in memory as it uses larger offsets - # That can chunk more row values into the buffers - schema=_pyarrow_schema_ensure_large_types(physical_schema), # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, @@ -1066,7 +1063,7 @@ def _task_to_table( batches = _task_to_record_batches( fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping ) - return pa.Table.from_batches(batches, schema=schema_to_pyarrow(projected_schema, include_field_ids=False)) + return pa.Table.from_batches(batches) def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]: @@ -1170,7 +1167,7 @@ def project_table( if len(tables) < 1: return pa.Table.from_batches([], schema=schema_to_pyarrow(projected_schema, include_field_ids=False)) - result = pa.concat_tables(tables) + result = pa.concat_tables(tables, promote_options="permissive") if limit is not None: return result.slice(0, limit) @@ -1249,15 +1246,9 @@ def project_batches( def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch) -> pa.RecordBatch: + # We could re-use some of these visitors struct_array = visit_with_partner(requested_schema, batch, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) - - arrays = [] - fields = [] - for pos, field in enumerate(requested_schema.fields): - array = struct_array.field(pos) - arrays.append(array) - fields.append(pa.field(field.name, array.type, field.optional)) - return pa.RecordBatch.from_arrays(arrays, schema=pa.schema(fields)) + return pa.RecordBatch.from_struct_array(struct_array) class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): @@ -1268,14 +1259,8 @@ def __init__(self, file_schema: Schema): def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) - if field.field_type.is_primitive: - if field.field_type != file_field.field_type: - return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False)) - elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=False)) != values.type: - # if file_field and field_type (e.g. String) are the same - # but the pyarrow type of the array is different from the expected type - # (e.g. string vs larger_string), we want to cast the array to the larger type - return values.cast(target_type) + if field.field_type.is_primitive and field.field_type != file_field.field_type: + return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False)) return values def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 84729fcca4..5761ed1380 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -17,7 +17,7 @@ # pylint:disable=redefined-outer-name from datetime import date -from typing import Iterator, Optional +from typing import Iterator import pyarrow as pa import pyarrow.parquet as pq @@ -26,7 +26,8 @@ from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError -from pyiceberg.partitioning import PartitionField, PartitionSpec +from pyiceberg.io import FileIO +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table from pyiceberg.transforms import BucketTransform, IdentityTransform, MonthTransform @@ -104,23 +105,32 @@ ) +def _write_parquet(io: FileIO, file_path: str, arrow_schema: pa.Schema, arrow_table: pa.Table) -> None: + fo = io.new_output(file_path) + with fo.create(overwrite=True) as fos: + with pq.ParquetWriter(fos, schema=arrow_schema) as writer: + writer.write_table(arrow_table) + + def _create_table( - session_catalog: Catalog, identifier: str, format_version: int, partition_spec: Optional[PartitionSpec] = None + session_catalog: Catalog, + identifier: str, + format_version: int, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + schema: Schema = TABLE_SCHEMA, ) -> Table: try: session_catalog.drop_table(identifier=identifier) except NoSuchTableError: pass - tbl = session_catalog.create_table( + return session_catalog.create_table( identifier=identifier, - schema=TABLE_SCHEMA, + schema=schema, properties={"format-version": str(format_version)}, - partition_spec=partition_spec if partition_spec else PartitionSpec(), + partition_spec=partition_spec, ) - return tbl - @pytest.fixture(name="format_version", params=[pytest.param(1, id="format_version=1"), pytest.param(2, id="format_version=2")]) def format_version_fixure(request: pytest.FixtureRequest) -> Iterator[int]: @@ -448,3 +458,58 @@ def test_add_files_snapshot_properties(spark: SparkSession, session_catalog: Cat assert "snapshot_prop_a" in summary assert summary["snapshot_prop_a"] == "test_prop_a" + + +@pytest.mark.integration +def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_catalog: Catalog, format_version: int) -> None: + identifier = f"default.unpartitioned_with_large_types{format_version}" + + iceberg_schema = Schema(NestedField(1, "foo", StringType(), required=True)) + arrow_schema = pa.schema([ + pa.field("foo", pa.string(), nullable=False), + ]) + arrow_schema_large = pa.schema([ + pa.field("foo", pa.large_string(), nullable=False), + ]) + + tbl = _create_table(session_catalog, identifier, format_version, schema=iceberg_schema) + + file_path = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-0.parquet" + _write_parquet( + tbl.io, + file_path, + arrow_schema, + pa.Table.from_pylist( + [ + { + "foo": "normal", + } + ], + schema=arrow_schema, + ), + ) + + tbl.add_files([file_path]) + + table_schema = tbl.scan().to_arrow().schema + assert table_schema == arrow_schema + + file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet" + _write_parquet( + tbl.io, + file_path_large, + arrow_schema_large, + pa.Table.from_pylist( + [ + { + "foo": "normal", + } + ], + schema=arrow_schema_large, + ), + ) + + tbl.add_files([file_path_large]) + + table_schema = tbl.scan().to_arrow().schema + assert table_schema == arrow_schema_large diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index ecb946a98b..bfc1158928 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1002,10 +1002,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) - == """properties: map - child 0, entries: struct not null - child 0, key: large_string not null - child 1, value: large_string not null""" + == """properties: map + child 0, entries: struct not null + child 0, key: string not null + child 1, value: string not null""" ) @@ -1279,9 +1279,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s assert actual.as_py() == expected assert ( repr(result_table.schema) - == """locations: map> - child 0, entries: struct not null> not null - child 0, key: large_string not null + == """locations: map> + child 0, entries: struct not null> not null + child 0, key: string not null child 1, value: struct not null child 0, latitude: double not null child 1, longitude: double not null From a0c0c573f777f55cb115b56db21e2a7885db8f82 Mon Sep 17 00:00:00 2001 From: Fokko Date: Mon, 8 Jul 2024 20:20:49 +0200 Subject: [PATCH 02/11] WIP --- pyiceberg/io/pyarrow.py | 14 ++++++++++---- pyiceberg/table/__init__.py | 5 +++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index a90733e392..ce7955d2f4 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1059,11 +1059,17 @@ def _task_to_table( positional_deletes: Optional[List[ChunkedArray]], case_sensitive: bool, name_mapping: Optional[NameMapping] = None, -) -> pa.Table: - batches = _task_to_record_batches( - fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping +) -> Optional[pa.Table]: + batches = list( + _task_to_record_batches( + fs, task, bound_row_filter, projected_schema, projected_field_ids, positional_deletes, case_sensitive, name_mapping + ) ) - return pa.Table.from_batches(batches) + + if len(batches) > 0: + return pa.Table.from_batches(batches) + else: + return None def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dict[str, List[ChunkedArray]]: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 2eec4d3036..3de03b676b 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1884,8 +1884,9 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: from pyiceberg.io.pyarrow import project_batches, schema_to_pyarrow + target_schema = schema_to_pyarrow(self.projection()) return pa.RecordBatchReader.from_batches( - schema_to_pyarrow(self.projection()), + target_schema, project_batches( self.plan_files(), self.table_metadata, @@ -1895,7 +1896,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: case_sensitive=self.case_sensitive, limit=self.limit, ), - ) + ).cast(target_schema=target_schema) def to_pandas(self, **kwargs: Any) -> pd.DataFrame: return self.to_arrow().to_pandas(**kwargs) From 4ca513b732b7ecc79d2369d18d0d2028ae0b8e84 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 16:48:52 +0200 Subject: [PATCH 03/11] Reuse Table schema --- pyiceberg/io/pyarrow.py | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index ce7955d2f4..aeb1135459 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1251,30 +1251,44 @@ def project_batches( total_row_count += len(batch) -def to_requested_schema(requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch) -> pa.RecordBatch: +def to_requested_schema( + requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, include_field_ids: bool = False +) -> pa.RecordBatch: # We could re-use some of these visitors - struct_array = visit_with_partner(requested_schema, batch, ArrowProjectionVisitor(file_schema), ArrowAccessor(file_schema)) + struct_array = visit_with_partner( + requested_schema, batch, ArrowProjectionVisitor(file_schema, include_field_ids), ArrowAccessor(file_schema) + ) return pa.RecordBatch.from_struct_array(struct_array) class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): file_schema: Schema - def __init__(self, file_schema: Schema): + def __init__(self, file_schema: Schema, include_field_ids: bool = False) -> None: self.file_schema = file_schema + self._include_field_ids = include_field_ids def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: file_field = self.file_schema.find_field(field.field_id) if field.field_type.is_primitive and field.field_type != file_field.field_type: - return values.cast(schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=False)) + return values.cast( + schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids) + ) + return values def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: + metadata = {} + if field.doc: + metadata[PYARROW_FIELD_DOC_KEY] = field.doc + if self._include_field_ids: + metadata[PYARROW_PARQUET_FIELD_ID_KEY] = str(field.field_id) + return pa.field( name=field.name, type=arrow_type, nullable=field.optional, - metadata={DOC: field.doc} if field.doc is not None else None, + metadata=metadata, ) def schema(self, schema: Schema, schema_partner: Optional[pa.Array], struct_result: Optional[pa.Array]) -> Optional[pa.Array]: @@ -1904,14 +1918,14 @@ def write_parquet(task: WriteTask) -> DataFile: file_schema = table_schema batches = [ - to_requested_schema(requested_schema=file_schema, file_schema=table_schema, batch=batch) + to_requested_schema(requested_schema=file_schema, file_schema=table_schema, batch=batch, include_field_ids=True) for batch in task.record_batches ] arrow_table = pa.Table.from_batches(batches) file_path = f'{table_metadata.location}/data/{task.generate_data_file_path("parquet")}' fo = io.new_output(file_path) with fo.create(overwrite=True) as fos: - with pq.ParquetWriter(fos, schema=file_schema.as_arrow(), **parquet_writer_kwargs) as writer: + with pq.ParquetWriter(fos, schema=arrow_table.schema, **parquet_writer_kwargs) as writer: writer.write(arrow_table, row_group_size=row_group_size) statistics = data_file_statistics_from_parquet_metadata( parquet_metadata=writer.writer.metadata, From ee293a1692666b5c41112e81f520e5882bde937a Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 16:55:54 +0200 Subject: [PATCH 04/11] Make linter happy --- pyiceberg/io/pyarrow.py | 16 ++++++++++++---- tests/integration/test_add_files.py | 1 + 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index ebdcecceaf..208e5308ce 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1274,19 +1274,27 @@ def project_batches( def to_requested_schema( - requested_schema: Schema, file_schema: Schema, batch: pa.RecordBatch, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False + requested_schema: Schema, + file_schema: Schema, + batch: pa.RecordBatch, + downcast_ns_timestamp_to_us: bool = False, + include_field_ids: bool = False, ) -> pa.RecordBatch: # We could re-use some of these visitors struct_array = visit_with_partner( - requested_schema, batch, ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids), ArrowAccessor(file_schema) + requested_schema, + batch, + ArrowProjectionVisitor(file_schema, downcast_ns_timestamp_to_us, include_field_ids), + ArrowAccessor(file_schema), ) return pa.RecordBatch.from_struct_array(struct_array) + class ArrowProjectionVisitor(SchemaWithPartnerVisitor[pa.Array, Optional[pa.Array]]): file_schema: Schema _include_field_ids: bool - def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None: + def __init__(self, file_schema: Schema, downcast_ns_timestamp_to_us: bool = False, include_field_ids: bool = False) -> None: self.file_schema = file_schema self._include_field_ids = include_field_ids self.downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us @@ -1967,7 +1975,7 @@ def write_parquet(task: WriteTask) -> DataFile: file_schema=table_schema, batch=batch, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us, - include_field_ids=True + include_field_ids=True, ) for batch in task.record_batches ] diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 42193b6c08..6fde87e43e 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -517,6 +517,7 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca table_schema = tbl.scan().to_arrow().schema assert table_schema == arrow_schema_large + def test_timestamp_tz_ns_downcast_on_read(session_catalog: Catalog, format_version: int, mocker: MockerFixture) -> None: nanoseconds_schema_iceberg = Schema(NestedField(1, "quux", TimestamptzType())) From d2a0b360e6386201ed861057d3172fcf6855a7b9 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 17:08:29 +0200 Subject: [PATCH 05/11] Squash some bugs --- pyiceberg/io/pyarrow.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 208e5308ce..5d60376957 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1307,19 +1307,15 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: return values.cast( schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids) ) - elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=False)) != values.type: - # if file_field and field_type (e.g. String) are the same - # but the pyarrow type of the array is different from the expected type - # (e.g. string vs larger_string), we want to cast the array to the larger type - safe = True + elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=True)) != values.type: + # Downcasting of nanoseconds to microseconds if ( pa.types.is_timestamp(target_type) and target_type.unit == "us" and pa.types.is_timestamp(values.type) and values.type.unit == "ns" ): - safe = False - return values.cast(target_type, safe=safe) + return values.cast(target_type, safe=False) return values def _construct_field(self, field: NestedField, arrow_type: pa.DataType) -> pa.Field: From 3e86782e6fcac8c0f0f2c19dbca29fa696c16d62 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Wed, 10 Jul 2024 18:29:31 +0200 Subject: [PATCH 06/11] Thanks Sung! Co-authored-by: Sung Yun <107272191+syun64@users.noreply.github.com> --- pyiceberg/io/pyarrow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 5d60376957..cdedd3addc 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1307,7 +1307,7 @@ def _cast_if_needed(self, field: NestedField, values: pa.Array) -> pa.Array: return values.cast( schema_to_pyarrow(promote(file_field.field_type, field.field_type), include_field_ids=self._include_field_ids) ) - elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=True)) != values.type: + elif (target_type := schema_to_pyarrow(field.field_type, include_field_ids=self._include_field_ids)) != values.type: # Downcasting of nanoseconds to microseconds if ( pa.types.is_timestamp(target_type) From 8d1ed75b3a7c4404ff2846f8682c395c3ed9aedb Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 18:54:29 +0200 Subject: [PATCH 07/11] Moar code moar bugs --- tests/integration/test_deletes.py | 2 +- .../test_writes/test_partitioned_writes.py | 8 +++--- tests/integration/test_writes/test_writes.py | 26 +++++++++---------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index ad3adedeca..2e62cb1b5a 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -291,7 +291,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio assert snapshots[2].summary == Summary( Operation.OVERWRITE, **{ - "added-files-size": "1145", + "added-files-size": "1125", "added-data-files": "1", "added-records": "2", "changed-partition-count": "1", diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 59bb76933e..0a21479d59 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -255,12 +255,12 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro assert summaries[0] == { "changed-partition-count": "3", "added-data-files": "3", - "added-files-size": "15029", + "added-files-size": "14969", "added-records": "3", "total-data-files": "3", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "15029", + "total-files-size": "14969", "total-position-deletes": "0", "total-records": "3", } @@ -268,12 +268,12 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro assert summaries[1] == { "changed-partition-count": "3", "added-data-files": "3", - "added-files-size": "15029", + "added-files-size": "14969", "added-records": "3", "total-data-files": "6", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "30058", + "total-files-size": "29938", "total-position-deletes": "0", "total-records": "6", } diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 2542fbdb38..585afd2e89 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -197,12 +197,12 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi # Append assert summaries[0] == { "added-data-files": "1", - "added-files-size": "5459", + "added-files-size": "5439", "added-records": "3", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "5459", + "total-files-size": "5439", "total-position-deletes": "0", "total-records": "3", } @@ -210,12 +210,12 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi # Append assert summaries[1] == { "added-data-files": "1", - "added-files-size": "5459", + "added-files-size": "5439", "added-records": "3", "total-data-files": "2", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "10918", + "total-files-size": "10878", "total-position-deletes": "0", "total-records": "6", } @@ -224,7 +224,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi assert summaries[2] == { "deleted-data-files": "2", "deleted-records": "6", - "removed-files-size": "10918", + "removed-files-size": "10878", "total-data-files": "0", "total-delete-files": "0", "total-equality-deletes": "0", @@ -236,12 +236,12 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi # Overwrite assert summaries[3] == { "added-data-files": "1", - "added-files-size": "5459", + "added-files-size": "5439", "added-records": "3", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "5459", + "total-files-size": "5439", "total-position-deletes": "0", "total-records": "3", } @@ -587,12 +587,12 @@ def test_summaries_with_only_nulls( assert summaries[1] == { "added-data-files": "1", - "added-files-size": "4239", + "added-files-size": "4219", "added-records": "2", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "4239", + "total-files-size": "4219", "total-position-deletes": "0", "total-records": "2", } @@ -600,7 +600,7 @@ def test_summaries_with_only_nulls( assert summaries[2] == { "deleted-data-files": "1", "deleted-records": "2", - "removed-files-size": "4239", + "removed-files-size": "4219", "total-data-files": "0", "total-delete-files": "0", "total-equality-deletes": "0", @@ -846,20 +846,20 @@ def test_inspect_snapshots( # Append assert df["summary"][0].as_py() == [ - ("added-files-size", "5459"), + ("added-files-size", "5439"), ("added-data-files", "1"), ("added-records", "3"), ("total-data-files", "1"), ("total-delete-files", "0"), ("total-records", "3"), - ("total-files-size", "5459"), + ("total-files-size", "5439"), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), ] # Delete assert df["summary"][1].as_py() == [ - ("removed-files-size", "5459"), + ("removed-files-size", "5439"), ("deleted-data-files", "1"), ("deleted-records", "3"), ("total-data-files", "0"), From ab0db07087c7739fe5f72d3b8b81a184983b5319 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 21:19:06 +0200 Subject: [PATCH 08/11] Remove the variables wrt file sizes --- tests/integration/test_inspect_table.py | 9 +++-- .../test_writes/test_partitioned_writes.py | 12 ++++--- tests/integration/test_writes/test_writes.py | 35 ++++++++++++------- 3 files changed, 36 insertions(+), 20 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index d8a83e0df7..7ff1444d10 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -110,22 +110,25 @@ def test_inspect_snapshots( for manifest_list in df["manifest_list"]: assert manifest_list.as_py().startswith("s3://") + file_size = int(next(value for key, value in df["summary"][0].as_py() if key == 'added-files-size')) + assert file_size > 0 + # Append assert df["summary"][0].as_py() == [ - ("added-files-size", "5459"), + ("added-files-size", str(file_size)), ("added-data-files", "1"), ("added-records", "3"), ("total-data-files", "1"), ("total-delete-files", "0"), ("total-records", "3"), - ("total-files-size", "5459"), + ("total-files-size", str(file_size)), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), ] # Delete assert df["summary"][1].as_py() == [ - ("removed-files-size", "5459"), + ("removed-files-size", str(file_size)), ("deleted-data-files", "1"), ("deleted-records", "3"), ("total-data-files", "0"), diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 0a21479d59..3d7056b691 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -252,15 +252,19 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro assert operations == ["append", "append"] summaries = [row.summary for row in rows] + + file_size = int(summaries[0]['added-files-size']) + assert file_size > 0 + assert summaries[0] == { "changed-partition-count": "3", "added-data-files": "3", - "added-files-size": "14969", + "added-files-size": str(file_size), "added-records": "3", "total-data-files": "3", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "14969", + "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", } @@ -268,12 +272,12 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro assert summaries[1] == { "changed-partition-count": "3", "added-data-files": "3", - "added-files-size": "14969", + "added-files-size": str(file_size), "added-records": "3", "total-data-files": "6", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "29938", + "total-files-size": str(file_size*2), "total-position-deletes": "0", "total-records": "6", } diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index 585afd2e89..dfde302c5d 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -194,15 +194,18 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi summaries = [row.summary for row in rows] + file_size = int(summaries[0]['added-files-size']) + assert file_size > 0 + # Append assert summaries[0] == { "added-data-files": "1", - "added-files-size": "5439", + "added-files-size": str(file_size), "added-records": "3", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "5439", + "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", } @@ -210,12 +213,12 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi # Append assert summaries[1] == { "added-data-files": "1", - "added-files-size": "5439", + "added-files-size": str(file_size), "added-records": "3", "total-data-files": "2", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "10878", + "total-files-size": str(file_size * 2), "total-position-deletes": "0", "total-records": "6", } @@ -224,7 +227,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi assert summaries[2] == { "deleted-data-files": "2", "deleted-records": "6", - "removed-files-size": "10878", + "removed-files-size": str(file_size * 2), "total-data-files": "0", "total-delete-files": "0", "total-equality-deletes": "0", @@ -236,12 +239,12 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi # Overwrite assert summaries[3] == { "added-data-files": "1", - "added-files-size": "5439", + "added-files-size": str(file_size), "added-records": "3", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "5439", + "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "3", } @@ -576,6 +579,9 @@ def test_summaries_with_only_nulls( summaries = [row.summary for row in rows] + file_size = int(summaries[1]['added-files-size']) + assert file_size > 0 + assert summaries[0] == { "total-data-files": "0", "total-delete-files": "0", @@ -587,12 +593,12 @@ def test_summaries_with_only_nulls( assert summaries[1] == { "added-data-files": "1", - "added-files-size": "4219", + "added-files-size": str(file_size), "added-records": "2", "total-data-files": "1", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": "4219", + "total-files-size": str(file_size), "total-position-deletes": "0", "total-records": "2", } @@ -600,7 +606,7 @@ def test_summaries_with_only_nulls( assert summaries[2] == { "deleted-data-files": "1", "deleted-records": "2", - "removed-files-size": "4219", + "removed-files-size": str(file_size), "total-data-files": "0", "total-delete-files": "0", "total-equality-deletes": "0", @@ -844,22 +850,25 @@ def test_inspect_snapshots( for manifest_list in df["manifest_list"]: assert manifest_list.as_py().startswith("s3://") + file_size = int(next(value for key, value in df["summary"][0].as_py() if key == 'added-files-size')) + assert file_size > 0 + # Append assert df["summary"][0].as_py() == [ - ("added-files-size", "5439"), + ("added-files-size", str(file_size)), ("added-data-files", "1"), ("added-records", "3"), ("total-data-files", "1"), ("total-delete-files", "0"), ("total-records", "3"), - ("total-files-size", "5439"), + ("total-files-size", str(file_size)), ("total-position-deletes", "0"), ("total-equality-deletes", "0"), ] # Delete assert df["summary"][1].as_py() == [ - ("removed-files-size", "5439"), + ("removed-files-size", str(file_size)), ("deleted-data-files", "1"), ("deleted-records", "3"), ("total-data-files", "0"), From c4f044abbdf4ef891b5d1465e6af16c7b5509ee3 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 21:21:16 +0200 Subject: [PATCH 09/11] Linting --- tests/integration/test_inspect_table.py | 2 +- tests/integration/test_writes/test_partitioned_writes.py | 4 ++-- tests/integration/test_writes/test_writes.py | 6 +++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_inspect_table.py b/tests/integration/test_inspect_table.py index 7ff1444d10..9415d7146d 100644 --- a/tests/integration/test_inspect_table.py +++ b/tests/integration/test_inspect_table.py @@ -110,7 +110,7 @@ def test_inspect_snapshots( for manifest_list in df["manifest_list"]: assert manifest_list.as_py().startswith("s3://") - file_size = int(next(value for key, value in df["summary"][0].as_py() if key == 'added-files-size')) + file_size = int(next(value for key, value in df["summary"][0].as_py() if key == "added-files-size")) assert file_size > 0 # Append diff --git a/tests/integration/test_writes/test_partitioned_writes.py b/tests/integration/test_writes/test_partitioned_writes.py index 3d7056b691..12da9c928b 100644 --- a/tests/integration/test_writes/test_partitioned_writes.py +++ b/tests/integration/test_writes/test_partitioned_writes.py @@ -253,7 +253,7 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro summaries = [row.summary for row in rows] - file_size = int(summaries[0]['added-files-size']) + file_size = int(summaries[0]["added-files-size"]) assert file_size > 0 assert summaries[0] == { @@ -277,7 +277,7 @@ def test_summaries_with_null(spark: SparkSession, session_catalog: Catalog, arro "total-data-files": "6", "total-delete-files": "0", "total-equality-deletes": "0", - "total-files-size": str(file_size*2), + "total-files-size": str(file_size * 2), "total-position-deletes": "0", "total-records": "6", } diff --git a/tests/integration/test_writes/test_writes.py b/tests/integration/test_writes/test_writes.py index dfde302c5d..af626718f7 100644 --- a/tests/integration/test_writes/test_writes.py +++ b/tests/integration/test_writes/test_writes.py @@ -194,7 +194,7 @@ def test_summaries(spark: SparkSession, session_catalog: Catalog, arrow_table_wi summaries = [row.summary for row in rows] - file_size = int(summaries[0]['added-files-size']) + file_size = int(summaries[0]["added-files-size"]) assert file_size > 0 # Append @@ -579,7 +579,7 @@ def test_summaries_with_only_nulls( summaries = [row.summary for row in rows] - file_size = int(summaries[1]['added-files-size']) + file_size = int(summaries[1]["added-files-size"]) assert file_size > 0 assert summaries[0] == { @@ -850,7 +850,7 @@ def test_inspect_snapshots( for manifest_list in df["manifest_list"]: assert manifest_list.as_py().startswith("s3://") - file_size = int(next(value for key, value in df["summary"][0].as_py() if key == 'added-files-size')) + file_size = int(next(value for key, value in df["summary"][0].as_py() if key == "added-files-size")) assert file_size > 0 # Append From a760a52ec9af96ddcce41e296e975ef690e4e853 Mon Sep 17 00:00:00 2001 From: Fokko Date: Wed, 10 Jul 2024 21:58:22 +0200 Subject: [PATCH 10/11] Go with large ones for now --- pyiceberg/io/pyarrow.py | 5 +++++ pyiceberg/table/__init__.py | 2 +- tests/integration/test_add_files.py | 2 +- tests/io/test_pyarrow.py | 14 +++++++------- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index cdedd3addc..17ab844066 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -1047,6 +1047,11 @@ def _task_to_record_batches( fragment_scanner = ds.Scanner.from_fragment( fragment=fragment, + # With PyArrow 16.0.0 there is an issue with casting record-batches: + # https://github.com/apache/arrow/issues/41884 + # https://github.com/apache/arrow/issues/43183 + # Would be good to remove this later on + schema=_pyarrow_schema_ensure_large_types(physical_schema), # This will push down the query to Arrow. # But in case there are positional deletes, we have to apply them first filter=pyarrow_filter if not positional_deletes else None, diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 1ab96df1f1..6af5e47c76 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -2066,7 +2066,7 @@ def to_arrow_batch_reader(self) -> pa.RecordBatchReader: case_sensitive=self.case_sensitive, limit=self.limit, ), - ).cast(target_schema=target_schema) + ) def to_pandas(self, **kwargs: Any) -> pd.DataFrame: return self.to_arrow().to_pandas(**kwargs) diff --git a/tests/integration/test_add_files.py b/tests/integration/test_add_files.py index 6fde87e43e..825d17e924 100644 --- a/tests/integration/test_add_files.py +++ b/tests/integration/test_add_files.py @@ -495,7 +495,7 @@ def test_add_files_with_large_and_regular_schema(spark: SparkSession, session_ca tbl.add_files([file_path]) table_schema = tbl.scan().to_arrow().schema - assert table_schema == arrow_schema + assert table_schema == arrow_schema_large file_path_large = f"s3://warehouse/default/unpartitioned_with_large_types/v{format_version}/test-1.parquet" _write_parquet( diff --git a/tests/io/test_pyarrow.py b/tests/io/test_pyarrow.py index bfc1158928..ecb946a98b 100644 --- a/tests/io/test_pyarrow.py +++ b/tests/io/test_pyarrow.py @@ -1002,10 +1002,10 @@ def test_read_map(schema_map: Schema, file_map: str) -> None: assert ( repr(result_table.schema) - == """properties: map - child 0, entries: struct not null - child 0, key: string not null - child 1, value: string not null""" + == """properties: map + child 0, entries: struct not null + child 0, key: large_string not null + child 1, value: large_string not null""" ) @@ -1279,9 +1279,9 @@ def test_projection_maps_of_structs(schema_map_of_structs: Schema, file_map_of_s assert actual.as_py() == expected assert ( repr(result_table.schema) - == """locations: map> - child 0, entries: struct not null> not null - child 0, key: string not null + == """locations: map> + child 0, entries: struct not null> not null + child 0, key: large_string not null child 1, value: struct not null child 0, latitude: double not null child 1, longitude: double not null From 4464bd7b5313029d625ba6abd4ca59fab8ecf001 Mon Sep 17 00:00:00 2001 From: Fokko Date: Thu, 11 Jul 2024 11:41:37 +0200 Subject: [PATCH 11/11] Missed one there! --- tests/integration/test_deletes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_deletes.py b/tests/integration/test_deletes.py index 2e62cb1b5a..d8fb01c447 100644 --- a/tests/integration/test_deletes.py +++ b/tests/integration/test_deletes.py @@ -291,7 +291,7 @@ def test_partitioned_table_positional_deletes_sequence_number(spark: SparkSessio assert snapshots[2].summary == Summary( Operation.OVERWRITE, **{ - "added-files-size": "1125", + "added-files-size": snapshots[2].summary["total-files-size"], "added-data-files": "1", "added-records": "2", "changed-partition-count": "1",