Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HGI-6506: Parallelize requests to improve performance #41

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 33 additions & 28 deletions tap_salesforce/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#!/usr/bin/env python3
import copy
import json
import sys
import singer
Expand All @@ -11,6 +12,7 @@
from tap_salesforce.salesforce.bulk import Bulk
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException, TapSalesforceQuotaExceededException, TapSalesforceBulkAPIDisabledException)
import concurrent.futures

LOGGER = singer.get_logger()

Expand Down Expand Up @@ -55,7 +57,8 @@ def stream_is_selected(mdata):
def build_state(raw_state, catalog):
state = {}

for catalog_entry in catalog['streams']:
for read_only_catalog_entry in catalog['streams']:
catalog_entry = copy.deepcopy(read_only_catalog_entry)
tap_stream_id = catalog_entry['tap_stream_id']
catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_method = catalog_metadata.get((), {}).get('replication-method')
Expand All @@ -74,7 +77,7 @@ def build_state(raw_state, catalog):
state = singer.write_bookmark(state, tap_stream_id, 'JobHighestBookmarkSeen', current_bookmark)

if replication_method == 'INCREMENTAL':
replication_key = catalog_metadata.get((), {}).get('replication-key')
replication_key = next(iter(catalog_metadata.get((), {}).get('valid-replication-keys', [])), None)
replication_key_value = singer.get_bookmark(raw_state,
tap_stream_id,
replication_key)
Expand Down Expand Up @@ -255,7 +258,7 @@ def get_views_list(sf):


# pylint: disable=too-many-branches,too-many-statements
def do_discover(sf):
def do_discover(sf: Salesforce):
"""Describes a Salesforce instance's objects and generates a JSON schema for each field."""
global_description = sf.describe()

Expand All @@ -271,44 +274,53 @@ def do_discover(sf):
if sf.api_type == 'BULK' and not Bulk(sf).has_permissions():
raise TapSalesforceBulkAPIDisabledException('This client does not have Bulk API permissions, received "API_DISABLED_FOR_ORG" error code')

for sobject_name in sorted(objects_to_discover):

def process_sobject(sobject_name):
# Skip blacklisted SF objects depending on the api_type in use
# ChangeEvent objects are not queryable via Bulk or REST (undocumented)
if (sobject_name in sf.get_blacklisted_objects() and sobject_name not in ACTIVITY_STREAMS) \
or sobject_name.endswith("ChangeEvent"):
continue
return None, None, None

sobject_description = sf.describe(sobject_name)

if sobject_description is None:
continue
return None, None, None

# Cache customSetting and Tag objects to check for blacklisting after
# all objects have been described
if sobject_description.get("customSetting"):
sf_custom_setting_objects.append(sobject_name)
return sobject_name, None, None
elif sobject_name.endswith("__Tag"):
relationship_field = next(
(f for f in sobject_description["fields"] if f.get("relationshipName") == "Item"),
None)
if relationship_field:
# Map {"Object":"Object__Tag"}
object_to_tag_references[relationship_field["referenceTo"]
[0]] = sobject_name
return None, {relationship_field["referenceTo"][0]: sobject_name}, None

fields = sobject_description['fields']
replication_key = get_replication_key(sobject_name, fields)

# Salesforce Objects are skipped when they do not have an Id field
if not [f["name"] for f in fields if f["name"]=="Id"]:
if not [f["name"] for f in fields if f["name"] == "Id"]:
LOGGER.info(
"Skipping Salesforce Object %s, as it has no Id field",
sobject_name)
continue
return None, None, None

entry = generate_schema(fields, sf, sobject_name, replication_key)
entries.append(entry)
return None, None, entry

with concurrent.futures.ThreadPoolExecutor() as executor:
results = list(executor.map(process_sobject, sorted(objects_to_discover)))

for custom_setting, tag_reference, entry in results:
if custom_setting:
sf_custom_setting_objects.append(custom_setting)
if tag_reference:
object_to_tag_references.update(tag_reference)
if entry:
entries.append(entry)

# Handle ListViews
views = get_views_list(sf)
Expand Down Expand Up @@ -413,7 +425,8 @@ def do_sync(sf, catalog, state,config=None):
catalog["streams"] = list_view + catalog["streams"]

# Sync Streams
for catalog_entry in catalog["streams"]:
for read_only_catalog_entry in catalog["streams"]:
catalog_entry = copy.deepcopy(read_only_catalog_entry)
stream_version = get_stream_version(catalog_entry, state)
stream = catalog_entry['stream']
stream_alias = catalog_entry.get('stream_alias')
Expand All @@ -422,11 +435,9 @@ def do_sync(sf, catalog, state,config=None):
stream=(stream_alias or stream.replace("/","_")), version=stream_version)

catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')

mdata = metadata.to_map(catalog_entry['metadata'])
replication_key = next(iter(catalog_metadata.get((), {}).get('valid-replication-keys', [])), None)

if not stream_is_selected(mdata):
if not stream_is_selected(catalog_metadata):
LOGGER.info("%s: Skipping - not selected", stream_name)
continue

Expand Down Expand Up @@ -465,9 +476,9 @@ def do_sync(sf, catalog, state,config=None):
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('JobID', None)
state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}).pop('BatchIDs', None)
bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \
.pop('JobHighestBookmarkSeen', None)
.pop('JobHighestBookmarkSeen', None)
existing_bookmark = state.get('bookmarks', {}).get(catalog_entry['tap_stream_id'], {}) \
.pop(replication_key, None)
.pop(replication_key, None)
state = singer.write_bookmark(
state,
catalog_entry['tap_stream_id'],
Expand All @@ -480,14 +491,9 @@ def do_sync(sf, catalog, state,config=None):
bookmark_is_empty = state.get('bookmarks', {}).get(
catalog_entry['tap_stream_id']) is None

if "/" in state["current_stream"]:
# get current name
old_key = state["current_stream"]
# get the new key name
new_key = old_key.replace("/","_")
state["current_stream"] = new_key

state["current_stream"] = state["current_stream"].replace("/", "_")
catalog_entry['tap_stream_id'] = catalog_entry['tap_stream_id'].replace("/","_")

if replication_key or bookmark_is_empty:
singer.write_message(activate_version_message)
state = singer.write_bookmark(state,
Expand All @@ -496,7 +502,6 @@ def do_sync(sf, catalog, state,config=None):
stream_version)
counter = sync_stream(sf, catalog_entry, state, input_state, catalog,config)
LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)

state["current_stream"] = None
singer.write_state(state)
LOGGER.info("Finished sync")
Expand Down
38 changes: 20 additions & 18 deletions tap_salesforce/salesforce/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@
import singer.utils as singer_utils
from singer import metadata, metrics

