Skip to content

Commit

Permalink
feat: Add materialize and materialize-incremental rest endpoints (fea…
Browse files Browse the repository at this point in the history
…st-dev#3761)

* resolve feast-dev#3760

Signed-off-by: snowron <[email protected]>

* format feature_server.py

Signed-off-by: snowron <[email protected]>

---------

Signed-off-by: snowron <[email protected]>
Signed-off-by: Attila Toth <[email protected]>
  • Loading branch information
snowron authored and zseta committed Feb 7, 2024
1 parent 2c1f4b3 commit c1873c7
Showing 1 changed file with 42 additions and 1 deletion.
43 changes: 42 additions & 1 deletion sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
import json
import traceback
import warnings
from typing import List, Optional

import gunicorn.app.base
import pandas as pd
from dateutil import parser
from fastapi import FastAPI, HTTPException, Request, Response, status
from fastapi.logger import logger
from fastapi.params import Depends
from google.protobuf.json_format import MessageToDict, Parse
from pydantic import BaseModel

import feast
from feast import proto_json
from feast import proto_json, utils
from feast.data_source import PushMode
from feast.errors import PushSourceNotFoundException
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest
Expand All @@ -31,6 +33,17 @@ class PushFeaturesRequest(BaseModel):
to: str = "online"


class MaterializeRequest(BaseModel):
start_ts: str
end_ts: str
feature_views: Optional[List[str]] = None


class MaterializeIncrementalRequest(BaseModel):
end_ts: str
feature_views: Optional[List[str]] = None


def get_app(store: "feast.FeatureStore"):
proto_json.patch()

Expand Down Expand Up @@ -134,6 +147,34 @@ def write_to_online_store(body=Depends(get_body)):
def health():
return Response(status_code=status.HTTP_200_OK)

@app.post("/materialize")
def materialize(body=Depends(get_body)):
try:
request = MaterializeRequest(**json.loads(body))
store.materialize(
utils.make_tzaware(parser.parse(request.start_ts)),
utils.make_tzaware(parser.parse(request.end_ts)),
request.feature_views,
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

@app.post("/materialize-incremental")
def materialize_incremental(body=Depends(get_body)):
try:
request = MaterializeIncrementalRequest(**json.loads(body))
store.materialize_incremental(
utils.make_tzaware(parser.parse(request.end_ts)), request.feature_views
)
except Exception as e:
# Print the original exception on the server side
logger.exception(traceback.format_exc())
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app


Expand Down

0 comments on commit c1873c7

Please sign in to comment.