Fix pytests and make TS conversion conditional (#205)
* Add wait flag for jobs, fix go proto path for dataset service

* Rebase with master

* Fix tests, update requirements

* Update min version for pandas

* Update integration testutils to use pandas nullable ints

* Fix warehouse validate int32

* Distinguish pandas dtype and dtype

* Add Int32 support for kafka producer

* Remove print
zhilingc authored and feast-ci-bot committed May 28, 2019
1 parent 5bd6d4d commit 7548e61
Showing 12 changed files with 45 additions and 33 deletions.
4 changes: 3 additions & 1 deletion integration-tests/testutils/kafka_producer.py
@@ -38,8 +38,10 @@ def produce_feature_rows(
         feature.id = info["id"]
         feature_value = Value()
         feature_name = info["name"]
-        if info["dtype"] is np.int64:
+        if info["dtype"] is "Int64":
             feature_value.int64Val = row[feature_name]
+        elif info["dtype"] is "Int32":
+            feature_value.int32Val = row[feature_name]
         elif info["dtype"] is np.float64:
             feature_value.doubleVal = row[feature_name]
         else:
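
For context, pandas 0.24 introduced nullable integer extension dtypes whose string names ("Int32", "Int64") are what the producer's dispatch now keys on. A minimal sketch (not part of the commit) of how those dtypes behave:

    import numpy as np
    import pandas as pd

    # Nullable integer columns keep their integer dtype even with missing
    # values, and report string names such as "Int32" and "Int64".
    s = pd.Series([1, 2, np.nan], dtype="Int64")
    assert s.dtype.name == "Int64"  # == is the robust way to compare dtype names

    # Present values come back as plain integers.
    print(int(s[0]))  # 1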
2 changes: 1 addition & 1 deletion integration-tests/testutils/requirements.txt
@@ -1,3 +1,3 @@
-pandas==0.23.*
+pandas==0.24.*
 numpy==1.15.*
 kafka-python==1.4.*
4 changes: 2 additions & 2 deletions integration-tests/testutils/spec.py
@@ -12,8 +12,8 @@ def get_entity_name(entity_spec_file):
 
 def get_feature_infos(feature_specs_files):
     value_type_to_dtype = {
-        "INT32": np.int64,
-        "INT64": np.int64,
+        "INT32": "Int32",
+        "INT64": "Int64",
         "DOUBLE": np.float64,
         "FLOAT": np.float64,
     }
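
The dtype strings produced by this mapping flow into both the Kafka producer dispatch above and the warehouse validation below. A small illustration, assuming the mapping shown in this hunk:

    import numpy as np
    import pandas as pd

    value_type_to_dtype = {
        "INT32": "Int32",
        "INT64": "Int64",
        "DOUBLE": np.float64,
        "FLOAT": np.float64,
    }

    # An INT64 feature column built with the mapped dtype tolerates missing
    # values without being silently upcast to float64.
    col = pd.Series([10, None, 30], dtype=value_type_to_dtype["INT64"])
    print(col.dtype)  # Int64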
6 changes: 5 additions & 1 deletion integration-tests/testutils/validate_feature_values.py
@@ -27,6 +27,10 @@ def validate_warehouse(
         parse_dates=["event_timestamp"],
     )
 
+    dtypes = {"event_timestamp": "datetime64[ns]"}
+    for f in feature_infos:
+        dtypes[f["name"]] = f["dtype"]
+
     # TODO: Retrieve actual values via Feast Core rather than directly from BigQuery
     # Need to change Python SDK so can retrieve values via Feast Core while
     # "ensuring correct value types"
@@ -37,7 +41,7 @@ def validate_warehouse(
         )
         .sort_values(["id", "event_timestamp"])
         .reset_index(drop=True)
-        .astype({"event_timestamp": "datetime64[ns]"})
+        .astype(dtypes)
     )[expected.columns]
 
     pd.testing.assert_frame_equal(expected, actual)
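
The validator now builds one dtype map covering the timestamp plus every feature, and applies it in a single astype call. A standalone sketch with hypothetical feature_infos entries:

    import pandas as pd

    feature_infos = [
        {"name": "feat_a", "dtype": "Int64"},    # hypothetical entries
        {"name": "feat_b", "dtype": "float64"},
    ]

    dtypes = {"event_timestamp": "datetime64[ns]"}
    for f in feature_infos:
        dtypes[f["name"]] = f["dtype"]

    actual = pd.DataFrame({
        "event_timestamp": ["2019-05-28T00:00:00"],
        "feat_a": [1],
        "feat_b": [0.5],
    }).astype(dtypes)
    print(actual.dtypes)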
3 changes: 2 additions & 1 deletion sdk/python/feast/sdk/importer.py
@@ -267,7 +267,8 @@ def stage(self):
         if not self.require_staging:
             return
         ts_col = self.spec.schema.timestampColumn
-        _convert_timestamp(self.df, ts_col)
+        if ts_col != "":
+            _convert_timestamp(self.df, ts_col)
         df_to_gcs(self.df, self.remote_path)
 
     def describe(self):
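
This is the conditional TS conversion the commit title refers to: timestamp conversion now only runs when the schema actually names a timestamp column. A reduced sketch of the guard, with hypothetical stand-ins for the SDK internals:

    def stage(df, ts_col, convert_timestamp, upload):
        # Proto3 string fields default to "", so an unset timestampColumn
        # is detected by comparing against the empty string.
        if ts_col != "":
            convert_timestamp(df, ts_col)
        upload(df)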
2 changes: 1 addition & 1 deletion sdk/python/feast/sdk/utils/bq_util.py
@@ -74,7 +74,7 @@ def get_table_name(feature_id, storage_spec):
     except KeyError:
         raise ValueError("storage spec has empty project or dataset option")
 
-    table_name = "_".join(feature_id.split(".")[:2])
+    table_name = feature_id.split(".")[0]
     return ".".join([project, dataset, table_name])
 
 
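
With this change the BigQuery table name is derived from the entity component of the feature id alone, instead of joining the first two dot-separated components. A quick illustration with a hypothetical id:

    # Hypothetical id following the "<entity>.<rest>" convention used in the tests.
    feature_id = "myentity.feature"
    project, dataset = "my_project", "my_dataset"

    # Before: "_".join(feature_id.split(".")[:2]) joined the first two
    # components (hence "myentity_none" in the old test). Now only the
    # entity prefix names the table.
    table_name = feature_id.split(".")[0]
    print(".".join([project, dataset, table_name]))  # my_project.my_dataset.myentity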
4 changes: 2 additions & 2 deletions sdk/python/feast/sdk/utils/types.py
@@ -39,8 +39,8 @@
 FEAST_VALUETYPE_TO_DTYPE = {
     "bytesVal": np.byte,
     "stringVal": np.object,
-    "int32Val": np.int64,
-    "int64Val": np.int64,
+    "int32Val": "Int32",  # Use pandas nullable int type
+    "int64Val": "Int64",  # Use pandas nullable int type
     "doubleVal": np.float64,
     "floatVal": np.float64,
     "boolVal": np.bool,
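
Previously a retrieved integer column containing any missing value was coerced to float64, because NumPy integers have no NA representation; the pandas nullable dtypes avoid that. A quick comparison, independent of the SDK:

    import numpy as np
    import pandas as pd

    # With a plain NumPy dtype, one missing value upcasts the column to float64.
    plain = pd.Series([1, np.nan])
    # The pandas extension dtype keeps integers and represents the gap as NA.
    nullable = pd.Series([1, np.nan], dtype="Int64")

    print(plain.dtype)     # float64
    print(nullable.dtype)  # Int64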
2 changes: 1 addition & 1 deletion sdk/python/setup.py
@@ -29,7 +29,7 @@
         "google-cloud-storage>=1.13.0",
         "googleapis-common-protos>=1.5.5",
         "grpcio>=1.16.1",
-        "pandas",
+        "pandas>=0.24.0",
         "protobuf>=3.0.0",
         "PyYAML",
         "fastavro>=0.21.19"
5 changes: 3 additions & 2 deletions sdk/python/test-requirements.txt
@@ -6,8 +6,9 @@ google-resumable-media==0.3.1
 googleapis-common-protos>=1.5.5
 grpcio>=1.16.1
 numpy
-pandas
+pandas>=0.24.0
 protobuf>=3.0.0
 pytest
 pytest-mock
-PyYAML
+PyYAML
+fastavro>=0.21.23
2 changes: 1 addition & 1 deletion sdk/python/tests/sdk/test_client.py
@@ -325,7 +325,7 @@ def test_serving_response_to_df_with_missing_value(self, client):
         })
         expected_df = pd.DataFrame({'entity': ["1", "2"],
                                     'entity.feat1': [1, 3],
-                                    'entity.feat2': [np.NaN, 4]}) \
+                                    'entity.feat2': [np.nan, 4]}) \
             .reset_index(drop=True)
         df = client._response_to_df(FeatureSet("entity", ["entity.feat1",
                                                           "entity.feat2"]),
25 changes: 23 additions & 2 deletions sdk/python/tests/sdk/test_importer.py
@@ -164,9 +164,8 @@ def test_from_df(self):
             assert feature.id == "driver." + feature.name
 
         import_spec = importer.spec
-        assert import_spec.type == "file"
+        assert import_spec.type == "file.csv"
         assert import_spec.sourceOptions == {
-            "format": "csv",
             "path": importer.remote_path
         }
         assert import_spec.entities == ["driver"]
@@ -182,6 +181,27 @@ def test_from_df(self):
             assert col == field.name
             if col in feature_columns:
                 assert field.featureId == "driver." + col
 
+    def test_stage_df_without_timestamp(self, mocker):
+        mocker.patch("feast.sdk.importer.df_to_gcs", return_value=True)
+        feature_columns = [
+            "avg_distance_completed", "avg_customer_distance_completed",
+            "avg_distance_cancelled"
+        ]
+        csv_path = "tests/data/driver_features.csv"
+        entity_name = "driver"
+        owner = "[email protected]"
+        staging_location = "gs://test-bucket"
+        id_column = "driver_id"
+        importer = Importer.from_csv(
+            path=csv_path,
+            entity=entity_name,
+            owner=owner,
+            staging_location=staging_location,
+            id_column=id_column,
+            feature_columns=feature_columns)
+
+        importer.stage()
+
     def _validate_csv_importer(self,
                                importer,
@@ -228,6 +248,7 @@ def _validate_csv_importer(self,
             if col in feature_columns:
                 assert field.featureId == '{}.{}'.format(entity_name,
                                                          col).lower()
+
 
 
 class TestHelpers:
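
The new test passes because df_to_gcs is stubbed out, so nothing is actually uploaded. A self-contained sketch of that pytest-mock pattern, with hypothetical stand-ins for the SDK functions:

    import pandas as pd

    def upload_to_gcs(df, path):  # hypothetical stand-in for df_to_gcs
        raise RuntimeError("no network access in unit tests")

    def stage(df, path):
        upload_to_gcs(df, path)

    def test_stage_without_network(mocker):
        # pytest-mock's `mocker` fixture swaps the upload out for a stub,
        # mirroring mocker.patch("feast.sdk.importer.df_to_gcs", ...) above.
        fake = mocker.patch(__name__ + ".upload_to_gcs", return_value=True)
        stage(pd.DataFrame({"driver_id": [1]}), "gs://test-bucket/data.csv")
        fake.assert_called_once()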
19 changes: 1 addition & 18 deletions sdk/python/tests/sdk/utils/test_bq_utils.py
@@ -38,7 +38,7 @@ def test_get_table_name():
     )
     assert (
         get_table_name(feature_id, storage_spec)
-        == "my_project.my_dataset.myentity_none"
+        == "my_project.my_dataset.myentity"
     )
 
 
@@ -48,23 +48,6 @@ def test_get_table_name_not_bq():
     with pytest.raises(ValueError, match="storage spec is not BigQuery storage spec"):
         get_table_name(feature_id, storage_spec)
 
-
-@pytest.mark.skipif(
-    os.getenv("SKIP_BIGQUERY_TEST") is not None,
-    reason="SKIP_BIGQUERY_TEST is set in the environment",
-)
-def test_query_to_dataframe():
-    with open(
-        os.path.join(testdata_path, "austin_bikeshare.bikeshare_stations.avro"), "rb"
-    ) as expected_file:
-        avro_reader = fastavro.reader(expected_file)
-        expected = pd.DataFrame.from_records(avro_reader)
-
-    query = "SELECT * FROM `bigquery-public-data.austin_bikeshare.bikeshare_stations`"
-    actual = query_to_dataframe(query)
-    assert expected.equals(actual)
-
-
 @pytest.mark.skipif(
     os.getenv("SKIP_BIGQUERY_TEST") is not None,
     reason="SKIP_BIGQUERY_TEST is set in the environment",
