From 750dbe94b7ce5fd0e0b4a3f2ac2ed278625bd66c Mon Sep 17 00:00:00 2001 From: Shalev Avhar <51760613+shalev007@users.noreply.github.com> Date: Wed, 21 Aug 2024 21:00:38 +0300 Subject: [PATCH] fix resync-state kafka listener bug (#931) - **fix: kafka listener constant resync loop** - **feat: bump version** # Description *Bugfix* ```diff ! What - Fix kafka listener never ending loop of resyncs bug - Why - This bug was created when we started to updated the resyncState along with integration data, - therefor each update would create a new audit-log change which triggered the Kafka listener to start a new resync + How - we validate that it was not an update created by the resyc-state by comparing them ``` ## Type of change Please leave one option from the following and delete the rest: - [X] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. --------- Co-authored-by: Shalev Avhar --- CHANGELOG.md | 1 + port_ocean/core/event_listener/kafka.py | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d640ceabea..380c41d6bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Improvements - Add support for reporting the integration resync state to expose more information about the integration state in the portal +- Fix kafka listener never ending resync loop due to resyncState updates ## 0.9.14 (2024-08-19) diff --git a/port_ocean/core/event_listener/kafka.py b/port_ocean/core/event_listener/kafka.py index f9c749e767..9efb7c9874 100644 --- a/port_ocean/core/event_listener/kafka.py +++ b/port_ocean/core/event_listener/kafka.py @@ -99,9 +99,13 @@ def _should_be_processed(self, msg_value: dict[Any, Any], topic: str) -> bool: return False integration_identifier = after.get("identifier") - if integration_identifier == self.integration_identifier and ( - "change.log" in topic - ): + if integration_identifier != self.integration_identifier: + return False + + if after.get("updatedAt") == after.get("resyncState", {}).get("updatedAt"): + return False + + if "change.log" in topic: return msg_value.get("changelogDestination", {}).get("type", "") == "KAFKA" return False