Skip to content

Commit

Permalink
feat: Offline push endpoint for pushing to offline stores (#2837)
Browse files Browse the repository at this point in the history
* Scaffolding for offline store push

Signed-off-by: Kevin Zhang <[email protected]>

* Lint

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* File source offline push

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Address review comments

Signed-off-by: Kevin Zhang <[email protected]>

* Add redshift function

Signed-off-by: Kevin Zhang <[email protected]>

* Add redshift

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Lint

Signed-off-by: Kevin Zhang <[email protected]>

* fix

Signed-off-by: Kevin Zhang <[email protected]>

* fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix test

Signed-off-by: Kevin Zhang <[email protected]>

* Fix test

Signed-off-by: Kevin Zhang <[email protected]>

* Fix test

Signed-off-by: Kevin Zhang <[email protected]>

* Fix interface

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Update

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Fix rebase

Signed-off-by: Kevin Zhang <[email protected]>

* Fix naming

Signed-off-by: Kevin Zhang <[email protected]>

* Fix

Signed-off-by: Kevin Zhang <[email protected]>

* Uncomment

Signed-off-by: Kevin Zhang <[email protected]>
  • Loading branch information
kevjumba authored Jun 22, 2022
1 parent fde7075 commit a88cd30
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 12 deletions.
11 changes: 8 additions & 3 deletions docs/reference/feature-servers/python-feature-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## Overview

The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.

## CLI

Expand Down Expand Up @@ -155,6 +155,10 @@ curl -X POST \
### Pushing features to the online store
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):

You can also specify a push mode to control where the pushed data is written: the online store, the offline store, or both. The feature server returns an error if the
selected online/offline store does not support the push API.
The push mode is set with the string request parameter `to`, which accepts one of `"online"` (the default), `"offline"`, or `"both"`.
```text
curl -X POST "http://localhost:6566/push" -d '{
"push_source_name": "driver_hourly_stats_push_source",
Expand Down Expand Up @@ -187,9 +191,10 @@ event_dict = {
}
push_data = {
"push_source_name":"driver_stats_push_source",
"df":event_dict
"df":event_dict,
"to":"online",
}
requests.post(
"http://localhost:6566/push",
"http://localhost:6566/push",
data=json.dumps(push_data))
```
6 changes: 6 additions & 0 deletions sdk/python/feast/data_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,12 @@ def to_proto(self) -> DataSourceProto:
return data_source_proto


class PushMode(enum.Enum):
    """Destination store(s) for data pushed through a push source."""

    # Write the pushed rows to the online store only.
    ONLINE = 1
    # Write the pushed rows to the offline store only.
    OFFLINE = 2
    # Write the pushed rows to both the online and the offline store.
    ONLINE_AND_OFFLINE = 3


