diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 75501820ff..51edb2baa4 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -27,6 +27,7 @@ * [feature\_store.yaml](reference/feature-store-yaml.md) * [.feastignore](reference/feast-ignore.md) * [Python API reference](http://rtd.feast.dev/) +* [Telemetry](reference/telemetry.md) ## Feast on Kubernetes @@ -75,7 +76,6 @@ * [Audit Logging](feast-on-kubernetes/advanced-1/audit-logging.md) * [Security](feast-on-kubernetes/advanced-1/security.md) * [Upgrading Feast](feast-on-kubernetes/advanced-1/upgrading.md) - * [Telemetry](feast-on-kubernetes/advanced-1/telemetry.md) ## Contributing diff --git a/docs/feast-on-kubernetes/advanced-1/telemetry.md b/docs/feast-on-kubernetes/advanced-1/telemetry.md deleted file mode 100644 index c817e1ce15..0000000000 --- a/docs/feast-on-kubernetes/advanced-1/telemetry.md +++ /dev/null @@ -1,10 +0,0 @@ -# Telemetry - -## How telemetry is used - -The Feast maintainers use anonymous usage statistics to help shape the Feast roadmap. Several client methods are tracked, beginning in Feast 0.9. Users are assigned a UUID which is sent along with the name of the method, the Feast version, the OS \(using `sys.platform`\), and the current time. For more detailed information see [the source code](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/telemetry.py). - -## How to disable telemetry - -To opt out of telemetry, simply set the environment variable `FEAST_TELEMETRY` to `False` in the environment in which the Feast client is run. - diff --git a/docs/reference/telemetry.md b/docs/reference/telemetry.md new file mode 100644 index 0000000000..bd7f7e5954 --- /dev/null +++ b/docs/reference/telemetry.md @@ -0,0 +1,10 @@ +# Telemetry + +## How telemetry is used + +The Feast maintainers use anonymous usage statistics and error tracking to help shape the Feast roadmap. Several client methods are tracked, beginning in Feast 0.9. Users are assigned a UUID which is sent along with the name of the method, the Feast version, the OS \(using `sys.platform`\), and the current time. For more detailed information see [the source code](https://github.com/feast-dev/feast/blob/master/sdk/python/feast/telemetry.py). + +## How to disable telemetry + +To opt out of all telemetry, simply set the environment variable `FEAST_TELEMETRY` to `False` in the environment in which the Feast client is run. + diff --git a/sdk/python/feast/cli.py b/sdk/python/feast/cli.py index 9055ea5ba8..4f42534dc7 100644 --- a/sdk/python/feast/cli.py +++ b/sdk/python/feast/cli.py @@ -32,7 +32,6 @@ registry_dump, teardown, ) -from feast.telemetry import Telemetry _logger = logging.getLogger(__name__) DATETIME_ISO = "%Y-%m-%dT%H:%M:%s" @@ -175,8 +174,6 @@ def apply_total_command(ctx: click.Context): repo = ctx.obj["CHDIR"] cli_check_repo(repo) repo_config = load_repo_config(repo) - tele = Telemetry() - tele.log("apply") try: apply_total(repo_config, repo) except FeastProviderLoginError as e: @@ -192,8 +189,6 @@ def teardown_command(ctx: click.Context): repo = ctx.obj["CHDIR"] cli_check_repo(repo) repo_config = load_repo_config(repo) - tele = Telemetry() - tele.log("teardown") teardown(repo_config, repo) @@ -207,8 +202,6 @@ def registry_dump_command(ctx: click.Context): repo = ctx.obj["CHDIR"] cli_check_repo(repo) repo_config = load_repo_config(repo) - tele = Telemetry() - tele.log("registry-dump") registry_dump(repo_config, repo_path=repo) @@ -285,8 +278,6 @@ def init_command(project_directory, minimal: bool, template: str): if minimal: template = "minimal" - tele = Telemetry() - tele.log("init") init_repo(project_directory, template) diff --git a/sdk/python/feast/entity.py b/sdk/python/feast/entity.py index b413d8df29..e1c6f601f2 100644 --- a/sdk/python/feast/entity.py +++ b/sdk/python/feast/entity.py @@ -23,6 +23,7 @@ from feast.protos.feast.core.Entity_pb2 import Entity as EntityV2Proto from feast.protos.feast.core.Entity_pb2 import EntityMeta as EntityMetaProto from feast.protos.feast.core.Entity_pb2 import EntitySpecV2 as EntitySpecProto +from feast.telemetry import log_exceptions from feast.value_type import ValueType @@ -31,6 +32,7 @@ class Entity: Represents a collection of entities and associated metadata. """ + @log_exceptions def __init__( self, name: str, diff --git a/sdk/python/feast/feature_store.py b/sdk/python/feast/feature_store.py index d6f2da5b7b..49b2afed74 100644 --- a/sdk/python/feast/feature_store.py +++ b/sdk/python/feast/feature_store.py @@ -35,7 +35,7 @@ from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto from feast.registry import Registry from feast.repo_config import RepoConfig, load_repo_config -from feast.telemetry import Telemetry +from feast.telemetry import log_exceptions, log_exceptions_and_usage from feast.version import get_version @@ -52,6 +52,7 @@ class FeatureStore: repo_path: Path _registry: Registry + @log_exceptions def __init__( self, repo_path: Optional[str] = None, config: Optional[RepoConfig] = None, ): @@ -72,8 +73,8 @@ def __init__( repo_path=self.repo_path, cache_ttl=timedelta(seconds=registry_config.cache_ttl_seconds), ) - self._tele = Telemetry() + @log_exceptions def version(self) -> str: """Returns the version of the current Feast SDK/CLI""" @@ -87,6 +88,7 @@ def _get_provider(self) -> Provider: # TODO: Bake self.repo_path into self.config so that we dont only have one interface to paths return get_provider(self.config, self.repo_path) + @log_exceptions_and_usage def refresh_registry(self): """Fetches and caches a copy of the feature registry in memory. @@ -101,7 +103,6 @@ def refresh_registry(self): greater than 0, then once the cache becomes stale (more time than the TTL has passed), a new cache will be downloaded synchronously, which may increase latencies if the triggering method is get_online_features() """ - self._tele.log("refresh_registry") registry_config = self.config.get_registry_config() self._registry = Registry( @@ -111,6 +112,7 @@ def refresh_registry(self): ) self._registry.refresh() + @log_exceptions_and_usage def list_entities(self, allow_cache: bool = False) -> List[Entity]: """ Retrieve a list of entities from the registry @@ -121,10 +123,10 @@ def list_entities(self, allow_cache: bool = False) -> List[Entity]: Returns: List of entities """ - self._tele.log("list_entities") return self._registry.list_entities(self.project, allow_cache=allow_cache) + @log_exceptions_and_usage def list_feature_views(self) -> List[FeatureView]: """ Retrieve a list of feature views from the registry @@ -132,10 +134,10 @@ def list_feature_views(self) -> List[FeatureView]: Returns: List of feature views """ - self._tele.log("list_feature_views") return self._registry.list_feature_views(self.project) + @log_exceptions_and_usage def get_entity(self, name: str) -> Entity: """ Retrieves an entity. @@ -147,10 +149,10 @@ def get_entity(self, name: str) -> Entity: Returns either the specified entity, or raises an exception if none is found """ - self._tele.log("get_entity") return self._registry.get_entity(name, self.project) + @log_exceptions_and_usage def get_feature_view(self, name: str) -> FeatureView: """ Retrieves a feature view. @@ -162,10 +164,10 @@ def get_feature_view(self, name: str) -> FeatureView: Returns either the specified feature view, or raises an exception if none is found """ - self._tele.log("get_feature_view") return self._registry.get_feature_view(name, self.project) + @log_exceptions_and_usage def delete_feature_view(self, name: str): """ Deletes a feature view or raises an exception if not found. @@ -173,10 +175,10 @@ def delete_feature_view(self, name: str): Args: name: Name of feature view """ - self._tele.log("delete_feature_view") return self._registry.delete_feature_view(name, self.project) + @log_exceptions_and_usage def apply( self, objects: Union[Entity, FeatureView, List[Union[FeatureView, Entity]]] ): @@ -210,13 +212,13 @@ def apply( >>> fs.apply([customer_entity, customer_feature_view]) """ - self._tele.log("apply") - # TODO: Add locking # TODO: Optimize by only making a single call (read/write) if isinstance(objects, Entity) or isinstance(objects, FeatureView): objects = [objects] + assert isinstance(objects, list) + views_to_update = [] entities_to_update = [] for ob in objects: @@ -239,6 +241,7 @@ def apply( partial=True, ) + @log_exceptions_and_usage def get_historical_features( self, entity_df: Union[pd.DataFrame, str], feature_refs: List[str], ) -> RetrievalJob: @@ -279,7 +282,6 @@ def get_historical_features( >>> feature_data = job.to_df() >>> model.fit(feature_data) # insert your modeling framework here. """ - self._tele.log("get_historical_features") all_feature_views = self._registry.list_feature_views(project=self.project) try: @@ -304,6 +306,7 @@ def get_historical_features( return job + @log_exceptions_and_usage def materialize_incremental( self, end_date: datetime, feature_views: Optional[List[str]] = None, ) -> None: @@ -330,7 +333,6 @@ def materialize_incremental( >>> fs = FeatureStore(config=RepoConfig(provider="gcp", registry="gs://my-fs/", project="my_fs_proj")) >>> fs.materialize_incremental(end_date=datetime.utcnow() - timedelta(minutes=5)) """ - self._tele.log("materialize_incremental") feature_views_to_materialize = [] if feature_views is None: @@ -377,6 +379,7 @@ def tqdm_builder(length): tqdm_builder, ) + @log_exceptions_and_usage def materialize( self, start_date: datetime, @@ -408,7 +411,6 @@ def materialize( >>> start_date=datetime.utcnow() - timedelta(hours=3), end_date=datetime.utcnow() - timedelta(minutes=10) >>> ) """ - self._tele.log("materialize") if utils.make_tzaware(start_date) > utils.make_tzaware(end_date): raise ValueError( @@ -448,6 +450,7 @@ def tqdm_builder(length): tqdm_builder, ) + @log_exceptions_and_usage def get_online_features( self, feature_refs: List[str], entity_rows: List[Dict[str, Any]], ) -> OnlineResponse: @@ -484,7 +487,6 @@ def get_online_features( >>> print(online_response_dict) {'sales:daily_transactions': [1.1,1.2], 'sales:customer_id': [0,1]} """ - self._tele.log("get_online_features") provider = self._get_provider() entities = self.list_entities(allow_cache=True) diff --git a/sdk/python/feast/feature_view.py b/sdk/python/feast/feature_view.py index e0477b73f8..a28f524a08 100644 --- a/sdk/python/feast/feature_view.py +++ b/sdk/python/feast/feature_view.py @@ -32,6 +32,7 @@ from feast.protos.feast.core.FeatureView_pb2 import ( MaterializationInterval as MaterializationIntervalProto, ) +from feast.telemetry import log_exceptions from feast.value_type import ValueType @@ -52,6 +53,7 @@ class FeatureView: last_updated_timestamp: Optional[Timestamp] = None materialization_intervals: List[Tuple[datetime, datetime]] + @log_exceptions def __init__( self, name: str, diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index bb18119b42..59dd4c4059 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -5,6 +5,8 @@ from pydantic.error_wrappers import ErrorWrapper from pydantic.typing import Dict, Literal, Optional, Union +from feast.telemetry import log_exceptions + class FeastBaseModel(BaseModel): """ Feast Pydantic Configuration Class """ @@ -75,6 +77,7 @@ def get_registry_config(self): return self.registry @root_validator(pre=True) + @log_exceptions def _validate_online_store_config(cls, values): # This method will validate whether the online store configurations are set correctly. This explicit validation # is necessary because Pydantic Unions throw very verbose and cryptic exceptions. We also use this method to diff --git a/sdk/python/feast/repo_operations.py b/sdk/python/feast/repo_operations.py index 552d3a4bc3..1c6e76c8d2 100644 --- a/sdk/python/feast/repo_operations.py +++ b/sdk/python/feast/repo_operations.py @@ -15,6 +15,7 @@ from feast.names import adjectives, animals from feast.registry import Registry from feast.repo_config import RepoConfig +from feast.telemetry import log_exceptions_and_usage def py_path_to_module(path: Path, repo_root: Path) -> str: @@ -103,6 +104,7 @@ def parse_repo(repo_root: Path) -> ParsedRepo: return res +@log_exceptions_and_usage def apply_total(repo_config: RepoConfig, repo_path: Path): from colorama import Fore, Style @@ -209,6 +211,7 @@ def apply_total(repo_config: RepoConfig, repo_path: Path): ) +@log_exceptions_and_usage def teardown(repo_config: RepoConfig, repo_path: Path): registry_config = repo_config.get_registry_config() registry = Registry( @@ -229,6 +232,7 @@ def teardown(repo_config: RepoConfig, repo_path: Path): ) +@log_exceptions_and_usage def registry_dump(repo_config: RepoConfig, repo_path: Path): """ For debugging only: output contents of the metadata registry """ registry_config = repo_config.get_registry_config() @@ -255,6 +259,7 @@ def cli_check_repo(repo_path: Path): sys.exit(1) +@log_exceptions_and_usage def init_repo(repo_name: str, template: str): import os from distutils.dir_util import copy_tree diff --git a/sdk/python/feast/telemetry.py b/sdk/python/feast/telemetry.py index 2f7f5b5f46..f483e69b69 100644 --- a/sdk/python/feast/telemetry.py +++ b/sdk/python/feast/telemetry.py @@ -16,8 +16,10 @@ import sys import uuid from datetime import datetime +from functools import wraps from os.path import expanduser, join from pathlib import Path +from typing import List, Tuple import requests @@ -30,29 +32,38 @@ class Telemetry: def __init__(self): - feast_home_dir = join(expanduser("~"), ".feast") - Path(feast_home_dir).mkdir(exist_ok=True) - telemetry_filepath = join(feast_home_dir, "telemetry") - self._telemetry_enabled = ( + self._telemetry_enabled = False + self.check_env_and_configure() + + def check_env_and_configure(self): + telemetry_enabled = ( os.getenv("FEAST_TELEMETRY", default="True") == "True" ) # written this way to turn the env var string into a boolean - self._is_test = os.getenv("FEAST_IS_TELEMETRY_TEST", "False") == "True" - - if self._telemetry_enabled: - self._telemetry_counter = {"get_online_features": 0} - if os.path.exists(telemetry_filepath): - with open(telemetry_filepath, "r") as f: - self._telemetry_id = f.read() - else: - self._telemetry_id = str(uuid.uuid4()) - - with open(telemetry_filepath, "w") as f: - f.write(self._telemetry_id) - print( - "Feast is an open source project that collects anonymized usage statistics. To opt out or learn" - " more see https://docs.feast.dev/v/master/feast-on-kubernetes/advanced-1/telemetry" - ) + # Check if it changed + if telemetry_enabled != self._telemetry_enabled: + self._telemetry_enabled = telemetry_enabled + + if self._telemetry_enabled: + feast_home_dir = join(expanduser("~"), ".feast") + Path(feast_home_dir).mkdir(exist_ok=True) + telemetry_filepath = join(feast_home_dir, "telemetry") + + self._is_test = os.getenv("FEAST_IS_TELEMETRY_TEST", "False") == "True" + self._telemetry_counter = {"get_online_features": 0} + + if os.path.exists(telemetry_filepath): + with open(telemetry_filepath, "r") as f: + self._telemetry_id = f.read() + else: + self._telemetry_id = str(uuid.uuid4()) + + with open(telemetry_filepath, "w") as f: + f.write(self._telemetry_id) + print( + "Feast is an open source project that collects anonymized error reporting and usage statistics. To opt out or learn" + " more see https://docs.feast.dev/reference/telemetry" + ) @property def telemetry_id(self): @@ -61,6 +72,7 @@ def telemetry_id(self): return self._telemetry_id def log(self, function_name: str): + self.check_env_and_configure() if self._telemetry_enabled and self.telemetry_id: if function_name == "get_online_features": @@ -84,3 +96,83 @@ def log(self, function_name: str): else: pass return + + def log_exception(self, error_type: str, traceback: List[Tuple[str, int, str]]): + self.check_env_and_configure() + + if self._telemetry_enabled and self.telemetry_id: + json = { + "error_type": error_type, + "traceback": traceback, + "telemetry_id": self.telemetry_id, + "version": get_version(), + "os": sys.platform, + "is_test": self._is_test, + } + try: + requests.post(TELEMETRY_ENDPOINT, json=json) + except Exception as e: + if self._is_test: + raise e + else: + pass + return + + +def log_exceptions(func): + @wraps(func) + def exception_logging_wrapper(*args, **kwargs): + try: + result = func(*args, **kwargs) + except Exception as e: + error_type = type(e).__name__ + trace_to_log = [] + tb = e.__traceback__ + while tb is not None: + trace_to_log.append( + ( + _trim_filename(tb.tb_frame.f_code.co_filename), + tb.tb_lineno, + tb.tb_frame.f_code.co_name, + ) + ) + tb = tb.tb_next + tele.log_exception(error_type, trace_to_log) + raise + return result + + return exception_logging_wrapper + + +def log_exceptions_and_usage(func): + @wraps(func) + def exception_logging_wrapper(*args, **kwargs): + try: + result = func(*args, **kwargs) + tele.log(func.__name__) + except Exception as e: + error_type = type(e).__name__ + trace_to_log = [] + tb = e.__traceback__ + while tb is not None: + trace_to_log.append( + ( + _trim_filename(tb.tb_frame.f_code.co_filename), + tb.tb_lineno, + tb.tb_frame.f_code.co_name, + ) + ) + tb = tb.tb_next + tele.log_exception(error_type, trace_to_log) + raise + return result + + return exception_logging_wrapper + + +def _trim_filename(filename: str) -> str: + return filename.split("/")[-1] + + +# Single global telemetry object +tele = Telemetry() diff --git a/sdk/python/telemetry_tests/test_telemetry.py b/sdk/python/telemetry_tests/test_telemetry.py index 44c2c16e13..be6f2cb621 100644 --- a/sdk/python/telemetry_tests/test_telemetry.py +++ b/sdk/python/telemetry_tests/test_telemetry.py @@ -151,6 +151,42 @@ def test_telemetry_off(): assert rows.total_rows == 0 +def test_exception_telemetry_on(): + old_environ = dict(os.environ) + test_telemetry_id = str(uuid.uuid4()) + os.environ["FEAST_FORCE_TELEMETRY_UUID"] = test_telemetry_id + os.environ["FEAST_IS_TELEMETRY_TEST"] = "True" + os.environ["FEAST_TELEMETRY"] = "True" + + try: + test_feature_store = FeatureStore("/tmp/non_existent_directory") + except: + pass + + os.environ.clear() + os.environ.update(old_environ) + ensure_bigquery_telemetry_id_with_retry(test_telemetry_id) + + +def test_exception_telemetry_off(): + old_environ = dict(os.environ) + test_telemetry_id = str(uuid.uuid4()) + os.environ["FEAST_IS_TELEMETRY_TEST"] = "True" + os.environ["FEAST_TELEMETRY"] = "False" + os.environ["FEAST_FORCE_TELEMETRY_UUID"] = test_telemetry_id + + try: + test_feature_store = FeatureStore("/tmp/non_existent_directory") + except: + pass + + os.environ.clear() + os.environ.update(old_environ) + sleep(30) + rows = read_bigquery_telemetry_id(test_telemetry_id) + assert rows.total_rows == 0 + + @retry(wait=wait_exponential(multiplier=1, min=1, max=10), stop=stop_after_attempt(5)) def ensure_bigquery_telemetry_id_with_retry(telemetry_id): rows = read_bigquery_telemetry_id(telemetry_id)