Skip to content

Commit

Permalink
Add created_timestamp_column to DataSource. Rename timestamp_column -…
Browse files Browse the repository at this point in the history
…> event_timestamp_column (#1048)

* timestamp_column -> event_timestamp_column; new created_timestamp_column

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* e2e tests

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>

* lint

Signed-off-by: Oleksii Moskalenko <moskalenko.alexey@gmail.com>
pyalex authored Oct 13, 2020
1 parent ccafcca commit 2a53800
Showing 22 changed files with 174 additions and 148 deletions.
6 changes: 3 additions & 3 deletions common-test/src/main/java/feast/common/it/DataGenerator.java
Original file line number Diff line number Diff line change
@@ -243,7 +243,7 @@ public static DataSource createFileDataSourceSpec(
.setType(DataSource.SourceType.BATCH_FILE)
.setFileOptions(
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}
@@ -253,7 +253,7 @@ public static DataSource createBigQueryDataSourceSpec(
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_BIGQUERY)
.setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}
@@ -268,7 +268,7 @@ public static DataSource createKafkaDataSourceSpec(
.setBootstrapServers(servers)
.setClassPath(classPath)
.build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.build();
}
}
11 changes: 8 additions & 3 deletions core/src/main/java/feast/core/model/DataSource.java
Original file line number Diff line number Diff line change
@@ -63,7 +63,10 @@ public class DataSource {
private String fieldMapJSON;

@Column(name = "timestamp_column")
private String timestampColumn;
private String eventTimestampColumn;

@Column(name = "created_timestamp_column")
private String createdTimestampColumn;

@Column(name = "date_partition_column")
private String datePartitionColumn;
@@ -115,7 +118,8 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
source.setFieldMapJSON(TypeConversion.convertMapToJsonString(spec.getFieldMappingMap()));

// Set timestamp mapping columns
source.setTimestampColumn(spec.getTimestampColumn());
source.setEventTimestampColumn(spec.getEventTimestampColumn());
source.setCreatedTimestampColumn(spec.getCreatedTimestampColumn());
source.setDatePartitionColumn(spec.getDatePartitionColumn());

return source;
@@ -163,7 +167,8 @@ public DataSourceProto.DataSource toProto() {
// Parse field mapping and options from JSON
spec.putAllFieldMapping(TypeConversion.convertJsonStringToMap(getFieldMapJSON()));

spec.setTimestampColumn(getTimestampColumn());
spec.setEventTimestampColumn(getEventTimestampColumn());
spec.setCreatedTimestampColumn(getCreatedTimestampColumn());
spec.setDatePartitionColumn(getDatePartitionColumn());

return spec.build();
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE data_sources ADD COLUMN created_timestamp_column character varying(255);
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@ require (
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200822124328-c89045814202
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9 // indirect
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b // indirect
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 // indirect
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/russross/blackfriday.v2 v2.0.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -490,6 +490,8 @@ golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f h1:7+Nz9MyPqt2qMCTvNiRy1G0
golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b h1:07IVqnnzaip3TGyl/cy32V5YP3FguWG4BybYDTBNpm0=
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 h1:kLBxO4OPBgPwjg8Vvu+/0DCHIfDwYIGNFcD66NU9kpo=
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
7 changes: 5 additions & 2 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
@@ -38,13 +38,16 @@ message DataSource {
// and fields in parent FeatureTable.
map<string, string> field_mapping = 2;

// Must specify timestamp column name
string timestamp_column = 3;
// Must specify event timestamp column name
string event_timestamp_column = 3;

// (Optional) Specify partition column
// useful for file sources
string date_partition_column = 4;

// Must specify creation timestamp column name
string created_timestamp_column = 5;

// Defines options for DataSource that sources features from a file
message FileOptions {
// File Format of the file containing the features
8 changes: 4 additions & 4 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
@@ -656,7 +656,7 @@ def ingest(
_check_field_mappings(
column_names,
name,
feature_table.batch_source.timestamp_column,
feature_table.batch_source.event_timestamp_column,
feature_table.batch_source.field_mapping,
)

@@ -671,7 +671,7 @@ def ingest(
column_names,
pyarrow_table,
feature_table.batch_source.date_partition_column,
feature_table.batch_source.timestamp_column,
feature_table.batch_source.event_timestamp_column,
)
else:
dir_path, dest_path = _write_non_partitioned_table_from_source(
@@ -680,12 +680,12 @@ def ingest(

try:
if issubclass(type(feature_table.batch_source), FileSource):
file_url = feature_table.batch_source.file_options.file_url[:-1]
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
_upload_to_file_source(file_url, with_partitions, dest_path)
if issubclass(type(feature_table.batch_source), BigQuerySource):
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
feature_table_timestamp_column = (
feature_table.batch_source.timestamp_column
feature_table.batch_source.event_timestamp_column
)

_upload_to_bq_source(
Loading

0 comments on commit 2a53800

Please sign in to comment.