Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix push sources and add docs / tests pushing via the python feature server #2561

Merged
merged 18 commits into from
Apr 19, 2022
2 changes: 1 addition & 1 deletion docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
* [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md)
* [.feastignore](reference/feature-repository/feast-ignore.md)
* [Feature servers](reference/feature-servers/README.md)
* [Local feature server](reference/feature-servers/local-feature-server.md)
* [Python feature server](reference/feature-servers/python-feature-server.md)
* [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md)
* [\[Alpha\] Data quality monitoring](reference/dqm.md)
* [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
Expand Down
12 changes: 6 additions & 6 deletions docs/reference/data-sources/push.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# Push source

**Warning**: This is an _experimental_ feature. It's intended for early testing and feedback, and could change without warnings in future releases.

## Description

Push sources allow feature values to be pushed to the online store in real time. This allows fresh feature values to be made available to applications. Push sources supercede the
Expand Down Expand Up @@ -31,18 +33,14 @@ from feast.types import Int64

push_source = PushSource(
name="push_source",
schema=[
Field(name="user_id", dtype=Int64),
Field(name="life_time_value", dtype=Int64)
],
batch_source=BigQuerySource(table="test.test"),
)

fv = FeatureView(
name="feature view",
entities=["user_id"],
schema=[Field(name="life_time_value", dtype=Int64)],
stream_source=push_source,
source=push_source,
)
```

Expand All @@ -53,6 +51,8 @@ import pandas as pd

fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source", feature_data_frame)
fs.push("push_source_name", feature_data_frame)
```

See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server.

2 changes: 1 addition & 1 deletion docs/reference/feature-servers/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK.

{% page-ref page="local-feature-server.md" %}
{% page-ref page="python-feature-server.md" %}
2 changes: 1 addition & 1 deletion docs/reference/feature-servers/go-feature-retrieval.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Overview

The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md).
The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md).

Currently, this component only supports online serving and does not have an offline component including APIs to create feast feature repositories or apply configuration to the registry to facilitate online materialization. It also does not expose its own dedicated cli to perform feast actions. Furthermore, this component is only meant to expose an online serving API that can be called through the python SDK to facilitate faster online feature retrieval.

Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
# Local feature server
# Python feature server

## Overview

The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed.
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.

## CLI

There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port be overridden by a `--port` flag.
There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port be overridden by a `--port` flag.

## Deploying as a service

One can also deploy a feature server by building a docker image that bundles in the project's `feature_store.yaml`. See [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for example.

A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.


## Example

### Initializing a feature server
Here's the local feature server usage example with the local template:

```bash
Expand Down Expand Up @@ -41,6 +49,7 @@ INFO: Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
```

### Retrieving features from the online store
After the server starts, we can execute cURL commands from another terminal tab:

```bash
Expand Down Expand Up @@ -142,3 +151,45 @@ curl -X POST \
}
}' | jq
```

### Pushing features to the online store
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):

```text
curl -X POST "http://localhost:6566/push" -d '{
"push_source_name": "driver_hourly_stats_push_source",
"df": {
"driver_id": [1001],
"event_timestamp": ["2022-05-13 10:59:42"],
"created": ["2022-05-13 10:59:42"],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000]
}
}' | jq
```

