-
Notifications
You must be signed in to change notification settings - Fork 0
/
enable_monte_carlo_databricks_job_incidents.py
132 lines (111 loc) · 4.45 KB
/
enable_monte_carlo_databricks_job_incidents.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from typing import Set
import click
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
Webhook,
WebhookNotifications,
Job,
JobAccessControlRequest,
JobPermissionLevel,
)
from databricks.sdk.service.iam import ServicePrincipal
@click.command(help="Enable Monte Carlo incidents for Databricks jobs")
@click.pass_obj
@click.option(
"--mcd-notification-id",
required=True,
help="UUID of the existing Databricks Notification pointing to the MC Webhook endpoint.",
)
@click.option(
"--mcd-service-principal-name",
required=True,
help="Application ID of the existing Monte Carlo service principal in Databricks.",
)
@click.option(
"--databricks-job-name",
required=False,
multiple=True,
help="Databricks Job Name to enable MC incidents for. Can be used multiple times. If not specified, enable MC incidents for all jobs.",
)
def enable_monte_carlo_databricks_job_incidents(ctx, **kwargs):
"""
Add the Monte Carlo Webhook to notify on Job Failures
"""
databricks_job_names = set(kwargs["databricks_job_name"])
_enable_monte_carlo_databricks_job_incidents(
mcd_notification_id=kwargs["mcd_notification_id"],
mcd_service_principal_name=kwargs["mcd_service_principal_name"],
databricks_job_names=databricks_job_names,
)
def _enable_monte_carlo_databricks_job_incidents(
mcd_notification_id: str,
mcd_service_principal_name: str,
databricks_job_names: Set[str],
):
workspace_client = WorkspaceClient()
# assert that the notification exists
workspace_client.notification_destinations.get(id=mcd_notification_id)
mcd_notification = Webhook(id=mcd_notification_id)
# assert that the service principal exists
mcd_service_principal = None
service_principals = workspace_client.service_principals.list()
for service_principal in service_principals:
if service_principal.application_id == mcd_service_principal_name:
mcd_service_principal = service_principal
break
assert mcd_service_principal is not None
jobs = list(workspace_client.jobs.list())
click.echo(f"Configuring the Monte Carlo webhook for {len(jobs)} jobs")
for job in jobs:
job_name = job.settings.name
if databricks_job_names and job.settings.name not in databricks_job_names:
continue
_add_monte_carlo_webhook_to_job(workspace_client, job, mcd_notification)
_add_can_view_permissions(workspace_client, job, mcd_service_principal)
def _add_monte_carlo_webhook_to_job(
workspace_client: WorkspaceClient, job: Job, mcd_notification: Webhook
):
job_name = job.settings.name
existing_settings = job.settings
existing_webhooks = existing_settings.webhook_notifications
if existing_webhooks is None:
updated_webhooks = WebhookNotifications(on_failure=[mcd_notification])
else:
updated_webhooks = existing_webhooks
if mcd_notification not in updated_webhooks.on_failure:
updated_webhooks.on_failure.append(mcd_notification)
else:
click.echo(f"The Monte Carlo webhook is already configured for {job_name}")
return
new_settings = existing_settings
new_settings.webhook_notifications = updated_webhooks
try:
workspace_client.jobs.update(job_id=job.job_id, new_settings=new_settings)
click.echo(f"Successfully added the Monte Carlo webhook to {job_name}")
except Exception as e:
click.echo(
f"Failed to add the Monte Carlo webhook to {job_name} due to {str(e)}",
err=True,
)
def _add_can_view_permissions(
workspace_client: WorkspaceClient, job: Job, mcd_service_principal: ServicePrincipal
):
job_name = job.settings.name
permission_to_add = JobAccessControlRequest(
service_principal_name=mcd_service_principal.application_id,
permission_level=JobPermissionLevel.CAN_VIEW,
)
try:
workspace_client.jobs.update_permissions(
job_id=job.job_id, access_control_list=[permission_to_add]
)
click.echo(
f"Successfully gave Can View permissions to the Monte Carlo Service Principal for {job_name}"
)
except Exception as e:
click.echo(
f"Failed to give Can Veiw permissions the Monte Carlo Service Principal for {job_name} due to {str(e)}",
err=True,
)
if __name__ == "__main__":
enable_monte_carlo_databricks_job_incidents()