Skip to content

Commit

Permalink
Local feature server implementation (HTTP endpoint) (#1780)
Browse files Browse the repository at this point in the history
* Local feature server implementation (HTTP endpoint)

Signed-off-by: Tsotne Tabidze <[email protected]>

* Update the request protobufs and hack json/protobuf conversion to avoid extra structure (e.g. "int64_val") in json

Signed-off-by: Tsotne Tabidze <[email protected]>

* Revert update to the service

Signed-off-by: Tsotne Tabidze <[email protected]>
  • Loading branch information
Tsotne Tabidze authored Aug 21, 2021
1 parent 954565e commit 745a1b4
Show file tree
Hide file tree
Showing 9 changed files with 427 additions and 5 deletions.
16 changes: 16 additions & 0 deletions protos/feast/serving/ServingService.proto
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ message GetOnlineFeaturesRequestV2 {
}
}

// In JSON "val" field can be omitted
message FeatureList {
repeated string val = 1;
}

message GetOnlineFeaturesRequest {
oneof kind {
string feature_service = 1;
FeatureList features = 2;
}
// The entity data is specified in a columnar format
// A map of entity name -> list of values
map<string, feast.types.RepeatedValue> entities = 3;
bool full_feature_names = 4;
}

message GetOnlineFeaturesResponse {
// Feature values retrieved from feast.
repeated FieldValues field_values = 1;
Expand Down
13 changes: 13 additions & 0 deletions protos/feast/types/Value.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ message ValueType {
FLOAT_LIST = 16;
BOOL_LIST = 17;
UNIX_TIMESTAMP_LIST = 18;
NULL = 19;
}
}

