From a248f826ddd78f809d7a36acd46756370e2f1745 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Thu, 5 Dec 2024 22:37:47 +0100 Subject: [PATCH] Stringify doc[id] field (#3015) --- connectors/es/sink.py | 9 ++++++--- tests/test_sink.py | 6 ++++-- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/connectors/es/sink.py b/connectors/es/sink.py index e38d2d526..0b9621b3a 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -545,7 +545,8 @@ async def get_docs(self, generator, skip_unchanged_documents=False): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) if self.basic_rule_engine and not self.basic_rule_engine.should_ingest( doc @@ -660,7 +661,8 @@ async def get_docs_incrementally(self, generator): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) if self.basic_rule_engine and not self.basic_rule_engine.should_ingest( doc @@ -737,7 +739,8 @@ async def get_access_control_docs(self, generator): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) doc_exists = doc_id in existing_ids if doc_exists: diff --git a/tests/test_sink.py b/tests/test_sink.py index 5fab77114..00f7235f3 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -383,7 +383,8 @@ async def _dl(doit=True, timestamp=None): def index_operation(doc): # deepcopy as get_docs mutates docs doc_copy = deepcopy(doc) - doc_id = doc_copy["id"] = doc_copy.pop("_id") + doc_id = doc_copy.pop("_id") + doc_copy["id"] = str(doc_id) return {"_op_type": "index", "_index": INDEX, "_id": doc_id, "doc": doc_copy} @@ -391,7 +392,8 @@ def index_operation(doc): def update_operation(doc): # deepcopy as get_docs mutates docs doc_copy = deepcopy(doc) - doc_id = doc_copy["id"] = doc_copy.pop("_id") + doc_id = doc_copy.pop("_id") + doc_copy["id"] = str(doc_id) return {"_op_type": "update", "_index": INDEX, "_id": doc_id, "doc": doc_copy}