From 745a1b43d20c0169b675b1f28039854205fb8180 Mon Sep 17 00:00:00 2001 From: Tsotne Tabidze Date: Fri, 20 Aug 2021 18:31:46 -0700 Subject: [PATCH] Local feature server implementation (HTTP endpoint) (#1780) * Local feature server implementation (HTTP endpoint) Signed-off-by: Tsotne Tabidze * Update the request protobufs and hack json/protobuf conversion to avoid extra structure (e.g. "int64_val") in json Signed-off-by: Tsotne Tabidze * Revert update to the service Signed-off-by: Tsotne Tabidze --- protos/feast/serving/ServingService.proto | 16 ++ protos/feast/types/Value.proto | 13 ++ sdk/python/feast/cli.py | 14 ++ sdk/python/feast/feature_server.py | 59 +++++++ sdk/python/feast/feature_store.py | 7 +- sdk/python/feast/proto_json.py | 196 ++++++++++++++++++++++ sdk/python/feast/type_map.py | 10 +- sdk/python/setup.py | 2 + sdk/python/tests/unit/test_proto_json.py | 115 +++++++++++++ 9 files changed, 427 insertions(+), 5 deletions(-) create mode 100644 sdk/python/feast/feature_server.py create mode 100644 sdk/python/feast/proto_json.py create mode 100644 sdk/python/tests/unit/test_proto_json.py diff --git a/protos/feast/serving/ServingService.proto b/protos/feast/serving/ServingService.proto index 5ed7c0c55d..577b4cf9ce 100644 --- a/protos/feast/serving/ServingService.proto +++ b/protos/feast/serving/ServingService.proto @@ -81,6 +81,22 @@ message GetOnlineFeaturesRequestV2 { } } +// In JSON "val" field can be omitted +message FeatureList { + repeated string val = 1; +} + +message GetOnlineFeaturesRequest { + oneof kind { + string feature_service = 1; + FeatureList features = 2; + } + // The entity data is specified in a columnar format + // A map of entity name -> list of values + map entities = 3; + bool full_feature_names = 4; +} + message GetOnlineFeaturesResponse { // Feature values retrieved from feast. repeated FieldValues field_values = 1; diff --git a/protos/feast/types/Value.proto b/protos/feast/types/Value.proto index 23d03e651b..b00d4d9b84 100644 --- a/protos/feast/types/Value.proto +++ b/protos/feast/types/Value.proto @@ -41,12 +41,14 @@ message ValueType { FLOAT_LIST = 16; BOOL_LIST = 17; UNIX_TIMESTAMP_LIST = 18; + NULL = 19; } } message Value { // ValueType is referenced by the metadata types, FeatureInfo and EntityInfo. // The enum values do not have to match the oneof val field ids, but they should. + // In JSON "*_val" field can be omitted oneof val { bytes bytes_val = 1; string string_val = 2; @@ -64,9 +66,14 @@ message Value { FloatList float_list_val = 16; BoolList bool_list_val = 17; Int64List unix_timestamp_list_val = 18; + Null null_val = 19; } } +enum Null { + NULL = 0; +} + message BytesList { repeated bytes val = 1; } @@ -94,3 +101,9 @@ message FloatList { message BoolList { repeated bool val = 1; } + +// This is to avoid an issue of being unable to specify `repeated value` in oneofs or maps +// In JSON "val" field can be omitted +message RepeatedValue { + repeated Value val = 1; +} \ No newline at end of file diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index a7028640a4..fea1fac6ca 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -357,5 +357,19 @@ def init_command(project_directory, minimal: bool, template: str): init_repo(project_directory, template) +@cli.command("serve") +@click.option( + "--port", "-p", type=click.INT, default=6566, help="Specify a port for the server" +) +@click.pass_context +def serve_command(ctx: click.Context, port: int): + """Start a the feature consumption server locally on a given port.""" + repo = ctx.obj["CHDIR"] + cli_check_repo(repo) + store = FeatureStore(repo_path=str(repo)) + + store.serve(port) + + if __name__ == "__main__": cli() diff --git a/sdk/python/feast/feature_server.py b/sdk/python/feast/feature_server.py new file mode 100644 index 0000000000..93e90819f3 --- /dev/null +++ b/sdk/python/feast/feature_server.py @@ -0,0 +1,59 @@ +import uvicorn +from fastapi import FastAPI, HTTPException, Request +from fastapi.logger import logger +from google.protobuf.json_format import MessageToDict, Parse + +import feast +from feast import proto_json +from feast.protos.feast.serving.ServingService_pb2 import GetOnlineFeaturesRequest + + +def get_app(store: "feast.FeatureStore"): + proto_json.patch() + + app = FastAPI() + + @app.get("/get-online-features/") + async def get_online_features(request: Request): + try: + # Validate and parse the request data into GetOnlineFeaturesRequest Protobuf object + body = await request.body() + request_proto = GetOnlineFeaturesRequest() + Parse(body, request_proto) + + # Initialize parameters for FeatureStore.get_online_features(...) call + if request_proto.HasField("feature_service"): + features = store.get_feature_service(request_proto.feature_service) + else: + features = list(request_proto.features.val) + + full_feature_names = request_proto.full_feature_names + + batch_sizes = [len(v.val) for v in request_proto.entities.values()] + num_entities = batch_sizes[0] + if any(batch_size != num_entities for batch_size in batch_sizes): + raise HTTPException(status_code=500, detail="Uneven number of columns") + + entity_rows = [ + {k: v.val[idx] for k, v in request_proto.entities.items()} + for idx in range(num_entities) + ] + + response_proto = store.get_online_features( + features, entity_rows, full_feature_names=full_feature_names + ).proto + + # Convert the Protobuf object to JSON and return it + return MessageToDict(response_proto, preserving_proto_field_name=True) + except Exception as e: + # Print the original exception on the server side + logger.exception(e) + # Raise HTTPException to return the error message to the client + raise HTTPException(status_code=500, detail=str(e)) + + return app + + +def start_server(store: "feast.FeatureStore", port: int): + app = get_app(store) + uvicorn.run(app, host="127.0.0.1", port=port) diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index c43c6766ca..0f84bff7d3 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -22,7 +22,7 @@ from colorama import Fore, Style from tqdm import tqdm -from feast import utils +from feast import feature_server, utils from feast.entity import Entity from feast.errors import ( EntityNotFoundException, @@ -761,6 +761,11 @@ def get_online_features( return OnlineResponse(GetOnlineFeaturesResponse(field_values=result_rows)) + @log_exceptions_and_usage + def serve(self, port: int) -> None: + """Start the feature consumption server locally on a given port.""" + feature_server.start_server(self, port) + def _entity_row_to_key(row: GetOnlineFeaturesRequestV2.EntityRow) -> EntityKeyProto: names, values = zip(*row.fields.items()) diff --git a/sdk/python/feast/proto_json.py b/sdk/python/feast/proto_json.py new file mode 100644 index 0000000000..549d7b6d14 --- /dev/null +++ b/sdk/python/feast/proto_json.py @@ -0,0 +1,196 @@ +import uuid +from typing import Any, Callable, Type + +from google.protobuf.json_format import ( # type: ignore + _WKTJSONMETHODS, + ParseError, + _Parser, + _Printer, +) + +from feast.protos.feast.serving.ServingService_pb2 import FeatureList +from feast.protos.feast.types.Value_pb2 import RepeatedValue, Value + +ProtoMessage = Any +JsonObject = Any + + +def _patch_proto_json_encoding( + proto_type: Type[ProtoMessage], + to_json_object: Callable[[_Printer, ProtoMessage], JsonObject], + from_json_object: Callable[[_Parser, JsonObject, ProtoMessage], None], +) -> None: + """Patch Protobuf JSON Encoder / Decoder for a desired Protobuf type with to_json & from_json methods.""" + to_json_fn_name = "_" + uuid.uuid4().hex + from_json_fn_name = "_" + uuid.uuid4().hex + setattr(_Printer, to_json_fn_name, to_json_object) + setattr(_Parser, from_json_fn_name, from_json_object) + _WKTJSONMETHODS[proto_type.DESCRIPTOR.full_name] = [ + to_json_fn_name, + from_json_fn_name, + ] + + +def _patch_feast_value_json_encoding(): + """Patch Protobuf JSON Encoder / Decoder with a Feast Value type. + + This allows encoding the proto object as a native type, without the dummy structural wrapper. + + Here's a before example: + + { + "value_1": { + "int64_val": 1 + }, + "value_2": { + "double_list_val": [1.0, 2.0, 3.0] + }, + } + + And here's an after example: + + { + "value_1": 1, + "value_2": [1.0, 2.0, 3.0] + } + """ + + def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject: + which = message.WhichOneof("val") + # If the Value message is not set treat as null_value when serialize + # to JSON. The parse back result will be different from original message. + if which is None or which == "null_val": + return None + elif "_list_" in which: + value = list(getattr(message, which).val) + else: + value = getattr(message, which) + return value + + def from_json_object( + parser: _Parser, value: JsonObject, message: ProtoMessage + ) -> None: + if value is None: + message.null_val = 0 + elif isinstance(value, bool): + message.bool_val = value + elif isinstance(value, str): + message.string_val = value + elif isinstance(value, int): + message.int64_val = value + elif isinstance(value, float): + message.double_val = value + elif isinstance(value, list): + if len(value) == 0: + # Clear will mark the struct as modified so it will be created even if there are no values + message.int64_list_val.Clear() + elif isinstance(value[0], bool): + message.bool_list_val.val.extend(value) + elif isinstance(value[0], str): + message.string_list_val.val.extend(value) + elif isinstance(value[0], (float, int, type(None))): + # Identify array as ints if all of the elements are ints + if all(isinstance(item, int) for item in value): + message.int64_list_val.val.extend(value) + # If any of the elements are floats or nulls, then parse it as a float array + else: + # Convert each null as NaN. + message.double_list_val.val.extend( + [item if item is not None else float("nan") for item in value] + ) + else: + raise ParseError( + "Value {0} has unexpected type {1}.".format( + value[0], type(value[0]) + ) + ) + else: + raise ParseError( + "Value {0} has unexpected type {1}.".format(value, type(value)) + ) + + _patch_proto_json_encoding(Value, to_json_object, from_json_object) + + +def _patch_feast_repeated_value_json_encoding(): + """Patch Protobuf JSON Encoder / Decoder with a Feast RepeatedValue type. + + This allows list of lists without dummy field name "val". + + Here's a before example: + + { + "repeated_value": [ + {"val": [1,2,3]}, + {"val": [4,5,6]} + ] + } + + And here's an after example: + + { + "repeated_value": [ + [1,2,3], + [4,5,6] + ] + } + """ + + def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject: + return [printer._MessageToJsonObject(item) for item in message.val] + + def from_json_object( + parser: _Parser, value: JsonObject, message: ProtoMessage + ) -> None: + array = value if isinstance(value, list) else value["val"] + for item in array: + parser.ConvertMessage(item, message.val.add()) + + _patch_proto_json_encoding(RepeatedValue, to_json_object, from_json_object) + + +def _patch_feast_feature_list_json_encoding(): + """Patch Protobuf JSON Encoder / Decoder with a Feast FeatureList type. + + This allows list of lists without dummy field name "features". + + Here's a before example: + + { + "feature_list": { + "features": [ + "feature-1", + "feature-2", + "feature-3" + ] + } + } + + And here's an after example: + + { + "feature_list": [ + "feature-1", + "feature-2", + "feature-3" + ] + } + """ + + def to_json_object(printer: _Printer, message: ProtoMessage) -> JsonObject: + return list(message.val) + + def from_json_object( + parser: _Parser, value: JsonObject, message: ProtoMessage + ) -> None: + array = value if isinstance(value, list) else value["val"] + message.val.extend(array) + + _patch_proto_json_encoding(FeatureList, to_json_object, from_json_object) + + +def patch(): + """Patch Protobuf JSON Encoder / Decoder with all desired Feast types.""" + _patch_feast_value_json_encoding() + _patch_feast_repeated_value_json_encoding() + _patch_feast_feature_list_json_encoding() diff --git a/sdk/python/feast/type_map.py b/sdk/python/feast/type_map.py index 53ab1183c2..af09069407 100644 --- a/sdk/python/feast/type_map.py +++ b/sdk/python/feast/type_map.py @@ -204,7 +204,9 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: return ProtoValue( int32_list_val=Int32List( val=[ - item if type(item) is np.int32 else _type_err(item, np.int32) + item + if type(item) in [np.int32, int] + else _type_err(item, np.int32) for item in value ] ) @@ -215,7 +217,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: int64_list_val=Int64List( val=[ item - if type(item) in [np.int64, np.int32] + if type(item) in [np.int64, np.int32, int] else _type_err(item, np.int64) for item in value ] @@ -227,7 +229,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: int64_list_val=Int64List( val=[ item - if type(item) in [np.int64, np.int32] + if type(item) in [np.int64, np.int32, int] else _type_err(item, np.int64) for item in value ] @@ -283,7 +285,7 @@ def _python_value_to_proto_value(feast_value_type, value) -> ProtoValue: elif feast_value_type == ValueType.FLOAT: return ProtoValue(float_val=float(value)) elif feast_value_type == ValueType.DOUBLE: - assert type(value) is float or np.float64 + assert type(value) in [float, np.float64] return ProtoValue(double_val=value) elif feast_value_type == ValueType.STRING: return ProtoValue(string_val=str(value)) diff --git a/sdk/python/setup.py b/sdk/python/setup.py index 1b8cfc0e68..93d660238b 100644 --- a/sdk/python/setup.py +++ b/sdk/python/setup.py @@ -57,6 +57,8 @@ "tenacity>=7.*", "toml==0.10.*", "tqdm==4.*", + "fastapi>=0.68.0", + "uvicorn[standard]>=0.14.0", ] GCP_REQUIRED = [ diff --git a/sdk/python/tests/unit/test_proto_json.py b/sdk/python/tests/unit/test_proto_json.py new file mode 100644 index 0000000000..1b352ccb19 --- /dev/null +++ b/sdk/python/tests/unit/test_proto_json.py @@ -0,0 +1,115 @@ +import assertpy +import pytest +from google.protobuf.json_format import MessageToDict, Parse + +from feast import proto_json +from feast.protos.feast.serving.ServingService_pb2 import ( + FeatureList, + GetOnlineFeaturesResponse, +) +from feast.protos.feast.types.Value_pb2 import RepeatedValue + +FieldValues = GetOnlineFeaturesResponse.FieldValues + + +@pytest.fixture(scope="module") +def proto_json_patch(): + proto_json.patch() + + +def test_feast_value(proto_json_patch): + # FieldValues contains "map fields" proto field. + # We want to test that feast.types.Value can take different types in JSON + # without using additional structure (e.g. 1 instead of {int64_val: 1}). + field_values_str = """{ + "fields": { + "a": 1, + "b": 2.0, + "c": true, + "d": "foo", + "e": [1, 2, 3], + "f": [2.0, 3.0, 4.0, null], + "g": [true, false, true], + "h": ["foo", "bar", "foobar"], + "i": null + } + }""" + field_values_proto = FieldValues() + Parse(field_values_str, field_values_proto) + assertpy.assert_that(field_values_proto.fields.keys()).is_equal_to( + {"a", "b", "c", "d", "e", "f", "g", "h", "i"} + ) + assertpy.assert_that(field_values_proto.fields["a"].int64_val).is_equal_to(1) + assertpy.assert_that(field_values_proto.fields["b"].double_val).is_equal_to(2.0) + assertpy.assert_that(field_values_proto.fields["c"].bool_val).is_equal_to(True) + assertpy.assert_that(field_values_proto.fields["d"].string_val).is_equal_to("foo") + assertpy.assert_that(field_values_proto.fields["e"].int64_list_val.val).is_equal_to( + [1, 2, 3] + ) + # Can't directly check equality to [2.0, 3.0, 4.0, float("nan")], because float("nan") != float("nan") + assertpy.assert_that( + field_values_proto.fields["f"].double_list_val.val[:3] + ).is_equal_to([2.0, 3.0, 4.0]) + assertpy.assert_that(field_values_proto.fields["f"].double_list_val.val[3]).is_nan() + assertpy.assert_that(field_values_proto.fields["g"].bool_list_val.val).is_equal_to( + [True, False, True] + ) + assertpy.assert_that( + field_values_proto.fields["h"].string_list_val.val + ).is_equal_to(["foo", "bar", "foobar"]) + assertpy.assert_that(field_values_proto.fields["i"].null_val).is_equal_to(0) + + # Now convert protobuf back to json and check that + field_values_json = MessageToDict(field_values_proto) + assertpy.assert_that(field_values_json["fields"].keys()).is_equal_to( + {"a", "b", "c", "d", "e", "f", "g", "h", "i"} + ) + assertpy.assert_that(field_values_json["fields"]["a"]).is_equal_to(1) + assertpy.assert_that(field_values_json["fields"]["b"]).is_equal_to(2.0) + assertpy.assert_that(field_values_json["fields"]["c"]).is_equal_to(True) + assertpy.assert_that(field_values_json["fields"]["d"]).is_equal_to("foo") + assertpy.assert_that(field_values_json["fields"]["e"]).is_equal_to([1, 2, 3]) + # Can't directly check equality to [2.0, 3.0, 4.0, float("nan")], because float("nan") != float("nan") + assertpy.assert_that(field_values_json["fields"]["f"][:3]).is_equal_to( + [2.0, 3.0, 4.0] + ) + assertpy.assert_that(field_values_json["fields"]["f"][3]).is_nan() + assertpy.assert_that(field_values_json["fields"]["g"]).is_equal_to( + [True, False, True] + ) + assertpy.assert_that(field_values_json["fields"]["h"]).is_equal_to( + ["foo", "bar", "foobar"] + ) + assertpy.assert_that(field_values_json["fields"]["i"]).is_equal_to(None) + + +def test_feast_repeated_value(proto_json_patch): + # Make sure that RepeatedValue in JSON does not need the + # additional structure (e.g. [1,2,3] instead of {"val": [1,2,3]}) + repeated_value_str = "[1,2,3]" + repeated_value_proto = RepeatedValue() + Parse(repeated_value_str, repeated_value_proto) + assertpy.assert_that(len(repeated_value_proto.val)).is_equal_to(3) + assertpy.assert_that(repeated_value_proto.val[0].int64_val).is_equal_to(1) + assertpy.assert_that(repeated_value_proto.val[1].int64_val).is_equal_to(2) + assertpy.assert_that(repeated_value_proto.val[2].int64_val).is_equal_to(3) + # Now convert protobuf back to json and check that + repeated_value_json = MessageToDict(repeated_value_proto) + assertpy.assert_that(repeated_value_json).is_equal_to([1, 2, 3]) + + +def test_feature_list(proto_json_patch): + # Make sure that FeatureList in JSON does not need the additional structure + # (e.g. ["foo", "bar"] instead of {"val": ["foo", "bar"]}) + feature_list_str = '["feature-a", "feature-b", "feature-c"]' + feature_list_proto = FeatureList() + Parse(feature_list_str, feature_list_proto) + assertpy.assert_that(len(feature_list_proto.val)).is_equal_to(3) + assertpy.assert_that(feature_list_proto.val[0]).is_equal_to("feature-a") + assertpy.assert_that(feature_list_proto.val[1]).is_equal_to("feature-b") + assertpy.assert_that(feature_list_proto.val[2]).is_equal_to("feature-c") + # Now convert protobuf back to json and check that + feature_list_json = MessageToDict(feature_list_proto) + assertpy.assert_that(feature_list_json).is_equal_to( + ["feature-a", "feature-b", "feature-c"] + )