Update/kafka implementations #3753
Conversation
📝 Walkthrough

The changes in this pull request involve significant updates to the Kafka consumer utilities, focusing on improved error handling and message processing.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/message_broker_utils.py (2)
155-155: LGTM! Consider extracting timeout values to constants.

The addition of `request.timeout.ms` aligns well with existing timeout configurations and provides better resilience against network issues. However, consider extracting the timeout values (300000 ms) to class-level constants for better maintainability:

```diff
 class MessageBrokerUtils:
+    DEFAULT_TIMEOUT_MS = 300000  # 5 minutes
     MAX_MESSAGE_SIZE = 1 * 1024 * 1024
```
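As a rough illustration of how such constants could be wired into the producer configuration (assuming a confluent_kafka `Producer`; the constructor shown and the mapping of `MAX_MESSAGE_SIZE` to `message.max.bytes` are assumptions, not the project's actual code):

```python
from confluent_kafka import Producer


class MessageBrokerUtils:
    # Hypothetical class-level constants; names mirror the suggestion above.
    DEFAULT_TIMEOUT_MS = 300000  # 5 minutes
    MAX_MESSAGE_SIZE = 1 * 1024 * 1024

    def __init__(self, bootstrap_servers: str):
        # Reusing the constants keeps all timeout and size tuning in one place.
        self.__config = {
            "bootstrap.servers": bootstrap_servers,
            "request.timeout.ms": self.DEFAULT_TIMEOUT_MS,
            "message.max.bytes": self.MAX_MESSAGE_SIZE,
        }
        self.producer = Producer(self.__config)
```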
Line range hint 266-267: Enhance error handling for consumer errors.

Consider improving error handling by adding more context and implementing retries for recoverable errors:

```diff
         if msg.error():
-            logger.exception(f"Consumer error: {msg.error()}")
+            error_code = msg.error().code()
+            if error_code in (KafkaError._PARTITION_EOF, KafkaError._TIMED_OUT):
+                logger.warning(f"Recoverable consumer error: {msg.error()}")
+                continue
+            logger.exception(f"Fatal consumer error ({error_code}): {msg.error()}")
             continue
 except Exception as e:
-    logger.exception(f"Error while consuming messages from topic {topic}: {e}")
+    logger.exception(
+        f"Unexpected error while consuming messages from topic {topic}. "
+        f"Group: {group_id}, Error: {str(e)}"
+    )
```

Don't forget to add the import:

```python
from confluent_kafka import KafkaError
```

Also applies to: 271-272
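For context, here is a self-contained sketch of a poll loop that applies this recoverable-vs-fatal classification with the confluent_kafka client; the broker address, group, and topic names are placeholders rather than the project's configuration:

```python
import logging

from confluent_kafka import Consumer, KafkaError

logger = logging.getLogger(__name__)

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker address
        "group.id": "example-group",            # placeholder consumer group
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["example-topic"])  # placeholder topic

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            code = msg.error().code()
            # End-of-partition and timeouts are transient; keep polling.
            if code in (KafkaError._PARTITION_EOF, KafkaError._TIMED_OUT):
                logger.warning("Recoverable consumer error: %s", msg.error())
                continue
            logger.error("Fatal consumer error (%s): %s", code, msg.error())
            break
        # Normal message path.
        logger.info("Received message with key %s", msg.key())
finally:
    consumer.close()
```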
src/workflows/airqo_etl_utils/airqo_utils.py (1)

1055-1060: Enhance error handling for message key and value processing.

The new implementation adds proper JSON decoding error handling, which is good. However, consider these improvements:

- The error handling could be more specific about which message failed
- The continue statement after the exception handler is redundant

Apply this diff to improve the error handling:

```diff
-        key = message.get("key", None)
-        try:
-            value = json.loads(message.get("value", None))
-        except json.JSONDecodeError as e:
-            logger.exception(f"Error decoding JSON: {e}")
-            continue
+        key = message.get("key", None)
+        value = message.get("value", None)
+        try:
+            value = json.loads(value) if value else None
+        except json.JSONDecodeError as e:
+            logger.exception(f"Error decoding JSON for message with key {key}: {e}")
+            continue
```
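As a standalone illustration of the suggested pattern (the `decode_messages` helper and the sample payloads below are hypothetical, not part of the PR):

```python
import json
import logging

logger = logging.getLogger(__name__)


def decode_messages(messages):
    """Yield (key, decoded_value) pairs, skipping messages whose value is not valid JSON."""
    for message in messages:
        key = message.get("key", None)
        raw_value = message.get("value", None)
        try:
            value = json.loads(raw_value) if raw_value else None
        except json.JSONDecodeError as e:
            # Including the key makes it easy to trace which message failed.
            logger.exception(f"Error decoding JSON for message with key {key}: {e}")
            continue
        yield key, value


# Example usage with in-memory messages: only the first record is yielded.
for key, value in decode_messages(
    [
        {"key": "device-1", "value": '{"pm2_5": 12.4}'},
        {"key": "device-2", "value": "not-json"},
    ]
):
    print(key, value)
```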
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
- src/workflows/airqo_etl_utils/airqo_utils.py (2 hunks)
- src/workflows/airqo_etl_utils/message_broker_utils.py (1 hunks)
🔇 Additional comments (3)
src/workflows/airqo_etl_utils/message_broker_utils.py (1)
Line range hint 213-224: Verify impact of removing the `from_beginning` parameter.

While simplifying the API is good, we should ensure all existing consumers are updated to use the `offset` parameter if they previously relied on `from_beginning`.

Additionally, consider enhancing the docstring to better explain offset behavior:

```diff
     offset: Optional[int] = None,
     wait_time_sec: int = 30,
     streaming: bool = False,
 ) -> Any:
     """
     Consume messages from a Kafka topic and return them.
+
+    Note: To start consuming from the beginning of the topic, set both
+    auto_offset_reset='earliest' and offset=0. For normal consumption,
+    leave offset as None and use auto_offset_reset to control the initial position.

     Args:
         topic: The Kafka topic to consume from.
         group_id: The consumer group ID.
```
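To make the offset semantics concrete, here is a rough sketch of how an explicit starting offset can be applied with the confluent_kafka client via an `on_assign` callback; whether the wrapper uses exactly this mechanism is an assumption, and the broker, group, and topic names are placeholders:

```python
from confluent_kafka import Consumer, OFFSET_BEGINNING

conf = {
    "bootstrap.servers": "localhost:9092",  # placeholder broker address
    "group.id": "example-group",            # placeholder consumer group
    # Controls the initial position when the group has no committed offsets.
    "auto.offset.reset": "earliest",
}
consumer = Consumer(conf)

offset = 0  # explicit starting offset; None would mean "use auto.offset.reset"


def on_assign(consumer, partitions):
    # Apply the explicit offset to every assigned partition.
    if offset is not None:
        for p in partitions:
            p.offset = offset if offset > 0 else OFFSET_BEGINNING
        consumer.assign(partitions)


consumer.subscribe(["example-topic"], on_assign=on_assign)
```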
src/workflows/airqo_etl_utils/airqo_utils.py (2)

1043-1043: LGTM: Good addition of json module and Kafka configuration.

The explicit import of the json module and setting `auto_commit=False` are good practices:

- JSON module allows for proper error handling during message decoding
- Disabling auto-commit provides better control over message consumption

Also applies to: 1052-1052
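For illustration, a minimal sketch of manual commits with the confluent_kafka client, assuming the wrapper's `auto_commit` flag maps to `enable.auto.commit`; the broker, group, topic, and the `process` helper are placeholders:

```python
from confluent_kafka import Consumer


def process(msg):
    # Placeholder for the actual handling logic (e.g. decoding and storing the record).
    print(msg.key(), msg.value())


consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",  # placeholder broker address
        "group.id": "example-group",            # placeholder consumer group
        "enable.auto.commit": False,            # assumed equivalent of auto_commit=False
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["example-topic"])  # placeholder topic

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    try:
        process(msg)
    except Exception:
        # Skip the commit so the message can be re-delivered after a restart.
        continue
    # Commit only after successful processing: at-least-once semantics.
    consumer.commit(message=msg, asynchronous=False)
```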
Line range hint 1043-1089: Verify Kafka consumer group behavior.

The changes to the Kafka consumer configuration with `auto_commit=False` might affect message processing guarantees. We should verify that messages are properly processed and not reprocessed unnecessarily.

✅ Verification successful

Let me gather more information about the Kafka consumer configuration and message processing patterns. This will help us understand the broader context of Kafka consumer configurations and error handling patterns in the codebase.
Kafka consumer configuration is properly implemented with explicit message handling

The implementation shows a well-structured approach to message processing:

- The `auto_commit=False` setting is intentional and safe, as messages are processed within a try-catch block
- Proper error handling is in place for both JSON decoding and Kafka exceptions
- Messages with missing required fields are logged and skipped gracefully
- The consumer configuration is consistently implemented across the codebase

The code maintains message processing guarantees through:

- Explicit error handling for each message
- Clear logging of skipped messages and processing errors
- Proper duplicate handling via `drop_duplicates` on device identifiers (see the sketch below)
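A condensed sketch of that processing pattern, assuming messages carry device records keyed by a `device_id` field; the field name and the helper below are illustrative rather than the project's actual schema:

```python
import json
import logging

import pandas as pd

logger = logging.getLogger(__name__)


def build_device_frame(messages):
    """Collect well-formed device records and drop duplicates on the device identifier."""
    records = []
    for message in messages:
        key = message.get("key", None)
        try:
            value = json.loads(message.get("value") or "null")
        except json.JSONDecodeError as e:
            logger.exception(f"Error decoding JSON for message with key {key}: {e}")
            continue
        # Skip messages that are missing required fields instead of failing the batch.
        if not value or "device_id" not in value:
            logger.warning(f"Skipping message with key: {key} (missing required fields)")
            continue
        records.append(value)

    frame = pd.DataFrame(records)
    if frame.empty:
        return frame
    # Duplicate handling on the device identifier, keeping the most recent record.
    return frame.drop_duplicates(subset=["device_id"], keep="last")
```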
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

```bash
#!/bin/bash
# Description: Check for potential message reprocessing issues in the consumer group

# Test: Look for duplicate message processing patterns in logs
rg -A 5 "Skipping message with key:"

# Test: Check for any error patterns in message processing
rg -A 5 "Error decoding JSON"
```

Length of output: 1117

Script:

```bash
#!/bin/bash
# Check for Kafka consumer configuration patterns and message processing logic

# Look for auto_commit usage and message processing patterns
ast-grep --pattern 'consume_from_topic($$$)'

# Check for other instances of Kafka consumer configuration
rg -A 5 "auto_commit"

# Look for message processing and commit patterns
rg -A 5 "KafkaException"
```

Length of output: 3095
Description
This PR cleans up the Kafka implementations by improving error handling.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation