Skip to content

Commit

Permalink
🐛 Source Shopify: fix 404 for configured streams, fix missing curso…
Browse files Browse the repository at this point in the history
…r error for old records (airbytehq#17777)
  • Loading branch information
bazarnov authored and jhammarstedt committed Oct 31, 2022
1 parent f1d0b1c commit 26d6cb9
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,7 @@
- name: Shopify
sourceDefinitionId: 9da77001-af33-4bcd-be46-6252bf9342b9
dockerRepository: airbyte/source-shopify
dockerImageTag: 0.1.37
dockerImageTag: 0.1.38
documentationUrl: https://docs.airbyte.com/integrations/sources/shopify
icon: shopify.svg
sourceType: api
Expand Down
40 changes: 22 additions & 18 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10010,7 +10010,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-shopify:0.1.37"
- dockerImage: "airbyte/source-shopify:0.1.38"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/shopify"
connectionSpecification:
Expand All @@ -10034,6 +10034,24 @@
type: "object"
order: 2
oneOf:
- title: "API Password"
description: "API Password Auth"
type: "object"
required:
- "auth_method"
- "api_password"
properties:
auth_method:
type: "string"
const: "api_password"
order: 0
api_password:
type: "string"
title: "API Password"
description: "The API Password for your private application in the\
\ `Shopify` store."
airbyte_secret: true
order: 1
- type: "object"
title: "OAuth2.0"
description: "OAuth2.0"
Expand All @@ -10049,33 +10067,19 @@
title: "Client ID"
description: "The Client ID of the Shopify developer application."
airbyte_secret: true
order: 1
client_secret:
type: "string"
title: "Client Secret"
description: "The Client Secret of the Shopify developer application."
airbyte_secret: true
order: 2
access_token:
type: "string"
title: "Access Token"
description: "The Access Token for making authenticated requests."
airbyte_secret: true
- title: "API Password"
description: "API Password Auth"
type: "object"
required:
- "auth_method"
- "api_password"
properties:
auth_method:
type: "string"
const: "api_password"
order: 0
api_password:
type: "string"
title: "API Password"
description: "The API Password for your private application in the\
\ `Shopify` store."
airbyte_secret: true
order: 3
start_date:
type: "string"
title: "Replication Start Date"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-shopify/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ COPY source_shopify ./source_shopify
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.37
LABEL io.airbyte.version=0.1.38
LABEL io.airbyte.name=airbyte/source-shopify
Original file line number Diff line number Diff line change
@@ -1,84 +1,80 @@
{
"customers": {
"updated_at": "2022-03-02T03:23:16-08:00"
"updated_at": "2022-06-22T03:50:13-07:00"
},
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
"updated_at": "2022-10-10T06:21:53-07:00"
},
"draft_orders": {
"updated_at": "2022-02-22T04:29:57-08:00"
"updated_at": "2022-10-08T05:07:29-07:00"
},
"products": {
"updated_at": "2022-03-03T03:47:51-08:00"
},
"abandoned_checkouts": {
"updated_at": "2022-03-02T03:23:22-08:00"
"updated_at": "2022-10-10T06:21:56-07:00"
},
"abandoned_checkouts": {},
"metafields": {
"updated_at": "2021-07-08T03:38:45-07:00"
"updated_at": "2022-05-30T23:42:02-07:00"
},
"collects": {
"id": 29427031703741
"id": 29427031703740
},
"custom_collections": {
"updated_at": "2021-08-18T02:39:34-07:00"
"updated_at": "2022-10-08T04:44:51-07:00"
},
"order_refunds": {
"created_at": "2022-03-03T03:47:46-08:00",
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
}
"created_at": "2022-10-10T06:21:53-07:00",
"orders": {
"updated_at": "2022-10-10T06:21:53-07:00"
}
},
"order_risks": {
"id": 6446736474301,
"orders": {
"updated_at": "2022-02-22T00:37:28-08:00"
}
"id": 6446736474301,
"orders": {
"updated_at": "2022-03-07T02:09:04-08:00"
}
},
"transactions": {
"created_at": "2022-03-03T03:47:45-08:00",
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
}
"created_at": "2022-10-10T06:21:52-07:00",
"orders": {
"updated_at": "2022-10-10T06:21:53-07:00"
}
},
"tender_transactions": {
"processed_at": "2022-03-03T03:47:45-08:00"
"processed_at": "2022-10-10T06:21:52-07:00"
},
"pages": {
"updated_at": "2021-07-08T05:24:10-07:00"
"updated_at": "2022-10-08T08:07:00-07:00"
},
"price_rules": {
"updated_at": "2021-09-10T06:48:10-07:00"
"updated_at": "2021-09-10T06:48:10-07:00"
},
"discount_codes": {
"updated_at": "2021-09-10T06:48:10-07:00",
"price_rules": {
"price_rules": {
"updated_at": "2021-09-10T06:48:10-07:00"
},
"updated_at": "2021-09-10T06:48:10-07:00"
}
},
"inventory_items": {
"updated_at": "2022-02-22T00:40:26-08:00",
"products": {
"updated_at": "2022-02-18T02:39:48-07:00"
}
"products": {
"updated_at": "2022-03-17T03:10:35-07:00"
},
"updated_at": "2022-03-06T14:12:20-08:00"
},
"inventory_levels": {
"updated_at": "2022-03-03T03:47:51-08:00",
"locations": {}
"locations": {},
"updated_at": "2022-10-10T06:21:56-07:00"
},
"fulfillment_orders": {
"id": 5424260808893,
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
}
"id": 5567724486845,
"orders": {
"updated_at": "2022-10-10T06:05:29-07:00"
}
},
"fulfillments": {
"updated_at": "2022-02-27T23:49:13-08:00",
"orders": {
"updated_at": "2022-03-03T03:47:46-08:00"
}
},
"balance_transactions": {
"id": 29427031703741
}
}
"updated_at": "2022-06-22T03:50:14-07:00",
"orders": {
"updated_at": "2022-10-10T06:05:29-07:00"
}
},
"balance_transactions": {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ class ShopifyStream(HttpStream, ABC):
order_field = "updated_at"
filter_field = "updated_at_min"

raise_on_http_errors = True

def __init__(self, config: Dict):
super().__init__(authenticator=config["authenticator"])
self._transformer = DataTypeEnforcer(self.get_json_schema())
Expand Down Expand Up @@ -82,6 +84,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
record["shop_url"] = self.config["shop"]
yield self._transformer.transform(record)

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 404:
self.logger.warn(f"Stream `{self.name}` is not available, skipping.")
setattr(self, "raise_on_http_errors", False)
return super().should_retry(response)

@property
@abstractmethod
def data_field(self) -> str:
Expand Down Expand Up @@ -125,11 +133,18 @@ def request_params(self, stream_state: Mapping[str, Any] = None, next_page_token
# Parse the `stream_slice` with respect to `stream_state` for `Incremental refresh`
# cases where we slice the stream, the endpoints for those classes don't accept any other filtering,
# but they provide us with the updated_at field in most cases, so we used that as incremental filtering during the order slicing.
def filter_records_newer_than_state(self, stream_state: Mapping[str, Any] = None, records_slice: Mapping[str, Any] = None) -> Iterable:
def filter_records_newer_than_state(self, stream_state: Mapping[str, Any] = None, records_slice: Iterable[Mapping] = None) -> Iterable:
# Getting records >= state
if stream_state:
for record in records_slice:
if record.get(self.cursor_field, "") >= stream_state.get(self.cursor_field):
if self.cursor_field in record:
if record.get(self.cursor_field, self.default_state_comparison_value) >= stream_state.get(self.cursor_field):
yield record
else:
# old entities could miss the cursor field
self.logger.warn(
f"Stream `{self.name}`, Record ID: `{record.get(self.primary_key)}` missing cursor: {self.cursor_field}"
)
yield record
else:
yield from records_slice
Expand Down Expand Up @@ -204,9 +219,11 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
{slice_key: 999
]
"""
sorted_substream_slices = []

# reading parent nested stream_state from child stream state
parent_stream_state = stream_state.get(self.parent_stream.name) if stream_state else {}

# reading the parent stream
for record in self.parent_stream.read_records(stream_state=parent_stream_state, **kwargs):
# updating the `stream_state` with the state of it's parent stream
Expand All @@ -217,10 +234,23 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
# and corresponds to the data of child_substream we need.
if self.nested_substream:
if record.get(self.nested_substream):
yield {self.slice_key: record[self.nested_record]}
sorted_substream_slices.append(
{
self.slice_key: record[self.nested_record],
self.cursor_field: record[self.nested_substream][0].get(self.cursor_field, self.default_state_comparison_value),
}
)
else:
yield {self.slice_key: record[self.nested_record]}

# output slice from sorted list to avoid filtering older records
if self.nested_substream:
if len(sorted_substream_slices) > 0:
# sort by cursor_field
sorted_substream_slices.sort(key=lambda x: x.get(self.cursor_field))
for sorted_slice in sorted_substream_slices:
yield {self.slice_key: sorted_slice[self.slice_key]}

def read_records(
self,
stream_state: Mapping[str, Any] = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,26 @@
"type": "object",
"order": 2,
"oneOf": [
{
"title": "API Password",
"description": "API Password Auth",
"type": "object",
"required": ["auth_method", "api_password"],
"properties": {
"auth_method": {
"type": "string",
"const": "api_password",
"order": 0
},
"api_password": {
"type": "string",
"title": "API Password",
"description": "The API Password for your private application in the `Shopify` store.",
"airbyte_secret": true,
"order": 1
}
}
},
{
"type": "object",
"title": "OAuth2.0",
Expand All @@ -34,38 +54,22 @@
"type": "string",
"title": "Client ID",
"description": "The Client ID of the Shopify developer application.",
"airbyte_secret": true
"airbyte_secret": true,
"order": 1
},
"client_secret": {
"type": "string",
"title": "Client Secret",
"description": "The Client Secret of the Shopify developer application.",
"airbyte_secret": true
"airbyte_secret": true,
"order": 2
},
"access_token": {
"type": "string",
"title": "Access Token",
"description": "The Access Token for making authenticated requests.",
"airbyte_secret": true
}
}
},
{
"title": "API Password",
"description": "API Password Auth",
"type": "object",
"required": ["auth_method", "api_password"],
"properties": {
"auth_method": {
"type": "string",
"const": "api_password",
"order": 0
},
"api_password": {
"type": "string",
"title": "API Password",
"description": "The API Password for your private application in the `Shopify` store.",
"airbyte_secret": true
"airbyte_secret": true,
"order": 3
}
}
}
Expand Down
Loading

0 comments on commit 26d6cb9

Please sign in to comment.