From 9d1cd42ff9f3118e2312ea9c94ad647f1baaad73 Mon Sep 17 00:00:00 2001 From: Luis Gomez <781929+lgomezm@users.noreply.github.com> Date: Mon, 30 May 2022 05:28:50 -0500 Subject: [PATCH] =?UTF-8?q?=F0=9F=8E=89=20Source=20Freshdesk:=20Migrated?= =?UTF-8?q?=20to=20latest=20CDK=20(#12334)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-freshdesk/CHANGELOG.md | 21 - .../connectors/source-freshdesk/Dockerfile | 2 +- .../acceptance-test-config.yml | 3 + .../integration_tests/configured_catalog.json | 652 ++++++++++++++++++ .../integration_tests/test_client.py | 64 -- .../source-freshdesk/source_freshdesk/api.py | 358 ---------- .../source_freshdesk/client.py | 88 --- .../source_freshdesk/errors.py | 37 - .../source_freshdesk/schemas/groups.json | 2 +- .../source_freshdesk/schemas/tickets.json | 10 +- .../source_freshdesk/source.py | 66 +- .../source_freshdesk/streams.py | 243 +++++++ .../source_freshdesk/utils.py | 50 -- .../source-freshdesk/unit_tests/conftest.py | 16 + .../unit_tests/test_300_page.py | 150 ++-- .../unit_tests/test_client.py | 45 -- .../unit_tests/test_source.py | 57 ++ .../unit_tests/test_streams.py | 99 +++ docs/integrations/sources/freshdesk.md | 2 +- 21 files changed, 1247 insertions(+), 722 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md create mode 100644 airbyte-integrations/connectors/source-freshdesk/integration_tests/configured_catalog.json delete mode 100644 airbyte-integrations/connectors/source-freshdesk/integration_tests/test_client.py delete mode 100644 airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py delete mode 100644 airbyte-integrations/connectors/source-freshdesk/source_freshdesk/client.py delete mode 100644 airbyte-integrations/connectors/source-freshdesk/source_freshdesk/errors.py create mode 
100644 airbyte-integrations/connectors/source-freshdesk/source_freshdesk/streams.py create mode 100644 airbyte-integrations/connectors/source-freshdesk/unit_tests/conftest.py delete mode 100644 airbyte-integrations/connectors/source-freshdesk/unit_tests/test_client.py create mode 100644 airbyte-integrations/connectors/source-freshdesk/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-freshdesk/unit_tests/test_streams.py diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index e665b3d04391..fcddb196b785 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -271,7 +271,7 @@ - name: Freshdesk sourceDefinitionId: ec4b9503-13cb-48ab-a4ab-6ade4be46567 dockerRepository: airbyte/source-freshdesk - dockerImageTag: 0.2.11 + dockerImageTag: 0.3.0 documentationUrl: https://docs.airbyte.io/integrations/sources/freshdesk icon: freshdesk.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index dc4292b4b9df..07ec226164f1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -2405,7 +2405,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-freshdesk:0.2.11" +- dockerImage: "airbyte/source-freshdesk:0.3.0" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/freshdesk" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md b/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md deleted file mode 100644 index 6236bdb8ab7d..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/CHANGELOG.md +++ /dev/null 
@@ -1,21 +0,0 @@ -# Changelog - -## 0.2.9 -Maximum pagination limit `maximum_page = 500` is removed. - -## 0.2.8 -Include `requester` and `stats` fields in `tickets` stream - -## 0.2.7 -Add start_date parameter to specification from which to start pulling data. - -## 0.2.6 -Fix `unique_external_id` type in `contacts` schema. Should be a string -instead of an integer. - -## 0.2.4 -Fix the issue when server doesn't allow the client to fetch more than 300 pages from Tickets Stream: -`Validation failed: [{'field': 'page', 'message': 'You cannot access tickets beyond the 300th page. Please provide a smaller page number.', 'code': 'invalid_value'}]` - -## 0.2.3 -Fix discovery and set default cursor field as "updated_at" diff --git a/airbyte-integrations/connectors/source-freshdesk/Dockerfile b/airbyte-integrations/connectors/source-freshdesk/Dockerfile index 83765ef3f49b..da9cf238c8bb 100644 --- a/airbyte-integrations/connectors/source-freshdesk/Dockerfile +++ b/airbyte-integrations/connectors/source-freshdesk/Dockerfile @@ -34,5 +34,5 @@ COPY source_freshdesk ./source_freshdesk ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.2.11 +LABEL io.airbyte.version=0.3.0 LABEL io.airbyte.name=airbyte/source-freshdesk diff --git a/airbyte-integrations/connectors/source-freshdesk/acceptance-test-config.yml b/airbyte-integrations/connectors/source-freshdesk/acceptance-test-config.yml index a3e6ff797dac..3a2b3cfb605f 100644 --- a/airbyte-integrations/connectors/source-freshdesk/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-freshdesk/acceptance-test-config.yml @@ -13,9 +13,12 @@ tests: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" empty_streams: ["satisfaction_ratings", "tickets", "time_entries", "conversations"] incremental: - config_path: 
"secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" future_state_path: "integration_tests/abnormal_state.json" full_refresh: - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-freshdesk/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-freshdesk/integration_tests/configured_catalog.json new file mode 100644 index 000000000000..484765452fd7 --- /dev/null +++ b/airbyte-integrations/connectors/source-freshdesk/integration_tests/configured_catalog.json @@ -0,0 +1,652 @@ +{ + "streams": [ + { + "stream": { + "name": "agents", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "available": { + "type": "boolean" + }, + "occasional": { + "type": "boolean" + }, + "id": { + "type": "integer" + }, + "signature": { + "type": ["string", "null"] + }, + "ticket_scope": { + "type": "integer" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "last_active_at": { + "type": ["string", "null"] + }, + "available_since": { + "type": ["string", "null"] + }, + "type": { + "type": "string" + }, + "contact": { + "type": "object", + "properties": { + "active": { + "type": "boolean" + }, + "email": { + "type": "string" + }, + "job_title": { + "type": ["string", "null"] + }, + "language": { + "type": "string" + }, + "last_login_at": { + "type": ["string", "null"] + }, + "mobile": { + "type": ["string", "integer", "null"] + }, + "name": { + "type": "string" + }, + "phone": { + "type": ["string", "integer", "null"] + }, + "time_zone": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + 
"cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "companies", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "description": { + "type": ["string", "null"] + }, + "note": { + "type": ["string", "null"] + }, + "domains": { + "type": "array", + "items": { + "type": "string" + } + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "custom_fields": { + "type": "object" + }, + "health_score": { + "type": ["string", "null"] + }, + "account_tier": { + "type": "string" + }, + "renewal_date": { + "type": ["string", "null"] + }, + "industry": { + "type": ["string", "null"] + } + } + }, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "default_cursor_field": null + }, + "sync_mode": "full_refresh", + "cursor_field": null, + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "contacts", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "active": { + "type": "boolean" + }, + "address": { + "type": ["string", "null"] + }, + "company_id": { + "type": ["integer", "null"] + }, + "description": { + "type": ["string", "null"] + }, + "email": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "job_title": { + "type": ["string", "null"] + }, + "language": { + "type": "string" + }, + "mobile": { + "type": ["string", "integer", "null"] + }, + "name": { + "type": "string" + }, + "phone": { + "type": ["string", "integer", "null"] + }, + "time_zone": { + "type": "string" + }, + "twitter_id": { + "type": ["integer", "null"] + }, + "custom_fields": { + "type": "object" + }, + "facebook_id": { + "type": ["integer", "null"] + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "csat_rating": { + "type": ["integer", 
"null"] + }, + "preferred_source": { + "type": "string" + }, + "unique_external_id": { + "type": ["integer", "null"] + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "conversations", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "body": { + "type": "string" + }, + "body_text": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "incoming": { + "type": "boolean" + }, + "private": { + "type": "boolean" + }, + "user_id": { + "type": "integer" + }, + "support_email": { + "type": ["string", "null"] + }, + "source": { + "type": "integer" + }, + "category": { + "type": "integer" + }, + "to_emails": { + "type": ["array", "null"] + }, + "from_email": { + "type": ["string", "null"] + }, + "cc_emails": { + "type": ["array", "null"] + }, + "bcc_emails": { + "type": ["array", "null"] + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "attachments": { + "type": ["array", "null"] + }, + "ticket_id": { + "type": "integer" + }, + "source_additional_info": { + "type": ["object", "null"] + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "groups", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "escalate_to": { + "type": ["integer", "null"] + }, + "unassigned_for": { + "type": ["string", "null"] + }, + "business_hour_id": { + "type": ["integer", "null"] + }, + "group_type": { + "type": "string" + }, 
+ "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "auto_ticket_assign": { + "type": ["integer", "null"] + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "roles", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "name": { + "type": "string" + }, + "description": { + "type": "string" + }, + "default": { + "type": "boolean" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "satisfaction_ratings", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "survey_id": { + "type": "integer" + }, + "user_id": { + "type": "integer" + }, + "agent_id": { + "type": "integer" + }, + "feedback": { + "type": "string" + }, + "group_id": { + "type": "integer" + }, + "ticket_id": { + "type": "integer" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "ratings": { + "type": "object" + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "surveys", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "integer" + }, + "title": { + "type": "string" + }, + "active": { + "type": 
"boolean" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + }, + "questions": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "label": { + "type": "string" + }, + "accepted_ratings": { + "type": "array", + "items": { + "type": "integer" + } + }, + "default": { + "type": "boolean" + } + } + } + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "tickets", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "cc_emails": { + "type": ["array", "null"] + }, + "fwd_emails": { + "type": ["array", "null"] + }, + "reply_cc_emails": { + "type": ["array", "null"] + }, + "ticket_cc_emails": { + "type": ["array", "null"] + }, + "fr_escalated": { + "type": "boolean" + }, + "spam": { + "type": "boolean" + }, + "email_config_id": { + "type": ["integer", "null"] + }, + "group_id": { + "type": ["integer", "null"] + }, + "priority": { + "type": ["integer", "null"] + }, + "requester_id": { + "type": ["integer", "null"] + }, + "responder_id": { + "type": ["integer", "null"] + }, + "source": { + "type": ["integer", "null"] + }, + "company_id": { + "type": ["integer", "null"] + }, + "status": { + "type": ["integer", "null"] + }, + "subject": { + "type": ["string", "null"] + }, + "association_type": { + "type": ["integer", "null"] + }, + "to_emails": { + "type": ["array", "null"], + "items": { + "type": "string" + } + }, + "product_id": { + "type": ["integer", "null"] + }, + "id": { + "type": "integer" + }, + "type": { + "type": "string" + }, + "due_by": { + "type": "string" + }, + "fr_due_by": { + "type": "string" + }, + "is_escalated": { + "type": "boolean" + }, + "custom_fields": { + "type": "object" + }, + "created_at": { + "type": 
"string" + }, + "updated_at": { + "type": "string" + }, + "associated_tickets_count": { + "type": ["integer", "null"] + }, + "tags": { + "type": "array" + }, + "nr_due_by": { + "type": ["string", "null"] + }, + "nr_escalated": { + "type": "boolean" + }, + "description": { + "type": "string" + }, + "description_text": { + "type": "string" + }, + "requester": { + "type": "object" + }, + "stats": { + "type": "object" + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "time_entries", + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "billable": { + "type": "boolean" + }, + "note": { + "type": "string" + }, + "id": { + "type": "integer" + }, + "timer_running": { + "type": "boolean" + }, + "agent_id": { + "type": "integer" + }, + "ticket_id": { + "type": "integer" + }, + "company_id": { + "type": ["integer", "null"] + }, + "time_spent": { + "type": "string" + }, + "executed_at": { + "type": "string" + }, + "start_time": { + "type": "string" + }, + "created_at": { + "type": "string" + }, + "updated_at": { + "type": "string" + } + } + }, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, + "default_cursor_field": null + }, + "sync_mode": "incremental", + "cursor_field": null, + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-freshdesk/integration_tests/test_client.py b/airbyte-integrations/connectors/source-freshdesk/integration_tests/test_client.py deleted file mode 100644 index 5b197e9fa5f8..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/integration_tests/test_client.py +++ /dev/null @@ -1,64 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - - -import json -from pathlib import Path -from typing import Mapping - -import pytest -from source_freshdesk.client import Client - -HERE = Path(__file__).parent.absolute() - - -@pytest.fixture(scope="session") -def account_creds() -> Mapping[str, str]: - config_filename = HERE.parent / "secrets" / "config.json" - - if not config_filename.exists(): - raise RuntimeError(f"Please provide credentials in {config_filename}") - - with open(str(config_filename)) as json_file: - return json.load(json_file) - - -@pytest.fixture -def unknown_account() -> str: - return "unknownaccount.freshdesk.com" - - -@pytest.fixture -def non_freshdesk_account() -> str: - return "unknownaccount.somedomain.com" - - -def test_client_wrong_domain(non_freshdesk_account): - expected_error = "Freshdesk v2 API works only via Freshdesk domains and not via custom CNAMEs" - with pytest.raises(AttributeError, match=expected_error): - Client(domain=non_freshdesk_account, api_key="wrong_key") - - -def test_client_wrong_account(unknown_account): - client = Client(domain=unknown_account, api_key="wrong_key") - alive, error = client.health_check() - - assert not alive - assert error == "Invalid credentials" - - -def test_client_wrong_cred(account_creds): - client = Client(domain=account_creds["domain"], api_key="wrong_key") - alive, error = client.health_check() - - assert not alive - assert error == "Invalid credentials" - - -def test_client_ok(account_creds): - client = Client(domain=account_creds["domain"], api_key=account_creds["api_key"]) - alive, error = client.health_check() - - assert alive - assert not error diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py deleted file mode 100644 index c02c575b93a5..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/api.py +++ /dev/null @@ -1,358 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - - -from abc import ABC, abstractmethod -from functools import partial -from itertools import count -from typing import Any, Callable, Iterator, Mapping, MutableMapping, Optional, Sequence - -import pendulum -import requests -from airbyte_cdk.entrypoint import logger # FIXME (Eugene K): use standard logger -from requests import HTTPError -from source_freshdesk.errors import ( - FreshdeskAccessDenied, - FreshdeskBadRequest, - FreshdeskError, - FreshdeskNotFound, - FreshdeskRateLimited, - FreshdeskServerError, - FreshdeskUnauthorized, -) -from source_freshdesk.utils import CallCredit, retry_after_handler, retry_connection_handler - - -class API: - def __init__( - self, - domain: str, - api_key: str, - requests_per_minute: int = None, - verify: bool = True, - proxies: MutableMapping[str, Any] = None, - start_date: str = None, - ): - """Basic HTTP interface to read from endpoints""" - self._api_prefix = f"https://{domain.rstrip('/')}/api/v2/" - self._session = requests.Session() - self._session.auth = (api_key, "unused_with_api_key") - self._session.verify = verify - self._session.proxies = proxies - self._session.headers = { - "Content-Type": "application/json", - "User-Agent": "Airbyte", - } - - self._call_credit = CallCredit(balance=requests_per_minute) if requests_per_minute else None - - # By default, only tickets that have been created within the past 30 days will be returned. - # Since this logic rely not on updated tickets, it can break tickets dependant streams - conversations. - # So updated_since parameter will be always used in tickets streams. And start_date will be used too - # with default value 30 days look back. 
- self._start_date = pendulum.parse(start_date) if start_date else pendulum.now() - pendulum.duration(days=30) - - if domain.find("freshdesk.com") < 0: - raise AttributeError("Freshdesk v2 API works only via Freshdesk domains and not via custom CNAMEs") - - @staticmethod - def _parse_and_handle_errors(response): - try: - body = response.json() - except ValueError: - body = {} - - error_message = "Freshdesk Request Failed" - if "errors" in body: - error_message = f"{body.get('description')}: {body['errors']}" - # API docs don't mention this clearly, but in the case of bad credentials the returned JSON will have a - # "message" field at the top level - elif "message" in body: - error_message = f"{body.get('code')}: {body['message']}" - - if response.status_code == 400: - raise FreshdeskBadRequest(error_message or "Wrong input, check your data", response=response) - elif response.status_code == 401: - raise FreshdeskUnauthorized(error_message or "Invalid credentials", response=response) - elif response.status_code == 403: - raise FreshdeskAccessDenied(error_message or "You don't have enough permissions", response=response) - elif response.status_code == 404: - raise FreshdeskNotFound(error_message or "Resource not found", response=response) - elif response.status_code == 429: - retry_after = response.headers.get("Retry-After") - raise FreshdeskRateLimited( - f"429 Rate Limit Exceeded: API rate-limit has been reached until {retry_after} seconds." 
- " See http://freshdesk.com/api#ratelimit", - response=response, - ) - elif 500 <= response.status_code < 600: - raise FreshdeskServerError(f"{response.status_code}: Server Error", response=response) - - # Catch any other errors - try: - response.raise_for_status() - except HTTPError as err: - raise FreshdeskError(f"{err}: {body}", response=response) from err - - return body - - @retry_connection_handler(max_tries=5, factor=5) - @retry_after_handler(max_tries=3) - def get(self, url: str, params: Mapping = None): - """Wrapper around request.get() to use the API prefix. Returns a JSON response.""" - params = params or {} - response = self._session.get(self._api_prefix + url, params=params) - return self._parse_and_handle_errors(response) - - def consume_credit(self, credit): - """Consume call credit, if there is no credit left within current window will sleep til next period""" - if self._call_credit: - self._call_credit.consume(credit) - - -class StreamAPI(ABC): - """Basic stream API that allows to iterate over entities""" - - result_return_limit = 100 # maximum value - call_credit = 1 # see https://developers.freshdesk.com/api/#embedding - - def __init__(self, api: API, *args, **kwargs): - super().__init__(*args, **kwargs) - self._api = api - - def _api_get(self, url: str, params: Mapping = None): - """Wrapper around API GET method to respect call rate limit""" - self._api.consume_credit(self.call_credit) - return self._api.get(url, params=params) - - @abstractmethod - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - - def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: - """Read using getter""" - params = params or {} - - for page in count(start=1): - batch = list( - getter( - params={ - **params, - "per_page": self.result_return_limit, - "page": page, - } - ) - ) - yield from batch - - if len(batch) < self.result_return_limit: - return iter(()) - - -class IncrementalStreamAPI(StreamAPI, 
ABC): - state_pk = "updated_at" # Name of the field associated with the state - state_filter = "updated_since" # Name of filter that corresponds to the state - - @property - def state(self) -> Optional[Mapping[str, Any]]: - """Current state, if wasn't set return None""" - if self._state: - return {self.state_pk: str(self._state).replace("+00:00", "Z")} - return None - - @state.setter - def state(self, value: Mapping[str, Any]): - self._state = pendulum.parse(value[self.state_pk]) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._state: Optional[Mapping[str, Any]] = None - - def _state_params(self) -> Mapping[str, Any]: - """Build query parameters responsible for current state""" - if self._state: - return {self.state_filter: self._state} - return {self.state_filter: self._api._start_date} - - @property - def name(self): - """Name of the stream""" - stream_name = self.__class__.__name__ - if stream_name.endswith("API"): - stream_name = stream_name[:-3] - return stream_name - - def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: - """Read using getter, patched to respect current state""" - params = params or {} - params = {**params, **self._state_params()} - latest_cursor = None - for record in super().read(getter, params): - cursor = pendulum.parse(record[self.state_pk]) - # filter out records older then state - if self._state and self._state >= cursor: - continue - latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor - yield record - - if latest_cursor: - logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}") - self._state = max(latest_cursor, self._state) if self._state else latest_cursor - - -class ClientIncrementalStreamAPI(IncrementalStreamAPI, ABC): - """Incremental stream that don't have native API support, i.e we filter on the client side only""" - - def _state_params(self) -> Mapping[str, Any]: - """Build query parameters responsible 
for current state, override because API doesn't support this""" - return {} - - -class AgentsAPI(ClientIncrementalStreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="agents")) - - -class CompaniesAPI(ClientIncrementalStreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="companies")) - - -class ContactsAPI(IncrementalStreamAPI): - state_filter = "_updated_since" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="contacts")) - - -class GroupsAPI(ClientIncrementalStreamAPI): - """Only users with admin privileges can access the following APIs.""" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="groups")) - - -class RolesAPI(ClientIncrementalStreamAPI): - """Only users with admin privileges can access the following APIs.""" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="roles")) - - -class SkillsAPI(ClientIncrementalStreamAPI): - """Only users with admin privileges can access the following APIs.""" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="skills")) - - -class SurveysAPI(ClientIncrementalStreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="surveys")) - - -class TicketsAPI(IncrementalStreamAPI): - call_credit = 3 # each include consumes 2 additional credits - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - 
includes = ["description", "requester", "stats"] - params = {"include": ",".join(includes)} - yield from self.read(partial(self._api_get, url="tickets"), params=params) - - @staticmethod - def get_tickets( - result_return_limit: int, getter: Callable, params: Mapping[str, Any] = None, ticket_paginate_limit: int = 300 - ) -> Iterator: - """ - Read using getter - - This block extends TicketsAPI Stream to overcome '300 page' server error. - Since the TicketsAPI Stream list has a 300 page pagination limit, after 300 pages, update the parameters with - query using 'updated_since' = last_record, if there is more data remaining. - """ - params = params or {} - - # Start page - page = 1 - # Initial request parameters - params = { - **params, - "order_type": "asc", # ASC order, to get the old records first - "order_by": "updated_at", - "per_page": result_return_limit, - } - - while True: - params["page"] = page - batch = list(getter(params=params)) - yield from batch - - if len(batch) < result_return_limit: - return iter(()) - - # checkpoint & switch the pagination - if page == ticket_paginate_limit: - # get last_record from latest batch, pos. 
-1, because of ACS order of records - last_record_updated_at = batch[-1]["updated_at"] - page = 0 # reset page counter - last_record_updated_at = pendulum.parse(last_record_updated_at) - # updating request parameters with last_record state - params["updated_since"] = last_record_updated_at - # Increment page - page += 1 - - # Override the super().read() method with modified read for tickets - def read(self, getter: Callable, params: Mapping[str, Any] = None) -> Iterator: - """Read using getter, patched to respect current state""" - params = params or {} - params = {**params, **self._state_params()} - latest_cursor = None - for record in self.get_tickets(self.result_return_limit, getter, params): - cursor = pendulum.parse(record[self.state_pk]) - # filter out records older then state - if self._state and self._state >= cursor: - continue - latest_cursor = max(cursor, latest_cursor) if latest_cursor else cursor - yield record - - if latest_cursor: - logger.info(f"Advancing bookmark for {self.name} stream from {self._state} to {latest_cursor}") - self._state = max(latest_cursor, self._state) if self._state else latest_cursor - - -class TimeEntriesAPI(ClientIncrementalStreamAPI): - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from self.read(partial(self._api_get, url="time_entries")) - - -class ConversationsAPI(ClientIncrementalStreamAPI): - """Notes and Replies""" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - tickets = TicketsAPI(self._api) - if self.state: - tickets.state = self.state - for ticket in tickets.list(): - url = f"tickets/{ticket['id']}/conversations" - yield from self.read(partial(self._api_get, url=url)) - - -class SatisfactionRatingsAPI(IncrementalStreamAPI): - """Surveys satisfaction replies""" - - state_filter = "created_since" - - def list(self, fields: Sequence[str] = None) -> Iterator[dict]: - """Iterate over entities""" - yield from 
self.read(partial(self._api_get, url="surveys/satisfaction_ratings")) diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/client.py b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/client.py deleted file mode 100644 index 4c6a305a85b6..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/client.py +++ /dev/null @@ -1,88 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from typing import Any, Iterable, Mapping, Tuple - -from airbyte_cdk.models import AirbyteStream -from airbyte_cdk.sources.deprecated.client import BaseClient - -from .api import ( - API, - AgentsAPI, - CompaniesAPI, - ContactsAPI, - ConversationsAPI, - FreshdeskError, - FreshdeskNotFound, - FreshdeskUnauthorized, - GroupsAPI, - RolesAPI, - SatisfactionRatingsAPI, - SkillsAPI, - SurveysAPI, - TicketsAPI, - TimeEntriesAPI, -) - - -class Client(BaseClient): - def __init__(self, domain, api_key, requests_per_minute: int = None, start_date: str = None): - self._api = API(domain=domain, api_key=api_key, requests_per_minute=requests_per_minute, start_date=start_date) - self._apis = { - "agents": AgentsAPI(self._api), - "companies": CompaniesAPI(self._api), - "contacts": ContactsAPI(self._api), - "conversations": ConversationsAPI(self._api), - "groups": GroupsAPI(self._api), - "roles": RolesAPI(self._api), - "skills": SkillsAPI(self._api), - "surveys": SurveysAPI(self._api), - "tickets": TicketsAPI(self._api), - "time_entries": TimeEntriesAPI(self._api), - "satisfaction_ratings": SatisfactionRatingsAPI(self._api), - } - super().__init__() - - @property - def streams(self) -> Iterable[AirbyteStream]: - """List of available streams""" - for stream in super().streams: - if stream.source_defined_cursor: - stream.default_cursor_field = [self._apis[stream.name].state_pk] - yield stream - - def settings(self): - url = "settings/helpdesk" - return self._api.get(url) - - def stream_has_state(self, name: str) -> 
bool: - """Tell if stream supports incremental sync""" - return hasattr(self._apis[name], "state") - - def get_stream_state(self, name: str) -> Any: - """Get state of stream with corresponding name""" - return self._apis[name].state - - def set_stream_state(self, name: str, state: Any): - """Set state of stream with corresponding name""" - self._apis[name].state = state - - def _enumerate_methods(self) -> Mapping[str, callable]: - return {name: api.list for name, api in self._apis.items()} - - def health_check(self) -> Tuple[bool, str]: - alive = True - error_msg = None - - try: - self.settings() - except (FreshdeskUnauthorized, FreshdeskNotFound): - alive = False - error_msg = "Invalid credentials" - except FreshdeskError as error: - alive = False - error_msg = repr(error) - - return alive, error_msg diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/errors.py b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/errors.py deleted file mode 100644 index 846386294717..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/errors.py +++ /dev/null @@ -1,37 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. -# - - -from requests import HTTPError - - -class FreshdeskError(HTTPError): - """ - Base error class. - Subclassing HTTPError to avoid breaking existing code that expects only HTTPErrors. 
- """ - - -class FreshdeskBadRequest(FreshdeskError): - """Most 40X and 501 status codes""" - - -class FreshdeskUnauthorized(FreshdeskError): - """401 Unauthorized""" - - -class FreshdeskAccessDenied(FreshdeskError): - """403 Forbidden""" - - -class FreshdeskNotFound(FreshdeskError): - """404""" - - -class FreshdeskRateLimited(FreshdeskError): - """429 Rate Limit Reached""" - - -class FreshdeskServerError(FreshdeskError): - """50X errors""" diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/groups.json b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/groups.json index 785511166015..e20a8fb85be0 100644 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/groups.json +++ b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/groups.json @@ -30,7 +30,7 @@ "type": "string" }, "auto_ticket_assign": { - "type": ["string", "null"] + "type": ["integer", "null"] } } } diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/tickets.json b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/tickets.json index c23283526765..745a159ea809 100644 --- a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/tickets.json +++ b/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/schemas/tickets.json @@ -3,16 +3,16 @@ "type": "object", "properties": { "cc_emails": { - "type": "array" + "type": ["array", "null"] }, "fwd_emails": { - "type": "array" + "type": ["array", "null"] }, "reply_cc_emails": { - "type": "array" + "type": ["array", "null"] }, "ticket_cc_emails": { - "type": "array" + "type": ["array", "null"] }, "fr_escalated": { "type": "boolean" @@ -33,7 +33,7 @@ "type": "integer" }, "responder_id": { - "type": "integer" + "type": ["integer", "null"] }, "source": { "type": "integer" diff --git a/airbyte-integrations/connectors/source-freshdesk/source_freshdesk/source.py 
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import logging
from typing import Any, List, Mapping, Optional, Tuple
from urllib.parse import urljoin

import requests
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from requests.auth import HTTPBasicAuth
from source_freshdesk.streams import (
    Agents,
    Companies,
    Contacts,
    Conversations,
    Groups,
    Roles,
    SatisfactionRatings,
    Skills,
    Surveys,
    Tickets,
    TimeEntries,
)


class FreshdeskAuth(HTTPBasicAuth):
    """Basic auth that sends the Freshdesk API key as the username.

    Freshdesk expects the user to provide an api_key. Any string can be used as password:
    https://developers.freshdesk.com/api/#authentication
    """

    def __init__(self, api_key: str) -> None:
        super().__init__(username=api_key, password="unused_with_api_key")


class SourceFreshdesk(AbstractSource):
    """Freshdesk source implemented on top of the Airbyte CDK AbstractSource."""

    def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]:
        """Probe the helpdesk settings endpoint to validate `domain` and `api_key`.

        Returns (True, None) on success, otherwise (False, <human-readable error>).
        Never raises: any failure is converted into the error message.
        """
        alive = True
        error_msg = None
        try:
            url = urljoin(f"https://{config['domain'].rstrip('/')}", "/api/v2/settings/helpdesk")
            response = requests.get(url=url, auth=FreshdeskAuth(config["api_key"]))
            response.raise_for_status()
        except requests.HTTPError as error:
            alive = False
            # Freshdesk normally returns a JSON body with "code"/"message", but guard
            # against non-JSON error pages (proxies, HTML error screens) so the
            # connection check itself cannot crash.
            try:
                body = error.response.json()
                error_msg = f"{body.get('code')}: {body.get('message')}"
            except ValueError:
                error_msg = repr(error)
        except Exception as error:
            # Missing config keys, DNS failures, etc. all land here.
            alive = False
            error_msg = repr(error)

        return alive, error_msg

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Instantiate every supported stream with a shared authenticator/config."""
        authenticator = FreshdeskAuth(config["api_key"])
        stream_kwargs = {"authenticator": authenticator, "config": config}
        return [
            Agents(**stream_kwargs),
            Companies(**stream_kwargs),
            Contacts(**stream_kwargs),
            Conversations(**stream_kwargs),
            Groups(**stream_kwargs),
            Roles(**stream_kwargs),
            Skills(**stream_kwargs),
            Surveys(**stream_kwargs),
            TimeEntries(**stream_kwargs),
            Tickets(**stream_kwargs),
            SatisfactionRatings(**stream_kwargs),
        ]
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#


import re
from abc import ABC
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional
from urllib import parse

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.core import IncrementalMixin
from airbyte_cdk.sources.streams.http import HttpStream
from requests.auth import AuthBase
from source_freshdesk.utils import CallCredit


class FreshdeskStream(HttpStream, ABC):
    """Basic stream API that allows to iterate over entities"""

    call_credit = 1  # see https://developers.freshdesk.com/api/#embedding
    result_return_limit = 100
    primary_key = "id"
    link_regex = re.compile(r'<(.*?)>;\s*rel="next"')

    def __init__(self, authenticator: AuthBase, config: Mapping[str, Any], *args, **kwargs):
        super().__init__(authenticator=authenticator)
        requests_per_minute = config.get("requests_per_minute")
        self.domain = config["domain"]
        self._call_credit = CallCredit(balance=requests_per_minute) if requests_per_minute else None
        # By default, only tickets that have been created within the past 30 days will be returned.
        # Since this logic relies on created (not updated) tickets, it can break ticket-dependent
        # streams such as conversations. So the updated_since parameter is always used in ticket
        # streams, and start_date is used too, with a default 30-day look back.
        self.start_date = (
            pendulum.parse(config.get("start_date")) if config.get("start_date") else pendulum.now() - pendulum.duration(days=30)
        )

    @property
    def url_base(self) -> str:
        return parse.urljoin(f"https://{self.domain.rstrip('/')}", "/api/v2")

    def backoff_time(self, response: requests.Response) -> Optional[float]:
        """Honor the Retry-After header Freshdesk sends on 429 responses."""
        if response.status_code == requests.codes.too_many_requests:
            return float(response.headers.get("Retry-After", 0))

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Extract pagination params from the RFC 5988 `Link` response header."""
        link_header = response.headers.get("Link")
        if not link_header:
            return {}
        match = self.link_regex.search(link_header)
        if match is None:
            # A Link header without a rel="next" part means this is the last page;
            # calling .group(1) unconditionally would raise AttributeError here.
            return {}
        next_url = match.group(1)
        params = parse.parse_qs(parse.urlparse(next_url).query)
        return self.parse_link_params(link_query_params=params)

    def parse_link_params(self, link_query_params: Mapping[str, List[str]]) -> Mapping[str, Any]:
        return {"per_page": link_query_params["per_page"][0], "page": link_query_params["page"][0]}

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        params = {"per_page": self.result_return_limit}
        if next_page_token and "page" in next_page_token:
            params["page"] = next_page_token["page"]
        return params

    def _consume_credit(self, credit):
        """Consume call credit, if there is no credit left within current window will sleep til next period"""
        if self._call_credit:
            self._call_credit.consume(credit)

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        # Throttle before issuing the request so we respect requests_per_minute.
        self._consume_credit(self.call_credit)
        yield from super().read_records(
            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
        )

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        data = response.json()
        return data if data else []


class IncrementalFreshdeskStream(FreshdeskStream, IncrementalMixin):
    """Base for streams that support server-side incremental filtering."""

    cursor_filter = "updated_since"  # Name of the request filter that corresponds to the state
    state_checkpoint_interval = 100

    def __init__(self, authenticator: AuthBase, config: Mapping[str, Any], *args, **kwargs):
        super().__init__(authenticator=authenticator, config=config, *args, **kwargs)
        self._cursor_value = ""

    @property
    def cursor_field(self) -> str:
        return "updated_at"

    @property
    def state(self) -> MutableMapping[str, Any]:
        return {self.cursor_field: self._cursor_value} if self._cursor_value else {}

    @state.setter
    def state(self, value: MutableMapping[str, Any]):
        # Keep the cursor a string: records carry string timestamps and read_records
        # compares them with max(). Falling back to the datetime start_date here
        # (as the old code did) makes that str-vs-datetime comparison raise TypeError.
        self._cursor_value = value.get(self.cursor_field, "")

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
        # Fall back to start_date so the first sync still bounds the window.
        params[self.cursor_filter] = stream_state.get(self.cursor_field, self.start_date)
        return params

    def read_records(
        self,
        sync_mode: SyncMode,
        cursor_field: List[str] = None,
        stream_slice: Mapping[str, Any] = None,
        stream_state: Mapping[str, Any] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in super().read_records(
            sync_mode=sync_mode, cursor_field=cursor_field, stream_slice=stream_slice, stream_state=stream_state
        ):
            yield record
            # Advance the cursor as we go; ISO-8601 strings compare correctly lexicographically.
            self._cursor_value = max(record[self.cursor_field], self._cursor_value)


class Agents(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "agents"


class Companies(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "companies"


class Contacts(IncrementalFreshdeskStream):
    # Contacts uses a differently named filter for the same semantics.
    cursor_filter = "_updated_since"

    def path(self, **kwargs) -> str:
        return "contacts"
class Groups(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "groups"


class Roles(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "roles"


class Skills(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "skills"


class TimeEntries(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "time_entries"


class Surveys(FreshdeskStream):
    def path(self, **kwargs) -> str:
        return "surveys"


class Tickets(IncrementalFreshdeskStream):
    ticket_paginate_limit = 300
    call_credit = 3  # each include consumes 2 additional credits
    use_cache = True

    def path(self, **kwargs) -> str:
        return "tickets"

    def request_params(
        self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
    ) -> MutableMapping[str, Any]:
        params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=next_page_token)
        includes = ["description", "requester", "stats"]
        params.update(
            {"order_type": "asc", "order_by": self.cursor_field, "include": ",".join(includes)}  # ASC order, to get the old records first
        )
        if next_page_token and self.cursor_filter in next_page_token:
            # Pagination switched to cursor mode past the 300-page limit (see next_page_token).
            params[self.cursor_filter] = next_page_token[self.cursor_filter]
        return params

    def parse_link_params(self, link_query_params: Mapping[str, List[str]]) -> Mapping[str, Any]:
        params = super().parse_link_params(link_query_params)
        if self.cursor_filter in link_query_params:
            params[self.cursor_filter] = link_query_params[self.cursor_filter][0]
        return params

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """
        This block extends Incremental stream to overcome '300 page' server error.
        Since the Ticket endpoint has a 300 page pagination limit, after 300 pages, update the parameters with
        query using 'updated_since' = last_record, if there is more data remaining.
        """
        next_page_token = super().next_page_token(response=response)

        if next_page_token and int(next_page_token["page"]) > self.ticket_paginate_limit:
            # get last_record from latest batch, pos. -1, because of ASC order of records
            last_record_updated_at = response.json()[-1]["updated_at"]
            last_record_updated_at = pendulum.parse(last_record_updated_at)
            # updating request parameters with last_record state
            next_page_token[self.cursor_filter] = last_record_updated_at
            next_page_token.pop("page")

        return next_page_token


class Conversations(FreshdeskStream):
    """Notes and Replies"""

    def __init__(self, authenticator: AuthBase, config: Mapping[str, Any], *args, **kwargs):
        # Bug fix: forward *args/**kwargs by unpacking. The previous code passed
        # them as literal keyword arguments named "args" and "kwargs", which were
        # silently swallowed by the parent's **kwargs instead of being applied.
        super().__init__(authenticator=authenticator, config=config, *args, **kwargs)
        # Parent stream used to enumerate ticket ids for the per-ticket endpoint.
        self.tickets_stream = Tickets(authenticator=authenticator, config=config)

    def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str:
        return f"tickets/{stream_slice['id']}/conversations"

    def stream_slices(
        self, *, sync_mode: SyncMode, cursor_field: List[str] = None, stream_state: Mapping[str, Any] = None
    ) -> Iterable[Optional[Mapping[str, Any]]]:
        # One slice per ticket; always a full refresh of the parent tickets.
        for ticket in self.tickets_stream.read_records(
            sync_mode=SyncMode.full_refresh, cursor_field=cursor_field, stream_slice={}, stream_state={}
        ):
            yield {"id": ticket["id"]}


class SatisfactionRatings(IncrementalFreshdeskStream):
    # Ratings are immutable, so the API filters by creation time instead.
    cursor_filter = "created_since"

    def path(self, **kwargs) -> str:
        return "surveys/satisfaction_ratings"
# - -import sys import time -import backoff -import requests from airbyte_cdk.entrypoint import logger -from source_freshdesk.errors import FreshdeskRateLimited - - -def retry_connection_handler(**kwargs): - """Retry helper, log each attempt""" - - def log_retry_attempt(details): - _, exc, _ = sys.exc_info() - logger.info(str(exc)) - logger.info(f"Caught retryable error after {details['tries']} tries. Waiting {details['wait']} more seconds then retrying...") - - def giveup_handler(exc): - return exc.response is not None and 400 <= exc.response.status_code < 500 - - return backoff.on_exception( - backoff.expo, - requests.exceptions.RequestException, - jitter=None, - on_backoff=log_retry_attempt, - giveup=giveup_handler, - **kwargs, - ) - - -def retry_after_handler(**kwargs): - """Retry helper when we hit the call limit, sleeps for specific duration""" - - def sleep_on_ratelimit(_details): - _, exc, _ = sys.exc_info() - if isinstance(exc, FreshdeskRateLimited): - retry_after = int(exc.response.headers["Retry-After"]) - logger.info(f"Rate limit reached. Sleeping for {retry_after} seconds") - time.sleep(retry_after + 1) # extra second to cover any fractions of second - - def log_giveup(_details): - logger.error("Max retry limit reached") - - return backoff.on_exception( - backoff.constant, - FreshdeskRateLimited, - jitter=None, - on_backoff=sleep_on_ratelimit, - on_giveup=log_giveup, - interval=0, # skip waiting part, we will wait in on_backoff handler - **kwargs, - ) class CallCredit: diff --git a/airbyte-integrations/connectors/source-freshdesk/unit_tests/conftest.py b/airbyte-integrations/connectors/source-freshdesk/unit_tests/conftest.py new file mode 100644 index 000000000000..410faa8833b0 --- /dev/null +++ b/airbyte-integrations/connectors/source-freshdesk/unit_tests/conftest.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import pytest
from requests.auth import HTTPBasicAuth


@pytest.fixture(name="config")
def config_fixture():
    """Minimal connector configuration shared by the unit tests."""
    return {
        "domain": "test.freshdesk.com",
        "api_key": "secret_api_key",
        "requests_per_minute": 50,
        "start_date": "2002-02-10T22:21:44Z",
    }


@pytest.fixture(name="authenticator")
def authenticator_fixture(config):
    """Basic-auth object mirroring FreshdeskAuth: api_key as username, fixed password."""
    return HTTPBasicAuth(username=config["api_key"], password="unused_with_api_key")
"2019-03-03T00:00:00Z"}, # duplicate +@pytest.fixture(name="responses") +def responses_fixtures(): + return [ + { + "url": "/api/tickets?per_page=1&updated_since=2002-02-10T22%3A21%3A44%2B00%3A00", + "json": [{"id": 1, "updated_at": "2018-01-02T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&page=2&updated_since=2002-02-10T22%3A21%3A44%2B00%3A00", + "json": [{"id": 2, "updated_at": "2018-02-02T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&updated_since=2018-02-02T00%3A00%3A00%2B00%3A00", + "json": [{"id": 2, "updated_at": "2018-02-02T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&page=2&updated_since=2018-02-02T00%3A00%3A00%2B00%3A00", + "json": [{"id": 3, "updated_at": "2018-03-02T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&updated_since=2018-03-02T00%3A00%3A00%2B00%3A00", + "json": [{"id": 3, "updated_at": "2018-03-02T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&page=2&updated_since=2018-03-02T00%3A00%3A00%2B00%3A00", + "json": [{"id": 4, "updated_at": "2019-01-03T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&updated_since=2019-01-03T00%3A00%3A00%2B00%3A00", + "json": [{"id": 4, "updated_at": "2019-01-03T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&page=2&updated_since=2019-01-03T00%3A00%3A00%2B00%3A00", + "json": [{"id": 5, "updated_at": "2019-02-03T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": "/api/tickets?per_page=1&updated_since=2019-02-03T00%3A00%3A00%2B00%3A00", + "json": [{"id": 5, "updated_at": "2019-02-03T00:00:00Z"}], + "headers": { + "Link": '; rel="next"' + }, + }, + { + "url": 
"/api/tickets?per_page=1&page=2&updated_since=2019-02-03T00%3A00%3A00%2B00%3A00", + "json": [{"id": 6, "updated_at": "2019-03-03T00:00:00Z"}], + }, ] - # Mocking the getter: Callable to produce the server output - def _getter(self, params, **args): - - tickets_stream = self.tickets_input - updated_since = params.get("updated_since", None) - - if updated_since: - tickets_stream = filter(lambda ticket: pendulum.parse(ticket["updated_at"]) >= updated_since, self.tickets_input) - - start_from = (params["page"] - 1) * params["per_page"] - output = list(tickets_stream)[start_from : start_from + params["per_page"]] - return output - - def test_not_all_records(self): +class Test300PageLimit: + def test_not_all_records(self, requests_mock, authenticator, config, responses): """ TEST 1 - not all records are retrieved @@ -64,16 +100,40 @@ def test_not_all_records(self): Main pricipal here is: airbyte is at-least-once delivery, but skipping records is data loss. """ + expected_output = [ + {"id": 1, "updated_at": "2018-01-02T00:00:00Z"}, + {"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, + {"id": 2, "updated_at": "2018-02-02T00:00:00Z"}, # duplicate + {"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, + {"id": 3, "updated_at": "2018-03-02T00:00:00Z"}, # duplicate + {"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, + {"id": 4, "updated_at": "2019-01-03T00:00:00Z"}, # duplicate + {"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, + {"id": 5, "updated_at": "2019-02-03T00:00:00Z"}, # duplicate + {"id": 6, "updated_at": "2019-03-03T00:00:00Z"}, + ] + # INT value of page number where the switch state should be triggered. # in this test case values from: 1 - 4, assuming we want to switch state on this page. 
ticket_paginate_limit = 2 # This parameter mocks the "per_page" parameter in the API Call result_return_limit = 1 - # Calling the TicketsAPI.get_tickets method directly from the module - test1 = list( - TicketsAPI.get_tickets( - result_return_limit=result_return_limit, getter=self._getter, ticket_paginate_limit=ticket_paginate_limit + + # Create test_stream instance. + test_stream = Tickets(authenticator=authenticator, config=config) + test_stream.ticket_paginate_limit = ticket_paginate_limit + test_stream.result_return_limit = result_return_limit + + # Mocking Request + for response in responses: + requests_mock.register_uri( + "GET", + response["url"], + json=response["json"], + headers=response.get("headers", {}), ) - ) + + records = list(test_stream.read_records(sync_mode=SyncMode.full_refresh)) + # We're expecting 6 records to return from the tickets_stream - assert self.expected_output == test1 + assert records == expected_output diff --git a/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_client.py b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_client.py deleted file mode 100644 index fab3cb17b700..000000000000 --- a/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_client.py +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
-# - - -from pathlib import Path - -from pytest import fixture -from source_freshdesk.client import Client - -HERE = Path(__file__).parent.absolute() - - -@fixture(autouse=True) -def time_sleep_mock(mocker): - time_mock = mocker.patch("time.sleep", lambda x: None) - yield time_mock - - -def test_client_backoff_on_limit_reached(requests_mock): - """Error once, check that we retry and not fail""" - responses = [ - {"json": {"error": "limit reached"}, "status_code": 429, "headers": {"Retry-After": "0"}}, - {"json": {"status": "ok"}, "status_code": 200}, - ] - requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", responses) - client = Client(domain="someaccount.freshdesk.com", api_key="somekey") - - result = client.settings() - - assert result == {"status": "ok"} - - -def test_client_backoff_on_server_error(requests_mock): - """Error once, check that we retry and not fail""" - responses = [ - {"json": {"error": "something bad"}, "status_code": 500}, - {"json": {"status": "ok"}, "status_code": 200}, - ] - requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", responses) - client = Client(domain="someaccount.freshdesk.com", api_key="somekey") - - result = client.settings() - - assert result == {"status": "ok"} diff --git a/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_source.py b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_source.py new file mode 100644 index 000000000000..3a9668f4ca63 --- /dev/null +++ b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_source.py @@ -0,0 +1,57 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import logging + +from source_freshdesk import SourceFreshdesk + +logger = logging.getLogger("test_source") + + +def test_check_connection_ok(requests_mock, config): + json_resp = {"primary_language": "en", "supported_languages": [], "portal_languages": []} + + requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", json=json_resp) + ok, error_msg = SourceFreshdesk().check_connection(logger, config=config) + + assert ok and not error_msg + + +def test_check_connection_invalid_api_key(requests_mock, config): + responses = [ + {"json": {"code": "invalid_credentials", "message": "You have to be logged in to perform this action."}, "status_code": 401} + ] + + requests_mock.register_uri("GET", "/api/v2/settings/helpdesk", responses) + ok, error_msg = SourceFreshdesk().check_connection(logger, config=config) + + assert not ok and error_msg == "invalid_credentials: You have to be logged in to perform this action." + + +def test_check_connection_empty_config(config): + config = {} + + ok, error_msg = SourceFreshdesk().check_connection(logger, config=config) + + assert not ok and error_msg + + +def test_check_connection_invalid_config(config): + config.pop("api_key") + + ok, error_msg = SourceFreshdesk().check_connection(logger, config=config) + + assert not ok and error_msg + + +def test_check_connection_exception(config): + ok, error_msg = SourceFreshdesk().check_connection(logger, config=config) + + assert not ok and error_msg + + +def test_streams(config): + streams = SourceFreshdesk().streams(config) + + assert len(streams) == 11 diff --git a/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_streams.py new file mode 100644 index 000000000000..e74569512e5a --- /dev/null +++ b/airbyte-integrations/connectors/source-freshdesk/unit_tests/test_streams.py @@ -0,0 +1,99 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import random
from typing import Any, MutableMapping

import pytest
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams import Stream
from source_freshdesk.streams import (
    Agents,
    Companies,
    Contacts,
    Conversations,
    Groups,
    Roles,
    SatisfactionRatings,
    Skills,
    Tickets,
    TimeEntries,
)


def _read_full_refresh(stream_instance: Stream):
    """Drain a stream in full-refresh mode across all of its slices."""
    records = []
    # Renamed loop variable: "slice" shadowed the builtin of the same name.
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.full_refresh):
        records.extend(list(stream_instance.read_records(stream_slice=stream_slice, sync_mode=SyncMode.full_refresh)))
    return records


def _read_incremental(stream_instance: Stream, stream_state: MutableMapping[str, Any]):
    """Drain a stream in incremental mode; return (records, final stream state)."""
    res = []
    for stream_slice in stream_instance.stream_slices(sync_mode=SyncMode.incremental, stream_state=stream_state):
        records = stream_instance.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state)
        res.extend(records)
    return res, stream_instance.state


@pytest.mark.parametrize(
    "stream, resource",
    [
        (Agents, "agents"),
        (Companies, "companies"),
        (Contacts, "contacts"),
        (Groups, "groups"),
        (Roles, "roles"),
        (Skills, "skills"),
        (TimeEntries, "time_entries"),
        (SatisfactionRatings, "surveys/satisfaction_ratings"),
    ],
)
def test_full_refresh(stream, resource, authenticator, config, requests_mock):
    requests_mock.register_uri("GET", f"/api/{resource}", json=[{"id": x, "updated_at": "2022-05-05T00:00:00Z"} for x in range(25)])

    stream = stream(authenticator=authenticator, config=config)
    records = _read_full_refresh(stream)

    assert len(records) == 25


def test_full_refresh_conversations(authenticator, config, requests_mock):
    # Conversations fans out: one request per parent ticket.
    requests_mock.register_uri("GET", "/api/tickets", json=[{"id": x, "updated_at": "2022-05-05T00:00:00Z"} for x in range(5)])
    for i in range(5):
        requests_mock.register_uri("GET", f"/api/tickets/{i}/conversations", json=[{"id": x} for x in range(10)])

    stream = Conversations(authenticator=authenticator, config=config)
    records = _read_full_refresh(stream)

    assert len(records) == 50


@pytest.mark.parametrize(
    "stream, resource",
    [
        (Contacts, "contacts"),
        (Tickets, "tickets"),
        (SatisfactionRatings, "surveys/satisfaction_ratings"),
    ],
)
def test_incremental(stream, resource, authenticator, config, requests_mock):
    # One record carries the newest timestamp; state must land exactly on it.
    highest_updated_at = "2022-04-25T22:00:00Z"
    other_updated_at = "2022-04-01T00:00:00Z"
    highest_index = random.randint(0, 25)
    requests_mock.register_uri(
        "GET",
        f"/api/{resource}",
        json=[{"id": x, "updated_at": highest_updated_at if x == highest_index else other_updated_at} for x in range(25)],
    )

    stream = stream(authenticator=authenticator, config=config)
    records, state = _read_incremental(stream, {})

    assert len(records) == 25
    assert "updated_at" in state
    assert state["updated_at"] == highest_updated_at
0.2.8 | 2021-10-28 | [7486](https://github.com/airbytehq/airbyte/pull/7486) | Include "requester" and "stats" fields in "tickets" stream | | 0.2.7 | 2021-10-13 | [6442](https://github.com/airbytehq/airbyte/pull/6442) | Add start_date parameter to specification from which to start pulling data. | -