Skip to content

Commit

Permalink
feat: refractored SQL-based alerting framework (apache#10605)
Browse files Browse the repository at this point in the history
* added new tables for alerting refractor

* reformatted inheritance structure

* added workflow for updated framework

* added suggested changes

* cleaned up changes

* added obervations to alert table to enable view

* added comments

* added requested changes

* fix tests

* added styling changes

* mypy

* added requested changes

* updated operator logic

* requested changes, 1 validator, styling changes

* refactored tests

* fix test alert workflow

* fixed create_alert in test

Co-authored-by: Jason Davis <@dropbox.com>
  • Loading branch information
JasonD28 authored and auxten committed Nov 20, 2020
1 parent 8a96bd0 commit 6a62297
Show file tree
Hide file tree
Showing 12 changed files with 944 additions and 176 deletions.
11 changes: 10 additions & 1 deletion superset/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,13 @@ def init_views(self) -> None:
from superset.datasets.api import DatasetRestApi
from superset.queries.api import QueryRestApi
from superset.views.access_requests import AccessRequestsModelView
from superset.views.alerts import AlertLogModelView, AlertModelView
from superset.views.alerts import (
AlertLogModelView,
AlertModelView,
AlertObservationModelView,
ValidatorInlineView,
SQLObserverInlineView,
)
from superset.views.annotations import (
AnnotationLayerModelView,
AnnotationModelView,
Expand Down Expand Up @@ -399,6 +405,9 @@ def init_views(self) -> None:
category_label=__("Manage"),
icon="fa-exclamation-triangle",
)
appbuilder.add_view_no_menu(SQLObserverInlineView)
appbuilder.add_view_no_menu(ValidatorInlineView)
appbuilder.add_view_no_menu(AlertObservationModelView)
appbuilder.add_view_no_menu(AlertLogModelView)

#
Expand Down
125 changes: 125 additions & 0 deletions superset/migrations/versions/2e5a0ee25ed4_refractor_alerting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""refractor_alerting
Revision ID: 2e5a0ee25ed4
Revises: f80a3b88324b
Create Date: 2020-08-31 20:30:30.781478
"""

# revision identifiers, used by Alembic.
revision = "2e5a0ee25ed4"
down_revision = "f80a3b88324b"

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import mysql


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"alert_validators",
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("validator_type", sa.String(length=100), nullable=False),
sa.Column("config", sa.Text(), nullable=True),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
sa.Column("alert_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"],),
sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"],),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"sql_observers",
sa.Column("created_on", sa.DateTime(), nullable=True),
sa.Column("changed_on", sa.DateTime(), nullable=True),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("sql", sa.Text(), nullable=False),
sa.Column("created_by_fk", sa.Integer(), nullable=True),
sa.Column("changed_by_fk", sa.Integer(), nullable=True),
sa.Column("alert_id", sa.Integer(), nullable=False),
sa.Column("database_id", sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
sa.ForeignKeyConstraint(["changed_by_fk"], ["ab_user.id"],),
sa.ForeignKeyConstraint(["created_by_fk"], ["ab_user.id"],),
sa.ForeignKeyConstraint(["database_id"], ["dbs.id"],),
sa.PrimaryKeyConstraint("id"),
)
op.create_table(
"sql_observations",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("dttm", sa.DateTime(), nullable=True),
sa.Column("observer_id", sa.Integer(), nullable=False),
sa.Column("alert_id", sa.Integer(), nullable=True),
sa.Column("value", sa.Float(), nullable=True),
sa.Column("error_msg", sa.String(length=500), nullable=True),
sa.ForeignKeyConstraint(["alert_id"], ["alerts.id"],),
sa.ForeignKeyConstraint(["observer_id"], ["sql_observers.id"],),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_sql_observations_dttm"), "sql_observations", ["dttm"], unique=False
)

with op.batch_alter_table("alerts") as batch_op:
batch_op.add_column(sa.Column("changed_by_fk", sa.Integer(), nullable=True))
batch_op.add_column(sa.Column("changed_on", sa.DateTime(), nullable=True))
batch_op.add_column(sa.Column("created_by_fk", sa.Integer(), nullable=True))
batch_op.add_column(sa.Column("created_on", sa.DateTime(), nullable=True))
batch_op.alter_column(
"crontab", existing_type=mysql.VARCHAR(length=50), nullable=False
)
batch_op.create_foreign_key(
"alerts_ibfk_3", "ab_user", ["changed_by_fk"], ["id"]
)
batch_op.create_foreign_key(
"alerts_ibfk_4", "ab_user", ["created_by_fk"], ["id"]
)
batch_op.drop_column("sql")
batch_op.drop_column("database_id")
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table("alerts") as batch_op:
batch_op.add_column(
sa.Column(
"database_id", mysql.INTEGER(), autoincrement=False, nullable=False
)
)
batch_op.add_column(sa.Column("sql", mysql.TEXT(), nullable=True))
batch_op.drop_constraint("alerts_ibfk_3", type_="foreignkey")
batch_op.drop_constraint("alerts_ibfk_4", type_="foreignkey")
batch_op.alter_column(
"crontab", existing_type=mysql.VARCHAR(length=50), nullable=True
)
batch_op.drop_column("created_on")
batch_op.drop_column("created_by_fk")
batch_op.drop_column("changed_on")
batch_op.drop_column("changed_by_fk")

op.drop_index(op.f("ix_sql_observations_dttm"), table_name="sql_observations")
op.drop_table("sql_observations")
op.drop_table("sql_observers")
op.drop_table("alert_validators")
# ### end Alembic commands ###
127 changes: 114 additions & 13 deletions superset/models/alerts.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,27 @@
# specific language governing permissions and limitations
# under the License.
"""Models for scheduled execution of jobs"""
import textwrap
from datetime import datetime
from typing import Any, Optional

from flask_appbuilder import Model
from sqlalchemy import (
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Table,
Text,
)
from sqlalchemy.orm import backref, relationship
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import backref, relationship, RelationshipProperty

from superset import security_manager
from superset import db, security_manager
from superset.models.helpers import AuditMixinNullable

metadata = Model.metadata # pylint: disable=no-member

Expand All @@ -44,23 +49,23 @@
)


class Alert(Model):
class Alert(Model, AuditMixinNullable):

"""Schedules for emailing slices / dashboards"""

__tablename__ = "alerts"

id = Column(Integer, primary_key=True)
label = Column(String(150))
label = Column(String(150), nullable=False)
active = Column(Boolean, default=True, index=True)
crontab = Column(String(50))
sql = Column(Text)
crontab = Column(String(50), nullable=False)

alert_type = Column(String(50))
owners = relationship(security_manager.user_model, secondary=alert_owner)
recipients = Column(Text)
slack_channel = Column(Text)

# TODO: implement log_retention
log_retention = Column(Integer, default=90)
grace_period = Column(Integer, default=60 * 60 * 24)

Expand All @@ -70,13 +75,6 @@ class Alert(Model):
dashboard_id = Column(Integer, ForeignKey("dashboards.id"))
dashboard = relationship("Dashboard", backref="alert", foreign_keys=[dashboard_id])

database_id = Column(Integer, ForeignKey("dbs.id"), nullable=False)
database = relationship(
"Database",
foreign_keys=[database_id],
backref=backref("alerts", cascade="all, delete-orphan"),
)

last_eval_dttm = Column(DateTime, default=datetime.utcnow)
last_state = Column(String(10))

Expand All @@ -100,3 +98,106 @@ class AlertLog(Model):
@property
def duration(self) -> int:
return (self.dttm_end - self.dttm_start).total_seconds()


# TODO: Currently SQLObservation table will constantly grow with no limit,
# add some retention restriction or more to a more scalable db e.g.
# https://github.com/apache/incubator-superset/blob/master/superset/utils/log.py#L32
class SQLObserver(Model, AuditMixinNullable):
"""Runs SQL-based queries for alerts"""

__tablename__ = "sql_observers"

id = Column(Integer, primary_key=True)
sql = Column(Text, nullable=False)

@declared_attr
def alert_id(self) -> int:
return Column(Integer, ForeignKey("alerts.id"), nullable=False)

@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("sql_observer", cascade="all, delete-orphan"),
)

@declared_attr
def database_id(self) -> int:
return Column(Integer, ForeignKey("dbs.id"), nullable=False)

@declared_attr
def database(self) -> RelationshipProperty:
return relationship(
"Database",
foreign_keys=[self.database_id],
backref=backref("sql_observers", cascade="all, delete-orphan"),
)

def get_last_observation(self) -> Optional[Any]:
observations = list(
db.session.query(SQLObservation)
.filter_by(observer_id=self.id)
.order_by(SQLObservation.dttm.desc())
.limit(1)
)

if observations:
return observations[0]

return None


class SQLObservation(Model): # pylint: disable=too-few-public-methods
"""Keeps track of values retrieved from SQLObservers"""

__tablename__ = "sql_observations"

id = Column(Integer, primary_key=True)
dttm = Column(DateTime, default=datetime.utcnow, index=True)
observer_id = Column(Integer, ForeignKey("sql_observers.id"), nullable=False)
observer = relationship(
"SQLObserver",
foreign_keys=[observer_id],
backref=backref("observations", cascade="all, delete-orphan"),
)
alert_id = Column(Integer, ForeignKey("alerts.id"))
alert = relationship(
"Alert",
foreign_keys=[alert_id],
backref=backref("observations", cascade="all, delete-orphan"),
)
value = Column(Float)
error_msg = Column(String(500))


class Validator(Model, AuditMixinNullable):
"""Used to determine how an alert and its observations should be validated"""

__tablename__ = "alert_validators"

id = Column(Integer, primary_key=True)
validator_type = Column(String(100), nullable=False)
config = Column(
Text,
default=textwrap.dedent(
"""
{
}
"""
),
)

@declared_attr
def alert_id(self) -> int:
return Column(Integer, ForeignKey("alerts.id"), nullable=False)

@declared_attr
def alert(self) -> RelationshipProperty:
return relationship(
"Alert",
foreign_keys=[self.alert_id],
backref=backref("validators", cascade="all, delete-orphan"),
)
17 changes: 17 additions & 0 deletions superset/tasks/alerts/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit 6a62297

Please sign in to comment.