diff --git a/connectors/es/sink.py b/connectors/es/sink.py index 52df8f59a..9bee7e7b2 100644 --- a/connectors/es/sink.py +++ b/connectors/es/sink.py @@ -545,7 +545,8 @@ async def get_docs(self, generator, skip_unchanged_documents=False): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) if self.basic_rule_engine and not self.basic_rule_engine.should_ingest( doc @@ -660,7 +661,8 @@ async def get_docs_incrementally(self, generator): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) if self.basic_rule_engine and not self.basic_rule_engine.should_ingest( doc @@ -737,7 +739,8 @@ async def get_access_control_docs(self, generator): if count % self.display_every == 0: self._log_progress() - doc_id = doc["id"] = doc.pop("_id") + doc_id = doc.pop("_id") + doc["id"] = str(doc_id) doc_exists = doc_id in existing_ids if doc_exists: diff --git a/tests/test_sink.py b/tests/test_sink.py index c8f374f9c..98bba50f7 100644 --- a/tests/test_sink.py +++ b/tests/test_sink.py @@ -361,7 +361,8 @@ async def _dl(doit=True, timestamp=None): def index_operation(doc): # deepcopy as get_docs mutates docs doc_copy = deepcopy(doc) - doc_id = doc_copy["id"] = doc_copy.pop("_id") + doc_id = doc_copy.pop("_id") + doc_copy["id"] = str(doc_id) return {"_op_type": "index", "_index": INDEX, "_id": doc_id, "doc": doc_copy} @@ -369,7 +370,8 @@ def index_operation(doc): def update_operation(doc): # deepcopy as get_docs mutates docs doc_copy = deepcopy(doc) - doc_id = doc_copy["id"] = doc_copy.pop("_id") + doc_id = doc_copy.pop("_id") + doc_copy["id"] = str(doc_id) return {"_op_type": "update", "_index": INDEX, "_id": doc_id, "doc": doc_copy}