Skip to content

Commit

Permalink
feat: add configurable metrics reporters (#3490)
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenpyzhang authored Oct 9, 2019
1 parent 13e5dfe commit 378b8af
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.metrics;

import io.confluent.common.utils.Time;
import io.confluent.ksql.util.KsqlConfig;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -90,6 +91,15 @@ static String addCollector(final String id, final MetricCollector collector) {
return finalId;
}

public static void addConfigurableReporter(final KsqlConfig ksqlConfig) {
final List<MetricsReporter> reporters = ksqlConfig.getConfiguredInstances(
KsqlConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
for (final MetricsReporter reporter: reporters) {
metrics.addReporter(reporter);
}
}

static void remove(final String id) {
collectorMap.remove(id);
}
Expand Down
11 changes: 11 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -53,6 +54,11 @@ public class KsqlConfig extends AbstractConfig {
static final String KSQ_FUNCTIONS_GLOBAL_PROPERTY_PREFIX =
KSQL_FUNCTIONS_PROPERTY_PREFIX + "_global_.";

public static final String METRIC_REPORTER_CLASSES_CONFIG = "ksql.metric.reporters";

public static final String METRIC_REPORTER_CLASSES_DOC =
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC;

public static final String SINK_NUMBER_OF_PARTITIONS_PROPERTY = "ksql.sink.partitions";

public static final String SINK_NUMBER_OF_REPLICAS_PROPERTY = "ksql.sink.replicas";
Expand Down Expand Up @@ -551,6 +557,11 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
),
ConfigDef.Importance.LOW,
KSQL_ACCESS_VALIDATOR_DOC
).define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
"",
Importance.LOW,
METRIC_REPORTER_CLASSES_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,29 @@
package io.confluent.ksql.metrics;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import io.confluent.ksql.util.KsqlConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.record.TimestampType;
import org.junit.After;
import org.junit.Before;
Expand All @@ -42,6 +48,8 @@ public class MetricCollectorsTest {

private static final String TEST_TOPIC = "shared-topic";

private KsqlConfig ksqlConfig = mock(KsqlConfig.class);

@Before
public void setUp() {
MetricCollectors.initialize();
Expand All @@ -60,6 +68,18 @@ public void shouldAggregateStats() {
assertThat(aggregateMetrics.values().iterator().next().getValue(), equalTo(3.0));
}

@Test
public void shouldAddConfigurableReporters() {
final MetricsReporter mockReporter = mock(MetricsReporter.class);
assertThat(MetricCollectors.getMetrics().reporters().size(), equalTo(1));
when(ksqlConfig.getConfiguredInstances(any(), any()))
.thenReturn(Collections.singletonList(mockReporter));

MetricCollectors.addConfigurableReporter(ksqlConfig);
final List<MetricsReporter> reporters = MetricCollectors.getMetrics().reporters();
assertThat(reporters, hasItem(mockReporter));
}

@Test
public void shouldKeepWorkingWhenDuplicateTopicConsumerIsRemoved() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.confluent.ksql.json.JsonMapper;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
Expand Down Expand Up @@ -440,6 +441,8 @@ static KsqlRestApplication buildApplication(

final KsqlConfig ksqlConfig = new KsqlConfig(restConfig.getKsqlConfigProperties());

MetricCollectors.addConfigurableReporter(ksqlConfig);

final ProcessingLogConfig processingLogConfig
= new ProcessingLogConfig(restConfig.getOriginals());
final ProcessingLogContext processingLogContext
Expand All @@ -451,7 +454,7 @@ static KsqlRestApplication buildApplication(
Thread.setDefaultUncaughtExceptionHandler(
new KsqlUncaughtExceptionHandler(LogManager::shutdown));
}

final HybridQueryIdGenerator hybridQueryIdGenerator =
new HybridQueryIdGenerator();

Expand Down

0 comments on commit 378b8af

Please sign in to comment.