Skip to content

Commit

Permalink
Add sentry sensitive data scrubbing.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro Rezchykov committed Dec 9, 2021
1 parent df1d745 commit f3165fd
Show file tree
Hide file tree
Showing 7 changed files with 388 additions and 18 deletions.
18 changes: 14 additions & 4 deletions airbyte-cdk/python/airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
import os.path
import sys
import tempfile
from typing import Iterable, List
from typing import Any, Dict, Iterable, List

from airbyte_cdk.logger import AirbyteLogFormatter, init_logger
from airbyte_cdk.models import AirbyteMessage, Status, Type
from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification
from airbyte_cdk.sources import Source
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config
from airbyte_cdk.sources.utils.sentry import AirbyteSentry
from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets

Expand Down Expand Up @@ -60,14 +61,23 @@ def parse_args(args: List[str]) -> argparse.Namespace:

return main_parser.parse_args(args)

def configure_sentry(self, spec_schema: Dict[str, Any], parsed_args: argparse.Namespace):
    """
    Initialize Sentry for the current connector run.

    The "source" tag is derived from the top-level package of the source's
    module. When the parsed arguments carry a "config" entry, the config is
    read and the values of all fields the spec marks as secret are handed to
    Sentry so they can be scrubbed; otherwise an empty list is passed.
    """
    # First dotted component of the module path, minus everything up to and
    # including its first underscore (if there is one).
    top_level_package = self.source.__module__.split(".")[0]
    source_name = top_level_package.split("_", 1)[-1]

    if "config" in parsed_args:
        connector_config = self.source.read_config(parsed_args.config)
        secret_values = get_secret_values(spec_schema, connector_config)
    else:
        secret_values = []

    AirbyteSentry.init(source_tag=source_name, secret_values=secret_values)

def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
cmd = parsed_args.command
if not cmd:
raise Exception("No command passed")

# todo: add try catch for exceptions with different exit codes
source_spec = self.source.spec(self.logger)

source_spec: ConnectorSpecification = self.source.spec(self.logger)
self.configure_sentry(source_spec.connectionSpecification, parsed_args)
with tempfile.TemporaryDirectory() as temp_dir:
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str,
Unexpected persistent exceptions are not handled and will cause the sync to fail.
"""
AirbyteSentry.add_breadcrumb(message=f"Issue {request.url}", data=request_kwargs)
with AirbyteSentry.start_transaction_span(op="_send"):
with AirbyteSentry.start_transaction_span(op="_send", description=request.url):
response: requests.Response = self._session.send(request, **request_kwargs)

if self.should_retry(response):
Expand Down
32 changes: 31 additions & 1 deletion airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
import json
import os
import pkgutil
from typing import Any, ClassVar, Dict, Mapping, Tuple
from typing import Any, ClassVar, Dict, List, Mapping, Set, Tuple

import dpath.util
import jsonref
from airbyte_cdk.models import ConnectorSpecification
from jsonschema import validate
Expand Down Expand Up @@ -144,3 +145,32 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]:
else:
main_config[k] = v
return main_config, InternalConfig.parse_obj(internal_config)


def get_secret_values(schema: Mapping[str, Any], config: Mapping[str, Any]) -> List[str]:
    """
    Collect the config values of every field the spec schema marks as secret.

    The schema is walked recursively. Each node carrying
    ``"airbyte_secret": true`` contributes a "/"-separated config path built
    from the keys leading to it, with the structural keywords "properties"
    and "oneOf" dropped. Every path is then looked up in ``config``; paths
    that cannot be resolved are skipped.

    Args:
        schema: connector specification schema.
        config: connector configuration to read the secret values from.

    Returns:
        The values found in ``config`` for the secret paths, in no
        particular order.
    """
    structural_keys = ("properties", "oneOf")

    def get_secret_pathes(schema: Mapping[str, Any]) -> Set[str]:
        """Return the config paths of all schema nodes flagged as secret."""
        pathes: Set[str] = set()

        def traverse_schema(node: Any, path: List[str]) -> None:
            if isinstance(node, dict):
                for k, v in node.items():
                    traverse_schema(v, [*path, k])
            elif isinstance(node, list):
                # List items (e.g. "oneOf" alternatives) share the parent's path.
                for item in node:
                    traverse_schema(item, path)
            # ``path`` is empty when a scalar is reached without passing
            # through any dict key; there is no key name to inspect then.
            elif path and path[-1] == "airbyte_secret" and node is True:
                secret_path = "/".join(p for p in path[:-1] if p not in structural_keys)
                # An empty path does not name any config field.
                if secret_path:
                    pathes.add(secret_path)

        traverse_schema(schema, [])
        return pathes

    result = []
    for path in get_secret_pathes(schema):
        try:
            result.append(dpath.util.get(config, path))
        except (KeyError, ValueError):
            # KeyError: the field is absent from this config.
            # ValueError: dpath interprets the path as a glob and refuses to
            # pick a value when it matches more than one entry.
            continue
    return result
154 changes: 152 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,139 @@
#
import contextlib
import os
from typing import Any
import re
from typing import Any, Callable, List, Optional, Type, Union
from uuid import uuid4

import sentry_sdk
from sentry_sdk.integrations.atexit import AtexitIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
from sentry_sdk.integrations.logging import LoggingIntegration


class AirbyteSentry:
"""
Class for working with sentry sdk. It provides methods to:
- init sentry sdk based on env variable
- add breadcrumbs and set context
- work with transactions and transaction spans
- set tag and capture message and capture exception
Also it implements client side sensitive data scrubbing.
"""

DSN_ENV_NAME = "SENTRY_DSN"
SECRET_MASK = "***"
# Maximum number of breadcrumbs to send on failure. Breadcrumbs are a trail of
# events that occurred before the failure; they are sent to the server only
# if a handled or unhandled exception occurred.
MAX_BREADCRUMBS = 30
# Event sending rate. Can range from 0 (0%) to 1.0 (100% of events are sent
# to the sentry server).
TRACES_SAMPLE_RATE = 1.0
# Patterns for credentials embedded in free text (URLs, headers, log lines).
# Group 1 keeps the prefix; whatever follows it is replaced with the mask.
# Query-parameter values run up to the next "&", "#", whitespace or quote so
# that punctuated or URL-encoded values are covered in full. Token values use
# the RFC 6750 "b64token" alphabet: restricting them to [a-zA-Z0-9_] would stop
# at the first "-" or "." and leave the rest of the token (e.g. a JWT) unmasked.
SECRET_REGEXP = [
    re.compile(r"""(api_key=)[^&#\s"']+"""),
    re.compile(r"""(access_token=)[^&#\s"']+"""),
    re.compile(r"""(refresh_token=)[^&#\s"']+"""),
    re.compile(r"(token )[A-Za-z0-9\-._~+/]+=*"),
    re.compile(r"(Bearer )[A-Za-z0-9\-._~+/]+=*"),
]
SENSITIVE_KEYS = ["Authorization", "client_secret", "access_token"]

