fix: re-enable to_csv and to_json related tests (#468)
* fix: re-enable to_csv and to_json related tests

* fix gcs file path

* add global FIRST_GCS_FILE_SUFFIX

* avoid importing individual functions
chelsea-lin authored Mar 20, 2024
1 parent 21b2188 commit 2b9a01d
Showing 5 changed files with 39 additions and 36 deletions.
35 changes: 18 additions & 17 deletions tests/system/small/test_dataframe_io.py
@@ -19,7 +19,7 @@
import pyarrow as pa
import pytest

- from tests.system.utils import assert_pandas_df_equal, convert_pandas_dtypes
+ from tests.system import utils

try:
import pandas_gbq # type: ignore
@@ -115,7 +115,6 @@ def test_to_pandas_batches_w_correct_dtypes(scalars_df_default_index):
pd.testing.assert_series_equal(actual, expected)


- @pytest.mark.skip(reason="Disable to unblock kokoro tests")
@pytest.mark.parametrize(
("index"),
[True, False],
@@ -150,12 +149,12 @@ def test_to_csv_index(
# read_csv will decode into bytes improperly; convert_pandas_dtypes will encode properly from string
dtype.pop("bytes_col")
gcs_df = pd.read_csv(
-     path,
+     utils.get_first_file_from_wildcard(path),
dtype=dtype,
date_format={"timestamp_col": "YYYY-MM-DD HH:MM:SS Z"},
index_col=index_col,
)
- convert_pandas_dtypes(gcs_df, bytes_col=True)
+ utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
gcs_df.index.name = scalars_df.index.name

scalars_pandas_df = scalars_pandas_df.copy()
@@ -164,7 +163,6 @@
pd.testing.assert_frame_equal(gcs_df, scalars_pandas_df)


- @pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_csv_tabs(
scalars_dfs: Tuple[bigframes.dataframe.DataFrame, pd.DataFrame],
gcs_folder: str,
@@ -189,13 +187,13 @@ def test_to_csv_tabs(
# read_csv will decode into bytes improperly; convert_pandas_dtypes will encode properly from string
dtype.pop("bytes_col")
gcs_df = pd.read_csv(
-     path,
+     utils.get_first_file_from_wildcard(path),
sep="\t",
dtype=dtype,
date_format={"timestamp_col": "YYYY-MM-DD HH:MM:SS Z"},
index_col=index_col,
)
- convert_pandas_dtypes(gcs_df, bytes_col=True)
+ utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
gcs_df.index.name = scalars_df.index.name

scalars_pandas_df = scalars_pandas_df.copy()
@@ -229,7 +227,7 @@ def test_to_gbq_index(scalars_dfs, dataset_id, index):
else:
df_out = df_out.sort_values("rowindex_2").reset_index(drop=True)

- convert_pandas_dtypes(df_out, bytes_col=False)
+ utils.convert_pandas_dtypes(df_out, bytes_col=False)
# pd.read_gbq interprets bytes_col as object, reconvert to pyarrow binary
df_out["bytes_col"] = df_out["bytes_col"].astype(pd.ArrowDtype(pa.binary()))
expected = scalars_pandas_df.copy()
@@ -415,7 +413,6 @@ def test_to_json_index_invalid_lines(
scalars_df.to_json(path, index=index)


- @pytest.mark.skip(reason="Disable to unblock kokoro tests")
@pytest.mark.parametrize(
("index"),
[True, False],
@@ -435,8 +432,12 @@ def test_to_json_index_records_orient(
""" Test the `to_json` API with `orient` is `records` and `lines` is True"""
scalars_df.to_json(path, index=index, orient="records", lines=True)

- gcs_df = pd.read_json(path, lines=True, convert_dates=["datetime_col"])
- convert_pandas_dtypes(gcs_df, bytes_col=True)
+ gcs_df = pd.read_json(
+     utils.get_first_file_from_wildcard(path),
+     lines=True,
+     convert_dates=["datetime_col"],
+ )
+ utils.convert_pandas_dtypes(gcs_df, bytes_col=True)
if index and scalars_df.index.name is not None:
gcs_df = gcs_df.set_index(scalars_df.index.name)

@@ -474,8 +475,8 @@ def test_to_parquet_index(scalars_dfs, gcs_folder, index):
# table.
scalars_df.to_parquet(path, index=index)

- gcs_df = pd.read_parquet(path.replace("*", "000000000000"))
- convert_pandas_dtypes(gcs_df, bytes_col=False)
+ gcs_df = pd.read_parquet(utils.get_first_file_from_wildcard(path))
+ utils.convert_pandas_dtypes(gcs_df, bytes_col=False)
if index and scalars_df.index.name is not None:
gcs_df = gcs_df.set_index(scalars_df.index.name)

@@ -507,7 +508,7 @@ def test_to_sql_query_unnamed_index_included(
pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
roundtrip.index.names = [None]
- assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)
+ utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df, check_index_type=False)


def test_to_sql_query_named_index_included(
@@ -524,7 +525,7 @@ def test_to_sql_query_named_index_included(

pd_df = scalars_pandas_df_default_index.set_index("rowindex_2", drop=True)
roundtrip = session.read_gbq(sql, index_col=idx_ids)
- assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)
+ utils.assert_pandas_df_equal(roundtrip.to_pandas(), pd_df)


def test_to_sql_query_unnamed_index_excluded(
@@ -539,7 +540,7 @@

pd_df = scalars_pandas_df_default_index.reset_index(drop=True)
roundtrip = session.read_gbq(sql)
- assert_pandas_df_equal(
+ utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
)

@@ -558,6 +559,6 @@ def test_to_sql_query_named_index_excluded(
"rowindex_2", drop=True
).reset_index(drop=True)
roundtrip = session.read_gbq(sql)
- assert_pandas_df_equal(
+ utils.assert_pandas_df_equal(
roundtrip.to_pandas(), pd_df, check_index_type=False, ignore_order=True
)
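
All of these to_csv/to_json/to_parquet tests follow the same round trip: the bigframes DataFrame writes through a BigQuery export job, whose wildcard GCS URI can fan out into numbered shards (the first shard gets the 000000000000 suffix), and pandas then reads the first shard back for comparison. A minimal sketch of that round trip, assuming a hypothetical bucket name and an already-authenticated bigframes session:

import bigframes.pandas as bpd
import pandas as pd

from tests.system import utils

# Hypothetical wildcard URI; BigQuery expands "*" into shard numbers on export.
write_path = "gs://my-bucket/my-folder/test_to_csv*.csv"

df = bpd.DataFrame({"int64_col": [1, 2, 3]})
df.to_csv(write_path)  # may produce one or more sharded CSV files in GCS

# pandas cannot expand the wildcard itself, so target the first shard.
gcs_df = pd.read_csv(utils.get_first_file_from_wildcard(write_path))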
3 changes: 2 additions & 1 deletion tests/system/small/test_encryption.py
@@ -19,6 +19,7 @@

import bigframes
import bigframes.ml.linear_model
+ from tests.system import utils


@pytest.fixture(scope="module")
@@ -160,7 +161,7 @@ def test_read_csv_gcs(
# Create a csv in gcs
write_path = gcs_folder + "test_read_csv_gcs_bigquery_engine*.csv"
read_path = (
-     write_path.replace("*", "000000000000") if engine is None else write_path
+     utils.get_first_file_from_wildcard(write_path) if engine is None else write_path
)
scalars_df_index.to_csv(write_path)

7 changes: 3 additions & 4 deletions tests/system/small/test_series.py
@@ -27,6 +27,7 @@
from tests.system.utils import (
assert_pandas_df_equal,
assert_series_equal,
+     get_first_file_from_wildcard,
skip_legacy_pandas,
)

@@ -2390,11 +2391,10 @@ def test_to_frame(scalars_dfs):
assert_pandas_df_equal(bf_result, pd_result)


- @pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
path = gcs_folder + "test_series_to_json*.jsonl"
scalars_df_index["int64_col"].to_json(path, lines=True, orient="records")
- gcs_df = pd.read_json(path, lines=True)
+ gcs_df = pd.read_json(get_first_file_from_wildcard(path), lines=True)

pd.testing.assert_series_equal(
gcs_df["int64_col"].astype(pd.Int64Dtype()),
@@ -2404,11 +2404,10 @@ def test_to_json(gcs_folder, scalars_df_index, scalars_pandas_df_index):
)


- @pytest.mark.skip(reason="Disable to unblock kokoro tests")
def test_to_csv(gcs_folder, scalars_df_index, scalars_pandas_df_index):
path = gcs_folder + "test_series_to_csv*.csv"
scalars_df_index["int64_col"].to_csv(path)
- gcs_df = pd.read_csv(path)
+ gcs_df = pd.read_csv(get_first_file_from_wildcard(path))

pd.testing.assert_series_equal(
gcs_df["int64_col"].astype(pd.Int64Dtype()),
26 changes: 12 additions & 14 deletions tests/system/small/test_session.py
@@ -30,9 +30,7 @@
import bigframes.dataframe
import bigframes.dtypes
import bigframes.ml.linear_model
- from tests.system.utils import skip_legacy_pandas
-
- FIRST_FILE = "000000000000"
+ from tests.system import utils


def test_read_gbq_tokyo(
@@ -435,14 +433,14 @@ def test_read_pandas_tokyo(
pd.testing.assert_frame_equal(result, expected)


- @skip_legacy_pandas
+ @utils.skip_legacy_pandas
def test_read_csv_gcs_default_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
if scalars_df.index.name is not None:
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index*.csv"
else:
path = gcs_folder + "test_read_csv_gcs_default_engine_wo_index*.csv"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_csv(path, index=False)
dtype = scalars_df.dtypes.to_dict()
dtype.pop("geography_col")
@@ -492,7 +490,7 @@ def test_read_csv_gcs_bq_engine(session, scalars_dfs, gcs_folder):
pytest.param("\t", id="custom_sep"),
],
)
- @skip_legacy_pandas
+ @utils.skip_legacy_pandas
def test_read_csv_local_default_engine(session, scalars_dfs, sep):
scalars_df, scalars_pandas_df = scalars_dfs
with tempfile.TemporaryDirectory() as dir:
@@ -641,15 +639,15 @@ def test_read_csv_default_engine_throws_not_implemented_error(
gcs_folder
+ "test_read_csv_gcs_default_engine_throws_not_implemented_error*.csv"
)
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df_index.to_csv(path)
with pytest.raises(NotImplementedError, match=match):
session.read_csv(read_path, **kwargs)


def test_read_csv_gcs_default_engine_w_header(session, scalars_df_index, gcs_folder):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_header*.csv"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df_index.to_csv(path)

# Skips header=N rows, normally considers the N+1th row as the header, but overridden by
@@ -716,7 +714,7 @@ def test_read_csv_gcs_default_engine_w_index_col_name(
session, scalars_df_default_index, gcs_folder
):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index_col_name*.csv"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df_default_index.to_csv(path)

df = session.read_csv(read_path, index_col="rowindex")
@@ -731,7 +729,7 @@ def test_read_csv_gcs_default_engine_w_index_col_index(
session, scalars_df_default_index, gcs_folder
):
path = gcs_folder + "test_read_csv_gcs_default_engine_w_index_col_index*.csv"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df_default_index.to_csv(path)

index_col = scalars_df_default_index.columns.to_list().index("rowindex")
@@ -790,7 +788,7 @@ def test_read_csv_local_default_engine_w_index_col_index(
def test_read_csv_gcs_w_usecols(session, scalars_df_index, gcs_folder, engine):
path = gcs_folder + "test_read_csv_gcs_w_usecols"
path = path + "_default_engine*.csv" if engine is None else path + "_bq_engine*.csv"
- read_path = path.replace("*", FIRST_FILE) if engine is None else path
+ read_path = utils.get_first_file_from_wildcard(path) if engine is None else path
scalars_df_index.to_csv(path)

# df should only have 1 column which is bool_col.
@@ -902,7 +900,7 @@ def test_read_parquet_gcs(session: bigframes.Session, scalars_dfs, gcs_folder, e

# Only bigquery engine for reads supports wildcards in path name.
if engine != "bigquery":
-     path = path.replace("*", "000000000000")
+     path = utils.get_first_file_from_wildcard(path)

df_out = (
session.read_parquet(path, engine=engine)
@@ -1012,7 +1010,7 @@ def test_read_parquet_gcs_compression_not_supported(
def test_read_json_gcs_bq_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
path = gcs_folder + "test_read_json_gcs_bq_engine_w_index*.json"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_json(path, index=False, lines=True, orient="records")
df = session.read_json(read_path, lines=True, orient="records", engine="bigquery")

@@ -1036,7 +1034,7 @@ def test_read_json_gcs_default_engine(session, scalars_dfs, gcs_folder):
def test_read_json_gcs_default_engine(session, scalars_dfs, gcs_folder):
scalars_df, _ = scalars_dfs
path = gcs_folder + "test_read_json_gcs_default_engine_w_index*.json"
- read_path = path.replace("*", FIRST_FILE)
+ read_path = utils.get_first_file_from_wildcard(path)
scalars_df.to_json(
path,
index=False,
4 changes: 4 additions & 0 deletions tests/system/utils.py
@@ -304,3 +304,7 @@ def delete_cloud_function(
request = functions_v2.DeleteFunctionRequest(name=full_name)
operation = functions_client.delete_function(request=request)
return operation


+ def get_first_file_from_wildcard(path):
+     return path.replace("*", "000000000000")
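
The new helper centralizes the path.replace("*", "000000000000") substitution that was previously inlined in each test, along with the FIRST_FILE constant that test_session.py defined, so the first-shard suffix of a BigQuery export is encoded in one place. A usage sketch with a hypothetical path:

from tests.system import utils

path = "gs://bucket/folder/test_series_to_csv*.csv"  # hypothetical wildcard URI
utils.get_first_file_from_wildcard(path)
# returns "gs://bucket/folder/test_series_to_csv000000000000.csv"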
