Skip to content

Commit

Permalink
Make BulkIndexError and ScanError serializable (#2669)
Browse files Browse the repository at this point in the history
Co-authored-by: Quentin Pradet <[email protected]>
(cherry picked from commit 08addf2)
  • Loading branch information
seagrine authored and github-actions[bot] committed Nov 12, 2024
1 parent ab92623 commit 6bd1a85
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 4 deletions.
16 changes: 12 additions & 4 deletions elasticsearch/helpers/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,26 @@
# specific language governing permissions and limitations
# under the License.

from typing import Any, Dict, List
from typing import Any, Dict, List, Tuple, Type


class BulkIndexError(Exception):
def __init__(self, message: Any, errors: List[Dict[str, Any]]):
def __init__(self, message: str, errors: List[Dict[str, Any]]):
super().__init__(message)
self.errors: List[Dict[str, Any]] = errors

def __reduce__(
self,
) -> Tuple[Type["BulkIndexError"], Tuple[str, List[Dict[str, Any]]]]:
return (self.__class__, (self.args[0], self.errors))


class ScanError(Exception):
scroll_id: str

def __init__(self, scroll_id: str, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
def __init__(self, scroll_id: str, *args: Any) -> None:
super().__init__(*args)
self.scroll_id = scroll_id

def __reduce__(self) -> Tuple[Type["ScanError"], Tuple[str, str]]:
return (self.__class__, (self.scroll_id,) + self.args)
17 changes: 17 additions & 0 deletions test_elasticsearch/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import pickle
import threading
import time
from unittest import mock
Expand Down Expand Up @@ -182,3 +183,19 @@ class TestExpandActions:
@pytest.mark.parametrize("action", ["whatever", b"whatever"])
def test_string_actions_are_marked_as_simple_inserts(self, action):
assert ({"index": {}}, b"whatever") == helpers.expand_action(action)


def test_serialize_bulk_index_error():
error = helpers.BulkIndexError("message", [{"error": 1}])
pickled = pickle.loads(pickle.dumps(error))
assert pickled.__class__ == helpers.BulkIndexError
assert pickled.errors == error.errors
assert pickled.args == error.args


def test_serialize_scan_error():
error = helpers.ScanError("scroll_id", "shard_message")
pickled = pickle.loads(pickle.dumps(error))
assert pickled.__class__ == helpers.ScanError
assert pickled.scroll_id == error.scroll_id
assert pickled.args == error.args

0 comments on commit 6bd1a85

Please sign in to comment.