sentry_enabled = False
source_tag = ""
run_id = str(uuid4())
secret_values: List[str] = []

@classmethod
def init(cls, source_tag=None):
def process_value(cls, key: str, value: str):
"""
Process single value. Used by recursive replace_value method or
standalone for single value.
"""
for secret in cls.secret_values:
value = value.replace(secret, cls.SECRET_MASK)
if key in cls.SENSITIVE_KEYS:
return cls.SECRET_MASK
for regexp in cls.SECRET_REGEXP:
value = regexp.sub(f"\\1{cls.SECRET_MASK}", value)
return value

@classmethod
def replace_value(cls, key, value):
"""
Recursively scan event and replace all sensitive data with SECRET_MASK.
Perform inplace data replace i.e. its not creating new object.
"""
if isinstance(value, dict):
for k, v in value.items():
value[k] = cls.replace_value(k, v)
elif isinstance(value, list):
for index, v in enumerate(value):
value[index] = cls.replace_value(index, v)
elif isinstance(value, str):
return cls.process_value(key, value)
return value

@classmethod
def filter_event(cls, event, hint):
"""
Callback for before_send sentry hook.
"""
if "message" in event:
event["message"] = cls.process_value(None, event["message"])
cls.replace_value(None, event.get("exception"))
cls.replace_value(None, event.get("contexts"))
return event

@classmethod
def filter_breadcrumb(cls, event, hint):
    """
    Callback for the before_breadcrumb sentry hook.

    Masks sensitive data in the breadcrumb in place and returns the same
    object. ``hint`` is accepted to match the hook signature and is not used.
    """
    # NOTE(review): replace_value edits nested dicts/lists in place. If the
    # breadcrumb's nested data is shared with the object the caller passed to
    # add_breadcrumb, the caller's object is masked as well - confirm.
    cls.replace_value(None, event)
    return event

