From d4d0df3ad15f0fa416807af60188f2beac02176e Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 21 Mar 2022 13:31:06 -0700 Subject: [PATCH 1/6] fix: Add PushSource proto and Python class Signed-off-by: Achal Shah --- protos/feast/core/DataSource.proto | 16 +++++-- sdk/python/feast/data_source.py | 74 ++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index e954733195..66d1249920 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -26,14 +26,14 @@ import "feast/core/DataFormat.proto"; import "feast/types/Value.proto"; // Defines a Data Source that can be used source Feature data -// Next available id: 22 +// Next available id: 23 message DataSource { // Field indexes should *not* be reused. Not sure if fields 6-10 were used previously or not, // but they are going to be reserved for backwards compatibility. reserved 6 to 10; // Type of Data Source. - // Next available id: 9 + // Next available id: 10 enum SourceType { INVALID = 0; BATCH_FILE = 1; @@ -44,7 +44,7 @@ message DataSource { STREAM_KINESIS = 4; CUSTOM_SOURCE = 6; REQUEST_SOURCE = 7; - + PUSH_SOURCE = 9; } // Unique name of data source within the project @@ -169,6 +169,15 @@ message DataSource { map schema = 2; } + // Defines options for DataSource that Push Sources + message PushOptions { + reserved 1; + // Mapping of feature name to type + map schema = 2; + DataSource batch_source = 3; + } + + // DataSource options. oneof options { FileOptions file_options = 11; @@ -179,5 +188,6 @@ message DataSource { RequestDataOptions request_data_options = 18; CustomSourceOptions custom_options = 16; SnowflakeOptions snowflake_options = 19; + PushOptions push_options = 22; } } diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 15ce0c2377..1608cc4e82 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -522,3 +522,77 @@ def to_proto(self) -> DataSourceProto: data_source_proto.date_partition_column = self.date_partition_column return data_source_proto + + +class PushSource(DataSource): + """ + PushSource that can be used to ingest features on request + + Args: + name: Name of the push source + schema: Schema mapping from the input feature name to a ValueType + """ + + name: str + schema: Dict[str, ValueType] + batch_source: Optional[DataSource] + + def __init__( + self, + name: str, + schema: Dict[str, ValueType], + batch_source: Optional[DataSource] = None, + ): + """Creates a PushSource object.""" + super().__init__(name) + self.schema = schema + self.batch_source = batch_source + + def validate(self, config: RepoConfig): + pass + + def get_table_column_names_and_types( + self, config: RepoConfig + ) -> Iterable[Tuple[str, str]]: + pass + + @staticmethod + def from_proto(data_source: DataSourceProto): + schema_pb = data_source.push_options.schema + schema = {} + for key in schema_pb.keys(): + schema[key] = ValueType(schema_pb.get(key)) + + batch_source = None + if data_source.push_options.batch_source: + batch_source = DataSource.from_proto(data_source.push_options.batch_source) + + return PushSource( + name=data_source.name, schema=schema, batch_source=batch_source + ) + + def to_proto(self) -> DataSourceProto: + schema_pb = {} + for key, value in self.schema.items(): + schema_pb[key] = value.value + batch_source_proto = None + if self.batch_source: + batch_source_proto = self.batch_source.to_proto() + + options = DataSourceProto.PushOptions( + schema=schema_pb, batch_source=batch_source_proto + ) + data_source_proto = DataSourceProto( + name=self.name, + type=DataSourceProto.REQUEST_SOURCE, + request_data_options=options, + ) + + return data_source_proto + + def get_table_query_string(self) -> str: + raise NotImplementedError + + @staticmethod + def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: + raise NotImplementedError From b229f918edb8815230ecffa5ce1925ca88aa7dee Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 21 Mar 2022 16:27:51 -0700 Subject: [PATCH 2/6] tests Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 8 ++--- sdk/python/tests/unit/test_data_sources.py | 36 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 sdk/python/tests/unit/test_data_sources.py diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 1608cc4e82..5a3cb0150b 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -564,7 +564,7 @@ def from_proto(data_source: DataSourceProto): schema[key] = ValueType(schema_pb.get(key)) batch_source = None - if data_source.push_options.batch_source: + if data_source.push_options.HasField("batch_source"): batch_source = DataSource.from_proto(data_source.push_options.batch_source) return PushSource( @@ -574,7 +574,7 @@ def from_proto(data_source: DataSourceProto): def to_proto(self) -> DataSourceProto: schema_pb = {} for key, value in self.schema.items(): - schema_pb[key] = value.value + schema_pb[key] = value batch_source_proto = None if self.batch_source: batch_source_proto = self.batch_source.to_proto() @@ -583,9 +583,7 @@ def to_proto(self) -> DataSourceProto: schema=schema_pb, batch_source=batch_source_proto ) data_source_proto = DataSourceProto( - name=self.name, - type=DataSourceProto.REQUEST_SOURCE, - request_data_options=options, + name=self.name, type=DataSourceProto.PUSH_SOURCE, push_options=options, ) return data_source_proto diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py new file mode 100644 index 0000000000..dcf3a83ea0 --- /dev/null +++ b/sdk/python/tests/unit/test_data_sources.py @@ -0,0 +1,36 @@ +import unittest + +from feast.data_source import PushSource +from feast.infra.offline_stores.bigquery_source import BigQuerySource +from feast.protos.feast.types.Value_pb2 import ValueType + + +class DataSourceTestCases(unittest.TestCase): + def test_push_no_batch(self): + push_source = PushSource( + name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64} + ) + push_source_proto = push_source.to_proto() + self.assertIsNotNone(push_source_proto.push_options) + self.assertFalse(push_source_proto.push_options.HasField("batch_source")) + push_source_unproto = PushSource.from_proto(push_source_proto) + + self.assertEqual(push_source, push_source_unproto) + + def test_push_with_batch(self): + push_source = PushSource( + name="test", + schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}, + batch_source=BigQuerySource(table="test.test"), + ) + push_source_proto = push_source.to_proto() + self.assertIsNotNone(push_source_proto.push_options) + self.assertTrue(push_source_proto.push_options.HasField("batch_source")) + + import pdb + + pdb.set_trace() + + push_source_unproto = PushSource.from_proto(push_source_proto) + + self.assertEqual(push_source, push_source_unproto) From 1413a9b40f7a4efbf7c49ec65d7352fdb07bac2d Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Mon, 21 Mar 2022 16:44:23 -0700 Subject: [PATCH 3/6] remove pdb Signed-off-by: Achal Shah --- sdk/python/tests/unit/test_data_sources.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index dcf3a83ea0..2bb0332561 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -27,10 +27,6 @@ def test_push_with_batch(self): self.assertIsNotNone(push_source_proto.push_options) self.assertTrue(push_source_proto.push_options.HasField("batch_source")) - import pdb - - pdb.set_trace() - push_source_unproto = PushSource.from_proto(push_source_proto) self.assertEqual(push_source, push_source_unproto) From ad3816548b4ccb19f7c4972b86719dd08e1f6b11 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 22 Mar 2022 10:49:54 -0700 Subject: [PATCH 4/6] cr Signed-off-by: Achal Shah --- protos/feast/core/DataSource.proto | 9 +++-- sdk/python/tests/unit/test_data_sources.py | 44 +++++++++++----------- 2 files changed, 26 insertions(+), 27 deletions(-) diff --git a/protos/feast/core/DataSource.proto b/protos/feast/core/DataSource.proto index 66d1249920..22f1c257fb 100644 --- a/protos/feast/core/DataSource.proto +++ b/protos/feast/core/DataSource.proto @@ -169,12 +169,13 @@ message DataSource { map schema = 2; } - // Defines options for DataSource that Push Sources + // Defines options for DataSource that supports pushing data to it. This allows data to be pushed to + // the online store on-demand, such as by stream consumers. message PushOptions { - reserved 1; // Mapping of feature name to type - map schema = 2; - DataSource batch_source = 3; + map schema = 1; + // Optional batch source for the push source for historical features and materialization. + DataSource batch_source = 2; } diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 2bb0332561..223c86d4b9 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -1,32 +1,30 @@ -import unittest - from feast.data_source import PushSource from feast.infra.offline_stores.bigquery_source import BigQuerySource from feast.protos.feast.types.Value_pb2 import ValueType -class DataSourceTestCases(unittest.TestCase): - def test_push_no_batch(self): - push_source = PushSource( - name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64} - ) - push_source_proto = push_source.to_proto() - self.assertIsNotNone(push_source_proto.push_options) - self.assertFalse(push_source_proto.push_options.HasField("batch_source")) - push_source_unproto = PushSource.from_proto(push_source_proto) +def test_push_no_batch(): + push_source = PushSource( + name="test", schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64} + ) + push_source_proto = push_source.to_proto() + assert push_source_proto.push_options is not None + assert not push_source_proto.push_options.HasField("batch_source") + push_source_unproto = PushSource.from_proto(push_source_proto) + + assert push_source == push_source_unproto - self.assertEqual(push_source, push_source_unproto) - def test_push_with_batch(self): - push_source = PushSource( - name="test", - schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}, - batch_source=BigQuerySource(table="test.test"), - ) - push_source_proto = push_source.to_proto() - self.assertIsNotNone(push_source_proto.push_options) - self.assertTrue(push_source_proto.push_options.HasField("batch_source")) +def test_push_with_batch(): + push_source = PushSource( + name="test", + schema={"user_id": ValueType.INT64, "ltv": ValueType.INT64}, + batch_source=BigQuerySource(table="test.test"), + ) + push_source_proto = push_source.to_proto() + assert push_source_proto.push_options is not None + assert push_source_proto.push_options.HasField("batch_source") - push_source_unproto = PushSource.from_proto(push_source_proto) + push_source_unproto = PushSource.from_proto(push_source_proto) - self.assertEqual(push_source, push_source_unproto) + assert push_source == push_source_unproto From c271783c004ed75030c78009fccf2bb4cec9d8fa Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 22 Mar 2022 10:57:02 -0700 Subject: [PATCH 5/6] cr Signed-off-by: Achal Shah --- sdk/python/tests/unit/test_data_sources.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index 223c86d4b9..ab7a8f747f 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -12,7 +12,9 @@ def test_push_no_batch(): assert not push_source_proto.push_options.HasField("batch_source") push_source_unproto = PushSource.from_proto(push_source_proto) - assert push_source == push_source_unproto + assert push_source.name == push_source_unproto.name + assert push_source.schema == push_source_unproto.schema + assert push_source.batch_source == push_source_unproto.batch_source def test_push_with_batch(): @@ -27,4 +29,6 @@ def test_push_with_batch(): push_source_unproto = PushSource.from_proto(push_source_proto) - assert push_source == push_source_unproto + assert push_source.name == push_source_unproto.name + assert push_source.schema == push_source_unproto.schema + assert push_source.batch_source == push_source_unproto.batch_source From 9182d3d6391e775c0e6f4736fa5be41199a88db5 Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Tue, 22 Mar 2022 11:34:40 -0700 Subject: [PATCH 6/6] cr Signed-off-by: Achal Shah --- sdk/python/feast/data_source.py | 4 ++-- sdk/python/tests/unit/test_data_sources.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 5a3cb0150b..3794b797cc 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -560,8 +560,8 @@ def get_table_column_names_and_types( def from_proto(data_source: DataSourceProto): schema_pb = data_source.push_options.schema schema = {} - for key in schema_pb.keys(): - schema[key] = ValueType(schema_pb.get(key)) + for key, value in schema_pb.items(): + schema[key] = value batch_source = None if data_source.push_options.HasField("batch_source"): diff --git a/sdk/python/tests/unit/test_data_sources.py b/sdk/python/tests/unit/test_data_sources.py index ab7a8f747f..cae5249694 100644 --- a/sdk/python/tests/unit/test_data_sources.py +++ b/sdk/python/tests/unit/test_data_sources.py @@ -31,4 +31,4 @@ def test_push_with_batch(): assert push_source.name == push_source_unproto.name assert push_source.schema == push_source_unproto.schema - assert push_source.batch_source == push_source_unproto.batch_source + assert push_source.batch_source.name == push_source_unproto.batch_source.name