diff --git a/dlt/common/typing.py b/dlt/common/typing.py
index 283fd0a558..29572036d9 100644
--- a/dlt/common/typing.py
+++ b/dlt/common/typing.py
@@ -328,7 +328,7 @@ def is_typeddict(t: Type[Any]) -> bool:
 
 def is_annotated(ann_type: Any) -> bool:
     try:
-        return issubclass(get_origin(ann_type), Annotated)  # type: ignore[arg-type]
+        return get_origin(ann_type) is Annotated
     except TypeError:
         return False
 
diff --git a/dlt/destinations/impl/databricks/databricks.py b/dlt/destinations/impl/databricks/databricks.py
index a83db6ec34..fd97ded510 100644
--- a/dlt/destinations/impl/databricks/databricks.py
+++ b/dlt/destinations/impl/databricks/databricks.py
@@ -28,8 +28,7 @@
 from dlt.common.schema import TColumnSchema, Schema
 from dlt.common.schema.typing import TColumnType
 from dlt.common.storages import FilesystemConfiguration, fsspec_from_config
-
-from dlt.destinations.insert_job_client import InsertValuesJobClient
+from dlt.destinations.job_client_impl import SqlJobClientWithStagingDataset
 from dlt.destinations.exceptions import LoadJobTerminalException
 from dlt.destinations.impl.databricks.configuration import DatabricksClientConfiguration
 from dlt.destinations.impl.databricks.sql_client import DatabricksSqlClient
@@ -198,7 +197,7 @@ def gen_delete_from_sql(
     """
 
 
-class DatabricksClient(InsertValuesJobClient, SupportsStagingDestination):
+class DatabricksClient(SqlJobClientWithStagingDataset, SupportsStagingDestination):
     def __init__(
         self,
         schema: Schema,
@@ -213,7 +212,7 @@ def __init__(
         )
         super().__init__(schema, config, sql_client)
         self.config: DatabricksClientConfiguration = config
-        self.sql_client: DatabricksSqlClient = sql_client
+        self.sql_client: DatabricksSqlClient = sql_client  # type: ignore[assignment, unused-ignore]
         self.type_mapper = self.capabilities.get_type_mapper()
 
     def create_load_job(
diff --git a/dlt/destinations/impl/databricks/sql_client.py b/dlt/destinations/impl/databricks/sql_client.py
index 16e1e73d93..9f695b9d6e 100644
--- a/dlt/destinations/impl/databricks/sql_client.py
+++ b/dlt/destinations/impl/databricks/sql_client.py
@@ -16,7 +16,7 @@
 )
 
 from databricks.sdk.core import Config, oauth_service_principal
-from databricks import sql as databricks_lib  # type: ignore[attr-defined]
+from databricks import sql as databricks_lib
 from databricks.sql.client import (
     Connection as DatabricksSqlConnection,
     Cursor as DatabricksSqlCursor,
@@ -43,7 +43,7 @@
 class DatabricksCursorImpl(DBApiCursorImpl):
     """Use native data frame support if available"""
 
-    native_cursor: DatabricksSqlCursor
+    native_cursor: DatabricksSqlCursor  # type: ignore[assignment, unused-ignore]
     vector_size: ClassVar[int] = 2048  # vector size is 2048
 
     def iter_arrow(self, chunk_size: int) -> Generator[ArrowTable, None, None]:
@@ -140,7 +140,6 @@ def execute_sql(
     @contextmanager
     @raise_database_error
     def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DBApiCursor]:
-        curr: DBApiCursor
         # TODO: Inline param support will be dropped in future databricks driver, switch to :named paramstyle
         # This will drop support for cluster runtime v13.x
         # db_args: Optional[Dict[str, Any]]
@@ -159,10 +158,11 @@ def execute_query(self, query: AnyStr, *args: Any, **kwargs: Any) -> Iterator[DB
         # else:
        #     db_args = kwargs or None
 
+        assert isinstance(query, str)
         db_args = args or kwargs or None
         with self._conn.cursor() as curr:
             curr.execute(query, db_args)
-            yield DatabricksCursorImpl(curr)  # type: ignore[abstract]
+            yield DatabricksCursorImpl(curr)  # type: ignore[arg-type, abstract, unused-ignore]
 
     def catalog_name(self, escape: bool = True) -> Optional[str]:
         catalog = self.capabilities.casefold_identifier(self.credentials.catalog)
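Note on the is_annotated() change above: typing.get_origin() reports Annotated itself as the origin of an annotated type, so a plain identity check is enough and cannot raise the way issubclass() does when the origin is not a class; the surrounding try/except is kept only as a guard. A minimal standalone sketch of that behavior on Python 3.9+ typing (not dlt code; older interpreters go through typing_extensions):

    from typing import Annotated, List, get_origin

    assert get_origin(Annotated[int, "meta"]) is Annotated  # Annotated is reported as the origin
    assert get_origin(List[int]) is list                    # other generics return their runtime class
    assert get_origin(int) is None                          # plain classes have no origin at all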
diff --git a/dlt/destinations/impl/lancedb/lancedb_client.py b/dlt/destinations/impl/lancedb/lancedb_client.py
index bb0e12f8ec..e484435720 100644
--- a/dlt/destinations/impl/lancedb/lancedb_client.py
+++ b/dlt/destinations/impl/lancedb/lancedb_client.py
@@ -536,7 +536,7 @@ def update_schema_in_storage(self) -> None:
                 self.schema.naming.normalize_identifier(
                     "engine_version"
                 ): self.schema.ENGINE_VERSION,
-                self.schema.naming.normalize_identifier("inserted_at"): str(pendulum.now()),
+                self.schema.naming.normalize_identifier("inserted_at"): pendulum.now(),
                 self.schema.naming.normalize_identifier("schema_name"): self.schema.name,
                 self.schema.naming.normalize_identifier(
                     "version_hash"
@@ -693,7 +693,7 @@ def complete_load(self, load_id: str) -> None:
                 self.schema.naming.normalize_identifier("load_id"): load_id,
                 self.schema.naming.normalize_identifier("schema_name"): self.schema.name,
                 self.schema.naming.normalize_identifier("status"): 0,
-                self.schema.naming.normalize_identifier("inserted_at"): str(pendulum.now()),
+                self.schema.naming.normalize_identifier("inserted_at"): pendulum.now(),
                 self.schema.naming.normalize_identifier("schema_version_hash"): None,
             }
         ]
diff --git a/dlt/extract/incremental/transform.py b/dlt/extract/incremental/transform.py
index 1d213e26c2..6920aaaa26 100644
--- a/dlt/extract/incremental/transform.py
+++ b/dlt/extract/incremental/transform.py
@@ -24,12 +24,15 @@
 
 try:
     from dlt.common.libs import pyarrow
-    from dlt.common.libs.numpy import numpy
     from dlt.common.libs.pyarrow import pyarrow as pa, TAnyArrowItem
     from dlt.common.libs.pyarrow import from_arrow_scalar, to_arrow_scalar
 except MissingDependencyException:
     pa = None
     pyarrow = None
+
+try:
+    from dlt.common.libs.numpy import numpy
+except MissingDependencyException:
     numpy = None
 
 # NOTE: always import pandas independently from pyarrow
@@ -320,7 +323,9 @@ def _add_unique_index(self, tbl: "pa.Table") -> "pa.Table":
         """Creates unique index if necessary."""
         # create unique index if necessary
         if self._dlt_index not in tbl.schema.names:
-            tbl = pyarrow.append_column(tbl, self._dlt_index, pa.array(numpy.arange(tbl.num_rows)))
+            # indices = pa.compute.sequence(start=0, step=1, length=tbl.num_rows,
+            indices = pa.array(range(tbl.num_rows))
+            tbl = pyarrow.append_column(tbl, self._dlt_index, indices)
         return tbl
 
     def __call__(
diff --git a/mypy.ini b/mypy.ini
index 701695567e..51151486b6 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -24,9 +24,6 @@ disallow_untyped_defs=false
 [mypy-jsonpath_ng.*]
 ignore_missing_imports=true
 
-[mypy-astunparse.*]
-ignore_missing_imports=true
-
 [mypy-google.oauth2.*]
 ignore_missing_imports=true
 
@@ -89,6 +86,9 @@ ignore_missing_imports=true
 [mypy-pandas.*]
 ignore_missing_imports=true
 
+[mypy-numpy.*]
+ignore_missing_imports=true
+
 [mypy-apiclient.*]
 ignore_missing_imports=true
 
@@ -101,8 +101,10 @@ ignore_missing_imports=true
 
 [mypy-connectorx]
 ignore_missing_imports=true
+
 [mypy-s3fs.*]
 ignore_missing_imports=true
+
 [mypy-win_precise_time]
 ignore_missing_imports=true
 
@@ -121,6 +123,9 @@ ignore_missing_imports = True
 [mypy-pytz.*]
 ignore_missing_imports = True
 
+[mypy-sentry_sdk.*]
+ignore_missing_imports = True
+
 [mypy-tornado.*]
 ignore_missing_imports = True
 
@@ -130,7 +135,7 @@ ignore_missing_imports = True
 [mypy-snowflake.*]
 ignore_missing_imports = True
 
-[mypy-backports.*]
+[mypy-pendulum.*]
 ignore_missing_imports = True
 
 [mypy-time_machine.*]
 ignore_missing_imports = True
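Note on the transform.py hunk above: numpy is now imported independently of pyarrow, and the unique index column is built from a plain range() instead of numpy.arange(), so the arrow path works even when numpy is not installed. A standalone sketch of the same idea (illustrative table and column name, not dlt code):

    import pyarrow as pa

    tbl = pa.table({"value": ["a", "b", "c"]})
    indices = pa.array(range(tbl.num_rows))  # int64 index column, no numpy required
    tbl = tbl.append_column("_dlt_index", indices)
    assert tbl.column("_dlt_index").to_pylist() == [0, 1, 2]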
diff --git a/tests/extract/test_decorators.py b/tests/extract/test_decorators.py
index a14b4a9602..b6d77b055a 100644
--- a/tests/extract/test_decorators.py
+++ b/tests/extract/test_decorators.py
@@ -1447,11 +1447,11 @@ def some_tx(item):
     def some_tx_func(item):
         return list(range(item))
 
-    transformer = dlt.transformer(some_tx_func, parallelized=True, data_from=resource)
+    transformer = dlt.transformer(some_tx_func, data_from=resource)
     pipe_gen = transformer._pipe.gen
     inner = pipe_gen(3)  # type: ignore
     # this is a regular function returning list
-    assert inner() == [0, 1, 2]  # type: ignore[operator]
+    assert inner == [0, 1, 2]
     assert list(transformer) == [0, 0, 1, 0, 1, 2]
 
     # Invalid parallel resources
diff --git a/tests/libs/test_parquet_writer.py b/tests/libs/test_parquet_writer.py
index b6a25c5db5..5bb91dff88 100644
--- a/tests/libs/test_parquet_writer.py
+++ b/tests/libs/test_parquet_writer.py
@@ -4,6 +4,7 @@
 import pytest
 import datetime  # noqa: 251
 import time
+import math
 
 from dlt.common import pendulum, Decimal, json
 from dlt.common.configuration import inject_section
@@ -12,7 +13,6 @@
 from dlt.common.schema.utils import new_column
 from dlt.common.configuration.specs.config_section_context import ConfigSectionContext
 from dlt.common.time import ensure_pendulum_datetime
-from dlt.common.libs.pyarrow import from_arrow_scalar
 
 from tests.common.data_writers.utils import get_writer
 from tests.cases import (
@@ -165,10 +165,14 @@ def test_parquet_writer_size_file_rotation() -> None:
         for i in range(0, 100):
             writer.write_data_item([{"col1": i}], columns)
 
-    assert len(writer.closed_files) == 25
+    # different arrow versions create different file sizes
+    no_files = len(writer.closed_files)
+    i_per_file = int(math.ceil(100 / no_files))
+    assert no_files >= 17 and no_files <= 25
+
     with open(writer.closed_files[4].file_path, "rb") as f:
         table = pq.read_table(f)
-        assert table.column("col1").to_pylist() == list(range(16, 20))
+        assert table.column("col1").to_pylist() == list(range(4 * i_per_file, 5 * i_per_file))
 
 
 def test_parquet_writer_config() -> None:
diff --git a/tests/libs/test_pydantic.py b/tests/libs/test_pydantic.py
index 70846dcd72..e6df83a380 100644
--- a/tests/libs/test_pydantic.py
+++ b/tests/libs/test_pydantic.py
@@ -347,7 +347,7 @@ def test_nested_model_config_propagation() -> None:
     assert model_freeze.__fields__["address"].annotation.__name__ == "UserAddressExtraAllow"  # type: ignore[index]
     # annotated is preserved
     type_origin = get_origin(model_freeze.__fields__["address"].rebuild_annotation())  # type: ignore[index]
-    assert issubclass(type_origin, Annotated)  # type: ignore[arg-type]
+    assert type_origin is Annotated
     # UserAddress is converted to UserAddressAllow only once
     type_annotation = model_freeze.__fields__["address"].annotation  # type: ignore[index]
     assert type_annotation is get_args(model_freeze.__fields__["unity"].annotation)[0]  # type: ignore[index]
@@ -404,7 +404,7 @@ class UserPipe(BaseModel):
     assert model_freeze.__fields__["address"].annotation.__name__ == "UserAddressPipeExtraAllow"  # type: ignore[index]
     # annotated is preserved
     type_origin = get_origin(model_freeze.__fields__["address"].rebuild_annotation())  # type: ignore[index]
-    assert issubclass(type_origin, Annotated)  # type: ignore[arg-type]
+    assert type_origin is Annotated
     # UserAddress is converted to UserAddressAllow only once
     type_annotation = model_freeze.__fields__["address"].annotation  # type: ignore[index]
     assert type_annotation is get_args(model_freeze.__fields__["unity"].annotation)[0]  # type: ignore[index]
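Note on the test_parquet_writer.py hunk above: because different pyarrow versions rotate files at different byte sizes, the test now derives the expected per-file row range instead of hard-coding it; closed file k is expected to hold the half-open range [k * i_per_file, (k + 1) * i_per_file). A quick worked example of that arithmetic (assumed file count, not taken from an actual run):

    import math

    no_files = 20                                # any value in 17..25 passes the relaxed assert
    i_per_file = int(math.ceil(100 / no_files))  # 5 rows per closed file
    assert list(range(4 * i_per_file, 5 * i_per_file)) == [20, 21, 22, 23, 24]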
diff --git a/tests/pipeline/test_pipeline_extra.py b/tests/pipeline/test_pipeline_extra.py
index 32b16c234f..b14e9f6264 100644
--- a/tests/pipeline/test_pipeline_extra.py
+++ b/tests/pipeline/test_pipeline_extra.py
@@ -450,8 +450,8 @@ class Parent(BaseModel):
 
 
 @pytest.mark.skipif(
-    importlib.util.find_spec("pandas") is not None,
-    reason="Test skipped because pandas IS installed",
+    importlib.util.find_spec("pandas") is not None or importlib.util.find_spec("numpy") is not None,
+    reason="Test skipped because pandas or numpy IS installed",
 )
 def test_arrow_no_pandas() -> None:
     import pyarrow as pa
@@ -461,20 +461,32 @@ def test_arrow_no_pandas() -> None:
         "Strings": ["apple", "banana", "cherry", "date", "elderberry"],
     }
 
-    df = pa.table(data)
+    table = pa.table(data)
 
     @dlt.resource
     def pandas_incremental(numbers=dlt.sources.incremental("Numbers")):
-        yield df
+        yield table
+
+    info = dlt.run(
+        pandas_incremental(), write_disposition="merge", table_name="data", destination="duckdb"
+    )
+
+    # change table
+    data = {
+        "Numbers": [5, 6],
+        "Strings": ["elderberry", "burak"],
+    }
+
+    table = pa.table(data)
 
     info = dlt.run(
-        pandas_incremental(), write_disposition="append", table_name="data", destination="duckdb"
+        pandas_incremental(), write_disposition="merge", table_name="data", destination="duckdb"
    )
 
     with info.pipeline.sql_client() as client:  # type: ignore
         with client.execute_query("SELECT * FROM data") as c:
             with pytest.raises(ImportError):
-                df = c.df()
+                c.df()
 
 
 def test_empty_parquet(test_storage: FileStorage) -> None:
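Note on the test_pipeline_extra.py hunk above: the resource is now loaded twice with write_disposition="merge", relying on dlt.sources.incremental("Numbers") to keep cursor state between runs so the second load only picks up rows past the stored cursor. A minimal sketch of that pattern (hypothetical pipeline name and data, mirroring the API usage shown in the test):

    import dlt
    import pyarrow as pa

    @dlt.resource
    def arrow_incremental(numbers=dlt.sources.incremental("Numbers")):
        # in a real source this would yield freshly fetched data on each run
        yield pa.table({"Numbers": [1, 2, 3], "Strings": ["a", "b", "c"]})

    pipeline = dlt.pipeline("arrow_merge_demo", destination="duckdb")
    pipeline.run(arrow_incremental(), write_disposition="merge", table_name="data")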