
Python: Infer Iceberg schema from the Parquet file #6505

Closed

Fokko opened this issue Dec 30, 2022 · 11 comments · Fixed by #6997

Comments
@Fokko
Contributor

Fokko commented Dec 30, 2022

Feature Request / Improvement

In PyIceberg we rely on fetching the schema from the Parquet metadata. If this is not available (because the Parquet file was written by something other than an Iceberg writer), we want to go over the actual Parquet schema and construct the Iceberg schema from it.

Query engine

None
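
Roughly, the check PyIceberg performs today amounts to looking for the iceberg.schema key in the Parquet key-value metadata. A minimal sketch of that lookup (the file path is a placeholder; the fallback branch is the gap this issue asks to fill):

import pyarrow.parquet as pq

ICEBERG_SCHEMA_KEY = b"iceberg.schema"

# "data.parquet" is a placeholder path for any data file of the table.
arrow_schema = pq.read_schema("data.parquet")
metadata = arrow_schema.metadata or {}

if ICEBERG_SCHEMA_KEY in metadata:
    # Iceberg writers embed the full Iceberg schema as JSON in the file footer.
    print("Embedded Iceberg schema (JSON):", metadata[ICEBERG_SCHEMA_KEY])
else:
    # Files written by non-Iceberg writers end up here; the requested feature
    # is to infer the Iceberg schema from arrow_schema instead of failing.
    print("No embedded Iceberg schema; would need to infer it from:", arrow_schema)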

@Fokko Fokko changed the title Infer Iceberg schema from the Parquet file Python: Infer Iceberg schema from the Parquet file Dec 30, 2022
@Fokko Fokko added the python label Dec 30, 2022
@JonasJ-ap
Contributor

I'm interested in solving this issue. Would you mind assigning it to me? Thank you so much!

@Fokko
Contributor Author

Fokko commented Feb 8, 2023

@JonasJ-ap Anything I can help with? If you don't have time, maybe @amogh-jahagirdar is interested in picking this up. I'd love to get this in 0.4.0

@JonasJ-ap
Contributor

Sorry, I haven't had enough time to work this out. @amogh-jahagirdar please feel free to pick this up if you are interested.

@kiran94

kiran94 commented Feb 13, 2023

Hello, I wanted to report that I've also observed this issue. Adding some details about how I got into this state in case it's helpful.

I've created an Iceberg table via AWS Glue:

from pyspark.sql import functions as F

partition_column = 'id'
partition_bucket_size = 4
udf_name = 'iceberg_bucket_long_' + str(partition_bucket_size)

# Register the Iceberg bucket transform as a Spark UDF so it can be used for sorting.
spark.sparkContext._jvm.org.apache.iceberg.spark.IcebergSpark.registerBucketUDF(
        spark._jsparkSession, udf_name, spark.sparkContext._jvm.org.apache.spark.sql.types.DataTypes.LongType, partition_bucket_size)

df = df.sortWithinPartitions(F.expr(f"{udf_name}({partition_column})"))

df.writeTo('my_iceberg_table') \
        .partitionedBy(F.bucket(partition_bucket_size, partition_column)) \
        .createOrReplace()

At this point I could read the table fine via Athena and pyiceberg. However, it led to many small files being created, which I believe was causing poor query performance, so I decided to follow https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-data-optimization.html

OPTIMIZE my_iceberg_table REWRITE DATA USING BIN_PACK

After this had completed successfully, I was still able to query the table from Athena, but no longer from pyiceberg:

ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505

Let me know if there are any more details I can provide

@Guillem96
Contributor

Just for further information, I'll add a code snippet here that leads to the same error message:

from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import EqualTo

# pyiceberg.yaml
# catalog:
#   default:
#     type: glue
#     py-io-impl: pyiceberg.io.pyarrow.PyArrowFileIO

catalog = load_catalog(
    "default",
    warehouse="...",
)

table = catalog.load_table(("...", "..."))

df = (
    table.scan()
    .filter(EqualTo("uuid", "..."))
    .select("rt", "cs1", "in")
    .to_arrow()
)

print(df)

@JonasJ-ap
Contributor

I created a draft PR #6997 containing a visitor to support inferring the Iceberg schema, and verified that the new feature solves the problem described above and in #6647. @amogh-jahagirdar please let me know if you are working on this or still interested in picking it up. I am willing to pick this issue back up if you do not have enough time.
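
For readers following along, here is a very rough sketch of what such a schema-conversion visitor boils down to, assuming a flat Arrow schema of common primitive types and freshly assigned field IDs; the actual PR implements a proper visitor that also handles nested types and field IDs from the file metadata:

import pyarrow as pa
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    BooleanType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,
    TimestamptzType,
)

# Hypothetical, simplified type mapping; not the mapping used by the PR.
_PRIMITIVES = {
    pa.bool_(): BooleanType(),
    pa.int32(): IntegerType(),
    pa.int64(): LongType(),
    pa.float64(): DoubleType(),
    pa.string(): StringType(),
    pa.timestamp("us", tz="UTC"): TimestamptzType(),
}

def infer_iceberg_schema(arrow_schema: pa.Schema) -> Schema:
    fields = []
    for field_id, field in enumerate(arrow_schema, start=1):
        fields.append(
            NestedField(
                field_id=field_id,  # fresh IDs; the real visitor is smarter about this
                name=field.name,
                field_type=_PRIMITIVES[field.type],
                required=not field.nullable,
            )
        )
    return Schema(*fields)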

@bigluck

bigluck commented Mar 3, 2023

Ciao @Fokko, maybe I'm facing a similar issue, but I'm a bit confused.
I'm using Glue and I'm querying an Iceberg table created with Dremio.

The table in question derives from the open NYC taxi dataset.

import os
from pyiceberg.catalog import load_glue

catalog = load_glue(name='biglake', conf={})
table = catalog.load_table('biglake.taxi_dremio_by_month')

print(table.identifier)
print(table.metadata)
print(table.metadata_location)

con = table.scan().to_duckdb(table_name='taxi')
print(con.execute('SELECT COUNT(*) FROM taxi').fetchall())

This is the output:

('biglake', 'taxi_dremio_by_month')

location='s3://my-s3-bucket/biglake/taxi_dremio_by_month'
table_uuid=UUID('80a4d129-4919-4b2a-8784-ec845a853130')
last_updated_ms=1677495898549
last_column_id=24
schemas=[
	Schema(
		NestedField(field_id=1, name='hvfhs_license_num', field_type=StringType(), required=False),
		NestedField(field_id=2, name='dispatching_base_num', field_type=StringType(), required=False),
		NestedField(field_id=3, name='originating_base_num', field_type=StringType(), required=False),
		NestedField(field_id=4, name='request_datetime', field_type=TimestamptzType(), required=False),
		NestedField(field_id=5, name='on_scene_datetime', field_type=TimestamptzType(), required=False),
		NestedField(field_id=6, name='pickup_datetime', field_type=TimestamptzType(), required=False),
		NestedField(field_id=7, name='dropoff_datetime', field_type=TimestamptzType(), required=False),
		NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
		NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
		NestedField(field_id=10, name='trip_miles', field_type=DoubleType(), required=False),
		NestedField(field_id=11, name='trip_time', field_type=LongType(), required=False),
		NestedField(field_id=12, name='base_passenger_fare', field_type=DoubleType(), required=False),
		NestedField(field_id=13, name='tolls', field_type=DoubleType(), required=False),
		NestedField(field_id=14, name='bcf', field_type=DoubleType(), required=False),
		NestedField(field_id=15, name='sales_tax', field_type=DoubleType(), required=False),
		NestedField(field_id=16, name='congestion_surcharge', field_type=DoubleType(), required=False),
		NestedField(field_id=17, name='airport_fee', field_type=IntegerType(), required=False),
		NestedField(field_id=18, name='tips', field_type=DoubleType(), required=False),
		NestedField(field_id=19, name='driver_pay', field_type=DoubleType(), required=False),
		NestedField(field_id=20, name='shared_request_flag', field_type=StringType(), required=False),
		NestedField(field_id=21, name='shared_match_flag', field_type=StringType(), required=False),
		NestedField(field_id=22, name='access_a_ride_flag', field_type=StringType(), required=False),
		NestedField(field_id=23, name='wav_request_flag', field_type=StringType(), required=False),
		NestedField(field_id=24, name='wav_match_flag', field_type=StringType(), required=False),
		schema_id=0,
		identifier_field_ids=[]
	)
]
current_schema_id=0
partition_specs=[
	PartitionSpec(
		PartitionField(source_id=4, field_id=1000, transform=MonthTransform(), name='request_datetime_month'),
		spec_id=0
	)
]
default_spec_id=0
last_partition_id=1000
properties={
	'compatibility.snapshot-id-inheritance.enabled': 'true',
	'commit.manifest.target-size-bytes': '153600'
}
current_snapshot_id=666682962113515828
snapshots=[
	Snapshot(
		snapshot_id=666682962113515828,
		parent_snapshot_id=None,
		sequence_number=None,
		timestamp_ms=1677495898549,
		manifest_list='s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/snap-666682962113515828-1-133eb191-5a2e-43da-a773-9f87eeb6b495.avro',
		summary=Summary(
			Operation.APPEND, **{
				'added-data-files': '260',
				'added-records': '745287023',
				'total-records': '745287023',
				'total-files-size': '0',
				'total-data-files': '260',
				'total-delete-files': '0',
				'total-position-deletes': '0',
				'total-equality-deletes': '0'
			}
		),
		schema_id=0
	)
]
snapshot_log=[
	SnapshotLogEntry(
		snapshot_id='666682962113515828',
		timestamp_ms=1677495898549
	)
]
metadata_log=[]
sort_orders=[SortOrder(order_id=0)]
default_sort_order_id=0
refs={
	'main': SnapshotRef(
		snapshot_id=666682962113515828,
		snapshot_ref_type=SnapshotRefType.BRANCH,
		min_snapshots_to_keep=None,
		max_snapshot_age_ms=None,
		max_ref_age_ms=None
	)
}
format_version=1
schema_=Schema(
	NestedField(field_id=1, name='hvfhs_license_num', field_type=StringType(), required=False),
	NestedField(field_id=2, name='dispatching_base_num', field_type=StringType(), required=False),
	NestedField(field_id=3, name='originating_base_num', field_type=StringType(), required=False),
	NestedField(field_id=4, name='request_datetime', field_type=TimestamptzType(), required=False),
	NestedField(field_id=5, name='on_scene_datetime', field_type=TimestamptzType(), required=False),
	NestedField(field_id=6, name='pickup_datetime', field_type=TimestamptzType(), required=False),
	NestedField(field_id=7, name='dropoff_datetime', field_type=TimestamptzType(), required=False),
	NestedField(field_id=8, name='PULocationID', field_type=LongType(), required=False),
	NestedField(field_id=9, name='DOLocationID', field_type=LongType(), required=False),
	NestedField(field_id=10, name='trip_miles', field_type=DoubleType(), required=False),
	NestedField(field_id=11, name='trip_time', field_type=LongType(), required=False),
	NestedField(field_id=12, name='base_passenger_fare', field_type=DoubleType(), required=False),
	NestedField(field_id=13, name='tolls', field_type=DoubleType(), required=False),
	NestedField(field_id=14, name='bcf', field_type=DoubleType(), required=False),
	NestedField(field_id=15, name='sales_tax', field_type=DoubleType(), required=False),
	NestedField(field_id=16, name='congestion_surcharge', field_type=DoubleType(), required=False),
	NestedField(field_id=17, name='airport_fee', field_type=IntegerType(), required=False),
	NestedField(field_id=18, name='tips', field_type=DoubleType(), required=False),
	NestedField(field_id=19, name='driver_pay', field_type=DoubleType(), required=False),
	NestedField(field_id=20, name='shared_request_flag', field_type=StringType(), required=False),
	NestedField(field_id=21, name='shared_match_flag', field_type=StringType(), required=False),
	NestedField(field_id=22, name='access_a_ride_flag', field_type=StringType(), required=False),
	NestedField(field_id=23, name='wav_request_flag', field_type=StringType(), required=False),
	NestedField(field_id=24, name='wav_match_flag', field_type=StringType(), required=False),
	schema_id=0,
	identifier_field_ids=[]
)
partition_spec=[
	{
		'name': 'request_datetime_month',
		'transform': 'month',
		'source-id': 4,
		'field-id': 1000
	}
]

s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/00000-2cf6fb41-1c37-4e7c-a35b-3ab1c4670b45.metadata.json

And then it crashes:

Traceback (most recent call last):
  File "/home/ubuntu/src/query.py", line 17, in <module>
    con = table.scan().to_duckdb(table_name='taxi')
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/table/__init__.py", line 360, in to_duckdb
    con.register(table_name, self.to_arrow())
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/table/__init__.py", line 349, in to_arrow
    return project_table(
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py", line 552, in project_table
    tables = pool.starmap(
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 375, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 774, in get
    raise self._value
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.10/multiprocessing/pool.py", line 51, in starmapstar
    return list(itertools.starmap(args[0], args[1]))
  File "/home/ubuntu/src/.venv/lib/python3.10/site-packages/pyiceberg/io/pyarrow.py", line 491, in _file_to_table
    raise ValueError(
ValueError: Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505

I'm confused because the query is a simple COUNT(*), and I thought pyiceberg would read the metadata stored in the metadata folder to get the number of records.


I've also tested PR #6997, but the Python process was killed:

s3://my-s3-bucket/biglake/taxi_dremio_by_month/metadata/00000-2cf6fb41-1c37-4e7c-a35b-3ab1c4670b45.metadata.json
Killed

@Fokko
Contributor Author

Fokko commented Mar 3, 2023

@bigluck Thanks for giving it a try.

I'm confused because the query is a simple COUNT(*), and I thought pyiceberg would read the metadata stored in the metadata folder to get the number of records.

Unfortunately, with the current DuckDB implementation, it pulls in all the (relevant) data. Since there is no filter on the scan, this means the entire table.

How big is the table? Could it be that it runs out of memory? Running echo $? will tell you the exit code of the process, which might indicate an out-of-memory situation.
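
If memory is the issue, one way to bound it is to push the column selection and a row filter into the scan before handing the result to DuckDB. A hedged sketch against the table above (the column and filter value are arbitrary examples):

from pyiceberg.expressions import GreaterThanOrEqual

# Select a single column and filter rows so only a fraction of the data
# is materialised in Arrow before it is registered with DuckDB.
con = (
    table.scan(
        row_filter=GreaterThanOrEqual("trip_miles", 100.0),
        selected_fields=("trip_miles",),
    )
    .to_duckdb(table_name="taxi")
)
print(con.execute("SELECT COUNT(*) FROM taxi").fetchall())

Note that the count then only covers the filtered rows; it trades exactness for memory.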

@bigluck

bigluck commented Mar 3, 2023

Oh, I've got it, thanks @Fokko.
The EC2 instance I'm using is a t2.medium, 2 vCPU / 4 GB RAM.
The instance isn't big, so that could be the root cause (I'm querying the full hvfhs dataset, ~745,287,023 records).

The exit code is 137, OOM :)

@sheinbergon

@Fokko @JonasJ-ap what's the status of this issue? How can I help get a fix for it included in version 0.4.0?

@Fokko
Contributor Author

Fokko commented May 2, 2023

@sheinbergon The PR has been merged and will be part of the 0.4.0 release.
