From 061ed1d6d0027f90e43d29245b05985f94380873 Mon Sep 17 00:00:00 2001
From: Samira El Aabidi <54845154+Samira-El@users.noreply.github.com>
Date: Mon, 20 Nov 2023 09:40:19 +0000
Subject: [PATCH] AP-1574 More logs and new config for ease of debugging (#161)

---
 CHANGELOG.md                       | 12 ++++++++
 README.md                          | 37 +++++++++++------------
 setup.py                           |  2 +-
 tap_kafka/__init__.py              |  1 +
 tap_kafka/sync.py                  | 47 ++++++++++++++++++++--------
 tests/integration/test_consumer.py | 15 +++++-----
 tests/unit/test_tap_kafka.py       |  7 +++--
 7 files changed, 81 insertions(+), 40 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 34e7026..b94ba4f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,15 @@
+8.2.0 (2023-11-17)
+------------------
+- Add more info logs
+- Add new config [`debug_contexts`](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts) for enabling debugging
+
+8.1.0 (2023-08-25)
+------------------
+- Bump `dpath` from `2.0.6` to `2.1.*`
+- Bump `confluent-kafka[protobuf]` from `1.9.2` to `2.2.*`
+- Bump `grpcio-tools` from `1.51.1` to `1.57.*`
+- Bump test dependencies
+
 8.0.0 (2022-12-08)
 ------------------
 - Switch from `subscribe` to `assign` for better initial offset control
diff --git a/README.md b/README.md
index e432171..4765b93 100644
--- a/README.md
+++ b/README.md
@@ -53,24 +53,25 @@
 or
 
 Full list of options in `config.json`:
 
-| Property                          | Type    | Required?  | Description |
-|-----------------------------------|---------|------------|-------------|
-| bootstrap_servers                 | String  | Yes        | `host[:port]` string (or list of comma separated `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata. |
-| group_id                          | String  | Yes        | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. |
-| topic                             | String  | Yes        | Name of kafka topic to subscribe to |
-| partitions                        | List    |            | (Default: [] (all)) Partition(s) of topic to consume, example `[0,4]` |
-| primary_keys                      | Object  |            | Optionally you can define primary key for the consumed messages. It requires a column name and `/slashed/paths` ala xpath selector to extract the value from the kafka messages. The extracted column will be added to every output singer message. |
-| use_message_key                   | Bool    |            | (Default: true) Defines whether to use Kafka message key as a primary key for the record. Note: if a custom primary key(s) has been defined, it will be used instead of the message_key. |
-| initial_start_time                | String  |            | (Default: latest) Start time reference of the message consumption if no bookmarked position in `state.json`. One of: `beginning`, `earliest`, `latest` or an ISO-8601 formatted timestamp string. |
-| max_runtime_ms                    | Integer |            | (Default: 300000) The maximum time for the tap to collect new messages from Kafka topic. If this time exceeds it will flush the batch and close kafka connection. |
-| commit_interval_ms                | Integer |            | (Default: 5000) Number of milliseconds between two commits. This is different than the kafka auto commit feature. Tap-kafka sends commit messages automatically but only when the data consumed successfully and persisted to local store. |
-| consumer_timeout_ms               | Integer |            | (Default: 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration |
-| session_timeout_ms                | Integer |            | (Default: 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities. |
-| heartbeat_interval_ms             | Integer |            | (Default: 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. |
-| max_poll_interval_ms              | Integer |            | (Default: 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management. |
-| message_format                    | String  |            | (Default: json) Supported message formats are `json` and `protobuf`. |
-| proto_schema                      | String  |            | Protobuf message format in `.proto` syntax. Required if the `message_format` is `protobuf`. |
-| proto_classes_dir                 | String  |            | (Default: current working dir) |
+| Property              | Type    | Required? | Description |
+|-----------------------|---------|-----------|-------------|
+| bootstrap_servers     | String  | Yes       | `host[:port]` string (or list of comma separated `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata. |
+| group_id              | String  | Yes       | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. |
+| topic                 | String  | Yes       | Name of kafka topic to subscribe to |
+| partitions            | List    |           | (Default: [] (all)) Partition(s) of topic to consume, example `[0,4]` |
+| primary_keys          | Object  |           | Optionally you can define a primary key for the consumed messages. It requires a column name and an XPath-like `/slashed/paths` selector to extract the value from the kafka messages. The extracted column will be added to every output singer message. |
+| use_message_key       | Bool    |           | (Default: true) Defines whether to use the Kafka message key as a primary key for the record. Note: if custom primary key(s) have been defined, they will be used instead of the message_key. |
+| initial_start_time    | String  |           | (Default: latest) Start time reference of the message consumption if no bookmarked position in `state.json`. One of: `beginning`, `earliest`, `latest` or an ISO-8601 formatted timestamp string. |
+| max_runtime_ms        | Integer |           | (Default: 300000) The maximum time for the tap to collect new messages from the Kafka topic. If this time is exceeded, it will flush the batch and close the kafka connection. |
+| commit_interval_ms    | Integer |           | (Default: 5000) Number of milliseconds between two commits. This is different from the kafka auto commit feature. Tap-kafka sends commit messages automatically, but only when the data is consumed successfully and persisted to the local store. |
+| consumer_timeout_ms   | Integer |           | (Default: 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration |
+| session_timeout_ms    | Integer |           | (Default: 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities. |
+| heartbeat_interval_ms | Integer |           | (Default: 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. |
+| max_poll_interval_ms  | Integer |           | (Default: 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management. |
+| message_format        | String  |           | (Default: json) Supported message formats are `json` and `protobuf`. |
+| proto_schema          | String  |           | Protobuf message format in `.proto` syntax. Required if the `message_format` is `protobuf`. |
+| proto_classes_dir     | String  |           | (Default: current working dir) |
+| debug_contexts        | String  |           | Comma-separated list of debug contexts to enable for the consumer ([see librdkafka](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts)) |
 
 This tap reads Kafka messages and generates Singer-compatible SCHEMA and RECORD messages in the following format.
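To make the new option above concrete, here is a minimal `config.json` sketch that enables librdkafka debugging. The broker address, group id, and topic are placeholder values; `debug_contexts` accepts any comma-separated subset of the contexts documented in the librdkafka link above, and a combination like `consumer,cgrp,topic,fetch` is a reasonable starting point for consumer-side troubleshooting:

```json
{
  "bootstrap_servers": "localhost:9092",
  "group_id": "my_consumer_group",
  "topic": "my_topic",
  "debug_contexts": "consumer,cgrp,topic,fetch"
}
```

Leaving `debug_contexts` out (or empty) keeps the previous, quieter behaviour.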
diff --git a/setup.py b/setup.py
index 874a450..1cbebfb 100644
--- a/setup.py
+++ b/setup.py
@@ -6,7 +6,7 @@
 long_description = fh.read()
 
 setup(name='pipelinewise-tap-kafka',
-      version='8.1.0',
+      version='8.2.0',
       description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
       long_description=long_description,
       long_description_content_type='text/markdown',
diff --git a/tap_kafka/__init__.py b/tap_kafka/__init__.py
index 3b12833..9926601 100644
--- a/tap_kafka/__init__.py
+++ b/tap_kafka/__init__.py
@@ -119,6 +119,7 @@ def generate_config(args_config):
         'message_format': args_config.get('message_format', DEFAULT_MESSAGE_FORMAT),
         'proto_schema': args_config.get('proto_schema', DEFAULT_PROTO_SCHEMA),
         'proto_classes_dir': args_config.get('proto_classes_dir', DEFAULT_PROTO_CLASSES_DIR),
+        'debug_contexts': args_config.get('debug_contexts'),
     }
 
     validate_config(config)
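Note that `args_config.get('debug_contexts')` is deliberately called without a default: an omitted option resolves to `None`, so both a missing key and an empty string fail the truthiness check that the `sync.py` hunk below adds. A standalone sketch of that contract (illustrative names, not the tap's actual module):

```python
def generate_debug_setting(args_config: dict) -> dict:
    # Mirrors generate_config(): no fallback value, so a missing key yields None
    return {'debug_contexts': args_config.get('debug_contexts')}

consumer_conf = {}
config = generate_debug_setting({})  # option omitted -> None

# None and '' are both falsy, so 'debug' is only set for a real context list
if config['debug_contexts']:
    consumer_conf['debug'] = config['debug_contexts']

assert 'debug' not in consumer_conf
```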
diff --git a/tap_kafka/sync.py b/tap_kafka/sync.py
index ce18c75..b44a3e5 100644
--- a/tap_kafka/sync.py
+++ b/tap_kafka/sync.py
@@ -8,10 +8,10 @@
 import singer
 import confluent_kafka
 
-from confluent_kafka import KafkaError
+from confluent_kafka import KafkaException
+from typing import List
 from singer import utils, metadata
 
-from tap_kafka.errors import AllBrokersDownException
 from tap_kafka.errors import InvalidBookmarkException
 from tap_kafka.errors import InvalidConfigException
 from tap_kafka.errors import InvalidTimestampException
@@ -111,7 +111,8 @@ def epoch_to_iso_timestamp(epoch) -> str:
 
 def init_kafka_consumer(kafka_config):
     LOGGER.info('Initialising Kafka Consumer...')
-    consumer = confluent_kafka.DeserializingConsumer({
+
+    consumer_conf = {
         # Required parameters
         'bootstrap.servers': kafka_config['bootstrap_servers'],
         'group.id': kafka_config['group_id'],
@@ -124,7 +125,15 @@
 
         # Non-configurable parameters
         'enable.auto.commit': False,
         'value.deserializer': init_value_deserializer(kafka_config),
-    })
+    }
+
+    if kafka_config['debug_contexts']:
+        # https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
+        consumer_conf['debug'] = kafka_config['debug_contexts']
+
+    consumer = confluent_kafka.DeserializingConsumer(consumer_conf)
+
+    LOGGER.info('Kafka Consumer initialised successfully')
 
     return consumer
@@ -171,7 +180,7 @@ def kafka_message_to_singer_record(message, primary_keys: dict, use_message_key:
     if primary_keys:
         for key, pk_selector in primary_keys.items():
             try:
-                record[key] = dpath.util.get(message.value(), pk_selector)
+                record[key] = dpath.get(message.value(), pk_selector)
             except KeyError:
                 raise PrimaryKeyNotFoundException(f"Custom primary key not found in the message: '{pk_selector}'")
     elif use_message_key:
@@ -189,17 +198,20 @@ def consume_kafka_message(message, topic, primary_keys, use_message_key):
     singer.write_message(singer.RecordMessage(stream=topic, record=singer_record, time_extracted=utils.now()))
 
 
-def select_kafka_partitions(consumer, kafka_config) -> confluent_kafka.TopicPartition:
+def select_kafka_partitions(consumer, kafka_config) -> List[confluent_kafka.TopicPartition]:
     """Select partitions in topic"""
+    LOGGER.info(f"Selecting partitions in topic '{kafka_config['topic']}'")
+
     topic = kafka_config['topic']
     partition_ids_requested = kafka_config['partitions']
 
     try:
         topic_meta = consumer.list_topics(topic, timeout=kafka_config['max_poll_interval_ms'] / 1000)
         partition_meta = topic_meta.topics[topic].partitions
-    except:
-        raise AllBrokersDownException
+    except KafkaException:
+        LOGGER.exception(f"Unable to list partitions in topic '{topic}'", exc_info=True)
+        raise
 
     if not partition_meta:
         raise InvalidConfigException(f"No partitions available in topic '{topic}'")
 
@@ -209,7 +221,7 @@ def select_kafka_partitions(consumer, kafka_config) -> confluent_kafka.TopicPart
     for partition in partition_meta:
         partition_ids_available.append(partition)
 
-    if partition_ids_requested == []:
+    if not partition_ids_requested:
         partition_ids = partition_ids_available
         LOGGER.info(f"Requesting all partitions in topic '{topic}'")
     else:
@@ -302,21 +314,25 @@ def set_partition_offsets(consumer, partitions, kafka_config, state = {}):
 
 
 def assign_kafka_partitions(consumer, partitions):
     """Assign and seek partitions to offsets"""
-    LOGGER.info(f"Assigning partitions '{partitions}'")
+    LOGGER.info("Assigning partitions to consumer...")
     consumer.assign(partitions)
 
     partitions_committed = partitions
     for partition in partitions_committed:
-        partition.offset = partition.offset - 1
+        partition.offset -= 1
 
     if all(partition.offset >= 0 for partition in partitions_committed):
-        LOGGER.info(f"Committing partitions '{partitions_committed}'")
+        LOGGER.info("Committing partitions...")
         consumer.commit(offsets=partitions_committed)
+    else:
+        LOGGER.info("Partitions not committed because one or more offsets are less than zero")
 
 
 def commit_consumer_to_bookmarked_state(consumer, topic, state):
     """Commit every bookmarked offset to kafka"""
+
+    LOGGER.info("Committing bookmarked offsets to kafka...")
     offsets_to_commit = []
     bookmarked_partitions = state.get('bookmarks', {}).get(topic, {})
     for partition in bookmarked_partitions:
@@ -327,11 +343,15 @@ def commit_consumer_to_bookmarked_state(consumer, topic, state):
             offsets_to_commit.append(topic_partition)
 
     consumer.commit(offsets=offsets_to_commit)
+    LOGGER.info("Bookmarked offsets committed")
 
 
 # pylint: disable=too-many-locals,too-many-statements
 def read_kafka_messages(consumer, kafka_config, state):
     """Read kafka topic continuously and writing transformed singer messages to STDOUT"""
+
+    LOGGER.info('Starting Kafka message consumption...')
+
     topic = kafka_config['topic']
     primary_keys = kafka_config['primary_keys']
     use_message_key = kafka_config['use_message_key']
@@ -351,6 +371,9 @@ def read_kafka_messages(consumer, kafka_config, state):
 
         # Stop consuming more messages if no new message and consumer_timeout_ms exceeded
         if polled_message is None:
+            LOGGER.info('No new message received in %s ms. Stop consuming more messages.',
+                        kafka_config["consumer_timeout_ms"]
+                        )
             break
 
         message = polled_message
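The `init_kafka_consumer` hunk above is the core of the change: a truthy `debug_contexts` string is passed straight through as librdkafka's `debug` property. To observe the effect without running the full tap, a consumer can be built in isolation. This is a minimal sketch, assuming a reachable broker at a placeholder address; it uses a plain `Consumer` instead of the tap's `DeserializingConsumer`, which differs only in the attached deserializers:

```python
import confluent_kafka

kafka_config = {
    'bootstrap_servers': 'localhost:9092',  # placeholder broker
    'group_id': 'debug_smoke_test',
    'debug_contexts': 'consumer,cgrp',      # set to None to keep debugging off
}

consumer_conf = {
    'bootstrap.servers': kafka_config['bootstrap_servers'],
    'group.id': kafka_config['group_id'],
    'enable.auto.commit': False,
}

if kafka_config['debug_contexts']:
    # Same mapping the patch adds in init_kafka_consumer()
    consumer_conf['debug'] = kafka_config['debug_contexts']

# librdkafka prints the debug lines to stderr by default
consumer = confluent_kafka.Consumer(consumer_conf)
consumer.poll(timeout=1.0)
consumer.close()
```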
diff --git a/tests/integration/test_consumer.py b/tests/integration/test_consumer.py
index d9e6e10..8b5f3bc 100644
--- a/tests/integration/test_consumer.py
+++ b/tests/integration/test_consumer.py
@@ -1,16 +1,17 @@
 import unittest
 import time
-from datetime import datetime
 
 import singer
+from confluent_kafka import KafkaException
+from datetime import datetime
+
 import tap_kafka.serialization
+import tests.integration.utils as test_utils
 
-from tap_kafka import sync, get_args
-from tap_kafka.errors import AllBrokersDownException
+from tap_kafka import sync
 from tap_kafka.errors import DiscoveryException
 from tap_kafka.errors import InvalidConfigException
 from tap_kafka.serialization.json_with_no_schema import JSONSimpleSerializer
-import tests.integration.utils as test_utils
 
 
 SINGER_MESSAGES = []
@@ -89,12 +90,12 @@ def test_tap_kafka_consumer_brokers_down(self):
             'topic': topic,
             'group_id': test_utils.generate_unique_consumer_group(),
             'consumer_timeout_ms': 1000,
-            'session_timeout_ms': '1',
-            'max_poll_interval_ms': '10'
+            'session_timeout_ms': 1,
+            'max_poll_interval_ms': 10
         })
         catalog = {'streams': tap_kafka.common.generate_catalog(tap_kafka_config)}
 
-        with self.assertRaises(AllBrokersDownException):
+        with self.assertRaises(KafkaException):
             sync.do_sync(tap_kafka_config, catalog, state={'bookmarks': {topic: {}}})
 
     def test_tap_kafka_consumer(self):
diff --git a/tests/unit/test_tap_kafka.py b/tests/unit/test_tap_kafka.py
index d0e93c9..3da521a 100644
--- a/tests/unit/test_tap_kafka.py
+++ b/tests/unit/test_tap_kafka.py
@@ -146,6 +146,7 @@ def test_generate_config_with_defaults(self):
             'partitions': tap_kafka.DEFAULT_PARTITIONS,
             'proto_classes_dir': tap_kafka.DEFAULT_PROTO_CLASSES_DIR,
             'proto_schema': tap_kafka.DEFAULT_PROTO_SCHEMA,
+            'debug_contexts': None
         })
 
     def test_generate_config_with_custom_parameters(self):
@@ -169,7 +170,8 @@ def test_generate_config_with_custom_parameters(self):
             'max_poll_interval_ms': 5555,
             'message_format': 'protobuf',
             'proto_classes_dir': '/tmp/proto-classes',
-            'proto_schema': 'proto-schema'
+            'proto_schema': 'proto-schema',
+            'debug_contexts': 'topic,cgrp'
         }
         self.assertDictEqual(tap_kafka.generate_config(custom_config), {
             'topic': 'my_topic',
@@ -190,7 +192,8 @@ def test_generate_config_with_custom_parameters(self):
             'max_poll_interval_ms': 5555,
             'message_format': 'protobuf',
             'proto_classes_dir': '/tmp/proto-classes',
-            'proto_schema': 'proto-schema'
+            'proto_schema': 'proto-schema',
+            'debug_contexts': 'topic,cgrp'
         })
 
     def test_validate_config(self):
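A caller-facing consequence worth flagging next to the test changes: `select_kafka_partitions` no longer wraps broker failures in the tap's own `AllBrokersDownException`; it logs and re-raises the underlying `confluent_kafka.KafkaException`. Orchestration code that caught the old exception needs an update along these lines (a hedged sketch mirroring the wiring in `tests/integration/test_consumer.py`; broker and topic values are placeholders):

```python
from confluent_kafka import KafkaException

import tap_kafka
import tap_kafka.common
from tap_kafka import sync

topic = 'my_topic'
tap_kafka_config = tap_kafka.generate_config({
    'bootstrap_servers': 'localhost:9092',  # placeholder
    'group_id': 'my_consumer_group',
    'topic': topic,
})
catalog = {'streams': tap_kafka.common.generate_catalog(tap_kafka_config)}

try:
    sync.do_sync(tap_kafka_config, catalog, state={'bookmarks': {topic: {}}})
except KafkaException:
    # Before 8.2.0 this surfaced as tap_kafka.errors.AllBrokersDownException
    raise
```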