diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 2e205dee0a..af6362da3e 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -85,6 +85,7 @@
 * [PostgreSQL (contrib)](reference/offline-stores/postgres.md)
 * [Trino (contrib)](reference/offline-stores/trino.md)
 * [Azure Synapse + Azure SQL (contrib)](reference/offline-stores/mssql.md)
+ * [Remote Offline](reference/offline-stores/remote-offline-store.md)
 * [Online stores](reference/online-stores/README.md)
 * [Overview](reference/online-stores/overview.md)
 * [SQLite](reference/online-stores/sqlite.md)
@@ -117,6 +118,8 @@
 * [Python feature server](reference/feature-servers/python-feature-server.md)
 * [\[Alpha\] Go feature server](reference/feature-servers/go-feature-server.md)
 * [\[Alpha\] AWS Lambda feature server](reference/feature-servers/alpha-aws-lambda-feature-server.md)
+ * [Offline Feature Server](reference/feature-servers/offline-feature-server.md)
+ * [\[Beta\] Web UI](reference/alpha-web-ui.md)
 * [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)
 * [\[Alpha\] Data quality monitoring](reference/dqm.md)
diff --git a/docs/reference/feature-servers/README.md b/docs/reference/feature-servers/README.md
index f9a40104c3..d5a4312f73 100644
--- a/docs/reference/feature-servers/README.md
+++ b/docs/reference/feature-servers/README.md
@@ -12,4 +12,8 @@ Feast users can choose to retrieve features from a feature server, as opposed to
 {% content-ref url="alpha-aws-lambda-feature-server.md" %}
 [alpha-aws-lambda-feature-server.md](alpha-aws-lambda-feature-server.md)
+{% endcontent-ref %}
+
+{% content-ref url="offline-feature-server.md" %}
+[offline-feature-server.md](offline-feature-server.md)
 {% endcontent-ref %}
\ No newline at end of file
diff --git a/docs/reference/feature-servers/offline-feature-server.md b/docs/reference/feature-servers/offline-feature-server.md
new file mode 100644
index 0000000000..6c2fdf7a25
--- /dev/null
+++ b/docs/reference/feature-servers/offline-feature-server.md
@@ -0,0 +1,35 @@
+# Offline feature server
+
+## Description
+
+The Offline feature server is an Apache Arrow Flight server that uses the gRPC communication protocol to exchange data.
+This server wraps calls to existing offline store implementations and exposes their interfaces as Arrow Flight endpoints.
+
+## How to configure the server
+
+## CLI
+
+There is a CLI command that starts the Offline feature server: `feast serve_offline`. By default, the server listens on port 8815; the port can be overridden with the `--port` flag.
+
+## Deploying as a service on Kubernetes
+
+The Offline feature server can be deployed on Kubernetes using this [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-feature-server).
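+
+If the `feast-charts` Helm repository has not been added yet, register it first (a typical setup step; the repository URL below is the public one referenced in the chart's README and may differ in your environment):
+
+```
+helm repo add feast-charts https://feast-helm-charts.storage.googleapis.com
+helm repo update
+```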
+
+Users need to set `feast_mode=offline` when installing the Offline feature server, as shown in the helm command below:
+
+```
+helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 < feature_store.yaml)
+```
+
+## Server Example
+
+The complete example can be found under [remote-offline-store-example](../../../examples/remote-offline-store).
+
+## How to configure the client
+
+Please see [remote-offline-store.md](../offline-stores/remote-offline-store.md) for details on how to configure the offline store client.
+
+## Functionality Matrix
+
+The set of functionalities supported by remote offline stores is the same as those supported by offline stores with the SDK, which are described in detail [here](../offline-stores/overview.md#functionality).
+
diff --git a/docs/reference/offline-stores/remote-offline-store.md b/docs/reference/offline-stores/remote-offline-store.md
new file mode 100644
index 0000000000..0179e0f06f
--- /dev/null
+++ b/docs/reference/offline-stores/remote-offline-store.md
@@ -0,0 +1,28 @@
+# Remote Offline Store
+
+## Description
+
+The Remote Offline Store is an Arrow Flight client for the offline store, implemented by the `RemoteOfflineStore` class against the existing `OfflineStore` interface.
+The client implements various methods, including `get_historical_features`, `pull_latest_from_table_or_query`, `write_logged_features`, and `offline_write_batch`.
+
+## How to configure the client
+
+Users need to create a client-side `feature_store.yaml` file, set the `offline_store` type to `remote`, and provide the server connection configuration:
+the host and, optionally, the port (default is 8815) that the Arrow Flight client uses to connect to the Arrow Flight server.
+
+{% code title="feature_store.yaml" %}
+```yaml
+offline_store:
+  type: remote
+  host: localhost
+  port: 8815
+```
+{% endcode %}
+
+## Client Example
+
+The complete example can be found under [remote-offline-store-example](../../../examples/remote-offline-store).
+
+## How to configure the server
+
+Please see [offline-feature-server.md](../feature-servers/offline-feature-server.md) for details on how to configure the offline feature server.
\ No newline at end of file
diff --git a/examples/remote-offline-store/README.md b/examples/remote-offline-store/README.md
new file mode 100644
index 0000000000..c07d7f3041
--- /dev/null
+++ b/examples/remote-offline-store/README.md
@@ -0,0 +1,98 @@
+# Feast Remote Offline Store Server
+
+This example demonstrates how to use an [Arrow Flight](https://arrow.apache.org/blog/2019/10/13/introducing-arrow-flight/) server and client as a remote Feast offline store.
+
+## Launch the offline server locally
+
+1. **Create a Feast project**: Use the `feast init` command; for example, the [offline_server](./offline_server) folder contains a sample Feast repository created this way.
+
+2. **Start the remote offline server**: Use the `feast serve_offline` command to start serving remote offline requests. This command spins up an `Arrow Flight` server on the default port 8815.
+
+3. **Initialize the offline server**: The offline server can be initialized by providing the `feature_store.yaml` file via an environment variable named `FEATURE_STORE_YAML_BASE64`. A temporary directory will be created containing the provided YAML file, named `feature_store.yaml`.
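+
+For instance, the encoded configuration from step 3 could be provided to a locally started server like this (a minimal sketch; when deploying with the Helm chart, the variable is set for you via the `feature_store_yaml_base64` value):
+
+```console
+export FEATURE_STORE_YAML_BASE64=$(base64 < feature_store.yaml)
+feast serve_offline
+```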
+ +Example + +```console +cd offline_server +feast -c feature_repo apply +``` + +```console +feast -c feature_repo serve_offline +``` + +Sample output: +```console +Serving on grpc+tcp://127.0.0.1:8815 +``` + +## Launch a remote offline client + +The [offline_client](./offline_client) folder includes a test python function that uses an offline store of type `remote`, leveraging the remote server as the +actual data provider. + + +The test class is located under [offline_client](./offline_client/) and uses a remote configuration of the offline store to delegate the actual +implementation to the offline store server: +```yaml +offline_store: + type: remote + host: localhost + port: 8815 +``` + +The test code in [test.py](./offline_client/test.py) initializes the store from the local configuration and then fetches the historical features +from the store like any other Feast client, but the actual implementation is delegated to the offline server +```py +store = FeatureStore(repo_path=".") +training_df = store.get_historical_features(entity_df, features).to_df() +``` + + +Run client +`cd offline_client; + python test.py` + +Sample output: + +```console +config.offline_store is +----- Feature schema ----- + + +RangeIndex: 3 entries, 0 to 2 +Data columns (total 10 columns): + # Column Non-Null Count Dtype +--- ------ -------------- ----- + 0 driver_id 3 non-null int64 + 1 event_timestamp 3 non-null datetime64[ns, UTC] + 2 label_driver_reported_satisfaction 3 non-null int64 + 3 val_to_add 3 non-null int64 + 4 val_to_add_2 3 non-null int64 + 5 conv_rate 3 non-null float32 + 6 acc_rate 3 non-null float32 + 7 avg_daily_trips 3 non-null int32 + 8 conv_rate_plus_val1 3 non-null float64 + 9 conv_rate_plus_val2 3 non-null float64 +dtypes: datetime64[ns, UTC](1), float32(2), float64(2), int32(1), int64(4) +memory usage: 332.0 bytes +None + +----- Features ----- + + driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2 +0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378 +1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213 +2 1003 2021-04-12 16:40:26+00:00 3 ... 127 3.419828 30.419828 + +[3 rows x 10 columns] +------training_df---- + driver_id event_timestamp label_driver_reported_satisfaction ... avg_daily_trips conv_rate_plus_val1 conv_rate_plus_val2 +0 1001 2021-04-12 10:59:42+00:00 1 ... 590 1.022378 10.022378 +1 1002 2021-04-12 08:12:10+00:00 5 ... 974 2.762213 20.762213 +2 1003 2021-04-12 16:40:26+00:00 3 ... 
127 3.419828 30.419828 + +[3 rows x 10 columns] +``` + diff --git a/examples/remote-offline-store/offline_client/__init__.py b/examples/remote-offline-store/offline_client/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/remote-offline-store/offline_client/feature_store.yaml b/examples/remote-offline-store/offline_client/feature_store.yaml new file mode 100644 index 0000000000..24ee5d7042 --- /dev/null +++ b/examples/remote-offline-store/offline_client/feature_store.yaml @@ -0,0 +1,10 @@ +project: offline_server +# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry) +registry: ../offline_server/feature_repo/data/registry.db +# The provider primarily specifies default offline / online stores & storing the registry in a given cloud +provider: local +offline_store: + type: remote + host: localhost + port: 8815 +entity_key_serialization_version: 2 diff --git a/examples/remote-offline-store/offline_client/test.py b/examples/remote-offline-store/offline_client/test.py new file mode 100644 index 0000000000..172ee73bf0 --- /dev/null +++ b/examples/remote-offline-store/offline_client/test.py @@ -0,0 +1,40 @@ +from datetime import datetime +from feast import FeatureStore +import pandas as pd + +entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } +) + +features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", +] + +store = FeatureStore(repo_path=".") + +training_df = store.get_historical_features(entity_df, features).to_df() + +print("----- Feature schema -----\n") +print(training_df.info()) + +print() +print("----- Features -----\n") +print(training_df.head()) + +print("------training_df----") + +print(training_df) diff --git a/examples/remote-offline-store/offline_server/__init__.py b/examples/remote-offline-store/offline_server/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/remote-offline-store/offline_server/feature_repo/__init__.py b/examples/remote-offline-store/offline_server/feature_repo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet b/examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet new file mode 100644 index 0000000000..19279202d8 Binary files /dev/null and b/examples/remote-offline-store/offline_server/feature_repo/data/driver_stats.parquet differ diff --git a/examples/remote-offline-store/offline_server/feature_repo/data/online_store.db b/examples/remote-offline-store/offline_server/feature_repo/data/online_store.db new file mode 100644 index 0000000000..d230f45b93 Binary files /dev/null and b/examples/remote-offline-store/offline_server/feature_repo/data/online_store.db differ diff --git a/examples/remote-offline-store/offline_server/feature_repo/example_repo.py b/examples/remote-offline-store/offline_server/feature_repo/example_repo.py new file mode 100644 index 0000000000..c06ebc788b --- /dev/null +++ b/examples/remote-offline-store/offline_server/feature_repo/example_repo.py @@ -0,0 +1,140 @@ +# This 
is an example feature definition file + +from datetime import timedelta + +import pandas as pd +import os + +from feast import ( + Entity, + FeatureService, + FeatureView, + Field, + FileSource, + PushSource, + RequestSource, +) +from feast.on_demand_feature_view import on_demand_feature_view +from feast.types import Float32, Float64, Int64 + +# Define an entity for the driver. You can think of an entity as a primary key used to +# fetch features. +driver = Entity(name="driver", join_keys=["driver_id"]) + +# Read data from parquet files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. +driver_stats_source = FileSource( + name="driver_hourly_stats_source", + path=f"{os.path.dirname(os.path.abspath(__file__))}/data/driver_stats.parquet", + timestamp_field="event_timestamp", + created_timestamp_column="created", +) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_stats_fv = FeatureView( + # The unique name of this feature view. Two feature views in a single + # project cannot have the same name + name="driver_hourly_stats", + entities=[driver], + ttl=timedelta(days=1), + # The list of features defined below act as a schema to both define features + # for both materialization of features into a store, and are used as references + # during retrieval for building a training dataset or serving features + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64, description="Average daily trips"), + ], + online=True, + source=driver_stats_source, + # Tags are user defined key/value pairs that are attached to each + # feature view + tags={"team": "driver_performance"}, +) + +# Define a request data source which encodes features / information only +# available at request time (e.g. part of the user initiated HTTP request) +input_request = RequestSource( + name="vals_to_add", + schema=[ + Field(name="val_to_add", dtype=Int64), + Field(name="val_to_add_2", dtype=Int64), + ], +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fv, input_request], + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +# This groups features into a model version +driver_activity_v1 = FeatureService( + name="driver_activity_v1", + features=[ + driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view + transformed_conv_rate, # Selects all features from the feature view + ], +) +driver_activity_v2 = FeatureService( + name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] +) + +# Defines a way to push data (to be available offline, online or both) into Feast. 
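+# Note: the batch_source below still backs historical retrieval and materialization for
+# feature views built on this push source; pushed rows are layered on top of that data.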
+driver_stats_push_source = PushSource( + name="driver_stats_push_source", + batch_source=driver_stats_source, +) + +# Defines a slightly modified version of the feature view from above, where the source +# has been changed to the push source. This allows fresh features to be directly pushed +# to the online store for this feature view. +driver_stats_fresh_fv = FeatureView( + name="driver_hourly_stats_fresh", + entities=[driver], + ttl=timedelta(days=1), + schema=[ + Field(name="conv_rate", dtype=Float32), + Field(name="acc_rate", dtype=Float32), + Field(name="avg_daily_trips", dtype=Int64), + ], + online=True, + source=driver_stats_push_source, # Changed from above + tags={"team": "driver_performance"}, +) + + +# Define an on demand feature view which can generate new features based on +# existing feature views and RequestSource features +@on_demand_feature_view( + sources=[driver_stats_fresh_fv, input_request], # relies on fresh version of FV + schema=[ + Field(name="conv_rate_plus_val1", dtype=Float64), + Field(name="conv_rate_plus_val2", dtype=Float64), + ], +) +def transformed_conv_rate_fresh(inputs: pd.DataFrame) -> pd.DataFrame: + df = pd.DataFrame() + df["conv_rate_plus_val1"] = inputs["conv_rate"] + inputs["val_to_add"] + df["conv_rate_plus_val2"] = inputs["conv_rate"] + inputs["val_to_add_2"] + return df + + +driver_activity_v3 = FeatureService( + name="driver_activity_v3", + features=[driver_stats_fresh_fv, transformed_conv_rate_fresh], +) diff --git a/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml b/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml new file mode 100644 index 0000000000..a751706d07 --- /dev/null +++ b/examples/remote-offline-store/offline_server/feature_repo/feature_store.yaml @@ -0,0 +1,9 @@ +project: offline_server +# By default, the registry is a file (but can be turned into a more scalable SQL-backed registry) +registry: data/registry.db +# The provider primarily specifies default offline / online stores & storing the registry in a given cloud +provider: local +online_store: + type: sqlite + path: data/online_store.db +entity_key_serialization_version: 2 diff --git a/infra/charts/feast-feature-server/README.md b/infra/charts/feast-feature-server/README.md index 457aeff245..9ff5652485 100644 --- a/infra/charts/feast-feature-server/README.md +++ b/infra/charts/feast-feature-server/README.md @@ -13,9 +13,18 @@ helm repo update Install Feast Feature Server on Kubernetes -A base64 encoded version of the `feature_store.yaml` file is needed. Helm install example: +- Feast Deployment Mode: The Feast Feature Server supports multiple deployment modes using the `feast_mode` property. Supported modes are `online` (default), `offline`, `ui`, and `registry`. +Users can set the `feast_mode` based on their deployment choice. The `online` mode is the default and maintains backward compatibility with previous Feast Feature Server implementations. + +- Feature Store File: A base64 encoded version of the `feature_store.yaml` file is needed. 
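+
+The encoded value can be produced from an existing `feature_store.yaml`, for example (a typical invocation; with GNU `base64`, add `-w 0` to avoid line wrapping):
+
+```
+base64 < feature_store.yaml
+```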
+ +Helm install examples: ``` -helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 feature_store.yaml) +helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-ui-server feast-charts/feast-feature-server --set feast_mode=ui --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-registry-server feast-charts/feast-feature-server --set feast_mode=registry --set feature_store_yaml_base64=$(base64 > feature_store.yaml) + ``` ## Tutorial @@ -26,6 +35,7 @@ See [here](https://github.com/feast-dev/feast/tree/master/examples/python-helm-d | Key | Type | Default | Description | |-----|------|---------|-------------| | affinity | object | `{}` | | +| feast_mode | string | `"online"` | Feast supported deployment modes - online (default), offline, ui and registry | | feature_store_yaml_base64 | string | `""` | [required] a base64 encoded version of feature_store.yaml | | fullnameOverride | string | `""` | | | image.pullPolicy | string | `"IfNotPresent"` | | diff --git a/infra/charts/feast-feature-server/README.md.gotmpl b/infra/charts/feast-feature-server/README.md.gotmpl index fb877208e0..be2fdae248 100644 --- a/infra/charts/feast-feature-server/README.md.gotmpl +++ b/infra/charts/feast-feature-server/README.md.gotmpl @@ -13,9 +13,18 @@ helm repo update Install Feast Feature Server on Kubernetes -A base64 encoded version of the `feature_store.yaml` file is needed. Helm install example: +- Feast Deployment Mode: The Feast Feature Server supports multiple deployment modes using the `feast_mode` property. Supported modes are `online` (default), `offline`, `ui`, and `registry`. +Users can set the `feast_mode` based on their deployment choice. The `online` mode is the default and maintains backward compatibility with previous Feast Feature Server implementations. + +- Feature Store File: A base64 encoded version of the `feature_store.yaml` file is needed. 
+ +Helm install examples: ``` -helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 feature_store.yaml) +helm install feast-feature-server feast-charts/feast-feature-server --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-offline-server feast-charts/feast-feature-server --set feast_mode=offline --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-ui-server feast-charts/feast-feature-server --set feast_mode=ui --set feature_store_yaml_base64=$(base64 > feature_store.yaml) +helm install feast-registry-server feast-charts/feast-feature-server --set feast_mode=registry --set feature_store_yaml_base64=$(base64 > feature_store.yaml) + ``` ## Tutorial diff --git a/infra/charts/feast-feature-server/templates/deployment.yaml b/infra/charts/feast-feature-server/templates/deployment.yaml index 94c56de9dd..85b323610d 100644 --- a/infra/charts/feast-feature-server/templates/deployment.yaml +++ b/infra/charts/feast-feature-server/templates/deployment.yaml @@ -33,19 +33,46 @@ spec: env: - name: FEATURE_STORE_YAML_BASE64 value: {{ .Values.feature_store_yaml_base64 }} - command: ["feast", "serve", "-h", "0.0.0.0"] + command: + {{- if eq .Values.feast_mode "offline" }} + - "feast" + - "serve_offline" + - "-h" + - "0.0.0.0" + {{- else if eq .Values.feast_mode "ui" }} + - "feast" + - "ui" + - "-h" + - "0.0.0.0" + {{- else if eq .Values.feast_mode "registry" }} + - "feast" + - "serve_registry" + {{- else }} + - "feast" + - "serve" + - "-h" + - "0.0.0.0" + {{- end }} ports: - - name: http + - name: {{ .Values.feast_mode }} + {{- if eq .Values.feast_mode "offline" }} + containerPort: 8815 + {{- else if eq .Values.feast_mode "ui" }} + containerPort: 8888 + {{- else if eq .Values.feast_mode "registry" }} + containerPort: 6570 + {{- else }} containerPort: 6566 + {{- end }} protocol: TCP livenessProbe: tcpSocket: - port: http + port: {{ .Values.feast_mode }} initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.livenessProbe.periodSeconds }} readinessProbe: tcpSocket: - port: http + port: {{ .Values.feast_mode }} initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }} periodSeconds: {{ .Values.readinessProbe.periodSeconds }} resources: diff --git a/infra/charts/feast-feature-server/templates/service.yaml b/infra/charts/feast-feature-server/templates/service.yaml index db0ac8b10b..68f096264e 100644 --- a/infra/charts/feast-feature-server/templates/service.yaml +++ b/infra/charts/feast-feature-server/templates/service.yaml @@ -8,7 +8,7 @@ spec: type: {{ .Values.service.type }} ports: - port: {{ .Values.service.port }} - targetPort: http + targetPort: {{ .Values.feast_mode }} protocol: TCP name: http selector: diff --git a/infra/charts/feast-feature-server/values.yaml b/infra/charts/feast-feature-server/values.yaml index 168164ffe9..a6dd2d0f94 100644 --- a/infra/charts/feast-feature-server/values.yaml +++ b/infra/charts/feast-feature-server/values.yaml @@ -18,6 +18,9 @@ fullnameOverride: "" # feature_store_yaml_base64 -- [required] a base64 encoded version of feature_store.yaml feature_store_yaml_base64: "" +# feast_mode -- Feast supported deployment modes - online (default), offline, ui and registry +feast_mode: "online" + podAnnotations: {} podSecurityContext: {} diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index d0766b0f4a..eeffc29fab 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -27,6 +27,7 @@ from 
feast import utils from feast.constants import ( DEFAULT_FEATURE_TRANSFORMATION_SERVER_PORT, + DEFAULT_OFFLINE_SERVER_PORT, DEFAULT_REGISTRY_SERVER_PORT, ) from feast.errors import FeastObjectNotFoundException, FeastProviderLoginError @@ -765,6 +766,34 @@ def serve_registry_command(ctx: click.Context, port: int): store.serve_registry(port) +@cli.command("serve_offline") +@click.option( + "--host", + "-h", + type=click.STRING, + default="127.0.0.1", + show_default=True, + help="Specify a host for the server", +) +@click.option( + "--port", + "-p", + type=click.INT, + default=DEFAULT_OFFLINE_SERVER_PORT, + help="Specify a port for the server", +) +@click.pass_context +def serve_offline_command( + ctx: click.Context, + host: str, + port: int, +): + """Start a remote server locally on a given host, port.""" + store = create_feature_store(ctx) + + store.serve_offline(host, port) + + @cli.command("validate") @click.option( "--feature-service", diff --git a/sdk/python/feast/constants.py b/sdk/python/feast/constants.py index 6aad3e60bb..fa8674d91d 100644 --- a/sdk/python/feast/constants.py +++ b/sdk/python/feast/constants.py @@ -41,6 +41,9 @@ # Default registry server port DEFAULT_REGISTRY_SERVER_PORT = 6570 +# Default offline server port +DEFAULT_OFFLINE_SERVER_PORT = 8815 + # Environment variable for feature server docker image tag DOCKER_IMAGE_TAG_ENV_NAME: str = "FEAST_SERVER_DOCKER_IMAGE_TAG" diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index 577bd3fe52..716e706ebe 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -2569,6 +2569,12 @@ def serve_registry(self, port: int) -> None: registry_server.start_server(self, port) + def serve_offline(self, host: str, port: int) -> None: + """Start offline server locally on a given port.""" + from feast import offline_server + + offline_server.start_server(self, host, port) + def serve_transformations(self, port: int) -> None: """Start the feature transformation server locally on a given port.""" warnings.warn( diff --git a/sdk/python/feast/infra/offline_stores/remote.py b/sdk/python/feast/infra/offline_stores/remote.py new file mode 100644 index 0000000000..dc657017d9 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/remote.py @@ -0,0 +1,407 @@ +import json +import logging +import uuid +from datetime import datetime +from pathlib import Path +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union + +import numpy as np +import pandas as pd +import pyarrow as pa +import pyarrow.flight as fl +import pyarrow.parquet +from pydantic import StrictInt, StrictStr + +from feast import OnDemandFeatureView +from feast.data_source import DataSource +from feast.feature_logging import ( + FeatureServiceLoggingSource, + LoggingConfig, + LoggingSource, +) +from feast.feature_view import FeatureView +from feast.infra.offline_stores import offline_utils +from feast.infra.offline_stores.offline_store import ( + OfflineStore, + RetrievalJob, + RetrievalMetadata, +) +from feast.infra.registry.base_registry import BaseRegistry +from feast.repo_config import FeastConfigBaseModel, RepoConfig +from feast.saved_dataset import SavedDatasetStorage + +logger = logging.getLogger(__name__) + + +class RemoteOfflineStoreConfig(FeastConfigBaseModel): + type: Literal["remote"] = "remote" + host: StrictStr + """ str: remote offline store server port, e.g. the host URL for offline store of arrow flight server. 
""" + + port: Optional[StrictInt] = None + """ str: remote offline store server port.""" + + +class RemoteRetrievalJob(RetrievalJob): + def __init__( + self, + client: fl.FlightClient, + api: str, + api_parameters: Dict[str, Any], + entity_df: Union[pd.DataFrame, str] = None, + table: pa.Table = None, + metadata: Optional[RetrievalMetadata] = None, + ): + # Initialize the client connection + self.client = client + self.api = api + self.api_parameters = api_parameters + self.entity_df = entity_df + self.table = table + self._metadata = metadata + + # Invoked to realize the Pandas DataFrame + def _to_df_internal(self, timeout: Optional[int] = None) -> pd.DataFrame: + # We use arrow format because it gives better control of the table schema + return self._to_arrow_internal().to_pandas() + + # Invoked to synchronously execute the underlying query and return the result as an arrow table + # This is where do_get service is invoked + def _to_arrow_internal(self, timeout: Optional[int] = None) -> pa.Table: + return _send_retrieve_remote( + self.api, self.api_parameters, self.entity_df, self.table, self.client + ) + + @property + def on_demand_feature_views(self) -> List[OnDemandFeatureView]: + return [] + + @property + def metadata(self) -> Optional[RetrievalMetadata]: + return self._metadata + + @property + def full_feature_names(self) -> bool: + return self.api_parameters["full_feature_names"] + + def persist( + self, + storage: SavedDatasetStorage, + allow_overwrite: bool = False, + timeout: Optional[int] = None, + ): + """ + Arrow flight action is being used to perform the persist action remotely + """ + + api_parameters = { + "data_source_name": storage.to_data_source().name, + "allow_overwrite": allow_overwrite, + "timeout": timeout, + } + + # Add api parameters to command + for key, value in self.api_parameters.items(): + api_parameters[key] = value + + api_parameters["retrieve_func"] = self.api + + _call_put( + api=RemoteRetrievalJob.persist.__name__, + api_parameters=api_parameters, + client=self.client, + table=self.table, + entity_df=self.entity_df, + ) + + +class RemoteOfflineStore(OfflineStore): + @staticmethod + def get_historical_features( + config: RepoConfig, + feature_views: List[FeatureView], + feature_refs: List[str], + entity_df: Union[pd.DataFrame, str], + registry: BaseRegistry, + project: str, + full_feature_names: bool = False, + ) -> RemoteRetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + feature_view_names = [fv.name for fv in feature_views] + name_aliases = [fv.projection.name_alias for fv in feature_views] + + api_parameters = { + "feature_view_names": feature_view_names, + "feature_refs": feature_refs, + "project": project, + "full_feature_names": full_feature_names, + "name_aliases": name_aliases, + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.get_historical_features.__name__, + api_parameters=api_parameters, + entity_df=entity_df, + metadata=_create_retrieval_metadata(feature_refs, entity_df), + ) + + @staticmethod + def pull_all_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + 
api_parameters = { + "data_source_name": data_source.name, + "join_key_columns": join_key_columns, + "feature_name_columns": feature_name_columns, + "timestamp_field": timestamp_field, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.pull_all_from_table_or_query.__name__, + api_parameters=api_parameters, + ) + + @staticmethod + def pull_latest_from_table_or_query( + config: RepoConfig, + data_source: DataSource, + join_key_columns: List[str], + feature_name_columns: List[str], + timestamp_field: str, + created_timestamp_column: Optional[str], + start_date: datetime, + end_date: datetime, + ) -> RetrievalJob: + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + api_parameters = { + "data_source_name": data_source.name, + "join_key_columns": join_key_columns, + "feature_name_columns": feature_name_columns, + "timestamp_field": timestamp_field, + "created_timestamp_column": created_timestamp_column, + "start_date": start_date.isoformat(), + "end_date": end_date.isoformat(), + } + + return RemoteRetrievalJob( + client=client, + api=OfflineStore.pull_latest_from_table_or_query.__name__, + api_parameters=api_parameters, + ) + + @staticmethod + def write_logged_features( + config: RepoConfig, + data: Union[pyarrow.Table, Path], + source: LoggingSource, + logging_config: LoggingConfig, + registry: BaseRegistry, + ): + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + assert isinstance(source, FeatureServiceLoggingSource) + + if isinstance(data, Path): + data = pyarrow.parquet.read_table(data, use_threads=False, pre_buffer=False) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + api_parameters = { + "feature_service_name": source._feature_service.name, + } + + _call_put( + api=OfflineStore.write_logged_features.__name__, + api_parameters=api_parameters, + client=client, + table=data, + entity_df=None, + ) + + @staticmethod + def offline_write_batch( + config: RepoConfig, + feature_view: FeatureView, + table: pyarrow.Table, + progress: Optional[Callable[[int], Any]], + ): + assert isinstance(config.offline_store, RemoteOfflineStoreConfig) + + # Initialize the client connection + client = RemoteOfflineStore.init_client(config) + + feature_view_names = [feature_view.name] + name_aliases = [feature_view.projection.name_alias] + + api_parameters = { + "feature_view_names": feature_view_names, + "progress": progress, + "name_aliases": name_aliases, + } + + _call_put( + api=OfflineStore.offline_write_batch.__name__, + api_parameters=api_parameters, + client=client, + table=table, + entity_df=None, + ) + + @staticmethod + def init_client(config): + location = f"grpc://{config.offline_store.host}:{config.offline_store.port}" + client = fl.connect(location=location) + logger.info(f"Connecting FlightClient at {location}") + return client + + +def _create_retrieval_metadata(feature_refs: List[str], entity_df: pd.DataFrame): + entity_schema = _get_entity_schema( + entity_df=entity_df, + ) + + event_timestamp_col = offline_utils.infer_event_timestamp_from_entity_df( + entity_schema=entity_schema, + ) + + timestamp_range = _get_entity_df_event_timestamp_range( + entity_df, event_timestamp_col + ) + + return RetrievalMetadata( + features=feature_refs, + keys=list(set(entity_df.columns) - {event_timestamp_col}), + min_event_timestamp=timestamp_range[0], 
+ max_event_timestamp=timestamp_range[1], + ) + + +def _get_entity_schema(entity_df: pd.DataFrame) -> Dict[str, np.dtype]: + return dict(zip(entity_df.columns, entity_df.dtypes)) + + +def _get_entity_df_event_timestamp_range( + entity_df: Union[pd.DataFrame, str], + entity_df_event_timestamp_col: str, +) -> Tuple[datetime, datetime]: + if not isinstance(entity_df, pd.DataFrame): + raise ValueError( + f"Please provide an entity_df of type {type(pd.DataFrame)} instead of type {type(entity_df)}" + ) + + entity_df_event_timestamp = entity_df.loc[ + :, entity_df_event_timestamp_col + ].infer_objects() + if pd.api.types.is_string_dtype(entity_df_event_timestamp): + entity_df_event_timestamp = pd.to_datetime(entity_df_event_timestamp, utc=True) + + return ( + entity_df_event_timestamp.min().to_pydatetime(), + entity_df_event_timestamp.max().to_pydatetime(), + ) + + +def _send_retrieve_remote( + api: str, + api_parameters: Dict[str, Any], + entity_df: Union[pd.DataFrame, str], + table: pa.Table, + client: fl.FlightClient, +): + command_descriptor = _call_put(api, api_parameters, client, entity_df, table) + return _call_get(client, command_descriptor) + + +def _call_get(client: fl.FlightClient, command_descriptor: fl.FlightDescriptor): + flight = client.get_flight_info(command_descriptor) + ticket = flight.endpoints[0].ticket + reader = client.do_get(ticket) + return reader.read_all() + + +def _call_put( + api: str, + api_parameters: Dict[str, Any], + client: fl.FlightClient, + entity_df: Union[pd.DataFrame, str], + table: pa.Table, +): + # Generate unique command identifier + command_id = str(uuid.uuid4()) + command = { + "command_id": command_id, + "api": api, + } + # Add api parameters to command + for key, value in api_parameters.items(): + command[key] = value + + command_descriptor = fl.FlightDescriptor.for_command( + json.dumps( + command, + ) + ) + + _put_parameters(command_descriptor, entity_df, table, client) + return command_descriptor + + +def _put_parameters( + command_descriptor: fl.FlightDescriptor, + entity_df: Union[pd.DataFrame, str], + table: pa.Table, + client: fl.FlightClient, +): + updatedTable: pa.Table + + if entity_df is not None: + updatedTable = pa.Table.from_pandas(entity_df) + elif table is not None: + updatedTable = table + else: + updatedTable = _create_empty_table() + + writer, _ = client.do_put( + command_descriptor, + updatedTable.schema, + ) + + writer.write_table(updatedTable) + writer.close() + + +def _create_empty_table(): + schema = pa.schema( + { + "key": pa.string(), + } + ) + + keys = ["mock_key"] + + table = pa.Table.from_pydict(dict(zip(schema.names, keys)), schema=schema) + + return table diff --git a/sdk/python/feast/offline_server.py b/sdk/python/feast/offline_server.py new file mode 100644 index 0000000000..718da1b109 --- /dev/null +++ b/sdk/python/feast/offline_server.py @@ -0,0 +1,332 @@ +import ast +import json +import logging +import traceback +from datetime import datetime +from typing import Any, Dict, List + +import pyarrow as pa +import pyarrow.flight as fl + +from feast import FeatureStore, FeatureView, utils +from feast.feature_logging import FeatureServiceLoggingSource +from feast.feature_view import DUMMY_ENTITY_NAME +from feast.infra.offline_stores.offline_utils import get_offline_store_from_config +from feast.saved_dataset import SavedDatasetStorage + +logger = logging.getLogger(__name__) + + +class OfflineServer(fl.FlightServerBase): + def __init__(self, store: FeatureStore, location: str, **kwargs): + super(OfflineServer, 
self).__init__(location, **kwargs) + self._location = location + # A dictionary of configured flights, e.g. API calls received and not yet served + self.flights: Dict[str, Any] = {} + self.store = store + self.offline_store = get_offline_store_from_config(store.config.offline_store) + + @classmethod + def descriptor_to_key(self, descriptor: fl.FlightDescriptor): + return ( + descriptor.descriptor_type.value, + descriptor.command, + tuple(descriptor.path or tuple()), + ) + + def _make_flight_info(self, key: Any, descriptor: fl.FlightDescriptor): + endpoints = [fl.FlightEndpoint(repr(key), [self._location])] + # TODO calculate actual schema from the given features + schema = pa.schema([]) + + return fl.FlightInfo(schema, descriptor, endpoints, -1, -1) + + def get_flight_info( + self, context: fl.ServerCallContext, descriptor: fl.FlightDescriptor + ): + key = OfflineServer.descriptor_to_key(descriptor) + if key in self.flights: + return self._make_flight_info(key, descriptor) + raise KeyError("Flight not found.") + + def list_flights(self, context: fl.ServerCallContext, criteria: bytes): + for key, table in self.flights.items(): + if key[1] is not None: + descriptor = fl.FlightDescriptor.for_command(key[1]) + else: + descriptor = fl.FlightDescriptor.for_path(*key[2]) + + yield self._make_flight_info(key, descriptor) + + # Expects to receive request parameters and stores them in the flights dictionary + # Indexed by the unique command + def do_put( + self, + context: fl.ServerCallContext, + descriptor: fl.FlightDescriptor, + reader: fl.MetadataRecordBatchReader, + writer: fl.FlightMetadataWriter, + ): + key = OfflineServer.descriptor_to_key(descriptor) + command = json.loads(key[1]) + if "api" in command: + data = reader.read_all() + logger.debug(f"do_put: command is{command}, data is {data}") + self.flights[key] = data + + self._call_api(command, key) + else: + logger.warning(f"No 'api' field in command: {command}") + + def _call_api(self, command: dict, key: str): + remove_data = False + try: + api = command["api"] + if api == OfflineServer.offline_write_batch.__name__: + self.offline_write_batch(command, key) + remove_data = True + elif api == OfflineServer.write_logged_features.__name__: + self.write_logged_features(command, key) + remove_data = True + elif api == OfflineServer.persist.__name__: + self.persist(command["retrieve_func"], command, key) + remove_data = True + except Exception as e: + remove_data = True + logger.exception(e) + traceback.print_exc() + raise e + finally: + if remove_data: + # Get service is consumed, so we clear the corresponding flight and data + del self.flights[key] + + def get_feature_view_by_name( + self, fv_name: str, name_alias: str, project: str + ) -> FeatureView: + """ + Retrieves a feature view by name, including all subclasses of FeatureView. 
+ + Args: + fv_name: Name of feature view + name_alias: Alias to be applied to the projection of the registered view + project: Feast project that this feature view belongs to + + Returns: + Returns either the specified feature view, or raises an exception if + none is found + """ + try: + fv = self.store.registry.get_feature_view(name=fv_name, project=project) + if name_alias is not None: + for fs in self.store.registry.list_feature_services(project=project): + for p in fs.feature_view_projections: + if p.name_alias == name_alias: + logger.debug( + f"Found matching FeatureService {fs.name} with projection {p}" + ) + fv = fv.with_projection(p) + return fv + except Exception: + try: + return self.store.registry.get_stream_feature_view( + name=fv_name, project=project + ) + except Exception as e: + logger.error( + f"Cannot find any FeatureView by name {fv_name} in project {project}" + ) + raise e + + def list_feature_views_by_name( + self, feature_view_names: List[str], name_aliases: List[str], project: str + ) -> List[FeatureView]: + return [ + remove_dummies( + self.get_feature_view_by_name( + fv_name=fv_name, name_alias=name_aliases[index], project=project + ) + ) + for index, fv_name in enumerate(feature_view_names) + ] + + # Extracts the API parameters from the flights dictionary, delegates the execution to the FeatureStore instance + # and returns the stream of data + def do_get(self, context: fl.ServerCallContext, ticket: fl.Ticket): + key = ast.literal_eval(ticket.ticket.decode()) + if key not in self.flights: + logger.error(f"Unknown key {key}") + return None + + command = json.loads(key[1]) + api = command["api"] + logger.debug(f"get command is {command}") + logger.debug(f"requested api is {api}") + try: + if api == OfflineServer.get_historical_features.__name__: + table = self.get_historical_features(command, key).to_arrow() + elif api == OfflineServer.pull_all_from_table_or_query.__name__: + table = self.pull_all_from_table_or_query(command).to_arrow() + elif api == OfflineServer.pull_latest_from_table_or_query.__name__: + table = self.pull_latest_from_table_or_query(command).to_arrow() + else: + raise NotImplementedError + except Exception as e: + logger.exception(e) + traceback.print_exc() + raise e + + # Get service is consumed, so we clear the corresponding flight and data + del self.flights[key] + return fl.RecordBatchStream(table) + + def offline_write_batch(self, command: dict, key: str): + feature_view_names = command["feature_view_names"] + assert ( + len(feature_view_names) == 1 + ), "feature_view_names list should only have one item" + name_aliases = command["name_aliases"] + assert len(name_aliases) == 1, "name_aliases list should only have one item" + project = self.store.config.project + feature_views = self.list_feature_views_by_name( + feature_view_names=feature_view_names, + name_aliases=name_aliases, + project=project, + ) + + assert len(feature_views) == 1 + table = self.flights[key] + self.offline_store.offline_write_batch( + self.store.config, feature_views[0], table, command["progress"] + ) + + def write_logged_features(self, command: dict, key: str): + table = self.flights[key] + feature_service = self.store.get_feature_service( + command["feature_service_name"] + ) + + assert feature_service.logging_config is not None + + self.offline_store.write_logged_features( + config=self.store.config, + data=table, + source=FeatureServiceLoggingSource( + feature_service, self.store.config.project + ), + logging_config=feature_service.logging_config, + 
registry=self.store.registry, + ) + + def pull_all_from_table_or_query(self, command: dict): + return self.offline_store.pull_all_from_table_or_query( + self.store.config, + self.store.get_data_source(command["data_source_name"]), + command["join_key_columns"], + command["feature_name_columns"], + command["timestamp_field"], + utils.make_tzaware(datetime.fromisoformat(command["start_date"])), + utils.make_tzaware(datetime.fromisoformat(command["end_date"])), + ) + + def pull_latest_from_table_or_query(self, command: dict): + return self.offline_store.pull_latest_from_table_or_query( + self.store.config, + self.store.get_data_source(command["data_source_name"]), + command["join_key_columns"], + command["feature_name_columns"], + command["timestamp_field"], + command["created_timestamp_column"], + utils.make_tzaware(datetime.fromisoformat(command["start_date"])), + utils.make_tzaware(datetime.fromisoformat(command["end_date"])), + ) + + def list_actions(self, context): + return [ + ( + OfflineServer.offline_write_batch.__name__, + "Writes the specified arrow table to the data source underlying the specified feature view.", + ), + ( + OfflineServer.write_logged_features.__name__, + "Writes logged features to a specified destination in the offline store.", + ), + ( + OfflineServer.persist.__name__, + "Synchronously executes the underlying query and persists the result in the same offline store at the " + "specified destination.", + ), + ] + + def get_historical_features(self, command: dict, key: str): + # Extract parameters from the internal flights dictionary + entity_df_value = self.flights[key] + entity_df = pa.Table.to_pandas(entity_df_value) + feature_view_names = command["feature_view_names"] + name_aliases = command["name_aliases"] + feature_refs = command["feature_refs"] + project = command["project"] + full_feature_names = command["full_feature_names"] + feature_views = self.list_feature_views_by_name( + feature_view_names=feature_view_names, + name_aliases=name_aliases, + project=project, + ) + retJob = self.offline_store.get_historical_features( + config=self.store.config, + feature_views=feature_views, + feature_refs=feature_refs, + entity_df=entity_df, + registry=self.store.registry, + project=project, + full_feature_names=full_feature_names, + ) + return retJob + + def persist(self, retrieve_func: str, command: dict, key: str): + try: + if retrieve_func == OfflineServer.get_historical_features.__name__: + ret_job = self.get_historical_features(command, key) + elif ( + retrieve_func == OfflineServer.pull_latest_from_table_or_query.__name__ + ): + ret_job = self.pull_latest_from_table_or_query(command) + elif retrieve_func == OfflineServer.pull_all_from_table_or_query.__name__: + ret_job = self.pull_all_from_table_or_query(command) + else: + raise NotImplementedError + + data_source = self.store.get_data_source(command["data_source_name"]) + storage = SavedDatasetStorage.from_data_source(data_source) + ret_job.persist(storage, command["allow_overwrite"], command["timeout"]) + except Exception as e: + logger.exception(e) + traceback.print_exc() + raise e + + def do_action(self, context: fl.ServerCallContext, action: fl.Action): + pass + + def do_drop_dataset(self, dataset): + pass + + +def remove_dummies(fv: FeatureView) -> FeatureView: + """ + Removes dummmy IDs from FeatureView instances created with FeatureView.from_proto + """ + if DUMMY_ENTITY_NAME in fv.entities: + fv.entities = [] + fv.entity_columns = [] + return fv + + +def start_server( + store: FeatureStore, + host: str, 
+ port: int, +): + location = "grpc+tcp://{}:{}".format(host, port) + server = OfflineServer(store, location) + logger.info(f"Offline store server serving on {location}") + server.serve() diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 6ef81794bf..b7c7b0a9d0 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -77,6 +77,7 @@ "athena": "feast.infra.offline_stores.contrib.athena_offline_store.athena.AthenaOfflineStore", "mssql": "feast.infra.offline_stores.contrib.mssql_offline_store.mssql.MsSqlServerOfflineStore", "duckdb": "feast.infra.offline_stores.duckdb.DuckDBOfflineStore", + "remote": "feast.infra.offline_stores.remote.RemoteOfflineStore", } FEATURE_SERVER_CONFIG_CLASS_FOR_TYPE = { diff --git a/sdk/python/feast/templates/local/bootstrap.py b/sdk/python/feast/templates/local/bootstrap.py index 125eb7c2e7..ee2847c19c 100644 --- a/sdk/python/feast/templates/local/bootstrap.py +++ b/sdk/python/feast/templates/local/bootstrap.py @@ -24,6 +24,7 @@ def bootstrap(): example_py_file = repo_path / "example_repo.py" replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path)) + replace_str_in_file(example_py_file, "%LOGGING_PATH%", str(data_path)) if __name__ == "__main__": diff --git a/sdk/python/feast/templates/local/feature_repo/example_repo.py b/sdk/python/feast/templates/local/feature_repo/example_repo.py index 5aed3371b1..debe9d45e9 100644 --- a/sdk/python/feast/templates/local/feature_repo/example_repo.py +++ b/sdk/python/feast/templates/local/feature_repo/example_repo.py @@ -13,6 +13,8 @@ PushSource, RequestSource, ) +from feast.feature_logging import LoggingConfig +from feast.infra.offline_stores.file_source import FileLoggingDestination from feast.on_demand_feature_view import on_demand_feature_view from feast.types import Float32, Float64, Int64 @@ -88,6 +90,9 @@ def transformed_conv_rate(inputs: pd.DataFrame) -> pd.DataFrame: driver_stats_fv[["conv_rate"]], # Sub-selects a feature from a feature view transformed_conv_rate, # Selects all features from the feature view ], + logging_config=LoggingConfig( + destination=FileLoggingDestination(path="%LOGGING_PATH%") + ), ) driver_activity_v2 = FeatureService( name="driver_activity_v2", features=[driver_stats_fv, transformed_conv_rate] diff --git a/sdk/python/tests/conftest.py b/sdk/python/tests/conftest.py index 7c875fc9bd..775db8c388 100644 --- a/sdk/python/tests/conftest.py +++ b/sdk/python/tests/conftest.py @@ -305,10 +305,7 @@ def pytest_generate_tests(metafunc: pytest.Metafunc): @pytest.fixture def feature_server_endpoint(environment): - if ( - not environment.python_feature_server - or environment.test_repo_config.provider != "local" - ): + if not environment.python_feature_server or environment.provider != "local": yield environment.feature_store.get_feature_server_endpoint() return diff --git a/sdk/python/tests/integration/feature_repos/repo_configuration.py b/sdk/python/tests/integration/feature_repos/repo_configuration.py index 4d1c63127c..be01a1e1ac 100644 --- a/sdk/python/tests/integration/feature_repos/repo_configuration.py +++ b/sdk/python/tests/integration/feature_repos/repo_configuration.py @@ -35,6 +35,7 @@ DuckDBDataSourceCreator, DuckDBDeltaDataSourceCreator, FileDataSourceCreator, + RemoteOfflineStoreDataSourceCreator, ) from tests.integration.feature_repos.universal.data_sources.redshift import ( RedshiftDataSourceCreator, @@ -121,6 +122,7 @@ ("local", FileDataSourceCreator), ("local", DuckDBDataSourceCreator), ("local", 
DuckDBDeltaDataSourceCreator), + ("local", RemoteOfflineStoreDataSourceCreator), ] if os.getenv("FEAST_IS_LOCAL_TEST", "False") == "True": diff --git a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py index 18094b723f..f7ab55d868 100644 --- a/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py +++ b/sdk/python/tests/integration/feature_repos/universal/data_sources/file.py @@ -1,18 +1,22 @@ +import logging import os.path import shutil +import subprocess import tempfile import uuid +from pathlib import Path from typing import Any, Dict, List, Optional import pandas as pd import pyarrow as pa import pyarrow.parquet as pq +import yaml from minio import Minio from testcontainers.core.generic import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs from testcontainers.minio import MinioContainer -from feast import FileSource +from feast import FileSource, RepoConfig from feast.data_format import DeltaFormat, ParquetFormat from feast.data_source import DataSource from feast.feature_logging import LoggingDestination @@ -22,10 +26,15 @@ FileLoggingDestination, SavedDatasetFileStorage, ) -from feast.repo_config import FeastConfigBaseModel +from feast.infra.offline_stores.remote import RemoteOfflineStoreConfig +from feast.repo_config import FeastConfigBaseModel, RegistryConfig +from feast.wait import wait_retry_backoff # noqa: E402 from tests.integration.feature_repos.universal.data_source_creator import ( DataSourceCreator, ) +from tests.utils.http_server import check_port_open, free_port # noqa: E402 + +logger = logging.getLogger(__name__) class FileDataSourceCreator(DataSourceCreator): @@ -352,3 +361,69 @@ def create_offline_store_config(self): staging_location_endpoint_override=self.endpoint_url, ) return self.duckdb_offline_store_config + + +class RemoteOfflineStoreDataSourceCreator(FileDataSourceCreator): + def __init__(self, project_name: str, *args, **kwargs): + super().__init__(project_name) + self.server_port: int = 0 + self.proc = None + + def setup(self, registry: RegistryConfig): + parent_offline_config = super().create_offline_store_config() + config = RepoConfig( + project=self.project_name, + provider="local", + offline_store=parent_offline_config, + registry=registry.path, + entity_key_serialization_version=2, + ) + + repo_path = Path(tempfile.mkdtemp()) + with open(repo_path / "feature_store.yaml", "w") as outfile: + yaml.dump(config.dict(by_alias=True), outfile) + repo_path = str(repo_path.resolve()) + + self.server_port = free_port() + host = "0.0.0.0" + cmd = [ + "feast", + "-c" + repo_path, + "serve_offline", + "--host", + host, + "--port", + str(self.server_port), + ] + self.proc = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL + ) + + _time_out_sec: int = 60 + # Wait for server to start + wait_retry_backoff( + lambda: (None, check_port_open(host, self.server_port)), + timeout_secs=_time_out_sec, + timeout_msg=f"Unable to start the feast remote offline server in {_time_out_sec} seconds at port={self.server_port}", + ) + return "grpc+tcp://{}:{}".format(host, self.server_port) + + def create_offline_store_config(self) -> FeastConfigBaseModel: + self.remote_offline_store_config = RemoteOfflineStoreConfig( + type="remote", host="0.0.0.0", port=self.server_port + ) + return self.remote_offline_store_config + + def teardown(self): + super().teardown() + if self.proc is not None: + self.proc.kill() + + # wait 
server to free the port + wait_retry_backoff( + lambda: ( + None, + not check_port_open("localhost", self.server_port), + ), + timeout_secs=30, + ) diff --git a/sdk/python/tests/integration/offline_store/test_feature_logging.py b/sdk/python/tests/integration/offline_store/test_feature_logging.py index eba994544d..32f506f90b 100644 --- a/sdk/python/tests/integration/offline_store/test_feature_logging.py +++ b/sdk/python/tests/integration/offline_store/test_feature_logging.py @@ -34,8 +34,6 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa (_, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) - store.apply([customer(), driver(), location(), *feature_views.values()]) - feature_service = FeatureService( name="test_service", features=[ @@ -49,6 +47,17 @@ def test_feature_service_logging(environment, universal_data_sources, pass_as_pa ), ) + store.apply( + [customer(), driver(), location(), *feature_views.values()], feature_service + ) + + # Added to handle the case that the offline store is remote + store.registry.apply_feature_service(feature_service, store.config.project) + store.registry.apply_data_source( + feature_service.logging_config.destination.to_data_source(), + store.config.project, + ) + driver_df = datasets.driver_df driver_df["val_to_add"] = 50 driver_df = driver_df.join(conv_rate_plus_100(driver_df)) diff --git a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py index a6db7f2535..bfb8a56200 100644 --- a/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py +++ b/sdk/python/tests/integration/offline_store/test_universal_historical_retrieval.py @@ -19,6 +19,9 @@ construct_universal_feature_views, table_name_from_data_source, ) +from tests.integration.feature_repos.universal.data_sources.file import ( + RemoteOfflineStoreDataSourceCreator, +) from tests.integration.feature_repos.universal.data_sources.snowflake import ( SnowflakeDataSourceCreator, ) @@ -157,22 +160,25 @@ def test_historical_features_main( timestamp_precision=timedelta(milliseconds=1), ) - assert_feature_service_correctness( - store, - feature_service, - full_feature_names, - entity_df_with_request_data, - expected_df, - event_timestamp, - ) - assert_feature_service_entity_mapping_correctness( - store, - feature_service_entity_mapping, - full_feature_names, - entity_df_with_request_data, - full_expected_df, - event_timestamp, - ) + if not isinstance( + environment.data_source_creator, RemoteOfflineStoreDataSourceCreator + ): + assert_feature_service_correctness( + store, + feature_service, + full_feature_names, + entity_df_with_request_data, + expected_df, + event_timestamp, + ) + assert_feature_service_entity_mapping_correctness( + store, + feature_service_entity_mapping, + full_feature_names, + entity_df_with_request_data, + full_expected_df, + event_timestamp, + ) table_from_df_entities: pd.DataFrame = job_from_df.to_arrow().to_pandas() validate_dataframes( @@ -375,8 +381,13 @@ def test_historical_features_persisting( (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + 
store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -398,7 +409,7 @@ def test_historical_features_persisting( saved_dataset = store.create_saved_dataset( from_=job, name="saved_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, tags={"env": "test"}, allow_overwrite=True, ) diff --git a/sdk/python/tests/integration/offline_store/test_validation.py b/sdk/python/tests/integration/offline_store/test_validation.py index fdf182be57..1731f823c8 100644 --- a/sdk/python/tests/integration/offline_store/test_validation.py +++ b/sdk/python/tests/integration/offline_store/test_validation.py @@ -45,8 +45,13 @@ def test_historical_retrieval_with_validation(environment, universal_data_source store = environment.feature_store (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + # Create two identical retrieval jobs entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] @@ -64,7 +69,7 @@ def test_historical_retrieval_with_validation(environment, universal_data_source store.create_saved_dataset( from_=reference_job, name="my_training_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) saved_dataset = store.get_saved_dataset("my_training_dataset") @@ -80,9 +85,13 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so (entities, datasets, data_sources) = universal_data_sources feature_views = construct_universal_feature_views(data_sources) + storage = environment.data_source_creator.create_saved_dataset_destination() store.apply([driver(), customer(), location(), *feature_views.values()]) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -95,7 +104,7 @@ def test_historical_retrieval_fails_on_validation(environment, universal_data_so store.create_saved_dataset( from_=reference_job, name="my_other_dataset", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) @@ -149,10 +158,19 @@ def test_logged_features_validation(environment, universal_data_sources): ), ) + storage = environment.data_source_creator.create_saved_dataset_destination() + store.apply( [driver(), customer(), location(), feature_service, *feature_views.values()] ) + # Added to handle the case that the offline store is remote + store.registry.apply_data_source( + feature_service.logging_config.destination.to_data_source(), + store.config.project, + ) + store.registry.apply_data_source(storage.to_data_source(), store.config.project) + entity_df = datasets.entity_df.drop( columns=["order_id", "origin_id", "destination_id"] ) @@ -180,7 +198,7 @@ def test_logged_features_validation(environment, universal_data_sources): entity_df=entity_df, features=store_fs, full_feature_names=True ), 
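The `# Added to handle the case that the offline store is remote` changes above all follow one pattern: the saved-dataset destination (or feature-logging destination) is created once, registered in the registry as a data source, and only then passed to `create_saved_dataset()`. The condensed sketch below restates that pattern; the rationale in the comment is an inference from the change (presumably the remote offline server can only resolve sources that already exist in the shared registry), and the surrounding names (`environment`, `store`, `job`, the entity and feature-view fixtures) come from the tests above.

```python
# Register the destination before persisting to it, so that the remote
# offline server can look it up in the shared registry.
storage = environment.data_source_creator.create_saved_dataset_destination()
store.apply([driver(), customer(), location(), *feature_views.values()])
store.registry.apply_data_source(storage.to_data_source(), store.config.project)

saved_dataset = store.create_saved_dataset(
    from_=job,            # a RetrievalJob returned by get_historical_features()
    name="saved_dataset",
    storage=storage,      # the pre-registered destination
    tags={"env": "test"},
    allow_overwrite=True,
)
```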
name="reference_for_validating_logged_features", - storage=environment.data_source_creator.create_saved_dataset_destination(), + storage=storage, allow_overwrite=True, ) diff --git a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py index 79a3a27b67..fd50d37632 100644 --- a/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py +++ b/sdk/python/tests/unit/infra/offline_stores/test_offline_store.py @@ -29,6 +29,10 @@ RedshiftOfflineStoreConfig, RedshiftRetrievalJob, ) +from feast.infra.offline_stores.remote import ( + RemoteOfflineStoreConfig, + RemoteRetrievalJob, +) from feast.infra.offline_stores.snowflake import ( SnowflakeOfflineStoreConfig, SnowflakeRetrievalJob, @@ -104,6 +108,7 @@ def metadata(self) -> Optional[RetrievalMetadata]: PostgreSQLRetrievalJob, SparkRetrievalJob, TrinoRetrievalJob, + RemoteRetrievalJob, ] ) def retrieval_job(request, environment): @@ -203,6 +208,35 @@ def retrieval_job(request, environment): config=environment.config, full_feature_names=False, ) + elif request.param is RemoteRetrievalJob: + offline_store_config = RemoteOfflineStoreConfig( + type="remote", + host="localhost", + port=0, + ) + environment.config._offline_store = offline_store_config + + entity_df = pd.DataFrame.from_dict( + { + "id": [1], + "event_timestamp": ["datetime"], + "val_to_add": [1], + } + ) + + return RemoteRetrievalJob( + client=MagicMock(), + api_parameters={ + "str": "str", + }, + api="api", + table=pyarrow.Table.from_pandas(entity_df), + entity_df=entity_df, + metadata=RetrievalMetadata( + features=["1", "2", "3", "4"], + keys=["1", "2", "3", "4"], + ), + ) else: return request.param() diff --git a/sdk/python/tests/unit/test_offline_server.py b/sdk/python/tests/unit/test_offline_server.py new file mode 100644 index 0000000000..5991e7450d --- /dev/null +++ b/sdk/python/tests/unit/test_offline_server.py @@ -0,0 +1,250 @@ +import os +import tempfile +from datetime import datetime, timedelta + +import assertpy +import pandas as pd +import pyarrow as pa +import pyarrow.flight as flight +import pytest + +from feast import FeatureStore +from feast.feature_logging import FeatureServiceLoggingSource +from feast.infra.offline_stores.remote import ( + RemoteOfflineStore, + RemoteOfflineStoreConfig, +) +from feast.offline_server import OfflineServer +from feast.repo_config import RepoConfig +from tests.utils.cli_repo_creator import CliRunner + +PROJECT_NAME = "test_remote_offline" + + +@pytest.fixture +def empty_offline_server(environment): + store = environment.feature_store + + location = "grpc+tcp://localhost:0" + return OfflineServer(store=store, location=location) + + +@pytest.fixture +def arrow_client(empty_offline_server): + return flight.FlightClient(f"grpc://localhost:{empty_offline_server.port}") + + +def test_offline_server_is_alive(environment, empty_offline_server, arrow_client): + server = empty_offline_server + client = arrow_client + + assertpy.assert_that(server).is_not_none + assertpy.assert_that(server.port).is_not_equal_to(0) + + actions = list(client.list_actions()) + flights = list(client.list_flights()) + + assertpy.assert_that(actions).is_equal_to( + [ + ( + "offline_write_batch", + "Writes the specified arrow table to the data source underlying the specified feature view.", + ), + ( + "write_logged_features", + "Writes logged features to a specified destination in the offline store.", + ), + ( + "persist", + "Synchronously executes the underlying query and persists the 
result in the same offline store at the " + "specified destination.", + ), + ] + ) + assertpy.assert_that(flights).is_empty() + + +def default_store(temp_dir): + runner = CliRunner() + result = runner.run(["init", PROJECT_NAME], cwd=temp_dir) + repo_path = os.path.join(temp_dir, PROJECT_NAME, "feature_repo") + assert result.returncode == 0 + + result = runner.run(["--chdir", repo_path, "apply"], cwd=temp_dir) + assert result.returncode == 0 + + fs = FeatureStore(repo_path=repo_path) + return fs + + +def remote_feature_store(offline_server): + offline_config = RemoteOfflineStoreConfig( + type="remote", host="0.0.0.0", port=offline_server.port + ) + + registry_path = os.path.join( + str(offline_server.store.repo_path), + offline_server.store.config.registry.path, + ) + store = FeatureStore( + config=RepoConfig( + project=PROJECT_NAME, + registry=registry_path, + provider="local", + offline_store=offline_config, + entity_key_serialization_version=2, + ) + ) + return store + + +def test_remote_offline_store_apis(): + with tempfile.TemporaryDirectory() as temp_dir: + store = default_store(str(temp_dir)) + location = "grpc+tcp://localhost:0" + server = OfflineServer(store=store, location=location) + + assertpy.assert_that(server).is_not_none + assertpy.assert_that(server.port).is_not_equal_to(0) + + fs = remote_feature_store(server) + + _test_get_historical_features_returns_data(fs) + _test_get_historical_features_returns_nan(fs) + _test_offline_write_batch(str(temp_dir), fs) + _test_write_logged_features(str(temp_dir), fs) + _test_pull_latest_from_table_or_query(str(temp_dir), fs) + _test_pull_all_from_table_or_query(str(temp_dir), fs) + + +def _test_get_historical_features_returns_data(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1001, 1002, 1003], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_not_nan() + + +def _test_get_historical_features_returns_nan(fs: FeatureStore): + entity_df = pd.DataFrame.from_dict( + { + "driver_id": [1, 2, 3], + "event_timestamp": [ + datetime(2021, 4, 12, 10, 59, 42), + datetime(2021, 4, 12, 8, 12, 10), + datetime(2021, 4, 12, 16, 40, 26), + ], + "label_driver_reported_satisfaction": [1, 5, 3], + "val_to_add": [1, 2, 3], + "val_to_add_2": [10, 20, 30], + } + ) + + features = [ + "driver_hourly_stats:conv_rate", + "driver_hourly_stats:acc_rate", + "driver_hourly_stats:avg_daily_trips", + "transformed_conv_rate:conv_rate_plus_val1", + "transformed_conv_rate:conv_rate_plus_val2", + ] + + training_df = fs.get_historical_features(entity_df, features).to_df() + + assertpy.assert_that(training_df).is_not_none() + 
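+    # Unlike the previous test, driver ids 1, 2 and 3 do not exist in the demo data
+    # generated by `feast init`, so every requested feature value is expected to be
+    # NaN in the joined result (checked below).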
assertpy.assert_that(len(training_df)).is_equal_to(3) + + for index, driver_id in enumerate(entity_df["driver_id"]): + assertpy.assert_that(training_df["driver_id"][index]).is_equal_to(driver_id) + for feature in features: + column_id = feature.split(":")[1] + value = training_df[column_id][index] + assertpy.assert_that(value).is_nan() + + +def _test_offline_write_batch(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_view = fs.get_feature_view("driver_hourly_stats") + + RemoteOfflineStore.offline_write_batch( + fs.config, feature_view, pa.Table.from_pandas(data_df), progress=None + ) + + +def _test_write_logged_features(temp_dir, fs: FeatureStore): + data_file = os.path.join( + temp_dir, fs.project, "feature_repo/data/driver_stats.parquet" + ) + data_df = pd.read_parquet(data_file) + feature_service = fs.get_feature_service("driver_activity_v1") + + RemoteOfflineStore.write_logged_features( + config=fs.config, + data=pa.Table.from_pandas(data_df), + source=FeatureServiceLoggingSource(feature_service, fs.config.project), + logging_config=feature_service.logging_config, + registry=fs.registry, + ) + + +def _test_pull_latest_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_latest_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + created_timestamp_column="created", + start_date=start_date, + end_date=end_date, + ).to_df() + + +def _test_pull_all_from_table_or_query(temp_dir, fs: FeatureStore): + data_source = fs.get_data_source("driver_hourly_stats_source") + + end_date = datetime.now().replace(microsecond=0, second=0, minute=0) + start_date = end_date - timedelta(days=15) + RemoteOfflineStore.pull_all_from_table_or_query( + config=fs.config, + data_source=data_source, + join_key_columns=[], + feature_name_columns=[], + timestamp_field="event_timestamp", + start_date=start_date, + end_date=end_date, + ).to_df() diff --git a/sdk/python/tests/utils/http_server.py b/sdk/python/tests/utils/http_server.py index 47c6cb8ac1..5bb6255d72 100644 --- a/sdk/python/tests/utils/http_server.py +++ b/sdk/python/tests/utils/http_server.py @@ -3,9 +3,9 @@ def free_port(): - sock = socket.socket() - sock.bind(("", 0)) - return sock.getsockname()[1] + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + sock.bind(("", 0)) + return sock.getsockname()[1] def check_port_open(host, port) -> bool:
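For the `http_server` helpers touched in the last hunk: `free_port()` now closes its probe socket before returning, so the reported port is actually free for the offline server to bind, and `check_port_open()` is what the new data source creator polls while the server starts. A condensed usage sketch follows (the `launch_offline_server` wrapper is illustrative, not part of this change; `repo_path` is assumed to point at a Feast repo with a valid `feature_store.yaml`):

```python
import subprocess

from feast.wait import wait_retry_backoff
from tests.utils.http_server import check_port_open, free_port


def launch_offline_server(repo_path: str, host: str = "0.0.0.0", timeout_sec: int = 60):
    port = free_port()  # ephemeral port; the probe socket is closed again immediately
    proc = subprocess.Popen(
        ["feast", "-c", repo_path, "serve_offline", "--host", host, "--port", str(port)]
    )
    # Poll until the Arrow Flight endpoint accepts TCP connections.
    wait_retry_backoff(
        lambda: (None, check_port_open(host, port)),
        timeout_secs=timeout_sec,
        timeout_msg=f"offline server did not start on port {port} within {timeout_sec}s",
    )
    return proc, f"grpc+tcp://{host}:{port}"
```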