From f3165fd71d23ed9e3e80224fd7f74c765334d1cb Mon Sep 17 00:00:00 2001 From: Dmytro Rezchykov Date: Thu, 9 Dec 2021 11:17:55 +0200 Subject: [PATCH] Add sentry sensitive data scrubbing. --- airbyte-cdk/python/airbyte_cdk/entrypoint.py | 18 +- .../airbyte_cdk/sources/streams/http/http.py | 2 +- .../sources/utils/schema_helpers.py | 32 +++- .../airbyte_cdk/sources/utils/sentry.py | 154 +++++++++++++++++- airbyte-cdk/python/setup.py | 3 +- .../sources/utils/test_schema_helpers.py | 117 ++++++++++++- .../unit_tests/sources/utils/test_sentry.py | 80 +++++++++ 7 files changed, 388 insertions(+), 18 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 736640c8e13b..493911b3293e 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -9,12 +9,13 @@ import os.path import sys import tempfile -from typing import Iterable, List +from typing import Any, Dict, Iterable, List from airbyte_cdk.logger import AirbyteLogFormatter, init_logger from airbyte_cdk.models import AirbyteMessage, Status, Type +from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config from airbyte_cdk.sources.utils.sentry import AirbyteSentry from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets @@ -60,14 +61,23 @@ def parse_args(args: List[str]) -> argparse.Namespace: return main_parser.parse_args(args) + def configure_sentry(self, spec_schema: Dict[str, Any], parsed_args: argparse.Namespace): + secret_values = [] + if "config" in parsed_args: + config = self.source.read_config(parsed_args.config) + secret_values = get_secret_values(spec_schema, config) + source_name = self.source.__module__.split(".")[0] + source_name = source_name.split("_", 1)[-1] + AirbyteSentry.init(source_tag=source_name, secret_values=secret_values) + def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: cmd = parsed_args.command if not cmd: raise Exception("No command passed") # todo: add try catch for exceptions with different exit codes - source_spec = self.source.spec(self.logger) - + source_spec: ConnectorSpecification = self.source.spec(self.logger) + self.configure_sentry(source_spec.connectionSpecification, parsed_args) with tempfile.TemporaryDirectory() as temp_dir: if cmd == "spec": message = AirbyteMessage(type=Type.SPEC, spec=source_spec) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index 841f6a17f898..1abab94799f5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -277,7 +277,7 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Unexpected persistent exceptions are not handled and will cause the sync to fail. """ AirbyteSentry.add_breadcrumb(message=f"Issue {request.url}", data=request_kwargs) - with AirbyteSentry.start_transaction_span(op="_send"): + with AirbyteSentry.start_transaction_span(op="_send", description=request.url): response: requests.Response = self._session.send(request, **request_kwargs) if self.should_retry(response): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 903fdcf8c54a..a5e64f0f40d2 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -7,8 +7,9 @@ import json import os import pkgutil -from typing import Any, ClassVar, Dict, Mapping, Tuple +from typing import Any, ClassVar, Dict, List, Mapping, Set, Tuple +import dpath.util import jsonref from airbyte_cdk.models import ConnectorSpecification from jsonschema import validate @@ -144,3 +145,32 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]: else: main_config[k] = v return main_config, InternalConfig.parse_obj(internal_config) + + +def get_secret_values(schema: Mapping[str, Any], config: Mapping[str, Any]) -> List[str]: + def get_secret_pathes(schema: Mapping[str, Any]) -> Set[str]: + pathes = set() + + def traverse_schema(schema: Any, path: List[str]): + if isinstance(schema, dict): + for k, v in schema.items(): + traverse_schema(v, [*path, k]) + elif isinstance(schema, list): + for i in schema: + traverse_schema(i, path) + else: + if path[-1] == "airbyte_secret" and schema is True: + path = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]]) + pathes.add(path) + + traverse_schema(schema, []) + return pathes + + secret_pathes = get_secret_pathes(schema) + result = [] + for path in secret_pathes: + try: + result.append(dpath.util.get(config, path)) + except KeyError: + pass + return result diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py index 234e570332d8..2e5baa2507c7 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py @@ -3,29 +3,139 @@ # import contextlib import os -from typing import Any +import re +from typing import Any, Callable, List, Optional, Type, Union from uuid import uuid4 import sentry_sdk +from sentry_sdk.integrations.atexit import AtexitIntegration +from sentry_sdk.integrations.excepthook import ExcepthookIntegration +from sentry_sdk.integrations.logging import LoggingIntegration class AirbyteSentry: + """ + Class for working with sentry sdk. It provides methods to: + - init sentry sdk based on env variable + - add breadcrumbs and set context + - work with transactions and transaction spans + - set tag and capture message and capture exception + Also it implements client side sensitive data scrubbing. + """ + DSN_ENV_NAME = "SENTRY_DSN" + SECRET_MASK = "***" + # Maximum number of breadcrumbs to send on fail. Breadcrumbs is trail of + # events that occured before the fail and being sent to server only + # if handled or unhandled exception occured. MAX_BREADCRUMBS = 30 + # Event sending rate. could be from 0 (0%) to 1.0 (100 % events being sent + # to sentry server) TRACES_SAMPLE_RATE = 1.0 + SECRET_REGEXP = [ + re.compile("(api_key=)[a-zA-Z0-9_]+"), + re.compile("(access_token=)[a-zA-Z0-9_]+"), + re.compile("(refresh_token=)[a-zA-Z0-9_]+"), + re.compile("(token )[a-zA-Z0-9_]+"), + re.compile("(Bearer )[a-zA-Z0-9_]+"), + ] + SENSITIVE_KEYS = ["Authorization", "client_secret", "access_token"] + sentry_enabled = False source_tag = "" run_id = str(uuid4()) + secret_values: List[str] = [] @classmethod - def init(cls, source_tag=None): + def process_value(cls, key: str, value: str): + """ + Process single value. Used by recursive replace_value method or + standalone for single value. + """ + for secret in cls.secret_values: + value = value.replace(secret, cls.SECRET_MASK) + if key in cls.SENSITIVE_KEYS: + return cls.SECRET_MASK + for regexp in cls.SECRET_REGEXP: + value = regexp.sub(f"\\1{cls.SECRET_MASK}", value) + return value + + @classmethod + def replace_value(cls, key, value): + """ + Recursively scan event and replace all sensitive data with SECRET_MASK. + Perform inplace data replace i.e. its not creating new object. + """ + if isinstance(value, dict): + for k, v in value.items(): + value[k] = cls.replace_value(k, v) + elif isinstance(value, list): + for index, v in enumerate(value): + value[index] = cls.replace_value(index, v) + elif isinstance(value, str): + return cls.process_value(key, value) + return value + + @classmethod + def filter_event(cls, event, hint): + """ + Callback for before_send sentry hook. + """ + if "message" in event: + event["message"] = cls.process_value(None, event["message"]) + cls.replace_value(None, event.get("exception")) + cls.replace_value(None, event.get("contexts")) + return event + + @classmethod + def filter_breadcrumb(cls, event, hint): + """ + Callback for before_breadcrumb sentry hook. + """ + cls.replace_value(None, event) + return event + + @classmethod + def init( + cls, + source_tag: str = None, + transport: Optional[Union[Type[sentry_sdk.transport.Transport], Callable[[Any], None]]] = None, + secret_values: List[str] = [], + ): + """ + Read sentry data source name (DSN) from env variable and initialize sentry cdk. + Args: + source_tag: str - Source name to be used in "source" tag for events organazing. + transport: Transport or Callable - transport object for transfering + sentry event to remote server. Usually used for testing, by default + HTTP transport used + secret_values: List[str] - list of string that have to be filtered + out before sending event to sentry server. + + """ sentry_dsn = os.environ.get(cls.DSN_ENV_NAME) if sentry_dsn: cls.sentry_enabled = True + cls.secret_values = secret_values sentry_sdk.init( sentry_dsn, max_breadcrumbs=cls.MAX_BREADCRUMBS, traces_sample_rate=cls.TRACES_SAMPLE_RATE, + before_send=AirbyteSentry.filter_event, + before_breadcrumb=AirbyteSentry.filter_breadcrumb, + transport=transport, + # Use only limited list of integration cause sentry may send + # transaction events e.g. it could send httplib request with + # url and authorization info over StdlibIntegration and it + # would bypass before_send hook. + integrations=[ + ExcepthookIntegration(always_run=True), + AtexitIntegration(), + LoggingIntegration(), + ], + # Disable default integrations cause sentry does not allow to + # filter transactions event that could transfer sensitive data + default_integrations=False, ) if source_tag: sentry_sdk.set_tag("source", source_tag) @@ -54,24 +164,64 @@ def wrapper(cls, *args, **kvargs): @classmethod @if_enabled def set_tag(cls, tag_name: str, value: Any): + """ + Set tag that is handy for events organazing and filtering by sentry UI. + """ sentry_sdk.set_tag(tag_name, value) @classmethod @if_enabled def add_breadcrumb(cls, message, data=None): + """ + Add sentry breadcrumb. + """ sentry_sdk.add_breadcrumb(message=message, data=data) @classmethod @if_enabled def set_context(cls, name, data): + # Global context being used by transaction event as well. Since we cant + # filter senstitve data coming from transaction event using sentry + # before_event hook, apply filter to context here. + cls.replace_value(None, data) sentry_sdk.set_context(name, data) + @classmethod + @if_enabled + def capture_message(cls, message): + """ + Send message event to sentry. + """ + sentry_sdk.capture_message(message) + + @classmethod + @if_enabled + def capture_exception( + cls, + error: Optional[BaseException] = None, + scope: Optional[Any] = None, + **scope_args, + ): + """ + Report handled execption to sentry. + """ + sentry_sdk.capture_exception(error, scope=scope, **scope_args) + @classmethod @if_enabled_else(contextlib.nullcontext()) def start_transaction(cls, op, name=None): + """ + Return context manager for starting sentry transaction for performance monitoring. + """ return sentry_sdk.start_transaction(op=op, name=f"{cls.source_tag}.{name}") @classmethod @if_enabled_else(contextlib.nullcontext()) def start_transaction_span(cls, op, description=None): + """ + Return context manager for starting sentry transaction span inside existing sentry transaction. + """ + # Apply filter to description since we cannot use before_send sentry + # hook for transaction event. + description = cls.replace_value(None, description) return sentry_sdk.start_span(op=op, description=description) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 66269249f7aa..5f1efccdf524 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -46,6 +46,7 @@ packages=find_packages(exclude=("unit_tests",)), install_requires=[ "backoff", + "dpath==2.0.1", "jsonschema~=3.2.0", "jsonref~=0.2", "pendulum", @@ -57,7 +58,7 @@ ], python_requires=">=3.7.0", extras_require={ - "dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"], + "dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"], "sphinx-docs": [ "Sphinx~=4.2", "sphinx-rtd-theme~=1.0", diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py index 17faad0ba3a4..3899141a21ba 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py @@ -12,9 +12,10 @@ from pathlib import Path import jsonref +import pytest from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification -from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit, get_secret_values from pytest import fixture from pytest import raises as pytest_raises @@ -80,7 +81,10 @@ def test_inline_schema_resolves(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } @@ -96,16 +100,26 @@ def test_shared_schemas_resolves(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } partial_schema = { "type": ["null", "object"], - "properties": {"str": {"type": "string"}, "int": {"type": "integer"}, "obj": {"$ref": "shared_schema.json"}}, + "properties": { + "str": {"type": "string"}, + "int": {"type": "integer"}, + "obj": {"$ref": "shared_schema.json"}, + }, } - referenced_schema = {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}} + referenced_schema = { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + } create_schema("complex_schema", partial_schema) create_schema("shared/shared_schema", referenced_schema) @@ -122,8 +136,19 @@ def test_shared_schemas_resolves_nested(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "one_of": {"oneOf": [{"type": "string"}, {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}]}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "one_of": { + "oneOf": [ + {"type": "string"}, + { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, + ] + }, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } partial_schema = { @@ -131,7 +156,12 @@ def test_shared_schemas_resolves_nested(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "one_of": {"oneOf": [{"type": "string"}, {"$ref": "shared_schema.json#/definitions/type_one"}]}, + "one_of": { + "oneOf": [ + {"type": "string"}, + {"$ref": "shared_schema.json#/definitions/type_one"}, + ] + }, "obj": {"$ref": "shared_schema.json#/definitions/type_one"}, }, } @@ -139,7 +169,10 @@ def test_shared_schemas_resolves_nested(): referenced_schema = { "definitions": { "type_one": {"$ref": "shared_schema.json#/definitions/type_nested"}, - "type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "type_nested": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, } } @@ -153,3 +186,69 @@ def test_shared_schemas_resolves_nested(): # Make sure generated schema is JSON serializable assert json.dumps(actual_schema) assert jsonref.JsonRef.replace_refs(actual_schema) + + +@pytest.mark.parametrize( + "schema,config,expected", + [ + ( + { + "type": "object", + "properties": { + "credentials": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "option_title": { + "type": "string", + "const": "OAuth Credentials", + } + }, + }, + { + "type": "object", + "properties": { + "option_title": {"type": "string"}, + "personal_access_token": { + "type": "string", + "airbyte_secret": True, + }, + }, + }, + ], + }, + "repository": {"type": "string"}, + "start_date": {"type": "string"}, + }, + }, + {"credentials": {"personal_access_token": "secret"}}, + ["secret"], + ), + ( + { + "type": "object", + "properties": { + "access_token": {"type": "string", "airbyte_secret": True}, + "whatever": {"type": "string", "airbyte_secret": False}, + }, + }, + {"access_token": "secret"}, + ["secret"], + ), + ( + { + "type": "object", + "properties": { + "access_token": {"type": "string", "airbyte_secret": False}, + "whatever": {"type": "string", "airbyte_secret": False}, + }, + }, + {"access_token": "secret"}, + [], + ), + ], +) +def test_get_secret_values(schema, config, expected): + assert get_secret_values(schema, config) == expected diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py b/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py index 391547a3047d..e78857981330 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py @@ -1,10 +1,16 @@ # # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +import json import os +from dataclasses import dataclass +from logging import getLogger +from typing import List from unittest import mock +import requests from airbyte_cdk.sources.utils.sentry import AirbyteSentry +from sentry_sdk.transport import Transport @mock.patch("airbyte_cdk.sources.utils.sentry.sentry_sdk") @@ -42,3 +48,77 @@ def test_sentry_init(sentry_mock): with AirbyteSentry.start_transaction_span("name", "op"): assert sentry_mock.start_span.called + + +@dataclass +class TestTransport(Transport): + secrets: List[str] + # Sentry sdk wraps sending event with try except that would intercept + # AssertionError exception resulting it would ignore assert directive. + # Use this variable to check if test failed after sentry code executed. + failed = None + + def capture_envelope(self, envelop): + for s in self.secrets: + for i in envelop.items: + payload = json.dumps(i.payload.json) + assert s not in payload + + def capture_event(self, event): + if self.failed: + return + event = json.dumps(event) + for s in self.secrets: + if s in event: + self.failed = f"{s} should not be in {event}" + return + + +@mock.patch.dict(os.environ, {AirbyteSentry.DSN_ENV_NAME: "https://22222@222.ingest.sentry.io/111"}) +def test_sentry_sensitive_info(httpserver): + SECRET = "SOME_secret" + UNEXPECTED_SECRET = "UnexEpectedSecret" + SECRETS = [SECRET] + transport = TestTransport(secrets=[*SECRETS, UNEXPECTED_SECRET]) + + AirbyteSentry.init("test_source", transport=transport, secret_values=SECRETS) + + AirbyteSentry.add_breadcrumb("msg", {"crumb": SECRET}) + AirbyteSentry.set_context("my secret", {"api_key": SECRET}) + AirbyteSentry.capture_message(f"this is {SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?access_token={UNEXPECTED_SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?refresh_token={UNEXPECTED_SECRET}") + AirbyteSentry.set_context("headers", {"Authorization": f"Bearer {UNEXPECTED_SECRET}"}) + getLogger("airbyte").info(f"this is {SECRET}") + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={SECRET}", + headers={"Authorization": f"Bearer {SECRET}"}, + ).text + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}", + headers={"Authorization": f"Bearer {UNEXPECTED_SECRET}"}, + ).text + AirbyteSentry.capture_exception(Exception(f"Secret info: {SECRET}")) + assert not transport.failed + + +@mock.patch.dict(os.environ, {AirbyteSentry.DSN_ENV_NAME: "https://22222@222.ingest.sentry.io/111"}) +def test_sentry_sensitive_info_transactions(httpserver): + SECRET = "SOME_secret" + SECRETS = [SECRET] + UNEXPECTED_SECRET = "UnexEpectedSecret" + transport = TestTransport(secrets=[*SECRETS, UNEXPECTED_SECRET]) + AirbyteSentry.init("test_source", transport=transport, secret_values=SECRETS) + + AirbyteSentry.set_context("my secret", {"api_key": SECRET}) + AirbyteSentry.set_context("headers", {"Authorization": f"Bearer {UNEXPECTED_SECRET}"}) + with AirbyteSentry.start_transaction("name", "op"): + with AirbyteSentry.start_transaction_span( + "name", description=f"http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}" + ): + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={SECRET}", + headers={"Authorization": f"Bearer {SECRET}"}, + ).text + assert not transport.failed