Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement diff_infra_protos method for feast plan #2204

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 132 additions & 4 deletions sdk/python/feast/diff/infra_diff.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
from dataclasses import dataclass
from typing import Any, List
from typing import Any, Iterable, List, Tuple, TypeVar

from feast.diff.property_diff import PropertyDiff, TransitionType
from feast.infra.infra_object import (
DATASTORE_INFRA_OBJECT_CLASS_TYPE,
DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
SQLITE_INFRA_OBJECT_CLASS_TYPE,
InfraObject,
)
from feast.protos.feast.core.DatastoreTable_pb2 import (
DatastoreTable as DatastoreTableProto,
)
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.SqliteTable_pb2 import SqliteTable as SqliteTableProto


@dataclass
class InfraObjectDiff:
name: str
infra_object_type: str
current_fco: Any
new_fco: Any
fco_property_diffs: List[PropertyDiff]
current_infra_object: Any
new_infra_object: Any
infra_object_property_diffs: List[PropertyDiff]
transition_type: TransitionType


Expand All @@ -26,3 +40,117 @@ def update(self):

def to_string(self):
pass


# Type variable constrained to the table proto classes handled by the
# infra-diff helpers in this module (Datastore, DynamoDB and SQLite tables).
U = TypeVar("U", DatastoreTableProto, DynamoDBTableProto, SqliteTableProto)


def tag_infra_proto_objects_for_keep_delete_add(
    existing_objs: Iterable[U], desired_objs: Iterable[U]
) -> Tuple[Iterable[U], Iterable[U], Iterable[U]]:
    """Partition infra object protos into those to keep, delete, and add.

    Objects are matched solely by their ``name`` attribute.

    Args:
        existing_objs: The objects that currently exist.
        desired_objs: The objects that should exist afterwards.

    Returns:
        A ``(objs_to_keep, objs_to_delete, objs_to_add)`` tuple of lists:
        ``objs_to_keep`` holds the *desired* objects whose name already
        exists, ``objs_to_delete`` holds the *existing* objects whose name is
        no longer desired, and ``objs_to_add`` holds the *desired* objects
        whose name does not exist yet. Each list preserves input order.
    """
    # Materialize both inputs up front: each one is traversed twice below, so
    # a one-shot iterator (e.g. a generator) would already be exhausted after
    # the name sets are built and every result list would come back empty.
    existing = list(existing_objs)
    desired = list(desired_objs)

    existing_obj_names = {e.name for e in existing}
    desired_obj_names = {e.name for e in desired}

    objs_to_add = [e for e in desired if e.name not in existing_obj_names]
    objs_to_keep = [e for e in desired if e.name in existing_obj_names]
    objs_to_delete = [e for e in existing if e.name not in desired_obj_names]

    return objs_to_keep, objs_to_delete, objs_to_add


