Skip to content

Commit

Permalink
feat!: Adds more telemetry events
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <[email protected]>
  • Loading branch information
marofke committed Sep 29, 2023
1 parent 8c4f044 commit d20481a
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 91 deletions.
7 changes: 6 additions & 1 deletion src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"get_queue_user_boto3_session",
"get_queue_parameter_definitions",
"get_telemetry_client",
"get_deadline_cloud_library_telemetry_client",
]

from configparser import ConfigParser
Expand All @@ -46,7 +47,11 @@
)
from ._queue_parameters import get_queue_parameter_definitions
from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
from ._telemetry import get_telemetry_client, TelemetryClient
from ._telemetry import (
get_telemetry_client,
get_deadline_cloud_library_telemetry_client,
TelemetryClient,
)

logger = getLogger(__name__)

Expand Down
8 changes: 6 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
on_preparing_to_submit=hashing_progress_callback,
)
api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
hashing_summary
)

return manifests

Expand All @@ -296,6 +298,8 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool:
upload_summary, attachment_settings = asset_manager.upload_assets(
manifests, upload_progress_callback
)
api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
)

return attachment_settings.to_dict()
175 changes: 127 additions & 48 deletions src/deadline/client/api/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import logging
import platform
import uuid
import random
import time

from configparser import ConfigParser
from dataclasses import asdict, dataclass, field
from datetime import datetime
from queue import Queue
from queue import Queue, Full
from threading import Thread
from typing import Any, Dict, Optional
from urllib import request, error
Expand All @@ -29,8 +31,8 @@
class TelemetryEvent:
"""Base class for telemetry events"""

event_type: str = "com.amazon.rum.uncategorized"
event_body: Dict[str, Any] = field(default_factory=dict)
event_type: str = "com.amazon.rum.deadline.uncategorized"
event_details: Dict[str, Any] = field(default_factory=dict)


