feat(celery_worker): add newrelic headers for distributed tracing
Signed-off-by: Thomas FION <[email protected]>
ftom committed Jul 15, 2022
1 parent 80ea346 commit 4ebabb3
Showing 4 changed files with 28 additions and 6 deletions.
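
What the change does, in one picture: the web process asks the New Relic agent to write distributed-trace headers into a plain list just before dispatching the Celery task; the list travels as an ordinary task argument; on the worker, the task runs as a New Relic background task and accepts those headers, so the queued work shows up linked to the originating web transaction. Below is a minimal, hedged sketch of that pattern outside Superset. The Celery app, broker URL, and the process_report/enqueue_report names are hypothetical placeholders; only the newrelic.agent calls mirror the diff that follows.

# Sketch only, not Superset code; app, broker, and task names are placeholders.
from typing import Any, Dict, List, Optional

import newrelic.agent
from celery import Celery

app = Celery("example", broker="redis://localhost:6379/0")


@app.task(name="process_report")
@newrelic.agent.background_task()
def process_report(form_data: Dict[str, Any], headers: Optional[List[str]] = None) -> None:
    # Worker side: tie this background task to the originating web
    # transaction by accepting the headers that were generated there.
    if headers:
        newrelic.agent.accept_distributed_trace_headers(headers, transport_type="Queue")
    ...  # the actual work goes here


def enqueue_report(form_data: Dict[str, Any]) -> None:
    # Web side, inside an instrumented transaction: the agent appends the
    # distributed-trace headers to the list, which then rides along to the
    # worker as a plain task argument.
    headers: List[str] = []
    newrelic.agent.insert_distributed_trace_headers(headers)
    process_report.delay(form_data, headers=headers)

The same shape appears twice in the diff: create_async_job_command.py and core.py are the web-side producers, and the two tasks in async_queries.py are the consumers.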
requirements/development.txt (3 changes: 1 addition & 2 deletions)
@@ -85,7 +85,6 @@ newrelic==7.14.0.177
 prophet==1.1
 httpx==0.23.0
 gevent==21.12.0
-snowflake-connector-python==2.7.8
-snowflake-sqlalchemy==1.2.5
+snowflake-sqlalchemy==1.2.4
 # The following packages are considered to be unsafe in a requirements file:
 # setuptools
superset/charts/data/commands/create_async_job_command.py (8 changes: 6 additions & 2 deletions)
@@ -15,8 +15,9 @@
 # specific language governing permissions and limitations
 # under the License.
 import logging
-from typing import Any, Dict, Optional
+from typing import Any, Dict, List, Optional
 
+import newrelic.agent
 from flask import Request
 
 from superset.extensions import async_query_manager
@@ -33,6 +34,9 @@ def validate(self, request: Request) -> None:
         self._async_channel_id = jwt_data["channel"]
 
     def run(self, form_data: Dict[str, Any], user_id: Optional[str]) -> Dict[str, Any]:
+        headers: List[str] = []
+        newrelic.agent.insert_distributed_trace_headers(headers)
+
         job_metadata = async_query_manager.init_job(self._async_channel_id, user_id)
-        load_chart_data_into_cache.delay(job_metadata, form_data)
+        load_chart_data_into_cache.delay(job_metadata, form_data, headers=headers)
         return job_metadata
superset/tasks/async_queries.py (18 changes: 17 additions & 1 deletion)
@@ -18,8 +18,9 @@
 
 import copy
 import logging
-from typing import Any, cast, Dict, Optional, TYPE_CHECKING
+from typing import Any, cast, Dict, List, Optional, TYPE_CHECKING
 
+import newrelic.agent
 from celery.exceptions import SoftTimeLimitExceeded
 from flask import current_app, g
 from marshmallow import ValidationError
@@ -66,14 +67,22 @@ def _create_query_context_from_form(form_data: Dict[str, Any]) -> QueryContext:
 
 
 @celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
+@newrelic.agent.background_task()
 def load_chart_data_into_cache(
     job_metadata: Dict[str, Any],
     form_data: Dict[str, Any],
+    headers: Optional[List[str]] = None,
 ) -> None:
     # pylint: disable=import-outside-toplevel
     from superset.charts.data.commands.get_data_command import ChartDataCommand
 
     try:
+        # Add NewRelic traces
+        if headers:
+            newrelic.agent.accept_distributed_trace_headers(
+                headers, transport_type="Queue"
+            )
+
         ensure_user_is_set(job_metadata.get("user_id"))
         set_form_data(form_data)
         query_context = _create_query_context_from_form(form_data)
@@ -100,14 +109,21 @@ def load_chart_data_into_cache(
 
 
 @celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
+@newrelic.agent.background_task()
 def load_explore_json_into_cache(  # pylint: disable=too-many-locals
     job_metadata: Dict[str, Any],
     form_data: Dict[str, Any],
     response_type: Optional[str] = None,
     force: bool = False,
+    headers: Optional[List[str]] = None,
 ) -> None:
     cache_key_prefix = "ejr-"  # ejr: explore_json request
     try:
+        if headers:
+            newrelic.agent.accept_distributed_trace_headers(
+                headers, transport_type="Queue"
+            )
+
         ensure_user_is_set(job_metadata.get("user_id"))
         set_form_data(form_data)
         datasource_id, datasource_type = get_datasource_info(None, None, form_data)
superset/views/core.py (5 changes: 4 additions & 1 deletion)
@@ -26,6 +26,7 @@
 
 import backoff
 import humanize
+import newrelic.agent
 import pandas as pd
 import simplejson as json
 from flask import abort, flash, g, Markup, redirect, render_template, request, Response
@@ -674,8 +675,10 @@ def explore_json(
                 job_metadata = async_query_manager.init_job(
                     async_channel_id, g.user.get_id()
                 )
+                headers: List[str] = []
+                newrelic.agent.insert_distributed_trace_headers(headers)
                 load_explore_json_into_cache.delay(
-                    job_metadata, form_data, response_type, force
+                    job_metadata, form_data, response_type, force, headers
                 )
             except AsyncQueryTokenException:
                 return json_error_response("Not authorized", 401)
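
One operational note, not part of this commit: the insert/accept calls only do something when the New Relic agent is actually running in both the web process and the Celery worker, with distributed tracing enabled in the agent configuration. A hedged sketch of a worker-side bootstrap, with a placeholder config path (running the worker under newrelic-admin run-program is the other common route):

# Hypothetical worker bootstrap, not part of the commit. Initialize the agent
# before Celery imports the task modules so @newrelic.agent.background_task()
# and accept_distributed_trace_headers() actually report; distributed tracing
# also has to be enabled in the agent config (distributed_tracing.enabled).
import newrelic.agent

newrelic.agent.initialize("/path/to/newrelic.ini")  # placeholder path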