def diff_infra_protos(
    current_infra_proto: InfraProto, new_infra_proto: InfraProto
) -> InfraDiff:
    """Compute the per-object diff between two ``Infra`` protos.

    For each supported infra object class type (Datastore, DynamoDB and
    SQLite tables) the objects of both protos are matched by name and one
    ``InfraObjectDiff`` is recorded per object: ``CREATE`` for objects only in
    ``new_infra_proto``, ``DELETE`` for objects only in
    ``current_infra_proto``, and the result of ``diff_between`` for objects
    present in both.

    Args:
        current_infra_proto: The infra proto describing the current state.
        new_infra_proto: The infra proto describing the desired state.

    Returns:
        An ``InfraDiff`` whose ``infra_object_diffs`` list holds the recorded
        diffs.
    """
    infra_diff = InfraDiff()

    # Maps each supported class type to the human-readable label stored on
    # the resulting InfraObjectDiff.
    infra_object_class_types_to_str = {
        DATASTORE_INFRA_OBJECT_CLASS_TYPE: "datastore table",
        DYNAMODB_INFRA_OBJECT_CLASS_TYPE: "dynamodb table",
        SQLITE_INFRA_OBJECT_CLASS_TYPE: "sqlite table",
    }

    for (
        infra_object_class_type,
        infra_object_type,
    ) in infra_object_class_types_to_str.items():
        current_infra_objects = get_infra_object_protos_by_type(
            current_infra_proto, infra_object_class_type
        )
        new_infra_objects = get_infra_object_protos_by_type(
            new_infra_proto, infra_object_class_type
        )
        (
            infra_objects_to_keep,
            infra_objects_to_delete,
            infra_objects_to_add,
        ) = tag_infra_proto_objects_for_keep_delete_add(
            current_infra_objects, new_infra_objects,
        )

        # Index the current objects by name once instead of rescanning the
        # list for every kept object. Iterating in reverse lets earlier
        # entries overwrite later ones, so on duplicate names the first
        # object in list order is the one that is looked up.
        current_infra_objects_by_name = {
            _e.name: _e for _e in reversed(current_infra_objects)
        }

        for e in infra_objects_to_add:
            infra_diff.infra_object_diffs.append(
                InfraObjectDiff(
                    e.name, infra_object_type, None, e, [], TransitionType.CREATE,
                )
            )
        for e in infra_objects_to_delete:
            infra_diff.infra_object_diffs.append(
                InfraObjectDiff(
                    e.name, infra_object_type, e, None, [], TransitionType.DELETE,
                )
            )
        for e in infra_objects_to_keep:
            infra_diff.infra_object_diffs.append(
                diff_between(
                    current_infra_objects_by_name[e.name], e, infra_object_type,
                )
            )

    return infra_diff


def get_infra_object_protos_by_type(
    infra_proto: InfraProto, infra_object_class_type: str
) -> List[U]:
    """Return the type-specific protos of all infra objects of one class type.

    Every entry of ``infra_proto.infra_objects`` whose
    ``infra_object_class_type`` equals the requested one is converted with
    ``InfraObject.from_infra_object_proto`` and then turned back into a proto
    through the resulting object's ``to_proto``.

    Args:
        infra_proto: The infra proto whose objects are filtered.
        infra_object_class_type: The class type string to select.

    Returns:
        The converted protos, in the order they appear in ``infra_proto``.
    """
    matching_protos: List[U] = []
    for wrapped_proto in infra_proto.infra_objects:
        if wrapped_proto.infra_object_class_type != infra_object_class_type:
            continue
        parsed_object = InfraObject.from_infra_object_proto(wrapped_proto)
        matching_protos.append(parsed_object.to_proto())
    return matching_protos


# Names of proto fields that diff_between skips when collecting property
# diffs between two infra object protos.
FIELDS_TO_IGNORE = {"project"}


def diff_between(current: U, new: U, infra_object_type: str) -> InfraObjectDiff:
    """Build the ``InfraObjectDiff`` for two protos of the same message type.

    Fields are compared one by one, skipping those named in
    ``FIELDS_TO_IGNORE``; each differing field yields a ``PropertyDiff`` with
    the field name, the current value and the new value.

    Args:
        current: The proto describing the current object.
        new: The proto describing the desired object.
        infra_object_type: Label stored on the returned diff.

    Returns:
        An ``InfraObjectDiff`` named after ``new`` whose transition is
        ``UPDATE`` when at least one property diff was collected and
        ``UNCHANGED`` otherwise.
    """
    assert current.DESCRIPTOR.full_name == new.DESCRIPTOR.full_name

    changed_properties = []
    # Whole-message comparison first, so identical protos skip the field walk.
    if current != new:
        for descriptor in current.DESCRIPTOR.fields:
            field_name = descriptor.name
            if field_name in FIELDS_TO_IGNORE:
                continue
            old_value = getattr(current, field_name)
            updated_value = getattr(new, field_name)
            if old_value != updated_value:
                changed_properties.append(
                    PropertyDiff(field_name, old_value, updated_value)
                )

    outcome: TransitionType = (
        TransitionType.UPDATE if changed_properties else TransitionType.UNCHANGED
    )
    return InfraObjectDiff(
        new.name, infra_object_type, current, new, changed_properties, outcome,
    )
