Skip to content

Commit

Permalink
Merge branch 'apache:main' into as-replace-table-as-select
Browse files Browse the repository at this point in the history
  • Loading branch information
anupam-saini authored Feb 25, 2024
2 parents 8d25dda + 44948cd commit 53f2d2c
Show file tree
Hide file tree
Showing 15 changed files with 67 additions and 24 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ repos:
- id: ruff-format
args: [ --preview ]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.6.1
rev: v1.8.0
hooks:
- id: mypy
args:
[--install-types, --non-interactive, --config=pyproject.toml]
- repo: https://github.com/hadialqattan/pycln
rev: v2.3.0
rev: v2.4.0
hooks:
- id: pycln
args: [--config=pyproject.toml]
Expand Down
17 changes: 17 additions & 0 deletions pyiceberg/avro/decoder_fast.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,54 @@ from pyiceberg.avro.decoder import BinaryDecoder
class CythonBinaryDecoder(BinaryDecoder):
def __init__(self, input_contents: bytes) -> None:
pass

def tell(self) -> int:
pass

def read(self, n: int) -> bytes:
pass

def read_boolean(self) -> bool:
pass

def read_int(self) -> int:
pass

def read_ints(self, count: int) -> tuple[int, ...]:
pass

def read_int_bytes_dict(self, count: int, dest: dict[int, bytes]) -> None:
pass

def read_bytes(self) -> bytes:
pass

def read_float(self) -> float:
pass

def read_double(self) -> float:
pass

def read_utf8(self) -> str:
pass

def skip(self, n: int) -> None:
pass

def skip_int(self) -> None:
pass

def skip_boolean(self) -> None:
pass

def skip_float(self) -> None:
pass

def skip_double(self) -> None:
pass

def skip_bytes(self) -> None:
pass

