From 30ff59c043f143bc8d6038cb0c467f9b81b121c5 Mon Sep 17 00:00:00 2001 From: Jonas Lundberg Date: Sat, 13 Jul 2019 11:48:36 +0200 Subject: [PATCH 1/3] Include input actions when yielding in bulk helpers --- elasticsearch/helpers/actions.py | 155 +++++++++++++++--- test_elasticsearch/test_helpers.py | 2 +- .../test_server/test_helpers.py | 16 +- 3 files changed, 149 insertions(+), 24 deletions(-) diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index 61469f538..f4e54b24f 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -1,3 +1,4 @@ +from functools import partial from operator import methodcaller import time @@ -53,14 +54,21 @@ def expand_action(data): return action, data.get("_source", data) -def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer): +def _chunk_actions( + actions, + chunk_size, + max_chunk_bytes, + serializer, + expand_action_callback=expand_action +): """ Split actions into chunks by number or size, serialize them into strings in the process. """ bulk_actions, bulk_data = [], [] size, action_count = 0, 0 - for action, data in actions: + for input_action in actions: + action, data = expand_action_callback(input_action) raw_data, raw_action = data, action action = serializer.dumps(action) # +1 to account for the trailing new line character @@ -81,9 +89,9 @@ def _chunk_actions(actions, chunk_size, max_chunk_bytes, serializer): bulk_actions.append(action) if data is not None: bulk_actions.append(data) - bulk_data.append((raw_action, raw_data)) + bulk_data.append((input_action, raw_action, raw_data)) else: - bulk_data.append((raw_action,)) + bulk_data.append((input_action, raw_action,)) size += cur_size action_count += 1 @@ -121,10 +129,12 @@ def _process_bulk_chunk( for data in bulk_data: # collect all the information about failed actions - op_type, action = data[0].copy().popitem() + op_type, action = data[1].copy().popitem() info = {"error": err_message, "status": e.status_code, "exception": e} + # include original input action + info["action"] = data[0] if op_type != "delete": - info["data"] = data[1] + info["data"] = data[2] info.update(action) exc_errors.append({op_type: info}) @@ -142,11 +152,13 @@ def _process_bulk_chunk( for data, (op_type, item) in zip( bulk_data, map(methodcaller("popitem"), resp["items"]) ): + # include original input action + item["action"] = data[0] ok = 200 <= item.get("status", 500) < 300 if not ok and raise_on_error: # include original document source - if len(data) > 1: - item["data"] = data[1] + if len(data) > 2: + item["data"] = data[2] errors.append({op_type: item}) if ok or not errors: @@ -173,7 +185,6 @@ def streaming_bulk( *args, **kwargs ): - """ Streaming bulk consumes actions from the iterable passed in and yields results per action. For non-streaming usecases use @@ -206,10 +217,70 @@ def streaming_bulk( :arg max_backoff: maximum number of seconds a retry will wait :arg yield_ok: if set to False will skip successful documents in the output """ - actions = map(expand_action_callback, actions) + chunker = partial( + _chunk_actions, chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes + ) - for bulk_data, bulk_actions in _chunk_actions( - actions, chunk_size, max_chunk_bytes, client.transport.serializer + for item in streaming_chunks( + client, + actions, + chunker, + raise_on_error=raise_on_error, + expand_action_callback=expand_action_callback, + raise_on_exception=raise_on_exception, + max_retries=max_retries, + initial_backoff=initial_backoff, + max_backoff=max_backoff, + yield_ok=yield_ok, + *args, + **kwargs + ): + yield item + + +def streaming_chunks( + client, + actions, + chunker, + # chunk_size=500, + # max_chunk_bytes=100 * 1024 * 1024, + raise_on_error=True, + expand_action_callback=expand_action, + raise_on_exception=True, + max_retries=0, + initial_backoff=2, + max_backoff=600, + yield_ok=True, + *args, + **kwargs +): + """ + Implementation of the ``streaming_bulk`` helper, chunking actions using + given chunker function. + + :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use + :arg actions: iterable containing the actions to be executed + :arg chunker: function to chunk actions into separate ``bulk`` calls, + should yield tuples of raw data and serialized actions. + :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) + from the execution of the last chunk when some occur. By default we raise. + :arg raise_on_exception: if ``False`` then don't propagate exceptions from + call to ``bulk`` and just report the items that failed as failed. + :arg expand_action_callback: callback executed on each action passed in, + should return a tuple containing the action line and the data line + (`None` if data line should be omitted). + :arg max_retries: maximum number of times a document will be retried when + ``429`` is received, set to 0 (default) for no retries on ``429`` + :arg initial_backoff: number of seconds we should wait before the first + retry. Any subsequent retries will be powers of ``initial_backoff * + 2**retry_number`` + :arg max_backoff: maximum number of seconds a retry will wait + :arg yield_ok: if set to False will skip successful documents in the output + """ + for bulk_data, bulk_actions in chunker( + actions, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback ): for attempt in range(max_retries + 1): @@ -241,9 +312,9 @@ def streaming_bulk( and (attempt + 1) <= max_retries ): # _process_bulk_chunk expects strings so we need to - # re-serialize the data + # re-serialize the expanded action and data to_retry.extend( - map(client.transport.serializer.dumps, data) + map(client.transport.serializer.dumps, data[1:]) ) to_retry_data.append(data) else: @@ -338,12 +409,56 @@ def parallel_bulk( :arg queue_size: size of the task queue between the main thread (producing chunks to send) and the processing threads. """ + chunker = partial( + _chunk_actions, chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes + ) + + for item in parallel_chunks( + client, + actions, + chunker, + thread_count=thread_count, + queue_size=queue_size, + expand_action_callback=expand_action_callback, + *args, + **kwargs + ): + yield item + + +def parallel_chunks( + client, + actions, + chunker, + thread_count=4, + queue_size=4, + expand_action_callback=expand_action, + *args, + **kwargs +): + """ + Implementation of the ``parallel_bulk`` helper, chunking actions using + given chunker function. + + :arg client: instance of :class:`~elasticsearch.Elasticsearch` to use + :arg actions: iterator containing the actions + :arg chunker: function to chunk actions into separate ``bulk`` calls, + should yield tuples of raw data and serialized actions. + :arg thread_count: size of the threadpool to use for the bulk requests + :arg raise_on_error: raise ``BulkIndexError`` containing errors (as `.errors`) + from the execution of the last chunk when some occur. By default we raise. + :arg raise_on_exception: if ``False`` then don't propagate exceptions from + call to ``bulk`` and just report the items that failed as failed. + :arg expand_action_callback: callback executed on each action passed in, + should return a tuple containing the action line and the data line + (`None` if data line should be omitted). + :arg queue_size: size of the task queue between the main thread (producing + chunks to send) and the processing threads. + """ # Avoid importing multiprocessing unless parallel_bulk is used # to avoid exceptions on restricted environments like App Engine from multiprocessing.pool import ThreadPool - actions = map(expand_action_callback, actions) - class BlockingPool(ThreadPool): def _setup_queues(self): super(BlockingPool, self)._setup_queues() @@ -361,9 +476,11 @@ def _setup_queues(self): client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs ) ), - _chunk_actions( - actions, chunk_size, max_chunk_bytes, client.transport.serializer - ), + chunker( + actions, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback + ) ): for item in result: yield item diff --git a/test_elasticsearch/test_helpers.py b/test_elasticsearch/test_helpers.py index b95792f79..b8ebc66f6 100644 --- a/test_elasticsearch/test_helpers.py +++ b/test_elasticsearch/test_helpers.py @@ -58,7 +58,7 @@ def test_chunk_sent_from_different_threads(self, _process_bulk_chunk): class TestChunkActions(TestCase): def setUp(self): super(TestChunkActions, self).setUp() - self.actions = [({"index": {}}, {"some": u"datá", "i": i}) for i in range(100)] + self.actions = [{"_op_type": "index", "some": u"datá", "i": i} for i in range(100)] def test_chunks_are_chopped_by_byte_size(self): self.assertEquals( diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index bb679e9e3..60a7fc5b1 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -56,10 +56,13 @@ def test_all_errors_from_chunk_are_raised_on_failure(self): self.client.cluster.health(wait_for_status="yellow") try: - for ok, item in helpers.streaming_bulk( - self.client, [{"a": "b"}, {"a": "c"}], index="i", raise_on_error=True - ): + docs = [{"a": "b"}, {"a": "c"}] + for i, (ok, item) in enumerate(helpers.streaming_bulk( + self.client, docs, index="i", raise_on_error=True + )): self.assertTrue(ok) + op_type, info = item.popitem() + self.assertEquals(info["action"], docs[i]) except helpers.BulkIndexError as e: self.assertEquals(2, len(e.errors)) else: @@ -81,8 +84,10 @@ def test_different_op_types(self): "doc": {"answer": 42}, }, ] - for ok, item in helpers.streaming_bulk(self.client, docs): + for i, (ok, item) in enumerate(helpers.streaming_bulk(self.client, docs)): self.assertTrue(ok) + op_type, info = item.popitem() + self.assertEquals(info["action"], docs[i]) self.assertFalse(self.client.exists(index="i", id=45)) self.assertEquals({"answer": 42}, self.client.get(index="i", id=42)["_source"]) @@ -117,6 +122,7 @@ def test_transport_error_can_becaught(self): "_index": "i", "_type": "_doc", "_id": 45, + "action": docs[1], "data": {"f": "v"}, "error": "TransportError(599, 'Error!')", "status": 599, @@ -147,6 +153,7 @@ def test_rejected_documents_are_retried(self): ) self.assertEquals(3, len(results)) self.assertEquals([True, True, True], [r[0] for r in results]) + self.assertEquals(results[1][1]["index"]["action"], docs[1]) self.client.indices.refresh(index="i") res = self.client.search(index="i") self.assertEquals({"value": 3, "relation": "eq"}, res["hits"]["total"]) @@ -175,6 +182,7 @@ def test_rejected_documents_are_retried_at_most_max_retries_times(self): ) self.assertEquals(3, len(results)) self.assertEquals([False, True, True], [r[0] for r in results]) + self.assertEquals(results[0][1]["index"]["action"], docs[0]) self.client.indices.refresh(index="i") res = self.client.search(index="i") self.assertEquals({"value": 2, "relation": "eq"}, res["hits"]["total"]) From 006fb4c18310d0cee65be822a21a419094fe76a1 Mon Sep 17 00:00:00 2001 From: Jonas Lundberg Date: Sun, 14 Jul 2019 22:59:55 +0200 Subject: [PATCH 2/3] Remove redundant expand_action_callback from streaming_chunks and parallel_chunks --- elasticsearch/helpers/__init__.py | 1 + elasticsearch/helpers/actions.py | 36 +++++++++++-------------------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/elasticsearch/helpers/__init__.py b/elasticsearch/helpers/__init__.py index 848138bf0..174bdaecd 100644 --- a/elasticsearch/helpers/__init__.py +++ b/elasticsearch/helpers/__init__.py @@ -1,4 +1,5 @@ from .errors import BulkIndexError, ScanError from .actions import expand_action, streaming_bulk, bulk, parallel_bulk +from .actions import streaming_chunks, parallel_chunks from .actions import scan, reindex from .actions import _chunk_actions, _process_bulk_chunk diff --git a/elasticsearch/helpers/actions.py b/elasticsearch/helpers/actions.py index f4e54b24f..54c4c41c1 100644 --- a/elasticsearch/helpers/actions.py +++ b/elasticsearch/helpers/actions.py @@ -218,7 +218,11 @@ def streaming_bulk( :arg yield_ok: if set to False will skip successful documents in the output """ chunker = partial( - _chunk_actions, chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes + _chunk_actions, + chunk_size=chunk_size, + max_chunk_bytes=max_chunk_bytes, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback ) for item in streaming_chunks( @@ -226,7 +230,6 @@ def streaming_bulk( actions, chunker, raise_on_error=raise_on_error, - expand_action_callback=expand_action_callback, raise_on_exception=raise_on_exception, max_retries=max_retries, initial_backoff=initial_backoff, @@ -242,10 +245,7 @@ def streaming_chunks( client, actions, chunker, - # chunk_size=500, - # max_chunk_bytes=100 * 1024 * 1024, raise_on_error=True, - expand_action_callback=expand_action, raise_on_exception=True, max_retries=0, initial_backoff=2, @@ -266,9 +266,6 @@ def streaming_chunks( from the execution of the last chunk when some occur. By default we raise. :arg raise_on_exception: if ``False`` then don't propagate exceptions from call to ``bulk`` and just report the items that failed as failed. - :arg expand_action_callback: callback executed on each action passed in, - should return a tuple containing the action line and the data line - (`None` if data line should be omitted). :arg max_retries: maximum number of times a document will be retried when ``429`` is received, set to 0 (default) for no retries on ``429`` :arg initial_backoff: number of seconds we should wait before the first @@ -277,11 +274,7 @@ def streaming_chunks( :arg max_backoff: maximum number of seconds a retry will wait :arg yield_ok: if set to False will skip successful documents in the output """ - for bulk_data, bulk_actions in chunker( - actions, - serializer=client.transport.serializer, - expand_action_callback=expand_action_callback - ): + for bulk_data, bulk_actions in chunker(actions): for attempt in range(max_retries + 1): to_retry, to_retry_data = [], [] @@ -410,7 +403,11 @@ def parallel_bulk( chunks to send) and the processing threads. """ chunker = partial( - _chunk_actions, chunk_size=chunk_size, max_chunk_bytes=max_chunk_bytes + _chunk_actions, + chunk_size=chunk_size, + max_chunk_bytes=max_chunk_bytes, + serializer=client.transport.serializer, + expand_action_callback=expand_action_callback ) for item in parallel_chunks( @@ -419,7 +416,6 @@ def parallel_bulk( chunker, thread_count=thread_count, queue_size=queue_size, - expand_action_callback=expand_action_callback, *args, **kwargs ): @@ -432,7 +428,6 @@ def parallel_chunks( chunker, thread_count=4, queue_size=4, - expand_action_callback=expand_action, *args, **kwargs ): @@ -449,9 +444,6 @@ def parallel_chunks( from the execution of the last chunk when some occur. By default we raise. :arg raise_on_exception: if ``False`` then don't propagate exceptions from call to ``bulk`` and just report the items that failed as failed. - :arg expand_action_callback: callback executed on each action passed in, - should return a tuple containing the action line and the data line - (`None` if data line should be omitted). :arg queue_size: size of the task queue between the main thread (producing chunks to send) and the processing threads. """ @@ -476,11 +468,7 @@ def _setup_queues(self): client, bulk_chunk[1], bulk_chunk[0], *args, **kwargs ) ), - chunker( - actions, - serializer=client.transport.serializer, - expand_action_callback=expand_action_callback - ) + chunker(actions) ): for item in result: yield item From d7cc58178c7de4566c628556783afaea9e05bda0 Mon Sep 17 00:00:00 2001 From: Jonas Lundberg Date: Sun, 14 Jul 2019 23:01:30 +0200 Subject: [PATCH 3/3] Test streaming_chunks --- .../test_server/test_helpers.py | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/test_elasticsearch/test_server/test_helpers.py b/test_elasticsearch/test_server/test_helpers.py index 60a7fc5b1..6cfe211c2 100644 --- a/test_elasticsearch/test_server/test_helpers.py +++ b/test_elasticsearch/test_server/test_helpers.py @@ -1,4 +1,4 @@ -from mock import patch +from mock import patch, MagicMock from elasticsearch import helpers, TransportError from elasticsearch.helpers import ScanError @@ -211,6 +211,39 @@ def streaming_bulk(): self.assertEquals(4, failing_client._called) +class TestStreamingChunks(ElasticsearchTestCase): + def simple_chunker(self, actions): + for item in actions: + raw_action = { + "index": { + "_id": item["id"] + } + } + data = { + "x": item["x"] + } + action_lines = list(map( + self.client.transport.serializer.dumps, (raw_action, data) + )) + yield [(item, raw_action, data)], action_lines + + def test_actions_chunker(self): + actions = [{"id": 1, "x": "A"}, {"id": 2, "x": "B"}] + actions_gen = (action for action in actions) + + mock_chunker = MagicMock() + mock_chunker.side_effect = self.simple_chunker + + for i, (ok, item) in enumerate(helpers.streaming_chunks( + self.client, actions_gen, mock_chunker, index="test-index" + )): + self.assertTrue(ok) + self.assertEquals(item["index"]["_id"], str(actions[i]["id"])) + self.assertEquals(item["index"]["action"], actions[i]) + + mock_chunker.assert_called_once_with(actions_gen) + + class TestBulk(ElasticsearchTestCase): def test_bulk_works_with_single_item(self): docs = [{"answer": 42, "_id": 1}]