@typechecked
class PushSource(DataSource):
"""
Expand Down
9 changes: 9 additions & 0 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import feast
from feast import proto_json
from feast.data_source import PushMode
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest


Expand All @@ -26,6 +27,7 @@ class PushFeaturesRequest(BaseModel):
push_source_name: str
df: dict
allow_registry_cache: bool = True
to: str = "online"


def get_app(store: "feast.FeatureStore"):
Expand Down Expand Up @@ -80,10 +82,17 @@ def push(body=Depends(get_body)):
try:
request = PushFeaturesRequest(**json.loads(body))
df = pd.DataFrame(request.df)
if request.to == "offline":
to = PushMode.OFFLINE
elif request.to == "online":
to = PushMode.ONLINE
else:
to = PushMode.ONLINE_AND_OFFLINE
store.push(
push_source_name=request.push_source_name,
df=df,
allow_registry_cache=request.allow_registry_cache,
to=to,
)
except Exception as e:
# Print the original exception on the server side
Expand Down
24 changes: 17 additions & 7 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
from feast import feature_server, flags, flags_helper, ui_server, utils
from feast.base_feature_view import BaseFeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.data_source import DataSource
from feast.data_source import DataSource, PushMode
from feast.diff.infra_diff import InfraDiff, diff_infra_protos
from feast.diff.registry_diff import RegistryDiff, apply_diff_to_registry, diff_between
from feast.dqm.errors import ValidationFailed
Expand Down Expand Up @@ -1341,15 +1341,20 @@ def tqdm_builder(length):

@log_exceptions_and_usage
def push(
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
self,
push_source_name: str,
df: pd.DataFrame,
allow_registry_cache: bool = True,
to: PushMode = PushMode.ONLINE,
):
"""
Push features to a push source. This updates all the feature views that have the push source as stream source.
Args:
push_source_name: The name of the push source we want to push data to.
df: the data being pushed.
allow_registry_cache: whether to allow cached versions of the registry.
df: The data being pushed.
allow_registry_cache: Whether to allow cached versions of the registry.
to: Whether to push to online or offline store. Defaults to online store only.
"""
warnings.warn(
"Push source is an experimental feature. "
Expand All @@ -1373,9 +1378,14 @@ def push(
}

for fv in fvs_with_push_sources:
self.write_to_online_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)
if to == PushMode.ONLINE or to == PushMode.ONLINE_AND_OFFLINE:
self.write_to_online_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)
if to == PushMode.OFFLINE or to == PushMode.ONLINE_AND_OFFLINE:
self._write_to_offline_store(
fv.name, df, allow_registry_cache=allow_registry_cache
)

@log_exceptions_and_usage
def write_to_online_store(
Expand Down
4 changes: 3 additions & 1 deletion sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ def offline_write_batch(
set_usage_attribute("provider", self.__class__.__name__)

if self.offline_store:
self.offline_store.offline_write_batch(config, feature_view, data, progress)
self.offline_store.__class__.offline_write_batch(
config, feature_view, data, progress
)

@log_exceptions_and_usage(sampler=RatioSampler(ratio=0.001))
def online_read(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ def test_writing_consecutively_to_offline_store(environment, universal_data_sour
Field(name="acc_rate", dtype=Float32),
],
source=data_sources.driver,
ttl=timedelta(minutes=10),
ttl=timedelta(
minutes=10
), # This is to make sure all offline store data is out of date since get_historical_features() only searches backwards for a ttl window.
)

now = datetime.utcnow()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import datetime

import numpy as np
import pandas as pd
import pytest

from feast.data_source import PushMode
from tests.integration.feature_repos.repo_configuration import (
construct_universal_feature_views,
)
from tests.integration.feature_repos.universal.entities import (
customer,
driver,
location,
)


@pytest.mark.integration
@pytest.mark.universal_offline_stores(only=["file", "redshift"])
@pytest.mark.universal_online_stores(only=["sqlite"])
def test_push_features_and_read_from_offline_store(environment, universal_data_sources):
    """Push one row to the offline store and verify a historical read returns it.

    The test reads ``pushable_location_stats:temperature`` for a single entity
    row, pushes a new row through ``location_stats_push_source`` with
    ``PushMode.OFFLINE``, and reads the same feature again. The pushed
    temperature must be absent before the push and present after it.

    Args:
        environment: Test environment fixture exposing ``feature_store``.
        universal_data_sources: Fixture tuple whose third element holds the
            data sources used to build the universal feature views.
    """
    store = environment.feature_store

    (_, _, data_sources) = universal_data_sources
    feature_views = construct_universal_feature_views(data_sources)
    now = pd.Timestamp(datetime.datetime.utcnow()).round("ms")

    store.apply([driver(), customer(), location(), *feature_views.values()])
    entity_df = pd.DataFrame.from_dict({"location_id": [1], "event_timestamp": [now]})

    def _read_temperature():
        # Same retrieval is issued before and after the push.
        return store.get_historical_features(
            entity_df=entity_df,
            features=["pushable_location_stats:temperature"],
            full_feature_names=False,
        ).to_df()

    before_df = _read_temperature()

    data = {
        "event_timestamp": [now],
        "location_id": [1],
        "temperature": [4],
        "created": [now],
    }
    df_ingest = pd.DataFrame(data)
    expected_location_ids = df_ingest["location_id"].to_numpy()
    expected_temperatures = df_ingest["temperature"].to_numpy()

    # NOTE: `assert np.where(cond)` is always true, because np.where(cond)
    # returns a (non-empty) tuple of index arrays. Compare the values
    # element-wise instead so the assertions can actually fail.
    assert np.array_equal(before_df["location_id"].to_numpy(), expected_location_ids)
    # A missing (NaN) temperature also counts as "not equal" here.
    assert (before_df["temperature"].to_numpy() != expected_temperatures).all()

    store.push("location_stats_push_source", df_ingest, to=PushMode.OFFLINE)

    df = _read_temperature()
    assert np.array_equal(df["location_id"].to_numpy(), expected_location_ids)
    assert (df["temperature"].to_numpy() == expected_temperatures).all()

0 comments on commit a88cd30

Please sign in to comment.