or equivalently from Python:
```python
import json
import requests
import pandas as pd
from datetime import datetime

event_dict = {
"driver_id": [1001],
"event_timestamp": [str(datetime(2021, 5, 13, 10, 59, 42))],
"created": [str(datetime(2021, 5, 13, 10, 59, 42))],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
"string_feature": "test2",
}
push_data = {
"push_source_name":"driver_stats_push_source",
"df":event_dict
}
requests.post(
"http://localhost:6566/push",
data=json.dumps(push_data))
```
3 changes: 1 addition & 2 deletions protos/feast/core/DataSource.proto
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,7 @@ message DataSource {
// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
// the online store on-demand, such as by stream consumers.
message PushOptions {
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 1;
reserved 1;
}


Expand Down
29 changes: 3 additions & 26 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

from feast import type_map
from feast.data_format import StreamFormat
from feast.field import Field, from_value_type
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
Expand Down Expand Up @@ -714,45 +714,35 @@ class PushSource(DataSource):
A source that can be used to ingest features on request
"""

name: str
schema: List[Field]
# TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store
# TODO(adchia): consider a "mode" to support pushing raw vs transformed events
batch_source: DataSource
timestamp_field: str

def __init__(
self,
*,
name: str,
schema: List[Field],
batch_source: DataSource,
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
):
"""
Creates a PushSource object.
Args:
name: Name of the push source
schema: Schema mapping from the input feature name to a ValueType
batch_source: The batch source that backs this push source. It's used when materializing from the offline
store to the online store, and when retrieving historical features.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
timestamp_field (optional): Event timestamp foe;d used for point in time
joins of feature values.

"""
super().__init__(name=name, description=description, tags=tags, owner=owner)
self.schema = sorted(schema) # TODO: add schema inference from a batch source
self.batch_source = batch_source
if not self.batch_source:
raise ValueError(f"batch_source is needed for push source {self.name}")
if not timestamp_field:
raise ValueError(f"timestamp field is needed for push source {self.name}")
self.timestamp_field = timestamp_field

def validate(self, config: RepoConfig):
pass
Expand All @@ -764,38 +754,25 @@ def get_table_column_names_and_types(

@staticmethod
def from_proto(data_source: DataSourceProto):
schema_pb = data_source.push_options.schema
schema = []
for key, val in schema_pb.items():
schema.append(Field(name=key, dtype=from_value_type(ValueType(val))))

assert data_source.HasField("batch_source")
batch_source = DataSource.from_proto(data_source.batch_source)

return PushSource(
name=data_source.name,
schema=sorted(schema),
batch_source=batch_source,
timestamp_field=data_source.timestamp_field,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for field in self.schema:
schema_pb[field.name] = field.dtype.to_value_type().value
batch_source_proto = None
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

options = DataSourceProto.PushOptions(schema=schema_pb,)
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.PUSH_SOURCE,
push_options=options,
timestamp_field=self.timestamp_field,
description=self.description,
tags=self.tags,
owner=self.owner,
Expand Down
4 changes: 1 addition & 3 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ def push(body=Depends(get_body)):
@app.post("/write-to-online-store")
def write_to_online_store(body=Depends(get_body)):
warnings.warn(
"write_to_online_store is an experimental feature. "
"This API is unstable and it could be changed in the future. "
"We do not guarantee that future changes will maintain backward compatibility.",
"write_to_online_store is deprecated. Please consider using /push instead",
RuntimeWarning,
)
try:
Expand Down
18 changes: 14 additions & 4 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@

warnings.simplefilter("once", DeprecationWarning)


if TYPE_CHECKING:
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer

Expand Down Expand Up @@ -1186,16 +1185,25 @@ def tqdm_builder(length):
)

@log_exceptions_and_usage
def push(self, push_source_name: str, df: pd.DataFrame):
def push(
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
):
"""
Push features to a push source. This updates all the feature views that have the push source as stream source.
Args:
push_source_name: The name of the push source we want to push data to.
df: the data being pushed.
allow_registry_cache: whether to allow cached versions of the registry.
"""
warnings.warn(
"Push source is an experimental feature. "
"This API is unstable and it could and might change in the future. "
"We do not guarantee that future changes will maintain backward compatibility.",
RuntimeWarning,
)
from feast.data_source import PushSource

all_fvs = self.list_feature_views(allow_cache=True)
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)

fvs_with_push_sources = {
fv
Expand All @@ -1208,7 +1216,9 @@ def push(self, push_source_name: str, df: pd.DataFrame):
}

for fv in fvs_with_push_sources:
self.write_to_online_store(fv.name, df, allow_registry_cache=True)
self.write_to_online_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)

@log_exceptions_and_usage
def write_to_online_store(
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List

from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource
from feast.data_source import DataSource, RequestSource
from feast.data_source import DataSource, PushSource, RequestSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.field import Field, from_value_type
Expand Down Expand Up @@ -74,6 +74,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
for data_source in data_sources:
if isinstance(data_source, RequestSource):
continue
if isinstance(data_source, PushSource):
data_source = data_source.batch_source
if data_source.timestamp_field is None or data_source.timestamp_field == "":
# prepare right match pattern for data source
ts_column_type_regex_pattern = ""
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ def teardown(
def _initialize_conn(db_path: str):
Path(db_path).parent.mkdir(exist_ok=True)
return sqlite3.connect(
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
db_path,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
check_same_thread=False,
)


Expand Down
9 changes: 1 addition & 8 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,7 @@
)

driver_locations_push_source = PushSource(
name="driver_locations_push",
schema=[
Field(name="driver_id", dtype=String),
Field(name="driver_lat", dtype=Float32),
Field(name="driver_long", dtype=String),
],
batch_source=driver_locations_source,
timestamp_field="event_timestamp",
name="driver_locations_push", batch_source=driver_locations_source,
)

driver = Entity(
Expand Down
Loading