Add entries metadata table (apache#551)
* Add entries metadata table

* lint

* Revert typedef changes

* Remove unrelated changes

* Fix the CI

* Add docs
Fokko authored Apr 4, 2024
1 parent d69407c commit ee4dd92
Showing 7 changed files with 582 additions and 31 deletions.
160 changes: 159 additions & 1 deletion mkdocs/docs/api.md
@@ -370,7 +370,165 @@ manifest_list: [["s3://warehouse/default/table_metadata_snapshots/metadata/snap-
summary: [[keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","1","0","3","5459","0","0"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-records",...,"total-equality-deletes","total-files-size","deleted-data-files","deleted-records","removed-files-size"]values:["5459","1","3","1","3",...,"0","5459","1","3","5459"],keys:["added-files-size","added-data-files","added-records","total-data-files","total-delete-files","total-records","total-files-size","total-position-deletes","total-equality-deletes"]values:["5459","1","3","2","0","6","10918","0","0"]]]
```
### Add Files
### Entries
To show all the table's current manifest entries for both data and delete files:
```python
table.inspect.entries()
```

```
pyarrow.Table
status: int8 not null
snapshot_id: int64 not null
sequence_number: int64 not null
file_sequence_number: int64 not null
data_file: struct<content: int8 not null, file_path: string not null, file_format: string not null, partition: struct<> not null, record_count: int64 not null, file_size_in_bytes: int64 not null, column_sizes: map<int32, int64>, value_counts: map<int32, int64>, null_value_counts: map<int32, int64>, nan_value_counts: map<int32, int64>, lower_bounds: map<int32, binary>, upper_bounds: map<int32, binary>, key_metadata: binary, split_offsets: list<item: int64>, equality_ids: list<item: int32>, sort_order_id: int32> not null
child 0, content: int8 not null
child 1, file_path: string not null
child 2, file_format: string not null
child 3, partition: struct<> not null
child 4, record_count: int64 not null
child 5, file_size_in_bytes: int64 not null
child 6, column_sizes: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 7, value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 8, null_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 9, nan_value_counts: map<int32, int64>
child 0, entries: struct<key: int32 not null, value: int64> not null
child 0, key: int32 not null
child 1, value: int64
child 10, lower_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 11, upper_bounds: map<int32, binary>
child 0, entries: struct<key: int32 not null, value: binary> not null
child 0, key: int32 not null
child 1, value: binary
child 12, key_metadata: binary
child 13, split_offsets: list<item: int64>
child 0, item: int64
child 14, equality_ids: list<item: int32>
child 0, item: int32
child 15, sort_order_id: int32
readable_metrics: struct<city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null>
child 0, city: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: string
child 5, upper_bound: string
child 1, lat: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
child 2, long: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double> not null
child 0, column_size: int64
child 1, value_count: int64
child 2, null_value_count: int64
child 3, nan_value_count: int64
child 4, lower_bound: double
child 5, upper_bound: double
----
status: [[1]]
snapshot_id: [[6245626162224016531]]
sequence_number: [[1]]
file_sequence_number: [[1]]
data_file: [
-- is_valid: all not null
-- child 0 type: int8
[0]
-- child 1 type: string
["s3://warehouse/default/cities/data/00000-0-80766b66-e558-4150-a5cf-85e4c609b9fe.parquet"]
-- child 2 type: string
["PARQUET"]
-- child 3 type: struct<>
-- is_valid: all not null
-- child 4 type: int64
[4]
-- child 5 type: int64
[1656]
-- child 6 type: map<int32, int64>
[keys:[1,2,3]values:[140,135,135]]
-- child 7 type: map<int32, int64>
[keys:[1,2,3]values:[4,4,4]]
-- child 8 type: map<int32, int64>
[keys:[1,2,3]values:[0,0,0]]
-- child 9 type: map<int32, int64>
[keys:[]values:[]]
-- child 10 type: map<int32, binary>
[keys:[1,2,3]values:[416D7374657264616D,8602B68311E34240,3A77BB5E9A9B5EC0]]
-- child 11 type: map<int32, binary>
[keys:[1,2,3]values:[53616E204672616E636973636F,F5BEF1B5678E4A40,304CA60A46651840]]
-- child 12 type: binary
[null]
-- child 13 type: list<item: int64>
[[4]]
-- child 14 type: list<item: int32>
[null]
-- child 15 type: int32
[null]]
readable_metrics: [
-- is_valid: all not null
-- child 0 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: string, upper_bound: string>
-- is_valid: all not null
-- child 0 type: int64
[140]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: string
["Amsterdam"]
-- child 5 type: string
["San Francisco"]
-- child 1 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[37.773972]
-- child 5 type: double
[53.11254]
-- child 2 type: struct<column_size: int64, value_count: int64, null_value_count: int64, nan_value_count: int64, lower_bound: double, upper_bound: double>
-- is_valid: all not null
-- child 0 type: int64
[135]
-- child 1 type: int64
[4]
-- child 2 type: int64
[0]
-- child 3 type: int64
[null]
-- child 4 type: double
[-122.431297]
-- child 5 type: double
[6.0989]]
```
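
The returned object is a regular `pyarrow.Table`, so it can be handed off to other tools. A minimal sketch (not part of this commit), assuming pandas is available for `to_pandas()`:

```python
# Materialize the entries metadata table and look at a few columns.
df = table.inspect.entries().to_pandas()
print(df[["status", "snapshot_id", "sequence_number", "file_sequence_number"]])
```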

## Add Files

Expert Iceberg users may choose to commit existing parquet files to the Iceberg table as data files, without rewriting them.
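
A hedged sketch of what such a call could look like using the `add_files` API; the parquet paths below are hypothetical, and the files must already match the table's schema:

```python
# Register existing parquet files as data files without rewriting them.
table.add_files(file_paths=[
    "s3://warehouse/default/existing-0.parquet",
    "s3://warehouse/default/existing-1.parquet",
])
```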

124 changes: 124 additions & 0 deletions pyiceberg/table/__init__.py
@@ -47,6 +47,7 @@
from typing_extensions import Annotated

import pyiceberg.expressions.parser as parser
from pyiceberg.conversions import from_bytes
from pyiceberg.exceptions import CommitFailedException, ResolveError, ValidationError
from pyiceberg.expressions import (
AlwaysTrue,
@@ -3264,3 +3265,126 @@ def snapshots(self) -> "pa.Table":
snapshots,
schema=snapshots_schema,
)

def entries(self) -> "pa.Table":
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow

schema = self.tbl.metadata.schema()

readable_metrics_struct = []

def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
pa_bound_type = schema_to_pyarrow(bound_type)
return pa.struct([
pa.field("column_size", pa.int64(), nullable=True),
pa.field("value_count", pa.int64(), nullable=True),
pa.field("null_value_count", pa.int64(), nullable=True),
pa.field("nan_value_count", pa.int64(), nullable=True),
pa.field("lower_bound", pa_bound_type, nullable=True),
pa.field("upper_bound", pa_bound_type, nullable=True),
])

for field in self.tbl.metadata.schema().fields:
readable_metrics_struct.append(
pa.field(schema.find_column_name(field.field_id), _readable_metrics_struct(field.field_type), nullable=False)
)

partition_record = self.tbl.metadata.specs_struct()
pa_record_struct = schema_to_pyarrow(partition_record)

entries_schema = pa.schema([
pa.field('status', pa.int8(), nullable=False),
pa.field('snapshot_id', pa.int64(), nullable=False),
pa.field('sequence_number', pa.int64(), nullable=False),
pa.field('file_sequence_number', pa.int64(), nullable=False),
pa.field(
'data_file',
pa.struct([
pa.field('content', pa.int8(), nullable=False),
pa.field('file_path', pa.string(), nullable=False),
pa.field('file_format', pa.string(), nullable=False),
pa.field('partition', pa_record_struct, nullable=False),
pa.field('record_count', pa.int64(), nullable=False),
pa.field('file_size_in_bytes', pa.int64(), nullable=False),
pa.field('column_sizes', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('null_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('nan_value_counts', pa.map_(pa.int32(), pa.int64()), nullable=True),
pa.field('lower_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('upper_bounds', pa.map_(pa.int32(), pa.binary()), nullable=True),
pa.field('key_metadata', pa.binary(), nullable=True),
pa.field('split_offsets', pa.list_(pa.int64()), nullable=True),
pa.field('equality_ids', pa.list_(pa.int32()), nullable=True),
pa.field('sort_order_id', pa.int32(), nullable=True),
]),
nullable=False,
),
pa.field('readable_metrics', pa.struct(readable_metrics_struct), nullable=True),
])

entries = []
if snapshot := self.tbl.metadata.current_snapshot():
for manifest in snapshot.manifests(self.tbl.io):
for entry in manifest.fetch_manifest_entry(io=self.tbl.io):
column_sizes = entry.data_file.column_sizes or {}
value_counts = entry.data_file.value_counts or {}
null_value_counts = entry.data_file.null_value_counts or {}
nan_value_counts = entry.data_file.nan_value_counts or {}
lower_bounds = entry.data_file.lower_bounds or {}
upper_bounds = entry.data_file.upper_bounds or {}
readable_metrics = {
schema.find_column_name(field.field_id): {
"column_size": column_sizes.get(field.field_id),
"value_count": value_counts.get(field.field_id),
"null_value_count": null_value_counts.get(field.field_id),
"nan_value_count": nan_value_counts.get(field.field_id),
# Makes them readable
"lower_bound": from_bytes(field.field_type, lower_bound)
if (lower_bound := lower_bounds.get(field.field_id))
else None,
"upper_bound": from_bytes(field.field_type, upper_bound)
if (upper_bound := upper_bounds.get(field.field_id))
else None,
}
for field in self.tbl.metadata.schema().fields
}

partition = entry.data_file.partition
partition_record_dict = {
field.name: partition[pos]
for pos, field in enumerate(self.tbl.metadata.specs()[manifest.partition_spec_id].fields)
}

entries.append({
'status': entry.status.value,
'snapshot_id': entry.snapshot_id,
'sequence_number': entry.data_sequence_number,
'file_sequence_number': entry.file_sequence_number,
'data_file': {
"content": entry.data_file.content,
"file_path": entry.data_file.file_path,
"file_format": entry.data_file.file_format,
"partition": partition_record_dict,
"record_count": entry.data_file.record_count,
"file_size_in_bytes": entry.data_file.file_size_in_bytes,
"column_sizes": dict(entry.data_file.column_sizes),
"value_counts": dict(entry.data_file.value_counts),
"null_value_counts": dict(entry.data_file.null_value_counts),
"nan_value_counts": entry.data_file.nan_value_counts,
"lower_bounds": entry.data_file.lower_bounds,
"upper_bounds": entry.data_file.upper_bounds,
"key_metadata": entry.data_file.key_metadata,
"split_offsets": entry.data_file.split_offsets,
"equality_ids": entry.data_file.equality_ids,
"sort_order_id": entry.data_file.sort_order_id,
"spec_id": entry.data_file.spec_id,
},
'readable_metrics': readable_metrics,
})

return pa.Table.from_pylist(
entries,
schema=entries_schema,
)
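
The readable_metrics column is what makes the per-column bounds usable: the raw lower_bounds/upper_bounds arrive as single-value serialized bytes, and from_bytes converts them back into typed values. A standalone sketch (not part of the diff), assuming string bounds are UTF-8 encoded:

```python
from pyiceberg.conversions import from_bytes
from pyiceberg.types import StringType

# "416D7374657264616D" is the hex-encoded lower bound of `city` in the docs
# example above; decoding it should give back the original string.
assert from_bytes(StringType(), bytes.fromhex("416D7374657264616D")) == "Amsterdam"
```
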
27 changes: 26 additions & 1 deletion pyiceberg/table/metadata.py
@@ -49,7 +49,7 @@
IcebergRootModel,
Properties,
)
from pyiceberg.types import transform_dict_value_to_str
from pyiceberg.types import NestedField, StructType, transform_dict_value_to_str
from pyiceberg.utils.config import Config
from pyiceberg.utils.datetime import datetime_to_millis

@@ -245,6 +245,31 @@ def specs(self) -> Dict[int, PartitionSpec]:
"""Return a dict the partition specs this table."""
return {spec.spec_id: spec for spec in self.partition_specs}

def specs_struct(self) -> StructType:
"""Produce a struct of all the combined PartitionSpecs.
The partition fields should be optional: Partition fields may be added later,
in which case not all files would have the result field, and it may be null.
:return: A StructType that represents all the combined PartitionSpecs of the table
"""
specs = self.specs()

# Collect all the fields
struct_fields = {field.field_id: field for spec in specs.values() for field in spec.fields}

schema = self.schema()

nested_fields = []
# Sort them by field_id in order to get a deterministic output
for field_id in sorted(struct_fields):
field = struct_fields[field_id]
source_type = schema.find_type(field.source_id)
result_type = field.transform.result_type(source_type)
nested_fields.append(NestedField(field_id=field.field_id, name=field.name, type=result_type, required=False))

return StructType(*nested_fields)

def new_snapshot_id(self) -> int:
"""Generate a new snapshot-id that's not in use."""
snapshot_id = _generate_snapshot_id()
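
To illustrate what `specs_struct()` produces for a table whose partition spec has evolved, here is a hypothetical sketch (field names, ids, and types are illustrative, not taken from this diff): a table first partitioned by `day(ts)` and later also by `bucket(16, id)` merges into a struct in which every partition field is optional, since files written under the older spec carry no bucket value.

```python
from pyiceberg.types import DateType, IntegerType, NestedField, StructType

# Hypothetical merged partition struct for the specs day(ts) and day(ts) + bucket(16, id).
expected = StructType(
    NestedField(field_id=1000, name="ts_day", type=DateType(), required=False),
    NestedField(field_id=1001, name="id_bucket", type=IntegerType(), required=False),
)
```
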
4 changes: 4 additions & 0 deletions pyiceberg/utils/lazydict.py
@@ -66,3 +66,7 @@ def __len__(self) -> int:
"""Return the number of items in the dictionary."""
source = self._dict or self._build_dict()
return len(source)

def __dict__(self) -> Dict[K, V]: # type: ignore
"""Convert the lazy dict in a dict."""
return self._dict or self._build_dict()
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -2043,5 +2043,5 @@ def pa_schema() -> "pa.Schema":
def arrow_table_with_null(pa_schema: "pa.Schema") -> "pa.Table":
import pyarrow as pa

"""PyArrow table with all kinds of columns"""
"""Pyarrow table with all kinds of columns."""
return pa.Table.from_pydict(TEST_DATA_WITH_NULL, schema=pa_schema)