Skip to content

Commit

Permalink
perf: Default to async endpoints, use threadpool for sync (#4647)
Browse files Browse the repository at this point in the history
  • Loading branch information
robhowley authored Oct 18, 2024
1 parent 0a2bb47 commit c1f1912
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 4 deletions.
16 changes: 12 additions & 4 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import psutil
from dateutil import parser
from fastapi import Depends, FastAPI, Request, Response, status
from fastapi.concurrency import run_in_threadpool
from fastapi.logger import logger
from fastapi.responses import JSONResponse
from google.protobuf.json_format import MessageToDict
Expand Down Expand Up @@ -112,7 +113,7 @@ async def get_body(request: Request):
"/get-online-features",
dependencies=[Depends(inject_user_details)],
)
def get_online_features(body=Depends(get_body)):
async def get_online_features(body=Depends(get_body)):
body = json.loads(body)
full_feature_names = body.get("full_feature_names", False)
entity_rows = body["entities"]
Expand Down Expand Up @@ -145,15 +146,22 @@ def get_online_features(body=Depends(get_body)):
resource=od_feature_view, actions=[AuthzedAction.READ_ONLINE]
)

response_proto = store.get_online_features(
read_params = dict(
features=features,
entity_rows=entity_rows,
full_feature_names=full_feature_names,
).proto
)

if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params)
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params)
)

# Convert the Protobuf object to JSON and return it
return MessageToDict(
response_proto, preserving_proto_field_name=True, float_precision=18
response.proto, preserving_proto_field_name=True, float_precision=18
)

@app.post("/push", dependencies=[Depends(inject_user_details)])
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from feast.infra.infra_object import DYNAMODB_INFRA_OBJECT_CLASS_TYPE, InfraObject
from feast.infra.online_stores.helpers import compute_entity_id
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.supported_async_methods import SupportedAsyncMethods
from feast.protos.feast.core.DynamoDBTable_pb2 import (
DynamoDBTable as DynamoDBTableProto,
)
Expand Down Expand Up @@ -88,6 +89,10 @@ class DynamoDBOnlineStore(OnlineStore):
_dynamodb_resource = None
_aioboto_session = None

@property
def async_supported(self) -> SupportedAsyncMethods:
return SupportedAsyncMethods(read=True)

def update(
self,
config: RepoConfig,
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/online_stores/online_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from feast.feature_view import FeatureView
from feast.infra.infra_object import InfraObject
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.supported_async_methods import SupportedAsyncMethods
from feast.online_response import OnlineResponse
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand All @@ -36,6 +37,10 @@ class OnlineStore(ABC):
The interface that Feast uses to interact with the storage system that handles online features.
"""

@property
def async_supported(self) -> SupportedAsyncMethods:
return SupportedAsyncMethods()

@abstractmethod
def online_write_batch(
self,
Expand Down
7 changes: 7 additions & 0 deletions sdk/python/feast/infra/passthrough_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from feast.infra.online_stores.helpers import get_online_store_from_config
from feast.infra.provider import Provider
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.supported_async_methods import ProviderAsyncMethods
from feast.online_response import OnlineResponse
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto
Expand Down Expand Up @@ -79,6 +80,12 @@ def offline_store(self):
)
return self._offline_store

@property
def async_supported(self) -> ProviderAsyncMethods:
return ProviderAsyncMethods(
online=self.online_store.async_supported,
)

@property
def batch_engine(self) -> BatchMaterializationEngine:
if self._batch_engine:
Expand Down
5 changes: 5 additions & 0 deletions sdk/python/feast/infra/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from feast.infra.infra_object import Infra
from feast.infra.offline_stores.offline_store import RetrievalJob
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.supported_async_methods import ProviderAsyncMethods
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -55,6 +56,10 @@ class Provider(ABC):
def __init__(self, config: RepoConfig):
pass

@property
def async_supported(self) -> ProviderAsyncMethods:
return ProviderAsyncMethods()

@abstractmethod
def update_infra(
self,
Expand Down
10 changes: 10 additions & 0 deletions sdk/python/feast/infra/supported_async_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pydantic import BaseModel, Field


class SupportedAsyncMethods(BaseModel):
read: bool = Field(default=False)
write: bool = Field(default=False)


class ProviderAsyncMethods(BaseModel):
online: SupportedAsyncMethods = Field(default_factory=SupportedAsyncMethods)

0 comments on commit c1f1912

Please sign in to comment.