Skip to content

Commit

Permalink
#14149 source marketo: retry job creation instead of skipping
Browse files Browse the repository at this point in the history
  • Loading branch information
davydov-d committed Aug 16, 2022
1 parent ab65ef5 commit 70fc423
Show file tree
Hide file tree
Showing 9 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-marketo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_marketo ./source_marketo
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-marketo
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["null", "object"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "integer"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"company": {
"type": ["string", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"type": ["object", "null"],
"additionalProperties": false,
"additionalProperties": true,
"properties": {
"id": {
"type": ["integer", "null"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,8 @@ def stream_slices(

export = self.create_export(param)

status, export_id = export.get("status", "").lower(), export.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job for data slice {date_slice}!")
continue
date_slice["id"] = export_id
yield date_slice
date_slice["id"] = export["exportId"]
return date_slices

def sleep_till_export_completed(self, stream_slice: Mapping[str, Any]) -> bool:
while True:
Expand Down Expand Up @@ -270,6 +266,16 @@ class MarketoExportCreate(MarketoStream):
def path(self, **kwargs) -> str:
return f"bulk/v1/{self.stream_name}/export/create.json"

def should_retry(self, response: requests.Response) -> bool:
if response.status_code == 429 or 500 <= response.status_code < 600:
return True
record = next(self.parse_response(response, {}))
status, export_id = record.get("status", "").lower(), record.get("exportId")
if status != "created" or not export_id:
self.logger.warning(f"Failed to create export job! Status is {status}!")
return True
return False

def request_body_json(self, **kwargs) -> Optional[Mapping]:
params = {"format": "CSV"}
if self.param:
Expand Down Expand Up @@ -382,7 +388,7 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"additionalProperties": False,
"additionalProperties": True,
"properties": properties,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def mock_requests(requests_mock):

@pytest.fixture
def config():
start_date = pendulum.now().subtract(days=100).strftime("%Y-%m-%dT%H:%M:%SZ")
start_date = pendulum.now().subtract(days=75).strftime("%Y-%m-%dT%H:%M:%SZ")
config = {
"client_id": "client-id",
"client_secret": "********",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@ def test_create_export_job(send_email_stream, caplog):
{"endAt": ANY, "id": "cd465f55", "startAt": ANY},
{"endAt": ANY, "id": "232aafb4", "startAt": ANY},
]
assert "Failed to create export job for data slice " in caplog.records[-1].message
assert "Failed to create export job! Status is failed!" in caplog.records[-1].message

0 comments on commit 70fc423

Please sign in to comment.