Skip to content

Commit

Permalink
Add config for custom metrics tags (confluentinc#2971)
Browse files Browse the repository at this point in the history
  • Loading branch information
vcrfxia committed Jun 20, 2019
1 parent 539824a commit 6d12685
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 30 deletions.
28 changes: 28 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
Changelog
=========

<<<<<<< HEAD
=======
Version 5.4.0
-------------

KSQL 5.4.0 includes new features, including:

* UDAFs support STRUCTs as parameters and return values.

* KSQL now supports working with source data where the value is an anonymous Avro or JSON serialized
``ARRAY``, ``MAP`` or primitive type, for example ``STRING`` or ``BIGINT``. Previously KSQL required all
Avro values to be Avro records, and all JSON values to be JSON objects.
For more information, refer to :ref:`ksql_single_field_wrapping`.

* KSQL now allows users to control how results containing only a single value field are serialized
to Kafka. Users can now choose to serialize the single value as a named field within an outer
Avro record or JSON object, depending on the format in use, or as an anonymous value.
For more information, refer to :ref:`ksql_single_field_wrapping`.

* A new config ``ksql.metrics.tags.custom`` for adding custom tags to emitted JMX metrics.
See :ref:`ksql-metrics-tags-custom` for usage.

KSQL 5.4.0 includes the following misc. changes:

* Require either the value for a ``@UdfParameter`` or for the UDF JAR to be compiled with
the Java8 ``-parameters`` compilation option. The UDF archetype now includes this flag.

>>>>>>> a0cd791c8... Add config for custom metrics tags (#2971)
Version 5.3.0
-------------

Expand Down
10 changes: 10 additions & 0 deletions docs/installation/server-config/config-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ The corresponding environment variable in the
`KSQL Server image <https://hub.docker.com/r/confluentinc/cp-ksql-server/>`__ is
``KSQL_LISTENERS``.

.. _ksql-metrics-tags-custom:

------------------------
ksql.metrics.tags.custom
------------------------

A list of tags to be included with emitted :ref:`JMX metrics <ksql-monitoring-and-metrics>`,
formatted as a string of ``key:value`` pairs separated by commas.
For example, ``key1:value1,key2:value2``.

.. _ksql-c3-settings:

|c3| Settings
Expand Down
28 changes: 28 additions & 0 deletions ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package io.confluent.ksql.util;

import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.config.ConfigItem;
Expand Down Expand Up @@ -143,6 +144,11 @@ public class KsqlConfig extends AbstractConfig {
private static final String
defaultSchemaRegistryUrl = "http://localhost:8081";

public static final String KSQL_CUSTOM_METRICS_TAGS = "ksql.metrics.tags.custom";
private static final String KSQL_CUSTOM_METRICS_TAGS_DOC =
"A list of tags to be included with emitted JMX metrics, formatted as a string of key:value "
+ "pairs separated by commas. For example, 'key1:value1,key2:value2'.";

public static final String KSQL_STREAMS_PREFIX = "ksql.streams.";

public static final String KSQL_COLLECT_UDF_METRICS = "ksql.udf.collect.metrics";
Expand Down Expand Up @@ -447,6 +453,12 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
KSQL_SECURITY_EXTENSION_DEFAULT,
ConfigDef.Importance.LOW,
KSQL_SECURITY_EXTENSION_DOC
).define(
KSQL_CUSTOM_METRICS_TAGS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
KSQL_CUSTOM_METRICS_TAGS_DOC
)
.withClientSslSupport();
for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down Expand Up @@ -684,6 +696,22 @@ public KsqlConfig overrideBreakingConfigsWithOriginalValues(final Map<String, St
return new KsqlConfig(ConfigGeneration.LEGACY, mergedProperties, mergedStreamConfigProps);
}

public Map<String, String> getStringAsMap(final String key) {
final String value = getString(key).trim();
try {
return value.equals("")
? Collections.emptyMap()
: Splitter.on(",").trimResults().withKeyValueSeparator(":").split(value);
} catch (IllegalArgumentException e) {
throw new KsqlException(
String.format(
"Invalid config value for '%s'. value: %s. reason: %s",
key,
value,
e.getMessage()));
}
}

private static Set<String> sslConfigNames() {
final ConfigDef sslConfig = new ConfigDef();
SslConfigs.addClientSslSupport(sslConfig);
Expand Down
4 changes: 2 additions & 2 deletions ksql-engine/src/main/java/io/confluent/ksql/KsqlContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ public static KsqlContext create(
final ServiceContext serviceContext = DefaultServiceContext.create(ksqlConfig);
final MutableFunctionRegistry functionRegistry = new InternalFunctionRegistry();
UdfLoader.newInstance(ksqlConfig, functionRegistry, ".").load();
final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final ServiceInfo serviceInfo = ServiceInfo.create(ksqlConfig);
final KsqlEngine engine = new KsqlEngine(
serviceContext,
processingLogContext,
functionRegistry,
serviceId);
serviceInfo);

return new KsqlContext(
serviceContext,
Expand Down
55 changes: 55 additions & 0 deletions ksql-engine/src/main/java/io/confluent/ksql/ServiceInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright 2019 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql;

import io.confluent.ksql.util.KsqlConfig;
import java.util.Map;
import java.util.Objects;

public final class ServiceInfo {

private final String serviceId;
private final Map<String, String> customMetricsTags;

/**
* Create an object to be passed from the KSQL context down to the KSQL engine.
*/
public static ServiceInfo create(final KsqlConfig ksqlConfig) {
Objects.requireNonNull(ksqlConfig, "ksqlConfig cannot be null.");

final String serviceId = ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
final Map<String, String> customMetricsTags =
ksqlConfig.getStringAsMap(KsqlConfig.KSQL_CUSTOM_METRICS_TAGS);

return new ServiceInfo(serviceId, customMetricsTags);
}

private ServiceInfo(
final String serviceId,
final Map<String, String> customMetricsTags
) {
this.serviceId = Objects.requireNonNull(serviceId, "serviceId");
this.customMetricsTags = Objects.requireNonNull(customMetricsTags, "customMetricsTags");
}

public String serviceId() {
return serviceId;
}

public Map<String, String> customMetricsTags() {
return customMetricsTags;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.ddl.commands.DdlCommandExec;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -67,14 +68,14 @@ public KsqlEngine(
final ServiceContext serviceContext,
final ProcessingLogContext processingLogContext,
final FunctionRegistry functionRegistry,
final String serviceId
final ServiceInfo serviceInfo
) {
this(
serviceContext,
processingLogContext,
serviceId,
serviceInfo.serviceId(),
new MetaStoreImpl(functionRegistry),
KsqlEngineMetrics::new);
(engine) -> new KsqlEngineMetrics(engine, serviceInfo.customMetricsTags()));
}

KsqlEngine(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.common.MetricName;
Expand Down Expand Up @@ -55,24 +57,28 @@ public class KsqlEngineMetrics implements Closeable {
private final Sensor errorRate;

private final String ksqlServiceId;

private final Map<String, String> customMetricsTags;

private final KsqlEngine ksqlEngine;
private final Metrics metrics;

public KsqlEngineMetrics(final KsqlEngine ksqlEngine) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics());
public KsqlEngineMetrics(
final KsqlEngine ksqlEngine,
final Map<String, String> customMetricsTags) {
this(METRIC_GROUP_PREFIX, ksqlEngine, MetricCollectors.getMetrics(), customMetricsTags);
}

KsqlEngineMetrics(
final String metricGroupPrefix,
final KsqlEngine ksqlEngine,
final Metrics metrics) {
final Metrics metrics,
final Map<String, String> customMetricsTags) {
this.ksqlEngine = ksqlEngine;
this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId();
this.sensors = new ArrayList<>();
this.countMetrics = new ArrayList<>();
this.metricGroupName = metricGroupPrefix + "-query-stats";
this.customMetricsTags = customMetricsTags;

this.metrics = metrics;

Expand Down Expand Up @@ -269,7 +275,8 @@ private void configureMetric(
statSupplier.get());
// new
sensor.add(
metrics.metricName(metricName, ksqlServiceId + metricGroupName, description),
metrics.metricName(
metricName, ksqlServiceId + metricGroupName, description, customMetricsTags),
statSupplier.get());
}

Expand All @@ -293,6 +300,7 @@ private void configureGaugeForState(
final Metrics metrics,
final String name,
final String group,
final Map<String, String> tags,
final KafkaStreams.State state
) {
final Gauge<Long> gauge =
Expand All @@ -302,7 +310,7 @@ private void configureGaugeForState(
.filter(queryMetadata -> queryMetadata.getState().equals(state.toString()))
.count();
final String description = String.format("Count of queries in %s state.", state.toString());
final MetricName metricName = metrics.metricName(name, group, description);
final MetricName metricName = metrics.metricName(name, group, description, tags);
final CountMetric countMetric = new CountMetric(metricName, gauge);
metrics.addMetric(metricName, gauge);
countMetrics.add(countMetric);
Expand All @@ -315,15 +323,17 @@ private void configureNumActiveQueriesForGivenState(
// legacy
configureGaugeForState(
metrics,
ksqlServiceId + metricGroupName + "-" + name,
ksqlServiceId + metricGroupName + "-" + name,
metricGroupName,
Collections.emptyMap(),
state
);
// new
configureGaugeForState(
metrics,
name,
ksqlServiceId + metricGroupName,
customMetricsTags,
state
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static KsqlContext create(
serviceContext,
ProcessingLogContext.create(),
functionRegistry,
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG)
ServiceInfo.create(ksqlConfig)
);

return new KsqlContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.QueryMetadata;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -48,7 +50,7 @@ public static KsqlEngine createKsqlEngine(
ProcessingLogContext.create(),
"test_instance_",
metaStore,
KsqlEngineMetrics::new
(engine) -> new KsqlEngineMetrics(engine, Collections.emptyMap())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -96,7 +97,7 @@ public void before() throws Exception {
serviceContext,
processingLogContext,
new InternalFunctionRegistry(),
ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

topicClient = serviceContext.getTopicClient();
metaStore = ksqlEngine.getMetaStore();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.ops;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.prefixedResource;
import static io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster.resource;
import static io.confluent.ksql.util.KsqlConfig.KSQL_CUSTOM_METRICS_TAGS;
import static io.confluent.ksql.util.KsqlConfig.KSQL_SERVICE_ID_CONFIG;
import static org.apache.kafka.common.acl.AclOperation.ALL;
import static org.apache.kafka.common.acl.AclOperation.CREATE;
Expand All @@ -37,6 +38,7 @@

import io.confluent.common.utils.IntegrationTest;
import io.confluent.ksql.KsqlConfigTestUtil;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlEngineTestUtil;
import io.confluent.ksql.function.InternalFunctionRegistry;
Expand Down Expand Up @@ -269,7 +271,7 @@ private void givenTestSetupWithConfig(final Map<String, Object> ksqlConfigs) {
serviceContext,
ProcessingLogContext.create(),
new InternalFunctionRegistry(),
ksqlConfig.getString(KSQL_SERVICE_ID_CONFIG));
ServiceInfo.create(ksqlConfig));

execInitCreateStreamQueries();
}
Expand Down
Loading

0 comments on commit 6d12685

Please sign in to comment.