Commit

test(flink): deep dive on the tests marked for Flink in test_json.py
mfatihaktas committed Jan 3, 2024
1 parent 1a5a420 commit 601cd13
Showing 3 changed files with 114 additions and 13 deletions.
112 changes: 110 additions & 2 deletions ibis/backends/flink/__init__.py
@@ -22,9 +22,11 @@
     InsertSelect,
     RenameTable,
 )
+from ibis.util import gen_name, normalize_filename
 
 if TYPE_CHECKING:
     from collections.abc import Mapping
+    from pathlib import Path
 
     import pandas as pd
     import pyarrow as pa
@@ -119,9 +121,10 @@ def drop_database(
     def list_tables(
         self,
         like: str | None = None,
-        temp: bool = False,
         *,
         database: str | None = None,
         catalog: str | None = None,
+        temp: bool = False,
     ) -> list[str]:
         """Return the list of table/view names.
@@ -198,7 +201,7 @@ def _fully_qualified_name(
         database: str | None = None,
         catalog: str | None = None,
     ) -> str:
-        if is_fully_qualified(name):
+        if name and is_fully_qualified(name):
             return name
 
         return sg.table(
@@ -635,6 +638,111 @@ def drop_view(
         sql = statement.compile()
         self._exec_sql(sql)
 
+    def _get_dataframe_from_path(self, path: str | Path) -> pd.DataFrame:
+        import glob
+
+        import pandas as pd
+
+        dataframe_list = []
+        path_str = str(path)
+        path_normalized = normalize_filename(path)
+        for file_path in glob.glob(path_normalized):
+            if path_str.startswith(("parquet://", "parq://")) or path_str.endswith(
+                ("parq", "parquet")
+            ):
+                dataframe = pd.read_parquet(file_path)
+            elif path_str.startswith("csv://") or path_str.endswith(("csv", "csv.gz")):
+                dataframe = pd.read_csv(file_path)
+            elif path_str.endswith("json"):
+                dataframe = pd.read_json(file_path, lines=True)
+            else:
+                raise ValueError(f"Unsupported file_path: {file_path}")
+
+            dataframe_list.append(dataframe)
+
+        return pd.concat(dataframe_list, ignore_index=True, sort=False)
+
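
A note on the json branch above: pd.read_json(..., lines=True) parses newline-delimited JSON, one object per line, rather than a single JSON document. A minimal sketch of that behavior with hypothetical in-memory data (not part of the commit):

    from io import StringIO

    import pandas as pd

    # lines=True treats each input line as one JSON record (NDJSON).
    ndjson = StringIO('{"a": 1, "b": "x"}\n{"a": 2, "b": "y"}\n')
    df = pd.read_json(ndjson, lines=True)
    assert df["a"].tolist() == [1, 2]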
+    def read_file(
+        self,
+        file_type: str,
+        path: str | Path,
+        table_name: str | None = None,
+    ) -> ir.Table:
+        """Register a file as a table in the current database.
+
+        Parameters
+        ----------
+        file_type
+            File type, e.g., parquet, csv, json.
+        path
+            The data source.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        obj = self._get_dataframe_from_path(path)
+        table_name = table_name or gen_name(f"read_{file_type}")
+        return self.create_table(table_name, obj, temp=True, overwrite=True)
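
Worth noting: file_type only seeds the generated table name here; the parser is picked in _get_dataframe_from_path from the path itself. A hypothetical call, assuming con is an already-connected Flink backend and the glob matches local CSV files:

    # The CSV reader is chosen from the path suffix; file_type names the table.
    t = con.read_file(file_type="csv", path="/tmp/diamonds/*.csv")
    t.head().execute()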

+    def read_parquet(self, path: str | Path, table_name: str | None = None) -> ir.Table:
+        """Register a parquet file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self.read_file(file_type="parquet", path=path, table_name=table_name)

+    def read_csv(self, path: str | Path, table_name: str | None = None) -> ir.Table:
+        """Register a csv file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self.read_file(file_type="csv", path=path, table_name=table_name)

+    def read_json(self, path: str | Path, table_name: str | None = None) -> ir.Table:
+        """Register a json file as a table in the current database.
+
+        Parameters
+        ----------
+        path
+            The data source.
+        table_name
+            An optional name to use for the created table. This defaults to
+            a sequentially generated name.
+
+        Returns
+        -------
+        ir.Table
+            The just-registered table
+        """
+        return self.read_file(file_type="json", path=path, table_name=table_name)

     @classmethod
     @lru_cache
     def _get_operations(cls):
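
Taken together, the new read_* methods load files eagerly through pandas and register the result as a temporary table via create_table(..., temp=True, overwrite=True). A minimal end-to-end sketch, assuming pyflink is installed and the parquet file exists locally (both hypothetical here, not part of the commit):

    from pyflink.table import EnvironmentSettings, TableEnvironment

    import ibis

    # The Flink backend connects through a pyflink TableEnvironment.
    table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    con = ibis.flink.connect(table_env)

    # Eagerly reads the file with pandas, then registers it as a temp table.
    t = con.read_parquet("functional_alltypes.parquet", table_name="funk_all")
    assert "funk_all" in con.list_tables(temp=True)
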
2 changes: 0 additions & 2 deletions ibis/backends/tests/test_param.py
@@ -77,7 +77,6 @@ def test_scalar_param_array(con):
     [
         "datafusion",
         "impala",
-        "flink",
         "postgres",
         "pyspark",
         "druid",
@@ -244,7 +243,6 @@ def test_scalar_param_date(backend, alltypes, value):
         "exasol",
     ]
 )
-@pytest.mark.notimpl(["flink"], "WIP")
 def test_scalar_param_nested(con):
     param = ibis.param("struct<x: array<struct<y: array<double>>>>")
     value = OrderedDict([("x", [OrderedDict([("y", [1.0, 2.0, 3.0])])])])
13 changes: 4 additions & 9 deletions ibis/backends/tests/test_register.py
@@ -395,9 +395,7 @@ def test_register_garbage(con, monkeypatch):
         ("functional_alltypes.parquet", "funk_all"),
     ],
 )
-@pytest.mark.notyet(
-    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
-)
+@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"])
 def test_read_parquet(con, tmp_path, data_dir, fname, in_table_name):
     pq = pytest.importorskip("pyarrow.parquet")
 
@@ -427,7 +425,7 @@ def ft_data(data_dir):
 
 
 @pytest.mark.notyet(
-    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
+    ["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_parquet_glob(con, tmp_path, ft_data):
     pq = pytest.importorskip("pyarrow.parquet")
@@ -446,7 +444,7 @@ def test_read_parquet_glob(con, tmp_path, ft_data):
 
 
 @pytest.mark.notyet(
-    ["flink", "impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
+    ["impala", "mssql", "mysql", "pandas", "postgres", "sqlite", "trino"]
 )
 def test_read_csv_glob(con, tmp_path, ft_data):
     pc = pytest.importorskip("pyarrow.csv")
@@ -469,7 +467,6 @@ def test_read_csv_glob(con, tmp_path, ft_data):
         "clickhouse",
         "dask",
         "datafusion",
-        "flink",
         "impala",
         "mssql",
         "mysql",
@@ -522,9 +519,7 @@ def num_diamonds(data_dir):
     "in_table_name",
     [param(None, id="default"), param("fancy_stones", id="file_name")],
 )
-@pytest.mark.notyet(
-    ["flink", "impala", "mssql", "mysql", "postgres", "sqlite", "trino"]
-)
+@pytest.mark.notyet(["impala", "mssql", "mysql", "postgres", "sqlite", "trino"])
 def test_read_csv(con, data_dir, in_table_name, num_diamonds):
     fname = "diamonds.csv"
     with pushd(data_dir / "csv"):
