From cec051f230edfb584f1267e505ee218305389c11 Mon Sep 17 00:00:00 2001 From: Jay Chia <17691182+jaychia@users.noreply.github.com> Date: Wed, 7 Feb 2024 05:30:34 -0800 Subject: [PATCH] Add tests and fixes for Daft integration (#381) * Implement to_daft on Table instead of Scan * Add integration tests --------- Co-authored-by: Jay Chia --- pyiceberg/table/__init__.py | 20 ++++++++++---------- tests/integration/test_reads.py | 22 ++++++++++++++++++++++ 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index b9d44b7c4d..1feffc60ed 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -1025,6 +1025,16 @@ def __repr__(self) -> str: result_str = f"{table_name}(\n {schema_str}\n),\n{partition_str},\n{sort_order_str},\n{snapshot_str}" return result_str + def to_daft(self) -> daft.DataFrame: + """Read a Daft DataFrame lazily from this Iceberg table. + + Returns: + daft.DataFrame: Unmaterialized Daft DataFrame created from the Iceberg table + """ + import daft + + return daft.read_iceberg(self) + class StaticTable(Table): """Load a table directly from a metadata file (i.e., without using a catalog).""" @@ -1382,16 +1392,6 @@ def to_ray(self) -> ray.data.dataset.Dataset: return ray.data.from_arrow(self.to_arrow()) - def to_daft(self) -> daft.DataFrame: - """Read a Daft DataFrame lazily from this Iceberg table. 
- - Returns: - daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table - """ - import daft - - return daft.read_iceberg(self) - class MoveOperation(Enum): First = 1 diff --git a/tests/integration/test_reads.py b/tests/integration/test_reads.py index b35b348638..d487a6477e 100644 --- a/tests/integration/test_reads.py +++ b/tests/integration/test_reads.py @@ -178,6 +178,28 @@ def test_pyarrow_limit(catalog: Catalog) -> None: assert len(full_result) == 10 +@pytest.mark.integration +@pytest.mark.filterwarnings("ignore") +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_daft_nan(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") + df = table_test_null_nan_rewritten.to_daft() + assert df.count_rows() == 3 + assert math.isnan(df.to_pydict()["col_numeric"][0]) + + +@pytest.mark.integration +@pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')]) +def test_daft_nan_rewritten(catalog: Catalog) -> None: + table_test_null_nan_rewritten = catalog.load_table("default.test_null_nan_rewritten") + df = table_test_null_nan_rewritten.to_daft() + df = df.where(df["col_numeric"].float.is_nan()) + df = df.select("idx", "col_numeric") + assert df.count_rows() == 1 + assert df.to_pydict()["idx"][0] == 1 + assert math.isnan(df.to_pydict()["col_numeric"][0]) + + @pytest.mark.integration @pytest.mark.filterwarnings("ignore") @pytest.mark.parametrize('catalog', [pytest.lazy_fixture('catalog_hive'), pytest.lazy_fixture('catalog_rest')])