diff --git a/CHANGELOG.md b/CHANGELOG.md index d640ceabea..380c41d6bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Improvements - Add support for reporting the integration resync state to expose more information about the integration state in the portal +- Fix kafka listener never ending resync loop due to resyncState updates ## 0.9.14 (2024-08-19) diff --git a/port_ocean/core/event_listener/kafka.py b/port_ocean/core/event_listener/kafka.py index f9c749e767..9efb7c9874 100644 --- a/port_ocean/core/event_listener/kafka.py +++ b/port_ocean/core/event_listener/kafka.py @@ -99,9 +99,13 @@ def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: return False integration_identifier = after.get("identifier") - if integration_identifier == self.integration_identifier and ( - "change.log" in topic - ): + if integration_identifier != self.integration_identifier: + return False + + if after.get("updatedAt") == after.get("resyncState", {}).get("updatedAt"): + return False + + if "change.log" in topic: return msg_value.get("changelogDestination", {}).get("type", "") == "KAFKA" return False