Skip to content

Commit

Permalink
perf: cache dashboard bootstrap data (apache#11234)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktmud authored and auxten committed Nov 20, 2020
1 parent 838c84c commit ff63083
Show file tree
Hide file tree
Showing 15 changed files with 286 additions and 126 deletions.
2 changes: 2 additions & 0 deletions superset/charts/commands/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from superset.commands.base import BaseCommand
from superset.dao.exceptions import DAODeleteFailedError
from superset.exceptions import SupersetSecurityException
from superset.models.dashboard import Dashboard
from superset.models.slice import Slice
from superset.views.base import check_ownership

Expand All @@ -44,6 +45,7 @@ def __init__(self, user: User, model_id: int):
def run(self) -> Model:
self.validate()
try:
Dashboard.clear_cache_for_slice(slice_id=self._model_id)
chart = ChartDAO.delete(self._model)
except DAODeleteFailedError as ex:
logger.exception(ex.exception)
Expand Down
2 changes: 2 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ def _try_json_readsha( # pylint: disable=unused-argument
"PRESTO_EXPAND_DATA": False,
# Exposes API endpoint to compute thumbnails
"THUMBNAILS": False,
"DASHBOARD_CACHE": False,
"REMOVE_SLICE_LEVEL_LABEL_COLORS": False,
"SHARE_QUERIES_VIA_KV_STORE": False,
"SIP_38_VIZ_REARCHITECTURE": False,
Expand Down Expand Up @@ -368,6 +369,7 @@ def _try_json_readsha( # pylint: disable=unused-argument
CACHE_DEFAULT_TIMEOUT = 60 * 60 * 24
CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "null"}
TABLE_NAMES_CACHE_CONFIG: CacheConfig = {"CACHE_TYPE": "null"}
DASHBOARD_CACHE_TIMEOUT = 60 * 60 * 24 * 365

# CORS Options
ENABLE_CORS = False
Expand Down
1 change: 0 additions & 1 deletion superset/connectors/druid/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from . import models, views
2 changes: 1 addition & 1 deletion superset/dashboards/dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def set_dash_metadata(
and obj["meta"]["chartId"]
):
chart_id = obj["meta"]["chartId"]
obj["meta"]["uuid"] = uuid_map[chart_id]
obj["meta"]["uuid"] = uuid_map.get(chart_id)

# remove leading and trailing white spaces in the dumped json
dashboard.position_json = json.dumps(
Expand Down
6 changes: 1 addition & 5 deletions superset/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@

from superset import app, db_engine_specs, is_feature_enabled, security_manager
from superset.db_engine_specs.base import TimeGrain
from superset.models.dashboard import Dashboard
from superset.models.helpers import AuditMixinNullable, ImportMixin
from superset.models.tags import DashboardUpdater, FavStarUpdater
from superset.models.tags import FavStarUpdater
from superset.result_set import SupersetResultSet
from superset.utils import cache as cache_util, core as utils

Expand Down Expand Up @@ -719,8 +718,5 @@ class FavStar(Model): # pylint: disable=too-few-public-methods

# events for updating tags
if is_feature_enabled("TAGGING_SYSTEM"):
sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)
sqla.event.listen(FavStar, "after_insert", FavStarUpdater.after_insert)
sqla.event.listen(FavStar, "after_delete", FavStarUpdater.after_delete)
139 changes: 119 additions & 20 deletions superset/models/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
import json
import logging
from copy import copy
from functools import partial
from json.decoder import JSONDecodeError
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING
from typing import Any, Callable, Dict, List, Optional, Set, Union
from urllib import parse

import sqlalchemy as sqla
Expand All @@ -40,8 +41,20 @@
from sqlalchemy.engine.base import Connection
from sqlalchemy.orm import relationship, sessionmaker, subqueryload
from sqlalchemy.orm.mapper import Mapper

from superset import app, ConnectorRegistry, db, is_feature_enabled, security_manager
from sqlalchemy.orm.session import object_session
from sqlalchemy.sql import join, select

from superset import (
app,
cache,
ConnectorRegistry,
db,
is_feature_enabled,
security_manager,
)
from superset.connectors.base.models import BaseDatasource
from superset.connectors.druid.models import DruidColumn, DruidMetric
from superset.connectors.sqla.models import SqlMetric, TableColumn
from superset.models.helpers import AuditMixinNullable, ImportMixin
from superset.models.slice import Slice
from superset.models.tags import DashboardUpdater
Expand All @@ -52,11 +65,9 @@
convert_filter_scopes,
copy_filter_scopes,
)
from superset.utils.decorators import debounce
from superset.utils.urls import get_url_path

if TYPE_CHECKING:
from superset.connectors.base.models import BaseDatasource

metadata = Model.metadata # pylint: disable=no-member
config = app.config
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -131,7 +142,7 @@ class Dashboard( # pylint: disable=too-many-instance-attributes
css = Column(Text)
json_metadata = Column(Text)
slug = Column(String(255), unique=True)
slices = relationship("Slice", secondary=dashboard_slices, backref="dashboards")
slices = relationship(Slice, secondary=dashboard_slices, backref="dashboards")
owners = relationship(security_manager.user_model, secondary=dashboard_user)
published = Column(Boolean, default=False)

Expand All @@ -145,7 +156,7 @@ class Dashboard( # pylint: disable=too-many-instance-attributes
]

def __repr__(self) -> str:
return self.dashboard_title or str(self.id)
return f"Dashboard<{self.slug or self.id}>"

@property
def table_names(self) -> str:
Expand Down Expand Up @@ -177,11 +188,11 @@ def url(self) -> str:
return url

@property
def datasources(self) -> Set[Optional["BaseDatasource"]]:
def datasources(self) -> Set[BaseDatasource]:
return {slc.datasource for slc in self.slices}

@property
def charts(self) -> List[Optional["BaseDatasource"]]:
def charts(self) -> List[BaseDatasource]:
return [slc.chart for slc in self.slices]

@property
Expand Down Expand Up @@ -240,6 +251,29 @@ def data(self) -> Dict[str, Any]:
"last_modified_time": self.changed_on.replace(microsecond=0).timestamp(),
}

@cache.memoize(
# manually maintain cache key version
make_name=lambda fname: f"{fname}-v1",
timeout=config["DASHBOARD_CACHE_TIMEOUT"],
unless=lambda: not is_feature_enabled("DASHBOARD_CACHE"),
)
def full_data(self) -> Dict[str, Any]:
"""Bootstrap data for rendering the dashboard page."""
slices = self.slices
datasource_slices = utils.indexed(slices, "datasource")
return {
# dashboard metadata
"dashboard": self.data,
# slices metadata
"slices": [slc.data for slc in slices],
# datasource metadata
"datasources": {
# Filter out unneeded fields from the datasource payload
datasource.uid: datasource.data_for_slices(slices)
for datasource, slices in datasource_slices.items()
},
}

@property # type: ignore
def params(self) -> str: # type: ignore
return self.json_metadata
Expand All @@ -254,6 +288,39 @@ def position(self) -> Dict[str, Any]:
return json.loads(self.position_json)
return {}

def update_thumbnail(self) -> None:
url = get_url_path("Superset.dashboard", dashboard_id_or_slug=self.id)
cache_dashboard_thumbnail.delay(url, self.digest, force=True)

@debounce(0.1)
def clear_cache(self) -> None:
cache.delete_memoized(self.full_data)

@classmethod
@debounce(0.1)
def clear_cache_for_slice(cls, slice_id: int) -> None:
filter_query = select([dashboard_slices.c.dashboard_id], distinct=True).where(
dashboard_slices.c.slice_id == slice_id
)
for (dashboard_id,) in db.session.execute(filter_query):
cls(id=dashboard_id).clear_cache()

@classmethod
@debounce(0.1)
def clear_cache_for_datasource(cls, datasource_id: int) -> None:
filter_query = select(
[dashboard_slices.c.dashboard_id], distinct=True,
).select_from(
join(
Slice,
dashboard_slices,
Slice.id == dashboard_slices.c.slice_id,
Slice.datasource_id == datasource_id,
)
)
for (dashboard_id,) in db.session.execute(filter_query):
cls(id=dashboard_id).clear_cache()

@classmethod
def import_obj(
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
Expand Down Expand Up @@ -489,21 +556,53 @@ def export_dashboards( # pylint: disable=too-many-locals
)


def event_after_dashboard_changed(
_mapper: Mapper, _connection: Connection, target: Dashboard
) -> None:
url = get_url_path("Superset.dashboard", dashboard_id_or_slug=target.id)
cache_dashboard_thumbnail.delay(url, target.digest, force=True)

OnDashboardChange = Callable[[Mapper, Connection, Dashboard], Any]

# events for updating tags
if is_feature_enabled("TAGGING_SYSTEM"):
sqla.event.listen(Dashboard, "after_insert", DashboardUpdater.after_insert)
sqla.event.listen(Dashboard, "after_update", DashboardUpdater.after_update)
sqla.event.listen(Dashboard, "after_delete", DashboardUpdater.after_delete)


# events for updating tags
if is_feature_enabled("THUMBNAILS_SQLA_LISTENERS"):
sqla.event.listen(Dashboard, "after_insert", event_after_dashboard_changed)
sqla.event.listen(Dashboard, "after_update", event_after_dashboard_changed)
update_thumbnail: OnDashboardChange = lambda _, __, dash: dash.update_thumbnail()
sqla.event.listen(Dashboard, "after_insert", update_thumbnail)
sqla.event.listen(Dashboard, "after_update", update_thumbnail)

if is_feature_enabled("DASHBOARD_CACHE"):

def clear_dashboard_cache(
_mapper: Mapper,
_connection: Connection,
obj: Union[Slice, BaseDatasource, Dashboard],
check_modified: bool = True,
) -> None:
if check_modified and not object_session(obj).is_modified(obj):
# needed for avoiding excessive cache purging when duplicating a dashboard
return
if isinstance(obj, Dashboard):
obj.clear_cache()
elif isinstance(obj, Slice):
Dashboard.clear_cache_for_slice(slice_id=obj.id)
elif isinstance(obj, BaseDatasource):
Dashboard.clear_cache_for_datasource(datasource_id=obj.id)
elif isinstance(obj, (SqlMetric, TableColumn)):
Dashboard.clear_cache_for_datasource(datasource_id=obj.table_id)
elif isinstance(obj, (DruidMetric, DruidColumn)):
Dashboard.clear_cache_for_datasource(datasource_id=obj.datasource_id)

sqla.event.listen(Dashboard, "after_update", clear_dashboard_cache)
sqla.event.listen(
Dashboard, "after_delete", partial(clear_dashboard_cache, check_modified=False)
)
sqla.event.listen(Slice, "after_update", clear_dashboard_cache)
sqla.event.listen(Slice, "after_delete", clear_dashboard_cache)
sqla.event.listen(
BaseDatasource, "after_update", clear_dashboard_cache, propagage=True
)
# also clear cache on column/metric updates since updates to these will not
# trigger update events for BaseDatasource.
sqla.event.listen(SqlMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(TableColumn, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidMetric, "after_update", clear_dashboard_cache)
sqla.event.listen(DruidColumn, "after_update", clear_dashboard_cache)
11 changes: 11 additions & 0 deletions superset/utils/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1418,6 +1418,17 @@ def get_column_names_from_metrics(metrics: List[Metric]) -> List[str]:
return columns


def indexed(
items: List[Any], key: Union[str, Callable[[Any], Any]]
) -> Dict[Any, List[Any]]:
"""Build an index for a list of objects"""
idx: Dict[Any, Any] = {}
for item in items:
key_ = getattr(item, key) if isinstance(key, str) else key(item)
idx.setdefault(key_, []).append(item)
return idx


class LenientEnum(Enum):
"""Enums that do not raise ValueError when value is invalid"""

Expand Down
39 changes: 38 additions & 1 deletion superset/utils/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
# specific language governing permissions and limitations
# under the License.
import logging
import time
from datetime import datetime, timedelta
from functools import wraps
from typing import Any, Callable, Iterator
from typing import Any, Callable, Dict, Iterator, Union

from contextlib2 import contextmanager
from flask import request
Expand Down Expand Up @@ -123,3 +124,39 @@ def wrapper(*args: Any, **kwargs: Any) -> ETagResponseMixin:
return wrapper

return decorator


def arghash(args: Any, kwargs: Dict[str, Any]) -> int:
"""Simple argument hash with kwargs sorted."""
sorted_args = tuple(
x if hasattr(x, "__repr__") else x for x in [*args, *sorted(kwargs.items())]
)
return hash(sorted_args)


def debounce(duration: Union[float, int] = 0.1) -> Callable[..., Any]:
"""Ensure a function called with the same arguments executes only once
per `duration` (default: 100ms).
"""

def decorate(f: Callable[..., Any]) -> Callable[..., Any]:
last: Dict[str, Any] = {"t": None, "input": None, "output": None}

def wrapped(*args: Any, **kwargs: Any) -> Any:
now = time.time()
updated_hash = arghash(args, kwargs)
if (
last["t"] is None
or now - last["t"] >= duration
or last["input"] != updated_hash
):
result = f(*args, **kwargs)
last["t"] = time.time()
last["input"] = updated_hash
last["output"] = result
return result
return last["output"]

return wrapped

return decorate
Loading

0 comments on commit ff63083

Please sign in to comment.