diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 0000000..22c3c8b --- /dev/null +++ b/.pylintrc @@ -0,0 +1,2 @@ +[MESSAGES CONTROL] +disable=missing-docstring,too-few-public-methods,too-many-arguments,logging-format-interpolation diff --git a/README.md b/README.md index 6a9e278..1f6b10a 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,29 @@ This tap: - Outputs the schema for each resource - Incrementally pulls data based on the input state +## Configuration + +Create a `config.json` file that looks like this: + +```json +{ + "start_date": "2010-01-01", + "access_token": "your-access-token", + "user_agent": "tap-zendesk-chat " +} +``` + +Overview of the possible config properties: + +| Config property | Required / Default value | Description +| --------------------------- | ------------------------ | ----------- +| `start_date` | Yes | For streams with replication method INCREMENTAL, the start date-time to be used +| `access_token` | Yes | Your Zendesk Chat access token +| `user_agent` | No | User agent to be used for HTTP requests +| `agents_page_limit` | No, default: `"100"` | Page limit for stream `agents` +| `chat_search_interval_days` | No, default: `"14"` | The interval in days for stream `chats` +| `chats_full_sync_days` | No | See section "Chats Full Re-syncs" below + ## Quick Start 1. Install @@ -43,16 +66,8 @@ Zopim account. 3. Create the Config File -Create a JSON file called `config.json` containing the access token and a -`start_date`, which specifies the date at which the tap will begin pulling data -(for those resources that support this). - -```json -{ - "start_date": "2010-01-01", - "access_token": "your-access-token" -} -``` +Create a JSON file called `config.json` from the `sample_config.json` file in this +repository. 4. Run the Tap in Discovery Mode @@ -94,4 +109,5 @@ tap's "state." --- -Copyright © 2017 Stitch +Copyright © 2017 Stitch
+Copyright © 2020 Horze International GmbH diff --git a/sample_config.json b/sample_config.json new file mode 100644 index 0000000..5cb3385 --- /dev/null +++ b/sample_config.json @@ -0,0 +1,5 @@ +{ + "start_date": "2010-01-01", + "access_token": "your-access-token", + "user_agent": "tap-zendesk-chat " +} \ No newline at end of file diff --git a/setup.py b/setup.py index f14e71b..2d35d79 100755 --- a/setup.py +++ b/setup.py @@ -9,9 +9,7 @@ classifiers=["Programming Language :: Python :: 3 :: Only"], py_modules=["tap_zendesk_chat"], install_requires=[ - "python-dateutil==2.6.0", # because of singer-python issue - "pendulum==1.2.0", # because of singer-python issue - "singer-python==5.0.3", + "singer-python==5.9.1", "requests==2.20.0", ], entry_points=""" diff --git a/tap_zendesk_chat/__init__.py b/tap_zendesk_chat/__init__.py index 6e8028d..33dd00e 100644 --- a/tap_zendesk_chat/__init__.py +++ b/tap_zendesk_chat/__init__.py @@ -5,8 +5,10 @@ from singer.catalog import Catalog, CatalogEntry, Schema from requests.exceptions import HTTPError from . import streams as streams_ +from .streams import STREAM_OBJECTS from .context import Context from .http import Client +from .sync import sync REQUIRED_CONFIG_KEYS = ["start_date", "access_token"] LOGGER = singer.get_logger() @@ -30,14 +32,14 @@ def load_schema(tap_stream_id): def ensure_credentials_are_authorized(client): # The request will throw an exception if the credentials are not authorized - client.request(streams_.DEPARTMENTS.tap_stream_id) + client.request(STREAM_OBJECTS['departments'].tap_stream_id) def is_account_endpoint_authorized(client): # The account endpoint is restricted to zopim accounts, meaning integrated # Zendesk accounts will get a 403 for this endpoint. 
try: - client.request(streams_.ACCOUNT.tap_stream_id) + client.request(STREAM_OBJECTS['account'].tap_stream_id) except HTTPError as e: if e.response.status_code == 403: LOGGER.info( @@ -46,8 +48,7 @@ def is_account_endpoint_authorized(client): "from discovery." ) return False - else: - raise + raise return True @@ -55,61 +56,35 @@ def discover(config): client = Client(config) ensure_credentials_are_authorized(client) include_account_stream = is_account_endpoint_authorized(client) - catalog = Catalog([]) - for stream in streams_.all_streams: + streams = [] + for _, stream in STREAM_OBJECTS.items(): if (not include_account_stream - and stream.tap_stream_id == streams_.ACCOUNT.tap_stream_id): + and stream.tap_stream_id == STREAM_OBJECTS['account'].tap_stream_id): continue raw_schema = load_schema(stream.tap_stream_id) - mdata = build_metadata(raw_schema) schema = Schema.from_dict(raw_schema) - catalog.streams.append(CatalogEntry( + streams.append(CatalogEntry( stream=stream.tap_stream_id, tap_stream_id=stream.tap_stream_id, key_properties=stream.pk_fields, schema=schema, - metadata=metadata.to_list(mdata) + metadata=metadata.get_standard_metadata( + schema=raw_schema, + schema_name=stream.tap_stream_id, + key_properties=stream.pk_fields, + valid_replication_keys=stream.replication_keys, + replication_method=stream.replication_method) )) - return catalog - -def build_metadata(raw_schema): - mdata = metadata.new() - for prop in raw_schema['properties'].keys(): - metadata.write(mdata, ('properties', prop), 'inclusion', 'automatic') - return mdata - - -def output_schema(stream): - schema = load_schema(stream.tap_stream_id) - singer.write_schema(stream.tap_stream_id, schema, stream.pk_fields) - - -def is_selected(stream): - mdata = metadata.to_map(stream.metadata) - return metadata.get(mdata, (), 'selected') - -def sync(ctx): - currently_syncing = ctx.state.get("currently_syncing") - start_idx = streams_.all_stream_ids.index(currently_syncing) \ - if currently_syncing else 
0 - stream_ids_to_sync = [cs.tap_stream_id for cs in ctx.catalog.streams - if is_selected(cs)] - streams = [s for s in streams_.all_streams[start_idx:] - if s.tap_stream_id in stream_ids_to_sync] - for stream in streams: - ctx.state["currently_syncing"] = stream.tap_stream_id - output_schema(stream) - ctx.write_state() - stream.sync(ctx) - ctx.state["currently_syncing"] = None - ctx.write_state() + return Catalog(streams) def main_impl(): args = utils.parse_args(REQUIRED_CONFIG_KEYS) if args.discover: discover(args.config).dump() - print() + elif args.catalog: + ctx = Context(args.config, args.state, args.catalog) + sync(ctx) else: catalog = Catalog.from_dict(args.properties) \ if args.properties else discover(args.config) diff --git a/tap_zendesk_chat/context.py b/tap_zendesk_chat/context.py index 16772f3..8147011 100644 --- a/tap_zendesk_chat/context.py +++ b/tap_zendesk_chat/context.py @@ -1,16 +1,16 @@ -from datetime import datetime -from .http import Client +from datetime import datetime, timezone import singer -from datetime import datetime + +from .http import Client -class Context(object): +class Context: def __init__(self, config, state, catalog): self.config = config self.state = state self.catalog = catalog self.client = Client(config) - self.now = datetime.utcnow() + self.now = datetime.utcnow().replace(tzinfo=timezone.utc) @property def bookmarks(self): diff --git a/tap_zendesk_chat/http.py b/tap_zendesk_chat/http.py index acd0c3e..36ea1bd 100644 --- a/tap_zendesk_chat/http.py +++ b/tap_zendesk_chat/http.py @@ -9,7 +9,7 @@ class RateLimitException(Exception): pass -class Client(object): +class Client: def __init__(self, config): # self.session = requests.Session() self.access_token = config["access_token"] diff --git a/tap_zendesk_chat/schemas/chats.json b/tap_zendesk_chat/schemas/chats.json index 896b177..d5941de 100644 --- a/tap_zendesk_chat/schemas/chats.json +++ b/tap_zendesk_chat/schemas/chats.json @@ -4,25 +4,29 @@ "type": [ "null", "integer" - ] 
+ ], + "description": "The ID of the department to which the chat is directed" }, "comment": { "type": [ "null", "string" - ] + ], + "description": "The customer comment on the chat" }, "missed": { "type": [ "null", "boolean" - ] + ], + "description": "Whether the chat was missed or not" }, "rating": { "type": [ "null", "string" - ] + ], + "description": "The customer satisfaction rating for the chat" }, "conversions": { "items": { @@ -31,13 +35,16 @@ "type": [ "null", "array" - ] + ], + "description": "Last 20 conversions (if any) attributed to the chat" }, "type": { "type": [ "null", "string" - ] + ], + "enum": ["offline_msg", "chat"], + "description": "Chat type. One of offline_msg, chat" }, "webpath": { "items": { @@ -46,19 +53,22 @@ "type": [ "null", "array" - ] + ], + "description": "The list of pages the customer navigated to during the chat" }, "triggered": { "type": [ "null", "boolean" - ] + ], + "description": "Whether the chat was a triggered chat or not" }, "message": { "type": [ "null", "string" - ] + ], + "description": "Message of the chat" }, "referrer_search_terms": { "type": [ @@ -76,21 +86,23 @@ "type": [ "null", "integer" - ] + ], + "description": "The ID of the Zendesk Support ticket created from this chat. 
Available only if using version 2 of the Zendesk Chat-Support integration" }, "unread": { "type": [ "null", - "boolean", - "integer" - ] + "boolean" + ], + "description": "Whether the chat is unread" }, "timestamp": { "type": [ "null", "string" ], - "format": "date-time" + "format": "date-time", + "description": "Timestamp for the chat" }, "end_timestamp": { "type": [ @@ -100,13 +112,15 @@ "format": "date-time" }, "response_time": { - "$ref": "chat_response_time" + "$ref": "chat_response_time", + "description": "Statistics about the response times in the chat: avg, max and first" }, "session": { "type": [ "null", "object" ], + "description": "Information related to the session of the chat", "additionalProperties": true }, "history": { @@ -116,7 +130,8 @@ "type": [ "null", "array" - ] + ], + "description": "Chronological list of messages in the chat" }, "agent_names": { "items": { @@ -127,7 +142,8 @@ "type": [ "null", "array" - ] + ], + "description": "Names of agents involved in the chat" }, "tags": { "items": { @@ -138,31 +154,38 @@ "type": [ "null", "array" - ] + ], + "description": "Tags associated with the chat" }, "visitor": { - "$ref": "chat_visitor" + "$ref": "chat_visitor", + "description": "Information about the visitor" }, "started_by": { "type": [ "null", "string" - ] + ], + "enum": ["visitor", "agent", "trigger"], + "description": "Who started the chat. 
Can be one of visitor, agent or trigger" }, "triggered_response": { "type": [ "null", "boolean" - ] + ], + "description": "Whether the response was a triggered response or not" }, "id": { "type": [ "null", "string" - ] + ], + "description": "The ID of the chat" }, "count": { - "$ref": "chat_count" + "$ref": "chat_count", + "description": "Number of messages (each) by the visitor and the agent(s)" }, "agent_ids": { "items": { @@ -173,19 +196,22 @@ "type": [ "null", "array" - ] + ], + "description": "IDs of agents involved in the chat" }, "department_name": { "type": [ "null", "string" - ] + ], + "description": "The name of the department to which the chat is directed" }, "duration": { "type": [ "null", "integer" - ] + ], + "description": "Duration of the chat" } }, "additionalProperties": false, diff --git a/tap_zendesk_chat/streams.py b/tap_zendesk_chat/streams.py index 6c3dbae..77abca7 100644 --- a/tap_zendesk_chat/streams.py +++ b/tap_zendesk_chat/streams.py @@ -1,53 +1,66 @@ -from singer import metrics -from pendulum import parse as dt_parse -import time from datetime import datetime, timedelta +import inspect import json import singer +from singer import metrics, Transformer, metadata, utils LOGGER = singer.get_logger() def break_into_intervals(days, start_time: str, now: datetime): delta = timedelta(days=days) - start_dt = dt_parse(start_time) + # conver to datetime + add 1 millisecond so that we only get new records + start_dt = utils.strptime_to_utc(start_time) \ + + timedelta(milliseconds=1) while start_dt < now: end_dt = min(start_dt + delta, now) yield start_dt, end_dt start_dt = end_dt -class Stream(object): +class Stream: """Information about and functions for syncing streams. 
Important class properties: :var tap_stream_id: :var pk_fields: A list of primary key fields""" - def __init__(self, tap_stream_id, pk_fields): - self.tap_stream_id = tap_stream_id - self.pk_fields = pk_fields + tap_stream_id = None + pk_fields = None + replication_method = None + replication_keys = None def metrics(self, page): with metrics.record_counter(self.tap_stream_id) as counter: counter.increment(len(page)) def format_response(self, response): - return [response] if type(response) != list else response + return [response] if not isinstance(response, list) else response - def write_page(self, page): + def write_page(self, ctx, page): """Formats a list of records in place and outputs the data to stdout.""" - singer.write_records(self.tap_stream_id, page) + stream = ctx.catalog.get_stream(self.tap_stream_id) + with Transformer() as transformer: + for rec in page: + singer.write_record( + self.tap_stream_id, + transformer.transform( + rec, stream.schema.to_dict(), metadata.to_map(stream.metadata), + ) + ) self.metrics(page) class Everything(Stream): def sync(self, ctx): - self.write_page(ctx.client.request(self.tap_stream_id)) + self.write_page(ctx, ctx.client.request(self.tap_stream_id)) class Agents(Stream): + tap_stream_id = 'agents' + pk_fields = ["id"] + def sync(self, ctx): since_id_offset = [self.tap_stream_id, "offset", "id"] since_id = ctx.bookmark(since_id_offset) or 0 @@ -59,7 +72,7 @@ def sync(self, ctx): page = ctx.client.request(self.tap_stream_id, params) if not page: break - self.write_page(page) + self.write_page(ctx, page) since_id = page[-1]["id"] + 1 ctx.set_bookmark(since_id_offset, since_id) ctx.write_state() @@ -68,6 +81,10 @@ def sync(self, ctx): class Chats(Stream): + tap_stream_id = 'chats' + pk_fields = ["id"] + replication_method = 'INCREMENTAL' + def _bulk_chats(self, ctx, chat_ids): if not chat_ids: return [] @@ -123,7 +140,7 @@ def _pull(self, ctx, chat_type, ts_field, *, full_sync): chat_ids = [r["id"] for r in 
search_resp["results"]] chats = self._bulk_chats(ctx, chat_ids) if chats: - self.write_page(chats) + self.write_page(ctx, chats) max_bookmark = max(max_bookmark, *[c[ts_field] for c in chats]) if not next_url: break @@ -137,7 +154,7 @@ def _should_run_full_sync(self, ctx): if not last_sync: LOGGER.info("Running full sync of chats: no last sync time") return True - next_sync = dt_parse(last_sync) + timedelta(days=int(sync_days)) + next_sync = utils.strptime_to_utc(last_sync) + timedelta(days=int(sync_days)) if next_sync <= ctx.now: LOGGER.info("Running full sync of chats: " "last sync was {}, configured to run every {} days" @@ -147,46 +164,69 @@ def _should_run_full_sync(self, ctx): def sync(self, ctx): full_sync = self._should_run_full_sync(ctx) - self._pull(ctx, "chat", "end_timestamp", full_sync=full_sync), + self._pull(ctx, "chat", "end_timestamp", full_sync=full_sync) self._pull(ctx, "offline_msg", "timestamp", full_sync=full_sync) if full_sync: ctx.state["chats_last_full_sync"] = ctx.now.isoformat() ctx.write_state() +class Shortcuts(Everything): + tap_stream_id = 'shortcuts' + pk_fields = ["name"] + replication_method = 'FULL_TABLE' + + class Triggers(Stream): + tap_stream_id = 'triggers' + pk_fields = ["id"] + replication_method = 'FULL_TABLE' + def sync(self, ctx): page = ctx.client.request(self.tap_stream_id) for trigger in page: definition = trigger["definition"] for k in ["condition", "actions"]: definition[k] = json.dumps(definition[k]) - self.write_page(page) + self.write_page(ctx, page) class Bans(Stream): + tap_stream_id = 'bans' + pk_fields = ['id'] + replication_method = 'FULL_TABLE' + def sync(self, ctx): response = ctx.client.request(self.tap_stream_id) page = response["visitor"] + response["ip_address"] - self.write_page(page) + self.write_page(ctx, page) + + +class Departments(Everything): + tap_stream_id = 'departments' + pk_fields = ["id"] + replication_method = 'FULL_TABLE' + + +class Goals(Everything): + tap_stream_id = 'goals' + pk_fields 
= ["id"] + replication_method = 'FULL_TABLE' class Account(Stream): + tap_stream_id = 'account' + pk_fields = ['account_key'] + replication_method = 'FULL_TABLE' + def sync(self, ctx): # The account endpoint returns a single item, so we have to wrap it in # a list to write a "page" - self.write_page([ctx.client.request(self.tap_stream_id)]) - -DEPARTMENTS = Everything("departments", ["id"]) -ACCOUNT = Account("account", ["account_key"]) -all_streams = [ - Agents("agents", ["id"]), - Chats("chats", ["id"]), - Everything("shortcuts", ["name"]), - Triggers("triggers", ["id"]), - Bans("bans", ["id"]), - DEPARTMENTS, - Everything("goals", ["id"]), - ACCOUNT, -] -all_stream_ids = [s.tap_stream_id for s in all_streams] + self.write_page(ctx, [ctx.client.request(self.tap_stream_id)]) + + +STREAM_OBJECTS = { + cls.tap_stream_id: cls + for cls in globals().values() + if inspect.isclass(cls) and issubclass(cls, Stream) and cls.tap_stream_id +} diff --git a/tap_zendesk_chat/sync.py b/tap_zendesk_chat/sync.py new file mode 100644 index 0000000..7ad7727 --- /dev/null +++ b/tap_zendesk_chat/sync.py @@ -0,0 +1,30 @@ +import singer + +from .streams import STREAM_OBJECTS + +LOGGER = singer.get_logger() + + +def sync(ctx): + selected_streams = ctx.catalog.get_selected_streams(ctx.state) + + for stream in selected_streams: + stream_name = stream.tap_stream_id + stream_object = STREAM_OBJECTS.get(stream_name)() + + if stream_object is None: + raise Exception("Attempted to sync unknown stream {}".format(stream_name)) + + singer.write_schema( + stream_name, + stream.schema.to_dict(), + stream_object.pk_fields, + ) + + LOGGER.info("Syncing stream: " + stream_name) + ctx.state["currently_syncing"] = stream_name + + ctx.write_state() + stream_object.sync(ctx) + ctx.state["currently_syncing"] = None + ctx.write_state() diff --git a/test/test_streams.py b/test/test_streams.py index c21c9c6..86848f5 100644 --- a/test/test_streams.py +++ b/test/test_streams.py @@ -1,14 +1,14 @@ -import pendulum 
+from singer import utils + from tap_zendesk_chat.streams import break_into_intervals def test_intervals(): days = 30 - now = pendulum.parse("2018-02-14T10:30:20") + now = utils.strptime_to_utc("2018-02-14T10:30:20") broken = break_into_intervals(days, "2018-01-02T18:14:33", now) as_strs = [(x.isoformat(), y.isoformat()) for x, y in broken] assert as_strs == [ ("2018-01-02T18:14:33+00:00", "2018-02-01T18:14:33+00:00"), ("2018-02-01T18:14:33+00:00", "2018-02-14T10:30:20+00:00"), ] -