From 52a989b10d9b46b345d17e751a975df826ec17ac Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Jun 2022 11:58:45 -0700 Subject: [PATCH 1/3] fix: Implement apply_materialziation and infra methods in sql registry Signed-off-by: Achal Shah --- sdk/python/feast/infra/registry_stores/sql.py | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry_stores/sql.py index f793ef7376..2bd3dfbc13 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry_stores/sql.py @@ -32,6 +32,7 @@ from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra +from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto @@ -465,16 +466,11 @@ def apply_materialization( raise ValueError( f"Cannot apply materialization for feature {feature_view.name} of type {python_class}" ) - fv: Union[FeatureView, StreamFeatureView] = self._get_object( - table, - feature_view.name, - project, - proto_class, - python_class, - "feature_view_name", - "feature_view_proto", - FeatureViewNotFoundException, - ) + fv: Union[FeatureView, StreamFeatureView] = self._get_object(table, feature_view.name, project, proto_class, + python_class, + "feature_view_name", + "feature_view_proto", + FeatureViewNotFoundException) fv.materialization_intervals.append((start_date, end_date)) self._apply_object(table, "feature_view_name", fv, "feature_view_proto") @@ -597,7 +593,10 @@ def commit(self): # This method is a no-op since we're always writing values eagerly to the db. pass - def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None): + def _apply_object( + self, table, id_field_name, obj, proto_field_name, + name=None + ): name = name or obj.name with self.engine.connect() as conn: stmt = select(table).where(getattr(table.c, id_field_name) == name) From 680c5d6871a79fa208ac7b73c6b493ec65d510dd Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Thu, 9 Jun 2022 16:30:17 -0700 Subject: [PATCH 2/3] fix: hydrate infra object in the sql registry proto() method Signed-off-by: Achal Shah --- sdk/python/feast/infra/registry_stores/sql.py | 76 ++++++++++++++++--- 1 file changed, 65 insertions(+), 11 deletions(-) diff --git a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry_stores/sql.py index 2bd3dfbc13..0f0e08d8d6 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry_stores/sql.py @@ -32,7 +32,6 @@ from feast.feature_service import FeatureService from feast.feature_view import FeatureView from feast.infra.infra_object import Infra -from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.on_demand_feature_view import OnDemandFeatureView from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto @@ -149,6 +148,15 @@ ) +feast_metadata = Table( + "feast_metadata", + metadata, + Column("metadata_key", String(50), primary_key=True), + Column("metadata_value", String(50), nullable=False), + Column("last_updated_timestamp", BigInteger, nullable=False), +) + + class SqlRegistry(BaseRegistry): def __init__( self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path] @@ -466,11 +474,16 @@ def apply_materialization( raise ValueError( f"Cannot apply materialization for feature {feature_view.name} of type {python_class}" ) - fv: Union[FeatureView, StreamFeatureView] = self._get_object(table, feature_view.name, project, proto_class, - python_class, - "feature_view_name", - "feature_view_proto", - FeatureViewNotFoundException) + fv: Union[FeatureView, StreamFeatureView] = self._get_object( + table, + feature_view.name, + project, + proto_class, + python_class, + "feature_view_name", + "feature_view_proto", + FeatureViewNotFoundException, + ) fv.materialization_intervals.append((start_date, end_date)) self._apply_object(table, "feature_view_name", fv, "feature_view_proto") @@ -571,7 +584,7 @@ def get_user_metadata( def proto(self) -> RegistryProto: r = RegistryProto() project = "" - # TODO(achal): Support Infra object, and last_updated_timestamp. + # TODO(achal): Support last_updated_timestamp. for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), @@ -587,16 +600,18 @@ def proto(self) -> RegistryProto: if objs: registry_proto_field.extend([obj.to_proto() for obj in objs]) + r.infra.CopyFrom(self.get_infra(project).to_proto()) + last_update_timestamp = self._get_last_updated_metadata() + if last_update_timestamp: + r.last_updated.FromDatetime(last_update_timestamp) + return r def commit(self): # This method is a no-op since we're always writing values eagerly to the db. pass - def _apply_object( - self, table, id_field_name, obj, proto_field_name, - name=None - ): + def _apply_object(self, table, id_field_name, obj, proto_field_name, name=None): name = name or obj.name with self.engine.connect() as conn: stmt = select(table).where(getattr(table.c, id_field_name) == name) @@ -625,6 +640,7 @@ def _apply_object( } insert_stmt = insert(table).values(values,) conn.execute(insert_stmt) + self._set_last_updated_metadata(update_datetime) def _delete_object(self, table, name, project, id_field_name, not_found_exception): with self.engine.connect() as conn: @@ -632,6 +648,7 @@ def _delete_object(self, table, name, project, id_field_name, not_found_exceptio rows = conn.execute(stmt) if rows.rowcount < 1 and not_found_exception: raise not_found_exception(name, project) + self._set_last_updated_metadata(datetime.utcnow()) return rows.rowcount def _get_object( @@ -665,3 +682,40 @@ def _list_objects(self, table, proto_class, python_class, proto_field_name): for row in rows ] return [] + + def _set_last_updated_metadata(self, last_updated: datetime): + with self.engine.connect() as conn: + stmt = select(feast_metadata).where( + feast_metadata.c.metadata_key == "last_updated_timestamp" + ) + row = conn.execute(stmt).first() + + update_time = int(last_updated.timestamp()) + + values = { + "metadata_key": "last_updated_timestamp", + "metadata_value": f"{update_time}", + "last_updated_timestamp": update_time, + } + if row: + update_stmt = ( + update(feast_metadata) + .where(feast_metadata.c.metadata_key == "last_updated_timestamp") + .values(values) + ) + conn.execute(update_stmt) + else: + insert_stmt = insert(feast_metadata).values(values,) + conn.execute(insert_stmt) + + def _get_last_updated_metadata(self): + with self.engine.connect() as conn: + stmt = select(feast_metadata).where( + feast_metadata.c.metadata_key == "last_updated_timestamp" + ) + row = conn.execute(stmt).first() + if not row: + return None + update_time = int(row["last_updated_timestamp"]) + + return datetime.utcfromtimestamp(update_time) From c3d3b996bb4c899c8cd389aa51eacbf93a3e2b1a Mon Sep 17 00:00:00 2001 From: Achal Shah Date: Fri, 10 Jun 2022 11:34:25 -0700 Subject: [PATCH 3/3] rm old comment Signed-off-by: Achal Shah --- sdk/python/feast/infra/registry_stores/sql.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/registry_stores/sql.py b/sdk/python/feast/infra/registry_stores/sql.py index 0f0e08d8d6..af988edd59 100644 --- a/sdk/python/feast/infra/registry_stores/sql.py +++ b/sdk/python/feast/infra/registry_stores/sql.py @@ -584,7 +584,6 @@ def get_user_metadata( def proto(self) -> RegistryProto: r = RegistryProto() project = "" - # TODO(achal): Support last_updated_timestamp. for lister, registry_proto_field in [ (self.list_entities, r.entities), (self.list_feature_views, r.feature_views), @@ -601,9 +600,9 @@ def proto(self) -> RegistryProto: registry_proto_field.extend([obj.to_proto() for obj in objs]) r.infra.CopyFrom(self.get_infra(project).to_proto()) - last_update_timestamp = self._get_last_updated_metadata() - if last_update_timestamp: - r.last_updated.FromDatetime(last_update_timestamp) + last_updated_timestamp = self._get_last_updated_metadata() + if last_updated_timestamp: + r.last_updated.FromDatetime(last_updated_timestamp) return r