From 7cb6f17fe38ece49d8f5baf411a6a49e76b50bb7 Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Tue, 13 Oct 2020 16:57:24 +0800 Subject: [PATCH 1/3] timestamp_column -> event_timestamp_column; new created_timestamp_column Signed-off-by: Oleksii Moskalenko --- .../java/feast/common/it/DataGenerator.java | 6 +- .../java/feast/core/model/DataSource.java | 11 +- ...__Data_Source_Created_Timestamp_Column.sql | 1 + go.mod | 2 +- go.sum | 2 + protos/feast/core/DataSource.proto | 7 +- sdk/python/feast/client.py | 8 +- sdk/python/feast/data_source.py | 149 +++++++++--------- sdk/python/feast/pyspark/aws/jobs.py | 6 +- sdk/python/feast/staging/storage_client.py | 4 +- sdk/python/tests/test_client.py | 39 ++--- sdk/python/tests/test_feature_table.py | 6 +- .../scala/feast/ingestion/BasePipeline.scala | 8 +- .../scala/feast/ingestion/BatchPipeline.scala | 4 +- .../scala/feast/ingestion/IngestionJob.scala | 4 +- .../feast/ingestion/IngestionJobConfig.scala | 24 ++- .../feast/ingestion/StreamingPipeline.scala | 4 +- .../ingestion/sources/bq/BigQueryReader.scala | 4 +- .../ingestion/sources/file/FileReader.scala | 4 +- .../feast/ingestion/StreamingPipelineIT.scala | 6 +- 20 files changed, 162 insertions(+), 137 deletions(-) create mode 100644 core/src/main/resources/db/migration/V2.9__Data_Source_Created_Timestamp_Column.sql diff --git a/common-test/src/main/java/feast/common/it/DataGenerator.java b/common-test/src/main/java/feast/common/it/DataGenerator.java index 236e516432..5fc4f6aa12 100644 --- a/common-test/src/main/java/feast/common/it/DataGenerator.java +++ b/common-test/src/main/java/feast/common/it/DataGenerator.java @@ -243,7 +243,7 @@ public static DataSource createFileDataSourceSpec( .setType(DataSource.SourceType.BATCH_FILE) .setFileOptions( FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build()) - .setTimestampColumn(timestampColumn) + .setEventTimestampColumn(timestampColumn) .setDatePartitionColumn(datePartitionColumn) .build(); } @@ -253,7 +253,7 @@ public static DataSource createBigQueryDataSourceSpec( return DataSource.newBuilder() .setType(DataSource.SourceType.BATCH_BIGQUERY) .setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build()) - .setTimestampColumn(timestampColumn) + .setEventTimestampColumn(timestampColumn) .setDatePartitionColumn(datePartitionColumn) .build(); } @@ -268,7 +268,7 @@ public static DataSource createKafkaDataSourceSpec( .setBootstrapServers(servers) .setClassPath(classPath) .build()) - .setTimestampColumn(timestampColumn) + .setEventTimestampColumn(timestampColumn) .build(); } } diff --git a/core/src/main/java/feast/core/model/DataSource.java b/core/src/main/java/feast/core/model/DataSource.java index a72906c073..6bfb5f0303 100644 --- a/core/src/main/java/feast/core/model/DataSource.java +++ b/core/src/main/java/feast/core/model/DataSource.java @@ -63,7 +63,10 @@ public class DataSource { private String fieldMapJSON; @Column(name = "timestamp_column") - private String timestampColumn; + private String eventTimestampColumn; + + @Column(name = "created_timestamp_column") + private String createdTimestampColumn; @Column(name = "date_partition_column") private String datePartitionColumn; @@ -115,7 +118,8 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) { source.setFieldMapJSON(TypeConversion.convertMapToJsonString(spec.getFieldMappingMap())); // Set timestamp mapping columns - source.setTimestampColumn(spec.getTimestampColumn()); + 
source.setEventTimestampColumn(spec.getEventTimestampColumn()); + source.setCreatedTimestampColumn(spec.getCreatedTimestampColumn()); source.setDatePartitionColumn(spec.getDatePartitionColumn()); return source; @@ -163,7 +167,8 @@ public DataSourceProto.DataSource toProto() { // Parse field mapping and options from JSON spec.putAllFieldMapping(TypeConversion.convertJsonStringToMap(getFieldMapJSON())); - spec.setTimestampColumn(getTimestampColumn()); + spec.setEventTimestampColumn(getEventTimestampColumn()); + spec.setCreatedTimestampColumn(getCreatedTimestampColumn()); spec.setDatePartitionColumn(getDatePartitionColumn()); return spec.build(); diff --git a/core/src/main/resources/db/migration/V2.9__Data_Source_Created_Timestamp_Column.sql b/core/src/main/resources/db/migration/V2.9__Data_Source_Created_Timestamp_Column.sql new file mode 100644 index 0000000000..ddbeffc584 --- /dev/null +++ b/core/src/main/resources/db/migration/V2.9__Data_Source_Created_Timestamp_Column.sql @@ -0,0 +1 @@ +ALTER TABLE data_sources ADD COLUMN created_timestamp_column character varying(255); \ No newline at end of file diff --git a/go.mod b/go.mod index 137fea3e19..fa19e694c8 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/net v0.0.0-20200822124328-c89045814202 golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9 // indirect - golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b // indirect + golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 // indirect google.golang.org/grpc v1.29.1 google.golang.org/protobuf v1.25.0 // indirect gopkg.in/russross/blackfriday.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index c8b8ce2c30..5b6fd4a78e 100644 --- a/go.sum +++ b/go.sum @@ -490,6 +490,8 @@ golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f h1:7+Nz9MyPqt2qMCTvNiRy1G0 golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b h1:07IVqnnzaip3TGyl/cy32V5YP3FguWG4BybYDTBNpm0= golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 h1:kLBxO4OPBgPwjg8Vvu+/0DCHIfDwYIGNFcD66NU9kpo= +golang.org/x/tools v0.0.0-20201013053347-2db1cd791039/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index d7f1104435..c3c0f3fc90 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -38,13 +38,16 @@ message DataSource { // and fields in parent FeatureTable. 
map<string, string> field_mapping = 2;
- // Must specify timestamp column name
- string timestamp_column = 3;
+ // Must specify event timestamp column name
+ string event_timestamp_column = 3;
// (Optional) Specify partition column
// useful for file sources
string date_partition_column = 4;
+ // Must specify creation timestamp column name
+ string created_timestamp_column = 5;
+
// Defines options for DataSource that sources features from a file
message FileOptions {
// File Format of the file containing the features
diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py
index cf88311e3a..d023795ae5 100644
--- a/sdk/python/feast/client.py
+++ b/sdk/python/feast/client.py
@@ -656,7 +656,7 @@ def ingest(
_check_field_mappings(
column_names,
name,
- feature_table.batch_source.timestamp_column,
+ feature_table.batch_source.event_timestamp_column,
feature_table.batch_source.field_mapping,
)
@@ -671,7 +671,7 @@ def ingest(
column_names,
pyarrow_table,
feature_table.batch_source.date_partition_column,
- feature_table.batch_source.timestamp_column,
+ feature_table.batch_source.event_timestamp_column,
)
else:
dir_path, dest_path = _write_non_partitioned_table_from_source(
@@ -680,12 +680,12 @@ def ingest(
try:
if issubclass(type(feature_table.batch_source), FileSource):
- file_url = feature_table.batch_source.file_options.file_url[:-1]
+ file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
_upload_to_file_source(file_url, with_partitions, dest_path)
if issubclass(type(feature_table.batch_source), BigQuerySource):
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
feature_table_timestamp_column = (
- feature_table.batch_source.timestamp_column
+ feature_table.batch_source.event_timestamp_column
)
_upload_to_bq_source(
diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py
index 04f4752c37..421d3d5c5b 100644
--- a/sdk/python/feast/data_source.py
+++ b/sdk/python/feast/data_source.py
@@ -352,11 +352,13 @@ class DataSource:
def __init__(
self,
- timestamp_column: str,
+ event_timestamp_column: str,
+ created_timestamp_column: str,
field_mapping: Optional[Dict[str, str]] = dict(),
date_partition_column: Optional[str] = "",
):
- self._timestamp_column = timestamp_column
+ self._event_timestamp_column = event_timestamp_column
+ self._created_timestamp_column = created_timestamp_column
self._field_mapping = field_mapping
self._date_partition_column = date_partition_column
@@ -365,7 +367,8 @@ def __eq__(self, other):
raise TypeError("Comparisons should only involve DataSource class objects.")
if (
- self.timestamp_column != other.timestamp_column
+ self.event_timestamp_column != other.event_timestamp_column
+ or self.created_timestamp_column != other.created_timestamp_column
or self.field_mapping != other.field_mapping
or self.date_partition_column != other.date_partition_column
):
@@ -388,18 +391,32 @@ def field_mapping(self, field_mapping):
self._field_mapping = field_mapping
@property
- def timestamp_column(self):
+ def event_timestamp_column(self):
"""
- Returns the timestamp column of this data source
+ Returns the event timestamp column of this data source
"""
- return self._timestamp_column
+ return self._event_timestamp_column
- @timestamp_column.setter
- def timestamp_column(self, timestamp_column):
+ @event_timestamp_column.setter
+ def event_timestamp_column(self, event_timestamp_column):
"""
- Sets the timestamp column of this data source
+ Sets the event timestamp column of this data source
"""
- self._timestamp_column = 
timestamp_column + self._event_timestamp_column = event_timestamp_column + + @property + def created_timestamp_column(self): + """ + Returns the created timestamp column of this data source + """ + return self._created_timestamp_column + + @created_timestamp_column.setter + def created_timestamp_column(self, created_timestamp_column): + """ + Sets the event timestamp column of this data source + """ + self._created_timestamp_column = created_timestamp_column @property def date_partition_column(self): @@ -426,14 +443,16 @@ def from_proto(data_source): field_mapping=data_source.field_mapping, file_format=data_source.file_options.file_format, file_url=data_source.file_options.file_url, - timestamp_column=data_source.timestamp_column, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, ) elif data_source.bigquery_options.table_ref: data_source_obj = BigQuerySource( field_mapping=data_source.field_mapping, table_ref=data_source.bigquery_options.table_ref, - timestamp_column=data_source.timestamp_column, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, ) elif ( @@ -446,7 +465,8 @@ def from_proto(data_source): bootstrap_servers=data_source.kafka_options.bootstrap_servers, class_path=data_source.kafka_options.class_path, topic=data_source.kafka_options.topic, - timestamp_column=data_source.timestamp_column, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, ) elif ( @@ -459,7 +479,8 @@ def from_proto(data_source): class_path=data_source.kinesis_options.class_path, region=data_source.kinesis_options.region, stream_name=data_source.kinesis_options.stream_name, - timestamp_column=data_source.timestamp_column, + event_timestamp_column=data_source.event_timestamp_column, + created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, ) else: @@ -477,13 +498,19 @@ def to_proto(self) -> DataSourceProto: class FileSource(DataSource): def __init__( self, - timestamp_column: str, + event_timestamp_column: str, + created_timestamp_column: str, file_format: str, file_url: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", ): - super().__init__(timestamp_column, field_mapping, date_partition_column) + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) self._file_options = FileOptions(file_format=file_format, file_url=file_url) def __eq__(self, other): @@ -512,18 +539,6 @@ def file_options(self, file_options): """ self._file_options = file_options - @classmethod - def from_proto(cls, data_source_proto): - - data_source = cls( - field_mapping=data_source_proto.field_mapping, - file_options=cls.file_options, - timestamp_column=data_source_proto.timestamp_column, - date_partition_column=data_source_proto.date_partition_column, - ) - - return data_source - def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( type=DataSourceProto.BATCH_FILE, @@ -531,7 +546,8 @@ def to_proto(self) -> DataSourceProto: file_options=self.file_options.to_proto(), ) - data_source_proto.timestamp_column = self.timestamp_column + 
data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column return data_source_proto @@ -540,12 +556,18 @@ def to_proto(self) -> DataSourceProto: class BigQuerySource(DataSource): def __init__( self, - timestamp_column: str, + event_timestamp_column: str, + created_timestamp_column: str, table_ref: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", ): - super().__init__(timestamp_column, field_mapping, date_partition_column) + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) self._bigquery_options = BigQueryOptions(table_ref=table_ref,) def __eq__(self, other): @@ -573,18 +595,6 @@ def bigquery_options(self, bigquery_options): """ self._bigquery_options = bigquery_options - @classmethod - def from_proto(cls, data_source_proto): - - data_source = cls( - field_mapping=data_source_proto.field_mapping, - bigquery_options=cls.bigquery_options, - timestamp_column=data_source_proto.timestamp_column, - date_partition_column=data_source_proto.date_partition_column, - ) - - return data_source - def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( type=DataSourceProto.BATCH_BIGQUERY, @@ -592,7 +602,8 @@ def to_proto(self) -> DataSourceProto: bigquery_options=self.bigquery_options.to_proto(), ) - data_source_proto.timestamp_column = self.timestamp_column + data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column return data_source_proto @@ -601,14 +612,20 @@ def to_proto(self) -> DataSourceProto: class KafkaSource(DataSource): def __init__( self, - timestamp_column: str, + event_timestamp_column: str, + created_timestamp_column: str, bootstrap_servers: str, class_path: str, topic: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", ): - super().__init__(timestamp_column, field_mapping, date_partition_column) + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) self._kafka_options = KafkaOptions( bootstrap_servers=bootstrap_servers, class_path=class_path, topic=topic ) @@ -643,18 +660,6 @@ def kafka_options(self, kafka_options): """ self._kafka_options = kafka_options - @classmethod - def from_proto(cls, data_source_proto): - - data_source = cls( - field_mapping=data_source_proto.field_mapping, - kafka_options=cls.kafka_options, - timestamp_column=data_source_proto.timestamp_column, - date_partition_column=data_source_proto.date_partition_column, - ) - - return data_source - def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( type=DataSourceProto.STREAM_KAFKA, @@ -662,7 +667,8 @@ def to_proto(self) -> DataSourceProto: kafka_options=self.kafka_options.to_proto(), ) - data_source_proto.timestamp_column = self.timestamp_column + data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column return data_source_proto @@ -671,14 +677,20 @@ def to_proto(self) -> DataSourceProto: class KinesisSource(DataSource): def __init__( self, - timestamp_column: str, + 
event_timestamp_column: str, + created_timestamp_column: str, class_path: str, region: str, stream_name: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", ): - super().__init__(timestamp_column, field_mapping, date_partition_column) + super().__init__( + event_timestamp_column, + created_timestamp_column, + field_mapping, + date_partition_column, + ) self._kinesis_options = KinesisOptions( class_path=class_path, region=region, stream_name=stream_name ) @@ -712,18 +724,6 @@ def kinesis_options(self, kinesis_options): """ self._kinesis_options = kinesis_options - @classmethod - def from_proto(cls, data_source_proto): - - data_source = cls( - field_mapping=data_source_proto.field_mapping, - kinesis_options=cls.kinesis_options, - timestamp_column=data_source_proto.timestamp_column, - date_partition_column=data_source_proto.date_partition_column, - ) - - return data_source - def to_proto(self) -> DataSourceProto: data_source_proto = DataSourceProto( type=DataSourceProto.STREAM_KINESIS, @@ -731,7 +731,8 @@ def to_proto(self) -> DataSourceProto: kinesis_options=self.kinesis_options.to_proto(), ) - data_source_proto.timestamp_column = self.timestamp_column + data_source_proto.event_timestamp_column = self.event_timestamp_column + data_source_proto.created_timestamp_column = self.created_timestamp_column data_source_proto.date_partition_column = self.date_partition_column return data_source_proto diff --git a/sdk/python/feast/pyspark/aws/jobs.py b/sdk/python/feast/pyspark/aws/jobs.py index a783212186..a69c87e569 100644 --- a/sdk/python/feast/pyspark/aws/jobs.py +++ b/sdk/python/feast/pyspark/aws/jobs.py @@ -134,8 +134,10 @@ def _batch_source_to_json(batch_source): return { "file": { "path": batch_source.file_options.file_url, - "mapping": dict(batch_source.field_mapping), - "timestampColumn": batch_source.timestamp_column, + "field_mapping": dict(batch_source.field_mapping), + "event_timestamp_column": batch_source.event_timestamp_column, + "created_timestamp_column": batch_source.created_timestamp_column, + "date_partition_column": batch_source.date_partition_column, } } diff --git a/sdk/python/feast/staging/storage_client.py b/sdk/python/feast/staging/storage_client.py index a10558b38c..a24a4c6357 100644 --- a/sdk/python/feast/staging/storage_client.py +++ b/sdk/python/feast/staging/storage_client.py @@ -229,8 +229,8 @@ def download_file(self, uri: ParseResult) -> IO[bytes]: def list_files(self, bucket: str, path: str) -> List[str]: raise NotImplementedError("list files not implemented for Local file") - def upload_file(self, local_path: str, folder: str, remote_path: str): - dest_fpath = os.path.join(folder + "/" + remote_path) + def upload_file(self, local_path: str, bucket: str, remote_path: str): + dest_fpath = "/" + remote_path os.makedirs(os.path.dirname(dest_fpath), exist_ok=True) shutil.copy(local_path, dest_fpath) diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 05e598ec34..3d7fbab730 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -390,7 +390,8 @@ def test_apply_feature_table_success(self, test_client): batch_source = FileSource( file_format="parquet", file_url="file://feast/*", - timestamp_column="ts_col", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", date_partition_column="date_partition_col", ) @@ -398,7 +399,8 @@ def test_apply_feature_table_success(self, test_client): bootstrap_servers="localhost:9094", 
class_path="random/path/to/class", topic="test_topic", - timestamp_column="ts_col", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", ) ft1 = FeatureTable( @@ -438,7 +440,9 @@ def test_apply_feature_table_success(self, test_client): @pytest.mark.parametrize( "mocked_client", [lazy_fixture("mock_client")], ) - def test_ingest_dataframe_partition(self, mocked_client, mocker, partitioned_df): + def test_ingest_dataframe_partition( + self, mocked_client, mocker, partitioned_df, tmp_path + ): """ Test ingestion with local FileSource, using DataFrame. Partition column stated but not provided in Dataset. @@ -451,7 +455,7 @@ def test_ingest_dataframe_partition(self, mocked_client, mocker, partitioned_df) mocked_client._core_service_stub, "GetFeatureTable", return_value=_ingest_test_getfeaturetable_mocked_resp( - "file://feast/*", "datetime_col" + f"file://{tmp_path}", "date" ), ) @@ -459,8 +463,7 @@ def test_ingest_dataframe_partition(self, mocked_client, mocker, partitioned_df) ft = mocked_client.get_feature_table("ingest_featuretable") mocked_client.ingest(ft, partitioned_df, timeout=600) - dest_fpath = os.path.join("feast/") - pq_df = pq.read_table(dest_fpath).to_pandas() + pq_df = pq.read_table(tmp_path).to_pandas().drop(columns=["date"]) partitioned_df, pq_df = _ingest_test_format_dataframes( partitioned_df, pq_df, True @@ -472,7 +475,7 @@ def test_ingest_dataframe_partition(self, mocked_client, mocker, partitioned_df) "mocked_client", [lazy_fixture("mock_client")], ) def test_ingest_dataframe_no_partition( - self, mocked_client, mocker, non_partitioned_df + self, mocked_client, mocker, non_partitioned_df, tmp_path ): """ Test ingestion with local FileSource, using DataFrame. @@ -485,7 +488,7 @@ def test_ingest_dataframe_no_partition( mocker.patch.object( mocked_client._core_service_stub, "GetFeatureTable", - return_value=_ingest_test_getfeaturetable_mocked_resp("file://feast2/*"), + return_value=_ingest_test_getfeaturetable_mocked_resp(f"file://{tmp_path}"), ) mocked_client.set_project("my_project") @@ -493,13 +496,10 @@ def test_ingest_dataframe_no_partition( mocked_client.ingest(ft, non_partitioned_df, timeout=600) # Since not partitioning, we're only looking for single file - dest_fpath = os.path.join("feast2/") single_file = [ - f - for f in os.listdir(dest_fpath) - if os.path.isfile(os.path.join(dest_fpath, f)) + f for f in os.listdir(tmp_path) if os.path.isfile(os.path.join(tmp_path, f)) ][0] - pq_df = pq.read_table(dest_fpath + single_file).to_pandas() + pq_df = pq.read_table(tmp_path / single_file).to_pandas() non_partitioned_df, pq_df = _ingest_test_format_dataframes( non_partitioned_df, pq_df @@ -510,7 +510,7 @@ def test_ingest_dataframe_no_partition( @pytest.mark.parametrize( "mocked_client", [lazy_fixture("mock_client")], ) - def test_ingest_csv(self, mocked_client, mocker): + def test_ingest_csv(self, mocked_client, mocker, tmp_path): """ Test ingestion with local FileSource, using CSV file. Partition column is provided. 
@@ -523,7 +523,7 @@ def test_ingest_csv(self, mocked_client, mocker): mocked_client._core_service_stub, "GetFeatureTable", return_value=_ingest_test_getfeaturetable_mocked_resp( - "file://feast3/*", "datetime_col" + f"file://{tmp_path}", "date" ), ) @@ -531,15 +531,15 @@ def test_ingest_csv(self, mocked_client, mocker): os.path.join( os.path.dirname(os.path.realpath(__file__)), "./data/dev_featuretable.csv", - ) + ), + parse_dates=["datetime"], ) mocked_client.set_project("my_project") ft = mocked_client.get_feature_table("ingest_featuretable") mocked_client.ingest(ft, partitioned_df, timeout=600) - dest_fpath = os.path.join("feast3/") - pq_df = pq.read_table(dest_fpath).to_pandas() + pq_df = pq.read_table(tmp_path).to_pandas().drop(columns=["date"]) partitioned_df, pq_df = _ingest_test_format_dataframes( partitioned_df, pq_df, True @@ -640,7 +640,8 @@ def _ingest_test_getfeaturetable_mocked_resp( file_options=DataSourceProto.FileOptions( file_format="parquet", file_url=file_url ), - timestamp_column="datetime", + event_timestamp_column="datetime", + created_timestamp_column="timestamp", date_partition_column=date_partition_col if date_partition_col is not None else None, diff --git a/sdk/python/tests/test_feature_table.py b/sdk/python/tests/test_feature_table.py index 7a50b7e58f..0dd8bb1717 100644 --- a/sdk/python/tests/test_feature_table.py +++ b/sdk/python/tests/test_feature_table.py @@ -61,7 +61,8 @@ def test_feature_table_import_export_yaml(self): }, file_format="parquet", file_url="file://feast/*", - timestamp_column="ts_col", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", date_partition_column="date_partition_col", ) @@ -73,7 +74,8 @@ def test_feature_table_import_export_yaml(self): bootstrap_servers="localhost:9094", class_path="random/path/to/class", topic="test_topic", - timestamp_column="ts_col", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", ) test_feature_table = FeatureTable( diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala index bb43d160ce..91b274d1ef 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BasePipeline.scala @@ -79,13 +79,13 @@ trait BasePipeline { entities: Seq[Field] ): Array[Column] = { val featureColumns = features - .filter(f => !source.mapping.contains(f.name)) - .map(f => (f.name, f.name)) ++ source.mapping + .filter(f => !source.fieldMapping.contains(f.name)) + .map(f => (f.name, f.name)) ++ source.fieldMapping - val timestampColumn = Seq((source.timestampColumn, source.timestampColumn)) + val timestampColumn = Seq((source.eventTimestampColumn, source.eventTimestampColumn)) val entitiesColumns = entities - .filter(e => !source.mapping.contains(e.name)) + .filter(e => !source.fieldMapping.contains(e.name)) .map(e => (e.name, e.name)) (featureColumns ++ entitiesColumns ++ timestampColumn).map { case (alias, source) => diff --git a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala index d52c565918..c7019a5cd1 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/BatchPipeline.scala @@ -35,7 +35,7 @@ object BatchPipeline extends BasePipeline { val featureTable = config.featureTable val projection = inputProjection(config.source, featureTable.features, 
featureTable.entities) - val validator = new RowValidator(featureTable, config.source.timestampColumn) + val validator = new RowValidator(featureTable, config.source.eventTimestampColumn) val input = config.source match { case source: BQSource => @@ -70,7 +70,7 @@ object BatchPipeline extends BasePipeline { .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) - .option("timestamp_column", config.source.timestampColumn) + .option("timestamp_column", config.source.eventTimestampColumn) .save() config.deadLetterPath match { diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala index 72c611810d..82168dbe81 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJob.scala @@ -39,7 +39,7 @@ object IngestionJob { opt[String](name = "source") .action((x, c) => - parseJSON(x).extract[Sources] match { + parseJSON(x).camelizeKeys.extract[Sources] match { case Sources(file: Some[FileSource], _, _) => c.copy(source = file.get) case Sources(_, bq: Some[BQSource], _) => c.copy(source = bq.get) case Sources(_, _, kafka: Some[KafkaSource]) => c.copy(source = kafka.get) @@ -49,7 +49,7 @@ object IngestionJob { .text("JSON-encoded source object (e.g. {\"kafka\":{\"bootstrapServers\":...}}") opt[String](name = "feature-table") - .action((x, c) => c.copy(featureTable = parseJSON(x).extract[FeatureTable])) + .action((x, c) => c.copy(featureTable = parseJSON(x).camelizeKeys.extract[FeatureTable])) .required() .text("JSON-encoded FeatureTableSpec object") diff --git a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala index c024013514..f13298ef2d 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/IngestionJobConfig.scala @@ -33,9 +33,11 @@ abstract class MetricConfig case class StatsDConfig(host: String, port: Int) extends MetricConfig abstract class Source { - def mapping: Map[String, String] + def fieldMapping: Map[String, String] - def timestampColumn: String + def eventTimestampColumn: String + def createdTimestampColumn: Option[String] + def datePartitionColumn: Option[String] } abstract class BatchSource extends Source @@ -46,24 +48,30 @@ abstract class StreamingSource extends Source { case class FileSource( path: String, - override val mapping: Map[String, String], - override val timestampColumn: String + override val fieldMapping: Map[String, String], + override val eventTimestampColumn: String, + override val createdTimestampColumn: Option[String] = None, + override val datePartitionColumn: Option[String] = None ) extends BatchSource case class BQSource( project: String, dataset: String, table: String, - override val mapping: Map[String, String], - override val timestampColumn: String + override val fieldMapping: Map[String, String], + override val eventTimestampColumn: String, + override val createdTimestampColumn: Option[String] = None, + override val datePartitionColumn: Option[String] = None ) extends BatchSource case class KafkaSource( bootstrapServers: String, topic: String, override val classpath: String, - override val mapping: Map[String, String], - override val timestampColumn: String + override val fieldMapping: Map[String, String], + 
override val eventTimestampColumn: String, + override val createdTimestampColumn: Option[String] = None, + override val datePartitionColumn: Option[String] = None ) extends StreamingSource case class Sources( diff --git a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala index f9b6456d48..576849f4b2 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/StreamingPipeline.scala @@ -42,7 +42,7 @@ object StreamingPipeline extends BasePipeline with Serializable { val featureTable = config.featureTable val projection = inputProjection(config.source, featureTable.features, featureTable.entities) - val validator = new RowValidator(featureTable, config.source.timestampColumn) + val validator = new RowValidator(featureTable, config.source.eventTimestampColumn) val messageParser = protoParser(sparkSession, config.source.asInstanceOf[StreamingSource].classpath) @@ -79,7 +79,7 @@ object StreamingPipeline extends BasePipeline with Serializable { .option("entity_columns", featureTable.entities.map(_.name).mkString(",")) .option("namespace", featureTable.name) .option("project_name", featureTable.project) - .option("timestamp_column", config.source.timestampColumn) + .option("timestamp_column", config.source.eventTimestampColumn) .save() config.deadLetterPath match { diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala index 5dc5a7fddd..05264bd55a 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/bq/BigQueryReader.scala @@ -33,7 +33,7 @@ object BigQueryReader { sqlContext.read .format("bigquery") .load(s"${source.project}.${source.dataset}.${source.table}") - .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis)) - .filter(col(source.timestampColumn) < new Timestamp(end.getMillis)) + .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis)) + .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis)) } } diff --git a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala index e099a87aa3..5bad029b5b 100644 --- a/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala +++ b/spark/ingestion/src/main/scala/feast/ingestion/sources/file/FileReader.scala @@ -32,7 +32,7 @@ object FileReader { ): DataFrame = { sqlContext.read .parquet(source.path) - .filter(col(source.timestampColumn) >= new Timestamp(start.getMillis)) - .filter(col(source.timestampColumn) < new Timestamp(end.getMillis)) + .filter(col(source.eventTimestampColumn) >= new Timestamp(start.getMillis)) + .filter(col(source.eventTimestampColumn) < new Timestamp(end.getMillis)) } } diff --git a/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala b/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala index 7bb0edd615..5cb72cfa15 100644 --- a/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala +++ b/spark/ingestion/src/test/scala/feast/ingestion/StreamingPipelineIT.scala @@ -105,8 +105,8 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer { bootstrapServers = kafkaContainer.bootstrapServers, topic = "topic", 
classpath = "com.example.protos.TestMessage", - mapping = Map.empty, - timestampColumn = "event_timestamp" + fieldMapping = Map.empty, + eventTimestampColumn = "event_timestamp" ) val featureKeyEncoder: String => String = encodeFeatureKey(config.featureTable) } @@ -160,7 +160,7 @@ class StreamingPipelineIT extends SparkSpec with ForAllTestContainer { val configWithKafka = config.copy( source = kafkaSource.copy( classpath = "com.example.protos.AllTypesMessage", - mapping = Map( + fieldMapping = Map( "map_value" -> "map.key", "inner_double" -> "inner.double", "inner_float" -> "inner.float", From 1e6e9946920dac067ab71e836b3f396f981d73dd Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Tue, 13 Oct 2020 17:38:50 +0800 Subject: [PATCH 2/3] e2e tests Signed-off-by: Oleksii Moskalenko --- sdk/python/feast/loaders/file.py | 6 +----- tests/e2e/test-register.py | 15 +++++++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/sdk/python/feast/loaders/file.py b/sdk/python/feast/loaders/file.py index cad82cbced..2327f9d17e 100644 --- a/sdk/python/feast/loaders/file.py +++ b/sdk/python/feast/loaders/file.py @@ -64,13 +64,9 @@ def export_source_to_staging_location( # Prepare Avro file to be exported to staging location if isinstance(source, pd.DataFrame): - # DataFrame provided as a source - uri_path = None # type: Optional[str] - if uri.scheme == "file": - uri_path = uri.path # Remote gs staging location provided by serving dir_path, file_name, source_path = export_dataframe_to_local( - df=source, dir_path=uri_path + df=source ) elif isinstance(source, str): source_uri = urlparse(source) diff --git a/tests/e2e/test-register.py b/tests/e2e/test-register.py index 20eb670eed..1bb64a711b 100644 --- a/tests/e2e/test-register.py +++ b/tests/e2e/test-register.py @@ -69,7 +69,8 @@ def basic_featuretable(): }, file_format="PARQUET", file_url="gs://example/feast/*", - timestamp_column="datetime_col", + event_timestamp_column="datetime_col", + created_timestamp_column="timestamp", date_partition_column="datetime", ) stream_source = KafkaSource( @@ -81,7 +82,8 @@ def basic_featuretable(): bootstrap_servers="localhost:9094", class_path="random/path/to/class", topic="test_topic", - timestamp_column="datetime_col", + event_timestamp_column="datetime_col", + created_timestamp_column="timestamp", ) return FeatureTable( name="basic_featuretable", @@ -112,7 +114,11 @@ def bq_dataset(): @pytest.fixture def bq_featuretable(bq_table_id): - batch_source = BigQuerySource(table_ref=bq_table_id, timestamp_column="datetime",) + batch_source = BigQuerySource( + table_ref=bq_table_id, + event_timestamp_column="datetime", + created_timestamp_column="timestamp", + ) return FeatureTable( name="basic_featuretable", entities=["driver_id", "customer_id"], @@ -140,7 +146,8 @@ def alltypes_featuretable(): batch_source = FileSource( file_format="parquet", file_url="file://feast/*", - timestamp_column="ts_col", + event_timestamp_column="ts_col", + created_timestamp_column="timestamp", date_partition_column="date_partition_col", ) return FeatureTable( From 091eece1ccf736600ddf7f700d9adeb3d657e12e Mon Sep 17 00:00:00 2001 From: Oleksii Moskalenko Date: Tue, 13 Oct 2020 17:43:01 +0800 Subject: [PATCH 3/3] lint Signed-off-by: Oleksii Moskalenko --- sdk/python/feast/loaders/file.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdk/python/feast/loaders/file.py b/sdk/python/feast/loaders/file.py index 2327f9d17e..b0692457ee 100644 --- a/sdk/python/feast/loaders/file.py +++ 
b/sdk/python/feast/loaders/file.py @@ -65,9 +65,7 @@ def export_source_to_staging_location( # Prepare Avro file to be exported to staging location if isinstance(source, pd.DataFrame): # Remote gs staging location provided by serving - dir_path, file_name, source_path = export_dataframe_to_local( - df=source - ) + dir_path, file_name, source_path = export_dataframe_to_local(df=source) elif isinstance(source, str): source_uri = urlparse(source) if source_uri.scheme in ["", "file"]:
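
Taken together, the three patches rename the DataSource timestamp field to event_timestamp_column and add created_timestamp_column across the proto, core model, Python SDK, and Spark ingestion job. As a rough illustration only (not part of the patches), a batch and stream source would be declared against the new Python SDK constructors roughly as follows; the column names, file path, topic, and proto class are made-up placeholders:

from feast.data_source import FileSource, KafkaSource

# Illustrative sketch against the constructors changed above; all names are placeholders.
batch_source = FileSource(
    file_format="parquet",
    file_url="file://feast/driver_features/*",       # placeholder location
    event_timestamp_column="datetime",                # previously: timestamp_column
    created_timestamp_column="created",               # new required argument
    date_partition_column="date",
)

stream_source = KafkaSource(
    bootstrap_servers="localhost:9094",
    class_path="com.example.protos.DriverFeatures",   # placeholder proto class
    topic="driver_features",                          # placeholder topic
    event_timestamp_column="datetime",
    created_timestamp_column="created",
)

# The renamed accessors mirror the new proto fields event_timestamp_column /
# created_timestamp_column added in DataSource.proto.
assert batch_source.event_timestamp_column == "datetime"
assert batch_source.created_timestamp_column == "created"

On the Spark side, the same snake_case keys reach the Scala case classes once IngestionJob applies camelizeKeys, which is why the Source trait now exposes eventTimestampColumn and createdTimestampColumn rather than the old timestampColumn.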