Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhanced Kafka source logging through the use of MDC and thread naming #4663

Merged
merged 1 commit into from
Jun 26, 2024

Conversation

dlvenable
Copy link
Member

Description

This PR continues the work done in #4131 by adding similar support for the Kafka source.

This adds logging MDC to the kafka source to disambiguate it against the kafka buffer Entry points into the Sink interface set the MDC value. Also, the threads which the kafka source directly creates will have MDC and also have a useful thread name.

This MDC is now available for both Data Prepper loggers and Apache Kafka loggers.

Results

First, I updated the logging pattern to include the MDC key:

appender.console.layout.pattern = %d{ISO8601} [%t] [%X{kafkaPluginType}] %-5p %40C - %m%n

Then I ran. The logs look like:

2024-06-25T17:44:34,212 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Subscribed to topic(s): logs
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka version: 7.6.0-ccs
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka commitId: 1991cb733c81d679
2024-06-25T17:44:34,215 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.apache.kafka.common.utils.AppInfoParser$AppInfo - Kafka startTimeMs: 1719355474215
2024-06-25T17:44:34,215 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Subscribed to topic(s): logs
2024-06-25T17:44:34,216 [logs-pipeline-sink-worker-2-thread-1] [source] INFO  org.opensearch.dataprepper.plugins.kafka.source.KafkaSource - Started Kafka source for topic logs
2024-06-25T17:44:34,216 [logs-pipeline-sink-worker-2-thread-1] [] INFO  org.opensearch.dataprepper.pipeline.Pipeline - Pipeline [logs-pipeline] - Submitting request to initiate the pipeline processing
2024-06-25T17:44:34,349 [logs-kafka-source-2] [source] INFO         org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Cluster ID: _yZHgyDeT6ynMobR262CaA
2024-06-25T17:44:34,349 [logs-kafka-source-1] [source] INFO         org.apache.kafka.clients.Metadata - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Cluster ID: _yZHgyDeT6ynMobR262CaA
2024-06-25T17:44:34,349 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-25T17:44:34,349 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2024-06-25T17:44:34,350 [logs-kafka-source-1] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-1, groupId=data-prepper] (Re-)joining group
2024-06-25T17:44:34,350 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] (Re-)joining group
2024-06-25T17:44:34,362 [logs-kafka-source-2] [source] INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-data-prepper-2, groupId=data-prepper] Request joining group due to: need to re-join with the given member-id: consumer-data-prepper-2-ae860ae1-e687-4e2c-9bad-c5b56ab911a2

Issues Resolved

Resolves #4126

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

…d names for Kafka source threads. Resolves opensearch-project#4126.

Signed-off-by: David Venable <[email protected]>
allTopicExecutorServices.add(executorService);

IntStream.range(0, numWorkers).forEach(index -> {
while (true) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part of the code is increasing, Probably good idea to move to separate helper function. Maybe a separate PR.

@dlvenable dlvenable merged commit 36d599f into opensearch-project:main Jun 26, 2024
43 of 50 checks passed
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Jul 23, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Jul 23, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Jul 30, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Aug 8, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Aug 12, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
kkondaka pushed a commit to kkondaka/kk-data-prepper-f2 that referenced this pull request Aug 14, 2024
…d names for Kafka source threads. Resolves opensearch-project#4126. (opensearch-project#4663)

Signed-off-by: David Venable <[email protected]>
Signed-off-by: Krishna Kondaka <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support enhanced configuration of the Kafka source and buffer loggers
3 participants