diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto
index e954733195..22f1c257fb 100644
--- a/protos/feast/core/DataSource.proto
+++ b/protos/feast/core/DataSource.proto
@@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto";
 import "feast/types/Value.proto";
 
 // Defines a Data Source that can be used source Feature data
-// Next available id: 22
+// Next available id: 23
 message DataSource {
   // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not,
   // but they are going to be reserved for backwards compatibility.
   reserved 6 to 10;
 
   // Type of Data Source.
-  // Next available id: 9
+  // Next available id: 10
   enum SourceType {
     INVALID = 0;
     BATCH_FILE = 1;
@@ -44,7 +44,7 @@ message DataSource {
     STREAM_KINESIS = 4;
     CUSTOM_SOURCE = 6;
     REQUEST_SOURCE = 7;
-
+    PUSH_SOURCE = 9;
   }
 
   // Unique name of data source within the project
@@ -169,6 +169,16 @@ message DataSource {
     map<string, feast.types.ValueType.Enum> schema = 2;
   }
 
+  // Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
+  // the online store on-demand, such as by stream consumers.
+  message PushOptions {
+    // Mapping of feature name to type
+    map<string, feast.types.ValueType.Enum> schema = 1;
+    // Optional batch source for the push source for historical features and materialization.
+    DataSource batch_source = 2;
+  }
+
+
   // DataSource options.
   oneof options {
     FileOptions file_options = 11;
@@ -179,5 +189,6 @@ message DataSource {
     RequestDataOptions request_data_options = 18;
     CustomSourceOptions custom_options = 16;
     SnowflakeOptions snowflake_options = 19;
+    PushOptions push_options = 22;
   }
 }
diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py
index 15ce0c2377..3794b797cc 100644
--- a/sdk/python/feast/data_source.py
+++ b/sdk/python/feast/data_source.py
@@ -522,3 +522,75 @@ def to_proto(self) -> DataSourceProto:
         data_source_proto.date_partition_column = self.date_partition_column
 
         return data_source_proto
+
+
+class PushSource(DataSource):
+    """
+    PushSource that can be used to ingest features on request
+
+    Args:
+        name: Name of the push source
+        schema: Schema mapping from the input feature name to a ValueType
+    """
+
+    name: str
+    schema: Dict[str, ValueType]
+    batch_source: Optional[DataSource]
+
+    def __init__(
+        self,
+        name: str,
+        schema: Dict[str, ValueType],
+        batch_source: Optional[DataSource] = None,
+    ):
+        """Creates a PushSource object."""
+        super().__init__(name)
+        self.schema = schema
+        self.batch_source = batch_source
+
+    def validate(self, config: RepoConfig):
+        pass
+
+    def get_table_column_names_and_types(
+        self, config: RepoConfig
+    ) -> Iterable[Tuple[str, str]]:
+        pass
+
+    @staticmethod
+    def from_proto(data_source: DataSourceProto):
+        schema_pb = data_source.push_options.schema
+        schema = {}
+        for key, value in schema_pb.items():
+            schema[key] = value
+
+        batch_source = None
+        if data_source.push_options.HasField("batch_source"):
+            batch_source = DataSource.from_proto(data_source.push_options.batch_source)
+
+        return PushSource(
+            name=data_source.name, schema=schema, batch_source=batch_source
+        )
+
+    def to_proto(self) -> DataSourceProto:
+        schema_pb = {}
+        for key, value in self.schema.items():
+            schema_pb[key] = value
+        batch_source_proto = None
+        if self.batch_source:
+            batch_source_proto = self.batch_source.to_proto()
+
+        options = DataSourceProto.PushOptions(
+            schema=schema_pb, batch_source=batch_source_proto
+        )
+        data_source_proto = DataSourceProto(
+            name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options,
+        )
+
+        return data_source_proto
+
+    def get_table_query_string(self) -> str:
+        raise NotImplementedError
+
+    @staticmethod
+    def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
+        raise NotImplementedError
diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py
new file mode 100644
index 0000000000..cae5249694
--- /dev/null
+++ b/sdk/python/tests/unit/test_data_sources.py
@@ -0,0 +1,34 @@
+from feast.data_source import PushSource
+from feast.infra.offline_stores.bigquery_source import BigQuerySource
+from feast.protos.feast.types.Value_pb2 import ValueType
+
+
+def test_push_no_batch():
+    push_source = PushSource(
+        name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}
+    )
+    push_source_proto = push_source.to_proto()
+    assert push_source_proto.push_options is not None
+    assert not push_source_proto.push_options.HasField("batch_source")
+    push_source_unproto = PushSource.from_proto(push_source_proto)
+
+    assert push_source.name == push_source_unproto.name
+    assert push_source.schema == push_source_unproto.schema
+    assert push_source.batch_source == push_source_unproto.batch_source
+
+
+def test_push_with_batch():
+    push_source = PushSource(
+        name="test",
+        schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64},
+        batch_source=BigQuerySource(table="test.test"),
+    )
+    push_source_proto = push_source.to_proto()
+    assert push_source_proto.push_options is not None
+    assert push_source_proto.push_options.HasField("batch_source")
+
+    push_source_unproto = PushSource.from_proto(push_source_proto)
+
+    assert push_source.name == push_source_unproto.name
+    assert push_source.schema == push_source_unproto.schema
+    assert push_source.batch_source.name == push_source_unproto.batch_source.name
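For reference, a minimal usage sketch (not part of the diff) of the PushSource added above, reusing the imports and constructor arguments exercised in the new test file; the source name, table, and schema below are purely illustrative assumptions:

# Hypothetical example only: declares a push source backed by an optional
# batch source, mirroring the arguments used in test_data_sources.py.
from feast.data_source import PushSource
from feast.infra.offline_stores.bigquery_source import BigQuerySource
from feast.protos.feast.types.Value_pb2 import ValueType

# Batch source used for historical retrieval and materialization
# (the table name is an illustrative placeholder).
driver_stats_batch = BigQuerySource(table="my_project.feast.driver_stats")

# Push source: data pushed to it (e.g. by a stream consumer) can be written
# to the online store on demand.
driver_stats_push = PushSource(
    name="driver_stats_push",
    schema={"driver_id": ValueType.INT64, "avg_trips": ValueType.INT64},
    batch_source=driver_stats_batch,
)

# Round-trips through the proto representation, as the unit tests above do.
assert PushSource.from_proto(driver_stats_push.to_proto()).name == "driver_stats_push"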