Skip to content

Commit

Permalink
feat!: Adds more telemetry events
Browse files Browse the repository at this point in the history
Signed-off-by: Caden Marofke <[email protected]>
  • Loading branch information
marofke committed Sep 27, 2023
1 parent cd95313 commit 1d181b5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 69 deletions.
110 changes: 72 additions & 38 deletions src/deadline/client/api/_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
class TelemetryEvent:
"""Base class for telemetry events"""

event_type: str = "com.amazon.rum.uncategorized"
event_body: Dict[str, Any] = field(default_factory=dict)
event_type: str = "com.amazon.rum.deadline.uncategorized"
event_details: Dict[str, Any] = field(default_factory=dict)


class TelemetryClient:
Expand All @@ -54,7 +54,12 @@ class TelemetryClient:
'deadline config set "telemetry.opt_out" true'
"""

def __init__(self, config: Optional[ConfigParser] = None):
def __init__(
self,
config: Optional[ConfigParser] = None,
package_name: Optional[str] = None,
package_ver: Optional[str] = None,
):
self.telemetry_opted_out = config_file.str2bool(
config_file.get_setting("telemetry.opt_out", config=config)
)
Expand All @@ -66,10 +71,9 @@ def __init__(self, config: Optional[ConfigParser] = None):
self.session_id: str = str(uuid.uuid4())
self.telemetry_id: str = self._get_telemetry_identifier(config=config)
# Get common data we'll include in each request
self.studio_id: Optional[str] = get_studio_id(config=config)
self.user_id, _ = get_user_and_identity_store_id(config=config)
self.env_info: Dict[str, Any] = self._get_env_summary()
self.system_info: Dict[str, Any] = self._get_system_info()
self.system_metadata = self._get_system_metadata(
config=config, package_name=package_name, package_ver=package_ver
)

self._start_threads()

Expand All @@ -91,24 +95,37 @@ def _start_threads(self) -> None:
)
self.processing_thread.start()

def _get_env_summary(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information the environment."""
return {
"service": "deadline-cloud-library",
"version": ".".join(version.split(".")[:3]),
"pythonVersion": platform.python_version(),
}

