diff --git a/sdk/python/feast/stream_feature_view.py b/sdk/python/feast/stream_feature_view.py index 888f6c3f76..214ab083ab 100644 --- a/sdk/python/feast/stream_feature_view.py +++ b/sdk/python/feast/stream_feature_view.py @@ -3,14 +3,14 @@ import warnings from datetime import timedelta from types import MethodType -from typing import Dict, List, Optional, Type, Union +from typing import Dict, List, Optional, Union import dill from google.protobuf.duration_pb2 import Duration from feast import utils from feast.aggregation import Aggregation -from feast.data_source import DataSource, KafkaSource +from feast.data_source import DataSource, KafkaSource, PushSource from feast.entity import Entity from feast.feature_view import FeatureView from feast.field import Field @@ -106,7 +106,9 @@ def __init__( self.mode = mode or "" self.timestamp_field = timestamp_field or "" self.udf = udf - _batch_source = source.batch_source if source.batch_source else None + _batch_source = None + if isinstance(source, KafkaSource) or isinstance(source, PushSource): + _batch_source = source.batch_source if source.batch_source else None _ttl = ttl if not _ttl: _ttl = timedelta(days=0) @@ -124,17 +126,20 @@ def __init__( source=source, ) - def __eq__(self, other) -> bool: + def __eq__(self, other): if not isinstance(other, StreamFeatureView): raise TypeError("Comparisons should only involve StreamFeatureViews") if not super().__eq__(other): return False - + if not self.udf: + return not other.udf + if not other.udf: + return False if ( self.mode != other.mode or self.timestamp_field != other.timestamp_field - or (self.udf and self.udf.__code__.co_code != other.udf.__code__.co_code) + or self.udf.__code__.co_code != other.udf.__code__.co_code or self.aggregations != other.aggregations ): return False @@ -144,7 +149,7 @@ def __eq__(self, other) -> bool: def __hash__(self) -> int: return super().__hash__() - def to_proto(self) -> StreamFeatureViewProto: + def to_proto(self): meta = StreamFeatureViewMetaProto(materialization_intervals=[]) if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) @@ -270,10 +275,6 @@ def from_proto(cls, sfv_proto): return sfv_feature_view - @property - def proto_class(self) -> Type[StreamFeatureViewProto]: - return StreamFeatureViewProto - def __copy__(self): fv = StreamFeatureView( name=self.name,