Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source marketo: retry job creation instead of skipping #15683

Merged
merged 4 commits into from
Aug 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@
- name: Marketo
sourceDefinitionId: 9e0556f4-69df-4522-a3fb-03264d36b348
dockerRepository: airbyte/source-marketo
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/marketo
icon: marketo.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5078,7 +5078,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-marketo:0.1.4"
- dockerImage: "airbyte/source-marketo:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/marketo"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
empty_streams: ["activities_visit_webpage"]
timeout_seconds: 3600
timeout_seconds: 4800
expect_records:
path: "integration_tests/expected_records.txt"
incremental:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
future_state_path: "integration_tests/abnormal_state.json"
timeout_seconds: 3600
timeout_seconds: 4800
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
timeout_seconds: 3600
timeout_seconds: 4800
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"company": {
"type": ["string", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,8 @@ def stream_slices(

export = self.create_export(param)

status, export_id = export.get("status", "").lower(), export.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job for data slice {date_slice}!")
continue
date_slice["id"] = export_id
yield date_slice
date_slice["id"] = export["exportId"]
return date_slices

def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool:
while True:
Expand Down Expand Up @@ -270,6 +266,16 @@ class MarketoExportCreate(MarketoStream):
def path(self, **kwargs) -> str:
return f"bulk/v1/{self.stream_name}/export/create.json"

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 429 or 500 <= response.status_code < 600:
return True
record = next(self.parse_response(response, {}))
status, export_id = record.get("status", "").lower(), record.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job! Status is {status}!")
return True
return False

def request_body_json(self, **kwargs) -> Optional[Mapping]:
params = {"format": "CSV"}
if self.param:
Expand Down Expand Up @@ -382,7 +388,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": False,
"additionalProperties": True,
"properties": properties,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def mock_requests(requests_mock):

@pytest.fixture
def config():
start_date = pendulum.now().subtract(days=100).strftime("%Y-%m-%dT%H:%M:%SZ")
start_date = pendulum.now().subtract(days=75).strftime("%Y-%m-%dT%H:%M:%SZ")
config = {
"client_id": "client-id",
"client_secret": "********",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def test_create_export_job(send_email_stream, caplog):
{"endAt": ANY, "id": "cd465f55", "startAt": ANY},
{"endAt": ANY, "id": "232aafb4", "startAt": ANY},
]
assert "Failed to create export job for data slice " in caplog.records[-1].message
assert "Failed to create export job! Status is failed!" in caplog.records[-1].message
15 changes: 8 additions & 7 deletions docs/integrations/sources/marketo.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,12 @@ We're almost there! Armed with your Endpoint & Identity URLs and your Client ID

## CHANGELOG

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------|
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------|
| `0.1.5` | 2022-08-16 | [15683](https://github.com/airbytehq/airbyte/pull/15683) | Retry failed creation of a job instead of skipping it |
| `0.1.4` | 2022-06-20 | [13930](https://github.com/airbytehq/airbyte/pull/13930) | Process failing creation of export jobs |
| `0.1.3` | 2021-12-10 | [8429](https://github.com/airbytehq/airbyte/pull/8578) | Updated titles and descriptions |
| `0.1.2` | 2021-12-03 | [8483](https://github.com/airbytehq/airbyte/pull/8483) | Improve field conversion to conform schema |
| `0.1.1` | 2021-11-29 | [0000](https://github.com/airbytehq/airbyte/pull/0000) | Fix timestamp value format issue |
| `0.1.0` | 2021-09-06 | [5863](https://github.com/airbytehq/airbyte/pull/5863) | Release Marketo CDK Connector |