from tap_salesforce.salesforce.bulk import Bulk
from tap_salesforce.salesforce.rest import Rest
from simplejson.scanner import JSONDecodeError
from tap_salesforce.salesforce.exceptions import (
TapSalesforceException,
Expand Down Expand Up @@ -144,17 +142,17 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches
sf_type = field['type']

if sf_type in STRING_TYPES:
property_schema['type'] = "string"
property_schema['type'] = ["null", "string"]
elif sf_type in DATE_TYPES:
date_type = {"type": "string", "format": "date-time"}
string_type = {"type": ["string", "null"]}
date_type = {"type": ["null", "string"], "format": "date-time"}
string_type = {"type": ["null", "string"]}
property_schema["anyOf"] = [date_type, string_type]
elif sf_type == "boolean":
property_schema['type'] = "boolean"
property_schema['type'] = ["null", "boolean"]
elif sf_type in NUMBER_TYPES:
property_schema['type'] = "number"
property_schema['type'] = ["null", "number"]
elif sf_type == "address":
property_schema['type'] = "object"
property_schema['type'] = ["null", "object"]
property_schema['properties'] = {
"street": {"type": ["null", "string"]},
"state": {"type": ["null", "string"]},
Expand All @@ -166,9 +164,9 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches
"geocodeAccuracy": {"type": ["null", "string"]}
}
elif sf_type == "int":
property_schema['type'] = "integer"
property_schema['type'] = ["null", "integer"]
elif sf_type == "time":
property_schema['type'] = "string"
property_schema['type'] = ["null", "string"]
elif sf_type in LOOSE_TYPES:
return property_schema, mdata # No type = all types
elif sf_type in BINARY_TYPES:
Expand All @@ -178,19 +176,20 @@ def field_to_property_schema(field, mdata): # pylint:disable=too-many-branches
return property_schema, mdata
elif sf_type == 'location':
# geo coordinates are numbers or objects divided into two fields for lat/long
property_schema['type'] = ["number", "object", "null"]
property_schema['type'] = ["null", "object", "number"]
property_schema['properties'] = {
"longitude": {"type": ["null", "number"]},
"latitude": {"type": ["null", "number"]}
}
elif sf_type == 'json':
property_schema['type'] = "string"
property_schema['type'] = ["null", "string"]
else:
raise TapSalesforceException("Found unsupported type: {}".format(sf_type))

# The nillable field cannot be trusted
if field_name != 'Id' and sf_type != 'location' and sf_type not in DATE_TYPES:
property_schema['type'] = ["null", property_schema['type']]
if "null" not in property_schema['type']:
property_schema['type'].insert(0, "null")

return property_schema, mdata

Expand Down Expand Up @@ -283,12 +282,13 @@ def check_rest_quota_usage(self, headers):
on_backoff=log_backoff_attempt)
def _make_request(self, http_method, url, headers=None, body=None, stream=False, params=None, validate_json=False, timeout=None):
if http_method == "GET":
LOGGER.info("Making %s request to %s with params: %s", http_method, url, params)
LOGGER.debug("[REST] Making %s request to %s", http_method, url)
resp = self.session.get(url, headers=headers, stream=stream, params=params, timeout=timeout)
LOGGER.info("Completed %s request to %s with params: %s", http_method, url, params)
LOGGER.debug("[REST] Completed %s request to %s", http_method, url)
elif http_method == "POST":
LOGGER.info("Making %s request to %s with body %s", http_method, url, body)
LOGGER.debug("[REST] Making %s request to %s", http_method, url)
resp = self.session.post(url, headers=headers, data=body)
LOGGER.debug("[REST] Completed %s request to %s", http_method, url)
else:
raise TapSalesforceException("Unsupported HTTP method")

Expand Down Expand Up @@ -400,7 +400,7 @@ def _get_selected_properties(self, catalog_entry):

def get_start_date(self, state, catalog_entry):
catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')
replication_key = next(iter(catalog_metadata.get((), {}).get('valid-replication-keys', [])), None)

return (singer.get_bookmark(state,
catalog_entry['tap_stream_id'],
Expand All @@ -412,7 +412,7 @@ def _build_query_string(self, catalog_entry, start_date, end_date=None, order_by
query = "SELECT {} FROM {}".format(",".join(selected_properties), catalog_entry['stream'])

catalog_metadata = metadata.to_map(catalog_entry['metadata'])
replication_key = catalog_metadata.get((), {}).get('replication-key')
replication_key = next(iter(catalog_metadata.get((), {}).get('valid-replication-keys', [])), None)

if replication_key:
where_clause = " WHERE {} > {} ".format(
Expand All @@ -436,9 +436,11 @@ def query(self, catalog_entry, state, query_override=None):
if state["bookmarks"]["ListView"].get("SystemModstamp"):
del state["bookmarks"]["ListView"]["SystemModstamp"]
if self.api_type == BULK_API_TYPE and query_override is None:
from tap_salesforce.salesforce.bulk import Bulk
bulk = Bulk(self)
return bulk.query(catalog_entry, state)
elif self.api_type == REST_API_TYPE or query_override is not None:
from tap_salesforce.salesforce.rest import Rest
rest = Rest(self)
return rest.query(catalog_entry, state, query_override=query_override)
else:
Expand Down
Loading