diff --git a/sdk/python/docs/source/index.rst b/sdk/python/docs/source/index.rst index 347f45ed73..2a4dbfcfb6 100644 --- a/sdk/python/docs/source/index.rst +++ b/sdk/python/docs/source/index.rst @@ -3,7 +3,7 @@ Feast Python API Documentation Feature Store ---------------------------- +================== .. automodule:: feast.feature_store :members: @@ -45,3 +45,10 @@ Feature .. automodule:: feast.feature :inherited-members: :members: + +Feature Service +================== + +.. automodule:: feast.feature_service + :inherited-members: + :members: \ No newline at end of file diff --git a/sdk/python/feast/data_source.py b/sdk/python/feast/data_source.py index 4b782a25d9..262961b8db 100644 --- a/sdk/python/feast/data_source.py +++ b/sdk/python/feast/data_source.py @@ -15,7 +15,7 @@ import enum from abc import ABC, abstractmethod -from typing import Callable, Dict, Iterable, Optional, Tuple +from typing import Any, Callable, Dict, Iterable, Optional, Tuple from feast import type_map from feast.data_format import StreamFormat @@ -220,20 +220,42 @@ def to_proto(self) -> DataSourceProto.KinesisOptions: class DataSource(ABC): """ - DataSource that can be used source features + DataSource that can be used to source features. + + Args: + event_timestamp_column (optional): Event timestamp column used for point in time + joins of feature values. + created_timestamp_column (optional): Timestamp column indicating when the row + was created, used for deduplicating rows. + field_mapping (optional): A dictionary mapping of column names in this data + source to feature names in a feature table or view. Only used for feature + columns, not entity or timestamp columns. + date_partition_column (optional): Timestamp column used for partitioning. """ + _event_timestamp_column: str + _created_timestamp_column: str + _field_mapping: Dict[str, str] + _date_partition_column: str + def __init__( self, - event_timestamp_column: Optional[str] = "", - created_timestamp_column: Optional[str] = "", + event_timestamp_column: Optional[str] = None, + created_timestamp_column: Optional[str] = None, field_mapping: Optional[Dict[str, str]] = None, - date_partition_column: Optional[str] = "", + date_partition_column: Optional[str] = None, ): - self._event_timestamp_column = event_timestamp_column - self._created_timestamp_column = created_timestamp_column + """Creates a DataSource object.""" + self._event_timestamp_column = ( + event_timestamp_column if event_timestamp_column else "" + ) + self._created_timestamp_column = ( + created_timestamp_column if created_timestamp_column else "" + ) self._field_mapping = field_mapping if field_mapping else {} - self._date_partition_column = date_partition_column + self._date_partition_column = ( + date_partition_column if date_partition_column else "" + ) def __eq__(self, other): if not isinstance(other, DataSource): @@ -250,68 +272,76 @@ def __eq__(self, other): return True @property - def field_mapping(self): + def field_mapping(self) -> Dict[str, str]: """ - Returns the field mapping of this data source + Returns the field mapping of this data source. """ return self._field_mapping @field_mapping.setter def field_mapping(self, field_mapping): """ - Sets the field mapping of this data source + Sets the field mapping of this data source. """ self._field_mapping = field_mapping @property - def event_timestamp_column(self): + def event_timestamp_column(self) -> str: """ - Returns the event timestamp column of this data source + Returns the event timestamp column of this data source. """ return self._event_timestamp_column @event_timestamp_column.setter def event_timestamp_column(self, event_timestamp_column): """ - Sets the event timestamp column of this data source + Sets the event timestamp column of this data source. """ self._event_timestamp_column = event_timestamp_column @property - def created_timestamp_column(self): + def created_timestamp_column(self) -> str: """ - Returns the created timestamp column of this data source + Returns the created timestamp column of this data source. """ return self._created_timestamp_column @created_timestamp_column.setter def created_timestamp_column(self, created_timestamp_column): """ - Sets the created timestamp column of this data source + Sets the created timestamp column of this data source. """ self._created_timestamp_column = created_timestamp_column @property - def date_partition_column(self): + def date_partition_column(self) -> str: """ - Returns the date partition column of this data source + Returns the date partition column of this data source. """ return self._date_partition_column @date_partition_column.setter def date_partition_column(self, date_partition_column): """ - Sets the date partition column of this data source + Sets the date partition column of this data source. """ self._date_partition_column = date_partition_column @staticmethod @abstractmethod - def from_proto(data_source: DataSourceProto): - """ - Convert data source config in FeatureTable spec to a DataSource class object. + def from_proto(data_source: DataSourceProto) -> Any: """ + Converts data source config in FeatureTable spec to a DataSource class object. + Args: + data_source: A protobuf representation of a DataSource. + + Returns: + A DataSource class object. + + Raises: + ValueError: The type of DataSource could not be identified. + """ if data_source.data_source_class_type: cls = get_data_source_class_from_type(data_source.data_source_class_type) return cls.from_proto(data_source) @@ -343,7 +373,7 @@ def from_proto(data_source: DataSourceProto): ): data_source_obj = KinesisSource.from_proto(data_source) else: - raise ValueError("Could not identify the source type being added") + raise ValueError("Could not identify the source type being added.") return data_source_obj @@ -357,6 +387,9 @@ def to_proto(self) -> DataSourceProto: def validate(self, config: RepoConfig): """ Validates the underlying data source. + + Args: + config: Configuration object used to configure a feature store. """ raise NotImplementedError @@ -364,7 +397,7 @@ def validate(self, config: RepoConfig): @abstractmethod def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]: """ - Get the callable method that returns Feast type given the raw column type + Returns the callable method that returns Feast type given the raw column type. """ raise NotImplementedError @@ -372,12 +405,17 @@ def get_table_column_names_and_types( self, config: RepoConfig ) -> Iterable[Tuple[str, str]]: """ - Get the list of column names and raw column types + Returns the list of column names and raw column types. + + Args: + config: Configuration object used to configure a feature store. """ raise NotImplementedError def get_table_query_string(self) -> str: - """Returns a string that can directly be used to reference this table in SQL""" + """ + Returns a string that can directly be used to reference this table in SQL. + """ raise NotImplementedError diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index 16740bdc17..fa5d903048 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, MutableMapping, Optional +from datetime import datetime +from typing import Dict, Optional import yaml from google.protobuf import json_format from google.protobuf.json_format import MessageToDict, MessageToJson -from google.protobuf.timestamp_pb2 import Timestamp from feast.loaders import yaml as feast_yaml from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto @@ -30,8 +30,25 @@ class Entity: """ Represents a collection of entities and associated metadata. + + Args: + name: Name of the entity. + value_type (optional): The type of the entity, such as string or float. + description (optional): Additional information to describe the entity. + join_key (optional): A property that uniquely identifies different entities + within the collection. Used as a key for joining entities with their + associated features. If not specified, defaults to the name of the entity. + labels (optional): User-defined metadata in dictionary form. """ + _name: str + _value_type: ValueType + _description: str + _join_key: str + _labels: Dict[str, str] + _created_timestamp: Optional[datetime] + _last_updated_timestamp: Optional[datetime] + @log_exceptions def __init__( self, @@ -39,8 +56,9 @@ def __init__( value_type: ValueType = ValueType.UNKNOWN, description: str = "", join_key: Optional[str] = None, - labels: Optional[MutableMapping[str, str]] = None, + labels: Optional[Dict[str, str]] = None, ): + """Creates an Entity object.""" self._name = name self._description = description self._value_type = value_type @@ -49,14 +67,13 @@ def __init__( else: self._join_key = name - self._labels: MutableMapping[str, str] if labels is None: self._labels = dict() else: self._labels = labels - self._created_timestamp: Optional[Timestamp] = None - self._last_updated_timestamp: Optional[Timestamp] = None + self._created_timestamp: Optional[datetime] = None + self._last_updated_timestamp: Optional[datetime] = None def __eq__(self, other): if not isinstance(other, Entity): @@ -77,96 +94,96 @@ def __str__(self): return str(MessageToJson(self.to_proto())) @property - def name(self): + def name(self) -> str: """ - Returns the name of this entity + Gets the name of this entity. """ return self._name @name.setter def name(self, name): """ - Sets the name of this entity + Sets the name of this entity. """ self._name = name @property - def description(self): + def description(self) -> str: """ - Returns the description of this entity + Gets the description of this entity. """ return self._description @description.setter def description(self, description): """ - Sets the description of this entity + Sets the description of this entity. """ self._description = description @property - def join_key(self): + def join_key(self) -> str: """ - Returns the join key of this entity + Gets the join key of this entity. """ return self._join_key @join_key.setter def join_key(self, join_key): """ - Sets the join key of this entity + Sets the join key of this entity. """ self._join_key = join_key @property def value_type(self) -> ValueType: """ - Returns the type of this entity + Gets the type of this entity. """ return self._value_type @value_type.setter def value_type(self, value_type: ValueType): """ - Set the type for this entity + Sets the type of this entity. """ self._value_type = value_type @property - def labels(self): + def labels(self) -> Dict[str, str]: """ - Returns the labels of this entity. This is the user defined metadata - defined as a dictionary. + Gets the labels of this entity. """ return self._labels @labels.setter - def labels(self, labels: MutableMapping[str, str]): + def labels(self, labels: Dict[str, str]): """ - Set the labels for this entity + Sets the labels of this entity. """ self._labels = labels @property - def created_timestamp(self): + def created_timestamp(self) -> Optional[datetime]: """ - Returns the created_timestamp of this entity + Gets the created_timestamp of this entity. """ return self._created_timestamp @property - def last_updated_timestamp(self): + def last_updated_timestamp(self) -> Optional[datetime]: """ - Returns the last_updated_timestamp of this entity + Gets the last_updated_timestamp of this entity. """ return self._last_updated_timestamp def is_valid(self): """ - Validates the state of a entity locally. Raises an exception - if entity is invalid. - """ + Validates the state of this entity locally. + Raises: + ValueError: The entity does not have a name or does not have a type. + """ if not self.name: raise ValueError("No name found in entity.") @@ -176,29 +193,27 @@ def is_valid(self): @classmethod def from_yaml(cls, yml: str): """ - Creates an entity from a YAML string body or a file path + Creates an entity from a YAML string body or a file path. Args: - yml: Either a file path containing a yaml file or a YAML string + yml: Either a file path containing a yaml file or a YAML string. Returns: - Returns a EntityV2 object based on the YAML file + An EntityV2 object based on the YAML file. """ - return cls.from_dict(feast_yaml.yaml_loader(yml, load_single=True)) @classmethod def from_dict(cls, entity_dict): """ - Creates an entity from a dict + Creates an entity from a dict. Args: - entity_dict: A dict representation of an entity + entity_dict: A dict representation of an entity. Returns: - Returns a EntityV2 object based on the entity dict + An EntityV2 object based on the entity dict. """ - entity_proto = json_format.ParseDict( entity_dict, EntityV2Proto(), ignore_unknown_fields=True ) @@ -207,15 +222,14 @@ def from_dict(cls, entity_dict): @classmethod def from_proto(cls, entity_proto: EntityV2Proto): """ - Creates an entity from a protobuf representation of an entity + Creates an entity from a protobuf representation of an entity. Args: - entity_proto: A protobuf representation of an entity + entity_proto: A protobuf representation of an entity. Returns: - Returns a EntityV2 object based on the entity protobuf + An EntityV2 object based on the entity protobuf. """ - entity = cls( name=entity_proto.spec.name, description=entity_proto.spec.description, @@ -224,23 +238,27 @@ def from_proto(cls, entity_proto: EntityV2Proto): join_key=entity_proto.spec.join_key, ) - entity._created_timestamp = entity_proto.meta.created_timestamp - entity._last_updated_timestamp = entity_proto.meta.last_updated_timestamp + if entity_proto.meta.HasField("created_timestamp"): + entity._created_timestamp = entity_proto.meta.created_timestamp.ToDatetime() + if entity_proto.meta.HasField("last_updated_timestamp"): + entity._last_updated_timestamp = ( + entity_proto.meta.last_updated_timestamp.ToDatetime() + ) return entity def to_proto(self) -> EntityV2Proto: """ - Converts an entity object to its protobuf representation + Converts an entity object to its protobuf representation. Returns: - EntityV2Proto protobuf + An EntityV2Proto protobuf. """ - - meta = EntityMetaProto( - created_timestamp=self.created_timestamp, - last_updated_timestamp=self.last_updated_timestamp, - ) + meta = EntityMetaProto() + if self._created_timestamp: + meta.created_timestamp.FromDatetime(self._created_timestamp) + if self._last_updated_timestamp: + meta.last_updated_timestamp.FromDatetime(self._last_updated_timestamp) spec = EntitySpecProto( name=self.name, @@ -254,12 +272,11 @@ def to_proto(self) -> EntityV2Proto: def to_dict(self) -> Dict: """ - Converts entity to dict + Converts entity to dict. Returns: - Dictionary object representation of entity + Dictionary object representation of entity. """ - entity_dict = MessageToDict(self.to_proto()) # Remove meta when empty for more readable exports @@ -273,7 +290,7 @@ def to_yaml(self): Converts a entity to a YAML string. Returns: - Entity string returned in YAML format + An entity string returned in YAML format. """ entity_dict = self.to_dict() return yaml.dump(entity_dict, allow_unicode=True, sort_keys=False) @@ -284,9 +301,8 @@ def to_spec_proto(self) -> EntitySpecProto: Used when passing EntitySpecV2 object to Feast request. Returns: - EntitySpecV2 protobuf + An EntitySpecV2 protobuf. """ - spec = EntitySpecProto( name=self.name, description=self.description, @@ -299,12 +315,11 @@ def to_spec_proto(self) -> EntitySpecProto: def _update_from_entity(self, entity): """ - Deep replaces one entity with another + Deep replaces one entity with another. Args: - entity: Entity to use as a source of configuration + entity: Entity to use as a source of configuration. """ - self.name = entity.name self.description = entity.description self.value_type = entity.value_type diff --git a/sdk/python/feast/feature.py b/sdk/python/feast/feature.py index c7bcac98ed..8e4457d3b6 100644 --- a/sdk/python/feast/feature.py +++ b/sdk/python/feast/feature.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, MutableMapping, Optional +from typing import Dict, List, Optional from feast.protos.feast.core.Feature_pb2 import FeatureSpecV2 as FeatureSpecProto from feast.protos.feast.serving.ServingService_pb2 import ( @@ -23,20 +23,25 @@ class Feature: - """Feature field type""" + """ + A Feature represents a class of serveable feature. + + Args: + name: Name of the feature. + dtype: The type of the feature, such as string or float. + labels (optional): User-defined metadata in dictionary form. + """ def __init__( - self, - name: str, - dtype: ValueType, - labels: Optional[MutableMapping[str, str]] = None, + self, name: str, dtype: ValueType, labels: Optional[Dict[str, str]] = None, ): + """Creates a Feature object.""" self._name = name if not isinstance(dtype, ValueType): raise ValueError("dtype is not a valid ValueType") self._dtype = dtype if labels is None: - self._labels = dict() # type: MutableMapping + self._labels = dict() else: self._labels = labels @@ -63,26 +68,31 @@ def __str__(self): @property def name(self): """ - Getter for name of this field + Gets the name of this feature. """ return self._name @property def dtype(self) -> ValueType: """ - Getter for data type of this field + Gets the data type of this feature. """ return self._dtype @property - def labels(self) -> MutableMapping[str, str]: + def labels(self) -> Dict[str, str]: """ - Getter for labels of this field + Gets the labels of this feature. """ return self._labels def to_proto(self) -> FeatureSpecProto: - """Converts Feature object to its Protocol Buffer representation""" + """ + Converts Feature object to its Protocol Buffer representation. + + Returns: + A FeatureSpecProto protobuf. + """ value_type = ValueTypeProto.ValueType.Enum.Value(self.dtype.name) return FeatureSpecProto( @@ -98,11 +108,10 @@ def from_proto(cls, feature_proto: FeatureSpecProto): Returns: Feature object """ - feature = cls( name=feature_proto.name, dtype=ValueType(feature_proto.value_type), - labels=feature_proto.labels, + labels=dict(feature_proto.labels), ) return feature diff --git a/sdk/python/feast/feature_service.py b/sdk/python/feast/feature_service.py index 52df88cdf5..0fdb157d94 100644 --- a/sdk/python/feast/feature_service.py +++ b/sdk/python/feast/feature_service.py @@ -19,6 +19,13 @@ class FeatureService: """ A feature service is a logical grouping of features for retrieval (training or serving). The features grouped by a feature service may come from any number of feature views. + + Args: + name: Unique name of the feature service. + features: A list of Features that are grouped as part of this FeatureService. + The list may contain Feature Views, Feature Tables, or a subset of either. + tags (optional): A dictionary of key-value pairs used for organizing Feature + Services. """ name: str @@ -34,13 +41,10 @@ def __init__( tags: Optional[Dict[str, str]] = None, ): """ - Create a new Feature Service object. + Creates a FeatureService object. - Args: - name: A unique name for the Feature Service. - features: A list of Features that are grouped as part of this FeatureService. - The list may contain Feature Views, Feature Tables, or a subset of either. - tags: A dictionary of key-value pairs used for organizing Feature Services. + Raises: + ValueError: If one of the specified features is not a valid type. """ self.name = name self.features = [] @@ -80,6 +84,12 @@ def __eq__(self, other): @staticmethod def from_proto(feature_service_proto: FeatureServiceProto): + """ + Converts a FeatureServiceProto to a FeatureService object. + + Args: + feature_service_proto: A protobuf representation of a FeatureService. + """ fs = FeatureService( name=feature_service_proto.spec.name, features=[ @@ -101,6 +111,12 @@ def from_proto(feature_service_proto: FeatureServiceProto): return fs def to_proto(self) -> FeatureServiceProto: + """ + Converts a FeatureService to its protobuf representation. + + Returns: + A FeatureServiceProto protobuf. + """ meta = FeatureServiceMeta() if self.created_timestamp: meta.created_timestamp.FromDatetime(self.created_timestamp) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 39261acdc1..0721aa562a 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -49,8 +49,9 @@ class FeatureStore: A FeatureStore object is used to define, create, and retrieve features. Args: - repo_path: Path to a `feature_store.yaml` used to configure the feature store - config (RepoConfig): Configuration object used to configure the feature store + repo_path (optional): Path to a `feature_store.yaml` used to configure the + feature store. + config (optional): Configuration object used to configure the feature store. """ config: RepoConfig @@ -61,8 +62,14 @@ class FeatureStore: def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): + """ + Creates a FeatureStore object. + + Raises: + ValueError: If both or neither of repo_path and config are specified. + """ if repo_path is not None and config is not None: - raise ValueError("You cannot specify both repo_path and config") + raise ValueError("You cannot specify both repo_path and config.") if config is not None: self.repo_path = Path(os.getcwd()) self.config = config @@ -70,7 +77,7 @@ def __init__( self.repo_path = Path(repo_path) self.config = load_repo_config(Path(repo_path)) else: - raise ValueError("Please specify one of repo_path or config") + raise ValueError("Please specify one of repo_path or config.") registry_config = self.config.get_registry_config() self._registry = Registry( @@ -81,12 +88,12 @@ def __init__( @log_exceptions def version(self) -> str: - """Returns the version of the current Feast SDK/CLI""" - + """Returns the version of the current Feast SDK/CLI.""" return get_version() @property def project(self) -> str: + """Gets the project of this feature store.""" return self.config.project def _get_provider(self) -> Provider: @@ -108,7 +115,6 @@ def refresh_registry(self): greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be downloaded synchronously, which may increase latencies if the triggering method is get_online_features() """ - registry_config = self.config.get_registry_config() self._registry = Registry( registry_path=registry_config.path, @@ -120,37 +126,34 @@ def refresh_registry(self): @log_exceptions_and_usage def list_entities(self, allow_cache: bool = False) -> List[Entity]: """ - Retrieve a list of entities from the registry + Retrieves the list of entities from the registry. Args: - allow_cache (bool): Whether to allow returning entities from a cached registry + allow_cache: Whether to allow returning entities from a cached registry. Returns: - List of entities + A list of entities. """ - return self._registry.list_entities(self.project, allow_cache=allow_cache) @log_exceptions_and_usage def list_feature_services(self) -> List[FeatureService]: """ - Retrieve a list of feature services from the registry + Retrieves the list of feature services from the registry. Returns: - List of feature services + A list of feature services. """ - return self._registry.list_feature_services(self.project) @log_exceptions_and_usage def list_feature_views(self) -> List[FeatureView]: """ - Retrieve a list of feature views from the registry + Retrieves the list of feature views from the registry. Returns: - List of feature views + A list of feature views. """ - return self._registry.list_feature_views(self.project) @log_exceptions_and_usage @@ -159,13 +162,14 @@ def get_entity(self, name: str) -> Entity: Retrieves an entity. Args: - name: Name of entity + name: Name of entity. Returns: - Returns either the specified entity, or raises an exception if - none is found - """ + The specified entity. + Raises: + EntityNotFoundException: The entity could not be found. + """ return self._registry.get_entity(name, self.project) @log_exceptions_and_usage @@ -174,13 +178,14 @@ def get_feature_service(self, name: str) -> FeatureService: Retrieves a feature service. Args: - name: Name of FeatureService + name: Name of feature service. Returns: - Returns either the specified FeatureService, or raises an exception if - none is found - """ + The specified feature service. + Raises: + FeatureServiceNotFoundException: The feature service could not be found. + """ return self._registry.get_feature_service(name, self.project) @log_exceptions_and_usage @@ -189,24 +194,27 @@ def get_feature_view(self, name: str) -> FeatureView: Retrieves a feature view. Args: - name: Name of feature view + name: Name of feature view. Returns: - Returns either the specified feature view, or raises an exception if - none is found - """ + The specified feature view. + Raises: + FeatureViewNotFoundException: The feature view could not be found. + """ return self._registry.get_feature_view(name, self.project) @log_exceptions_and_usage def delete_feature_view(self, name: str): """ - Deletes a feature view or raises an exception if not found. + Deletes a feature view. Args: - name: Name of feature view - """ + name: Name of feature view. + Raises: + FeatureViewNotFoundException: The feature view could not be found. + """ return self._registry.delete_feature_view(name, self.project) def _get_features( @@ -248,6 +256,9 @@ def apply( Args: objects: A single object, or a list of objects that should be registered with the Feature Store. + Raises: + ValueError: The 'objects' parameter could not be parsed properly. + Examples: Register a single Entity and FeatureView. @@ -266,7 +277,6 @@ def apply( >>> ) >>> fs.apply([customer_entity, customer_feature_view]) """ - # TODO: Add locking if not isinstance(objects, Iterable): @@ -314,6 +324,7 @@ def apply( @log_exceptions_and_usage def teardown(self): + """Tears down all local and cloud resources for the feature store.""" tables: List[Union[FeatureView, FeatureTable]] = [] feature_views = self.list_feature_views() feature_tables = self._registry.list_feature_tables(self.project) @@ -375,7 +386,6 @@ def get_historical_features( >>> feature_data = retrieval_job.to_df() >>> model.fit(feature_data) # insert your modeling framework here. """ - _feature_refs = self._get_features(features, feature_refs) print(f"_feature_refs: {_feature_refs}") @@ -419,6 +429,9 @@ def materialize_incremental( feature_views (List[str]): Optional list of feature view names. If selected, will only run materialization for the specified feature views. + Raises: + Exception: A feature view being materialized does not have a TTL set. + Examples: Materialize all features into the online store up to 5 minutes ago. @@ -428,7 +441,6 @@ def materialize_incremental( >>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj")) >>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5)) """ - feature_views_to_materialize = [] if feature_views is None: feature_views_to_materialize = self._registry.list_feature_views( @@ -514,7 +526,6 @@ def materialize( >>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) >>> ) """ - if utils.make_tzaware(start_date) > utils.make_tzaware(end_date): raise ValueError( f"The given start_date {start_date} is greater than the given end_date {end_date}." @@ -587,8 +598,13 @@ def get_online_features( the feature and feature table names respectively. Only the feature name is required. entity_rows: A list of dictionaries where each key-value is an entity-name, entity-value pair. + Returns: OnlineResponse containing the feature data in records. + + Raises: + Exception: No entity with the specified name exists. + Examples: >>> from feast import FeatureStore >>> @@ -602,7 +618,6 @@ def get_online_features( >>> print(online_response_dict) {'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]} """ - _feature_refs = self._get_features(features, feature_refs) provider = self._get_provider() diff --git a/sdk/python/feast/feature_table.py b/sdk/python/feast/feature_table.py index 2b09fea8bd..6a63d9c3cd 100644 --- a/sdk/python/feast/feature_table.py +++ b/sdk/python/feast/feature_table.py @@ -275,7 +275,7 @@ def from_proto(cls, feature_table_proto: FeatureTableProto): Feature( name=feature.name, dtype=ValueType(feature.value_type), - labels=feature.labels, + labels=dict(feature.labels), ) for feature in feature_table_proto.spec.features ], diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index c1cfbd1167..886d19d639 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -18,7 +18,6 @@ from google.protobuf.duration_pb2 import Duration from google.protobuf.json_format import MessageToJson -from google.protobuf.timestamp_pb2 import Timestamp from feast import utils from feast.data_source import DataSource @@ -45,19 +44,34 @@ class FeatureView: """ A FeatureView defines a logical grouping of serveable features. + + Args: + name: Name of the group of features. + entities: The entities to which this group of features is associated. + ttl: The amount of time this group of features lives. A ttl of 0 indicates that + this group of features lives forever. Note that large ttl's or a ttl of 0 + can result in extremely computationally intensive queries. + input: The source of data where this group of features is stored. + batch_source (optional): The batch source of data where this group of features + is stored. + stream_source (optional): The stream source of data where this group of features + is stored. + features (optional): The set of features defined as part of this FeatureView. + tags (optional): A dictionary of key-value pairs used for organizing + FeatureViews. """ name: str entities: List[str] features: List[Feature] tags: Optional[Dict[str, str]] - ttl: Optional[timedelta] + ttl: timedelta online: bool input: DataSource batch_source: DataSource stream_source: Optional[DataSource] = None - created_timestamp: Optional[Timestamp] = None - last_updated_timestamp: Optional[Timestamp] = None + created_timestamp: Optional[datetime] = None + last_updated_timestamp: Optional[datetime] = None materialization_intervals: List[Tuple[datetime, datetime]] @log_exceptions @@ -65,14 +79,20 @@ def __init__( self, name: str, entities: List[str], - ttl: Optional[Union[Duration, timedelta]], + ttl: Union[Duration, timedelta], input: Optional[DataSource] = None, batch_source: Optional[DataSource] = None, stream_source: Optional[DataSource] = None, - features: List[Feature] = None, + features: Optional[List[Feature]] = None, tags: Optional[Dict[str, str]] = None, online: bool = True, ): + """ + Creates a FeatureView object. + + Raises: + ValueError: A field mapping conflicts with an Entity or a Feature. + """ warnings.warn( ( "The argument 'input' is being deprecated. Please use 'batch_source' " @@ -112,6 +132,9 @@ def __init__( self.materialization_intervals = [] + self.created_timestamp: Optional[datetime] = None + self.last_updated_timestamp: Optional[datetime] = None + def __repr__(self): items = (f"{k} = {v}" for k, v in self.__dict__.items()) return f"<{self.__class__.__name__}({', '.join(items)})>" @@ -159,29 +182,29 @@ def __eq__(self, other): def is_valid(self): """ - Validates the state of a feature view locally. Raises an exception - if feature view is invalid. - """ + Validates the state of this feature view locally. + Raises: + ValueError: The feature view does not have a name or does not have entities. + """ if not self.name: - raise ValueError("Feature view needs a name") + raise ValueError("Feature view needs a name.") if not self.entities: - raise ValueError("Feature view has no entities") + raise ValueError("Feature view has no entities.") def to_proto(self) -> FeatureViewProto: """ - Converts an feature view object to its protobuf representation. + Converts a feature view object to its protobuf representation. Returns: - FeatureViewProto protobuf + A FeatureViewProto protobuf. """ - - meta = FeatureViewMetaProto( - created_timestamp=self.created_timestamp, - last_updated_timestamp=self.last_updated_timestamp, - materialization_intervals=[], - ) + meta = FeatureViewMetaProto(materialization_intervals=[]) + if self.created_timestamp: + meta.created_timestamp.FromDatetime(self.created_timestamp) + if self.last_updated_timestamp: + meta.last_updated_timestamp.FromDatetime(self.last_updated_timestamp) for interval in self.materialization_intervals: interval_proto = MaterializationIntervalProto() interval_proto.start_time.FromDatetime(interval[0]) @@ -217,15 +240,14 @@ def to_proto(self) -> FeatureViewProto: @classmethod def from_proto(cls, feature_view_proto: FeatureViewProto): """ - Creates a feature view from a protobuf representation of a feature view + Creates a feature view from a protobuf representation of a feature view. Args: - feature_view_proto: A protobuf representation of a feature view + feature_view_proto: A protobuf representation of a feature view. Returns: - Returns a FeatureViewProto object based on the feature view protobuf + A FeatureViewProto object based on the feature view protobuf. """ - batch_source = DataSource.from_proto(feature_view_proto.spec.batch_source) stream_source = ( DataSource.from_proto(feature_view_proto.spec.stream_source) @@ -239,7 +261,7 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): Feature( name=feature.name, dtype=ValueType(feature.value_type), - labels=feature.labels, + labels=dict(feature.labels), ) for feature in feature_view_proto.spec.features ], @@ -256,7 +278,14 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): stream_source=stream_source, ) - feature_view.created_timestamp = feature_view_proto.meta.created_timestamp + if feature_view_proto.meta.HasField("created_timestamp"): + feature_view.created_timestamp = ( + feature_view_proto.meta.created_timestamp.ToDatetime() + ) + if feature_view_proto.meta.HasField("last_updated_timestamp"): + feature_view.last_updated_timestamp = ( + feature_view_proto.meta.last_updated_timestamp.ToDatetime() + ) for interval in feature_view_proto.meta.materialization_intervals: feature_view.materialization_intervals.append( @@ -270,11 +299,26 @@ def from_proto(cls, feature_view_proto: FeatureViewProto): @property def most_recent_end_time(self) -> Optional[datetime]: + """ + Retrieves the latest time up to which the feature view has been materialized. + + Returns: + The latest time, or None if the feature view has not been materialized. + """ if len(self.materialization_intervals) == 0: return None return max([interval[1] for interval in self.materialization_intervals]) def infer_features_from_batch_source(self, config: RepoConfig): + """ + Infers the set of features associated to this feature view from the input source. + + Args: + config: Configuration object used to configure the feature store. + + Raises: + RegistryInferenceFailure: The set of features could not be inferred. + """ if not self.features: columns_to_exclude = { self.batch_source.event_timestamp_column, diff --git a/sdk/python/feast/inference.py b/sdk/python/feast/inference.py index 421e01a68c..0417cae257 100644 --- a/sdk/python/feast/inference.py +++ b/sdk/python/feast/inference.py @@ -108,6 +108,7 @@ def update_data_sources_with_inferred_event_timestamp_col( matched_flag = True event_timestamp_column = col_name if matched_flag: + assert event_timestamp_column data_source.event_timestamp_column = event_timestamp_column else: raise RegistryInferenceFailure(