message Value {
// ValueType is referenced by the metadata types, FeatureInfo and EntityInfo.
// The enum values do not have to match the oneof val field ids, but they should.
// In JSON "*_val" field can be omitted
oneof val {
bytes bytes_val = 1;
string string_val = 2;
Expand All @@ -64,9 +66,14 @@ message Value {
FloatList float_list_val = 16;
BoolList bool_list_val = 17;
Int64List unix_timestamp_list_val = 18;
Null null_val = 19;
}
}

enum Null {
NULL = 0;
}

message BytesList {
repeated bytes val = 1;
}
Expand Down Expand Up @@ -94,3 +101,9 @@ message FloatList {
message BoolList {
repeated bool val = 1;
}

// This is to avoid an issue of being unable to specify `repeated value` in oneofs or maps
// In JSON "val" field can be omitted
message RepeatedValue {
repeated Value val = 1;
}
14 changes: 14 additions & 0 deletions sdk/python/feast/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,5 +357,19 @@ def init_command(project_directory, minimal: bool, template: str):
init_repo(project_directory, template)


@cli.command("serve")
@click.option(
"--port", "-p", type=click.INT, default=6566, help="Specify a port for the server"
)
@click.pass_context
def serve_command(ctx: click.Context, port: int):
"""Start a the feature consumption server locally on a given port."""
repo = ctx.obj["CHDIR"]
cli_check_repo(repo)
store = FeatureStore(repo_path=str(repo))

store.serve(port)


if __name__ == "__main__":
cli()
59 changes: 59 additions & 0 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import uvicorn
from fastapi import FastAPI, HTTPException, Request
from fastapi.logger import logger
from google.protobuf.json_format import MessageToDict, Parse

import feast
from feast import proto_json
from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest


def get_app(store: "feast.FeatureStore"):
proto_json.patch()

app = FastAPI()

@app.get("/get-online-features/")
async def get_online_features(request: Request):
try:
# Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object
body = await request.body()
request_proto = GetOnlineFeaturesRequest()
Parse(body, request_proto)

# Initialize parameters for FeatureStore.get_online_features(...) call
if request_proto.HasField("feature_service"):
features = store.get_feature_service(request_proto.feature_service)
else:
features = list(request_proto.features.val)

full_feature_names = request_proto.full_feature_names

batch_sizes = [len(v.val) for v in request_proto.entities.values()]
num_entities = batch_sizes[0]
if any(batch_size != num_entities for batch_size in batch_sizes):
raise HTTPException(status_code=500, detail="Uneven number of columns")

entity_rows = [
{k: v.val[idx] for k, v in request_proto.entities.items()}
for idx in range(num_entities)
]

response_proto = store.get_online_features(
features, entity_rows, full_feature_names=full_feature_names
).proto

# Convert the Protobuf object to JSON and return it
return MessageToDict(response_proto, preserving_proto_field_name=True)
except Exception as e:
# Print the original exception on the server side
logger.exception(e)
# Raise HTTPException to return the error message to the client
raise HTTPException(status_code=500, detail=str(e))

return app


def start_server(store: "feast.FeatureStore", port: int):
app = get_app(store)
uvicorn.run(app, host="127.0.0.1", port=port)
7 changes: 6 additions & 1 deletion sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from colorama import Fore, Style
from tqdm import tqdm

from feast import utils
from feast import feature_server, utils
from feast.entity import Entity
from feast.errors import (
EntityNotFoundException,
Expand Down Expand Up @@ -761,6 +761,11 @@ def get_online_features(

return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows))

@log_exceptions_and_usage
def serve(self, port: int) -> None:
"""Start the feature consumption server locally on a given port."""
feature_server.start_server(self, port)


def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyProto:
names, values = zip(*row.fields.items())
Expand Down
196 changes: 196 additions & 0 deletions sdk/python/feast/proto_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
import uuid
from typing import Any, Callable, Type

from google.protobuf.json_format import ( # type: ignore
_WKTJSONMETHODS,
ParseError,
_Parser,
_Printer,
)

from feast.protos.feast.serving.ServingService_pb2 import FeatureList
from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value

ProtoMessage = Any
JsonObject = Any


def _patch_proto_json_encoding(
proto_type: Type[ProtoMessage],
to_json_object: Callable[[_Printer, ProtoMessage], JsonObject],
from_json_object: Callable[[_Parser, JsonObject, ProtoMessage], None],
) -> None:
"""Patch Protobuf JSON Encoder / Decoder for a desired Protobuf type with to_json & from_json methods."""
to_json_fn_name = "_" + uuid.uuid4().hex
from_json_fn_name = "_" + uuid.uuid4().hex
setattr(_Printer, to_json_fn_name, to_json_object)
setattr(_Parser, from_json_fn_name, from_json_object)
_WKTJSONMETHODS[proto_type.DESCRIPTOR.full_name] = [
to_json_fn_name,
from_json_fn_name,
]


def _patch_feast_value_json_encoding():
"""Patch Protobuf JSON Encoder / Decoder with a Feast Value type.
This allows encoding the proto object as a native type, without the dummy structural wrapper.
Here's a before example:
{
"value_1": {
"int64_val": 1
},
"value_2": {
"double_list_val": [1.0, 2.0, 3.0]
},
}
And here's an after example:
{
"value_1": 1,
"value_2": [1.0, 2.0, 3.0]
}
"""

def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
which = message.WhichOneof("val")
# If the Value message is not set treat as null_value when serialize
# to JSON. The parse back result will be different from original message.
if which is None or which == "null_val":
return None
elif "_list_" in which:
value = list(getattr(message, which).val)
else:
value = getattr(message, which)
return value

def from_json_object(
parser: _Parser, value: JsonObject, message: ProtoMessage
) -> None:
if value is None:
message.null_val = 0
elif isinstance(value, bool):
message.bool_val = value
elif isinstance(value, str):
message.string_val = value
elif isinstance(value, int):
message.int64_val = value
elif isinstance(value, float):
message.double_val = value
elif isinstance(value, list):
if len(value) == 0:
# Clear will mark the struct as modified so it will be created even if there are no values
message.int64_list_val.Clear()
elif isinstance(value[0], bool):
message.bool_list_val.val.extend(value)
elif isinstance(value[0], str):
message.string_list_val.val.extend(value)
elif isinstance(value[0], (float, int, type(None))):
# Identify array as ints if all of the elements are ints
if all(isinstance(item, int) for item in value):
message.int64_list_val.val.extend(value)
# If any of the elements are floats or nulls, then parse it as a float array
else:
# Convert each null as NaN.
message.double_list_val.val.extend(
[item if item is not None else float("nan") for item in value]
)
else:
raise ParseError(
"Value {0} has unexpected type {1}.".format(
value[0], type(value[0])
)
)
else:
raise ParseError(
"Value {0} has unexpected type {1}.".format(value, type(value))
)

_patch_proto_json_encoding(Value, to_json_object, from_json_object)


def _patch_feast_repeated_value_json_encoding():
"""Patch Protobuf JSON Encoder / Decoder with a Feast RepeatedValue type.
This allows list of lists without dummy field name "val".
Here's a before example:
{
"repeated_value": [
{"val": [1,2,3]},
{"val": [4,5,6]}
]
}
And here's an after example:
{
"repeated_value": [
[1,2,3],
[4,5,6]
]
}
"""

def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
return [printer._MessageToJsonObject(item) for item in message.val]

def from_json_object(
parser: _Parser, value: JsonObject, message: ProtoMessage
) -> None:
array = value if isinstance(value, list) else value["val"]
for item in array:
parser.ConvertMessage(item, message.val.add())

_patch_proto_json_encoding(RepeatedValue, to_json_object, from_json_object)


def _patch_feast_feature_list_json_encoding():
"""Patch Protobuf JSON Encoder / Decoder with a Feast FeatureList type.
This allows list of lists without dummy field name "features".
Here's a before example:
{
"feature_list": {
"features": [
"feature-1",
"feature-2",
"feature-3"
]
}
}
And here's an after example:
{
"feature_list": [
"feature-1",
"feature-2",
"feature-3"
]
}
"""

def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject:
return list(message.val)

def from_json_object(
parser: _Parser, value: JsonObject, message: ProtoMessage
) -> None:
array = value if isinstance(value, list) else value["val"]
message.val.extend(array)

_patch_proto_json_encoding(FeatureList, to_json_object, from_json_object)


def patch():
"""Patch Protobuf JSON Encoder / Decoder with all desired Feast types."""
_patch_feast_value_json_encoding()
_patch_feast_repeated_value_json_encoding()
_patch_feast_feature_list_json_encoding()
10 changes: 6 additions & 4 deletions sdk/python/feast/type_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
return ProtoValue(
int32_list_val=Int32List(
val=[
item if type(item) is np.int32 else _type_err(item, np.int32)
item
if type(item) in [np.int32, int]
else _type_err(item, np.int32)
for item in value
]
)
Expand All @@ -215,7 +217,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
int64_list_val=Int64List(
val=[
item
if type(item) in [np.int64, np.int32]
if type(item) in [np.int64, np.int32, int]
else _type_err(item, np.int64)
for item in value
]
Expand All @@ -227,7 +229,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
int64_list_val=Int64List(
val=[
item
if type(item) in [np.int64, np.int32]
if type(item) in [np.int64, np.int32, int]
else _type_err(item, np.int64)
for item in value
]
Expand Down Expand Up @@ -283,7 +285,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue:
elif feast_value_type == ValueType.FLOAT:
return ProtoValue(float_val=float(value))
elif feast_value_type == ValueType.DOUBLE:
assert type(value) is float or np.float64
assert type(value) in [float, np.float64]
return ProtoValue(double_val=value)
elif feast_value_type == ValueType.STRING:
return ProtoValue(string_val=str(value))
Expand Down
Loading

0 comments on commit 745a1b4

Please sign in to comment.