diff --git a/sdk/python/feast/errors.py b/sdk/python/feast/errors.py index 2eed986d7f..d39009ae7a 100644 --- a/sdk/python/feast/errors.py +++ b/sdk/python/feast/errors.py @@ -1,4 +1,7 @@ -from typing import Any, List, Set +import importlib +import json +import logging +from typing import Any, List, Optional, Set from colorama import Fore, Style from fastapi import status as HttpStatusCode @@ -6,16 +9,61 @@ from feast.field import Field +logger = logging.getLogger(__name__) + class FeastError(Exception): pass - def rpc_status_code(self) -> GrpcStatusCode: + def grpc_status_code(self) -> GrpcStatusCode: return GrpcStatusCode.INTERNAL def http_status_code(self) -> int: return HttpStatusCode.HTTP_500_INTERNAL_SERVER_ERROR + def __str__(self) -> str: + if hasattr(self, "__overridden_message__"): + return str(getattr(self, "__overridden_message__")) + return super().__str__() + + def __repr__(self) -> str: + if hasattr(self, "__overridden_message__"): + return f"{type(self).__name__}('{getattr(self,'__overridden_message__')}')" + return super().__repr__() + + def to_error_detail(self) -> str: + """ + Returns a JSON representation of the error for serialization purposes. + + Returns: + str: a string representation of a JSON document including `module`, `class` and `message` fields. + """ + + m = { + "module": f"{type(self).__module__}", + "class": f"{type(self).__name__}", + "message": f"{str(self)}", + } + return json.dumps(m) + + @staticmethod + def from_error_detail(detail: str) -> Optional["FeastError"]: + try: + m = json.loads(detail) + if all(f in m for f in ["module", "class", "message"]): + module_name = m["module"] + class_name = m["class"] + message = m["message"] + module = importlib.import_module(module_name) + class_reference = getattr(module, class_name) + + instance = class_reference(message) + setattr(instance, "__overridden_message__", message) + return instance + except Exception as e: + logger.warning(f"Invalid error detail: {detail}: {e}") + return None + class DataSourceNotFoundException(FeastError): def __init__(self, path): @@ -41,7 +89,7 @@ def __init__(self, ds_name: str): class FeastObjectNotFoundException(FeastError): pass - def rpc_status_code(self) -> GrpcStatusCode: + def grpc_status_code(self) -> GrpcStatusCode: return GrpcStatusCode.NOT_FOUND def http_status_code(self) -> int: @@ -443,7 +491,7 @@ class FeastPermissionError(FeastError, PermissionError): def __init__(self, details: str): super().__init__(f"Permission error:\n{details}") - def rpc_status_code(self) -> GrpcStatusCode: + def grpc_status_code(self) -> GrpcStatusCode: return GrpcStatusCode.PERMISSION_DENIED def http_status_code(self) -> int: diff --git a/sdk/python/feast/grpc_error_interceptor.py b/sdk/python/feast/grpc_error_interceptor.py new file mode 100644 index 0000000000..c638d461ed --- /dev/null +++ b/sdk/python/feast/grpc_error_interceptor.py @@ -0,0 +1,48 @@ +import grpc + +from feast.errors import FeastError + + +def exception_wrapper(behavior, request, context): + try: + return behavior(request, context) + except grpc.RpcError as e: + context.abort(e.code(), e.details()) + except FeastError as e: + context.abort( + e.grpc_status_code(), + e.to_error_detail(), + ) + + +class ErrorInterceptor(grpc.ServerInterceptor): + def intercept_service(self, continuation, handler_call_details): + handler = continuation(handler_call_details) + if handler is None: + return None + + if handler.unary_unary: + return grpc.unary_unary_rpc_method_handler( + lambda req, ctx: exception_wrapper(handler.unary_unary, req, ctx), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) + elif handler.unary_stream: + return grpc.unary_stream_rpc_method_handler( + lambda req, ctx: exception_wrapper(handler.unary_stream, req, ctx), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) + elif handler.stream_unary: + return grpc.stream_unary_rpc_method_handler( + lambda req, ctx: exception_wrapper(handler.stream_unary, req, ctx), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) + elif handler.stream_stream: + return grpc.stream_stream_rpc_method_handler( + lambda req, ctx: exception_wrapper(handler.stream_stream, req, ctx), + request_deserializer=handler.request_deserializer, + response_serializer=handler.response_serializer, + ) + return handler diff --git a/sdk/python/feast/permissions/client/grpc_client_auth_interceptor.py b/sdk/python/feast/permissions/client/grpc_client_auth_interceptor.py index 98cc445c7b..5155b80cb5 100644 --- a/sdk/python/feast/permissions/client/grpc_client_auth_interceptor.py +++ b/sdk/python/feast/permissions/client/grpc_client_auth_interceptor.py @@ -2,6 +2,7 @@ import grpc +from feast.errors import FeastError from feast.permissions.auth_model import AuthConfig from feast.permissions.client.auth_client_manager_factory import get_auth_token @@ -20,26 +21,31 @@ def __init__(self, auth_type: AuthConfig): def intercept_unary_unary( self, continuation, client_call_details, request_iterator ): - client_call_details = self._append_auth_header_metadata(client_call_details) - return continuation(client_call_details, request_iterator) + return self._handle_call(continuation, client_call_details, request_iterator) def intercept_unary_stream( self, continuation, client_call_details, request_iterator ): - client_call_details = self._append_auth_header_metadata(client_call_details) - return continuation(client_call_details, request_iterator) + return self._handle_call(continuation, client_call_details, request_iterator) def intercept_stream_unary( self, continuation, client_call_details, request_iterator ): - client_call_details = self._append_auth_header_metadata(client_call_details) - return continuation(client_call_details, request_iterator) + return self._handle_call(continuation, client_call_details, request_iterator) def intercept_stream_stream( self, continuation, client_call_details, request_iterator ): + return self._handle_call(continuation, client_call_details, request_iterator) + + def _handle_call(self, continuation, client_call_details, request_iterator): client_call_details = self._append_auth_header_metadata(client_call_details) - return continuation(client_call_details, request_iterator) + result = continuation(client_call_details, request_iterator) + if result.exception() is not None: + mapped_error = FeastError.from_error_detail(result.exception().details()) + if mapped_error is not None: + raise mapped_error + return result def _append_auth_header_metadata(self, client_call_details): logger.debug( diff --git a/sdk/python/feast/permissions/server/grpc.py b/sdk/python/feast/permissions/server/grpc.py index 3c94240869..96f2690b88 100644 --- a/sdk/python/feast/permissions/server/grpc.py +++ b/sdk/python/feast/permissions/server/grpc.py @@ -1,6 +1,5 @@ import asyncio import logging -from typing import Optional import grpc @@ -8,32 +7,11 @@ get_auth_manager, ) from feast.permissions.security_manager import get_security_manager -from feast.permissions.server.utils import ( - AuthManagerType, -) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -def grpc_interceptors( - auth_type: AuthManagerType, -) -> Optional[list[grpc.ServerInterceptor]]: - """ - A list of the authorization interceptors. - - Args: - auth_type: The type of authorization manager, from the feature store configuration. - - Returns: - list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`. - """ - if auth_type == AuthManagerType.NONE: - return None - - return [AuthInterceptor()] - - class AuthInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): sm = get_security_manager() diff --git a/sdk/python/feast/registry_server.py b/sdk/python/feast/registry_server.py index 7b779e9f9e..40475aa580 100644 --- a/sdk/python/feast/registry_server.py +++ b/sdk/python/feast/registry_server.py @@ -1,6 +1,6 @@ from concurrent import futures from datetime import datetime, timezone -from typing import Union, cast +from typing import Optional, Union, cast import grpc from google.protobuf.empty_pb2 import Empty @@ -13,6 +13,7 @@ from feast.errors import FeatureViewNotFoundException from feast.feast_object import FeastObject from feast.feature_view import FeatureView +from feast.grpc_error_interceptor import ErrorInterceptor from feast.infra.infra_object import Infra from feast.infra.registry.base_registry import BaseRegistry from feast.on_demand_feature_view import OnDemandFeatureView @@ -23,8 +24,9 @@ assert_permissions_to_update, permitted_resources, ) -from feast.permissions.server.grpc import grpc_interceptors +from feast.permissions.server.grpc import AuthInterceptor from feast.permissions.server.utils import ( + AuthManagerType, ServerType, init_auth_manager, init_security_manager, @@ -645,7 +647,7 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr server = grpc.server( futures.ThreadPoolExecutor(max_workers=10), - interceptors=grpc_interceptors(auth_manager_type), + interceptors=_grpc_interceptors(auth_manager_type), ) RegistryServer_pb2_grpc.add_RegistryServerServicer_to_server( RegistryServer(store.registry), server @@ -668,3 +670,21 @@ def start_server(store: FeatureStore, port: int, wait_for_termination: bool = Tr server.wait_for_termination() else: return server + + +def _grpc_interceptors( + auth_type: AuthManagerType, +) -> Optional[list[grpc.ServerInterceptor]]: + """ + A list of the interceptors for the registry server. + + Args: + auth_type: The type of authorization manager, from the feature store configuration. + + Returns: + list[grpc.ServerInterceptor]: Optional list of interceptors. If the authorization type is set to `NONE`, it returns `None`. + """ + if auth_type == AuthManagerType.NONE: + return [ErrorInterceptor()] + + return [AuthInterceptor(), ErrorInterceptor()] diff --git a/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py b/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py index bc16bdac3b..9e9bc1473e 100644 --- a/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py +++ b/sdk/python/tests/unit/permissions/auth/server/test_auth_registry_server.py @@ -8,6 +8,11 @@ from feast import ( FeatureStore, ) +from feast.errors import ( + EntityNotFoundException, + FeastPermissionError, + FeatureViewNotFoundException, +) from feast.permissions.permission import Permission from feast.registry_server import start_server from feast.wait import wait_retry_backoff # noqa: E402 @@ -70,7 +75,9 @@ def test_registry_apis( print(f"Running for\n:{auth_config}") remote_feature_store = get_remote_registry_store(server_port, feature_store) permissions = _test_list_permissions(remote_feature_store, applied_permissions) + _test_get_entity(remote_feature_store, applied_permissions) _test_list_entities(remote_feature_store, applied_permissions) + _test_get_fv(remote_feature_store, applied_permissions) _test_list_fvs(remote_feature_store, applied_permissions) if _permissions_exist_in_permission_list( @@ -118,6 +125,20 @@ def _test_get_historical_features(client_fs: FeatureStore): assertpy.assert_that(training_df).is_not_none() +def _test_get_entity(client_fs: FeatureStore, permissions: list[Permission]): + if not _is_auth_enabled(client_fs) or _is_permission_enabled( + client_fs, permissions, read_entities_perm + ): + entity = client_fs.get_entity("driver") + assertpy.assert_that(entity).is_not_none() + assertpy.assert_that(entity.name).is_equal_to("driver") + else: + with pytest.raises(FeastPermissionError): + client_fs.get_entity("driver") + with pytest.raises(EntityNotFoundException): + client_fs.get_entity("invalid-name") + + def _test_list_entities(client_fs: FeatureStore, permissions: list[Permission]): entities = client_fs.list_entities() @@ -188,6 +209,20 @@ def _is_auth_enabled(client_fs: FeatureStore) -> bool: return client_fs.config.auth_config.type != "no_auth" +def _test_get_fv(client_fs: FeatureStore, permissions: list[Permission]): + if not _is_auth_enabled(client_fs) or _is_permission_enabled( + client_fs, permissions, read_fv_perm + ): + fv = client_fs.get_feature_view("driver_hourly_stats") + assertpy.assert_that(fv).is_not_none() + assertpy.assert_that(fv.name).is_equal_to("driver_hourly_stats") + else: + with pytest.raises(FeastPermissionError): + client_fs.get_feature_view("driver_hourly_stats") + with pytest.raises(FeatureViewNotFoundException): + client_fs.get_feature_view("invalid-name") + + def _test_list_fvs(client_fs: FeatureStore, permissions: list[Permission]): if _is_auth_enabled(client_fs) and _permissions_exist_in_permission_list( [invalid_list_entities_perm], permissions diff --git a/sdk/python/tests/unit/test_errors.py b/sdk/python/tests/unit/test_errors.py new file mode 100644 index 0000000000..b3f33690da --- /dev/null +++ b/sdk/python/tests/unit/test_errors.py @@ -0,0 +1,26 @@ +import re + +import assertpy + +import feast.errors as errors + + +def test_error_error_detail(): + e = errors.FeatureViewNotFoundException("abc") + + d = e.to_error_detail() + + assertpy.assert_that(d).is_not_none() + assertpy.assert_that(d).contains('"module": "feast.errors"') + assertpy.assert_that(d).contains('"class": "FeatureViewNotFoundException"') + assertpy.assert_that(re.search(r"abc", d)).is_true() + + converted_e = errors.FeastError.from_error_detail(d) + assertpy.assert_that(converted_e).is_not_none() + assertpy.assert_that(str(converted_e)).is_equal_to(str(e)) + assertpy.assert_that(repr(converted_e)).is_equal_to(repr(e)) + + +def test_invalid_error_error_detail(): + e = errors.FeastError.from_error_detail("invalid") + assertpy.assert_that(e).is_none()