From 81e6501140d986a06b5406bceaae3e1796490ee7 Mon Sep 17 00:00:00 2001 From: David Venable Date: Wed, 26 Jun 2024 14:25:57 -0500 Subject: [PATCH] Enhanced Kafka source logging through the use of MDC and better thread names for Kafka source threads. Resolves #4126. (#4663) Signed-off-by: David Venable Signed-off-by: Krishna Kondaka --- .../plugins/kafka/common/KafkaMdc.java | 4 +- .../thread/KafkaPluginThreadFactory.java | 33 ++++- .../plugins/kafka/source/KafkaSource.java | 132 ++++++++++-------- .../thread/KafkaPluginThreadFactoryTest.java | 73 ++++++++++ .../plugins/kafka/source/KafkaSourceTest.java | 40 ++++++ 5 files changed, 225 insertions(+), 57 deletions(-) diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java index 9ae8985908..785d565e78 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/KafkaMdc.java @@ -3,6 +3,8 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.dataprepper.plugins.kafka.common;public class KafkaMdc { +package org.opensearch.dataprepper.plugins.kafka.common; + +public class KafkaMdc { public static final String MDC_KAFKA_PLUGIN_KEY = "kafkaPluginType"; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java index a05540c320..b5dede6cda 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactory.java @@ -25,7 +25,16 @@ public class KafkaPluginThreadFactory implements ThreadFactory { final ThreadFactory delegateThreadFactory, final String kafkaPluginType) { this.delegateThreadFactory = delegateThreadFactory; - this.threadPrefix = "kafka-" + kafkaPluginType + "-"; + this.threadPrefix = createPluginPart(kafkaPluginType); + this.kafkaPluginType = kafkaPluginType; + } + + KafkaPluginThreadFactory( + final ThreadFactory delegateThreadFactory, + final String kafkaPluginType, + final String kafkaTopic) { + this.delegateThreadFactory = delegateThreadFactory; + this.threadPrefix = normalizeName(kafkaTopic) + "-" + createPluginPart(kafkaPluginType); this.kafkaPluginType = kafkaPluginType; } @@ -39,6 +48,28 @@ public static KafkaPluginThreadFactory defaultExecutorThreadFactory(final String return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType); } + /** + * Creates an instance specifically for use with {@link Executors}. + * + * @param kafkaPluginType The name of the plugin type. e.g. sink, source, buffer + * @return An instance of the {@link KafkaPluginThreadFactory}. + */ + public static KafkaPluginThreadFactory defaultExecutorThreadFactory( + final String kafkaPluginType, + final String kafkaTopic) { + return new KafkaPluginThreadFactory(Executors.defaultThreadFactory(), kafkaPluginType, kafkaTopic); + } + + private static String createPluginPart(final String kafkaPluginType) { + return "kafka-" + kafkaPluginType + "-"; + } + + private static String normalizeName(final String kafkaTopic) { + final String limitedName = kafkaTopic.length() > 20 ? kafkaTopic.substring(0, 20) : kafkaTopic; + return limitedName + .toLowerCase().replaceAll("[^a-z0-9]", "-"); + } + @Override public Thread newThread(final Runnable runnable) { final Thread thread = delegateThreadFactory.newThread(() -> { diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java index 525c754929..e235594ce2 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSource.java @@ -29,6 +29,8 @@ import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; +import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc; +import org.opensearch.dataprepper.plugins.kafka.common.thread.KafkaPluginThreadFactory; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.OAuthConfig; @@ -46,6 +48,7 @@ import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import java.io.IOException; import java.util.ArrayList; @@ -73,10 +76,10 @@ public class KafkaSource implements Source> { private static final String NO_RESOLVABLE_URLS_ERROR_MESSAGE = "No resolvable bootstrap urls given in bootstrap.servers"; private static final long RETRY_SLEEP_INTERVAL = 30000; + private static final String MDC_KAFKA_PLUGIN_VALUE = "source"; private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); private final KafkaSourceConfig sourceConfig; private final AtomicBoolean shutdownInProgress; - private ExecutorService executorService; private final PluginMetrics pluginMetrics; private KafkaCustomConsumer consumer; private KafkaConsumer kafkaConsumer; @@ -112,59 +115,65 @@ public KafkaSource(final KafkaSourceConfig sourceConfig, @Override public void start(Buffer> buffer) { - Properties authProperties = new Properties(); - KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable); - KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); - sourceConfig.getTopics().forEach(topic -> { - consumerGroupID = topic.getGroupId(); - KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true); - Properties consumerProperties = getConsumerProperties(topic, authProperties); - MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); - try { - int numWorkers = topic.getWorkers(); - executorService = Executors.newFixedThreadPool(numWorkers); - allTopicExecutorServices.add(executorService); - - IntStream.range(0, numWorkers).forEach(index -> { - while (true) { - try { - kafkaConsumer = createKafkaConsumer(schema, consumerProperties); - break; - } catch (ConfigException ce) { - if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) { - LOG.warn("Exception while creating Kafka consumer: ", ce); - LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL); - try { - sleep(RETRY_SLEEP_INTERVAL); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); + try { + setMdc(); + Properties authProperties = new Properties(); + KafkaSecurityConfigurer.setDynamicSaslClientCallbackHandler(authProperties, sourceConfig, pluginConfigObservable); + KafkaSecurityConfigurer.setAuthProperties(authProperties, sourceConfig, LOG); + sourceConfig.getTopics().forEach(topic -> { + consumerGroupID = topic.getGroupId(); + KafkaTopicConsumerMetrics topicMetrics = new KafkaTopicConsumerMetrics(topic.getName(), pluginMetrics, true); + Properties consumerProperties = getConsumerProperties(topic, authProperties); + MessageFormat schema = MessageFormat.getByMessageFormatByName(schemaType); + try { + int numWorkers = topic.getWorkers(); + final ExecutorService executorService = Executors.newFixedThreadPool( + numWorkers, KafkaPluginThreadFactory.defaultExecutorThreadFactory(MDC_KAFKA_PLUGIN_VALUE, topic.getName())); + allTopicExecutorServices.add(executorService); + + IntStream.range(0, numWorkers).forEach(index -> { + while (true) { + try { + kafkaConsumer = createKafkaConsumer(schema, consumerProperties); + break; + } catch (ConfigException ce) { + if (ce.getMessage().contains(NO_RESOLVABLE_URLS_ERROR_MESSAGE)) { + LOG.warn("Exception while creating Kafka consumer: ", ce); + LOG.warn("Bootstrap URL could not be resolved. Retrying in {} ms...", RETRY_SLEEP_INTERVAL); + try { + sleep(RETRY_SLEEP_INTERVAL); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } else { + throw ce; } - } else { - throw ce; } + } + consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, + acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause()); + allTopicConsumers.add(consumer); + executorService.submit(consumer); + }); + } catch (Exception e) { + if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { + LOG.error("The kafka broker is not available..."); + } else { + LOG.error("Failed to setup the Kafka Source Plugin.", e); } - consumer = new KafkaCustomConsumer(kafkaConsumer, shutdownInProgress, buffer, sourceConfig, topic, schemaType, - acknowledgementSetManager, null, topicMetrics, PauseConsumePredicate.noPause()); - allTopicConsumers.add(consumer); - - executorService.submit(consumer); - }); - } catch (Exception e) { - if (e instanceof BrokerNotAvailableException || e instanceof TimeoutException) { - LOG.error("The kafka broker is not available..."); - } else { - LOG.error("Failed to setup the Kafka Source Plugin.", e); + throw new RuntimeException(e); } - throw new RuntimeException(e); - } - LOG.info("Started Kafka source for topic " + topic.getName()); - }); + LOG.info("Started Kafka source for topic " + topic.getName()); + }); + } finally { + removeMdc(); + } } - public KafkaConsumer createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) { + KafkaConsumer createKafkaConsumer(final MessageFormat schema, final Properties consumerProperties) { switch (schema) { case JSON: return new KafkaConsumer(consumerProperties); @@ -183,19 +192,24 @@ public void start(Buffer> buffer) { @Override public void stop() { - shutdownInProgress.set(true); - final long shutdownWaitTime = calculateLongestThreadWaitingTime(); + try { + setMdc(); + shutdownInProgress.set(true); + final long shutdownWaitTime = calculateLongestThreadWaitingTime(); - LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size()); - allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime)); + LOG.info("Shutting down {} Executor services", allTopicExecutorServices.size()); + allTopicExecutorServices.forEach(executor -> stopExecutor(executor, shutdownWaitTime)); - LOG.info("Closing {} consumers", allTopicConsumers.size()); - allTopicConsumers.forEach(consumer -> consumer.closeConsumer()); + LOG.info("Closing {} consumers", allTopicConsumers.size()); + allTopicConsumers.forEach(consumer -> consumer.closeConsumer()); - LOG.info("Kafka source shutdown successfully..."); + LOG.info("Kafka source shutdown successfully..."); + } finally { + removeMdc(); + } } - public void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) { + private void stopExecutor(final ExecutorService executorService, final long shutdownWaitTime) { executorService.shutdown(); try { if (!executorService.awaitTermination(shutdownWaitTime, TimeUnit.SECONDS)) { @@ -346,7 +360,7 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) { } } - protected void sleep(final long millis) throws InterruptedException { + void sleep(final long millis) throws InterruptedException { Thread.sleep(millis); } @@ -366,4 +380,12 @@ private void updateConfig(final KafkaClusterConfigSupplier kafkaClusterConfigSup } } } + + private static void setMdc() { + MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, MDC_KAFKA_PLUGIN_VALUE); + } + + private static void removeMdc() { + MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY); + } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java index 589f81a74c..1f1bc854dc 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/common/thread/KafkaPluginThreadFactoryTest.java @@ -8,6 +8,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -37,10 +39,12 @@ class KafkaPluginThreadFactoryTest { @Mock private Runnable runnable; private String pluginType; + private String topic; @BeforeEach void setUp() { pluginType = UUID.randomUUID().toString(); + topic = UUID.randomUUID().toString(); when(delegateThreadFactory.newThread(any(Runnable.class))).thenReturn(innerThread); } @@ -50,11 +54,20 @@ private KafkaPluginThreadFactory createObjectUnderTest() { return new KafkaPluginThreadFactory(delegateThreadFactory, pluginType); } + private KafkaPluginThreadFactory createObjectUnderTestWithTopic() { + return new KafkaPluginThreadFactory(delegateThreadFactory, pluginType, topic); + } + @Test void newThread_creates_thread_from_delegate() { assertThat(createObjectUnderTest().newThread(runnable), equalTo(innerThread)); } + @Test + void newThread_with_topic_creates_thread_from_delegate() { + assertThat(createObjectUnderTestWithTopic().newThread(runnable), equalTo(innerThread)); + } + @Test void newThread_creates_thread_with_name() { final KafkaPluginThreadFactory objectUnderTest = createObjectUnderTest(); @@ -69,6 +82,30 @@ void newThread_creates_thread_with_name() { verify(thread2).setName(String.format("kafka-%s-2", pluginType)); } + @ParameterizedTest + @CsvSource({ + "abcd12,abcd12", + "aBCd12,abcd12", + "abcd-12,abcd-12", + "has space,has-space", + "has!character,has-character", + "this-is-somewhat-too-long,this-is-somewhat-too" + }) + void newThread_with_topic_creates_thread_with_name( + final String topicName, + final String expectedPrefix) { + this.topic = topicName; + final KafkaPluginThreadFactory objectUnderTest = createObjectUnderTestWithTopic(); + + final Thread thread1 = objectUnderTest.newThread(runnable); + assertThat(thread1, notNullValue()); + verify(thread1).setName(String.format("%s-kafka-%s-1", expectedPrefix, pluginType)); + + final Thread thread2 = objectUnderTest.newThread(runnable); + assertThat(thread2, notNullValue()); + verify(thread2).setName(String.format("%s-kafka-%s-2", expectedPrefix, pluginType)); + } + @Test void newThread_creates_thread_with_wrapping_runnable() { createObjectUnderTest().newThread(runnable); @@ -85,6 +122,22 @@ void newThread_creates_thread_with_wrapping_runnable() { verify(runnable).run(); } + @Test + void newThread_with_topic_creates_thread_with_wrapping_runnable() { + createObjectUnderTestWithTopic().newThread(runnable); + + final ArgumentCaptor actualRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(delegateThreadFactory).newThread(actualRunnableCaptor.capture()); + + final Runnable actualRunnable = actualRunnableCaptor.getValue(); + + assertThat(actualRunnable, not(equalTo(runnable))); + + verifyNoInteractions(runnable); + actualRunnable.run(); + verify(runnable).run(); + } + @Test void newThread_creates_thread_that_calls_MDC_on_run() { createObjectUnderTest().newThread(runnable); @@ -104,4 +157,24 @@ void newThread_creates_thread_that_calls_MDC_on_run() { assertThat(actualKafkaPluginType[0], equalTo(pluginType)); } + + @Test + void newThread_with_topic_creates_thread_that_calls_MDC_on_run() { + createObjectUnderTestWithTopic().newThread(runnable); + + final ArgumentCaptor actualRunnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(delegateThreadFactory).newThread(actualRunnableCaptor.capture()); + + final Runnable actualRunnable = actualRunnableCaptor.getValue(); + + final String[] actualKafkaPluginType = new String[1]; + doAnswer(a -> { + actualKafkaPluginType[0] = MDC.get(KafkaMdc.MDC_KAFKA_PLUGIN_KEY); + return null; + }).when(runnable).run(); + + actualRunnable.run(); + + assertThat(actualKafkaPluginType[0], equalTo(pluginType)); + } } \ No newline at end of file diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java index 1503a7424d..3433a92b76 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/source/KafkaSourceTest.java @@ -6,11 +6,14 @@ package org.opensearch.dataprepper.plugins.kafka.source; import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; @@ -21,6 +24,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.plugin.PluginConfigObservable; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.kafka.common.KafkaMdc; import org.opensearch.dataprepper.plugins.kafka.configuration.AuthConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.AwsConfig; import org.opensearch.dataprepper.plugins.kafka.configuration.TopicConsumerConfig; @@ -29,6 +33,7 @@ import org.opensearch.dataprepper.plugins.kafka.configuration.SchemaConfig; import org.opensearch.dataprepper.plugins.kafka.extension.KafkaClusterConfigSupplier; import org.opensearch.dataprepper.plugins.kafka.util.MessageFormat; +import org.slf4j.MDC; import java.time.Duration; import java.util.Collections; @@ -41,6 +46,7 @@ import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -230,4 +236,38 @@ void test_updateConfig_not_using_kafkaClusterConfigExtension() { verify(sourceConfig, never()).setAwsConfig(any()); verify(sourceConfig, never()).setEncryptionConfig(any()); } + + @Nested + class MdcTests { + private MockedStatic mdcMockedStatic; + + @BeforeEach + void setUp() { + mdcMockedStatic = mockStatic(MDC.class); + } + + @AfterEach + void tearDown() { + mdcMockedStatic.close(); + } + + @Test + void start_sets_and_removes_MDC() { + when(topic1.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + when(topic2.getSessionTimeOut()).thenReturn(Duration.ofSeconds(15)); + + createObjectUnderTest().start(buffer); + + mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "source")); + mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY)); + } + + @Test + void stop_sets_and_removes_MDC() { + createObjectUnderTest().stop(); + + mdcMockedStatic.verify(() -> MDC.put(KafkaMdc.MDC_KAFKA_PLUGIN_KEY, "source")); + mdcMockedStatic.verify(() -> MDC.remove(KafkaMdc.MDC_KAFKA_PLUGIN_KEY)); + } + } }