def skip_utf8(self) -> None:
pass
4 changes: 1 addition & 3 deletions pyiceberg/avro/decoder_fast.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ unsigned_long_long_array_template = cython.declare(array.array, array.array('Q',

@cython.final
cdef class CythonBinaryDecoder:
"""Implement a BinaryDecoder that reads from an in-memory buffer.
"""
"""Implement a BinaryDecoder that reads from an in-memory buffer."""

# This the data that is duplicated when the decoder is created.
cdef unsigned char *_data
Expand Down
2 changes: 2 additions & 0 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""

@abstractmethod
Expand Down
1 change: 1 addition & 0 deletions pyiceberg/catalog/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
raise NotImplementedError

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: If the commit failed.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
Expand Down
1 change: 1 addition & 0 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
Expand Down
4 changes: 3 additions & 1 deletion pyiceberg/catalog/rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ class Endpoints:
SIGV4_SERVICE = "rest.signing-name"
AUTH_URL = "rest.authorization-url"

NAMESPACE_SEPARATOR = b"\x1F".decode(UTF8)
NAMESPACE_SEPARATOR = b"\x1f".decode(UTF8)


def _retry_hook(retry_state: RetryCallState) -> None:
Expand Down Expand Up @@ -596,6 +596,8 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
CommitStateUnknownException: Failed due to an internal exception on the side of the catalog.
"""
response = self._session.post(
self.url(Endpoints.update_table, prefixed=True, **self._split_identifier_for_path(table_request.identifier)),
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons
Raises:
NoSuchTableError: If a table with the given identifier does not exist.
CommitFailedException: If the commit failed.
CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
"""
identifier_tuple = self.identifier_to_tuple_without_catalog(
tuple(table_request.identifier.namespace.root + [table_request.identifier.name])
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/conversions.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ def _(primitive_type: DecimalType, value: Decimal) -> bytes:
return decimal_to_bytes(value)


@singledispatch
@singledispatch # type: ignore
def from_bytes(primitive_type: PrimitiveType, b: bytes) -> L: # type: ignore
"""Convert bytes to a built-in python value.
Expand Down
4 changes: 3 additions & 1 deletion pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,9 @@ def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> T:


@visit_pyarrow.register(pa.ListType)
def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> T:
@visit_pyarrow.register(pa.FixedSizeListType)
@visit_pyarrow.register(pa.LargeListType)
def _(obj: Union[pa.ListType, pa.LargeListType, pa.FixedSizeListType], visitor: PyArrowSchemaVisitor[T]) -> T:
visitor.before_list_element(obj.value_field)
result = visit_pyarrow(obj.value_type, visitor)
visitor.after_list_element(obj.value_field)
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,7 @@ def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name: str) -
from_field_correct_casing = self._schema.find_column_name(field_from.field_id)
if from_field_correct_casing in self._identifier_field_names:
self._identifier_field_names.remove(from_field_correct_casing)
new_identifier_path = f"{from_field_correct_casing[:-len(field_from.name)]}{new_name}"
new_identifier_path = f"{from_field_correct_casing[: -len(field_from.name)]}{new_name}"
self._identifier_field_names.add(new_identifier_path)

return self
Expand Down
16 changes: 8 additions & 8 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ source = ['pyiceberg/']
[tool.ruff]
src = ['pyiceberg','tests']
extend-exclude = ["dev/provision.py"]
select = [
lint.select = [
"E", # pycodestyle
"W", # pycodestyle
"F", # Pyflakes
Expand All @@ -322,11 +322,11 @@ select = [
"I", # isort
"UP", # pyupgrade
]
ignore = ["E501","E203","B024","B028","UP037"]
lint.ignore = ["E501","E203","B024","B028","UP037"]

# Allow autofix for all enabled rules (when `--fix`) is provided.
fixable = ["ALL"]
unfixable = []
lint.fixable = ["ALL"]
lint.unfixable = []

# Exclude a variety of commonly ignored directories.
exclude = [
Expand All @@ -352,19 +352,19 @@ exclude = [
"node_modules",
"venv",
]
per-file-ignores = {}
lint.per-file-ignores = {}
# Ignore _all_ violations.
# Same as Black.
line-length = 130

# Allow unused variables when underscore-prefixed.
dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"
lint.dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$"

[tool.ruff.pyupgrade]
[tool.ruff.lint.pyupgrade]
# Preserve types, even if a file imports `from __future__ import annotations`.
keep-runtime-typing = true

[tool.ruff.isort]
[tool.ruff.lint.isort]
detect-same-package = true
lines-between-types = 0
known-first-party = ["pyiceberg", "tests"]
Expand Down
10 changes: 5 additions & 5 deletions tests/avro/test_decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], BinaryDec

@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
decoder = decoder_class(b"\x00\x00\x9a\x41")
assert decoder.read_float() == 19.25


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
decoder = decoder_class(b"\x00\x00\x9a\x41")
assert decoder.tell() == 0
decoder.skip_float()
assert decoder.tell() == 4
Expand Down Expand Up @@ -179,21 +179,21 @@ def test_read_bytes(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:

@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
decoder = decoder_class(b"\x04\x76\x6F")
decoder = decoder_class(b"\x04\x76\x6f")
assert decoder.read_utf8() == "vo"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_skip_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
decoder = decoder_class(b"\x04\x76\x6F")
decoder = decoder_class(b"\x04\x76\x6f")
assert decoder.tell() == 0
decoder.skip_utf8()
assert decoder.tell() == 3


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_int_as_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
decoder = decoder_class(b"\x00\x00\x9A\x41")
decoder = decoder_class(b"\x00\x00\x9a\x41")
reader = resolve_reader(FloatType(), DoubleType())
assert reader.read(decoder) == 19.25

Expand Down
20 changes: 20 additions & 0 deletions tests/io/test_pyarrow_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,26 @@ def test_pyarrow_list_to_iceberg() -> None:
assert visit_pyarrow(pyarrow_list, _ConvertToIceberg()) == expected


def test_pyarrow_large_list_to_iceberg() -> None:
pyarrow_list = pa.large_list(pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"}))
expected = ListType(
element_id=1,
element_type=IntegerType(),
element_required=True,
)
assert visit_pyarrow(pyarrow_list, _ConvertToIceberg()) == expected


def test_pyarrow_fixed_size_list_to_iceberg() -> None:
pyarrow_list = pa.list_(pa.field("element", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"}), 1)
expected = ListType(
element_id=1,
element_type=IntegerType(),
element_required=True,
)
assert visit_pyarrow(pyarrow_list, _ConvertToIceberg()) == expected


def test_pyarrow_map_to_iceberg() -> None:
pyarrow_map = pa.map_(
pa.field("key", pa.int32(), nullable=False, metadata={"PARQUET:field_id": "1"}),
Expand Down

0 comments on commit 53f2d2c

Please sign in to comment.