17 changes: 13 additions & 4 deletions sdk/python/feast/infra/infra_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,29 @@
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto

DATASTORE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.datastore.DatastoreTable"
DYNAMODB_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_stores.dynamodb.DynamoDBTable"
SQLITE_INFRA_OBJECT_CLASS_TYPE = "feast.infra.online_store.sqlite.SqliteTable"


class InfraObject(ABC):
"""
Represents a single infrastructure object (e.g. online store table) managed by Feast.
"""

@abstractmethod
def to_proto(self) -> InfraObjectProto:
def to_infra_object_proto(self) -> InfraObjectProto:
"""Converts an InfraObject to its protobuf representation, wrapped in an InfraObjectProto."""
pass

@abstractmethod
def to_proto(self) -> Any:
"""Converts an InfraObject to its protobuf representation."""
pass

@staticmethod
@abstractmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
"""
Returns an InfraObject created from a protobuf representation.

Expand All @@ -46,7 +55,7 @@ def from_proto(infra_object_proto: InfraObjectProto) -> Any:
cls = _get_infra_object_class_from_type(
infra_object_proto.infra_object_class_type
)
return cls.from_proto(infra_object_proto)
return cls.from_infra_object_proto(infra_object_proto)

raise ValueError("Could not identify the type of the InfraObject.")

Expand Down Expand Up @@ -97,7 +106,7 @@ def from_proto(cls, infra_proto: InfraProto):
"""
infra = cls()
cls.infra_objects += [
InfraObject.from_proto(infra_object_proto)
InfraObject.from_infra_object_proto(infra_object_proto)
for infra_object_proto in infra_proto.infra_objects
]

Expand Down
49 changes: 26 additions & 23 deletions sdk/python/feast/infra/online_stores/datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from pydantic.typing import Literal

