Skip to content

Commit

Permalink
fix: Request data api update (#2488)
Browse files Browse the repository at this point in the history
* Update request data source

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Data source update

Signed-off-by: Author <[email protected]>
Signed-off-by: Kevin Zhang <[email protected]>

* Data source

Signed-off-by: Kevin Zhang <[email protected]>

Signed-off-by: Author <[email protected]>
Signed-off-by: Kevin Zhang <[email protected]>

* Temp fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Update internal var names

Signed-off-by: Kevin Zhang <[email protected]>

* Add test

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>
  • Loading branch information
kevjumba authored Apr 5, 2022
1 parent 31a0ad5 commit 0c9e5b7
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 74 deletions.
4 changes: 2 additions & 2 deletions docs/getting-started/concepts/feature-view.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ Feature names must be unique within a [feature view](feature-view.md#feature-vie
On demand feature views allows users to use existing features and request time data (features only available at request time) to transform and create new features. Users define python transformation logic which is executed in both historical retrieval and online retrieval paths:

```python
# Define a request data source which encodes features / information only
# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestDataSource(
input_request = RequestSource(
name="vals_to_add",
schema={
"val_to_add": ValueType.INT64,
Expand Down
8 changes: 4 additions & 4 deletions examples/java-demo/feature_repo/driver_repo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd
from feast import Entity, Feature, FeatureView, FileSource, ValueType
from feast.data_source import RequestDataSource
from feast.data_source import RequestSource
from feast.on_demand_feature_view import on_demand_feature_view
from feast.request_feature_view import RequestFeatureView
from google.protobuf.duration_pb2 import Duration
Expand Down Expand Up @@ -28,13 +28,13 @@

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestDataSource(
input_request = RequestSource(
name="vals_to_add",
schema={"val_to_add": ValueType.INT64, "val_to_add_2": ValueType.INT64},
)

# Define an on demand feature view which can generate new features based on
# existing feature views and RequestDataSource features
# existing feature views and RequestSource features
@on_demand_feature_view(
inputs={
"driver_hourly_stats": driver_hourly_stats_view,
Expand All @@ -55,7 +55,7 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
# Define request feature view
driver_age_request_fv = RequestFeatureView(
name="driver_age",
request_data_source=RequestDataSource(
request_source=RequestSource(
name="driver_age", schema={"driver_age": ValueType.INT64,}
),
)
20 changes: 14 additions & 6 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.


import enum
import warnings
from abc import ABC, abstractmethod
Expand Down Expand Up @@ -144,7 +143,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestDataSource",
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestSource",
DataSourceProto.SourceType.PUSH_SOURCE: "feast.data_source.PushSource",
}

Expand Down Expand Up @@ -422,9 +421,9 @@ def get_table_query_string(self) -> str:
raise NotImplementedError


class RequestDataSource(DataSource):
class RequestSource(DataSource):
"""
RequestDataSource that can be used to provide input features for on demand transforms
RequestSource that can be used to provide input features for on demand transforms
Args:
name: Name of the request data source
Expand All @@ -446,7 +445,7 @@ def __init__(
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
):
"""Creates a RequestDataSource object."""
"""Creates a RequestSource object."""
super().__init__(name=name, description=description, tags=tags, owner=owner)
self.schema = schema

Expand All @@ -464,7 +463,7 @@ def from_proto(data_source: DataSourceProto):
schema = {}
for key, val in schema_pb.items():
schema[key] = ValueType(val)
return RequestDataSource(
return RequestSource(
name=data_source.name,
schema=schema,
description=data_source.description,
Expand Down Expand Up @@ -496,6 +495,15 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
raise NotImplementedError


class RequestDataSource(RequestSource):
def __init__(self, *args, **kwargs):
warnings.warn(
"The 'RequestDataSource' class is deprecated and was renamed to RequestSource. Please use RequestSource instead. This class name will be removed in Feast 0.23.",
DeprecationWarning,
)
super().__init__(*args, **kwargs)


class KinesisSource(DataSource):
def validate(self, config: RepoConfig):
pass
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ def apply(
data_sources_set_to_update.add(rfv.request_data_source)

for odfv in odfvs_to_update:
for v in odfv.source_request_data_sources.values():
for v in odfv.source_request_sources.values():
data_sources_set_to_update.add(v)

data_sources_to_update = list(data_sources_set_to_update)
Expand Down
4 changes: 2 additions & 2 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
RedshiftSource,
SnowflakeSource,
)
from feast.data_source import DataSource, RequestDataSource
from feast.data_source import DataSource, RequestSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.repo_config import RepoConfig
Expand Down Expand Up @@ -78,7 +78,7 @@ def update_data_sources_with_inferred_event_timestamp_col(
ERROR_MSG_PREFIX = "Unable to infer DataSource event_timestamp_column"

for data_source in data_sources:
if isinstance(data_source, RequestDataSource):
if isinstance(data_source, RequestSource):
continue
if (
data_source.event_timestamp_column is None
Expand Down
40 changes: 18 additions & 22 deletions sdk/python/feast/on_demand_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pandas as pd

from feast.base_feature_view import BaseFeatureView
from feast.data_source import RequestDataSource
from feast.data_source import RequestSource
from feast.errors import RegistryInferenceFailure, SpecifiedFeaturesNotPresentError
from feast.feature import Feature
from feast.feature_view import FeatureView
Expand Down Expand Up @@ -45,8 +45,8 @@ class OnDemandFeatureView(BaseFeatureView):
features: The list of features in the output of the on demand feature view.
source_feature_view_projections: A map from input source names to actual input
sources with type FeatureViewProjection.
source_request_data_sources: A map from input source names to the actual input
sources with type RequestDataSource.
source_request_sources: A map from input source names to the actual input
sources with type RequestSource.
udf: The user defined transformation function, which must take pandas dataframes
as inputs.
description: A human-readable description.
Expand All @@ -59,7 +59,7 @@ class OnDemandFeatureView(BaseFeatureView):
name: str
features: List[Feature]
source_feature_view_projections: Dict[str, FeatureViewProjection]
source_request_data_sources: Dict[str, RequestDataSource]
source_request_sources: Dict[str, RequestSource]
udf: MethodType
description: str
tags: Dict[str, str]
Expand All @@ -71,11 +71,11 @@ def __init__(
name: str,
features: List[Feature],
sources: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]]
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
] = None,
udf: Optional[MethodType] = None,
inputs: Optional[
Dict[str, Union[FeatureView, FeatureViewProjection, RequestDataSource]]
Dict[str, Union[FeatureView, FeatureViewProjection, RequestSource]]
] = None,
description: str = "",
tags: Optional[Dict[str, str]] = None,
Expand Down Expand Up @@ -124,10 +124,10 @@ def __init__(

assert sources is not None
self.source_feature_view_projections: Dict[str, FeatureViewProjection] = {}
self.source_request_data_sources: Dict[str, RequestDataSource] = {}
self.source_request_sources: Dict[str, RequestSource] = {}
for source_name, odfv_source in sources.items():
if isinstance(odfv_source, RequestDataSource):
self.source_request_data_sources[source_name] = odfv_source
if isinstance(odfv_source, RequestSource):
self.source_request_sources[source_name] = odfv_source
elif isinstance(odfv_source, FeatureViewProjection):
self.source_feature_view_projections[source_name] = odfv_source
else:
Expand All @@ -149,8 +149,7 @@ def __copy__(self):
name=self.name,
features=self.features,
sources=dict(
**self.source_feature_view_projections,
**self.source_request_data_sources,
**self.source_feature_view_projections, **self.source_request_sources,
),
udf=self.udf,
description=self.description,
Expand All @@ -167,7 +166,7 @@ def __eq__(self, other):
if (
not self.source_feature_view_projections
== other.source_feature_view_projections
or not self.source_request_data_sources == other.source_request_data_sources
or not self.source_request_sources == other.source_request_sources
or not self.udf.__code__.co_code == other.udf.__code__.co_code
):
return False
Expand All @@ -194,12 +193,9 @@ def to_proto(self) -> OnDemandFeatureViewProto:
sources[source_name] = OnDemandSource(
feature_view_projection=fv_projection.to_proto()
)
for (
source_name,
request_data_source,
) in self.source_request_data_sources.items():
for (source_name, request_sources,) in self.source_request_sources.items():
sources[source_name] = OnDemandSource(
request_data_source=request_data_source.to_proto()
request_data_source=request_sources.to_proto()
)

spec = OnDemandFeatureViewSpec(
Expand Down Expand Up @@ -241,7 +237,7 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):
on_demand_source.feature_view_projection
)
else:
sources[source_name] = RequestDataSource.from_proto(
sources[source_name] = RequestSource.from_proto(
on_demand_source.request_data_source
)
on_demand_feature_view_obj = cls(
Expand Down Expand Up @@ -282,8 +278,8 @@ def from_proto(cls, on_demand_feature_view_proto: OnDemandFeatureViewProto):

def get_request_data_schema(self) -> Dict[str, ValueType]:
schema: Dict[str, ValueType] = {}
for request_data_source in self.source_request_data_sources.values():
schema.update(request_data_source.schema)
for request_source in self.source_request_sources.values():
schema.update(request_source.schema)
return schema

def get_transformed_features_df(
Expand Down Expand Up @@ -339,7 +335,7 @@ def infer_features(self):
dtype=dtype
)
df[f"{feature.name}"] = pd.Series(dtype=dtype)
for request_data in self.source_request_data_sources.values():
for request_data in self.source_request_sources.values():
for feature_name, feature_type in request_data.schema.items():
dtype = feast_value_type_to_pandas_type(feature_type)
df[f"{feature_name}"] = pd.Series(dtype=dtype)
Expand Down Expand Up @@ -385,7 +381,7 @@ def get_requested_odfvs(feature_refs, project, registry):


def on_demand_feature_view(
features: List[Feature], sources: Dict[str, Union[FeatureView, RequestDataSource]]
features: List[Feature], sources: Dict[str, Union[FeatureView, RequestSource]]
):
"""
Declare an on-demand feature view
Expand Down
18 changes: 8 additions & 10 deletions sdk/python/feast/request_feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from typing import Dict, List, Optional, Type

from feast.base_feature_view import BaseFeatureView
from feast.data_source import RequestDataSource
from feast.data_source import RequestSource
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.RequestFeatureView_pb2 import (
Expand All @@ -20,7 +20,7 @@ class RequestFeatureView(BaseFeatureView):
Attributes:
name: The unique name of the request feature view.
request_data_source: The request data source that specifies the schema and
request_source: The request source that specifies the schema and
features of the request feature view.
features: The list of features defined as part of this request feature view.
description: A human-readable description.
Expand All @@ -30,7 +30,7 @@ class RequestFeatureView(BaseFeatureView):
"""

name: str
request_data_source: RequestDataSource
request_source: RequestSource
features: List[Feature]
description: str
tags: Dict[str, str]
Expand All @@ -40,7 +40,7 @@ class RequestFeatureView(BaseFeatureView):
def __init__(
self,
name: str,
request_data_source: RequestDataSource,
request_data_source: RequestSource,
description: str = "",
tags: Optional[Dict[str, str]] = None,
owner: str = "",
Expand Down Expand Up @@ -73,7 +73,7 @@ def __init__(
tags=tags,
owner=owner,
)
self.request_data_source = request_data_source
self.request_source = request_data_source

@property
def proto_class(self) -> Type[RequestFeatureViewProto]:
Expand All @@ -88,7 +88,7 @@ def to_proto(self) -> RequestFeatureViewProto:
"""
spec = RequestFeatureViewSpec(
name=self.name,
request_data_source=self.request_data_source.to_proto(),
request_data_source=self.request_source.to_proto(),
description=self.description,
tags=self.tags,
owner=self.owner,
Expand All @@ -110,7 +110,7 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto):

request_feature_view_obj = cls(
name=request_feature_view_proto.spec.name,
request_data_source=RequestDataSource.from_proto(
request_data_source=RequestSource.from_proto(
request_feature_view_proto.spec.request_data_source
),
description=request_feature_view_proto.spec.description,
Expand All @@ -127,8 +127,6 @@ def from_proto(cls, request_feature_view_proto: RequestFeatureViewProto):
return request_feature_view_obj

def __copy__(self):
fv = RequestFeatureView(
name=self.name, request_data_source=self.request_data_source
)
fv = RequestFeatureView(name=self.name, request_data_source=self.request_source)
fv.projection = copy.copy(self.projection)
return fv
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
)
from tests.integration.feature_repos.universal.feature_views import (
conv_rate_plus_100_feature_view,
create_conv_rate_request_data_source,
create_conv_rate_request_source,
create_customer_daily_profile_feature_view,
create_driver_hourly_stats_feature_view,
create_field_mapping_feature_view,
Expand Down Expand Up @@ -279,7 +279,7 @@ def construct_universal_feature_views(
driver_odfv=conv_rate_plus_100_feature_view(
{
"driver": driver_hourly_stats,
"input_request": create_conv_rate_request_data_source(),
"input_request": create_conv_rate_request_source(),
}
)
if with_odfv
Expand Down
Loading

0 comments on commit 0c9e5b7

Please sign in to comment.