diff --git a/common-test/src/main/java/feast/common/it/DataGenerator.java b/common-test/src/main/java/feast/common/it/DataGenerator.java index 487455814a..9fe46bb3ba 100644 --- a/common-test/src/main/java/feast/common/it/DataGenerator.java +++ b/common-test/src/main/java/feast/common/it/DataGenerator.java @@ -20,10 +20,16 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.Duration; import com.google.protobuf.Timestamp; +import feast.proto.core.DataFormatProto.FileFormat; +import feast.proto.core.DataFormatProto.FileFormat.ParquetFormat; +import feast.proto.core.DataFormatProto.StreamFormat; +import feast.proto.core.DataFormatProto.StreamFormat.AvroFormat; +import feast.proto.core.DataFormatProto.StreamFormat.ProtoFormat; import feast.proto.core.DataSourceProto.DataSource; import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions; import feast.proto.core.DataSourceProto.DataSource.FileOptions; import feast.proto.core.DataSourceProto.DataSource.KafkaOptions; +import feast.proto.core.DataSourceProto.DataSource.KinesisOptions; import feast.proto.core.EntityProto; import feast.proto.core.FeatureProto; import feast.proto.core.FeatureProto.FeatureSpecV2; @@ -266,11 +272,14 @@ public static FeatureTableSpec createFeatureTableSpec( } public static DataSource createFileDataSourceSpec( - String fileURL, String fileFormat, String timestampColumn, String datePartitionColumn) { + String fileURL, String timestampColumn, String datePartitionColumn) { return DataSource.newBuilder() .setType(DataSource.SourceType.BATCH_FILE) .setFileOptions( - FileOptions.newBuilder().setFileFormat(fileFormat).setFileUrl(fileURL).build()) + FileOptions.newBuilder() + .setFileFormat(createParquetFormat()) + .setFileUrl(fileURL) + .build()) .setEventTimestampColumn(timestampColumn) .setDatePartitionColumn(datePartitionColumn) .build(); @@ -294,7 +303,7 @@ public static DataSource createKafkaDataSourceSpec( KafkaOptions.newBuilder() .setTopic(topic) .setBootstrapServers(servers) - .setClassPath(classPath) + .setMessageFormat(createProtoFormat(classPath)) .build()) .setEventTimestampColumn(timestampColumn) .build(); @@ -327,4 +336,34 @@ public static ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow createEntityR .putFields(entityName, entityValue) .build(); } + + public static DataSource createKinesisDataSourceSpec( + String region, String streamName, String classPath, String timestampColumn) { + return DataSource.newBuilder() + .setType(DataSource.SourceType.STREAM_KINESIS) + .setKinesisOptions( + KinesisOptions.newBuilder() + .setRegion(region) + .setStreamName(streamName) + .setRecordFormat(createProtoFormat(classPath)) + .build()) + .setEventTimestampColumn(timestampColumn) + .build(); + } + + public static FileFormat createParquetFormat() { + return FileFormat.newBuilder().setParquetFormat(ParquetFormat.getDefaultInstance()).build(); + } + + public static StreamFormat createAvroFormat(String schemaJSON) { + return StreamFormat.newBuilder() + .setAvroFormat(AvroFormat.newBuilder().setSchemaJson(schemaJSON).build()) + .build(); + } + + public static StreamFormat createProtoFormat(String classPath) { + return StreamFormat.newBuilder() + .setProtoFormat(ProtoFormat.newBuilder().setClassPath(classPath).build()) + .build(); + } }
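The helpers above exercise the new structured formats end to end. As a quick orientation, here is a minimal sketch of what a caller gets back from them; `Example` and the `com.example.FeatureProto` class path are illustrative placeholders, while the generator methods and proto accessors are the ones introduced in this patch:

```java
import feast.common.it.DataGenerator;
import feast.proto.core.DataFormatProto.StreamFormat;
import feast.proto.core.DataSourceProto.DataSource;

class Example {
  static DataSource kafkaSource() {
    // Build a Kafka stream source; the class path now travels inside a
    // StreamFormat.ProtoFormat message instead of a bare string field.
    DataSource source =
        DataGenerator.createKafkaDataSourceSpec(
            "localhost:9092", "topic", "com.example.FeatureProto", "ts_col");

    StreamFormat format = source.getKafkaOptions().getMessageFormat();
    assert format.getFormatCase() == StreamFormat.FormatCase.PROTO_FORMAT;
    assert format.getProtoFormat().getClassPath().equals("com.example.FeatureProto");
    return source;
  }
}
```

The format now travels as a `oneof` message, so consumers can switch on `getFormatCase()` instead of parsing a convention-laden string.

diff --git a/core/src/main/java/feast/core/model/DataSource.java b/core/src/main/java/feast/core/model/DataSource.java index 6bfb5f0303..67477dab45 100644 --- a/core/src/main/java/feast/core/model/DataSource.java +++ 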
b/core/src/main/java/feast/core/model/DataSource.java @@ -16,9 +16,13 @@ */ package feast.core.model; -import static feast.proto.core.DataSourceProto.DataSource.SourceType.*; - +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageOrBuilder; +import com.google.protobuf.util.JsonFormat; import feast.core.util.TypeConversion; +import feast.proto.core.DataFormatProto.FileFormat; +import feast.proto.core.DataFormatProto.StreamFormat; import feast.proto.core.DataSourceProto; import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions; import feast.proto.core.DataSourceProto.DataSource.FileOptions; @@ -91,20 +95,23 @@ public static DataSource fromProto(DataSourceProto.DataSource spec) { switch (spec.getType()) { case BATCH_FILE: dataSourceConfigMap.put("file_url", spec.getFileOptions().getFileUrl()); - dataSourceConfigMap.put("file_format", spec.getFileOptions().getFileFormat()); + dataSourceConfigMap.put("file_format", printJSON(spec.getFileOptions().getFileFormat())); break; case BATCH_BIGQUERY: dataSourceConfigMap.put("table_ref", spec.getBigqueryOptions().getTableRef()); break; case STREAM_KAFKA: dataSourceConfigMap.put("bootstrap_servers", spec.getKafkaOptions().getBootstrapServers()); - dataSourceConfigMap.put("class_path", spec.getKafkaOptions().getClassPath()); + dataSourceConfigMap.put( + "message_format", printJSON(spec.getKafkaOptions().getMessageFormat())); dataSourceConfigMap.put("topic", spec.getKafkaOptions().getTopic()); break; case STREAM_KINESIS: - dataSourceConfigMap.put("class_path", spec.getKinesisOptions().getClassPath()); + dataSourceConfigMap.put( + "record_format", printJSON(spec.getKinesisOptions().getRecordFormat())); dataSourceConfigMap.put("region", spec.getKinesisOptions().getRegion()); dataSourceConfigMap.put("stream_name", spec.getKinesisOptions().getStreamName()); + break; default: throw new UnsupportedOperationException( @@ -137,7 +144,11 @@ public DataSourceProto.DataSource toProto() { case BATCH_FILE: FileOptions.Builder fileOptions = FileOptions.newBuilder(); fileOptions.setFileUrl(dataSourceConfigMap.get("file_url")); - fileOptions.setFileFormat(dataSourceConfigMap.get("file_format")); + + FileFormat.Builder fileFormat = FileFormat.newBuilder(); + parseMessage(dataSourceConfigMap.get("file_format"), fileFormat); + fileOptions.setFileFormat(fileFormat.build()); + spec.setFileOptions(fileOptions.build()); break; case BATCH_BIGQUERY: @@ -148,15 +159,23 @@ public DataSourceProto.DataSource toProto() { case STREAM_KAFKA: KafkaOptions.Builder kafkaOptions = KafkaOptions.newBuilder(); kafkaOptions.setBootstrapServers(dataSourceConfigMap.get("bootstrap_servers")); - kafkaOptions.setClassPath(dataSourceConfigMap.get("class_path")); kafkaOptions.setTopic(dataSourceConfigMap.get("topic")); + + StreamFormat.Builder messageFormat = StreamFormat.newBuilder(); + parseMessage(dataSourceConfigMap.get("message_format"), messageFormat); + kafkaOptions.setMessageFormat(messageFormat.build()); + spec.setKafkaOptions(kafkaOptions.build()); break; case STREAM_KINESIS: KinesisOptions.Builder kinesisOptions = KinesisOptions.newBuilder(); - kinesisOptions.setClassPath(dataSourceConfigMap.get("class_path")); kinesisOptions.setRegion(dataSourceConfigMap.get("region")); kinesisOptions.setStreamName(dataSourceConfigMap.get("stream_name")); + + StreamFormat.Builder recordFormat = StreamFormat.newBuilder(); + parseMessage(dataSourceConfigMap.get("record_format"), recordFormat); + 
kinesisOptions.setRecordFormat(recordFormat.build()); + spec.setKinesisOptions(kinesisOptions.build()); break; default: @@ -194,4 +213,22 @@ public boolean equals(Object o) { DataSource other = (DataSource) o; return this.toProto().equals(other.toProto()); } + + /** Print the given Message into its JSON string representation */ + private static String printJSON(MessageOrBuilder message) { + try { + return JsonFormat.printer().print(message); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Unexpected exception converting Proto to JSON", e); + } + } + + /** Parse the given Message in JSON representation into the given Message Builder */ + private static void parseMessage(String json, Message.Builder message) { + try { + JsonFormat.parser().merge(json, message); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException("Unexpected exception converting JSON to Proto", e); + } + } }
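The model keeps each source's options in a flat `Map<String, String>`, so the two helpers above persist the new format messages as canonical protobuf JSON. A minimal, self-contained sketch of that round trip, assuming only `protobuf-java-util` (the `JsonRoundTrip` class name is illustrative):

```java
import com.google.protobuf.util.JsonFormat;
import feast.proto.core.DataFormatProto.StreamFormat;

class JsonRoundTrip {
  static StreamFormat roundTrip(StreamFormat original) throws Exception {
    // fromProto() direction: print the oneof message as canonical JSON,
    // e.g. {"protoFormat":{"classPath":"class.path"}}
    String json = JsonFormat.printer().print(original);

    // toProto() direction: merge the JSON back into a fresh builder.
    StreamFormat.Builder builder = StreamFormat.newBuilder();
    JsonFormat.parser().merge(json, builder);
    return builder.build(); // equal to `original`
  }
}
```

Storing canonical JSON keeps the stored value human-readable and lets `toProto()` rebuild the exact `oneof` variant that `fromProto()` flattened.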
diff --git a/core/src/main/java/feast/core/validators/DataSourceValidator.java b/core/src/main/java/feast/core/validators/DataSourceValidator.java new file mode 100644 index 0000000000..223906d0e6 --- /dev/null +++ b/core/src/main/java/feast/core/validators/DataSourceValidator.java @@ -0,0 +1,81 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.validators; + +import static feast.core.validators.Matchers.*; +import static feast.proto.core.DataSourceProto.DataSource.SourceType.*; + +import feast.proto.core.DataFormatProto.FileFormat; +import feast.proto.core.DataFormatProto.StreamFormat; +import feast.proto.core.DataSourceProto.DataSource; + +public class DataSourceValidator { + /** Validate if the given DataSource protobuf spec is valid. */ + public static void validate(DataSource spec) { + switch (spec.getType()) { + case BATCH_FILE: + FileFormat.FormatCase fileFormat = spec.getFileOptions().getFileFormat().getFormatCase(); + switch (fileFormat) { + case PARQUET_FORMAT: + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported File Format: %s", fileFormat)); + } + break; + + case BATCH_BIGQUERY: + checkValidBigQueryTableRef(spec.getBigqueryOptions().getTableRef(), "FeatureTable"); + break; + + case STREAM_KAFKA: + StreamFormat.FormatCase messageFormat = + spec.getKafkaOptions().getMessageFormat().getFormatCase(); + switch (messageFormat) { + case PROTO_FORMAT: + checkValidClassPath( + spec.getKafkaOptions().getMessageFormat().getProtoFormat().getClassPath(), + "FeatureTable"); + break; + default: + throw new UnsupportedOperationException( + String.format( + "Unsupported Stream Format for Kafka Source Type: %s", messageFormat)); + } + break; + + case STREAM_KINESIS: + // Verify that the DataFormat is supported by the Kinesis data source + StreamFormat.FormatCase recordFormat = + spec.getKinesisOptions().getRecordFormat().getFormatCase(); + switch (recordFormat) { + case PROTO_FORMAT: + checkValidClassPath( + spec.getKinesisOptions().getRecordFormat().getProtoFormat().getClassPath(), + "FeatureTable"); + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported Stream Format for Kinesis Source Type: %s", recordFormat)); + } + break; + default: + throw new UnsupportedOperationException( + String.format("Unsupported Data Source Type: %s", spec.getType())); + } + } } diff --git a/core/src/main/java/feast/core/validators/FeatureTableValidator.java b/core/src/main/java/feast/core/validators/FeatureTableValidator.java index d50be2736c..863c9442f7 100644 --- a/core/src/main/java/feast/core/validators/FeatureTableValidator.java +++ b/core/src/main/java/feast/core/validators/FeatureTableValidator.java @@ -18,6 +18,7 @@ import static feast.core.validators.Matchers.*; +import feast.proto.core.DataSourceProto.DataSource.SourceType; import feast.proto.core.FeatureProto.FeatureSpecV2; import feast.proto.core.FeatureTableProto.FeatureTableSpec; import java.util.ArrayList; @@ -49,12 +50,6 @@ public static void validateSpec(FeatureTableSpec spec) { checkValidCharacters(spec.getName(), "FeatureTable"); spec.getFeaturesList().forEach(FeatureTableValidator::validateFeatureSpec); - // Check that BigQuery reference defined for BigQuery source is valid - if (!spec.getBatchSource().getBigqueryOptions().getTableRef().isEmpty()) { - checkValidBigQueryTableRef( - spec.getBatchSource().getBigqueryOptions().getTableRef(), "FeatureTable"); - } - // Check that features and entities defined in FeatureTable do not use reserved names ArrayList<String> fieldNames = new ArrayList<>(spec.getEntitiesList()); fieldNames.addAll( @@ -70,6 +65,14 @@ public static void validateSpec(FeatureTableSpec spec) { throw new IllegalArgumentException( String.format("Entity and Feature names within a Feature Table should be unique.")); } + + // Check that the data sources defined in the feature table are valid + if (!spec.getBatchSource().getType().equals(SourceType.INVALID)) { + DataSourceValidator.validate(spec.getBatchSource()); + } + if (!spec.getStreamSource().getType().equals(SourceType.INVALID)) { + DataSourceValidator.validate(spec.getStreamSource()); + } } private static void validateFeatureSpec(FeatureSpecV2 spec) {
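The validator above dispatches first on source type, then on the format `oneof`. A hedged sketch of the behaviour it enforces; `ValidatorSketch` is illustrative, `DataGenerator` is the test helper from `common-test` above, and `.bad^path` mirrors the fixture used in `DataSourceValidatorTest` further down:

```java
import feast.common.it.DataGenerator;
import feast.core.validators.DataSourceValidator;
import feast.proto.core.DataSourceProto.DataSource;

class ValidatorSketch {
  static void demo() {
    // A Kafka source with a well-formed ProtoFormat class path passes.
    DataSource ok =
        DataGenerator.createKafkaDataSourceSpec("localhost:9092", "topic", "class.path", "ts_col");
    DataSourceValidator.validate(ok); // no exception

    // A malformed class path is rejected by Matchers.checkValidClassPath.
    DataSource bad =
        ok.toBuilder()
            .setKafkaOptions(
                ok.getKafkaOptions().toBuilder()
                    .setMessageFormat(DataGenerator.createProtoFormat(".bad^path")))
            .build();
    try {
      DataSourceValidator.validate(bad);
    } catch (IllegalArgumentException expected) {
      // "argument must be a valid Java Classpath"
    }
  }
}
```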
diff --git a/core/src/main/java/feast/core/validators/Matchers.java b/core/src/main/java/feast/core/validators/Matchers.java index dadc0a3b02..8ba89a8308 100644 --- a/core/src/main/java/feast/core/validators/Matchers.java +++ b/core/src/main/java/feast/core/validators/Matchers.java @@ -24,6 +24,8 @@ public class Matchers { private static Pattern BIGQUERY_TABLE_REF_REGEX = Pattern.compile("[a-zA-Z0-9-]+[:]+[a-zA-Z0-9_]+[.]+[a-zA-Z0-9_]*"); + private static Pattern CLASS_PATH_REGEX = + Pattern.compile("[a-zA-Z_$][a-zA-Z0-9_$]*(\\.[a-zA-Z_$][a-zA-Z0-9_$]*)*"); private static Pattern UPPER_SNAKE_CASE_REGEX = Pattern.compile("^[A-Z0-9]+(_[A-Z0-9]+)*$"); private static Pattern LOWER_SNAKE_CASE_REGEX = Pattern.compile("^[a-z0-9]+(_[a-z0-9]+)*$"); private static Pattern VALID_CHARACTERS_REGEX = Pattern.compile("^[a-zA-Z_][a-zA-Z0-9_]*$"); @@ -92,6 +94,14 @@ public static void checkValidBigQueryTableRef(String input, String resource) } } + + public static void checkValidClassPath(String input, String resource) { + if (!CLASS_PATH_REGEX.matcher(input).matches()) { + throw new IllegalArgumentException( + String.format( + ERROR_MESSAGE_TEMPLATE, resource, input, "argument must be a valid Java Classpath")); + } + } + public static boolean hasDuplicates(Collection<String> strings) { return (new HashSet<>(strings)).size() < strings.size(); } diff --git a/core/src/test/java/feast/core/model/DataSourceTest.java b/core/src/test/java/feast/core/model/DataSourceTest.java index 8e7400dc8b..2dcbaa93cd 100644 --- a/core/src/test/java/feast/core/model/DataSourceTest.java +++ b/core/src/test/java/feast/core/model/DataSourceTest.java @@ -16,14 +16,11 @@ */ package feast.core.model; -import static feast.proto.core.DataSourceProto.DataSource.SourceType.*; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; import feast.common.it.DataGenerator; import feast.proto.core.DataSourceProto; -import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions; -import feast.proto.core.DataSourceProto.DataSource.KinesisOptions; import java.util.List; import java.util.Map; import org.junit.Test; @@ -55,21 +52,9 @@ public void shouldFromProtoBeReversableWithToProto() { private List<DataSourceProto.DataSource> getTestSpecs() { return List.of( - DataGenerator.createFileDataSourceSpec("file:///path/to/file", "parquet", "ts_col", ""), + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", ""), DataGenerator.createKafkaDataSourceSpec("localhost:9092", "topic", "class.path", "ts_col"), - DataSourceProto.DataSource.newBuilder() - .setType(BATCH_BIGQUERY) - .setBigqueryOptions( - BigQueryOptions.newBuilder().setTableRef("project:dataset.table").build()) - .build(), - DataSourceProto.DataSource.newBuilder() - .setType(STREAM_KINESIS) - .setKinesisOptions( - KinesisOptions.newBuilder() - .setRegion("ap-nowhere1") - .setStreamName("stream") - .setClassPath("class.path") - .build()) - .build()); + DataGenerator.createBigQueryDataSourceSpec("project:dataset.table", "ts_col", "dt_col"), + DataGenerator.createKinesisDataSourceSpec("ap-nowhere1", "stream", "class.path", "ts_col")); } } diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java index 482fa65d83..35535d0715 100644 --- a/core/src/test/java/feast/core/service/SpecServiceIT.java +++ b/core/src/test/java/feast/core/service/SpecServiceIT.java @@ -100,8 +100,7 @@ public void initState() { ImmutableMap.of("feat_key2", "feat_value2")) .toBuilder() .setBatchSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", 
"")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .build()); apiClient.simpleApplyEntity( "project1", @@ -953,8 +952,7 @@ public void shouldReturnFeatureTableIfExists() { ImmutableMap.of("feat_key2", "feat_value2")) .toBuilder() .setBatchSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .build(); FeatureTableProto.FeatureTable featureTable = apiClient.simpleGetFeatureTable("default", "featuretable1"); @@ -1066,8 +1064,7 @@ private FeatureTableSpec getTestSpec() { Map.of()) .toBuilder() .setBatchSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .setStreamSource( DataGenerator.createKafkaDataSourceSpec( "localhost:9092", "topic", "class.path", "ts_col")) @@ -1098,8 +1095,7 @@ public void shouldUpdateExistingTableWithValidSpec() { Map.of("test", "labels")) .toBuilder() .setStreamSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .setBatchSource( DataGenerator.createKafkaDataSourceSpec( "localhost:9092", "topic", "class.path", "ts_col")) @@ -1158,8 +1154,7 @@ public void shouldErrorIfEntityChangeOnUpdate() { ImmutableMap.of("feat_key2", "feat_value2")) .toBuilder() .setBatchSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .build(); StatusRuntimeException exc = @@ -1190,8 +1185,7 @@ public void shouldErrorIfFeatureValueTypeChangeOnUpdate() { ImmutableMap.of("feat_key2", "feat_value2")) .toBuilder() .setBatchSource( - DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "")) .build(); StatusRuntimeException exc = @@ -1213,7 +1207,7 @@ public void shouldErrorOnInvalidBigQueryTableRef() { DataGenerator.createFeatureTableSpec( "ft", List.of("entity1"), - Map.of("event_timestamp", ValueProto.ValueType.Enum.INT64), + Map.of("feature", ValueProto.ValueType.Enum.INT64), 3600, Map.of()) .toBuilder() @@ -1250,7 +1244,7 @@ public void shouldErrorOnReservedNames() { .toBuilder() .setBatchSource( DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); // Reserved name used in as entity name @@ -1268,7 +1262,7 @@ public void shouldErrorOnReservedNames() { .toBuilder() .setBatchSource( DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); } @@ -1289,7 +1283,7 @@ public void shouldErrorOnInvalidName() { .toBuilder() .setBatchSource( DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); // Invalid feature name @@ -1307,7 +1301,7 @@ public void shouldErrorOnInvalidName() { .toBuilder() .setBatchSource( DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); } @@ -1327,7 +1321,7 @@ public void shouldErrorOnNotFoundEntityName() { .toBuilder() .setBatchSource( 
DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); } @@ -1350,7 +1344,7 @@ public void shouldErrorOnArchivedProject() { .toBuilder() .setBatchSource( DataGenerator.createFileDataSourceSpec( - "file:///path/to/file", "parquet", "ts_col", "")) + "file:///path/to/file", "ts_col", "")) .build())); } } diff --git a/core/src/test/java/feast/core/validators/DataSourceValidatorTest.java b/core/src/test/java/feast/core/validators/DataSourceValidatorTest.java new file mode 100644 index 0000000000..842e4c657d --- /dev/null +++ b/core/src/test/java/feast/core/validators/DataSourceValidatorTest.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.validators; + +import static feast.proto.core.DataSourceProto.DataSource.SourceType.*; + +import feast.common.it.DataGenerator; +import feast.proto.core.DataSourceProto; +import feast.proto.core.DataSourceProto.DataSource.BigQueryOptions; +import feast.proto.core.DataSourceProto.DataSource.KafkaOptions; +import feast.proto.core.DataSourceProto.DataSource.KinesisOptions; +import feast.proto.core.DataSourceProto.DataSource.SourceType; +import java.util.Map; +import org.junit.Test; + +public class DataSourceValidatorTest { + + @Test(expected = UnsupportedOperationException.class) + public void shouldErrorIfSourceTypeUnsupported() { + DataSourceProto.DataSource badSpec = + getTestSpecsMap().get(BATCH_FILE).toBuilder().setType(SourceType.INVALID).build(); + DataSourceValidator.validate(badSpec); + } + + @Test + public void shouldPassValidSpecs() { + getTestSpecsMap().values().forEach(DataSourceValidator::validate); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldErrorIfBadBigQueryTableRef() { + DataSourceProto.DataSource badSpec = + getTestSpecsMap() + .get(BATCH_BIGQUERY) + .toBuilder() + .setBigqueryOptions(BigQueryOptions.newBuilder().setTableRef("bad:/ref").build()) + .build(); + DataSourceValidator.validate(badSpec); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldErrorIfBadKafkaClassPath() { + DataSourceProto.DataSource badSpec = + getTestSpecsMap() + .get(STREAM_KAFKA) + .toBuilder() + .setKafkaOptions( + KafkaOptions.newBuilder() + .setMessageFormat(DataGenerator.createProtoFormat(".bad^path")) + .build()) + .build(); + DataSourceValidator.validate(badSpec); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldErrorIfBadKinesisClassPath() { + DataSourceProto.DataSource badSpec = + getTestSpecsMap() + .get(STREAM_KINESIS) + .toBuilder() + .setKinesisOptions( + KinesisOptions.newBuilder() + .setRecordFormat(DataGenerator.createProtoFormat(".bad^path")) + .build()) + .build(); + DataSourceValidator.validate(badSpec); + } + + private Map<SourceType, DataSourceProto.DataSource> getTestSpecsMap() { + return Map.of( + BATCH_FILE, DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", ""), + BATCH_BIGQUERY, + DataGenerator.createBigQueryDataSourceSpec("project:dataset.table", "ts_col", "dt_col"), + STREAM_KINESIS, 
+ DataGenerator.createKinesisDataSourceSpec( + "ap-nowhere1", "stream", "class.path", "ts_col"), + STREAM_KAFKA, + DataGenerator.createKafkaDataSourceSpec( + "localhost:9092", "topic", "class.path", "ts_col")); + } +} diff --git a/go.mod b/go.mod index 8b36dfe678..de70423b33 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/gogo/protobuf v1.3.1 // indirect github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect github.com/golang/mock v1.2.0 - github.com/golang/protobuf v1.4.2 + github.com/golang/protobuf v1.4.3 github.com/google/go-cmp v0.5.0 github.com/huandu/xstrings v1.2.0 // indirect github.com/lyft/protoc-gen-validate v0.1.0 // indirect @@ -25,7 +25,7 @@ require ( golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect golang.org/x/net v0.0.0-20200822124328-c89045814202 golang.org/x/sys v0.0.0-20200515095857-1151b9dac4a9 // indirect - golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752 // indirect + golang.org/x/tools v0.0.0-20201017001424-6003fad69a88 // indirect google.golang.org/grpc v1.29.1 google.golang.org/protobuf v1.25.0 // indirect gopkg.in/russross/blackfriday.v2 v2.0.0 // indirect diff --git a/go.sum b/go.sum index 4e26be46a3..b9ec81936f 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,8 @@ github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvq github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0= github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= github.com/google/btree v0.0.0-20160524151835-7d79101e329e/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -494,8 +496,10 @@ golang.org/x/tools v0.0.0-20201011145850-ed2f50202694 h1:BANdcOVw3KTuUiyfDp7wrzC golang.org/x/tools v0.0.0-20201011145850-ed2f50202694/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/tools v0.0.0-20201013053347-2db1cd791039 h1:kLBxO4OPBgPwjg8Vvu+/0DCHIfDwYIGNFcD66NU9kpo= golang.org/x/tools v0.0.0-20201013053347-2db1cd791039/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= -golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752 h1:2ntEwh02rqo2jSsrYmp4yKHHjh0CbXP3ZtSUetSB+q8= -golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools v0.0.0-20201015182029-a5d9e455e9c4 h1:rQWkJiVIyJ3PgiSHL+RXc8xbrK8duU6jG5eeZ9G7nk8= +golang.org/x/tools v0.0.0-20201015182029-a5d9e455e9c4/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= +golang.org/x/tools v0.0.0-20201017001424-6003fad69a88 h1:ZB1XYzdDo7c/O48jzjMkvIjnC120Z9/CwgDWhePjQdQ= +golang.org/x/tools v0.0.0-20201017001424-6003fad69a88/go.mod h1:z6u4i615ZeAfBE4XtMziQW1fSVJXACjjbWkB/mvPzlU= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= diff --git a/protos/feast/core/DataFormat.proto 
b/protos/feast/core/DataFormat.proto new file mode 100644 index 0000000000..2926e08c63 --- /dev/null +++ b/protos/feast/core/DataFormat.proto @@ -0,0 +1,56 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + + +syntax = "proto3"; +package feast.core; + +option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; +option java_outer_classname = "DataFormatProto"; +option java_package = "feast.proto.core"; + +// Defines the file format encoding the features/entity data in files +message FileFormat { + // Defines options for the Parquet data format + message ParquetFormat {} + + oneof format { + ParquetFormat parquet_format = 1; + } +} + +// Defines the data format encoding features/entity data in data streams +message StreamFormat { + // Defines options for the protobuf data format + message ProtoFormat { + // Classpath to the generated Java Protobuf class that can be used to decode + // Feature data from the obtained stream message + string class_path = 1; + } + + // Defines options for the avro data format + message AvroFormat { + // Optional if used in a File DataSource as schema is embedded in avro file. + // Specifies the schema of the Avro message as JSON string. + string schema_json = 1; + } + + // Specifies the data format and format specific options + oneof format { + AvroFormat avro_format = 1; + ProtoFormat proto_format = 2; + } +} diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index c3c0f3fc90..c06a74b14c 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -22,6 +22,8 @@ option go_package = "github.com/feast-dev/feast/sdk/go/protos/feast/core"; option java_outer_classname = "DataSourceProto"; option java_package = "feast.proto.core"; +import "feast/core/DataFormat.proto"; + // Defines a Data Source that can be used source Feature data message DataSource { // Type of Data Source. @@ -50,8 +52,7 @@ message DataSource { // Defines options for DataSource that sources features from a file message FileOptions { - // File Format of the file containing the features - string file_format = 1; + FileFormat file_format = 1; // Target URL of file to retrieve and source features from. // s3://path/to/file for AWS S3 storage @@ -76,9 +77,8 @@ message DataSource { // Kafka topic to collect feature data from. string topic = 2; - // Classpath to the generated Java Protobuf class that can be used to decode - // Feature data from the obtained Kafka message - string class_path = 3; + // Defines the stream data format encoding feature/entity data in Kafka messages. + StreamFormat message_format = 3; } // Defines options for DataSource that sources features from Kinesis records. @@ -91,9 +91,9 @@ message DataSource { // Name of the Kinesis stream to obtain feature data from. 
string stream_name = 2; - // Classpath to the generated Java Protobuf class that can be used to decode - // Feature data from the obtained Kinesis record - string class_path = 3; + // Defines the data format encoding the feature/entity data in Kinesis records. + // Kinesis Data Sources support Avro and Proto as data formats. + StreamFormat record_format = 3; } // DataSource options. diff --git a/sdk/go/protos/feast/core/CoreService.pb.go b/sdk/go/protos/feast/core/CoreService.pb.go index 53f82b9a6a..c54a5bc315 100644 --- a/sdk/go/protos/feast/core/CoreService.pb.go +++ b/sdk/go/protos/feast/core/CoreService.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/CoreService.proto package core diff --git a/sdk/go/protos/feast/core/DataFormat.pb.go b/sdk/go/protos/feast/core/DataFormat.pb.go new file mode 100644 index 0000000000..ac6d88f9c7 --- /dev/null +++ b/sdk/go/protos/feast/core/DataFormat.pb.go @@ -0,0 +1,494 @@ +// +// Copyright 2020 The Feast Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.25.0 +// protoc v3.10.0 +// source: feast/core/DataFormat.proto + +package core + +import ( + proto "github.com/golang/protobuf/proto" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// This is a compile-time assertion that a sufficiently up-to-date version +// of the legacy proto package is being used. +const _ = proto.ProtoPackageIsVersion4 + +// Defines the file format encoding the features/entity data in files +type FileFormat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Types that are assignable to Format: + // *FileFormat_ParquetFormat_ + Format isFileFormat_Format `protobuf_oneof:"format"` +} + +func (x *FileFormat) Reset() { + *x = FileFormat{} + if protoimpl.UnsafeEnabled { + mi := &file_feast_core_DataFormat_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FileFormat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileFormat) ProtoMessage() {} + +func (x *FileFormat) ProtoReflect() protoreflect.Message { + mi := &file_feast_core_DataFormat_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileFormat.ProtoReflect.Descriptor instead. 
+func (*FileFormat) Descriptor() ([]byte, []int) { + return file_feast_core_DataFormat_proto_rawDescGZIP(), []int{0} +} + +func (m *FileFormat) GetFormat() isFileFormat_Format { + if m != nil { + return m.Format + } + return nil +} + +func (x *FileFormat) GetParquetFormat() *FileFormat_ParquetFormat { + if x, ok := x.GetFormat().(*FileFormat_ParquetFormat_); ok { + return x.ParquetFormat + } + return nil +} + +type isFileFormat_Format interface { + isFileFormat_Format() +} + +type FileFormat_ParquetFormat_ struct { + ParquetFormat *FileFormat_ParquetFormat `protobuf:"bytes,1,opt,name=parquet_format,json=parquetFormat,proto3,oneof"` +} + +func (*FileFormat_ParquetFormat_) isFileFormat_Format() {} + +// Defines the data format encoding features/entity data in data streams +type StreamFormat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Specifies the data format and format specific options + // + // Types that are assignable to Format: + // *StreamFormat_AvroFormat_ + // *StreamFormat_ProtoFormat_ + Format isStreamFormat_Format `protobuf_oneof:"format"` +} + +func (x *StreamFormat) Reset() { + *x = StreamFormat{} + if protoimpl.UnsafeEnabled { + mi := &file_feast_core_DataFormat_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamFormat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamFormat) ProtoMessage() {} + +func (x *StreamFormat) ProtoReflect() protoreflect.Message { + mi := &file_feast_core_DataFormat_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamFormat.ProtoReflect.Descriptor instead. 
+func (*StreamFormat) Descriptor() ([]byte, []int) { + return file_feast_core_DataFormat_proto_rawDescGZIP(), []int{1} +} + +func (m *StreamFormat) GetFormat() isStreamFormat_Format { + if m != nil { + return m.Format + } + return nil +} + +func (x *StreamFormat) GetAvroFormat() *StreamFormat_AvroFormat { + if x, ok := x.GetFormat().(*StreamFormat_AvroFormat_); ok { + return x.AvroFormat + } + return nil +} + +func (x *StreamFormat) GetProtoFormat() *StreamFormat_ProtoFormat { + if x, ok := x.GetFormat().(*StreamFormat_ProtoFormat_); ok { + return x.ProtoFormat + } + return nil +} + +type isStreamFormat_Format interface { + isStreamFormat_Format() +} + +type StreamFormat_AvroFormat_ struct { + AvroFormat *StreamFormat_AvroFormat `protobuf:"bytes,1,opt,name=avro_format,json=avroFormat,proto3,oneof"` +} + +type StreamFormat_ProtoFormat_ struct { + ProtoFormat *StreamFormat_ProtoFormat `protobuf:"bytes,2,opt,name=proto_format,json=protoFormat,proto3,oneof"` +} + +func (*StreamFormat_AvroFormat_) isStreamFormat_Format() {} + +func (*StreamFormat_ProtoFormat_) isStreamFormat_Format() {} + +// Defines options for the Parquet data format +type FileFormat_ParquetFormat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *FileFormat_ParquetFormat) Reset() { + *x = FileFormat_ParquetFormat{} + if protoimpl.UnsafeEnabled { + mi := &file_feast_core_DataFormat_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *FileFormat_ParquetFormat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*FileFormat_ParquetFormat) ProtoMessage() {} + +func (x *FileFormat_ParquetFormat) ProtoReflect() protoreflect.Message { + mi := &file_feast_core_DataFormat_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use FileFormat_ParquetFormat.ProtoReflect.Descriptor instead. +func (*FileFormat_ParquetFormat) Descriptor() ([]byte, []int) { + return file_feast_core_DataFormat_proto_rawDescGZIP(), []int{0, 0} +} + +// Defines options for the protobuf data format +type StreamFormat_ProtoFormat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Classpath to the generated Java Protobuf class that can be used to decode + // Feature data from the obtained stream message + ClassPath string `protobuf:"bytes,1,opt,name=class_path,json=classPath,proto3" json:"class_path,omitempty"` +} + +func (x *StreamFormat_ProtoFormat) Reset() { + *x = StreamFormat_ProtoFormat{} + if protoimpl.UnsafeEnabled { + mi := &file_feast_core_DataFormat_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamFormat_ProtoFormat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamFormat_ProtoFormat) ProtoMessage() {} + +func (x *StreamFormat_ProtoFormat) ProtoReflect() protoreflect.Message { + mi := &file_feast_core_DataFormat_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamFormat_ProtoFormat.ProtoReflect.Descriptor instead. 
+func (*StreamFormat_ProtoFormat) Descriptor() ([]byte, []int) { + return file_feast_core_DataFormat_proto_rawDescGZIP(), []int{1, 0} +} + +func (x *StreamFormat_ProtoFormat) GetClassPath() string { + if x != nil { + return x.ClassPath + } + return "" +} + +// Defines options for the avro data format +type StreamFormat_AvroFormat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Optional if used in a File DataSource as schema is embedded in avro file. + // Specifies the schema of the Avro message as JSON string. + SchemaJson string `protobuf:"bytes,1,opt,name=schema_json,json=schemaJson,proto3" json:"schema_json,omitempty"` +} + +func (x *StreamFormat_AvroFormat) Reset() { + *x = StreamFormat_AvroFormat{} + if protoimpl.UnsafeEnabled { + mi := &file_feast_core_DataFormat_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamFormat_AvroFormat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamFormat_AvroFormat) ProtoMessage() {} + +func (x *StreamFormat_AvroFormat) ProtoReflect() protoreflect.Message { + mi := &file_feast_core_DataFormat_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamFormat_AvroFormat.ProtoReflect.Descriptor instead. +func (*StreamFormat_AvroFormat) Descriptor() ([]byte, []int) { + return file_feast_core_DataFormat_proto_rawDescGZIP(), []int{1, 1} +} + +func (x *StreamFormat_AvroFormat) GetSchemaJson() string { + if x != nil { + return x.SchemaJson + } + return "" +} + +var File_feast_core_DataFormat_proto protoreflect.FileDescriptor + +var file_feast_core_DataFormat_proto_rawDesc = []byte{ + 0x0a, 0x1b, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x44, 0x61, 0x74, + 0x61, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x66, + 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x22, 0x76, 0x0a, 0x0a, 0x46, 0x69, 0x6c, + 0x65, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x12, 0x4d, 0x0a, 0x0e, 0x70, 0x61, 0x72, 0x71, 0x75, + 0x65, 0x74, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x24, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x46, 0x69, 0x6c, + 0x65, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2e, 0x50, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x48, 0x00, 0x52, 0x0d, 0x70, 0x61, 0x72, 0x71, 0x75, 0x65, 0x74, + 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x1a, 0x0f, 0x0a, 0x0d, 0x50, 0x61, 0x72, 0x71, 0x75, 0x65, + 0x74, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x42, 0x08, 0x0a, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x22, 0x88, 0x02, 0x0a, 0x0c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6f, 0x72, 0x6d, + 0x61, 0x74, 0x12, 0x46, 0x0a, 0x0b, 0x61, 0x76, 0x72, 0x6f, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, + 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x2e, 0x41, 0x76, 0x72, 0x6f, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x48, 0x00, 0x52, 0x0a, + 0x61, 0x76, 0x72, 0x6f, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x12, 0x49, 0x0a, 0x0c, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x02, 
0x20, 0x01, 0x28, 0x0b, + 0x32, 0x24, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x2e, 0x50, 0x72, 0x6f, 0x74, 0x6f, + 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x48, 0x00, 0x52, 0x0b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x1a, 0x2c, 0x0a, 0x0b, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x46, 0x6f, + 0x72, 0x6d, 0x61, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x5f, 0x70, 0x61, + 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x50, + 0x61, 0x74, 0x68, 0x1a, 0x2d, 0x0a, 0x0a, 0x41, 0x76, 0x72, 0x6f, 0x46, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x5f, 0x6a, 0x73, 0x6f, 0x6e, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x4a, 0x73, + 0x6f, 0x6e, 0x42, 0x08, 0x0a, 0x06, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x42, 0x58, 0x0a, 0x10, + 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x72, 0x65, + 0x42, 0x0f, 0x44, 0x61, 0x74, 0x61, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x50, 0x72, 0x6f, 0x74, + 0x6f, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x65, + 0x61, 0x73, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x64, + 0x6b, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x66, 0x65, 0x61, 0x73, + 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_feast_core_DataFormat_proto_rawDescOnce sync.Once + file_feast_core_DataFormat_proto_rawDescData = file_feast_core_DataFormat_proto_rawDesc +) + +func file_feast_core_DataFormat_proto_rawDescGZIP() []byte { + file_feast_core_DataFormat_proto_rawDescOnce.Do(func() { + file_feast_core_DataFormat_proto_rawDescData = protoimpl.X.CompressGZIP(file_feast_core_DataFormat_proto_rawDescData) + }) + return file_feast_core_DataFormat_proto_rawDescData +} + +var file_feast_core_DataFormat_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_feast_core_DataFormat_proto_goTypes = []interface{}{ + (*FileFormat)(nil), // 0: feast.core.FileFormat + (*StreamFormat)(nil), // 1: feast.core.StreamFormat + (*FileFormat_ParquetFormat)(nil), // 2: feast.core.FileFormat.ParquetFormat + (*StreamFormat_ProtoFormat)(nil), // 3: feast.core.StreamFormat.ProtoFormat + (*StreamFormat_AvroFormat)(nil), // 4: feast.core.StreamFormat.AvroFormat +} +var file_feast_core_DataFormat_proto_depIdxs = []int32{ + 2, // 0: feast.core.FileFormat.parquet_format:type_name -> feast.core.FileFormat.ParquetFormat + 4, // 1: feast.core.StreamFormat.avro_format:type_name -> feast.core.StreamFormat.AvroFormat + 3, // 2: feast.core.StreamFormat.proto_format:type_name -> feast.core.StreamFormat.ProtoFormat + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name +} + +func init() { file_feast_core_DataFormat_proto_init() } +func file_feast_core_DataFormat_proto_init() { + if File_feast_core_DataFormat_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_feast_core_DataFormat_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FileFormat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + 
return &v.unknownFields + default: + return nil + } + } + file_feast_core_DataFormat_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamFormat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_feast_core_DataFormat_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*FileFormat_ParquetFormat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_feast_core_DataFormat_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamFormat_ProtoFormat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_feast_core_DataFormat_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamFormat_AvroFormat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_feast_core_DataFormat_proto_msgTypes[0].OneofWrappers = []interface{}{ + (*FileFormat_ParquetFormat_)(nil), + } + file_feast_core_DataFormat_proto_msgTypes[1].OneofWrappers = []interface{}{ + (*StreamFormat_AvroFormat_)(nil), + (*StreamFormat_ProtoFormat_)(nil), + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_feast_core_DataFormat_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_feast_core_DataFormat_proto_goTypes, + DependencyIndexes: file_feast_core_DataFormat_proto_depIdxs, + MessageInfos: file_feast_core_DataFormat_proto_msgTypes, + }.Build() + File_feast_core_DataFormat_proto = out.File + file_feast_core_DataFormat_proto_rawDesc = nil + file_feast_core_DataFormat_proto_goTypes = nil + file_feast_core_DataFormat_proto_depIdxs = nil +} diff --git a/sdk/go/protos/feast/core/DataSource.pb.go b/sdk/go/protos/feast/core/DataSource.pb.go index a70176bccb..4dd43d5196 100644 --- a/sdk/go/protos/feast/core/DataSource.pb.go +++ b/sdk/go/protos/feast/core/DataSource.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/DataSource.proto package core @@ -260,8 +260,7 @@ type DataSource_FileOptions struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // File Format of the file containing the features - FileFormat string `protobuf:"bytes,1,opt,name=file_format,json=fileFormat,proto3" json:"file_format,omitempty"` + FileFormat *FileFormat `protobuf:"bytes,1,opt,name=file_format,json=fileFormat,proto3" json:"file_format,omitempty"` // Target URL of file to retrieve and source features from. 
// s3://path/to/file for AWS S3 storage // gs://path/to/file for GCP GCS storage @@ -301,11 +300,11 @@ func (*DataSource_FileOptions) Descriptor() ([]byte, []int) { return file_feast_core_DataSource_proto_rawDescGZIP(), []int{0, 1} } -func (x *DataSource_FileOptions) GetFileFormat() string { +func (x *DataSource_FileOptions) GetFileFormat() *FileFormat { if x != nil { return x.FileFormat } - return "" + return nil } func (x *DataSource_FileOptions) GetFileUrl() string { @@ -376,9 +375,8 @@ type DataSource_KafkaOptions struct { BootstrapServers string `protobuf:"bytes,1,opt,name=bootstrap_servers,json=bootstrapServers,proto3" json:"bootstrap_servers,omitempty"` // Kafka topic to collect feature data from. Topic string `protobuf:"bytes,2,opt,name=topic,proto3" json:"topic,omitempty"` - // Classpath to the generated Java Protobuf class that can be used to decode - // Feature data from the obtained Kafka message - ClassPath string `protobuf:"bytes,3,opt,name=class_path,json=classPath,proto3" json:"class_path,omitempty"` + // Defines the stream data format encoding feature/entity data in Kafka messages. + MessageFormat *StreamFormat `protobuf:"bytes,3,opt,name=message_format,json=messageFormat,proto3" json:"message_format,omitempty"` } func (x *DataSource_KafkaOptions) Reset() { @@ -427,11 +425,11 @@ func (x *DataSource_KafkaOptions) GetTopic() string { return "" } -func (x *DataSource_KafkaOptions) GetClassPath() string { +func (x *DataSource_KafkaOptions) GetMessageFormat() *StreamFormat { if x != nil { - return x.ClassPath + return x.MessageFormat } - return "" + return nil } // Defines options for DataSource that sources features from Kinesis records. @@ -446,9 +444,9 @@ type DataSource_KinesisOptions struct { Region string `protobuf:"bytes,1,opt,name=region,proto3" json:"region,omitempty"` // Name of the Kinesis stream to obtain feature data from. StreamName string `protobuf:"bytes,2,opt,name=stream_name,json=streamName,proto3" json:"stream_name,omitempty"` - // Classpath to the generated Java Protobuf class that can be used to decode - // Feature data from the obtained Kinesis record - ClassPath string `protobuf:"bytes,3,opt,name=class_path,json=classPath,proto3" json:"class_path,omitempty"` + // Defines the data format encoding the feature/entity data in Kinesis records. + // Kinesis Data Sources support Avro and Proto as data formats. 
+ RecordFormat *StreamFormat `protobuf:"bytes,3,opt,name=record_format,json=recordFormat,proto3" json:"record_format,omitempty"` } func (x *DataSource_KinesisOptions) Reset() { @@ -497,11 +495,11 @@ func (x *DataSource_KinesisOptions) GetStreamName() string { return "" } -func (x *DataSource_KinesisOptions) GetClassPath() string { +func (x *DataSource_KinesisOptions) GetRecordFormat() *StreamFormat { if x != nil { - return x.ClassPath + return x.RecordFormat } - return "" + return nil } var File_feast_core_DataSource_proto protoreflect.FileDescriptor @@ -509,85 +507,93 @@ var File_feast_core_DataSource_proto protoreflect.FileDescriptor var file_feast_core_DataSource_proto_rawDesc = []byte{ 0x0a, 0x1b, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0a, 0x66, - 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x22, 0xfa, 0x08, 0x0a, 0x0a, 0x44, 0x61, - 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, - 0x6f, 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x53, - 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, - 0x4d, 0x0a, 0x0d, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, - 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, - 0x6f, 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x46, + 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x1a, 0x1b, 0x66, 0x65, 0x61, 0x73, 0x74, + 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x44, 0x61, 0x74, 0x61, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xd6, 0x09, 0x0a, 0x0a, 0x44, 0x61, 0x74, 0x61, 0x53, + 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x35, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0e, 0x32, 0x21, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, + 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x53, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x4d, 0x0a, 0x0d, + 0x66, 0x69, 0x65, 0x6c, 0x64, 0x5f, 0x6d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x18, 0x02, 0x20, + 0x03, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, + 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x46, 0x69, 0x65, 0x6c, + 0x64, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x0c, 0x66, + 0x69, 0x65, 0x6c, 0x64, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x34, 0x0a, 0x16, 0x65, + 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x63, + 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x65, 0x76, 0x65, + 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x43, 0x6f, 0x6c, 0x75, 0x6d, + 0x6e, 0x12, 0x32, 0x0a, 0x15, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x70, 0x61, 0x72, 0x74, 0x69, 0x74, + 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x13, 0x64, 0x61, 0x74, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x43, + 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x38, 0x0a, 0x18, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x5f, 0x74, 
0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, + 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16, 0x63, 0x72, 0x65, 0x61, 0x74, 0x65, 0x64, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, + 0x47, 0x0a, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, + 0x0b, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x22, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, + 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x46, 0x69, + 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0b, 0x66, 0x69, 0x6c, + 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x53, 0x0a, 0x10, 0x62, 0x69, 0x67, 0x71, + 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0c, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, + 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x42, 0x69, 0x67, 0x51, 0x75, + 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0f, 0x62, 0x69, + 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x4a, 0x0a, + 0x0d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0d, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, + 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4b, 0x61, 0x66, + 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0c, 0x6b, 0x61, 0x66, + 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x50, 0x0a, 0x0f, 0x6b, 0x69, 0x6e, + 0x65, 0x73, 0x69, 0x73, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x0e, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, + 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4b, 0x69, 0x6e, 0x65, 0x73, + 0x69, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0e, 0x6b, 0x69, 0x6e, + 0x65, 0x73, 0x69, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x3f, 0x0a, 0x11, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x45, 0x6e, 0x74, 0x72, 0x79, - 0x52, 0x0c, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x12, 0x34, - 0x0a, 0x16, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x43, 0x6f, - 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x32, 0x0a, 0x15, 0x64, 0x61, 0x74, 0x65, 0x5f, 0x70, 0x61, 0x72, - 0x74, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x18, 0x04, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x13, 0x64, 0x61, 0x74, 0x65, 0x50, 0x61, 0x72, 0x74, 0x69, 0x74, 0x69, - 0x6f, 0x6e, 0x43, 0x6f, 0x6c, 0x75, 0x6d, 0x6e, 0x12, 0x38, 0x0a, 0x18, 0x63, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x5f, 0x63, 0x6f, - 0x6c, 0x75, 0x6d, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16, 0x63, 0x72, 0x65, 0x61, - 0x74, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x43, 0x6f, 0x6c, 0x75, - 0x6d, 0x6e, 0x12, 0x47, 0x0a, 0x0c, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, - 0x6e, 0x73, 0x18, 0x0b, 0x20, 0x01, 
0x28, 0x0b, 0x32, 0x22, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, - 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x2e, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0b, - 0x66, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x53, 0x0a, 0x10, 0x62, - 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x0c, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x26, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, - 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x42, 0x69, - 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, - 0x0f, 0x62, 0x69, 0x67, 0x71, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, - 0x12, 0x4a, 0x0a, 0x0d, 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, - 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x23, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, - 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, - 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0c, - 0x6b, 0x61, 0x66, 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x50, 0x0a, 0x0f, - 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x5f, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, - 0x0e, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, - 0x72, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x2e, 0x4b, 0x69, - 0x6e, 0x65, 0x73, 0x69, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x48, 0x00, 0x52, 0x0e, - 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x1a, 0x3f, - 0x0a, 0x11, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x4d, 0x61, 0x70, 0x70, 0x69, 0x6e, 0x67, 0x45, 0x6e, - 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, - 0x49, 0x0a, 0x0b, 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1f, - 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x12, - 0x19, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x07, 0x66, 0x69, 0x6c, 0x65, 0x55, 0x72, 0x6c, 0x1a, 0x2e, 0x0a, 0x0f, 0x42, 0x69, - 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x1b, 0x0a, - 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x66, 0x1a, 0x70, 0x0a, 0x0c, 0x4b, 0x61, - 0x66, 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2b, 0x0a, 0x11, 0x62, 0x6f, - 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, - 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x12, 0x1d, 0x0a, - 0x0a, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x5f, 0x70, 0x61, 0x74, 
0x68, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x63, 0x6c, 0x61, 0x73, 0x73, 0x50, 0x61, 0x74, 0x68, 0x1a, 0x68, 0x0a, 0x0e, - 0x4b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x16, - 0x0a, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, - 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x63, 0x6c, 0x61, 0x73, 0x73, - 0x5f, 0x70, 0x61, 0x74, 0x68, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x61, - 0x73, 0x73, 0x50, 0x61, 0x74, 0x68, 0x22, 0x63, 0x0a, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, - 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x42, 0x41, 0x54, 0x43, 0x48, 0x5f, 0x46, 0x49, 0x4c, 0x45, 0x10, - 0x01, 0x12, 0x12, 0x0a, 0x0e, 0x42, 0x41, 0x54, 0x43, 0x48, 0x5f, 0x42, 0x49, 0x47, 0x51, 0x55, - 0x45, 0x52, 0x59, 0x10, 0x02, 0x12, 0x10, 0x0a, 0x0c, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, - 0x4b, 0x41, 0x46, 0x4b, 0x41, 0x10, 0x03, 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x52, 0x45, 0x41, - 0x4d, 0x5f, 0x4b, 0x49, 0x4e, 0x45, 0x53, 0x49, 0x53, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x6f, - 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, 0x58, 0x0a, 0x10, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x42, 0x0f, 0x44, 0x61, 0x74, 0x61, - 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x33, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2d, 0x64, 0x65, - 0x76, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x67, 0x6f, 0x2f, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, - 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, + 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x1a, 0x61, 0x0a, 0x0b, + 0x46, 0x69, 0x6c, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x37, 0x0a, 0x0b, 0x66, + 0x69, 0x6c, 0x65, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x16, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x46, 0x69, + 0x6c, 0x65, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x46, 0x6f, + 0x72, 0x6d, 0x61, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x66, 0x69, 0x6c, 0x65, 0x5f, 0x75, 0x72, 0x6c, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x66, 0x69, 0x6c, 0x65, 0x55, 0x72, 0x6c, 0x1a, + 0x2e, 0x0a, 0x0f, 0x42, 0x69, 0x67, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x72, 0x65, 0x66, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x52, 0x65, 0x66, 0x1a, + 0x92, 0x01, 0x0a, 0x0c, 0x4b, 0x61, 0x66, 0x6b, 0x61, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, + 0x12, 0x2b, 0x0a, 0x11, 0x62, 0x6f, 0x6f, 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 0x5f, 0x73, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x10, 0x62, 0x6f, 0x6f, + 0x74, 0x73, 0x74, 0x72, 0x61, 0x70, 
0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x73, 0x12, 0x14, 0x0a, + 0x05, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x6f, + 0x70, 0x69, 0x63, 0x12, 0x3f, 0x0a, 0x0e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x5f, 0x66, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x66, 0x65, + 0x61, 0x73, 0x74, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, + 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x52, 0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x46, 0x6f, + 0x72, 0x6d, 0x61, 0x74, 0x1a, 0x88, 0x01, 0x0a, 0x0e, 0x4b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, + 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, + 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x72, 0x65, 0x67, 0x69, 0x6f, 0x6e, 0x12, + 0x1f, 0x0a, 0x0b, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4e, 0x61, 0x6d, 0x65, + 0x12, 0x3d, 0x0a, 0x0d, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x5f, 0x66, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x18, 0x2e, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, + 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6f, 0x72, 0x6d, 0x61, + 0x74, 0x52, 0x0c, 0x72, 0x65, 0x63, 0x6f, 0x72, 0x64, 0x46, 0x6f, 0x72, 0x6d, 0x61, 0x74, 0x22, + 0x63, 0x0a, 0x0a, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x0b, 0x0a, + 0x07, 0x49, 0x4e, 0x56, 0x41, 0x4c, 0x49, 0x44, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x42, 0x41, + 0x54, 0x43, 0x48, 0x5f, 0x46, 0x49, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x12, 0x0a, 0x0e, 0x42, 0x41, + 0x54, 0x43, 0x48, 0x5f, 0x42, 0x49, 0x47, 0x51, 0x55, 0x45, 0x52, 0x59, 0x10, 0x02, 0x12, 0x10, + 0x0a, 0x0c, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4b, 0x41, 0x46, 0x4b, 0x41, 0x10, 0x03, + 0x12, 0x12, 0x0a, 0x0e, 0x53, 0x54, 0x52, 0x45, 0x41, 0x4d, 0x5f, 0x4b, 0x49, 0x4e, 0x45, 0x53, + 0x49, 0x53, 0x10, 0x04, 0x42, 0x09, 0x0a, 0x07, 0x6f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x42, + 0x58, 0x0a, 0x10, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x63, + 0x6f, 0x72, 0x65, 0x42, 0x0f, 0x44, 0x61, 0x74, 0x61, 0x53, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, + 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, 0x2d, 0x64, 0x65, 0x76, 0x2f, 0x66, 0x65, 0x61, 0x73, 0x74, + 0x2f, 0x73, 0x64, 0x6b, 0x2f, 0x67, 0x6f, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x66, + 0x65, 0x61, 0x73, 0x74, 0x2f, 0x63, 0x6f, 0x72, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -612,6 +618,8 @@ var file_feast_core_DataSource_proto_goTypes = []interface{}{ (*DataSource_BigQueryOptions)(nil), // 4: feast.core.DataSource.BigQueryOptions (*DataSource_KafkaOptions)(nil), // 5: feast.core.DataSource.KafkaOptions (*DataSource_KinesisOptions)(nil), // 6: feast.core.DataSource.KinesisOptions + (*FileFormat)(nil), // 7: feast.core.FileFormat + (*StreamFormat)(nil), // 8: feast.core.StreamFormat } var file_feast_core_DataSource_proto_depIdxs = []int32{ 0, // 0: feast.core.DataSource.type:type_name -> feast.core.DataSource.SourceType @@ -620,11 +628,14 @@ var file_feast_core_DataSource_proto_depIdxs = []int32{ 4, // 3: feast.core.DataSource.bigquery_options:type_name -> feast.core.DataSource.BigQueryOptions 5, // 4: feast.core.DataSource.kafka_options:type_name -> feast.core.DataSource.KafkaOptions 6, 
// 5: feast.core.DataSource.kinesis_options:type_name -> feast.core.DataSource.KinesisOptions - 6, // [6:6] is the sub-list for method output_type - 6, // [6:6] is the sub-list for method input_type - 6, // [6:6] is the sub-list for extension type_name - 6, // [6:6] is the sub-list for extension extendee - 0, // [0:6] is the sub-list for field type_name + 7, // 6: feast.core.DataSource.FileOptions.file_format:type_name -> feast.core.FileFormat + 8, // 7: feast.core.DataSource.KafkaOptions.message_format:type_name -> feast.core.StreamFormat + 8, // 8: feast.core.DataSource.KinesisOptions.record_format:type_name -> feast.core.StreamFormat + 9, // [9:9] is the sub-list for method output_type + 9, // [9:9] is the sub-list for method input_type + 9, // [9:9] is the sub-list for extension type_name + 9, // [9:9] is the sub-list for extension extendee + 0, // [0:9] is the sub-list for field type_name } func init() { file_feast_core_DataSource_proto_init() } @@ -632,6 +643,7 @@ func file_feast_core_DataSource_proto_init() { if File_feast_core_DataSource_proto != nil { return } + file_feast_core_DataFormat_proto_init() if !protoimpl.UnsafeEnabled { file_feast_core_DataSource_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { switch v := v.(*DataSource); i { diff --git a/sdk/go/protos/feast/core/Entity.pb.go b/sdk/go/protos/feast/core/Entity.pb.go index 0aed913325..f5a2c0d06a 100644 --- a/sdk/go/protos/feast/core/Entity.pb.go +++ b/sdk/go/protos/feast/core/Entity.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/Entity.proto package core diff --git a/sdk/go/protos/feast/core/Feature.pb.go b/sdk/go/protos/feast/core/Feature.pb.go index 1ad93ef8a1..9c2690e0f9 100644 --- a/sdk/go/protos/feast/core/Feature.pb.go +++ b/sdk/go/protos/feast/core/Feature.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/Feature.proto package core diff --git a/sdk/go/protos/feast/core/FeatureSet.pb.go b/sdk/go/protos/feast/core/FeatureSet.pb.go index d13041a868..520c4881be 100644 --- a/sdk/go/protos/feast/core/FeatureSet.pb.go +++ b/sdk/go/protos/feast/core/FeatureSet.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/FeatureSet.proto package core diff --git a/sdk/go/protos/feast/core/FeatureSetReference.pb.go b/sdk/go/protos/feast/core/FeatureSetReference.pb.go index d82e94b6a1..ca640aa0fb 100644 --- a/sdk/go/protos/feast/core/FeatureSetReference.pb.go +++ b/sdk/go/protos/feast/core/FeatureSetReference.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/FeatureSetReference.proto package core diff --git a/sdk/go/protos/feast/core/FeatureTable.pb.go b/sdk/go/protos/feast/core/FeatureTable.pb.go index 290477931d..48aef52950 100644 --- a/sdk/go/protos/feast/core/FeatureTable.pb.go +++ b/sdk/go/protos/feast/core/FeatureTable.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/FeatureTable.proto package core diff --git a/sdk/go/protos/feast/core/IngestionJob.pb.go b/sdk/go/protos/feast/core/IngestionJob.pb.go index 047a9cea0e..a716bdf089 100644 --- a/sdk/go/protos/feast/core/IngestionJob.pb.go +++ b/sdk/go/protos/feast/core/IngestionJob.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/IngestionJob.proto package core diff --git a/sdk/go/protos/feast/core/Runner.pb.go b/sdk/go/protos/feast/core/Runner.pb.go index 105878c6d6..763695f6c8 100644 --- a/sdk/go/protos/feast/core/Runner.pb.go +++ b/sdk/go/protos/feast/core/Runner.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/Runner.proto package core diff --git a/sdk/go/protos/feast/core/Source.pb.go b/sdk/go/protos/feast/core/Source.pb.go index a5b3de9564..af7f9783bb 100644 --- a/sdk/go/protos/feast/core/Source.pb.go +++ b/sdk/go/protos/feast/core/Source.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/Source.proto package core diff --git a/sdk/go/protos/feast/core/Store.pb.go b/sdk/go/protos/feast/core/Store.pb.go index c037fe84b5..b5ffae9326 100644 --- a/sdk/go/protos/feast/core/Store.pb.go +++ b/sdk/go/protos/feast/core/Store.pb.go @@ -17,7 +17,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/core/Store.proto package core diff --git a/sdk/go/protos/feast/serving/ServingService.pb.go b/sdk/go/protos/feast/serving/ServingService.pb.go index dc99f28510..f77930562e 100644 --- a/sdk/go/protos/feast/serving/ServingService.pb.go +++ b/sdk/go/protos/feast/serving/ServingService.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/serving/ServingService.proto package serving diff --git a/sdk/go/protos/feast/storage/Redis.pb.go b/sdk/go/protos/feast/storage/Redis.pb.go index d169c8bf32..7b3a3f6a77 100644 --- a/sdk/go/protos/feast/storage/Redis.pb.go +++ b/sdk/go/protos/feast/storage/Redis.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/storage/Redis.proto package storage diff --git a/sdk/go/protos/feast/types/FeatureRow.pb.go b/sdk/go/protos/feast/types/FeatureRow.pb.go index 0c61f25f3d..f6fe6bfa42 100644 --- a/sdk/go/protos/feast/types/FeatureRow.pb.go +++ b/sdk/go/protos/feast/types/FeatureRow.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/types/FeatureRow.proto package types diff --git a/sdk/go/protos/feast/types/FeatureRowExtended.pb.go b/sdk/go/protos/feast/types/FeatureRowExtended.pb.go index d1e02b1e0a..a01f0b0417 100644 --- a/sdk/go/protos/feast/types/FeatureRowExtended.pb.go +++ b/sdk/go/protos/feast/types/FeatureRowExtended.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. 
// versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/types/FeatureRowExtended.proto package types diff --git a/sdk/go/protos/feast/types/Field.pb.go b/sdk/go/protos/feast/types/Field.pb.go index 9dad77cdb9..f2562b72d9 100644 --- a/sdk/go/protos/feast/types/Field.pb.go +++ b/sdk/go/protos/feast/types/Field.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/types/Field.proto package types diff --git a/sdk/go/protos/feast/types/Value.pb.go b/sdk/go/protos/feast/types/Value.pb.go index 3b19435633..3625cef1a5 100644 --- a/sdk/go/protos/feast/types/Value.pb.go +++ b/sdk/go/protos/feast/types/Value.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: feast/types/Value.proto package types diff --git a/sdk/go/protos/tensorflow_metadata/proto/v0/path.pb.go b/sdk/go/protos/tensorflow_metadata/proto/v0/path.pb.go index 1daa7687f9..a1e5137c72 100644 --- a/sdk/go/protos/tensorflow_metadata/proto/v0/path.pb.go +++ b/sdk/go/protos/tensorflow_metadata/proto/v0/path.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: tensorflow_metadata/proto/v0/path.proto package v0 diff --git a/sdk/go/protos/tensorflow_metadata/proto/v0/schema.pb.go b/sdk/go/protos/tensorflow_metadata/proto/v0/schema.pb.go index 940779a191..25bf40bc7f 100644 --- a/sdk/go/protos/tensorflow_metadata/proto/v0/schema.pb.go +++ b/sdk/go/protos/tensorflow_metadata/proto/v0/schema.pb.go @@ -16,7 +16,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: tensorflow_metadata/proto/v0/schema.proto package v0 diff --git a/sdk/go/protos/tensorflow_metadata/proto/v0/statistics.pb.go b/sdk/go/protos/tensorflow_metadata/proto/v0/statistics.pb.go index fbf6247a1d..6a102a28af 100644 --- a/sdk/go/protos/tensorflow_metadata/proto/v0/statistics.pb.go +++ b/sdk/go/protos/tensorflow_metadata/proto/v0/statistics.pb.go @@ -20,7 +20,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.25.0 -// protoc v3.12.4 +// protoc v3.10.0 // source: tensorflow_metadata/proto/v0/statistics.proto package v0 diff --git a/sdk/python/feast/client.py b/sdk/python/feast/client.py index ed80714845..08d6b5a9be 100644 --- a/sdk/python/feast/client.py +++ b/sdk/python/feast/client.py @@ -58,6 +58,7 @@ ListProjectsResponse, ) from feast.core.CoreService_pb2_grpc import CoreServiceStub +from feast.data_format import ParquetFormat from feast.data_source import BigQuerySource, FileSource from feast.entity import Entity from feast.feature import _build_feature_references @@ -657,10 +658,9 @@ def ingest( if ( feature_table.batch_source and issubclass(type(feature_table.batch_source), FileSource) - and "".join( - feature_table.batch_source.file_options.file_format.split() - ).lower() - != "parquet" + and not isinstance( + feature_table.batch_source.file_options.file_format, ParquetFormat + ) ): raise Exception( f"No suitable batch source found for FeatureTable, {name}."
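For clarity on the hunk above: the guard's intent (per the removed string comparison) is to raise only when a file-based batch source is not in Parquet format, so the check must test the format instance itself rather than its type(). A minimal sketch of that predicate, assuming the feast.data_format module introduced below (the helper name is illustrative, not part of this PR):

from feast.data_format import ParquetFormat
from feast.data_source import FileSource


def has_parquet_batch_source(feature_table) -> bool:
    # True only when the table's batch source is file-based and its
    # file_format is the new ParquetFormat class from feast.data_format.
    source = feature_table.batch_source
    return isinstance(source, FileSource) and isinstance(
        source.file_options.file_format, ParquetFormat
    )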
diff --git a/sdk/python/feast/data_format.py b/sdk/python/feast/data_format.py new file mode 100644 index 0000000000..d876fa03e8 --- /dev/null +++ b/sdk/python/feast/data_format.py @@ -0,0 +1,131 @@ +# Copyright 2020 The Feast Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from abc import ABC, abstractmethod + +from feast.core.DataFormat_pb2 import FileFormat as FileFormatProto +from feast.core.DataFormat_pb2 import StreamFormat as StreamFormatProto + + +class FileFormat(ABC): + """ + Defines an abstract file format used to encode feature data in files + """ + + @abstractmethod + def to_proto(self): + """ + Convert this FileFormat into its protobuf representation. + """ + pass + + def __eq__(self, other): + return self.to_proto() == other.to_proto() + + @classmethod + def from_proto(cls, proto): + """ + Construct this FileFormat from its protobuf representation. + Raises NotImplementedError if FileFormat specified in given proto is not supported. + """ + fmt = proto.WhichOneof("format") + if fmt == "parquet_format": + return ParquetFormat() + raise NotImplementedError(f"FileFormat is unsupported: {fmt}") + + def __str__(self): + """ + String representation of the file format passed to Spark + """ + raise NotImplementedError() + + +class ParquetFormat(FileFormat): + """ + Defines the Parquet data format + """ + + def to_proto(self): + return FileFormatProto(parquet_format=FileFormatProto.ParquetFormat()) + + def __str__(self): + return "parquet" + + +class StreamFormat(ABC): + """ + Defines an abstract streaming data format used to encode feature data in streams + """ + + @abstractmethod + def to_proto(self): + """ + Convert this StreamFormat into its protobuf representation. + """ + pass + + def __eq__(self, other): + return self.to_proto() == other.to_proto() + + @classmethod + def from_proto(cls, proto): + """ + Construct this StreamFormat from its protobuf representation. + """ + fmt = proto.WhichOneof("format") + if fmt == "avro_format": + return AvroFormat(schema_json=proto.avro_format.schema_json) + if fmt == "proto_format": + return ProtoFormat(class_path=proto.proto_format.class_path) + raise NotImplementedError(f"StreamFormat is unsupported: {fmt}") + + +class AvroFormat(StreamFormat): + """ + Defines the Avro streaming data format that encodes data in Avro format + """ + + def __init__(self, schema_json: str): + """ + Construct a new Avro data format. + + Args: + schema_json: Avro schema definition in JSON + """ + self.schema_json = schema_json + + def to_proto(self): + proto = StreamFormatProto.AvroFormat(schema_json=self.schema_json) + return StreamFormatProto(avro_format=proto) + + +class ProtoFormat(StreamFormat): + """ + Defines the Protobuf data format + """ + + def __init__(self, class_path: str): + """ + Construct a new Protobuf data format.
+ + Args: + class_path: Class path to the Java Protobuf class that can be used to decode protobuf messages. + self.class_path = class_path + + def to_proto(self): + return StreamFormatProto( + proto_format=StreamFormatProto.ProtoFormat(class_path=self.class_path) + ) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 421d3d5c5b..58f7ec6bdb 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -17,6 +17,7 @@ from typing import Dict, Optional from feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.data_format import FileFormat, StreamFormat class SourceType(enum.Enum): @@ -37,7 +38,7 @@ class FileOptions: """ def __init__( - self, file_format: str, file_url: str, + self, file_format: FileFormat, file_url: str, ): self._file_format = file_format self._file_url = file_url @@ -75,18 +76,16 @@ def from_proto(cls, file_options_proto: DataSourceProto.FileOptions): """ Creates a FileOptions from a protobuf representation of a file option Args: file_options_proto: A protobuf representation of a DataSource Returns: Returns a FileOptions object based on the file_options protobuf """ - file_options = cls( - file_format=file_options_proto.file_format, + file_format=FileFormat.from_proto(file_options_proto.file_format), file_url=file_options_proto.file_url, ) - return file_options def to_proto(self) -> DataSourceProto.FileOptions: @@ -98,7 +97,7 @@ """ file_options_proto = DataSourceProto.FileOptions( - file_format=self.file_format, file_url=self.file_url, + file_format=self.file_format.to_proto(), file_url=self.file_url, ) return file_options_proto @@ -165,10 +164,10 @@ class KafkaOptions: """ def __init__( - self, bootstrap_servers: str, class_path: str, topic: str, + self, bootstrap_servers: str, message_format: StreamFormat, topic: str, ): self._bootstrap_servers = bootstrap_servers - self._class_path = class_path + self._message_format = message_format self._topic = topic @property @@ -186,20 +185,18 @@ def bootstrap_servers(self, bootstrap_servers): self._bootstrap_servers = bootstrap_servers @property - def class_path(self): + def message_format(self): """ - Returns the class path to the generated Java Protobuf class that can be - used to decode feature data from the obtained Kafka message + Returns the data format that is used to encode the feature data in Kafka messages """ - return self._class_path + return self._message_format - @class_path.setter - def class_path(self, class_path): + @message_format.setter + def message_format(self, message_format): """ - Sets the class path to the generated Java Protobuf class that can be - used to decode feature data from the obtained Kafka message + Sets the data format that is used to encode the feature data in Kafka messages """ - self._class_path = class_path + self._message_format = message_format @property def topic(self): @@ -229,7 +226,7 @@ def from_proto(cls, kafka_options_proto: DataSourceProto.KafkaOptions): kafka_options = cls( bootstrap_servers=kafka_options_proto.bootstrap_servers, - class_path=kafka_options_proto.class_path, + message_format=StreamFormat.from_proto(kafka_options_proto.message_format), topic=kafka_options_proto.topic, ) @@ -245,7 +242,7 @@ def to_proto(self) -> DataSourceProto.KafkaOptions: kafka_options_proto = DataSourceProto.KafkaOptions( bootstrap_servers=self.bootstrap_servers, -
class_path=self.class_path, + message_format=self.message_format.to_proto(), topic=self.topic, ) @@ -258,27 +255,25 @@ class KinesisOptions: """ def __init__( - self, class_path: str, region: str, stream_name: str, + self, record_format: StreamFormat, region: str, stream_name: str, ): - self._class_path = class_path + self._record_format = record_format self._region = region self._stream_name = stream_name @property - def class_path(self): + def record_format(self): """ - Returns the class path to the generated Java Protobuf class that can be - used to decode feature data from the obtained Kinesis record + Returns the data format used to encode the feature data in the Kinesis records. """ - return self._class_path + return self._record_format - @class_path.setter - def class_path(self, class_path): + @record_format.setter + def record_format(self, record_format): """ - Sets the class path to the generated Java Protobuf class that can be - used to decode feature data from the obtained Kinesis record + Sets the data format used to encode the feature data in the Kinesis records. """ - self._class_path = class_path + self._record_format = record_format @property def region(self): @@ -321,7 +316,7 @@ def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions): """ kinesis_options = cls( - class_path=kinesis_options_proto.class_path, + record_format=StreamFormat.from_proto(kinesis_options_proto.record_format), region=kinesis_options_proto.region, stream_name=kinesis_options_proto.stream_name, ) @@ -337,7 +332,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions: """ kinesis_options_proto = DataSourceProto.KinesisOptions( - class_path=self.class_path, + record_format=self.record_format.to_proto(), region=self.region, stream_name=self.stream_name, ) @@ -441,7 +436,7 @@ def from_proto(data_source): if data_source.file_options.file_format and data_source.file_options.file_url: data_source_obj = FileSource( field_mapping=data_source.field_mapping, - file_format=data_source.file_options.file_format, + file_format=FileFormat.from_proto(data_source.file_options.file_format), file_url=data_source.file_options.file_url, event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, @@ -458,25 +453,29 @@ def from_proto(data_source): elif ( data_source.kafka_options.bootstrap_servers and data_source.kafka_options.topic - and data_source.kafka_options.class_path + and data_source.kafka_options.message_format ): data_source_obj = KafkaSource( field_mapping=data_source.field_mapping, bootstrap_servers=data_source.kafka_options.bootstrap_servers, - class_path=data_source.kafka_options.class_path, + message_format=StreamFormat.from_proto( + data_source.kafka_options.message_format + ), topic=data_source.kafka_options.topic, event_timestamp_column=data_source.event_timestamp_column, created_timestamp_column=data_source.created_timestamp_column, date_partition_column=data_source.date_partition_column, ) elif ( - data_source.kinesis_options.class_path + data_source.kinesis_options.record_format and data_source.kinesis_options.region and data_source.kinesis_options.stream_name ): data_source_obj = KinesisSource( field_mapping=data_source.field_mapping, - class_path=data_source.kinesis_options.class_path, + record_format=StreamFormat.from_proto( + data_source.kinesis_options.record_format + ), region=data_source.kinesis_options.region, stream_name=data_source.kinesis_options.stream_name, 
event_timestamp_column=data_source.event_timestamp_column, @@ -500,7 +499,7 @@ def __init__( self, event_timestamp_column: str, created_timestamp_column: str, - file_format: str, + file_format: FileFormat, file_url: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", @@ -522,7 +521,6 @@ def __eq__(self, other): or self.file_options.file_format != other.file_options.file_format ): return False - return True @property @@ -615,7 +613,7 @@ def __init__( event_timestamp_column: str, created_timestamp_column: str, bootstrap_servers: str, - class_path: str, + message_format: StreamFormat, topic: str, field_mapping: Optional[Dict[str, str]] = dict(), date_partition_column: Optional[str] = "", @@ -627,7 +625,9 @@ def __init__( date_partition_column, ) self._kafka_options = KafkaOptions( - bootstrap_servers=bootstrap_servers, class_path=class_path, topic=topic + bootstrap_servers=bootstrap_servers, + message_format=message_format, + topic=topic, ) def __eq__(self, other): @@ -639,7 +639,7 @@ def __eq__(self, other): if ( self.kafka_options.bootstrap_servers != other.kafka_options.bootstrap_servers - or self.kafka_options.class_path != other.kafka_options.class_path + or self.kafka_options.message_format != other.kafka_options.message_format or self.kafka_options.topic != other.kafka_options.topic ): return False @@ -679,7 +679,7 @@ def __init__( self, event_timestamp_column: str, created_timestamp_column: str, - class_path: str, + record_format: StreamFormat, region: str, stream_name: str, field_mapping: Optional[Dict[str, str]] = dict(), @@ -692,7 +692,7 @@ def __init__( date_partition_column, ) self._kinesis_options = KinesisOptions( - class_path=class_path, region=region, stream_name=stream_name + record_format=record_format, region=region, stream_name=stream_name ) def __eq__(self, other): @@ -702,7 +702,7 @@ def __eq__(self, other): ) if ( - self.kinesis_options.class_path != other.kinesis_options.class_path + self.kinesis_options.record_format != other.kinesis_options.record_format or self.kinesis_options.region != other.kinesis_options.region or self.kinesis_options.stream_name != other.kinesis_options.stream_name ): diff --git a/sdk/python/feast/pyspark/launcher.py b/sdk/python/feast/pyspark/launcher.py index 61bf16cef6..9f1c40ab2a 100644 --- a/sdk/python/feast/pyspark/launcher.py +++ b/sdk/python/feast/pyspark/launcher.py @@ -1,7 +1,7 @@ import shutil import tempfile from datetime import datetime -from typing import TYPE_CHECKING, List, Union +from typing import TYPE_CHECKING, List, Union, cast from urllib.parse import urlparse from feast.config import Config @@ -86,8 +86,8 @@ def resolve_launcher(config: Config) -> JobLauncher: _SOURCES = { - FileSource: ("file", "file_options", {"path": "file_url", "format": "file_format"}), - BigQuerySource: ("bq", "bigquery_options", {"table_ref": "table_ref"}), + FileSource: ("file", "file_options"), + BigQuerySource: ("bq", "bigquery_options"), } @@ -99,17 +99,18 @@ def _source_to_argument(source: DataSource): "date_partition_column": source.date_partition_column, } - kind, option_field, extra_properties = _SOURCES[type(source)] - - properties = { - **common_properties, - **{ - k: getattr(getattr(source, option_field), ref) - for k, ref in extra_properties.items() - }, - } - - return {kind: properties} + kind, option_field = _SOURCES[type(source)] + properties = {**common_properties} + if type(source) == FileSource: + file_source = cast(FileSource, source) + properties["path"] = 
file_source.file_options.file_url + properties["format"] = str(file_source.file_options.file_format) + return {kind: properties} + if type(source) == BigQuerySource: + bq_source = cast(BigQuerySource, source) + properties["table_ref"] = bq_source.bigquery_options.table_ref + return {kind: properties} + raise NotImplementedError(f"Unsupported Datasource: {type(source)}") def _feature_table_to_argument(client: "Client", feature_table: FeatureTable): diff --git a/sdk/python/feast/pyspark/launchers/aws/emr.py b/sdk/python/feast/pyspark/launchers/aws/emr.py index 0e8a4c23a8..04b32ac923 100644 --- a/sdk/python/feast/pyspark/launchers/aws/emr.py +++ b/sdk/python/feast/pyspark/launchers/aws/emr.py @@ -6,6 +6,7 @@ import boto3 import pandas +from feast.data_format import ParquetFormat from feast.data_source import FileSource from feast.pyspark.abc import ( IngestionJob, @@ -242,6 +243,6 @@ def stage_dataframe( return FileSource( event_timestamp_column=event_timestamp, created_timestamp_column=created_timestamp_column, - file_format="parquet", + file_format=ParquetFormat(), file_url=file_url, ) diff --git a/sdk/python/tests/test_client.py b/sdk/python/tests/test_client.py index 1d970907d5..54948c273b 100644 --- a/sdk/python/tests/test_client.py +++ b/sdk/python/tests/test_client.py @@ -41,6 +41,7 @@ from feast.core.FeatureTable_pb2 import FeatureTable as FeatureTableProto from feast.core.FeatureTable_pb2 import FeatureTableMeta as FeatureTableMetaProto from feast.core.FeatureTable_pb2 import FeatureTableSpec as FeatureTableSpecProto +from feast.data_format import ParquetFormat, ProtoFormat from feast.data_source import FileSource, KafkaSource from feast.entity import Entity from feast.feature import Feature @@ -398,7 +399,7 @@ def test_apply_feature_table_success(self, test_client): # Create Feature Tables batch_source = FileSource( - file_format="parquet", + file_format=ParquetFormat(), file_url="file://feast/*", event_timestamp_column="ts_col", created_timestamp_column="timestamp", @@ -407,7 +408,7 @@ def test_apply_feature_table_success(self, test_client): stream_source = KafkaSource( bootstrap_servers="localhost:9094", - class_path="random/path/to/class", + message_format=ProtoFormat("class.path"), topic="test_topic", event_timestamp_column="ts_col", created_timestamp_column="timestamp", @@ -820,7 +821,7 @@ def _ingest_test_getfeaturetable_mocked_resp( entities=["dev_entity"], batch_source=DataSourceProto( file_options=DataSourceProto.FileOptions( - file_format="parquet", file_url=file_url + file_format=ParquetFormat().to_proto(), file_url=file_url ), event_timestamp_column="datetime", created_timestamp_column="timestamp", diff --git a/sdk/python/tests/test_feature_table.py b/sdk/python/tests/test_feature_table.py index 0dd8bb1717..858704bd7c 100644 --- a/sdk/python/tests/test_feature_table.py +++ b/sdk/python/tests/test_feature_table.py @@ -21,6 +21,7 @@ from feast.client import Client from feast.core import CoreService_pb2_grpc as Core +from feast.data_format import ParquetFormat, ProtoFormat from feast.data_source import FileSource, KafkaSource from feast.feature import Feature from feast.feature_table import FeatureTable @@ -59,7 +60,7 @@ def test_feature_table_import_export_yaml(self): "ride_distance": "ride_distance", "ride_duration": "ride_duration", }, - file_format="parquet", + file_format=ParquetFormat(), file_url="file://feast/*", event_timestamp_column="ts_col", created_timestamp_column="timestamp", @@ -72,7 +73,7 @@ def test_feature_table_import_export_yaml(self): "ride_duration": 
"ride_duration", }, bootstrap_servers="localhost:9094", - class_path="random/path/to/class", + message_format=ProtoFormat(class_path="class.path"), topic="test_topic", event_timestamp_column="ts_col", created_timestamp_column="timestamp", diff --git a/sdk/python/tests/test_historical_feature_retrieval.py b/sdk/python/tests/test_historical_feature_retrieval.py index d9dbd9f4fc..286661148a 100644 --- a/sdk/python/tests/test_historical_feature_retrieval.py +++ b/sdk/python/tests/test_historical_feature_retrieval.py @@ -22,6 +22,7 @@ from feast import Client, Entity, Feature, FeatureTable, FileSource, ValueType from feast.core import CoreService_pb2_grpc as Core +from feast.data_format import ParquetFormat from tests.feast_core_server import CoreServicer @@ -153,7 +154,7 @@ def transactions_feature_table(spark, client): spark, "transactions", schema, df_data ) file_source = FileSource( - "event_timestamp", "created_timestamp", "parquet", file_uri + "event_timestamp", "created_timestamp", ParquetFormat(), file_uri ) features = [ Feature("total_transactions", ValueType.DOUBLE), @@ -199,7 +200,7 @@ def bookings_feature_table(spark, client): temp_dir, file_uri = create_temp_parquet_file(spark, "bookings", schema, df_data) file_source = FileSource( - "event_timestamp", "created_timestamp", "parquet", file_uri + "event_timestamp", "created_timestamp", ParquetFormat(), file_uri ) features = [Feature("total_completed_bookings", ValueType.INT32)] max_age = Duration() @@ -244,7 +245,7 @@ def bookings_feature_table_with_mapping(spark, client): temp_dir, file_uri = create_temp_parquet_file(spark, "bookings", schema, df_data) file_source = FileSource( - "datetime", "created_datetime", "parquet", file_uri, {"id": "driver_id"} + "datetime", "created_datetime", ParquetFormat(), file_uri, {"id": "driver_id"} ) features = [Feature("total_completed_bookings", ValueType.INT32)] max_age = Duration() @@ -283,7 +284,7 @@ def test_historical_feature_retrieval_from_local_spark_session( spark, "customer_driver_pair", schema, df_data ) customer_driver_pairs_source = FileSource( - "event_timestamp", "created_timestamp", "parquet", file_uri + "event_timestamp", "created_timestamp", ParquetFormat(), file_uri ) joined_df = client.get_historical_features_df( ["transactions:total_transactions", "bookings:total_completed_bookings"], @@ -330,7 +331,7 @@ def test_historical_feature_retrieval_with_field_mappings_from_local_spark_sessi ] temp_dir, file_uri = create_temp_parquet_file(spark, "drivers", schema, df_data) entity_source = FileSource( - "event_timestamp", "created_timestamp", "parquet", file_uri + "event_timestamp", "created_timestamp", ParquetFormat(), file_uri ) joined_df = client.get_historical_features_df( ["bookings:total_completed_bookings"], entity_source, diff --git a/serving/src/test/java/feast/serving/it/TestUtils.java b/serving/src/test/java/feast/serving/it/TestUtils.java index 4903a8c4d0..0c72df1ea8 100644 --- a/serving/src/test/java/feast/serving/it/TestUtils.java +++ b/serving/src/test/java/feast/serving/it/TestUtils.java @@ -19,14 +19,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.common.collect.ImmutableMap; -import com.google.protobuf.Duration; import feast.common.auth.credentials.OAuthCredentials; +import feast.common.it.DataGenerator; import feast.proto.core.CoreServiceGrpc; import feast.proto.core.CoreServiceGrpc.CoreServiceBlockingStub; -import feast.proto.core.DataSourceProto.DataSource; import feast.proto.core.EntityProto.Entity; import 
feast.proto.core.EntityProto.EntitySpecV2; -import feast.proto.core.FeatureProto.FeatureSpecV2; import feast.proto.core.FeatureTableProto.FeatureTable; import feast.proto.core.FeatureTableProto.FeatureTableSpec; import feast.proto.serving.ServingAPIProto.FeatureReferenceV2; @@ -37,7 +35,6 @@ import io.grpc.Channel; import io.grpc.ManagedChannelBuilder; import java.util.*; -import java.util.stream.Collectors; public class TestUtils { @@ -64,44 +61,6 @@ public static CoreSimpleAPIClient getApiClientForCore(int feastCorePort) { return new CoreSimpleAPIClient(coreService); } - public static FeatureTableSpec createFeatureTableSpec( - String name, - List entities, - Map features, - int maxAgeSecs, - Map labels) { - return FeatureTableSpec.newBuilder() - .setName(name) - .setMaxAge(Duration.newBuilder().setSeconds(maxAgeSecs).build()) - .addAllEntities(entities) - .addAllFeatures( - features.entrySet().stream() - .map( - entry -> - FeatureSpecV2.newBuilder() - .setName(entry.getKey()) - .setValueType(entry.getValue()) - .putAllLabels(labels) - .build()) - .collect(Collectors.toList())) - .putAllLabels(labels) - .build(); - } - - public static DataSource createFileDataSourceSpec( - String fileURL, String fileFormat, String timestampColumn, String datePartitionColumn) { - return DataSource.newBuilder() - .setType(DataSource.SourceType.BATCH_FILE) - .setFileOptions( - DataSource.FileOptions.newBuilder() - .setFileFormat(fileFormat) - .setFileUrl(fileURL) - .build()) - .setEventTimestampColumn(timestampColumn) - .setDatePartitionColumn(datePartitionColumn) - .build(); - } - public static GetOnlineFeaturesRequestV2 createOnlineFeatureRequest( String projectName, List featureReferences, @@ -121,19 +80,15 @@ public static void applyFeatureTable( ImmutableMap features, int maxAgeSecs) { FeatureTableSpec expectedFeatureTableSpec = - createFeatureTableSpec( + DataGenerator.createFeatureTableSpec( featureTableName, entities, - new HashMap<>() { - { - putAll(features); - } - }, + features, maxAgeSecs, - ImmutableMap.of("feat_key2", "feat_value2")) + Map.of("feat_key2", "feat_value2")) .toBuilder() .setBatchSource( - createFileDataSourceSpec("file:///path/to/file", "parquet", "ts_col", "")) + DataGenerator.createFileDataSourceSpec("file:///path/to/file", "ts_col", "dt_col")) .build(); secureApiClient.simpleApplyFeatureTable(expectedFeatureTableSpec); FeatureTable actualFeatureTable = diff --git a/tests/e2e/test-register.py b/tests/e2e/test-register.py index 1bb64a711b..4bd8966817 100644 --- a/tests/e2e/test-register.py +++ b/tests/e2e/test-register.py @@ -10,6 +10,7 @@ from pandas.testing import assert_frame_equal from feast.client import Client +from feast.data_format import ParquetFormat, ProtoFormat from feast.data_source import BigQuerySource, FileSource, KafkaSource from feast.entity import Entity from feast.feature import Feature @@ -67,7 +68,7 @@ def basic_featuretable(): "dev_feature_float": "dev_feature_float_field", "dev_feature_string": "dev_feature_string_field", }, - file_format="PARQUET", + file_format=ParquetFormat(), file_url="gs://example/feast/*", event_timestamp_column="datetime_col", created_timestamp_column="timestamp", @@ -80,7 +81,7 @@ def basic_featuretable(): "dev_feature_string": "dev_feature_string_field", }, bootstrap_servers="localhost:9094", - class_path="random/path/to/class", + message_format=ProtoFormat(class_path="class.path"), topic="test_topic", event_timestamp_column="datetime_col", created_timestamp_column="timestamp", @@ -144,7 +145,7 @@ def alltypes_entity(): 
@pytest.fixture def alltypes_featuretable(): batch_source = FileSource( - file_format="parquet", + file_format=ParquetFormat(), file_url="file://feast/*", event_timestamp_column="ts_col", created_timestamp_column="timestamp",
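Taken together, the test changes above exercise the new format classes end to end. A minimal usage sketch under the APIs added in this PR (all values are placeholders):

from feast.data_format import AvroFormat, ParquetFormat, ProtoFormat, StreamFormat
from feast.data_source import FileSource, KafkaSource

# Batch sources now take a FileFormat instance instead of a format string.
batch_source = FileSource(
    event_timestamp_column="ts_col",
    created_timestamp_column="timestamp",
    file_format=ParquetFormat(),
    file_url="file://feast/*",
)

# Stream sources carry a StreamFormat instead of a bare class_path string.
stream_source = KafkaSource(
    event_timestamp_column="ts_col",
    created_timestamp_column="timestamp",
    bootstrap_servers="localhost:9094",
    message_format=ProtoFormat(class_path="class.path"),
    topic="test_topic",
)

# Formats round-trip through their protobuf form; __eq__ compares the protos.
avro = AvroFormat(schema_json='{"type": "string"}')
assert StreamFormat.from_proto(avro.to_proto()) == avro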