Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: Adds more telemetry events #60

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/deadline/client/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"get_queue_user_boto3_session",
"get_queue_parameter_definitions",
"get_telemetry_client",
"get_deadline_cloud_library_telemetry_client",
]

from configparser import ConfigParser
Expand All @@ -46,7 +47,11 @@
)
from ._queue_parameters import get_queue_parameter_definitions
from ._submit_job_bundle import create_job_from_job_bundle, wait_for_create_job_to_complete
from ._telemetry import get_telemetry_client, TelemetryClient
from ._telemetry import (
get_telemetry_client,
get_deadline_cloud_library_telemetry_client,
TelemetryClient,
)

logger = getLogger(__name__)

Expand Down
8 changes: 6 additions & 2 deletions src/deadline/client/api/_submit_job_bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ def _default_update_hash_progress(hashing_metadata: Dict[str, str]) -> bool:
hash_cache_dir=os.path.expanduser(os.path.join("~", ".deadline", "cache")),
on_preparing_to_submit=hashing_progress_callback,
)
api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
hashing_summary
)

return manifests

Expand All @@ -296,6 +298,8 @@ def _default_update_upload_progress(upload_metadata: Dict[str, str]) -> bool:
upload_summary, attachment_settings = asset_manager.upload_assets(
manifests, upload_progress_callback
)
api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
)

return attachment_settings.to_dict()
175 changes: 127 additions & 48 deletions src/deadline/client/api/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import logging
import platform
import uuid
import random
import time

from configparser import ConfigParser
from dataclasses import asdict, dataclass, field
from datetime import datetime
from queue import Queue
from queue import Queue, Full
from threading import Thread
from typing import Any, Dict, Optional
from urllib import request, error
Expand All @@ -29,8 +31,8 @@
class TelemetryEvent:
"""Base class for telemetry events"""

event_type: str = "com.amazon.rum.uncategorized"
event_body: Dict[str, Any] = field(default_factory=dict)
event_type: str = "com.amazon.rum.deadline.uncategorized"
event_details: Dict[str, Any] = field(default_factory=dict)


