Skip to content

Commit

Permalink
Update lp_notifications.py (flyteorg#212)
Browse files Browse the repository at this point in the history
* Update lp_notifications.py

* update lp_notifications.py (flyteorg#215)

Signed-off-by: Samhita Alla <[email protected]>

Co-authored-by: Samhita Alla <[email protected]>
  • Loading branch information
SandraGH5 and samhita-alla authored May 19, 2021
1 parent 9e999fd commit 773a541
Showing 1 changed file with 99 additions and 21 deletions.
120 changes: 99 additions & 21 deletions cookbook/deployment/workflow/lp_notifications.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,29 @@
"""
Getting notifications on workflow termination
-------------------------------------------------
For background on launch plans, refer to :any:`launch_plans`.
For up-to-date documentation on notifications see the `official docs <https://lyft.github.io/flyte/user/features/notifications.html>`_
###############################################
Receiving Notifications on Workflow Termination
###############################################
"""

# %%
# Let's consider the following example workflow:
# When a workflow is completed, users can be notified through:
#
# * email
# * `pagerduty <https://www.pagerduty.com/>`__
# * `slack <https://slack.com/>`__
#
# The content of these notifications is configurable at the platform level.
#
# ************
# Code Example
# ************
#
# When a workflow reaches a specified `terminal workflow execution phase <https://github.com/flyteorg/flytekit/blob/v0.16.0b7/flytekit/core/notification.py#L10,L15>`__,
# the :py:class:`flytekit:flytekit.Email`, :py:class:`flytekit:flytekit.PagerDuty`, or :py:class:`flytekit:flytekit.Slack`
# objects can be used in the construction of a :py:class:`flytekit:flytekit.LaunchPlan`.

# %%
# Consider the following example workflow:
from flytekit import Email, LaunchPlan, task, workflow
from flytekit.models.core.execution import WorkflowExecutionPhase

Expand All @@ -25,11 +38,13 @@ def int_doubler_wf(a: int) -> str:
doubled = double_int_and_print(a=a)
return doubled


# This launch plan triggers email notifications when the workflow execution it triggered reaches the phase `SUCCEEDED`.
int_doubler_wf_lp = LaunchPlan.create(
"int_doubler_wf",
int_doubler_wf,
# %%
# Here are three scenarios that can help deepen your understanding of how notifications work:
#
# 1. Launch Plan triggers email notifications when the workflow execution reaches the ``SUCCEEDED`` phase.
int_doubler_wf_lp = LaunchPlan.get_or_create(
name="int_doubler_wf",
workflow=int_doubler_wf,
default_inputs={"a": 4},
notifications=[
Email(
Expand All @@ -40,14 +55,14 @@ def int_doubler_wf(a: int) -> str:
)

# %%
# Notifications shine when used for scheduled workflows to alert on failures:
# 2. Notifications shine when used for scheduled workflows to alert for failures.
from datetime import timedelta

from flytekit import FixedRate, PagerDuty

int_doubler_wf_scheduled_lp = LaunchPlan.create(
"int_doubler_wf_scheduled",
int_doubler_wf,
int_doubler_wf_scheduled_lp = LaunchPlan.get_or_create(
name="int_doubler_wf_scheduled",
workflow=int_doubler_wf,
default_inputs={"a": 4},
notifications=[
PagerDuty(
Expand All @@ -60,12 +75,12 @@ def int_doubler_wf(a: int) -> str:


# %%
# If you desire you can combine notifications with different permutations of terminal phases and recipient targets:
# 3. Notifications can be combined with different permutations of terminal phases and recipient targets.
from flytekit import Slack

wacky_int_doubler_lp = LaunchPlan.create(
"wacky_int_doubler",
int_doubler_wf,
wacky_int_doubler_lp = LaunchPlan.get_or_create(
name="wacky_int_doubler",
workflow=int_doubler_wf,
default_inputs={"a": 4},
notifications=[
Email(
Expand All @@ -86,3 +101,66 @@ def int_doubler_wf(a: int) -> str:
),
],
)

# %%
# Future work
# ===========
# Work is ongoing to support a generic event egress system that can be used to publish events for tasks, workflows, and
# workflow nodes. When this is complete, generic event subscribers can asynchronously process these events for a rich
# and fully customizable experience.
#
# ******************************
# Platform Configuration Changes
# ******************************
#
# The ``notifications`` top-level portion of the Flyteadmin config specifies how to handle notifications.
#
# As in schedules, the handling of notifications is composed of two parts— one part handles enqueuing notifications asynchronously. The other part handles processing pending notifications and sends out emails and alerts.
#
# This is only supported for Flyte instances running on AWS.
#
# Config
# ======
#
# To publish notifications, you'll need to set up an `SNS topic <https://aws.amazon.com/sns/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc>`_.
#
# To process notifications, you'll need to set up an `AWS SQS <https://aws.amazon.com/sqs/>`_ queue to consume notification events. This queue must be configured as a subscription to your SNS topic you created above.
#
# To publish notifications, you'll need a `verified SES email address <https://docs.aws.amazon.com/ses/latest/DeveloperGuide/verify-addresses-and-domains.html>`_ which will be used to send notification emails and alerts using email APIs.
#
# The role you use to run Flyteadmin must have permissions to read and write to your SNS topic and SQS queue.
#
# Let's look into the following config section and explain what each value represents:
#
# .. code-block:: bash
#
# notifications:
# type: "aws"
# region: "us-east-1"
# publisher:
# topicName: "arn:aws:sns:us-east-1:{{ YOUR ACCOUNT ID }}:{{ YOUR TOPIC }}"
# processor:
# queueName: "{{ YOUR QUEUE NAME }}"
# accountId: "{{ YOUR ACCOUNT ID }}"
# emailer:
# subject: "Notice: Execution \"{{ workflow.name }}\" has {{ phase }} in \"{{ domain }}\"."
# sender: "[email protected]"
# body: >
# Execution \"{{ workflow.name }} [{{ name }}]\" has {{ phase }} in \"{{ domain }}\". View details at
# <a href=\http://flyte.company.com/console/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}>
# http://flyte.company.com/console/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}</a>. {{ error }}
#
# * **type**: AWS is the only cloud back-end supported for executing scheduled workflows; hence ``"aws"`` is the only valid value. By default, the no-op executor is used.
# * **region**: Specifies the region AWS clients should use when creating SNS and SQS clients.
# * **publisher**: Handles pushing notification events to your SNS topic.
# * **topicName**: This is the arn of your SNS topic.
# * **processor**: Handles recording notification events and enqueueing them to be processed asynchronously.
# * **queueName**: Name of the SQS queue which will capture pending notification events.
# * **accountId**: AWS `account id <https://docs.aws.amazon.com/IAM/latest/UserGuide/console_account-alias.html#FindingYourAWSId>`_.
# * **emailer**: Encloses config details for sending and formatting emails used as notifications.
# * **subject**: Configurable subject line used in notification emails.
# * **sender**: Your verified SES email sender.
# * **body**: Configurable email body used in notifications.
#
# The complete set of parameters that can be used for email templating are checked in `here <https://github.com/flyteorg/flyteadmin/blob/a84223dab00dfa52d8ba1ed2d057e77b6c6ab6a7/pkg/async/notifications/email.go#L18,L30>`_.

0 comments on commit 773a541

Please sign in to comment.