diff --git a/src/deadline/client/api/__init__.py b/src/deadline/client/api/__init__.py
index d7c5a011..64e81dad 100644
--- a/src/deadline/client/api/__init__.py
+++ b/src/deadline/client/api/__init__.py
@@ -21,6 +21,7 @@
     "get_queue_user_boto3_session",
     "get_queue_parameter_definitions",
     "get_telemetry_client",
+    "get_deadline_cloud_library_telemetry_client",
 ]
 
 from configparser import ConfigParser
@@ -46,7 +47,11 @@
 )
 from ._queue_parameters import get_queue_parameter_definitions
 from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
-from ._telemetry import get_telemetry_client, TelemetryClient
+from ._telemetry import (
+    get_telemetry_client,
+    get_deadline_cloud_library_telemetry_client,
+    TelemetryClient,
+)
 
 
 logger = getLogger(__name__)
diff --git a/src/deadline/client/api/_submit_job_bundle.py b/src/deadline/client/api/_submit_job_bundle.py
index 01f7236f..addc81dc 100644
--- a/src/deadline/client/api/_submit_job_bundle.py
+++ b/src/deadline/client/api/_submit_job_bundle.py
@@ -271,7 +271,9 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
         hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
         on_preparing_to_submit=hashing_progress_callback,
     )
-    api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
+    api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
+        hashing_summary
+    )
 
     return manifests
 
@@ -296,6 +298,8 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool:
     upload_summary, attachment_settings = asset_manager.upload_assets(
         manifests, upload_progress_callback
     )
-    api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
+    api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
+        upload_summary
+    )
 
     return attachment_settings.to_dict()
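For orientation, the call-site change in both hunks above looks like this in isolation. A minimal sketch, not part of the diff; the `SummaryStatistics` values are placeholders, and `from_gui` becomes keyword-only later in this diff:

```python
# Hedged call-site sketch with placeholder values; assumes the deadline-cloud
# package layout shown in the diff above.
from deadline.client import api
from deadline.job_attachments.progress_tracker import SummaryStatistics

summary = SummaryStatistics(total_bytes=123, total_files=12, total_time=4.2)
api.get_deadline_cloud_library_telemetry_client().record_hashing_summary(summary)  # CLI path
api.get_deadline_cloud_library_telemetry_client().record_hashing_summary(summary, from_gui=True)  # GUI path
```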
diff --git a/src/deadline/client/api/_telemetry.py b/src/deadline/client/api/_telemetry.py
index 0dfd26a8..7528a1b9 100644
--- a/src/deadline/client/api/_telemetry.py
+++ b/src/deadline/client/api/_telemetry.py
@@ -5,11 +5,13 @@
 import logging
 import platform
 import uuid
+import random
+import time
 
 from configparser import ConfigParser
 from dataclasses import asdict, dataclass, field
 from datetime import datetime
-from queue import Queue
+from queue import Queue, Full
 from threading import Thread
 from typing import Any, Dict, Optional
 from urllib import request, error
@@ -29,8 +31,8 @@
 class TelemetryEvent:
     """Base class for telemetry events"""
 
-    event_type: str = "com.amazon.rum.uncategorized"
-    event_body: Dict[str, Any] = field(default_factory=dict)
+    event_type: str = "com.amazon.rum.deadline.uncategorized"
+    event_details: Dict[str, Any] = field(default_factory=dict)
 
 
 class TelemetryClient:
@@ -54,22 +56,33 @@ class TelemetryClient:
     'deadline config set "telemetry.opt_out" true'
     """
 
-    def __init__(self, config: Optional[ConfigParser] = None):
+    # Used for backing off requests if we encounter errors from the service.
+    # See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
+    MAX_QUEUE_SIZE = 25
+    BASE_TIME = 0.5
+    MAX_BACKOFF_SECONDS = 10  # The maximum amount of time to wait between retries
+    MAX_RETRY_ATTEMPTS = 4
+
+    def __init__(
+        self,
+        package_name: str,
+        package_ver: str,
+        config: Optional[ConfigParser] = None,
+    ):
         self.telemetry_opted_out = config_file.str2bool(
             config_file.get_setting("telemetry.opt_out", config=config)
         )
         if self.telemetry_opted_out:
             return
 
+        self.package_name = package_name
+        self.package_ver = ".".join(package_ver.split(".")[:3])
         self.endpoint: str = f"{config_file.get_setting('settings.deadline_endpoint_url', config=config)}/2023-10-12/telemetry"
 
         # IDs for this session
         self.session_id: str = str(uuid.uuid4())
         self.telemetry_id: str = self._get_telemetry_identifier(config=config)
         # Get common data we'll include in each request
-        self.studio_id: Optional[str] = get_studio_id(config=config)
-        self.user_id, _ = get_user_and_identity_store_id(config=config)
-        self.env_info: Dict[str, Any] = self._get_env_summary()
-        self.system_info: Dict[str, Any] = self._get_system_info()
+        self.system_metadata = self._get_system_metadata(config=config)
 
         self._start_threads()
@@ -84,35 +97,71 @@ def _get_telemetry_identifier(self, config: Optional[ConfigParser] = None):
     def _start_threads(self) -> None:
         """Set up background threads for shutdown checking and request sending"""
-        self.event_queue: Queue[Optional[TelemetryEvent]] = Queue()
+        self.event_queue: Queue[Optional[TelemetryEvent]] = Queue(
+            maxsize=TelemetryClient.MAX_QUEUE_SIZE
+        )
         atexit.register(self._exit_cleanly)
 
         self.processing_thread: Thread = Thread(
             target=self._process_event_queue_thread, daemon=True
         )
         self.processing_thread.start()
 
-    def _get_env_summary(self) -> Dict[str, Any]:
-        """Builds up a dict of non-identifiable information the environment."""
-        return {
-            "service": "deadline-cloud-library",
-            "version": ".".join(version.split(".")[:3]),
-            "pythonVersion": platform.python_version(),
-        }
+    def _get_system_metadata(self, config: Optional[ConfigParser]) -> Dict[str, Any]:
+        """
+        Builds up a dict of non-identifiable metadata about the system environment.
 
-    def _get_system_info(self) -> Dict[str, Any]:
-        """Builds up a dict of non-identifiable information about this machine."""
+        This will be used in the Rum event metadata, which has a limit of 10 unique values.
+        """
         platform_info = platform.uname()
-        return {
+        metadata: Dict[str, Any] = {
+            "service": self.package_name,
+            "version": self.package_ver,
+            "python_version": platform.python_version(),
             "osName": "macOS" if platform_info.system == "Darwin" else platform_info.system,
             "osVersion": platform_info.release,
-            "cpuType": platform_info.machine,
-            "cpuName": platform_info.processor,
         }
+        user_id, _ = get_user_and_identity_store_id(config=config)
+        if user_id:
+            metadata["user_id"] = user_id
+        studio_id: Optional[str] = get_studio_id(config=config)
+        if studio_id:
+            metadata["studio_id"] = studio_id
+
+        return metadata
 
     def _exit_cleanly(self):
         self.event_queue.put(None)
         self.processing_thread.join()
 
+    def _send_request(self, req: request.Request) -> None:
+        attempts = 0
+        success = False
+        while not success:
+            try:
+                logger.debug(f"Sending telemetry data: {req.data}")
+                with request.urlopen(req):
+                    logger.debug("Successfully sent telemetry.")
+                success = True
+            except error.HTTPError as httpe:
+                if httpe.code == 429 or httpe.code == 500:
+                    logger.debug(f"Error received from service. Waiting to retry: {str(httpe)}")
+
+                    attempts += 1
+                    if attempts >= TelemetryClient.MAX_RETRY_ATTEMPTS:
+                        raise Exception("Max retries reached sending telemetry")
+
+                    backoff_sleep = random.uniform(
+                        0,
+                        min(
+                            TelemetryClient.MAX_BACKOFF_SECONDS,
+                            TelemetryClient.BASE_TIME * 2**attempts,
+                        ),
+                    )
+                    time.sleep(backoff_sleep)
+                else:  # Reraise any exceptions we didn't expect
+                    raise
+
     def _process_event_queue_thread(self):
         """Background thread for processing the telemetry event data queue and sending telemetry requests."""
         while True:
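As a standalone sketch of the "full jitter" strategy that the new `_send_request` applies (the constants mirror the class attributes above; the loop and printout are illustrative only):

```python
# Full-jitter backoff: sleep a random duration in [0, min(cap, base * 2**attempt)].
import random

BASE_TIME = 0.5  # mirrors TelemetryClient.BASE_TIME
MAX_BACKOFF_SECONDS = 10  # mirrors TelemetryClient.MAX_BACKOFF_SECONDS

for attempt in range(1, 4):
    ceiling = min(MAX_BACKOFF_SECONDS, BASE_TIME * 2**attempt)
    sleep_s = random.uniform(0, ceiling)
    print(f"attempt {attempt}: ceiling {ceiling}s, sleeping {sleep_s:.2f}s")
```

Drawing the sleep uniformly from zero up to the exponentially growing ceiling keeps retries from many clients spread out, so a throttled service is not hit by synchronized retry waves.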
@@ -127,9 +176,9 @@
                 "BatchId": str(uuid.uuid4()),
                 "RumEvents": [
                     {
-                        "details": "{}",
+                        "details": str(json.dumps(event_data.event_details)),
                         "id": str(uuid.uuid4()),
-                        "metadata": str(json.dumps(event_data.event_body)),
+                        "metadata": str(json.dumps(self.system_metadata)),
                         "timestamp": int(datetime.now().timestamp()),
                         "type": event_data.event_type,
                     },
@@ -139,44 +188,74 @@
             request_body_encoded = str(json.dumps(request_body)).encode("utf-8")
             req = request.Request(url=self.endpoint, data=request_body_encoded, headers=headers)
             try:
-                logger.debug(f"Sending telemetry data: {request_body}")
-                with request.urlopen(req):
-                    logger.debug("Successfully sent telemetry.")
-            except error.HTTPError as httpe:
-                logger.debug(f"HTTPError sending telemetry: {str(httpe)}")
-            except Exception as ex:
-                logger.debug(f"Exception sending telemetry: {str(ex)}")
+                self._send_request(req)
+            except Exception:
+                # Silently swallow any kind of uncaught exception and stop sending telemetry
+                return
 
             self.event_queue.task_done()
 
+    def _put_telemetry_record(self, event: TelemetryEvent) -> None:
+        if self.telemetry_opted_out:
+            return
+        try:
+            self.event_queue.put_nowait(event)
+        except Full:
+            # Silently swallow the error if the event queue is full (due to throttling of the service)
+            pass
+
     def _record_summary_statistics(
         self, event_type: str, summary: SummaryStatistics, from_gui: bool
     ):
-        if self.telemetry_opted_out:
-            return
-        data_body: Dict[str, Any] = asdict(summary)
-        data_body.update(self.env_info)
-        data_body.update(self.system_info)
-        if self.user_id:
-            data_body["userId"] = self.user_id
-        if self.studio_id:
-            data_body["studioId"] = self.studio_id
-        data_body["usageMode"] = "GUI" if from_gui else "CLI"
-        self.event_queue.put_nowait(TelemetryEvent(event_type=event_type, event_body=data_body))
-
-    def record_hashing_summary(self, summary: SummaryStatistics, from_gui: bool = False):
+        details: Dict[str, Any] = asdict(summary)
+        details["usage_mode"] = "GUI" if from_gui else "CLI"
+        self._put_telemetry_record(TelemetryEvent(event_type=event_type, event_details=details))
+
+    def record_hashing_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
         self._record_summary_statistics(
-            "com.amazon.rum.job_attachments.hashing_summary", summary, from_gui
+            "com.amazon.rum.deadline.job_attachments.hashing_summary", summary, from_gui
         )
 
-    def record_upload_summary(self, summary: SummaryStatistics, from_gui: bool = False):
+    def record_upload_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
         self._record_summary_statistics(
-            "com.amazon.rum.job_attachments.upload_summary", summary, from_gui
+            "com.amazon.rum.deadline.job_attachments.upload_summary", summary, from_gui
        )
+
+    def record_event(self, event_type: str, event_details: Dict[str, Any]):
+        self._put_telemetry_record(
+            TelemetryEvent(
+                event_type=event_type,
+                event_details=event_details,
+            )
+        )
 
 
-def get_telemetry_client(config: Optional[ConfigParser] = None) -> TelemetryClient:
+def get_telemetry_client(
+    package_name: str, package_ver: str, config: Optional[ConfigParser] = None
+) -> TelemetryClient:
+    """
+    Retrieves the cached telemetry client, lazy-loading the first time this is called.
+
+    :param package_name: Package name to include in telemetry requests.
+    :param package_ver: Package version to include in telemetry requests.
+    :param config: Optional configuration to use for the client. Loads defaults if not given.
+    :return: Telemetry client to make requests with.
+    """
    global __cached_telemetry_client
     if not __cached_telemetry_client:
-        __cached_telemetry_client = TelemetryClient(config)
+        __cached_telemetry_client = TelemetryClient(
+            package_name=package_name,
+            package_ver=package_ver,
+            config=config,
+        )
     return __cached_telemetry_client
+
+
+def get_deadline_cloud_library_telemetry_client(
+    config: Optional[ConfigParser] = None,
+) -> TelemetryClient:
+    """
+    Retrieves the cached telemetry client, specifying the Deadline Cloud Client Library's package information.
+
+    :param config: Optional configuration to use for the client. Loads defaults if not given.
+    :return: Telemetry client to make requests with.
+    """
+    return get_telemetry_client("deadline-cloud-library", version, config=config)
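To make the details/metadata swap above concrete, here is roughly what one request body now carries; a sketch with placeholder values, mirroring the construction in `_process_event_queue_thread`:

```python
# Placeholder payload: "details" holds the per-event data, "metadata" the shared
# system metadata gathered once at client construction. Both are JSON strings
# inside the RUM event envelope.
import json
import uuid
from datetime import datetime

event_details = {"total_bytes": 123, "total_files": 12, "usage_mode": "CLI"}
system_metadata = {"service": "deadline-cloud-library", "version": "0.1.2"}

request_body = {
    "BatchId": str(uuid.uuid4()),
    "RumEvents": [
        {
            "details": json.dumps(event_details),
            "id": str(uuid.uuid4()),
            "metadata": json.dumps(system_metadata),
            "timestamp": int(datetime.now().timestamp()),
            "type": "com.amazon.rum.deadline.job_attachments.hashing_summary",
        }
    ],
}
print(json.dumps(request_body, indent=2))
```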
diff --git a/src/deadline/client/cli/_groups/bundle_group.py b/src/deadline/client/cli/_groups/bundle_group.py
index 9a3ffd51..b007cad3 100644
--- a/src/deadline/client/cli/_groups/bundle_group.py
+++ b/src/deadline/client/cli/_groups/bundle_group.py
@@ -346,7 +346,9 @@ def _update_hash_progress(hashing_metadata: ProgressReportMetadata) -> bool:
         on_preparing_to_submit=_update_hash_progress,
     )
 
-    api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
+    api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
+        hashing_summary
+    )
     click.echo("Hashing Summary:")
     click.echo(textwrap.indent(str(hashing_summary), "    "))
 
@@ -380,7 +382,9 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
         manifests, _update_upload_progress
     )
 
-    api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
+    api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
+        upload_summary
+    )
     click.echo("Upload Summary:")
     click.echo(textwrap.indent(str(upload_summary), "    "))
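Because `get_telemetry_client` now takes a package identity, other packages can reuse the shared cached client under their own name. A sketch with hypothetical values ("my-tool", the event type, and its details are invented for illustration); note that the cache means the first caller's package info wins for the rest of the process:

```python
# Hypothetical reuse of the shared client by another package; names are invented.
from deadline.client.api import get_telemetry_client

client = get_telemetry_client("my-tool", "1.4.2.dev0")  # version is trimmed to "1.4.2"
client.record_event(
    event_type="com.amazon.rum.deadline.my_tool.session_start",  # invented event type
    event_details={"tool_mode": "batch"},
)
```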
""" - api.get_telemetry_client().record_hashing_summary(hashing_summary, from_gui=True) + api.get_deadline_cloud_library_telemetry_client().record_hashing_summary( + hashing_summary, from_gui=True + ) self.summary_edit.setText( f"\nHashing Summary:\n{textwrap.indent(str(hashing_summary), ' ')}" ) @@ -450,7 +452,9 @@ def handle_upload_thread_succeeded( "defaults.job_attachments_file_system" ) - api.get_telemetry_client().record_upload_summary(upload_summary, from_gui=True) + api.get_deadline_cloud_library_telemetry_client().record_upload_summary( + upload_summary, from_gui=True + ) self.summary_edit.setText( f"{self.summary_edit.toPlainText()}" + f"\nUpload Summary:\n{textwrap.indent(str(upload_summary), ' ')}" diff --git a/test/unit/deadline_client/api/test_api_telemetry.py b/test/unit/deadline_client/api/test_api_telemetry.py index 63a2fbbb..474e7bb4 100644 --- a/test/unit/deadline_client/api/test_api_telemetry.py +++ b/test/unit/deadline_client/api/test_api_telemetry.py @@ -2,6 +2,7 @@ import pytest import uuid +import time from unittest.mock import patch, MagicMock from dataclasses import asdict @@ -22,7 +23,9 @@ def fixture_telemetry_client(fresh_deadline_config): "get_user_and_identity_store_id", side_effect=[("user-id", "identity-store-id")], ): - return TelemetryClient(config=config.config_file.read_config()) + return TelemetryClient( + "test-library", "0.1.2.1234", config=config.config_file.read_config() + ) def test_opt_out(fresh_deadline_config): @@ -31,19 +34,19 @@ def test_opt_out(fresh_deadline_config): config.set_setting("defaults.aws_profile_name", "SomeRandomProfileName") config.set_setting("telemetry.opt_out", "true") # WHEN - client = TelemetryClient(config=config.config_file.read_config()) + client = TelemetryClient( + "test-library", "test-version", config=config.config_file.read_config() + ) # THEN assert not hasattr(client, "endpoint") assert not hasattr(client, "session_id") assert not hasattr(client, "telemetry_id") - assert not hasattr(client, "studio_id") - assert not hasattr(client, "user_id") - assert not hasattr(client, "env_info") + assert not hasattr(client, "system_metadata") assert not hasattr(client, "event_queue") assert not hasattr(client, "processing_thread") # Ensure nothing blows up if we try recording telemetry after we've opted out - client.record_hashing_summary(SummaryStatistics(), True) - client.record_upload_summary(SummaryStatistics(), False) + client.record_hashing_summary(SummaryStatistics(), from_gui=True) + client.record_upload_summary(SummaryStatistics(), from_gui=False) def test_get_telemetry_identifier(telemetry_client): @@ -78,9 +81,36 @@ def test_process_event_queue_thread(telemetry_client): assert queue_mock.get.call_count == 2 +@pytest.mark.parametrize( + "http_code,attempt_count", + [ + (400, 1), + (429, TelemetryClient.MAX_RETRY_ATTEMPTS), + (500, TelemetryClient.MAX_RETRY_ATTEMPTS), + ], +) +@pytest.mark.timeout(5) # Timeout in case we don't exit the while loop +def test_process_event_queue_thread_retries_and_exits(telemetry_client, http_code, attempt_count): + """Test that the thread exits cleanly after getting an unexpected exception""" + # GIVEN + http_error = request.HTTPError("http://test.com", http_code, "Http Error", {}, None) # type: ignore + queue_mock = MagicMock() + queue_mock.get.side_effect = [TelemetryEvent(), None] + telemetry_client.event_queue = queue_mock + # WHEN + with patch.object(request, "urlopen", side_effect=http_error) as urlopen_mock, patch.object( + time, "sleep" + ) as sleep_mock: + 
diff --git a/test/unit/deadline_client/api/test_api_telemetry.py b/test/unit/deadline_client/api/test_api_telemetry.py
index 63a2fbbb..474e7bb4 100644
--- a/test/unit/deadline_client/api/test_api_telemetry.py
+++ b/test/unit/deadline_client/api/test_api_telemetry.py
@@ -2,6 +2,7 @@
 
 import pytest
 import uuid
+import time
 
 from unittest.mock import patch, MagicMock
 from dataclasses import asdict
@@ -22,7 +23,9 @@ def fixture_telemetry_client(fresh_deadline_config):
         "get_user_and_identity_store_id",
         side_effect=[("user-id", "identity-store-id")],
     ):
-        return TelemetryClient(config=config.config_file.read_config())
+        return TelemetryClient(
+            "test-library", "0.1.2.1234", config=config.config_file.read_config()
+        )
 
 
 def test_opt_out(fresh_deadline_config):
@@ -31,19 +34,19 @@ def test_opt_out(fresh_deadline_config):
     config.set_setting("defaults.aws_profile_name", "SomeRandomProfileName")
     config.set_setting("telemetry.opt_out", "true")
     # WHEN
-    client = TelemetryClient(config=config.config_file.read_config())
+    client = TelemetryClient(
+        "test-library", "test-version", config=config.config_file.read_config()
+    )
     # THEN
     assert not hasattr(client, "endpoint")
     assert not hasattr(client, "session_id")
     assert not hasattr(client, "telemetry_id")
-    assert not hasattr(client, "studio_id")
-    assert not hasattr(client, "user_id")
-    assert not hasattr(client, "env_info")
+    assert not hasattr(client, "system_metadata")
     assert not hasattr(client, "event_queue")
     assert not hasattr(client, "processing_thread")
     # Ensure nothing blows up if we try recording telemetry after we've opted out
-    client.record_hashing_summary(SummaryStatistics(), True)
-    client.record_upload_summary(SummaryStatistics(), False)
+    client.record_hashing_summary(SummaryStatistics(), from_gui=True)
+    client.record_upload_summary(SummaryStatistics(), from_gui=False)
 
 
 def test_get_telemetry_identifier(telemetry_client):
@@ -78,9 +81,36 @@ def test_process_event_queue_thread(telemetry_client):
     assert queue_mock.get.call_count == 2
 
 
+@pytest.mark.parametrize(
+    "http_code,attempt_count",
+    [
+        (400, 1),
+        (429, TelemetryClient.MAX_RETRY_ATTEMPTS),
+        (500, TelemetryClient.MAX_RETRY_ATTEMPTS),
+    ],
+)
+@pytest.mark.timeout(5)  # Timeout in case we don't exit the while loop
+def test_process_event_queue_thread_retries_and_exits(telemetry_client, http_code, attempt_count):
+    """Test that retryable HTTP errors are retried with backoff and the thread exits cleanly"""
+    # GIVEN
+    http_error = request.HTTPError("http://test.com", http_code, "Http Error", {}, None)  # type: ignore
+    queue_mock = MagicMock()
+    queue_mock.get.side_effect = [TelemetryEvent(), None]
+    telemetry_client.event_queue = queue_mock
+    # WHEN
+    with patch.object(request, "urlopen", side_effect=http_error) as urlopen_mock, patch.object(
+        time, "sleep"
+    ) as sleep_mock:
+        telemetry_client._process_event_queue_thread()
+    # THEN
+    assert urlopen_mock.call_count == attempt_count
+    assert sleep_mock.call_count == attempt_count - 1
+    assert queue_mock.get.call_count == 1
+
+
 @pytest.mark.timeout(5)  # Timeout in case we don't exit the while loop
-def test_process_event_queue_thread_handles_errors(telemetry_client):
-    """Test that the thread continues after getting exceptions"""
+def test_process_event_queue_thread_handles_unexpected_error(telemetry_client):
+    """Test that the thread exits cleanly after getting an unexpected exception"""
     # GIVEN
     queue_mock = MagicMock()
     queue_mock.get.side_effect = [TelemetryEvent(), None]
@@ -90,31 +120,21 @@ def test_process_event_queue_thread_handles_unexpected_error(telemetry_client):
         telemetry_client._process_event_queue_thread()
         urlopen_mock.assert_called_once()
     # THEN
-    assert queue_mock.get.call_count == 2
+    assert queue_mock.get.call_count == 1
 
 
 def test_record_hashing_summary(telemetry_client):
     """Tests that recording a hashing summary sends the expected TelemetryEvent to the thread queue"""
     # GIVEN
     queue_mock = MagicMock()
-    expected_env_info = {"test_env": "test_val"}
-    expected_machine_info = {"test_machine": "test_val2"}
     test_summary = SummaryStatistics(total_bytes=123, total_files=12, total_time=12345)
-
     expected_summary = asdict(test_summary)
-    expected_summary["usageMode"] = "CLI"
-    expected_summary["userId"] = "user-id"
-    expected_summary["studioId"] = "studio-id"
-    expected_summary.update(expected_env_info)
-    expected_summary.update(expected_machine_info)
-
+    expected_summary["usage_mode"] = "CLI"
     expected_event = TelemetryEvent(
-        event_type="com.amazon.rum.job_attachments.hashing_summary", event_body=expected_summary
+        event_type="com.amazon.rum.deadline.job_attachments.hashing_summary",
+        event_details=expected_summary,
     )
-
     telemetry_client.event_queue = queue_mock
-    telemetry_client.env_info = expected_env_info
-    telemetry_client.system_info = expected_machine_info
 
     # WHEN
     telemetry_client.record_hashing_summary(test_summary)
@@ -127,24 +147,14 @@ def test_record_upload_summary(telemetry_client):
     """Tests that recording an upload summary sends the expected TelemetryEvent to the thread queue"""
     # GIVEN
     queue_mock = MagicMock()
-    expected_env_info = {"test_env": "test_val"}
-    expected_machine_info = {"test_machine": "test_val2"}
     test_summary = SummaryStatistics(total_bytes=123, total_files=12, total_time=12345)
-
     expected_summary = asdict(test_summary)
-    expected_summary["usageMode"] = "GUI"
-    expected_summary["userId"] = "user-id"
-    expected_summary["studioId"] = "studio-id"
-    expected_summary.update(expected_env_info)
-    expected_summary.update(expected_machine_info)
-
+    expected_summary["usage_mode"] = "GUI"
     expected_event = TelemetryEvent(
-        event_type="com.amazon.rum.job_attachments.upload_summary", event_body=expected_summary
+        event_type="com.amazon.rum.deadline.job_attachments.upload_summary",
+        event_details=expected_summary,
     )
-
    telemetry_client.event_queue = queue_mock
-    telemetry_client.env_info = expected_env_info
-    telemetry_client.system_info = expected_machine_info
 
     # WHEN
     telemetry_client.record_upload_summary(test_summary, from_gui=True)
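A possible companion test, not in this diff, for the new queue-full behavior of `_put_telemetry_record` (it should drop events silently once the bounded queue is full); assumes the same fixtures and imports as the tests above:

```python
from queue import Full
from unittest.mock import MagicMock

from deadline.client.api._telemetry import TelemetryEvent


def test_put_telemetry_record_swallows_full_queue(telemetry_client):
    # GIVEN a queue that always reports Full
    queue_mock = MagicMock()
    queue_mock.put_nowait.side_effect = Full()
    telemetry_client.event_queue = queue_mock

    # WHEN / THEN no exception escapes and the put was attempted exactly once
    telemetry_client._put_telemetry_record(TelemetryEvent())
    queue_mock.put_nowait.assert_called_once()
```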