This repository has been archived by the owner on Sep 23, 2024. It is now read-only.

AP-1574 More logs and new config for ease of debugging (#161)
Samira-El authored Nov 20, 2023
1 parent 91f7aee commit 061ed1d
Showing 7 changed files with 81 additions and 40 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,15 @@
8.2.0 (2023-11-17)
------------------
- Add more info logs
- Add new config [`debug_contexts`](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts) for enabling debugging

8.1.0 (2023-08-25)
------------------
- Bump `dpath` from `2.0.6` to `2.1.*`
- Bump `confluent-kafka[protobuf]` from `1.9.2` to `2.2.*`
- Bump `grpcio-tools` from `1.51.1` to `1.57.*`
- Bump test dependencies

8.0.0 (2022-12-08)
------------------
- Switch from `subscribe` to `assign` for better initial offset control
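
A rough, self-contained sketch of the `assign`-based approach mentioned above, using the confluent-kafka consumer API; the broker address, group id, topic and partition are illustrative values, not taken from this repository:

```python
from confluent_kafka import Consumer, TopicPartition, OFFSET_BEGINNING

# Illustrative settings; bootstrap server, group id and topic are made up.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'tap_kafka_example',
    'enable.auto.commit': False,
})

# assign() pins the consumer to explicit partitions and starting offsets
# (here: partition 0 from the beginning), whereas subscribe() leaves the
# starting position to the group coordinator and the offset reset policy.
consumer.assign([TopicPartition('example_topic', 0, OFFSET_BEGINNING)])

msg = consumer.poll(timeout=1.0)  # returns None if nothing arrives in time
```

Explicit assignment lets the tap seek each partition to a bookmarked or user-chosen offset before consuming, which is the "better initial offset control" the changelog entry refers to.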
37 changes: 19 additions & 18 deletions README.md
@@ -53,24 +53,25 @@ or

Full list of options in `config.json`:

| Property | Type | Required? | Description |
|-----------------------------------|---------|------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| bootstrap_servers | String | Yes | `host[:port]` string (or list of comma separated `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata. |
| group_id | String | Yes | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. |
| topic | String | Yes | Name of kafka topic to subscribe to |
| partitions | List | | (Default: [] (all)) Partition(s) of topic to consume, example `[0,4]` |
| primary_keys | Object | | Optionally you can define primary key for the consumed messages. It requires a column name and `/slashed/paths` ala xpath selector to extract the value from the kafka messages. The extracted column will be added to every output singer message. |
| use_message_key | Bool | | (Default: true) Defines whether to use Kafka message key as a primary key for the record. Note: if a custom primary key(s) has been defined, it will be used instead of the message_key. |
| initial_start_time | String | | (Default: latest) Start time reference of the message consumption if no bookmarked position in `state.json`. One of: `beginning`, `earliest`, `latest` or an ISO-8601 formatted timestamp string. |
| max_runtime_ms | Integer | | (Default: 300000) The maximum time for the tap to collect new messages from Kafka topic. If this time exceeds it will flush the batch and close kafka connection. |
| commit_interval_ms | Integer | | (Default: 5000) Number of milliseconds between two commits. This is different than the kafka auto commit feature. Tap-kafka sends commit messages automatically but only when the data consumed successfully and persisted to local store. |
| consumer_timeout_ms | Integer | | (Default: 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration |
| session_timeout_ms | Integer | | (Default: 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities. |
| heartbeat_interval_ms | Integer | | (Default: 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. |
| max_poll_interval_ms | Integer | | (Default: 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management. |
| message_format | String | | (Default: json) Supported message formats are `json` and `protobuf`. |
| proto_schema | String | | Protobuf message format in `.proto` syntax. Required if the `message_format` is `protobuf`. |
| proto_classes_dir | String | | (Default: current working dir) |
| Property | Type | Required? | Description |
|-----------------------|---------|-----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| bootstrap_servers | String | Yes | `host[:port]` string (or list of comma separated `host[:port]` strings) that the consumer should contact to bootstrap initial cluster metadata. |
| group_id | String | Yes | The name of the consumer group to join for dynamic partition assignment (if enabled), and to use for fetching and committing offsets. |
| topic | String | Yes | Name of kafka topic to subscribe to |
| partitions | List | | (Default: [] (all)) Partition(s) of topic to consume, example `[0,4]` |
| primary_keys          | Object  |           | Optionally you can define a primary key for the consumed messages. It requires a column name and a `/slashed/paths` selector (à la XPath) to extract the value from the Kafka messages. The extracted column is added to every output Singer message. |
| use_message_key | Bool | | (Default: true) Defines whether to use Kafka message key as a primary key for the record. Note: if a custom primary key(s) has been defined, it will be used instead of the message_key. |
| initial_start_time | String | | (Default: latest) Start time reference of the message consumption if no bookmarked position in `state.json`. One of: `beginning`, `earliest`, `latest` or an ISO-8601 formatted timestamp string. |
| max_runtime_ms        | Integer |           | (Default: 300000) The maximum time for the tap to collect new messages from the Kafka topic. If this time is exceeded, the tap flushes the batch and closes the Kafka connection.                                                                     |
| commit_interval_ms    | Integer |           | (Default: 5000) Number of milliseconds between two commits. This is different from the Kafka auto-commit feature: tap-kafka sends commit messages automatically, but only once the data has been consumed successfully and persisted to the local store. |
| consumer_timeout_ms | Integer | | (Default: 10000) KafkaConsumer setting. Number of milliseconds to block during message iteration before raising StopIteration |
| session_timeout_ms | Integer | | (Default: 30000) KafkaConsumer setting. The timeout used to detect failures when using Kafka’s group management facilities. |
| heartbeat_interval_ms | Integer | | (Default: 10000) KafkaConsumer setting. The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. |
| max_poll_interval_ms | Integer | | (Default: 300000) KafkaConsumer setting. The maximum delay between invocations of poll() when using consumer group management. |
| message_format | String | | (Default: json) Supported message formats are `json` and `protobuf`. |
| proto_schema | String | | Protobuf message format in `.proto` syntax. Required if the `message_format` is `protobuf`. |
| proto_classes_dir | String | | (Default: current working dir) |
| debug_contexts        | String  |           | Comma-separated list of debug contexts to enable for the consumer ([see librdkafka](https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts)). |
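
For illustration, a minimal `config.json` that turns on librdkafka debug logging might look like the sketch below; the broker address, group id, topic and the particular debug contexts are made-up example values:

```json
{
  "bootstrap_servers": "localhost:9092",
  "group_id": "tap_kafka_example",
  "topic": "orders",
  "initial_start_time": "earliest",
  "debug_contexts": "consumer,cgrp,topic,fetch"
}
```

As the `sync.py` change in this commit shows, the `debug_contexts` string is passed straight through to librdkafka's `debug` property, so any context listed in the linked librdkafka documentation should be accepted.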


This tap reads Kafka messages and generates Singer-compatible SCHEMA and RECORD messages in the following format.
2 changes: 1 addition & 1 deletion setup.py
@@ -6,7 +6,7 @@
long_description = fh.read()

setup(name='pipelinewise-tap-kafka',
version='8.1.0',
version='8.2.0',
description='Singer.io tap for extracting data from Kafka topic - PipelineWise compatible',
long_description=long_description,
long_description_content_type='text/markdown',
1 change: 1 addition & 0 deletions tap_kafka/__init__.py
@@ -119,6 +119,7 @@ def generate_config(args_config):
'message_format': args_config.get('message_format', DEFAULT_MESSAGE_FORMAT),
'proto_schema': args_config.get('proto_schema', DEFAULT_PROTO_SCHEMA),
'proto_classes_dir': args_config.get('proto_classes_dir', DEFAULT_PROTO_CLASSES_DIR),
'debug_contexts': args_config.get('debug_contexts'),
}

validate_config(config)
47 changes: 35 additions & 12 deletions tap_kafka/sync.py
@@ -8,10 +8,10 @@
import singer
import confluent_kafka

from confluent_kafka import KafkaError
from confluent_kafka import KafkaException
from typing import List

from singer import utils, metadata
from tap_kafka.errors import AllBrokersDownException
from tap_kafka.errors import InvalidBookmarkException
from tap_kafka.errors import InvalidConfigException
from tap_kafka.errors import InvalidTimestampException
@@ -111,7 +111,8 @@ def epoch_to_iso_timestamp(epoch) -> str:

def init_kafka_consumer(kafka_config):
LOGGER.info('Initialising Kafka Consumer...')
consumer = confluent_kafka.DeserializingConsumer({

consumer_conf = {
# Required parameters
'bootstrap.servers': kafka_config['bootstrap_servers'],
'group.id': kafka_config['group_id'],
@@ -124,7 +125,15 @@ def init_kafka_consumer(kafka_config):
# Non-configurable parameters
'enable.auto.commit': False,
'value.deserializer': init_value_deserializer(kafka_config),
})
}

if kafka_config['debug_contexts']:
# https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
consumer_conf['debug'] = kafka_config['debug_contexts']

consumer = confluent_kafka.DeserializingConsumer(consumer_conf)

LOGGER.info('Kafka Consumer initialised successfully')

return consumer

@@ -171,7 +180,7 @@ def kafka_message_to_singer_record(message, primary_keys: dict, use_message_key:
if primary_keys:
for key, pk_selector in primary_keys.items():
try:
record[key] = dpath.util.get(message.value(), pk_selector)
record[key] = dpath.get(message.value(), pk_selector)
except KeyError:
raise PrimaryKeyNotFoundException(f"Custom primary key not found in the message: '{pk_selector}'")
elif use_message_key:
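
The hunk above switches the primary-key lookup from `dpath.util.get` to the top-level `dpath.get` helper that comes with the dpath 2.1 line (see the 8.1.0 changelog entry). A small sketch of how a `/slashed/path` selector resolves against a message value; the payload and selector are invented for the example:

```python
import dpath  # dpath >= 2.x exposes get() at the top level

# Hypothetical message value and primary-key selector
message_value = {"payload": {"after": {"id": 42, "name": "example"}}}
selector = "/payload/after/id"

record_id = dpath.get(message_value, selector)
print(record_id)  # 42

# A non-matching selector raises KeyError, which the tap converts
# into PrimaryKeyNotFoundException (see the hunk above).
```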
@@ -189,17 +198,20 @@ def consume_kafka_message(message, topic, primary_keys, use_message_key):
singer.write_message(singer.RecordMessage(stream=topic, record=singer_record, time_extracted=utils.now()))


def select_kafka_partitions(consumer, kafka_config) -> confluent_kafka.TopicPartition:
def select_kafka_partitions(consumer, kafka_config) -> List[confluent_kafka.TopicPartition]:
"""Select partitions in topic"""

LOGGER.info(f"Selecting partitions in topic '{kafka_config['topic']}'")

topic = kafka_config['topic']
partition_ids_requested = kafka_config['partitions']

try:
topic_meta = consumer.list_topics(topic, timeout=kafka_config['max_poll_interval_ms'] / 1000)
partition_meta = topic_meta.topics[topic].partitions
except:
raise AllBrokersDownException
except KafkaException:
LOGGER.exception(f"Unable to list partitions in topic '{topic}'", exc_info=True)
raise

if not partition_meta:
raise InvalidConfigException(f"No partitions available in topic '{topic}'")
@@ -209,7 +221,7 @@ def select_kafka_partitions(consumer, kafka_config) -> confluent_kafka.TopicPart
for partition in partition_meta:
partition_ids_available.append(partition)

if partition_ids_requested == []:
if not partition_ids_requested:
partition_ids = partition_ids_available
LOGGER.info(f"Requesting all partitions in topic '{topic}'")
else:
@@ -302,21 +314,25 @@ def set_partition_offsets(consumer, partitions, kafka_config, state = {}):

def assign_kafka_partitions(consumer, partitions):
"""Assign and seek partitions to offsets"""
LOGGER.info(f"Assigning partitions '{partitions}'")
LOGGER.info("Assigning partitions to consumer ...")

consumer.assign(partitions)

partitions_committed = partitions
for partition in partitions_committed:
partition.offset = partition.offset - 1
partition.offset -= 1

if all(partition.offset >= 0 for partition in partitions_committed):
LOGGER.info(f"Committing partitions '{partitions_committed}'")
LOGGER.info("Committing partitions ")
consumer.commit(offsets=partitions_committed)
else:
LOGGER.info("Partitions not committed because one or more offsets are less than zero")


def commit_consumer_to_bookmarked_state(consumer, topic, state):
"""Commit every bookmarked offset to kafka"""
LOGGER.info("Committing bookmarked offsets to kafka ...")

offsets_to_commit = []
bookmarked_partitions = state.get('bookmarks', {}).get(topic, {})
for partition in bookmarked_partitions:
@@ -327,11 +343,15 @@ def commit_consumer_to_bookmarked_state(consumer, topic, state):
offsets_to_commit.append(topic_partition)

consumer.commit(offsets=offsets_to_commit)
LOGGER.info("Bookmarked offsets committed")


# pylint: disable=too-many-locals,too-many-statements
def read_kafka_messages(consumer, kafka_config, state):
"""Read kafka topic continuously and writing transformed singer messages to STDOUT"""

LOGGER.info('Starting Kafka messages consumption...')

topic = kafka_config['topic']
primary_keys = kafka_config['primary_keys']
use_message_key = kafka_config['use_message_key']
@@ -351,6 +371,9 @@ def read_kafka_messages(consumer, kafka_config, state):

# Stop consuming more messages if no new message and consumer_timeout_ms exceeded
if polled_message is None:
LOGGER.info('No new message received in %s ms. Stop consuming more messages.',
kafka_config["consumer_timeout_ms"]
)
break

message = polled_message
15 changes: 8 additions & 7 deletions tests/integration/test_consumer.py
@@ -1,16 +1,17 @@
import unittest
import time
from datetime import datetime
import singer

from confluent_kafka import KafkaException
from datetime import datetime

import tap_kafka.serialization
import tests.integration.utils as test_utils

from tap_kafka import sync, get_args
from tap_kafka.errors import AllBrokersDownException
from tap_kafka import sync
from tap_kafka.errors import DiscoveryException
from tap_kafka.errors import InvalidConfigException
from tap_kafka.serialization.json_with_no_schema import JSONSimpleSerializer
import tests.integration.utils as test_utils

SINGER_MESSAGES = []

@@ -89,12 +90,12 @@ def test_tap_kafka_consumer_brokers_down(self):
'topic': topic,
'group_id': test_utils.generate_unique_consumer_group(),
'consumer_timeout_ms': 1000,
'session_timeout_ms': '1',
'max_poll_interval_ms': '10'
'session_timeout_ms': 1,
'max_poll_interval_ms': 10
})
catalog = {'streams': tap_kafka.common.generate_catalog(tap_kafka_config)}

with self.assertRaises(AllBrokersDownException):
with self.assertRaises(KafkaException):
sync.do_sync(tap_kafka_config, catalog, state={'bookmarks': {topic: {}}})

def test_tap_kafka_consumer(self):