from feast import Entity, utils
from feast.errors import FeastProviderLoginError
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.infra_object import DATASTORE_INFRA_OBJECT_CLASS_TYPE, InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DatastoreTable_pb2 import (
Expand All @@ -43,7 +44,7 @@
from google.cloud import datastore
from google.cloud.datastore.client import Key
except ImportError as e:
from feast.errors import FeastExtrasDependencyImportError, FeastProviderLoginError
from feast.errors import FeastExtrasDependencyImportError

raise FeastExtrasDependencyImportError("gcp", str(e))

Expand Down Expand Up @@ -332,14 +333,12 @@ class DatastoreTable(InfraObject):
name: The name of the table.
project_id (optional): The GCP project id.
namespace (optional): Datastore namespace.
client: Datastore client.
"""

project: str
name: str
project_id: Optional[str]
namespace: Optional[str]
client: datastore.Client

def __init__(
self,
Expand All @@ -352,51 +351,55 @@ def __init__(
self.name = name
self.project_id = project_id
self.namespace = namespace
self.client = _initialize_client(self.project_id, self.namespace)

def to_proto(self) -> InfraObjectProto:
def to_infra_object_proto(self) -> InfraObjectProto:
datastore_table_proto = self.to_proto()
return InfraObjectProto(
infra_object_class_type=DATASTORE_INFRA_OBJECT_CLASS_TYPE,
datastore_table=datastore_table_proto,
)

def to_proto(self) -> Any:
datastore_table_proto = DatastoreTableProto()
datastore_table_proto.project = self.project
datastore_table_proto.name = self.name
if self.project_id:
datastore_table_proto.project_id.FromString(bytes(self.project_id, "utf-8"))
datastore_table_proto.project_id.value = self.project_id
if self.namespace:
datastore_table_proto.namespace.FromString(bytes(self.namespace, "utf-8"))

return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.datastore.DatastoreTable",
datastore_table=datastore_table_proto,
)
datastore_table_proto.namespace.value = self.namespace
return datastore_table_proto

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
datastore_table = DatastoreTable(
project=infra_object_proto.datastore_table.project,
name=infra_object_proto.datastore_table.name,
)

if infra_object_proto.datastore_table.HasField("project_id"):
datastore_table.project_id = (
infra_object_proto.datastore_table.project_id.SerializeToString()
).decode("utf-8")
infra_object_proto.datastore_table.project_id.value
)
if infra_object_proto.datastore_table.HasField("namespace"):
datastore_table.namespace = (
infra_object_proto.datastore_table.namespace.SerializeToString()
).decode("utf-8")
infra_object_proto.datastore_table.namespace.value
)

return datastore_table

def update(self):
key = self.client.key("Project", self.project, "Table", self.name)
client = _initialize_client(self.project_id, self.namespace)
key = client.key("Project", self.project, "Table", self.name)
entity = datastore.Entity(
key=key, exclude_from_indexes=("created_ts", "event_ts", "values")
)
entity.update({"created_ts": datetime.utcnow()})
self.client.put(entity)
client.put(entity)

def teardown(self):
key = self.client.key("Project", self.project, "Table", self.name)
_delete_all_values(self.client, key)
client = _initialize_client(self.project_id, self.namespace)
key = client.key("Project", self.project, "Table", self.name)
_delete_all_values(client, key)

# Delete the table metadata datastore entity
self.client.delete(key)
client.delete(key)
19 changes: 11 additions & 8 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pydantic.typing import Literal

from feast import Entity, FeatureView, utils
from feast.infra.infra_object import InfraObject
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.DynamoDBTable_pb2 import (
Expand Down Expand Up @@ -234,18 +234,21 @@ def __init__(self, name: str, region: str):
self.name = name
self.region = region

def to_proto(self) -> InfraObjectProto:
dynamodb_table_proto = DynamoDBTableProto()
dynamodb_table_proto.name = self.name
dynamodb_table_proto.region = self.region

def to_infra_object_proto(self) -> InfraObjectProto:
dynamodb_table_proto = self.to_proto()
return InfraObjectProto(
infra_object_class_type="feast.infra.online_stores.dynamodb.DynamoDBTable",
infra_object_class_type=DYNAMODB_INFRA_OBJECT_CLASS_TYPE,
dynamodb_table=dynamodb_table_proto,
)

def to_proto(self) -> Any:
dynamodb_table_proto = DynamoDBTableProto()
dynamodb_table_proto.name = self.name
dynamodb_table_proto.region = self.region
return dynamodb_table_proto

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
return DynamoDBTable(
name=infra_object_proto.dynamodb_table.name,
region=infra_object_proto.dynamodb_table.region,
Expand Down
19 changes: 11 additions & 8 deletions sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

from feast import Entity
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE, InfraObject
from feast.infra.key_encoding_utils import serialize_entity_key
from feast.infra.online_stores.online_store import OnlineStore
from feast.protos.feast.core.InfraObject_pb2 import InfraObject as InfraObjectProto
Expand Down Expand Up @@ -241,18 +241,21 @@ def __init__(self, path: str, name: str):
self.name = name
self.conn = _initialize_conn(path)

def to_proto(self) -> InfraObjectProto:
sqlite_table_proto = SqliteTableProto()
sqlite_table_proto.path = self.path
sqlite_table_proto.name = self.name

def to_infra_object_proto(self) -> InfraObjectProto:
sqlite_table_proto = self.to_proto()
return InfraObjectProto(
infra_object_class_type="feast.infra.online_store.sqlite.SqliteTable",
infra_object_class_type=SQLITE_INFRA_OBJECT_CLASS_TYPE,
sqlite_table=sqlite_table_proto,
)

def to_proto(self) -> Any:
sqlite_table_proto = SqliteTableProto()
sqlite_table_proto.path = self.path
sqlite_table_proto.name = self.name
return sqlite_table_proto

@staticmethod
def from_proto(infra_object_proto: InfraObjectProto) -> Any:
def from_infra_object_proto(infra_object_proto: InfraObjectProto) -> Any:
return SqliteTable(
path=infra_object_proto.sqlite_table.path,
name=infra_object_proto.sqlite_table.name,
Expand Down
Loading