Skip to content

Commit

Permalink
Allow retries for statuses other than 429 in streaming_bulk
Browse files Browse the repository at this point in the history
  • Loading branch information
david-a authored Aug 28, 2019
1 parent 4312d9a commit 7b54d49
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions elasticsearch/helpers/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,18 @@ def _process_bulk_chunk(
raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)


def _retry_for_status(status):
if status == 429: return True
return False

def streaming_bulk(
client,
actions,
chunk_size=500,
max_chunk_bytes=100 * 1024 * 1024,
raise_on_error=True,
expand_action_callback=expand_action,
retry_for_status_callback=_retry_for_status,
raise_on_exception=True,
max_retries=0,
initial_backoff=2,
Expand Down Expand Up @@ -198,6 +203,9 @@ def streaming_bulk(
:arg expand_action_callback: callback executed on each action passed in,
should return a tuple containing the action line and the data line
(`None` if data line should be omitted).
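:arg retry_for_status_callback: callback executed with the status of each
failed item (or of a raised ``TransportError``), should return ``True`` if
the status requires a retry and ``False`` if not. The default callback
retries only on status ``429``; the value must be callable, ``None`` is
not handled.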
:arg max_retries: maximum number of times a document will be retried when
a retryable status is received (``429`` unless ``retry_for_status_callback``
decides otherwise), set to 0 (default) for no retries
:arg initial_backoff: number of seconds we should wait before the first
Expand Down Expand Up @@ -233,12 +241,12 @@ def streaming_bulk(

if not ok:
action, info = info.popitem()
# retry if retries enabled, we get 429, and we are not
# in the last attempt
# retry if retries are enabled, we are not in the last attempt,
# and retry_for_status_callback accepts the status (429 by default)
if (
max_retries
and info["status"] == 429
and (attempt + 1) <= max_retries
and retry_for_status_callback(info["status"])
):
# _process_bulk_chunk expects strings so we need to
# re-serialize the data
Expand All @@ -252,8 +260,8 @@ def streaming_bulk(
yield ok, info

except TransportError as e:
# suppress 429 errors since we will retry them
if attempt == max_retries or e.status_code != 429:
# suppress errors whose status retry_for_status_callback accepts
# (429 by default) since we will retry them
if attempt == max_retries or not retry_for_status_callback(e.status_code):
raise
else:
if not to_retry:
Expand Down

0 comments on commit 7b54d49

Please sign in to comment.