Skip to content

Commit

Permalink
add config for custom metrics tags
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jun 13, 2019
1 parent 6ca94a6 commit e3a1799
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 29 deletions.
11 changes: 11 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_WRAP_SINGLE_VALUES =
"ksql.persistence.wrap.single.values";

public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by semicolons. For example, 'key1:value1;key2:value2'.";

public static final String
defaultSchemaRegistryUrl = "http://localhost:8081";

Expand Down Expand Up @@ -470,6 +475,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
+ "e.g. '{\"FOO\": 10}." + System.lineSeparator()
+ "Note: the DELIMITED format ignores this setting as it does not support the "
+ "concept of a STRUCT, record or object."
).define(
KSQL_CUSTOM_METRICS_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
4 changes: 3 additions & 1 deletion ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ public static KsqlContext create(
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final String customMetricsTags = ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
serviceId);
serviceId,
customMetricsTags);

return new KsqlContext(
serviceContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,15 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
final String serviceId
final String serviceId,
final String customMetricsTags
) {
this(
serviceContext,
processingLogContext,
serviceId,
new MetaStoreImpl(functionRegistry),
KsqlEngineMetrics::new);
(engine) -> new KsqlEngineMetrics(engine, customMetricsTags));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,20 @@

package io.confluent.ksql.internal;

import com.google.common.base.Splitter;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.metrics.MetricCollectors;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlConstants;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.QueryMetadata;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -55,24 +60,26 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;

private final String ksqlServiceId;

private final Map<String, String> customMetricsTags;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
public KsqlEngineMetrics(final KsqlEngine ksqlEngine, final String customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics) {
final Metrics metrics,
final String customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = createMetricsTags(customMetricsTags);

this.metrics = metrics;

Expand Down Expand Up @@ -269,7 +276,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}

Expand All @@ -293,6 +301,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
final KafkaStreams.State state
) {
final Gauge<Long> gauge =
Expand All @@ -302,7 +311,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description);
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
Expand All @@ -315,19 +324,36 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}

private static Map<String, String> createMetricsTags(final String tags) {
try {
return tags.equals("")
? Collections.emptyMap()
: Splitter.on(";").trimResults().withKeyValueSeparator(":").split(tags);
} catch (IllegalArgumentException e) {
throw new KsqlException(
String.format(
"Invalid config value for '%s'. value: %s. reason: %s",
KsqlConfig.KSQL_CUSTOM_METRICS_TAGS,
tags,
e.getMessage()));
}
}

private static class CountMetric {
private final Gauge<Long> count;
private final MetricName metricName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS)
);

return new KsqlContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
KsqlEngineMetrics::new
(engine) -> new KsqlEngineMetrics(engine, "")
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ public void before() throws Exception {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS));

topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
Expand Down Expand Up @@ -269,7 +270,8 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KSQL_CUSTOM_METRICS_TAGS));

execInitCreateStreamQueries();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.number.IsCloseTo.closeTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -63,6 +62,8 @@ public class KsqlEngineMetricsTest {
private KsqlEngineMetrics engineMetrics;
private static final String KSQL_SERVICE_ID = "test-ksql-service-id";
private static final String metricNamePrefix = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + KSQL_SERVICE_ID;
private static final String CUSTOM_TAGS = "tag1:value1;tag2:value2";
private static final Map<String, String> EXPECTED_TAGS = ImmutableMap.of("tag1", "value1", "tag2", "value2");

@Mock
private KsqlEngine ksqlEngine;
Expand All @@ -75,7 +76,7 @@ public void setUp() {
when(ksqlEngine.getServiceId()).thenReturn(KSQL_SERVICE_ID);
when(query1.getQueryApplicationId()).thenReturn("app-1");

engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics());
engineMetrics = new KsqlEngineMetrics(METRIC_GROUP, ksqlEngine, MetricCollectors.getMetrics(), CUSTOM_TAGS);
}

@After
Expand All @@ -93,15 +94,6 @@ public void shouldRemoveAllSensorsOnClose() {
engineMetrics.registeredSensors().forEach(sensor -> assertThat(engineMetrics.getMetrics().getSensor(sensor.name()), is(nullValue())));
}

private void shouldRecordRate(final String name, final double expected, final double error) {
assertThat(
Math.floor(getMetricValueLegacy(name)),
closeTo(expected, error));
assertThat(
Math.floor(getMetricValue(name)),
closeTo(expected, error));
}

@Test
public void shouldRecordNumberOfActiveQueries() {
when(ksqlEngine.numberOfLiveQueries()).thenReturn(3);
Expand Down Expand Up @@ -264,7 +256,7 @@ private double getMetricValue(final String metricName) {
return Double.valueOf(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", EXPECTED_TAGS)
).metricValue().toString()
);
}
Expand All @@ -274,7 +266,7 @@ private long getLongMetricValue(final String metricName) {
return Long.parseLong(
metrics.metric(
metrics.metricName(
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats")
metricName, metricNamePrefix + METRIC_GROUP + "-query-stats", EXPECTED_TAGS)
).metricValue().toString()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,8 @@ static KsqlRestApplication buildApplication(
serviceContext,
processingLogContext,
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS));

UdfLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ static StandaloneExecutor create(
serviceContext,
processingLogContext,
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
ksqlConfig.getString(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS));

final UdfLoader udfLoader =
UdfLoader.newInstance(ksqlConfig, functionRegistry, installDir);
Expand Down

0 comments on commit e3a1799

Please sign in to comment.