def _get_system_info(self) -> Dict[str, Any]:
"""Builds up a dict of non-identifiable information about this machine."""
def _get_system_metadata(
self,
config: Optional[ConfigParser],
package_name: Optional[str] = None,
package_ver: Optional[str] = None,
) -> Dict[str, Any]:
"""
Builds up a dict of non-identifiable metadata about the system environment.
This will be used in the Rum event metadata, which has a limit of 10 unique values.
"""
platform_info = platform.uname()
return {
if not package_ver:
package_ver = version
metadata: Dict[str, Any] = {
"service": package_name if package_name else "deadline-cloud-library",
"version": ".".join(package_ver.split(".")[:3]),
"python_version": platform.python_version(),
"osName": "macOS" if platform_info.system == "Darwin" else platform_info.system,
"osVersion": platform_info.release,
"cpuType": platform_info.machine,
"cpuName": platform_info.processor,
}

user_id, _ = get_user_and_identity_store_id(config=config)
if user_id:
metadata["user_id"] = user_id
studio_id: Optional[str] = get_studio_id(config=config)
if studio_id:
metadata["studio_id"] = studio_id

return metadata

def _exit_cleanly(self):
self.event_queue.put(None)
self.processing_thread.join()
Expand All @@ -127,9 +144,9 @@ def _process_event_queue_thread(self):
"BatchId": str(uuid.uuid4()),
"RumEvents": [
{
"details": "{}",
"details": str(json.dumps(event_data.event_details)),
"id": str(uuid.uuid4()),
"metadata": str(json.dumps(event_data.event_body)),
"metadata": str(json.dumps(self.system_metadata)),
"timestamp": int(datetime.now().timestamp()),
"type": event_data.event_type,
},
Expand All @@ -153,30 +170,47 @@ def _record_summary_statistics(
):
if self.telemetry_opted_out:
return
data_body: Dict[str, Any] = asdict(summary)
data_body.update(self.env_info)
data_body.update(self.system_info)
if self.user_id:
data_body["userId"] = self.user_id
if self.studio_id:
data_body["studioId"] = self.studio_id
data_body["usageMode"] = "GUI" if from_gui else "CLI"
self.event_queue.put_nowait(TelemetryEvent(event_type=event_type, event_body=data_body))

def record_hashing_summary(self, summary: SummaryStatistics, from_gui: bool = False):
details: Dict[str, Any] = asdict(summary)
details["usage_mode"] = "GUI" if from_gui else "CLI"
self.event_queue.put_nowait(TelemetryEvent(event_type=event_type, event_details=details))

def record_hashing_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.hashing_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.hashing_summary", summary, from_gui
)

def record_upload_summary(self, summary: SummaryStatistics, from_gui: bool = False):
def record_upload_summary(self, summary: SummaryStatistics, *, from_gui: bool = False):
self._record_summary_statistics(
"com.amazon.rum.job_attachments.upload_summary", summary, from_gui
"com.amazon.rum.deadline.job_attachments.upload_summary", summary, from_gui
)

def record_event(self, event_type: str, event_details: Dict[str, Any]):
if self.telemetry_opted_out:
return
self.event_queue.put_nowait(
TelemetryEvent(
event_type=event_type,
event_details=event_details,
)
)


def get_telemetry_client(config: Optional[ConfigParser] = None) -> TelemetryClient:
def get_telemetry_client(
config: Optional[ConfigParser] = None,
package_name: Optional[str] = None,
package_ver: Optional[str] = None,
) -> TelemetryClient:
"""
Retrieves the cached telemetry client, lazy-loading the first time this is called.
:param config: Optional configuration to use for the client. Loads defaults if not given.
:param package_name: Optional override package name to include in requests. Defaults to the 'deadline-cloud' package.
:param package_ver: Optional override package version to include in requests. Defaults to the 'deadline-cloud' version.
:return: Telemetry client to make requests with.
"""
global __cached_telemetry_client
if not __cached_telemetry_client:
__cached_telemetry_client = TelemetryClient(config)
__cached_telemetry_client = TelemetryClient(
config=config, package_name=package_name, package_ver=package_ver
)

return __cached_telemetry_client
40 changes: 9 additions & 31 deletions test/unit/deadline_client/api/test_api_telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@ def test_opt_out(fresh_deadline_config):
assert not hasattr(client, "endpoint")
assert not hasattr(client, "session_id")
assert not hasattr(client, "telemetry_id")
assert not hasattr(client, "studio_id")
assert not hasattr(client, "user_id")
assert not hasattr(client, "env_info")
assert not hasattr(client, "system_metadata")
assert not hasattr(client, "event_queue")
assert not hasattr(client, "processing_thread")
# Ensure nothing blows up if we try recording telemetry after we've opted out
client.record_hashing_summary(SummaryStatistics(), True)
client.record_upload_summary(SummaryStatistics(), False)
client.record_hashing_summary(SummaryStatistics(), from_gui=True)
client.record_upload_summary(SummaryStatistics(), from_gui=False)


def test_get_telemetry_identifier(telemetry_client):
Expand Down Expand Up @@ -97,24 +95,14 @@ def test_record_hashing_summary(telemetry_client):
"""Tests that recording a hashing summary sends the expected TelemetryEvent to the thread queue"""
# GIVEN
queue_mock = MagicMock()
expected_env_info = {"test_env": "test_val"}
expected_machine_info = {"test_machine": "test_val2"}
test_summary = SummaryStatistics(total_bytes=123, total_files=12, total_time=12345)

expected_summary = asdict(test_summary)
expected_summary["usageMode"] = "CLI"
expected_summary["userId"] = "user-id"
expected_summary["studioId"] = "studio-id"
expected_summary.update(expected_env_info)
expected_summary.update(expected_machine_info)

expected_summary["usage_mode"] = "CLI"
expected_event = TelemetryEvent(
event_type="com.amazon.rum.job_attachments.hashing_summary", event_body=expected_summary
event_type="com.amazon.rum.deadline.job_attachments.hashing_summary",
event_details=expected_summary,
)

telemetry_client.event_queue = queue_mock
telemetry_client.env_info = expected_env_info
telemetry_client.system_info = expected_machine_info

# WHEN
telemetry_client.record_hashing_summary(test_summary)
Expand All @@ -127,24 +115,14 @@ def test_record_upload_summary(telemetry_client):
"""Tests that recording an upload summary sends the expected TelemetryEvent to the thread queue"""
# GIVEN
queue_mock = MagicMock()
expected_env_info = {"test_env": "test_val"}
expected_machine_info = {"test_machine": "test_val2"}
test_summary = SummaryStatistics(total_bytes=123, total_files=12, total_time=12345)

expected_summary = asdict(test_summary)
expected_summary["usageMode"] = "GUI"
expected_summary["userId"] = "user-id"
expected_summary["studioId"] = "studio-id"
expected_summary.update(expected_env_info)
expected_summary.update(expected_machine_info)

expected_summary["usage_mode"] = "GUI"
expected_event = TelemetryEvent(
event_type="com.amazon.rum.job_attachments.upload_summary", event_body=expected_summary
event_type="com.amazon.rum.deadline.job_attachments.upload_summary",
event_details=expected_summary,
)

telemetry_client.event_queue = queue_mock
telemetry_client.env_info = expected_env_info
telemetry_client.system_info = expected_machine_info

# WHEN
telemetry_client.record_upload_summary(test_summary, from_gui=True)
Expand Down

0 comments on commit 1d181b5

Please sign in to comment.