class TelemetryClient:
Expand All @@ -54,22 +56,33 @@ class TelemetryClient:
'deadline config set "telemetry.opt_out" true'
"""

def __init__(self, config: Optional[ConfigParser] = None):
# Used for backing off requests if we encounter errors from the service.
# See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/
MAX_QUEUE_SIZE = 25
BASE_TIME = 0.5
MAX_BACKOFF_SECONDS = 10 # The maximum amount of time to wait between retries
MAX_RETRY_ATTEMPTS = 4

def __init__(
self,
package_name: str,
package_ver: str,
config: Optional[ConfigParser] = None,
):
self.telemetry_opted_out = config_file.str2bool(
config_file.get_setting("telemetry.opt_out", config=config)
)
if self.telemetry_opted_out:
return
self.package_name = package_name
self.package_ver = ".".join(package_ver.split(".")[:3])
self.endpoint: str = f"{config_file.get_setting('settings.deadline_endpoint_url', config=config)}/2023-10-12/telemetry"

# IDs for this session
self.session_id: str = str(uuid.uuid4())
self.telemetry_id: str = self._get_telemetry_identifier(config=config)
# Get common data we'll include in each request
self.studio_id: Optional[str] = get_studio_id(config=config)
self.user_id, _ = get_user_and_identity_store_id(config=config)
self.env_info: Dict[str, Any] = self._get_env_summary()
self.system_info: Dict[str, Any] = self._get_system_info()
self.system_metadata = self._get_system_metadata(config=config)

self._start_threads()

Expand All @@ -84,35 +97,71 @@ def _get_telemetry_identifier(self, config: Optional[ConfigParser] = None):

def _start_threads(self) -> None:
"""Set up background threads for shutdown checking and request sending"""
self.event_queue: Queue[Optional[TelemetryEvent]] = Queue()
self.event_queue: Queue[Optional[TelemetryEvent]] = Queue(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have optionals on a queue? should we not enqueue if it's None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxsize=TelemetryClient.MAX_QUEUE_SIZE
)
atexit.register(self._exit_cleanly)
self.processing_thread: Thread = Thread(
target=self._process_event_queue_thread, daemon=True
)
self.processing_thread.start()

def _get_env_summary(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information the environment."""
return {
"service": "deadline-cloud-library",
"version": ".".join(version.split(".")[:3]),
"pythonVersion": platform.python_version(),
}
def _get_system_metadata(self, config: Optional[ConfigParser]) -> Dict[str, Any]:
"""
Builds up a dict of non-identifiable metadata about the system environment.

def _get_system_info(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information about this machine."""
This will be used in the Rum event metadata, which has a limit of 10 unique values.
"""
platform_info = platform.uname()
return {
metadata: Dict[str, Any] = {
marofke marked this conversation as resolved.
Show resolved Hide resolved
"service": self.package_name,
"version": self.package_ver,
"python_version": platform.python_version(),
"osName": "macOS" if platform_info.system == "Darwin" else platform_info.system,
"osVersion": platform_info.release,
"cpuType": platform_info.machine,
"cpuName": platform_info.processor,
}

user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
metadata["user_id"] = user_id
studio_id: Optional[str] = get_studio_id(config=config)
if studio_id:
metadata["studio_id"] = studio_id

return metadata

def _exit_cleanly(self):
self.event_queue.put(None)
self.processing_thread.join()

def _send_request(self, req: request.Request) -> None:
attempts = 0
success = False
while not success:
try:
logger.warning(f"Sending telemetry data: {req.data}")
with request.urlopen(req):
logger.debug("Successfully sent telemetry.")
success = True
except error.HTTPError as httpe:
if httpe.code == 429 or httpe.code == 500:
logger.debug(f"Error received from service. Waiting to retry: {str(httpe)}")

attempts += 1
if attempts >= TelemetryClient.MAX_RETRY_ATTEMPTS:
raise Exception("Max retries reached sending telemetry")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is this caught? we don't want to kill the worker/submitter/whatever if telemetry is out, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm I see we catch it down below!


backoff_sleep = random.uniform(
0,
min(
TelemetryClient.MAX_BACKOFF_SECONDS,
TelemetryClient.BASE_TIME * 2**attempts,
),
)
time.sleep(backoff_sleep)
else: # Reraise any exceptions we didn't expect
raise

def _process_event_queue_thread(self):
"""Background thread for processing the telemetry event data queue and sending telemetry requests."""
while True:
Expand All @@ -127,9 +176,9 @@ def _process_event_queue_thread(self):
"BatchId": str(uuid.uuid4()),
"RumEvents": [
{
"details": "{}",
"details": str(json.dumps(event_data.event_details)),
"id": str(uuid.uuid4()),
"metadata": str(json.dumps(event_data.event_body)),
"metadata": str(json.dumps(self.system_metadata)),
"timestamp": int(datetime.now().timestamp()),
"type": event_data.event_type,
},
Expand All @@ -139,44 +188,74 @@ def _process_event_queue_thread(self):
request_body_encoded = str(json.dumps(request_body)).encode("utf-8")
req = request.Request(url=self.endpoint, data=request_body_encoded, headers=headers)
try:
logger.debug(f"Sending telemetry data: {request_body}")
with request.urlopen(req):
logger.debug("Successfully sent telemetry.")
except error.HTTPError as httpe:
logger.debug(f"HTTPError sending telemetry: {str(httpe)}")
except Exception as ex:
logger.debug(f"Exception sending telemetry: {str(ex)}")
self._send_request(req)
except Exception:
# Silently swallow any kind of uncaught exception and stop sending telemetry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we log the exception? if we do we should probably do some deduping so we only see one of any particular exception type or something like this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what the value is for a customer since telemetry should just run in the background and not cause additional noise. We could try adding it later on if it makes debugging easier for us, but I think it's fine to leave for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes debugging for us is more what I'm thinking about. maybe we have a config for it?

return
self.event_queue.task_done()

def _put_telemetry_record(self, event: TelemetryEvent) -> None:
if self.telemetry_opted_out:
return
try:
self.event_queue.put_nowait(event)
except Full:
# Silently swallow the error if the event queue is full (due to throttling of the service)
pass

def _record_summary_statistics(
self, event_type: str, summary: SummaryStatistics, from_gui: bool
):
if self.telemetry_opted_out:
return
data_body: Dict[str, Any] = asdict(summary)
data_body.update(self.env_info)
data_body.update(self.system_info)
if self.user_id:
data_body["userId"] = self.user_id
if self.studio_id:
data_body["studioId"] = self.studio_id
data_body["usageMode"] = "GUI" if from_gui else "CLI"
self.event_queue.put_nowait(TelemetryEvent(event_type=event_type, event_body=data_body))

def record_hashing_summary(self, summary: SummaryStatistics, from_gui: bool = False):
details: Dict[str, Any] = asdict(summary)
details["usage_mode"] = "GUI" if from_gui else "CLI"
self._put_telemetry_record(TelemetryEvent(event_type=event_type, event_details=details))

def record_hashing_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.hashing_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.hashing_summary", summary, from_gui
)

def record_upload_summary(self, summary: SummaryStatistics, from_gui: bool = False):
def record_upload_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.upload_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.upload_summary", summary, from_gui
)

def record_event(self, event_type: str, event_details: Dict[str, Any]):
self._put_telemetry_record(
TelemetryEvent(
event_type=event_type,
event_details=event_details,
)
)


def get_telemetry_client(config: Optional[ConfigParser] = None) -> TelemetryClient:
def get_telemetry_client(
package_name: str, package_ver: str, config: Optional[ConfigParser] = None
) -> TelemetryClient:
"""
Retrieves the cached telemetry client, lazy-loading the first time this is called.
:param config: Optional configuration to use for the client. Loads defaults if not given.
:param package_name: Optional override package name to include in requests. Defaults to the 'deadline-cloud' package.
:param package_ver: Optional override package version to include in requests. Defaults to the 'deadline-cloud' version.
:return: Telemetry client to make requests with.
"""
global __cached_telemetry_client
if not __cached_telemetry_client:
__cached_telemetry_client = TelemetryClient(config)
__cached_telemetry_client = TelemetryClient(
package_name=package_name,
package_ver=package_ver,
config=config,
)

return __cached_telemetry_client


def get_deadline_cloud_library_telemetry_client(
config: Optional[ConfigParser] = None,
) -> TelemetryClient:
"""
Retrieves the cached telemetry client, specifying the Deadline Cloud Client Library's package information.
:param config: Optional configuration to use for the client. Loads defaults if not given.
:return: Telemetry client to make requests with.
"""
return get_telemetry_client("deadline-cloud-library", version, config=config)
8 changes: 6 additions & 2 deletions src/deadline/client/cli/_groups/bundle_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,9 @@ def _update_hash_progress(hashing_metadata: ProgressReportMetadata) -> bool:
on_preparing_to_submit=_update_hash_progress,
)

api.get_telemetry_client(config=config).record_hashing_summary(hashing_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_hashing_summary(
hashing_summary
)
click.echo("Hashing Summary:")
click.echo(textwrap.indent(str(hashing_summary), " "))

Expand Down Expand Up @@ -380,7 +382,9 @@ def _update_upload_progress(upload_metadata: ProgressReportMetadata) -> bool:
manifests, _update_upload_progress
)

api.get_telemetry_client(config=config).record_upload_summary(upload_summary)
api.get_deadline_cloud_library_telemetry_client(config=config).record_upload_summary(
upload_summary
)
click.echo("Upload Summary:")
click.echo(textwrap.indent(str(upload_summary), " "))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,9 @@ def handle_hashing_thread_succeeded(
Handles the signal sent from the background hashing thread when the
hashing process has completed.
"""
api.get_telemetry_client().record_hashing_summary(hashing_summary, from_gui=True)
api.get_deadline_cloud_library_telemetry_client().record_hashing_summary(
hashing_summary, from_gui=True
)
self.summary_edit.setText(
f"\nHashing Summary:\n{textwrap.indent(str(hashing_summary), ' ')}"
)
Expand Down Expand Up @@ -450,7 +452,9 @@ def handle_upload_thread_succeeded(
"defaults.job_attachments_file_system"
)

api.get_telemetry_client().record_upload_summary(upload_summary, from_gui=True)
api.get_deadline_cloud_library_telemetry_client().record_upload_summary(
upload_summary, from_gui=True
)
self.summary_edit.setText(
f"{self.summary_edit.toPlainText()}"
+ f"\nUpload Summary:\n{textwrap.indent(str(upload_summary), ' ')}"
Expand Down
Loading