fix: Fix push sources and add docs / tests pushing via the python feature server (#2561)

* bug: Fixing push API endpoint to include a way to specify whether the registry should be looked up from a cache. Adding docs for feature server usage
* fix
* prune out unneeded fields in push source
* fix comment
* fix generator
* add data source creator teardown
* update push source to alpha
* lint

Signed-off-by: Danny Chiao <danny@tecton.ai>
adchia authored Apr 19, 2022
1 parent d1f76e5 commit e8e418e
Showing 15 changed files with 221 additions and 82 deletions.
2 changes: 1 addition & 1 deletion docs/SUMMARY.md
@@ -81,7 +81,7 @@
* [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md)
* [.feastignore](reference/feature-repository/feast-ignore.md)
* [Feature servers](reference/feature-servers/README.md)
* [Local feature server](reference/feature-servers/local-feature-server.md)
* [Python feature server](reference/feature-servers/python-feature-server.md)
* [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md)
* [\[Alpha\] Data quality monitoring](reference/dqm.md)
* [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
12 changes: 6 additions & 6 deletions docs/reference/data-sources/push.md
@@ -1,5 +1,7 @@
# Push source

**Warning**: This is an _experimental_ feature. It's intended for early testing and feedback, and could change without warning in future releases.

## Description

Push sources allow feature values to be pushed to the online store in real time, making fresh feature values available to applications. Push sources supersede the
@@ -31,18 +33,14 @@ from feast.types import Int64

push_source = PushSource(
name="push_source",
schema=[
Field(name="user_id", dtype=Int64),
Field(name="life_time_value", dtype=Int64)
],
batch_source=BigQuerySource(table="test.test"),
)

fv = FeatureView(
name="feature view",
entities=["user_id"],
schema=[Field(name="life_time_value", dtype=Int64)],
stream_source=push_source,
source=push_source,
)
```

@@ -53,6 +51,8 @@ import pandas as pd

fs = FeatureStore(...)
feature_data_frame = pd.DataFrame()
fs.push("push_source", feature_data_frame)
fs.push("push_source_name", feature_data_frame)
```

See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server.

2 changes: 1 addition & 1 deletion docs/reference/feature-servers/README.md
@@ -2,4 +2,4 @@

Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK.

{% page-ref page="local-feature-server.md" %}
{% page-ref page="python-feature-server.md" %}
2 changes: 1 addition & 1 deletion docs/reference/feature-servers/go-feature-retrieval.md
@@ -2,7 +2,7 @@

## Overview

The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md).
The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md).

Currently, this component only supports online serving. It does not have an offline component, including APIs to create Feast feature repositories or apply configuration to the registry to facilitate online materialization, nor does it expose its own dedicated CLI to perform Feast actions. It is only meant to expose an online serving API that can be called through the Python SDK to facilitate faster online feature retrieval.

@@ -1,15 +1,23 @@
# Local feature server
# Python feature server

## Overview

The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed.
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write and read features from Feast online stores using any programming language that can make HTTP requests.

## CLI

There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port can be overridden by a `--port` flag.
There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port can be overridden by a `--port` flag.

## Deploying as a service

One can also deploy a feature server by building a Docker image that bundles in the project's `feature_store.yaml`. See the [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for an example.

A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.


## Example

### Initializing a feature server
Here's an example of how to start and use the feature server with the local template:

```bash
@@ -41,6 +49,7 @@ INFO: Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
```
### Retrieving features from the online store
After the server starts, we can execute cURL commands from another terminal tab:
```bash
@@ -142,3 +151,45 @@ curl -X POST \
}
}' | jq
```

### Pushing features to the online store
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):

```bash
curl -X POST "http://localhost:6566/push" -d '{
"push_source_name": "driver_hourly_stats_push_source",
"df": {
"driver_id": [1001],
"event_timestamp": ["2022-05-13 10:59:42"],
"created": ["2022-05-13 10:59:42"],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000]
}
}' | jq
```

or equivalently from Python:
```python
import json
import requests
import pandas as pd
from datetime import datetime
event_dict = {
"driver_id": [1001],
"event_timestamp": [str(datetime(2021, 5, 13, 10, 59, 42))],
"created": [str(datetime(2021, 5, 13, 10, 59, 42))],
"conv_rate": [1.0],
"acc_rate": [1.0],
"avg_daily_trips": [1000],
"string_feature": "test2",
}
push_data = {
"push_source_name":"driver_stats_push_source",
"df":event_dict
}
requests.post(
"http://localhost:6566/push",
data=json.dumps(push_data))
```
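The note above that timestamps need to be strings follows from JSON serialization: the standard `json` module has no encoder for `datetime` objects, which is why the examples call `str()` on them before building the payload. A minimal sketch of the difference:

```python
import json
from datetime import datetime

ts = datetime(2021, 5, 13, 10, 59, 42)

# Serializing a raw datetime fails: json.dumps has no encoder for it.
try:
    json.dumps({"event_timestamp": [ts]})
    raw_ok = True
except TypeError:
    raw_ok = False

# Converting to a string first produces a valid payload for /push.
payload = json.dumps({"event_timestamp": [str(ts)]})
```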
3 changes: 1 addition & 2 deletions protos/feast/core/DataSource.proto
@@ -222,8 +222,7 @@ message DataSource {
// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
// the online store on-demand, such as by stream consumers.
message PushOptions {
// Mapping of feature name to type
map<string, feast.types.ValueType.Enum> schema = 1;
reserved 1;
}


29 changes: 3 additions & 26 deletions sdk/python/feast/data_source.py
@@ -21,7 +21,7 @@

from feast import type_map
from feast.data_format import StreamFormat
from feast.field import Field, from_value_type
from feast.field import Field
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.repo_config import RepoConfig, get_data_source_class_from_type
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
@@ -714,45 +714,35 @@ class PushSource(DataSource):
A source that can be used to ingest features on request
"""

name: str
schema: List[Field]
# TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store
# TODO(adchia): consider a "mode" to support pushing raw vs transformed events
batch_source: DataSource
timestamp_field: str

def __init__(
self,
*,
name: str,
schema: List[Field],
batch_source: DataSource,
description: Optional[str] = "",
tags: Optional[Dict[str, str]] = None,
owner: Optional[str] = "",
timestamp_field: Optional[str] = "",
):
"""
Creates a PushSource object.
Args:
name: Name of the push source
schema: Schema mapping from the input feature name to a ValueType
batch_source: The batch source that backs this push source. It's used when materializing from the offline
store to the online store, and when retrieving historical features.
description (optional): A human-readable description.
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
owner (optional): The owner of the data source, typically the email of the primary
maintainer.
timestamp_field (optional): Event timestamp field used for point in time
joins of feature values.
"""
super().__init__(name=name, description=description, tags=tags, owner=owner)
self.schema = sorted(schema) # TODO: add schema inference from a batch source
self.batch_source = batch_source
if not self.batch_source:
raise ValueError(f"batch_source is needed for push source {self.name}")
if not timestamp_field:
raise ValueError(f"timestamp field is needed for push source {self.name}")
self.timestamp_field = timestamp_field

def validate(self, config: RepoConfig):
pass
@@ -764,38 +754,25 @@ def get_table_column_names_and_types(

@staticmethod
def from_proto(data_source: DataSourceProto):
schema_pb = data_source.push_options.schema
schema = []
for key, val in schema_pb.items():
schema.append(Field(name=key, dtype=from_value_type(ValueType(val))))

assert data_source.HasField("batch_source")
batch_source = DataSource.from_proto(data_source.batch_source)

return PushSource(
name=data_source.name,
schema=sorted(schema),
batch_source=batch_source,
timestamp_field=data_source.timestamp_field,
description=data_source.description,
tags=dict(data_source.tags),
owner=data_source.owner,
)

def to_proto(self) -> DataSourceProto:
schema_pb = {}
for field in self.schema:
schema_pb[field.name] = field.dtype.to_value_type().value
batch_source_proto = None
if self.batch_source:
batch_source_proto = self.batch_source.to_proto()

options = DataSourceProto.PushOptions(schema=schema_pb,)
data_source_proto = DataSourceProto(
name=self.name,
type=DataSourceProto.PUSH_SOURCE,
push_options=options,
timestamp_field=self.timestamp_field,
description=self.description,
tags=self.tags,
owner=self.owner,
4 changes: 1 addition & 3 deletions sdk/python/feast/feature_server.py
@@ -94,9 +94,7 @@ def push(body=Depends(get_body)):
@app.post("/write-to-online-store")
def write_to_online_store(body=Depends(get_body)):
warnings.warn(
"write_to_online_store is an experimental feature. "
"This API is unstable and it could be changed in the future. "
"We do not guarantee that future changes will maintain backward compatibility.",
"write_to_online_store is deprecated. Please consider using /push instead",
RuntimeWarning,
)
try:
18 changes: 14 additions & 4 deletions sdk/python/feast/feature_store.py
@@ -92,7 +92,6 @@

warnings.simplefilter("once", DeprecationWarning)


if TYPE_CHECKING:
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer

@@ -1186,16 +1185,25 @@ def tqdm_builder(length):
)

@log_exceptions_and_usage
def push(self, push_source_name: str, df: pd.DataFrame):
def push(
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
):
"""
Push features to a push source. This updates all the feature views that have the push source as stream source.
Args:
push_source_name: The name of the push source we want to push data to.
df: the data being pushed.
allow_registry_cache: whether to allow cached versions of the registry.
"""
warnings.warn(
"Push source is an experimental feature. "
"This API is unstable and it could change in the future. "
"We do not guarantee that future changes will maintain backward compatibility.",
RuntimeWarning,
)
from feast.data_source import PushSource

all_fvs = self.list_feature_views(allow_cache=True)
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)

fvs_with_push_sources = {
fv
@@ -1208,7 +1216,9 @@ def push(self, push_source_name: str, df: pd.DataFrame):
}

for fv in fvs_with_push_sources:
self.write_to_online_store(fv.name, df, allow_registry_cache=True)
self.write_to_online_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)

@log_exceptions_and_usage
def write_to_online_store(
4 changes: 3 additions & 1 deletion sdk/python/feast/inference.py
@@ -2,7 +2,7 @@
from typing import List

from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource
from feast.data_source import DataSource, RequestSource
from feast.data_source import DataSource, PushSource, RequestSource
from feast.errors import RegistryInferenceFailure
from feast.feature_view import FeatureView
from feast.field import Field, from_value_type
@@ -74,6 +74,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
for data_source in data_sources:
if isinstance(data_source, RequestSource):
continue
if isinstance(data_source, PushSource):
data_source = data_source.batch_source
if data_source.timestamp_field is None or data_source.timestamp_field == "":
# prepare right match pattern for data source
ts_column_type_regex_pattern = ""
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/online_stores/sqlite.py
@@ -230,7 +230,9 @@ def teardown(
def _initialize_conn(db_path: str):
Path(db_path).parent.mkdir(exist_ok=True)
return sqlite3.connect(
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
db_path,
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
check_same_thread=False,
)


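The `check_same_thread=False` argument added above matters because the feature server handles requests on worker threads, while the SQLite connection may be created on another thread; without the flag, `sqlite3` raises `ProgrammingError` on cross-thread use. A minimal sketch of the pattern (the in-memory database, table, and lock are illustrative assumptions, not Feast code):

```python
import sqlite3
import threading

# check_same_thread=False lets a connection created in one thread be used
# from another (e.g. server worker threads). Access must still be
# serialized by the caller; here a lock guards all statements.
conn = sqlite3.connect(":memory:", check_same_thread=False)
conn.execute("CREATE TABLE kv (k TEXT, v TEXT)")
lock = threading.Lock()

def writer():
    # Runs on a different thread than the one that created the connection.
    with lock:
        conn.execute("INSERT INTO kv VALUES ('a', '1')")
        conn.commit()

t = threading.Thread(target=writer)
t.start()
t.join()

with lock:
    rows = conn.execute("SELECT v FROM kv WHERE k = 'a'").fetchall()
```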
9 changes: 1 addition & 8 deletions sdk/python/tests/example_repos/example_feature_repo_1.py
@@ -40,14 +40,7 @@
)

driver_locations_push_source = PushSource(
name="driver_locations_push",
schema=[
Field(name="driver_id", dtype=String),
Field(name="driver_lat", dtype=Float32),
Field(name="driver_long", dtype=String),
],
batch_source=driver_locations_source,
timestamp_field="event_timestamp",
name="driver_locations_push", batch_source=driver_locations_source,
)

driver = Entity(
121 changes: 121 additions & 0 deletions sdk/python/tests/integration/e2e/test_python_feature_server.py
@@ -0,0 +1,121 @@
import contextlib
import json
from datetime import datetime
from typing import List

import pytest
from fastapi.testclient import TestClient

from feast.feast_object import FeastObject
from feast.feature_server import get_app
from tests.integration.feature_repos.integration_test_repo_config import (
IntegrationTestRepoConfig,
)
from tests.integration.feature_repos.repo_configuration import (
construct_test_environment,
construct_universal_feature_views,
construct_universal_test_data,
)
from tests.integration.feature_repos.universal.entities import (
customer,
driver,
location,
)


@pytest.mark.integration
@pytest.mark.universal
def test_get_online_features():
with setup_python_fs_client() as client:
request_data_dict = {
"features": [
"driver_stats:conv_rate",
"driver_stats:acc_rate",
"driver_stats:avg_daily_trips",
],
"entities": {"driver_id": [5001, 5002]},
}
response = client.post(
"/get-online-features", data=json.dumps(request_data_dict)
)

# Check entities and features are present
parsed_response = json.loads(response.text)
assert "metadata" in parsed_response
metadata = parsed_response["metadata"]
expected_features = ["driver_id", "conv_rate", "acc_rate", "avg_daily_trips"]
response_feature_names = metadata["feature_names"]
assert len(response_feature_names) == len(expected_features)
for expected_feature in expected_features:
assert expected_feature in response_feature_names
assert "results" in parsed_response
results = parsed_response["results"]
for result in results:
# Same order as in metadata
assert len(result["statuses"]) == 2 # Requested two entities
for status in result["statuses"]:
assert status == "PRESENT"
results_driver_id_index = response_feature_names.index("driver_id")
assert (
results[results_driver_id_index]["values"]
== request_data_dict["entities"]["driver_id"]
)


@pytest.mark.integration
@pytest.mark.universal
def test_push():
with setup_python_fs_client() as client:
initial_temp = get_temperatures(client, location_ids=[1])[0]
json_data = json.dumps(
{
"push_source_name": "location_stats_push_source",
"df": {
"location_id": [1],
"temperature": [initial_temp * 100],
"event_timestamp": [str(datetime.utcnow())],
"created": [str(datetime.utcnow())],
},
}
)
response = client.post("/push", data=json_data,)

# Check new pushed temperature is fetched
assert response.status_code == 200
assert get_temperatures(client, location_ids=[1]) == [initial_temp * 100]


def get_temperatures(client, location_ids: List[int]):
get_request_data = {
"features": ["pushable_location_stats:temperature"],
"entities": {"location_id": location_ids},
}
response = client.post("/get-online-features", data=json.dumps(get_request_data))
parsed_response = json.loads(response.text)
assert "metadata" in parsed_response
metadata = parsed_response["metadata"]
response_feature_names = metadata["feature_names"]
assert "results" in parsed_response
results = parsed_response["results"]
results_temperature_index = response_feature_names.index("temperature")
return results[results_temperature_index]["values"]


@contextlib.contextmanager
def setup_python_fs_client():
config = IntegrationTestRepoConfig()
environment = construct_test_environment(config)
fs = environment.feature_store
try:
entities, datasets, data_sources = construct_universal_test_data(environment)
feature_views = construct_universal_feature_views(data_sources)
feast_objects: List[FeastObject] = []
feast_objects.extend(feature_views.values())
feast_objects.extend([driver(), customer(), location()])
fs.apply(feast_objects)
fs.materialize(environment.start_date, environment.end_date)
client = TestClient(get_app(fs))
yield client
finally:
fs.teardown()
environment.data_source_creator.teardown()
@@ -13,7 +13,7 @@
ValueType,
)
from feast.data_source import DataSource, RequestSource
from feast.types import Array, FeastType, Float32, Float64, Int32, Int64
from feast.types import Array, FeastType, Float32, Float64, Int32
from tests.integration.feature_repos.universal.entities import location


@@ -65,19 +65,19 @@ def conv_rate_plus_100(features_df: pd.DataFrame) -> pd.DataFrame:
def conv_rate_plus_100_feature_view(
sources: Dict[str, Union[RequestSource, FeatureView]],
infer_features: bool = False,
features: Optional[List[Feature]] = None,
features: Optional[List[Field]] = None,
) -> OnDemandFeatureView:
# Test that positional arguments and Features still work for ODFVs.
_features = features or [
Feature(name="conv_rate_plus_100", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_val_to_add", dtype=ValueType.DOUBLE),
Feature(name="conv_rate_plus_100_rounded", dtype=ValueType.INT32),
Field(name="conv_rate_plus_100", dtype=Float64),
Field(name="conv_rate_plus_val_to_add", dtype=Float64),
Field(name="conv_rate_plus_100_rounded", dtype=Int32),
]
return OnDemandFeatureView(
conv_rate_plus_100.__name__,
[] if infer_features else _features,
sources,
conv_rate_plus_100,
name=conv_rate_plus_100.__name__,
schema=[] if infer_features else _features,
sources=sources,
udf=conv_rate_plus_100,
)


@@ -237,13 +237,7 @@ def create_field_mapping_feature_view(source):

def create_pushable_feature_view(batch_source: DataSource):
push_source = PushSource(
name="location_stats_push_source",
schema=[
Field(name="location_id", dtype=Int64),
Field(name="temperature", dtype=Int32),
],
timestamp_field="timestamp",
batch_source=batch_source,
name="location_stats_push_source", batch_source=batch_source,
)
return FeatureView(
name="pushable_location_stats",
10 changes: 1 addition & 9 deletions sdk/python/tests/unit/test_data_sources.py
@@ -9,13 +9,7 @@

def test_push_with_batch():
push_source = PushSource(
name="test",
schema=[
Field(name="f1", dtype=PrimitiveFeastType.FLOAT32),
Field(name="f2", dtype=PrimitiveFeastType.BOOL),
],
timestamp_field="event_timestamp",
batch_source=BigQuerySource(table="test.test"),
name="test", batch_source=BigQuerySource(table="test.test"),
)
push_source_proto = push_source.to_proto()
assert push_source_proto.HasField("batch_source")
@@ -25,8 +19,6 @@ def test_push_with_batch():
push_source_unproto = PushSource.from_proto(push_source_proto)

assert push_source.name == push_source_unproto.name
assert push_source.schema == push_source_unproto.schema
assert push_source.timestamp_field == push_source_unproto.timestamp_field
assert push_source.batch_source.name == push_source_unproto.batch_source.name

