diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/c2281cee-86f9-4a86-bb48-d23286b4c7bd.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/c2281cee-86f9-4a86-bb48-d23286b4c7bd.json index d535b0f62aa6..83b1d7823486 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/c2281cee-86f9-4a86-bb48-d23286b4c7bd.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/c2281cee-86f9-4a86-bb48-d23286b4c7bd.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "c2281cee-86f9-4a86-bb48-d23286b4c7bd", "name": "Slack", "dockerRepository": "airbyte/source-slack", - "dockerImageTag": "0.1.8", + "dockerImageTag": "0.1.9", "documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-slack", "icon": "slack.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index e956bc1e4f90..f5dd0e093f31 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -346,7 +346,7 @@ - sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd name: Slack dockerRepository: airbyte/source-slack - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-slack icon: slack.svg - sourceDefinitionId: 6ff047c0-f5d5-4ce5-8c81-204a830fa7e1 diff --git a/airbyte-integrations/connectors/source-slack/Dockerfile b/airbyte-integrations/connectors/source-slack/Dockerfile index 745fecf685db..46cd52cef9d5 100644 --- a/airbyte-integrations/connectors/source-slack/Dockerfile +++ b/airbyte-integrations/connectors/source-slack/Dockerfile @@ -16,5 +16,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/source-slack diff --git a/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml b/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml index 54381c1aac0d..2582853ac118 100644 --- a/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-slack/acceptance-test-config.yml @@ -12,13 +12,13 @@ tests: basic_read: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/full_refresh_catalog.json" - validate_output_from_all_streams: yes incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog.json" timeout_seconds: 3600 cursor_paths: channel_messages: ["float_ts"] + threads: ["float_ts"] full_refresh: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/full_refresh_catalog.json" diff --git a/airbyte-integrations/connectors/source-slack/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-slack/integration_tests/configured_catalog.json index c03b16c51c09..13072b83bdf3 100644 --- a/airbyte-integrations/connectors/source-slack/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-slack/integration_tests/configured_catalog.json @@ -1,5 +1,4 @@ { - "comment": "deleted threads from configured catalog due to the timeout error in incremental stream", "streams": [ { "stream": { @@ -223,6 +222,80 @@ "cursor_field": ["float_ts"], "sync_mode": "incremental", "destination_sync_mode": "append" + }, + { + "stream": { + "name": "threads", + "json_schema": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "channel_id": { + "type": ["null", "string"] + }, + "client_msg_id": { + "type": ["null", "string"] + }, + "type": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "user": { + "type": ["null", "string"] + }, + "ts": { + "type": ["null", "string"] + }, + "float_ts": { + "type": ["null", "number"] + }, + "team": { + "type": ["null", "string"] + }, + "blocks": { + "type": ["null", "array"], + "items": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "type": { + "type": ["null", "string"] + } + } + } + }, + "thread_ts": { + "type": ["null", "string"] + }, + "reply_count": { + "type": ["null", "integer"] + }, + "reply_users_count": { + "type": ["null", "number"] + }, + "latest_reply": { + "type": ["null", "string"] + }, + "reply_users": { + "type": ["null", "array"], + "items": { + "type": "string" + } + }, + "subscribed": { + "type": ["null", "boolean"] + } + } + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["float_ts"] + }, + "cursor_field": ["float_ts"], + "sync_mode": "incremental", + "destination_sync_mode": "append" } ] } diff --git a/airbyte-integrations/connectors/source-slack/integration_tests/integration_test.py b/airbyte-integrations/connectors/source-slack/integration_tests/integration_test.py new file mode 100644 index 000000000000..baa10de548f9 --- /dev/null +++ b/airbyte-integrations/connectors/source-slack/integration_tests/integration_test.py @@ -0,0 +1,28 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + + +def test_dummy_test(): + """This test added for successful passing customIntegrationTests""" + pass diff --git a/airbyte-integrations/connectors/source-slack/requirements.txt b/airbyte-integrations/connectors/source-slack/requirements.txt new file mode 100644 index 000000000000..0411042aa091 --- /dev/null +++ b/airbyte-integrations/connectors/source-slack/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-slack/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-slack/sample_files/configured_catalog.json index 7cc0909e504f..13072b83bdf3 100644 --- a/airbyte-integrations/connectors/source-slack/sample_files/configured_catalog.json +++ b/airbyte-integrations/connectors/source-slack/sample_files/configured_catalog.json @@ -1,6 +1,228 @@ { - "comment": "deleted threads from configured catalog due to the timeout error in incremental stream", "streams": [ + { + "stream": { + "name": "channel_messages", + "json_schema": { + "additionalProperties": false, + "properties": { + "channel_id": { + "type": ["null", "string"] + }, + "blocks": { + "items": { + "additionalProperties": true, + "properties": { + "type": { + "type": ["null", "string"] + } + }, + "type": ["null", "object"] + }, + "type": ["null", "array"] + }, + "bot_id": { + "type": ["null", "string"] + }, + "bot_profile": { + "additionalProperties": false, + "properties": { + "app_id": { + "type": ["null", "string"] + }, + "deleted": { + "type": ["null", "boolean"] + }, + "id": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "team_id": { + "type": ["null", "string"] + }, + "updated": { + "type": ["null", "string"], + "format": "date-time" + } + }, + "type": ["null", "object"] + }, + "attachments": { + "items": { + "properties": { + "title": { + "type": ["null", "string"] + }, + "id": { + "type": ["null", "integer"] + }, + "color": { + "type": ["null", "string"] + }, + "fallback": { + "type": ["null", "string"] + } + }, + "type": ["null", "object"] + }, + "type": ["null", "array"] + }, + "client_msg_id": { + "type": ["null", "string"] + }, + "display_as_bot": { + "type": ["null", "boolean"] + }, + "file_id": { + "type": ["null", "string"] + }, + "file_ids": { + "items": { + "type": ["null", "string"] + }, + "type": ["null", "array"] + }, + "icons": { + "additionalProperties": false, + "properties": { + "emoji": { + "type": ["null", "string"] + } + }, + "type": ["null", "object"] + }, + "inviter": { + "type": ["null", "string"] + }, + "is_delayed_message": { + "type": ["null", "boolean"] + }, + "is_intro": { + "type": ["null", "boolean"] + }, + "is_starred": { + "type": ["null", "boolean"] + }, + "last_read": { + "type": ["null", "string"], + "format": "date-time" + }, + "latest_reply": { + "type": ["null", "string"] + }, + "name": { + "type": ["null", "string"] + }, + "old_name": { + "type": ["null", "string"] + }, + "parent_user_id": { + "type": ["null", "string"] + }, + "permalink": { + "format": "uri", + "type": ["null", "string"] + }, + "pinned_to": { + "items": { + "type": ["null", "string"] + }, + "type": ["null", "array"] + }, + "purpose": { + "type": ["null", "string"] + }, + "reactions": { + "items": { + "additionalProperties": true, + "properties": { + "count": { + "type": ["null", "integer"] + }, + "name": { + "type": ["null", "string"] + }, + "users": { + "items": { + "type": ["null", "string"] + }, + "type": ["null", "array"] + } + }, + "type": ["null", "object"] + }, + "type": ["null", "array"] + }, + "reply_count": { + "type": ["null", "integer"] + }, + "reply_users": { + "items": { + "type": ["null", "string"] + }, + "type": ["null", "array"] + }, + "reply_users_count": { + "type": ["null", "integer"] + }, + "source_team": { + "type": ["null", "string"] + }, + "subscribed": { + "type": ["null", "boolean"] + }, + "subtype": { + "type": ["null", "string"] + }, + "team": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "thread_ts": { + "type": ["null", "string"] + }, + "topic": { + "type": ["null", "string"] + }, + "ts": { + "type": ["null", "string"] + }, + "float_ts": { + "type": ["null", "number"] + }, + "type": { + "type": ["null", "string"] + }, + "unread_count": { + "type": ["null", "integer"] + }, + "upload": { + "type": ["null", "boolean"] + }, + "user": { + "type": ["null", "string"] + }, + "user_team": { + "type": ["null", "string"] + }, + "username": { + "type": ["null", "string"] + } + }, + "type": ["null", "object"] + }, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["float_ts"] + }, + "cursor_field": ["float_ts"], + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, { "stream": { "name": "threads", diff --git a/airbyte-integrations/connectors/source-slack/setup.py b/airbyte-integrations/connectors/source-slack/setup.py index ae53beadb99c..f4c9831e8c42 100644 --- a/airbyte-integrations/connectors/source-slack/setup.py +++ b/airbyte-integrations/connectors/source-slack/setup.py @@ -31,6 +31,6 @@ author="Airbyte", author_email="contact@airbyte.io", packages=find_packages(), - install_requires=["airbyte-cdk==0.1.2", "pytest==6.1.2", "slack_sdk==3.4.2", "pendulum>=2,<3"], + install_requires=["airbyte-cdk", "pytest==6.1.2", "slack_sdk==3.4.2", "pendulum>=2,<3"], package_data={"": ["*.json"]}, ) diff --git a/airbyte-integrations/connectors/source-slack/source_slack/source.py b/airbyte-integrations/connectors/source-slack/source_slack/source.py index 7907d718e907..58a31846f80f 100644 --- a/airbyte-integrations/connectors/source-slack/source_slack/source.py +++ b/airbyte-integrations/connectors/source-slack/source_slack/source.py @@ -22,7 +22,7 @@ # SOFTWARE. # - +import copy from abc import ABC, abstractmethod from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple @@ -44,8 +44,10 @@ class SlackStream(HttpStream, ABC): page_size = 100 def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: - # Slack uses a cursor-based pagination strategy. - # Extract the cursor from the response if it exists and return it in a format that can be used to update request parameters + """Slack uses a cursor-based pagination strategy. + Extract the cursor from the response if it exists and return it in a format + that can be used to update request parameters""" + json_response = response.json() next_cursor = json_response.get("response_metadata", {}).get("next_cursor") if next_cursor: @@ -73,11 +75,14 @@ def parse_response( yield from json_response.get(self.data_field, []) def backoff_time(self, response: requests.Response) -> Optional[float]: - # This method is called if we run into the rate limit. Slack puts the retry time in the `Retry-After` response header so we - # we return that value. If the response is anything other than a 429 (e.g: 5XX) fall back on default retry behavior. - # https://api.slack.com/docs/rate-limits#web - if response.status_code == 429: - return int(response.headers.get("Retry-After", 0)) + """This method is called if we run into the rate limit. + Slack puts the retry time in the `Retry-After` response header so we + we return that value. If the response is anything other than a 429 (e.g: 5XX) + fall back on default retry behavior. + + Rate Limits Docs: https://api.slack.com/docs/rate-limits#web""" + + return int(response.headers.get("Retry-After", 0)) @property @abstractmethod @@ -132,6 +137,7 @@ def chunk_date_range(start_date: DateTime, interval=pendulum.duration(days=1)) - Yields a list of the beginning and ending timestamps of each day between the start date and now. The return value is a pendulum.period """ + now = pendulum.now() # Each stream_slice contains the beginning and ending timestamp for a 24 hour period while start_date <= now: @@ -147,8 +153,17 @@ class IncrementalMessageStream(SlackStream, ABC): def __init__(self, default_start_date: DateTime, **kwargs): self._start_ts = default_start_date.timestamp() + self.set_sub_primary_key() super().__init__(**kwargs) + def set_sub_primary_key(self): + if isinstance(self.primary_key, list): + for index, value in enumerate(self.primary_key): + setattr(self, f"sub_primary_key_{index + 1}", value) + else: + logger = AirbyteLogger() + logger.error("Failed during setting sub primary keys. Primary key should be list.") + def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(stream_state=stream_state, stream_slice=stream_slice, **kwargs) params.update(**stream_slice) @@ -156,8 +171,8 @@ def request_params(self, stream_state: Mapping[str, Any], stream_slice: Mapping[ def parse_response(self, response: requests.Response, stream_slice: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping]: for record in super().parse_response(response, **kwargs): - record[self.primary_key[0]] = stream_slice.get("channel", "") - record[self.cursor_field] = float(record[self.primary_key[1]]) + record[self.sub_primary_key_1] = stream_slice.get("channel", "") + record[self.cursor_field] = float(record[self.sub_primary_key_2]) yield record def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: @@ -219,21 +234,38 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite stream_state = stream_state or {} channels_stream = Channels(authenticator=self.authenticator) + if self.cursor_field in stream_state: # Since new messages can be posted to threads continuously after the parent message has been posted, we get messages from the latest date - # found in the state minus 7 days to pick up any new messages in threads. + # found in the state minus X days to pick up any new messages in threads. # If there is state always use lookback messages_start_date = pendulum.from_timestamp(stream_state[self.cursor_field]) - self.messages_lookback_window else: # If there is no state i.e: this is the first sync then there is no use for lookback, just get messages from the default start date messages_start_date = pendulum.from_timestamp(self._start_ts) + messages_stream = ChannelMessages(authenticator=self.authenticator, default_start_date=messages_start_date) + for message_chunk in messages_stream.stream_slices(stream_state={self.cursor_field: messages_start_date.timestamp()}): self.logger.info(f"Syncing replies {message_chunk}") + for channel in channels_stream.read_records(sync_mode=SyncMode.full_refresh): message_chunk["channel"] = channel["id"] + for message in messages_stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=message_chunk): - yield {"channel": channel["id"], self.cursor_field: message[self.primary_key]} + yield {"channel": channel["id"], self.sub_primary_key_2: message[self.sub_primary_key_2]} + + def read_records(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Mapping[str, Any]]: + """ + Filtering already readed records for incremental sync. Copied state value to X after the last sync + to really 100% make sure no one can edit the state during the run. + """ + + initial_state = copy.deepcopy(stream_state) or {} + + for record in super().read_records(stream_state=stream_state, **kwargs): + if record.get(self.cursor_field, 0) >= initial_state.get(self.cursor_field, 0): + yield record class JoinChannelsStream(HttpStream): diff --git a/docs/integrations/sources/slack.md b/docs/integrations/sources/slack.md index 52e4277d1d29..be8971936d6f 100644 --- a/docs/integrations/sources/slack.md +++ b/docs/integrations/sources/slack.md @@ -101,5 +101,6 @@ We recommend creating a restricted, read-only key specifically for Airbyte acces | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | +| 0.1.9 | 2021-07-20 | [4860](https://github.com/airbytehq/airbyte/pull/4860) | Fixed reading threads issue | | 0.1.8 | 2021-07-14 | [4683](https://github.com/airbytehq/airbyte/pull/4683) | Add float_ts primary key | | 0.1.7 | 2021-06-25 | [3978](https://github.com/airbytehq/airbyte/pull/3978) | Release Slack CDK Connector |