Remote offline server-IT #9

Merged
35 commits
47faa21
feat: Added offline store remote deployment functionly using arrow fl…
redhatHameed May 7, 2024
e7cd32f
Merge pull request #1 from redhatHameed/remote-offline
dmartinol May 9, 2024
0d0cffd
Initial functional commit for remote get_historical_features
dmartinol May 9, 2024
ed36334
remote offline store example
dmartinol May 9, 2024
fe4186c
removing unneeded test code and fixinf impotrts
dmartinol May 9, 2024
01fa2f6
Merge pull request #2 from dmartinol/remote_offline
redhatHameed May 9, 2024
299650f
call do_put only once, postpone the invocation of do_put and simplifi…
dmartinol May 10, 2024
f297711
added primitive parameters to the command descriptor
dmartinol May 10, 2024
31d1fe8
removed redundant param
dmartinol May 13, 2024
ec763de
Merge pull request #4 from dmartinol/remote_offline
redhatHameed May 13, 2024
252afb9
Merge branch 'feast-dev:master' into remote_offline
redhatHameed May 13, 2024
22afc10
Merge branch 'feast-dev:master' into remote_offline
redhatHameed May 15, 2024
36c5c37
Initial skeleton of unit test for offline server
dmartinol May 15, 2024
889f89b
added unit test for offline store remote client
redhatHameed May 14, 2024
cb27b24
Merge pull request #6 from redhatHameed/update_client
dmartinol May 17, 2024
331975e
Merge remote-tracking branch 'upstream/remote_offline' into remote_of…
dmartinol May 17, 2024
cfa6189
testing all offlinestore APIs
dmartinol May 17, 2024
77ae13c
Updated remote offline server readme with the capability to init with…
tmihalac May 17, 2024
704fa34
integrated comments
dmartinol May 17, 2024
c52cc51
Merge remote-tracking branch 'upstream/remote_offline' into remote_of…
dmartinol May 17, 2024
b56b826
Merge pull request #5 from dmartinol/remote_offline
redhatHameed May 17, 2024
f599470
Merge branch 'feast-dev:master' into remote_offline
redhatHameed May 17, 2024
b73ec69
Merge branch 'feast-dev:master' into remote_offline
redhatHameed May 20, 2024
aa9bcbd
Merge branch 'feast-dev:master' into remote_offline
redhatHameed May 21, 2024
d901ac2
Merge remote-tracking branch 'origin/master' into remote_offline
dmartinol May 22, 2024
a1f660e
added RemoteOfflineStoreDataSourceCreator,
dmartinol May 27, 2024
c768eda
added missing CI requirement
dmartinol May 27, 2024
1c4004a
fixed linter
dmartinol May 27, 2024
cf3ddea
fixed multiprocess CI requirement
dmartinol May 27, 2024
3e181aa
updated CI dependencies
dmartinol May 28, 2024
19ee7f8
Merge remote-tracking branch 'upstream/remote_offline' into remote_of…
dmartinol May 29, 2024
37f7c21
fix test errors
dmartinol May 29, 2024
600fa4a
managing feature view aliases and restored skipped tests
dmartinol May 29, 2024
b466f8c
fixced linter issue
dmartinol May 29, 2024
ef42424
fixed broken test
dmartinol May 29, 2024
142 changes: 142 additions & 0 deletions examples/remote-offline-store/README.md
@@ -0,0 +1,142 @@
# Feast Offline store with Arrow Flight remote server

This POC proposes a solution to implement the issue [Remote offline feature server deployment](https://github.com/feast-dev/feast/issues/4032)
using [Arrow Flight](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) server as the remote Feast offline store.

## Architecture
The [offline_server](./offline_server) folder contains a sample Feast repository created with the `feast init` command.
Remote offline requests can be served with the `feast serve_offline` command introduced by this feature, which:
* Spins up an `Arrow Flight` server at the default port 8815.
* Accepts `do_get` requests for the `get_historical_features` command and delegates the implementation to the
current `FeatureStore`.

*Note*: The offline server can be initialized by providing the base64-encoded content of the `feature_store.yaml` file in an environment variable named `FEATURE_STORE_YAML_BASE64`. A temporary directory is then created, with the decoded content saved as its `feature_store.yaml` file.
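
As a sketch, the variable can be populated from a local configuration file before starting the server (the file path and content below are only illustrative; use your actual repository's `feature_store.yaml`):

```shell
# Write an illustrative feature_store.yaml, then base64-encode it into the
# variable the offline server reads at startup.
cat > /tmp/feature_store.yaml <<'EOF'
project: offline_server
provider: local
registry: data/registry.db
entity_key_serialization_version: 2
EOF
FEATURE_STORE_YAML_BASE64=$(base64 < /tmp/feature_store.yaml | tr -d '\n')
export FEATURE_STORE_YAML_BASE64
```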

The [offline_client](./offline_client) folder includes a test Python script that uses an offline store of type `remote`, leveraging the remote server as the
actual data provider. The offline store is implemented by the new `RemoteOfflineStore` class:
* For now it implements only the `get_historical_features` method.
* Since the implementation is lazy, the returned `RetrievalJob` does not run the `do_get` request until a method that synchronously executes
the underlying query is invoked (e.g., `to_df` or `to_arrow`).

## Parameter transfer protocol
The `Apache Arrow Flight` protocol defines generic APIs for efficient data streaming to and from the server, leveraging the gRPC communication framework.

The server exposes services to retrieve or push data streams associated with a particular `Flight` descriptor, using the `do_get` and `do_put` endpoints,
but the APIs are not detailed enough for the use that we have in mind, i.e., to **adopt the Arrow Flight server as a generic gRPC server
to serve the Feast offline APIs**.

Each API in the `OfflineStore` interface has multiple parameters that have to be transferred to the server to perform the actual implementation.
The way we implement the parameter transfer protocol is the following:
* The client, e.g. the `RemoteOfflineStore` instance, receives a call to the `get_historical_features` API with the required parameters
(e.g., `entity_df` and `feature_refs`).
* The client creates a unique identifier for a new command, using the UUID format, and generates a `Flight Descriptor` to represent it.
* Then the client sends the received API parameters to the server using multiple calls to the `do_put` service:
* Each call includes the data stream of the parameter value (e.g. the `entity_df` DataFrame).
* Each call also adds additional metadata values to the data schema to let the server identify the exact API to invoke:
* A `command` metadata with the unique command identifier calculated before.
* An `api` metadata with the name of the API to be invoked remotely, e.g. `get_historical_features`.
* A `param` metadata with the name of each parameter of the API.
* When the server receives the `do_put` calls, it stores the data in memory, using an ad-hoc `flights` dictionary indexed by the unique
`command` identifier and storing a document with the streamed metadata values:
```json
{
"(b'8d6a366a-c8d3-4f96-b085-f9f686111815')": {
"command": "8d6a366a-c8d3-4f96-b085-f9f686111815",
"api": "get_historical_features",
"entity_df": ".....",
"features": "...."
}
}
```
* Indexing a flight descriptor by a unique `command` identifier enables the server to efficiently handle concurrent requests from multiple clients
for the same service without any overlaps.
* Since the client implementation is lazy, the returned instance of `RemoteRetrievalJob` invokes the `do_get` service on the server only when
the data is requested, e.g. in the `_to_arrow_internal` method.
* When the server receives the `do_get` request, it unpacks the API parameters from the `flights` dictionary and, if the requested API is
set to `get_historical_features`, forwards the request to the internal instance of `FeatureStore`.
* Once the `do_get` request is consumed, the associated flight is removed from the `flights` dictionary, as we do not expect the same
API request to be executed twice by any client.

Other APIs of the `OfflineStore` interface can be implemented the same way, assuming that both the client and the server implementation
agree on the parameter transfer protocol to be used to let the server execute the service remotely.

As an alternative, APIs that do not have any returned data may be implemented as a `do_action` service in the server.

## Validating the POC
### Launch the offline server
A default Feast store has been defined in [offline_server/feature_repo](./offline_server/feature_repo/) using the `feast init` command,
and must first be applied with:
```console
cd offline_server
feast -c feature_repo apply
```

Then the offline server can be started at the default port 8815 with:
```console
feast -c feature_repo serve_offline
```

Sample output:
```console
Serving on grpc+tcp://127.0.0.1:8815
get_historical_features for: entity_df from 0 to 2, features from driver_hourly_stats:conv_rate to transformed_conv_rate:conv_rate_plus_val2
```

### Launch a remote offline client
The test script is located under [offline_client](./offline_client/) and uses a remote configuration of the offline store to delegate the actual
implementation to the offline store server:
```yaml
offline_store:
  type: remote
  host: localhost
  port: 8815
```

The test code in [test.py](./offline_client/test.py) initializes the store from the local configuration and then fetches the historical features
from the store like any other Feast client, but the actual implementation is delegated to the offline server:
```py
store = FeatureStore(repo_path=".")
training_df = store.get_historical_features(entity_df, features).to_df()
```

Sample output of `cd offline_client; python test.py`:
```console
config.offline_store is <class 'feast.infra.offline_stores.remote.RemoteOfflineStoreConfig'>
----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 10 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 driver_id 3 non-null int64
1 event_timestamp 3 non-null datetime64[ns, UTC]
2 label_driver_reported_satisfaction 3 non-null int64
3 val_to_add 3 non-null int64
4 val_to_add_2 3 non-null int64
5 conv_rate 3 non-null float32
6 acc_rate 3 non-null float32
7 avg_daily_trips 3 non-null int32
8 conv_rate_plus_val1 3 non-null float64
9 conv_rate_plus_val2 3 non-null float64
dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4)
memory usage: 332.0 bytes
None

----- Features -----

driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2
0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378
1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213
2 1003 2021-04-12 16:40:26+00:00 3 ... 127 3.419828 30.419828

[3 rows x 10 columns]
------training_df----
driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2
0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378
1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213
2 1003 2021-04-12 16:40:26+00:00 3 ... 127 3.419828 30.419828

[3 rows x 10 columns]
```

Empty file.
10 changes: 10 additions & 0 deletions examples/remote-offline-store/offline_client/feature_store.yaml
@@ -0,0 +1,10 @@
project: offline_server
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: ../offline_server/feature_repo/data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
offline_store:
  type: remote
  host: localhost
  port: 8815
entity_key_serialization_version: 2
40 changes: 40 additions & 0 deletions examples/remote-offline-store/offline_client/test.py
@@ -0,0 +1,40 @@
from datetime import datetime
from feast import FeatureStore
import pandas as pd

entity_df = pd.DataFrame.from_dict(
    {
        "driver_id": [1001, 1002, 1003],
        "event_timestamp": [
            datetime(2021, 4, 12, 10, 59, 42),
            datetime(2021, 4, 12, 8, 12, 10),
            datetime(2021, 4, 12, 16, 40, 26),
        ],
        "label_driver_reported_satisfaction": [1, 5, 3],
        "val_to_add": [1, 2, 3],
        "val_to_add_2": [10, 20, 30],
    }
)

features = [
    "driver_hourly_stats:conv_rate",
    "driver_hourly_stats:acc_rate",
    "driver_hourly_stats:avg_daily_trips",
    "transformed_conv_rate:conv_rate_plus_val1",
    "transformed_conv_rate:conv_rate_plus_val2",
]

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(entity_df, features).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Features -----\n")
print(training_df.head())

print("------training_df----")

print(training_df)
Empty file.
Empty file.
Binary file not shown.
Binary file not shown.
@@ -0,0 +1,140 @@
# This is an example feature definition file

from datetime import timedelta

import pandas as pd
import os

from feast import (
    Entity,
    FeatureService,
    FeatureView,
    Field,
    FileSource,
    PushSource,
    RequestSource,
)
from feast.on_demand_feature_view import on_demand_feature_view
from feast.types import Float32, Float64, Int64

# Define an entity for the driver. You can think of an entity as a primary key used to
# fetch features.
driver = Entity(name="driver", join_keys=["driver_id"])

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path=f"{os.path.dirname(os.path.abspath(__file__))}/data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature columns. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_stats_fv = FeatureView(
    # The unique name of this feature view. Two feature views in a single
    # project cannot have the same name
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    # The list of features defined below act as a schema to both define features
    # for both materialization of features into a store, and are used as references
    # during retrieval for building a training dataset or serving features
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"),
    ],
    online=True,
    source=driver_stats_source,
    # Tags are user defined key/value pairs that are attached to each
    # feature view
    tags={"team": "driver_performance"},
)

# Define a request data source which encodes features / information only
# available at request time (e.g. part of the user initiated HTTP request)
input_request = RequestSource(
    name="vals_to_add",
    schema=[
        Field(name="val_to_add", dtype=Int64),
        Field(name="val_to_add_2", dtype=Int64),
    ],
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fv, input_request],
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df


# This groups features into a model version
driver_activity_v1 = FeatureService(
    name="driver_activity_v1",
    features=[
        driver_stats_fv[["conv_rate"]],  # Sub-selects a feature from a feature view
        transformed_conv_rate,  # Selects all features from the feature view
    ],
)
driver_activity_v2 = FeatureService(
    name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate]
)

# Defines a way to push data (to be available offline, online or both) into Feast.
driver_stats_push_source = PushSource(
    name="driver_stats_push_source",
    batch_source=driver_stats_source,
)

# Defines a slightly modified version of the feature view from above, where the source
# has been changed to the push source. This allows fresh features to be directly pushed
# to the online store for this feature view.
driver_stats_fresh_fv = FeatureView(
    name="driver_hourly_stats_fresh",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_push_source,  # Changed from above
    tags={"team": "driver_performance"},
)


# Define an on demand feature view which can generate new features based on
# existing feature views and RequestSource features
@on_demand_feature_view(
    sources=[driver_stats_fresh_fv, input_request],  # relies on fresh version of FV
    schema=[
        Field(name="conv_rate_plus_val1", dtype=Float64),
        Field(name="conv_rate_plus_val2", dtype=Float64),
    ],
)
def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame:
    df = pd.DataFrame()
    df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"]
    df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"]
    return df


driver_activity_v3 = FeatureService(
    name="driver_activity_v3",
    features=[driver_stats_fresh_fv, transformed_conv_rate_fresh],
)
@@ -0,0 +1,9 @@
project: offline_server
# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry)
registry: data/registry.db
# The provider primarily specifies default offline / online stores & storing the registry in a given cloud
provider: local
online_store:
  type: sqlite
  path: data/online_store.db
entity_key_serialization_version: 2
29 changes: 29 additions & 0 deletions sdk/python/feast/cli.py
@@ -27,6 +27,7 @@
from feast import utils
from feast.constants import (
    DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT,
    DEFAULT_OFFLINE_SERVER_PORT,
    DEFAULT_REGISTRY_SERVER_PORT,
)
from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError
@@ -773,6 +774,34 @@ def serve_registry_command(ctx: click.Context, port: int):
    store.serve_registry(port)


@cli.command("serve_offline")
@click.option(
    "--host",
    "-h",
    type=click.STRING,
    default="127.0.0.1",
    show_default=True,
    help="Specify a host for the server",
)
@click.option(
    "--port",
    "-p",
    type=click.INT,
    default=DEFAULT_OFFLINE_SERVER_PORT,
    help="Specify a port for the server",
)
@click.pass_context
def serve_offline_command(
    ctx: click.Context,
    host: str,
    port: int,
):
    """Start a remote server locally on a given host, port."""
    store = create_feature_store(ctx)

    store.serve_offline(host, port)


@cli.command("validate")
@click.option(
    "--feature-service",
3 changes: 3 additions & 0 deletions sdk/python/feast/constants.py
@@ -41,6 +41,9 @@
# Default registry server port
DEFAULT_REGISTRY_SERVER_PORT = 6570

# Default offline server port
DEFAULT_OFFLINE_SERVER_PORT = 8815

# Environment variable for feature server docker image tag
DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG"