class TelemetryClient:
Expand All @@ -54,22 +56,33 @@ class TelemetryClient:
'deadline config set "telemetry.opt_out" true'
"""

def __init__(self, config: Optional[ConfigParser] = None):
# Used for backing off requests if we encounter errors from the service.
# See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
MAX_QUEUE_SIZE = 25
BASE_TIME = 0.5
MAX_BACKOFF_SECONDS = 10 # The maximum amount of time to wait between retries
MAX_RETRY_ATTEMPTS = 4

def __init__(
self,
package_name: str,
package_ver: str,
config: Optional[ConfigParser] = None,
):
self.telemetry_opted_out = config_file.str2bool(
config_file.get_setting("telemetry.opt_out", config=config)
)
if self.telemetry_opted_out:
return
self.package_name = package_name
self.package_ver = ".".join(package_ver.split(".")[:3])
self.endpoint: str = f"{config_file.get_setting('settings.deadline_endpoint_url', config=config)}/2023-10-12/telemetry"

# IDs for this session
self.session_id: str = str(uuid.uuid4())
self.telemetry_id: str = self._get_telemetry_identifier(config=config)
# Get common data we'll include in each request
self.studio_id: Optional[str] = get_studio_id(config=config)
self.user_id, _ = get_user_and_identity_store_id(config=config)
self.env_info: Dict[str, Any] = self._get_env_summary()
self.system_info: Dict[str, Any] = self._get_system_info()
self.system_metadata = self._get_system_metadata(config=config)

self._start_threads()

Expand All @@ -84,35 +97,71 @@ def _get_telemetry_identifier(self, config: Optional[ConfigParser] = None):

def _start_threads(self) -> None:
"""Set up background threads for shutdown checking and request sending"""
self.event_queue: Queue[Optional[TelemetryEvent]] = Queue()
self.event_queue: Queue[Optional[TelemetryEvent]] = Queue(
maxsize=TelemetryClient.MAX_QUEUE_SIZE
)
atexit.register(self._exit_cleanly)
self.processing_thread: Thread = Thread(
target=self._process_event_queue_thread, daemon=True
)
self.processing_thread.start()

def _get_env_summary(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information the environment."""
return {
"service": "deadline-cloud-library",
"version": ".".join(version.split(".")[:3]),
"pythonVersion": platform.python_version(),
}
def _get_system_metadata(self, config: Optional[ConfigParser]) -> Dict[str, Any]:
"""
Builds up a dict of non-identifiable metadata about the system environment.
def _get_system_info(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information about this machine."""
This will be used in the Rum event metadata, which has a limit of 10 unique values.
"""
platform_info = platform.uname()
return {
metadata: Dict[str, Any] = {
"service": self.package_name,
"version": self.package_ver,
"python_version": platform.python_version(),
"osName": "macOS" if platform_info.system == "Darwin" else platform_info.system,
"osVersion": platform_info.release,
"cpuType": platform_info.machine,
"cpuName": platform_info.processor,
}

user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
metadata["user_id"] = user_id
studio_id: Optional[str] = get_studio_id(config=config)
if studio_id:
metadata["studio_id"] = studio_id

return metadata

def _exit_cleanly(self):
self.event_queue.put(None)
self.processing_thread.join()

def _send_request(self, req: request.Request) -> None:
attempts = 0
success = False
while not success:
try:
logger.warning(f"Sending telemetry data: {req.data}")
with request.urlopen(req):
logger.debug("Successfully sent telemetry.")
success = True
except error.HTTPError as httpe:
if httpe.code == 429 or httpe.code == 500:
logger.debug(f"Error received from service. Waiting to retry: {str(httpe)}")

attempts += 1
if attempts >= TelemetryClient.MAX_RETRY_ATTEMPTS:
raise Exception("Max retries reached sending telemetry")

backoff_sleep = random.uniform(
0,
min(
TelemetryClient.MAX_BACKOFF_SECONDS,
TelemetryClient.BASE_TIME * 2**attempts,
),
)
time.sleep(backoff_sleep)
else: # Reraise any exceptions we didn't expect
raise

def _process_event_queue_thread(self):
"""Background thread for processing the telemetry event data queue and sending telemetry requests."""
while True:
Expand All @@ -127,9 +176,9 @@ def _process_event_queue_thread(self):
"BatchId": str(uuid.uuid4()),
"RumEvents": [
{
"details": "{}",
"details": str(json.dumps(event_data.event_details)),
"id": str(uuid.uuid4()),
"metadata": str(json.dumps(event_data.event_body)),
"metadata": str(json.dumps(self.system_metadata)),
"timestamp": int(datetime.now().timestamp()),
"type": event_data.event_type,
},
Expand All @@ -139,44 +188,74 @@ def _process_event_queue_thread(self):
request_body_encoded = str(json.dumps(request_body)).encode("utf-8")
req = request.Request(url=self.endpoint, data=request_body_encoded, headers=headers)
try:
logger.debug(f"Sending telemetry data: {request_body}")
with request.urlopen(req):
logger.debug("Successfully sent telemetry.")
except error.HTTPError as httpe:
logger.debug(f"HTTPError sending telemetry: {str(httpe)}")
except Exception as ex:
logger.debug(f"Exception sending telemetry: {str(ex)}")
self._send_request(req)
except Exception:
# Silently swallow any kind of uncaught exception and stop sending telemetry
return
self.event_queue.task_done()

def _put_telemetry_record(self, event: TelemetryEvent) -> None:
if self.telemetry_opted_out:
return
try:
self.event_queue.put_nowait(event)
except Full:
# Silently swallow the error if the event queue is full (due to throttling of the service)
pass

def _record_summary_statistics(
self, event_type: str, summary: SummaryStatistics, from_gui: bool
):
if self.telemetry_opted_out:
return
data_body: Dict[str, Any] = asdict(summary)
data_body.update(self.env_info)
data_body.update(self.system_info)
if self.user_id:
data_body["userId"] = self.user_id
if self.studio_id:
data_body["studioId"] = self.studio_id
data_body["usageMode"] = "GUI" if from_gui else "CLI"
self.event_queue.put_nowait(TelemetryEvent(event_type=event_type, event_body=data_body))

def record_hashing_summary(self, summary: SummaryStatistics, from_gui: bool = False):
details: Dict[str, Any] = asdict(summary)
details["usage_mode"] = "GUI" if from_gui else "CLI"
self._put_telemetry_record(TelemetryEvent(event_type=event_type, event_details=details))

def record_hashing_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.hashing_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.hashing_summary", summary, from_gui
)

def record_upload_summary(self, summary: SummaryStatistics, from_gui: bool = False):
def record_upload_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.upload_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.upload_summary", summary, from_gui
)

def record_event(self, event_type: str, event_details: Dict[str, Any]):
self._put_telemetry_record(
TelemetryEvent(
event_type=event_type,
event_details=event_details,
)
)


def get_telemetry_client(config: Optional[ConfigParser] = None) -> TelemetryClient:
def get_telemetry_client(
package_name: str, package_ver: str, config: Optional[ConfigParser] = None
) -> TelemetryClient:
"""
Retrieves the cached telemetry client, lazy-loading the first time this is called.
:param config: Optional configuration to use for the client. Loads defaults if not given.
:param package_name: Optional override package name to include in requests. Defaults to the 'deadline-cloud' package.
:param package_ver: Optional override package version to include in requests. Defaults to the 'deadline-cloud' version.
:return: Telemetry client to make requests with.
"""
global __cached_telemetry_client
if not __cached_telemetry_client:
__cached_telemetry_client = TelemetryClient(config)
__cached_telemetry_client = TelemetryClient(
package_name=package_name,
package_ver=package_ver,
config=config,
)

return __cached_telemetry_client


def get_deadline_cloud_library_telemetry_client(
config: Optional[ConfigParser] = None,
) -> TelemetryClient:
"""
Retrieves the cached telemetry client, specifying the Deadline Cloud Client Library's package information.
:param config: Optional configuration to use for the client. Loads defaults if not given.
:return: Telemetry client to make requests with.
"""
return get_telemetry_client("deadline-cloud-library", version, config=config)
8 changes: 6 additions & 2 deletions src/deadline/client/cli/_groups/bundle_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ def _update_hash_progress(hashing_metadata: ProgressReportMetadata) -> bool:
on_preparing_to_submit=_update_hash_progress,
)

api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
hashing_summary
)
click.echo("Hashing Summary:")
click.echo(textwrap.indent(str(hashing_summary), " "))

Expand Down Expand Up @@ -380,7 +382,9 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
manifests, _update_upload_progress
)

api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
)
click.echo("Upload Summary:")
click.echo(textwrap.indent(str(upload_summary), " "))

Expand Down
8 changes: 6 additions & 2 deletions src/deadline/client/ui/dialogs/submit_job_progress_dialog.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,9 @@ def handle_hashing_thread_succeeded(
Handles the signal sent from the background hashing thread when the
hashing process has completed.
"""
api.get_telemetry_client().record_hashing_summary(hashing_summary, from_gui=True)
api.get_deadline_cloud_library_telemetry_client().record_hashing_summary(
hashing_summary, from_gui=True
)
self.summary_edit.setText(
f"\nHashing Summary:\n{textwrap.indent(str(hashing_summary), ' ')}"
)
Expand Down Expand Up @@ -450,7 +452,9 @@ def handle_upload_thread_succeeded(
"defaults.job_attachments_file_system"
)

api.get_telemetry_client().record_upload_summary(upload_summary, from_gui=True)
api.get_deadline_cloud_library_telemetry_client().record_upload_summary(
upload_summary, from_gui=True
)
self.summary_edit.setText(
f"{self.summary_edit.toPlainText()}"
+ f"\nUpload Summary:\n{textwrap.indent(str(upload_summary), ' ')}"
Expand Down
Loading

0 comments on commit d20481a

Please sign in to comment.