@classmethod
def init(
cls,
source_tag: str = None,
transport: Optional[Union[Type[sentry_sdk.transport.Transport], Callable[[Any], None]]] = None,
secret_values: List[str] = [],
):
"""
Read sentry data source name (DSN) from env variable and initialize sentry cdk.
Args:
source_tag: str - Source name to be used in "source" tag for events organazing.
transport: Transport or Callable - transport object for transfering
sentry event to remote server. Usually used for testing, by default
HTTP transport used
secret_values: List[str] - list of string that have to be filtered
out before sending event to sentry server.
"""
sentry_dsn = os.environ.get(cls.DSN_ENV_NAME)
if sentry_dsn:
cls.sentry_enabled = True
cls.secret_values = secret_values
sentry_sdk.init(
sentry_dsn,
max_breadcrumbs=cls.MAX_BREADCRUMBS,
traces_sample_rate=cls.TRACES_SAMPLE_RATE,
before_send=AirbyteSentry.filter_event,
before_breadcrumb=AirbyteSentry.filter_breadcrumb,
transport=transport,
# Use only limited list of integration cause sentry may send
# transaction events e.g. it could send httplib request with
# url and authorization info over StdlibIntegration and it
# would bypass before_send hook.
integrations=[
ExcepthookIntegration(always_run=True),
AtexitIntegration(),
LoggingIntegration(),
],
# Disable default integrations cause sentry does not allow to
# filter transactions event that could transfer sensitive data
default_integrations=False,
)
if source_tag:
sentry_sdk.set_tag("source", source_tag)
Expand Down Expand Up @@ -54,24 +164,64 @@ def wrapper(cls, *args, **kvargs):
@classmethod
@if_enabled
def set_tag(cls, tag_name: str, value: Any):
    """
    Set a tag, which is handy for organizing and filtering events in the
    sentry UI.

    Args:
        tag_name: name of the tag.
        value: value stored under that tag.
    """
    sentry_sdk.set_tag(tag_name, value)

@classmethod
@if_enabled
def add_breadcrumb(cls, message, data=None):
    """
    Add a sentry breadcrumb.

    Args:
        message: breadcrumb message.
        data: optional extra data attached to the breadcrumb; forwarded
            to sentry_sdk.add_breadcrumb unchanged.
    """
    sentry_sdk.add_breadcrumb(message=message, data=data)

@classmethod
@if_enabled
def set_context(cls, name, data):
    """
    Attach a scrubbed context to sentry.

    The global context is used by transaction events as well, and sensitive
    data coming from transaction events cannot be filtered with the
    before_send hook, so the masking is applied here before the context is
    stored.

    Args:
        name: context name.
        data: context payload; it is left unmodified.
    """
    import copy  # local import keeps this method self-contained

    # Scrub a private copy: replace_value edits containers in place, and
    # masking the caller's own object would corrupt data the caller may
    # still rely on. Using the returned value also covers a plain string.
    scrubbed = cls.replace_value(None, copy.deepcopy(data))
    sentry_sdk.set_context(name, scrubbed)

@classmethod
@if_enabled
def capture_message(cls, message):
    """
    Send a message event to sentry.

    Args:
        message: text of the event.
    """
    sentry_sdk.capture_message(message)

@classmethod
@if_enabled
def capture_exception(
    cls,
    error: Optional[BaseException] = None,
    scope: Optional[Any] = None,
    **scope_args,
):
    """
    Report a handled exception to sentry.

    Args:
        error: exception to report; forwarded to sentry_sdk.capture_exception.
        scope: optional scope forwarded to sentry_sdk.capture_exception.
        **scope_args: extra keyword arguments forwarded unchanged.
    """
    sentry_sdk.capture_exception(error, scope=scope, **scope_args)

@classmethod
@if_enabled_else(contextlib.nullcontext())
def start_transaction(cls, op, name=None):
    """
    Return a context manager that starts a sentry transaction for
    performance monitoring.

    Args:
        op: operation name passed to sentry_sdk.start_transaction.
        name: transaction name; it is prefixed with the class-level
            source_tag and a dot.
    """
    # NOTE(review): with the default name=None the f-string renders the
    # literal text "None" (e.g. "<source_tag>.None") - confirm this is intended.
    return sentry_sdk.start_transaction(op=op, name=f"{cls.source_tag}.{name}")

@classmethod
@if_enabled_else(contextlib.nullcontext())
def start_transaction_span(cls, op, description=None):
    """
    Return a context manager that starts a sentry span inside the existing
    sentry transaction.

    Args:
        op: operation name passed to sentry_sdk.start_span.
        description: optional span description; sensitive data in it is
            masked before it is handed to sentry.
    """
    # Apply the filter to the description here, since the before_send sentry
    # hook cannot be used for transaction events.
    description = cls.replace_value(None, description)
    return sentry_sdk.start_span(op=op, description=description)
3 changes: 2 additions & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
packages=find_packages(exclude=("unit_tests",)),
install_requires=[
"backoff",
"dpath==2.0.1",
"jsonschema~=3.2.0",
"jsonref~=0.2",
"pendulum",
Expand All @@ -57,7 +58,7 @@
],
python_requires=">=3.7.0",
extras_require={
"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"],
"dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"],
"sphinx-docs": [
"Sphinx~=4.2",
"sphinx-rtd-theme~=1.0",
Expand Down
Loading

0 comments on commit f3165fd

Please sign in to comment.