Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add created_timestamp_column to DataSource. Rename timestamp_column -> event_timestamp_column #1048

Merged
merged 3 commits into from
Oct 13, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions common-test/src/main/java/feast/common/it/DataGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ public static DataSource createFileDataSourceSpec(
.setType(DataSource.SourceType.BATCH_FILE)
.setFileOptions(
FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}
Expand All @@ -253,7 +253,7 @@ public static DataSource createBigQueryDataSourceSpec(
return DataSource.newBuilder()
.setType(DataSource.SourceType.BATCH_BIGQUERY)
.setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef(bigQueryTableRef).build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.setDatePartitionColumn(datePartitionColumn)
.build();
}
Expand All @@ -268,7 +268,7 @@ public static DataSource createKafkaDataSourceSpec(
.setBootstrapServers(servers)
.setClassPath(classPath)
.build())
.setTimestampColumn(timestampColumn)
.setEventTimestampColumn(timestampColumn)
.build();
}
}
11 changes: 8 additions & 3 deletions core/src/main/java/feast/core/model/DataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public class DataSource {
private String fieldMapJSON;

@Column(name = "timestamp_column")
private String timestampColumn;
private String eventTimestampColumn;

@Column(name = "created_timestamp_column")
private String createdTimestampColumn;

@Column(name = "date_partition_column")
private String datePartitionColumn;
Expand Down Expand Up @@ -115,7 +118,8 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) {
source.setFieldMapJSON(TypeConversion.convertMapToJsonString(spec.getFieldMappingMap()));

// Set timestamp mapping columns
source.setTimestampColumn(spec.getTimestampColumn());
source.setEventTimestampColumn(spec.getEventTimestampColumn());
source.setCreatedTimestampColumn(spec.getCreatedTimestampColumn());
source.setDatePartitionColumn(spec.getDatePartitionColumn());

return source;
Expand Down Expand Up @@ -163,7 +167,8 @@ public DataSourceProto.DataSource toProto() {
// Parse field mapping and options from JSON
spec.putAllFieldMapping(TypeConversion.convertJsonStringToMap(getFieldMapJSON()));

spec.setTimestampColumn(getTimestampColumn());
spec.setEventTimestampColumn(getEventTimestampColumn());
spec.setCreatedTimestampColumn(getCreatedTimestampColumn());
spec.setDatePartitionColumn(getDatePartitionColumn());

return spec.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE data_sources ADD COLUMN created_timestamp_column character varying(255);
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/net v0.0.0-20200822124328-c89045814202
golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9 // indirect
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b // indirect
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 // indirect
google.golang.org/grpc v1.29.1
google.golang.org/protobuf v1.25.0 // indirect
gopkg.in/russross/blackfriday.v2 v2.0.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,8 @@ golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f h1:7+Nz9MyPqt2qMCTvNiRy1G0
golang.org/x/tools v0.0.0-20200929223013-bf155c11ec6f/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b h1:07IVqnnzaip3TGyl/cy32V5YP3FguWG4BybYDTBNpm0=
golang.org/x/tools v0.0.0-20201001230009-b5b87423c93b/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 h1:kLBxO4OPBgPwjg8Vvu+/0DCHIfDwYIGNFcD66NU9kpo=
golang.org/x/tools v0.0.0-20201013053347-2db1cd791039/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
Expand Down
7 changes: 5 additions & 2 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ message DataSource {
// and fields in parent FeatureTable.
map<string, string> field_mapping = 2;

// Must specify timestamp column name
string timestamp_column = 3;
// Must specify event timestamp column name
string event_timestamp_column = 3;

// (Optional) Specify partition column
// useful for file sources
string date_partition_column = 4;

// Must specify creation timestamp column name
string created_timestamp_column = 5;

// Defines options for DataSource that sources features from a file
message FileOptions {
// File Format of the file containing the features
Expand Down
8 changes: 4 additions & 4 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ def ingest(
_check_field_mappings(
column_names,
name,
feature_table.batch_source.timestamp_column,
feature_table.batch_source.event_timestamp_column,
feature_table.batch_source.field_mapping,
)

Expand All @@ -671,7 +671,7 @@ def ingest(
column_names,
pyarrow_table,
feature_table.batch_source.date_partition_column,
feature_table.batch_source.timestamp_column,
feature_table.batch_source.event_timestamp_column,
)
else:
dir_path, dest_path = _write_non_partitioned_table_from_source(
Expand All @@ -680,12 +680,12 @@ def ingest(

try:
if issubclass(type(feature_table.batch_source), FileSource):
file_url = feature_table.batch_source.file_options.file_url[:-1]
file_url = feature_table.batch_source.file_options.file_url.rstrip("*")
_upload_to_file_source(file_url, with_partitions, dest_path)
if issubclass(type(feature_table.batch_source), BigQuerySource):
bq_table_ref = feature_table.batch_source.bigquery_options.table_ref
feature_table_timestamp_column = (
feature_table.batch_source.timestamp_column
feature_table.batch_source.event_timestamp_column
)

_upload_to_bq_source(
Expand Down
Loading