diff --git a/.env b/.env index 97b809640f44..25822add1326 100644 --- a/.env +++ b/.env @@ -95,3 +95,6 @@ STATE_STORAGE_MINIO_BUCKET_NAME= STATE_STORAGE_MINIO_ENDPOINT= STATE_STORAGE_GCS_BUCKET_NAME= + +# Sentry +SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6102835" diff --git a/.env.dev b/.env.dev index 8a95538f6761..c9dedb6aa3dc 100644 --- a/.env.dev +++ b/.env.dev @@ -22,3 +22,7 @@ API_URL=/api/v1/ INTERNAL_API_HOST=airbyte-server:8001 SYNC_JOB_MAX_ATTEMPTS=3 SYNC_JOB_MAX_TIMEOUT_DAYS=3 + +# Sentry +SENTRY_DSN="" + diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 40e5b2468edf..c5aed3c9a8df 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.1.45 +Integrate Sentry for performance and errors tracking. + ## 0.1.44 Log http response status code and its content. diff --git a/airbyte-cdk/python/airbyte_cdk/entrypoint.py b/airbyte-cdk/python/airbyte_cdk/entrypoint.py index 17891c6c6ad2..493911b3293e 100644 --- a/airbyte-cdk/python/airbyte_cdk/entrypoint.py +++ b/airbyte-cdk/python/airbyte_cdk/entrypoint.py @@ -9,12 +9,14 @@ import os.path import sys import tempfile -from typing import Iterable, List +from typing import Any, Dict, Iterable, List from airbyte_cdk.logger import AirbyteLogFormatter, init_logger from airbyte_cdk.models import AirbyteMessage, Status, Type +from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification from airbyte_cdk.sources import Source -from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, split_config +from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit, get_secret_values, split_config +from airbyte_cdk.sources.utils.sentry import AirbyteSentry from airbyte_cdk.utils.airbyte_secrets_utils import get_secrets logger = init_logger("airbyte") @@ -59,14 +61,23 @@ def parse_args(args: List[str]) -> argparse.Namespace: return main_parser.parse_args(args) + def configure_sentry(self, spec_schema: Dict[str, Any], parsed_args: argparse.Namespace): + secret_values = [] + if "config" in parsed_args: + config = self.source.read_config(parsed_args.config) + secret_values = get_secret_values(spec_schema, config) + source_name = self.source.__module__.split(".")[0] + source_name = source_name.split("_", 1)[-1] + AirbyteSentry.init(source_tag=source_name, secret_values=secret_values) + def run(self, parsed_args: argparse.Namespace) -> Iterable[str]: cmd = parsed_args.command if not cmd: raise Exception("No command passed") # todo: add try catch for exceptions with different exit codes - source_spec = self.source.spec(self.logger) - + source_spec: ConnectorSpecification = self.source.spec(self.logger) + self.configure_sentry(source_spec.connectionSpecification, parsed_args) with tempfile.TemporaryDirectory() as temp_dir: if cmd == "spec": message = AirbyteMessage(type=Type.SPEC, spec=source_spec) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py index f90d05cc46db..1abab94799f5 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py @@ -3,6 +3,7 @@ # +import logging import os from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union @@ -13,6 +14,7 @@ import vcr.cassette as Cassette from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.core import Stream +from airbyte_cdk.sources.utils.sentry import AirbyteSentry from requests.auth import AuthBase from .auth.core import HttpAuthenticator, NoAuth @@ -22,6 +24,8 @@ # list of all possible HTTP methods which can be used for sending of request bodies BODY_REQUEST_METHODS = ("POST", "PUT", "PATCH") +logging.getLogger("vcr").setLevel(logging.ERROR) + class HttpStream(Stream, ABC): """ @@ -272,7 +276,9 @@ def _send(self, request: requests.PreparedRequest, request_kwargs: Mapping[str, Unexpected transient exceptions use the default backoff parameters. Unexpected persistent exceptions are not handled and will cause the sync to fail. """ - response: requests.Response = self._session.send(request, **request_kwargs) + AirbyteSentry.add_breadcrumb(message=f"Issue {request.url}", data=request_kwargs) + with AirbyteSentry.start_transaction_span(op="_send", description=request.url): + response: requests.Response = self._session.send(request, **request_kwargs) if self.should_retry(response): custom_backoff_time = self.backoff_time(response) @@ -313,10 +319,12 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi """ if max_tries is not None: max_tries = max(0, max_tries) + 1 + AirbyteSentry.set_context("request", {"url": request.url, "headers": request.headers, "args": request_kwargs}) - user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) - backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) - return backoff_handler(user_backoff_handler)(request, request_kwargs) + with AirbyteSentry.start_transaction_span(op="_send_request"): + user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) + backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor) + return backoff_handler(user_backoff_handler)(request, request_kwargs) def read_records( self, @@ -329,36 +337,38 @@ def read_records( pagination_complete = False next_page_token = None - while not pagination_complete: - request_headers = self.request_headers(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - request = self._create_prepared_request( - path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - headers=dict(request_headers, **self.authenticator.get_auth_header()), - params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), - ) - request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - - if self.use_cache: - # use context manager to handle and store cassette metadata - with self.cache_file as cass: - self.cassete = cass - # vcr tries to find records based on the request, if such records exist, return from cache file - # else make a request and save record in cache file - response = self._send_request(request, request_kwargs) + with AirbyteSentry.start_transaction("read_records", self.name), AirbyteSentry.start_transaction_span("read_records"): + while not pagination_complete: + request_headers = self.request_headers( + stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token + ) + request = self._create_prepared_request( + path=self.path(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + headers=dict(request_headers, **self.authenticator.get_auth_header()), + params=self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + json=self.request_body_json(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + data=self.request_body_data(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token), + ) + request_kwargs = self.request_kwargs(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token) - else: - response = self._send_request(request, request_kwargs) + if self.use_cache: + # use context manager to handle and store cassette metadata + with self.cache_file as cass: + self.cassete = cass + # vcr tries to find records based on the request, if such records exist, return from cache file + # else make a request and save record in cache file + response = self._send_request(request, request_kwargs) - yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) + else: + response = self._send_request(request, request_kwargs) + yield from self.parse_response(response, stream_state=stream_state, stream_slice=stream_slice) - next_page_token = self.next_page_token(response) - if not next_page_token: - pagination_complete = True + next_page_token = self.next_page_token(response) + if not next_page_token: + pagination_complete = True - # Always return an empty generator just in case no records were ever yielded - yield from [] + # Always return an empty generator just in case no records were ever yielded + yield from [] class HttpSubStream(HttpStream, ABC): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py index 3bc9979ec20d..1db9ab5843b9 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/schema_helpers.py @@ -7,8 +7,9 @@ import json import os import pkgutil -from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Tuple, Union +from typing import Any, ClassVar, Dict, List, Mapping, MutableMapping, Optional, Set, Tuple, Union +import dpath.util import jsonref from airbyte_cdk.models import ConnectorSpecification from jsonschema import RefResolver, validate @@ -191,3 +192,32 @@ def split_config(config: Mapping[str, Any]) -> Tuple[dict, InternalConfig]: else: main_config[k] = v return main_config, InternalConfig.parse_obj(internal_config) + + +def get_secret_values(schema: Mapping[str, Any], config: Mapping[str, Any]) -> List[str]: + def get_secret_pathes(schema: Mapping[str, Any]) -> Set[str]: + pathes = set() + + def traverse_schema(schema: Any, path: List[str]): + if isinstance(schema, dict): + for k, v in schema.items(): + traverse_schema(v, [*path, k]) + elif isinstance(schema, list): + for i in schema: + traverse_schema(i, path) + else: + if path[-1] == "airbyte_secret" and schema is True: + path = "/".join([p for p in path[:-1] if p not in ["properties", "oneOf"]]) + pathes.add(path) + + traverse_schema(schema, []) + return pathes + + secret_pathes = get_secret_pathes(schema) + result = [] + for path in secret_pathes: + try: + result.append(dpath.util.get(config, path)) + except KeyError: + pass + return result diff --git a/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py b/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py new file mode 100644 index 000000000000..14dfd69aafd6 --- /dev/null +++ b/airbyte-cdk/python/airbyte_cdk/sources/utils/sentry.py @@ -0,0 +1,228 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import contextlib +import os +import re +from typing import Any, Callable, List, Optional, Type, Union +from uuid import uuid4 + +import sentry_sdk +from sentry_sdk.integrations.atexit import AtexitIntegration +from sentry_sdk.integrations.excepthook import ExcepthookIntegration +from sentry_sdk.integrations.logging import LoggingIntegration + + +class AirbyteSentry: + """ + Class for working with sentry sdk. It provides methods to: + - init sentry sdk based on env variable + - add breadcrumbs and set context + - work with transactions and transaction spans + - set tag and capture message and capture exception + Also it implements client side sensitive data scrubbing. + """ + + DSN_ENV_NAME = "SENTRY_DSN" + SECRET_MASK = "***" + # Maximum number of breadcrumbs to send on fail. Breadcrumbs is trail of + # events that occured before the fail and being sent to server only + # if handled or unhandled exception occured. + MAX_BREADCRUMBS = 30 + # Event sending rate. could be from 0 (0%) to 1.0 (100 % events being sent + # to sentry server) + TRACES_SAMPLE_RATE = 1.0 + SECRET_REGEXP = [ + re.compile("(api_key=)[a-zA-Z0-9_]+"), + re.compile("(access_token=)[a-zA-Z0-9_]+"), + re.compile("(refresh_token=)[a-zA-Z0-9_]+"), + re.compile("(token )[a-zA-Z0-9_]+"), + re.compile("(Bearer )[a-zA-Z0-9_]+"), + ] + SENSITIVE_KEYS = ["Authorization", "client_secret", "access_token"] + + sentry_enabled = False + source_tag = "" + run_id = str(uuid4()) + secret_values: List[str] = [] + + @classmethod + def process_value(cls, key: str, value: str): + """ + Process single value. Used by recursive replace_value method or + standalone for single value. + """ + for secret in cls.secret_values: + value = value.replace(secret, cls.SECRET_MASK) + if key in cls.SENSITIVE_KEYS: + return cls.SECRET_MASK + for regexp in cls.SECRET_REGEXP: + value = regexp.sub(f"\\1{cls.SECRET_MASK}", value) + return value + + @classmethod + def replace_value(cls, key, value): + """ + Recursively scan event and replace all sensitive data with SECRET_MASK. + Perform inplace data replace i.e. its not creating new object. + """ + if isinstance(value, dict): + for k, v in value.items(): + value[k] = cls.replace_value(k, v) + elif isinstance(value, list): + for index, v in enumerate(value): + value[index] = cls.replace_value(index, v) + elif isinstance(value, str): + return cls.process_value(key, value) + return value + + @classmethod + def filter_event(cls, event, hint): + """ + Callback for before_send sentry hook. + """ + if "message" in event: + event["message"] = cls.process_value(None, event["message"]) + cls.replace_value(None, event.get("exception")) + cls.replace_value(None, event.get("contexts")) + return event + + @classmethod + def filter_breadcrumb(cls, event, hint): + """ + Callback for before_breadcrumb sentry hook. + """ + cls.replace_value(None, event) + return event + + @classmethod + def init( + cls, + source_tag: str = None, + transport: Optional[Union[Type[sentry_sdk.transport.Transport], Callable[[Any], None]]] = None, + secret_values: List[str] = [], + ): + """ + Read sentry data source name (DSN) from env variable and initialize sentry cdk. + Args: + source_tag: str - Source name to be used in "source" tag for events organazing. + transport: Transport or Callable - transport object for transfering + sentry event to remote server. Usually used for testing, by default + HTTP transport used + secret_values: List[str] - list of string that have to be filtered + out before sending event to sentry server. + + """ + sentry_dsn = os.environ.get(cls.DSN_ENV_NAME) + if sentry_dsn: + cls.sentry_enabled = True + cls.secret_values = secret_values + sentry_sdk.init( + sentry_dsn, + max_breadcrumbs=cls.MAX_BREADCRUMBS, + traces_sample_rate=cls.TRACES_SAMPLE_RATE, + before_send=AirbyteSentry.filter_event, + before_breadcrumb=AirbyteSentry.filter_breadcrumb, + transport=transport, + # Use only limited list of integration cause sentry may send + # transaction events e.g. it could send httplib request with + # url and authorization info over StdlibIntegration and it + # would bypass before_send hook. + integrations=[ + ExcepthookIntegration(always_run=True), + AtexitIntegration(), + LoggingIntegration(), + ], + # Disable default integrations cause sentry does not allow to + # filter transactions event that could transfer sensitive data + default_integrations=False, + ) + if source_tag: + sentry_sdk.set_tag("source", source_tag) + sentry_sdk.set_tag("run_id", cls.run_id) + cls.source_tag = source_tag + + def if_enabled(f): + def wrapper(cls, *args, **kvargs): + if cls.sentry_enabled: + return f(cls, *args, **kvargs) + + return wrapper + + def if_enabled_else(return_value): + def if_enabled(f): + def wrapper(cls, *args, **kvargs): + if cls.sentry_enabled: + return f(cls, *args, **kvargs) + else: + return return_value + + return wrapper + + return if_enabled + + @classmethod + @if_enabled + def set_tag(cls, tag_name: str, value: Any): + """ + Set tag that is handy for events organazing and filtering by sentry UI. + """ + sentry_sdk.set_tag(tag_name, value) + + @classmethod + @if_enabled + def add_breadcrumb(cls, message, data=None): + """ + Add sentry breadcrumb. + """ + sentry_sdk.add_breadcrumb(message=message, data=data) + + @classmethod + @if_enabled + def set_context(cls, name, data): + # Global context being used by transaction event as well. Since we cant + # filter senstitve data coming from transaction event using sentry + # before_event hook, apply filter to context here. + cls.replace_value(None, data) + sentry_sdk.set_context(name, data) + + @classmethod + @if_enabled + def capture_message(cls, message): + """ + Send message event to sentry. + """ + sentry_sdk.capture_message(message) + + @classmethod + @if_enabled + def capture_exception( + cls, + error: Optional[BaseException] = None, + scope: Optional[Any] = None, + **scope_args, + ): + """ + Report handled execption to sentry. + """ + sentry_sdk.capture_exception(error, scope=scope, **scope_args) + + @classmethod + @if_enabled_else(contextlib.nullcontext()) + def start_transaction(cls, op, name=None): + """ + Return context manager for starting sentry transaction for performance monitoring. + """ + return sentry_sdk.start_transaction(op=op, name=f"{cls.source_tag}.{name}") + + @classmethod + @if_enabled_else(contextlib.nullcontext()) + def start_transaction_span(cls, op, description=None): + """ + Return context manager for starting sentry transaction span inside existing sentry transaction. + """ + # Apply filter to description since we cannot use before_send sentry + # hook for transaction event. + description = cls.replace_value(None, description) + return sentry_sdk.start_span(op=op, description=description) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 1ab0a74b6289..271c7db4ccb4 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.1.44", + version="0.1.45", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", @@ -46,18 +46,20 @@ packages=find_packages(exclude=("unit_tests",)), install_requires=[ "backoff", + "dpath==2.0.1", "jsonschema~=3.2.0", "jsonref~=0.2", "pendulum", "pydantic~=1.6", "PyYAML~=5.4", "requests", + "sentry-sdk~=1.5.1", "vcrpy", "Deprecated~=1.2", ], python_requires=">=3.7.0", extras_require={ - "dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock"], + "dev": ["MyPy~=0.812", "pytest", "pytest-cov", "pytest-mock", "requests-mock", "pytest-httpserver"], "sphinx-docs": [ "Sphinx~=4.2", "sphinx-rtd-theme~=1.0", diff --git a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py index df2b69ec8faf..b325733584e5 100644 --- a/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py +++ b/airbyte-cdk/python/unit_tests/sources/streams/http/test_http.py @@ -6,7 +6,7 @@ import json from http import HTTPStatus from typing import Any, Iterable, Mapping, Optional -from unittest.mock import ANY +from unittest.mock import ANY, MagicMock, patch import pytest import requests @@ -416,6 +416,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield response +@patch("airbyte_cdk.sources.streams.core.logging", MagicMock()) def test_using_cache(mocker): parent_stream = CacheHttpStreamWithSlices() mocker.patch.object(parent_stream, "url_base", "https://google.com/") diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py index 17faad0ba3a4..3899141a21ba 100644 --- a/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_schema_helpers.py @@ -12,9 +12,10 @@ from pathlib import Path import jsonref +import pytest from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models.airbyte_protocol import ConnectorSpecification -from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, check_config_against_spec_or_exit, get_secret_values from pytest import fixture from pytest import raises as pytest_raises @@ -80,7 +81,10 @@ def test_inline_schema_resolves(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } @@ -96,16 +100,26 @@ def test_shared_schemas_resolves(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } partial_schema = { "type": ["null", "object"], - "properties": {"str": {"type": "string"}, "int": {"type": "integer"}, "obj": {"$ref": "shared_schema.json"}}, + "properties": { + "str": {"type": "string"}, + "int": {"type": "integer"}, + "obj": {"$ref": "shared_schema.json"}, + }, } - referenced_schema = {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}} + referenced_schema = { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + } create_schema("complex_schema", partial_schema) create_schema("shared/shared_schema", referenced_schema) @@ -122,8 +136,19 @@ def test_shared_schemas_resolves_nested(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "one_of": {"oneOf": [{"type": "string"}, {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}]}, - "obj": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "one_of": { + "oneOf": [ + {"type": "string"}, + { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, + ] + }, + "obj": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, }, } partial_schema = { @@ -131,7 +156,12 @@ def test_shared_schemas_resolves_nested(): "properties": { "str": {"type": "string"}, "int": {"type": "integer"}, - "one_of": {"oneOf": [{"type": "string"}, {"$ref": "shared_schema.json#/definitions/type_one"}]}, + "one_of": { + "oneOf": [ + {"type": "string"}, + {"$ref": "shared_schema.json#/definitions/type_one"}, + ] + }, "obj": {"$ref": "shared_schema.json#/definitions/type_one"}, }, } @@ -139,7 +169,10 @@ def test_shared_schemas_resolves_nested(): referenced_schema = { "definitions": { "type_one": {"$ref": "shared_schema.json#/definitions/type_nested"}, - "type_nested": {"type": ["null", "object"], "properties": {"k1": {"type": "string"}}}, + "type_nested": { + "type": ["null", "object"], + "properties": {"k1": {"type": "string"}}, + }, } } @@ -153,3 +186,69 @@ def test_shared_schemas_resolves_nested(): # Make sure generated schema is JSON serializable assert json.dumps(actual_schema) assert jsonref.JsonRef.replace_refs(actual_schema) + + +@pytest.mark.parametrize( + "schema,config,expected", + [ + ( + { + "type": "object", + "properties": { + "credentials": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "option_title": { + "type": "string", + "const": "OAuth Credentials", + } + }, + }, + { + "type": "object", + "properties": { + "option_title": {"type": "string"}, + "personal_access_token": { + "type": "string", + "airbyte_secret": True, + }, + }, + }, + ], + }, + "repository": {"type": "string"}, + "start_date": {"type": "string"}, + }, + }, + {"credentials": {"personal_access_token": "secret"}}, + ["secret"], + ), + ( + { + "type": "object", + "properties": { + "access_token": {"type": "string", "airbyte_secret": True}, + "whatever": {"type": "string", "airbyte_secret": False}, + }, + }, + {"access_token": "secret"}, + ["secret"], + ), + ( + { + "type": "object", + "properties": { + "access_token": {"type": "string", "airbyte_secret": False}, + "whatever": {"type": "string", "airbyte_secret": False}, + }, + }, + {"access_token": "secret"}, + [], + ), + ], +) +def test_get_secret_values(schema, config, expected): + assert get_secret_values(schema, config) == expected diff --git a/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py b/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py new file mode 100644 index 000000000000..fd01cc653615 --- /dev/null +++ b/airbyte-cdk/python/unit_tests/sources/utils/test_sentry.py @@ -0,0 +1,125 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import json +import os +from dataclasses import dataclass +from logging import getLogger +from typing import List +from unittest import mock + +import requests +from airbyte_cdk.sources.utils.sentry import AirbyteSentry +from sentry_sdk.transport import Transport + + +@mock.patch("airbyte_cdk.sources.utils.sentry.sentry_sdk") +def test_sentry_init_no_env(sentry_mock): + assert AirbyteSentry.DSN_ENV_NAME not in os.environ + AirbyteSentry.init("test_source") + assert not sentry_mock.init.called + assert not AirbyteSentry.sentry_enabled + AirbyteSentry.set_tag("tagname", "value") + assert not sentry_mock.set_tag.called + AirbyteSentry.add_breadcrumb("msg", data={}) + assert not sentry_mock.add_breadcrumb.called + + with AirbyteSentry.start_transaction("name", "op"): + assert not sentry_mock.start_transaction.called + + with AirbyteSentry.start_transaction_span("name", "op"): + assert not sentry_mock.start_span.called + + +@mock.patch.dict(os.environ, {AirbyteSentry.DSN_ENV_NAME: "dsn"}) +@mock.patch("airbyte_cdk.sources.utils.sentry.sentry_sdk") +def test_sentry_init(sentry_mock): + AirbyteSentry.init("test_source") + assert sentry_mock.init.called + sentry_mock.set_tag.assert_any_call("source", "test_source") + sentry_mock.set_tag.assert_any_call("run_id", mock.ANY) + assert AirbyteSentry.sentry_enabled + AirbyteSentry.set_tag("tagname", "value") + assert sentry_mock.set_tag.called + AirbyteSentry.add_breadcrumb("msg", data={}) + assert sentry_mock.add_breadcrumb.called + with AirbyteSentry.start_transaction("name", "op"): + assert sentry_mock.start_transaction.called + + with AirbyteSentry.start_transaction_span("name", "op"): + assert sentry_mock.start_span.called + + +@dataclass +class TestTransport(Transport): + secrets: List[str] + # Sentry sdk wraps sending event with try except that would intercept + # AssertionError exception resulting it would ignore assert directive. + # Use this variable to check if test failed after sentry code executed. + failed = None + + def capture_envelope(self, envelop): + for s in self.secrets: + for i in envelop.items: + payload = json.dumps(i.payload.json) + assert s not in payload + + def capture_event(self, event): + if self.failed: + return + event = json.dumps(event) + for s in self.secrets: + if s in event: + self.failed = f"{s} should not be in {event}" + return + + +@mock.patch.dict(os.environ, {AirbyteSentry.DSN_ENV_NAME: "https://22222@222.ingest.sentry.io/111"}) +def test_sentry_sensitive_info(httpserver): + SECRET = "SOME_secret" + UNEXPECTED_SECRET = "UnexEpectedSecret" + SECRETS = [SECRET] + transport = TestTransport(secrets=[*SECRETS, UNEXPECTED_SECRET]) + + AirbyteSentry.init("test_source", transport=transport, secret_values=SECRETS) + + AirbyteSentry.add_breadcrumb("msg", {"crumb": SECRET}) + AirbyteSentry.set_context("my secret", {"api_key": SECRET}) + AirbyteSentry.capture_message(f"this is {SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?access_token={UNEXPECTED_SECRET}") + AirbyteSentry.capture_message(f"Issue url http://localhost:{httpserver.port}/test?refresh_token={UNEXPECTED_SECRET}") + AirbyteSentry.set_context("headers", {"Authorization": f"Bearer {UNEXPECTED_SECRET}"}) + getLogger("airbyte").info(f"this is {SECRET}") + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={SECRET}", + headers={"Authorization": f"Bearer {SECRET}"}, + ).text + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}", + headers={"Authorization": f"Bearer {UNEXPECTED_SECRET}"}, + ).text + AirbyteSentry.capture_exception(Exception(f"Secret info: {SECRET}")) + assert not transport.failed + + +@mock.patch.dict(os.environ, {AirbyteSentry.DSN_ENV_NAME: "https://22222@222.ingest.sentry.io/111"}) +def test_sentry_sensitive_info_transactions(httpserver): + SECRET = "SOME_secret" + SECRETS = [SECRET] + UNEXPECTED_SECRET = "UnexEpectedSecret" + transport = TestTransport(secrets=[*SECRETS, UNEXPECTED_SECRET]) + AirbyteSentry.init("test_source", transport=transport, secret_values=SECRETS) + + AirbyteSentry.set_context("my secret", {"api_key": SECRET}) + AirbyteSentry.set_context("headers", {"Authorization": f"Bearer {UNEXPECTED_SECRET}"}) + with AirbyteSentry.start_transaction("name", "op"): + with AirbyteSentry.start_transaction_span( + "name", description=f"http://localhost:{httpserver.port}/test?api_key={UNEXPECTED_SECRET}" + ): + requests.get( + f"http://localhost:{httpserver.port}/test?api_key={SECRET}", + headers={"Authorization": f"Bearer {SECRET}"}, + ).text + assert not transport.failed diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9a0edbdce4a9..341476d3f4fd 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -252,7 +252,7 @@ - name: Google Search Console sourceDefinitionId: eb4c9e00-db83-4d63-a386-39cfa91012a8 dockerRepository: airbyte/source-google-search-console - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/sources/google-search-console icon: googlesearchconsole.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 02c566c2dc67..55a3560bfd99 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2413,7 +2413,7 @@ - - "client_secret" oauthFlowOutputParameters: - - "refresh_token" -- dockerImage: "airbyte/source-google-search-console:0.1.7" +- dockerImage: "airbyte/source-google-search-console:0.1.8" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/google-search-console" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-google-search-console/Dockerfile b/airbyte-integrations/connectors/source-google-search-console/Dockerfile index b638438717d3..841d087b3c63 100755 --- a/airbyte-integrations/connectors/source-google-search-console/Dockerfile +++ b/airbyte-integrations/connectors/source-google-search-console/Dockerfile @@ -10,7 +10,8 @@ COPY setup.py ./ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENV SENTRY_DSN "https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6102835" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/source-google-search-console diff --git a/docs/integrations/sources/google-search-console.md b/docs/integrations/sources/google-search-console.md index 100655c77c18..e7519b1151fd 100644 --- a/docs/integrations/sources/google-search-console.md +++ b/docs/integrations/sources/google-search-console.md @@ -96,6 +96,7 @@ You should now be ready to use the Google Workspace Admin Reports API connector | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| `0.1.8` | 2021-12-21 | [8248](https://github.com/airbytehq/airbyte/pull/8248) | Enable Sentry for performance and errors tracking | | `0.1.7` | 2021-11-26 | [7431](https://github.com/airbytehq/airbyte/pull/7431) | Add default `end_date` param value | | `0.1.6` | 2021-09-27 | [6460](https://github.com/airbytehq/airbyte/pull/6460) | Update OAuth Spec File | | `0.1.4` | 2021-09-23 | [6394](https://github.com/airbytehq/airbyte/pull/6394) | Update Doc link Spec File |