Skip to content

Commit

Permalink
Add configurable parameters for statsd client (#16283)
Browse files Browse the repository at this point in the history
Statsd client sometimes drops metrics when this queueSize of statsd client with max unprocessed messages is completely full. This causes some high cardinality metrics like per partition lag being droppped.
There are multiple parameters of statsdclient that can be initialized and can help increase the load/capacity of client to not to drop metrics more frequently.
Properties like queueSize, poolSize, processorWorkers and senderWorkers will now be configurable at runtime
  • Loading branch information
hardikbajaj authored Apr 17, 2024
1 parent 34237bc commit 0bf5e77
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 11 deletions.
4 changes: 4 additions & 0 deletions docs/development/extensions-contrib/statsd.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ All the configuration parameters for the StatsD emitter are under `druid.emitter
|`druid.emitter.statsd.includeHost`|Flag to include the hostname as part of the metric name.|no|false|
|`druid.emitter.statsd.dimensionMapPath`|JSON file defining the StatsD type, and desired dimensions for every Druid metric|no|Default mapping provided. See below.|
|`druid.emitter.statsd.blankHolder`|The blank character replacement as StatsD does not support path with blank character|no|"-"|
|`druid.emitter.statsd.queueSize`|Maximum number of unprocessed messages in the message queue.|no|Default value of StatsD Client(4096)|
|`druid.emitter.statsd.poolSize`|Network packet buffer pool size.|no|Default value of StatsD Client(512)|
|`druid.emitter.statsd.processorWorkers`|The number of processor worker threads assembling buffers for submission.|no|Default value of StatsD Client(1)|
|`druid.emitter.statsd.senderWorkers`| The number of sender worker threads submitting buffers to the socket.|no|Default value of StatsD Client(1)|
|`druid.emitter.statsd.dogstatsd`|Flag to enable [DogStatsD](https://docs.datadoghq.com/developers/dogstatsd/) support. Causes dimensions to be included as tags, not as a part of the metric name. `convertRange` fields will be ignored.|no|false|
|`druid.emitter.statsd.dogstatsdConstantTags`|If `druid.emitter.statsd.dogstatsd` is true, the tags in the JSON list of strings will be sent with every event.|no|[]|
|`druid.emitter.statsd.dogstatsdServiceAsTag`|If `druid.emitter.statsd.dogstatsd` and `druid.emitter.statsd.dogstatsdServiceAsTag` are true, druid service (e.g. `druid/broker`, `druid/coordinator`, etc) is reported as a tag (e.g. `druid_service:druid/broker`) instead of being included in metric name (e.g. `druid.broker.query.time`) and `druid` is used as metric prefix (e.g. `druid.query.time`).|no|false|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ static StatsDEmitter of(StatsDEmitterConfig config, ObjectMapper mapper)
.hostname(config.getHostname())
.port(config.getPort())
.constantTags(config.isDogstatsd() ? config.getDogstatsdConstantTags().toArray(new String[0]) : EMPTY_ARRAY)
.queueSize(config.getQueueSize())
.bufferPoolSize(config.getPoolSize())
.processorWorkers(config.getProcessorWorkers())
.senderWorkers(config.getSenderWorkers())
.errorHandler(new StatsDClientErrorHandler()
{
private int exceptionCount = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.timgroup.statsd.NonBlockingStatsDClient;

import javax.annotation.Nullable;
import java.util.Collections;
Expand Down Expand Up @@ -56,6 +57,14 @@ public class StatsDEmitterConfig
private final Boolean dogstatsdServiceAsTag;
@JsonProperty
private final Boolean dogstatsdEvents;
@JsonProperty
private final Integer queueSize;
@JsonProperty
private final Integer poolSize;
@JsonProperty
private final Integer processorWorkers;
@JsonProperty
private final Integer senderWorkers;

@JsonCreator
public StatsDEmitterConfig(
Expand All @@ -69,7 +78,11 @@ public StatsDEmitterConfig(
@JsonProperty("dogstatsd") @Nullable Boolean dogstatsd,
@JsonProperty("dogstatsdConstantTags") @Nullable List<String> dogstatsdConstantTags,
@JsonProperty("dogstatsdServiceAsTag") @Nullable Boolean dogstatsdServiceAsTag,
@JsonProperty("dogstatsdEvents") @Nullable Boolean dogstatsdEvents
@JsonProperty("dogstatsdEvents") @Nullable Boolean dogstatsdEvents,
@JsonProperty("queueSize") @Nullable Integer queueSize,
@JsonProperty("poolSize") @Nullable Integer poolSize,
@JsonProperty("processorWorkers") @Nullable Integer processorWorkers,
@JsonProperty("senderWorkers") @Nullable Integer senderWorkers
)
{
this.hostname = Preconditions.checkNotNull(hostname, "StatsD hostname cannot be null.");
Expand All @@ -83,6 +96,10 @@ public StatsDEmitterConfig(
this.dogstatsdConstantTags = dogstatsdConstantTags != null ? dogstatsdConstantTags : Collections.emptyList();
this.dogstatsdServiceAsTag = dogstatsdServiceAsTag != null ? dogstatsdServiceAsTag : false;
this.dogstatsdEvents = dogstatsdEvents != null ? dogstatsdEvents : false;
this.queueSize = queueSize != null ? queueSize : NonBlockingStatsDClient.DEFAULT_QUEUE_SIZE;
this.poolSize = poolSize != null ? poolSize : NonBlockingStatsDClient.DEFAULT_POOL_SIZE;
this.processorWorkers = processorWorkers != null ? processorWorkers : NonBlockingStatsDClient.DEFAULT_PROCESSOR_WORKERS;
this.senderWorkers = senderWorkers != null ? senderWorkers : NonBlockingStatsDClient.DEFAULT_SENDER_WORKERS;
}

@Override
Expand Down Expand Up @@ -121,14 +138,26 @@ public boolean equals(Object o)
if (!Objects.equals(dogstatsdServiceAsTag, that.dogstatsdServiceAsTag)) {
return false;
}
if (!Objects.equals(queueSize, that.queueSize)) {
return false;
}
if (!Objects.equals(poolSize, that.poolSize)) {
return false;
}
if (!Objects.equals(processorWorkers, that.processorWorkers)) {
return false;
}
if (!Objects.equals(senderWorkers, that.senderWorkers)) {
return false;
}
return Objects.equals(dogstatsdConstantTags, that.dogstatsdConstantTags);
}

@Override
public int hashCode()
{
return Objects.hash(hostname, port, prefix, separator, includeHost, dimensionMapPath,
blankHolder, dogstatsd, dogstatsdConstantTags, dogstatsdServiceAsTag);
blankHolder, dogstatsd, dogstatsdConstantTags, dogstatsdServiceAsTag, queueSize, poolSize, processorWorkers, senderWorkers);
}

@JsonProperty
Expand Down Expand Up @@ -197,4 +226,24 @@ public Boolean isDogstatsdEvents()
{
return dogstatsdEvents;
}
@JsonProperty
public Integer getQueueSize()
{
return queueSize;
}
@JsonProperty
public Integer getPoolSize()
{
return poolSize;
}
@JsonProperty
public Integer getProcessorWorkers()
{
return processorWorkers;
}
@JsonProperty
public Integer getSenderWorkers()
{
return senderWorkers;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testConvertRange()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, null, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, null, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -60,7 +60,7 @@ public void testConvertRangeWithDogstatsd()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, true, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, true, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -77,7 +77,7 @@ public void testNoConvertRange()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, null, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, null, null, null, null, null, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -103,7 +103,7 @@ public void testConfigOptions()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, "#", true, null, null, null, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, "#", true, null, null, null, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -129,7 +129,7 @@ public void testDogstatsdEnabled()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, "#", true, null, null, true, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, "#", true, null, null, true, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -156,7 +156,7 @@ public void testBlankHolderOptions()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, null, null, null, null),
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, null, null, null, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -173,7 +173,7 @@ public void testServiceAsTagOption()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, true, null, true, null),
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, true, null, true, null, null, null, null, null),
new ObjectMapper(),
client
);
Expand All @@ -192,7 +192,7 @@ public void testAlertEvent()
{
StatsDClient client = mock(StatsDClient.class);
StatsDEmitter emitter = new StatsDEmitter(
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, true, null, true, true),
new StatsDEmitterConfig("localhost", 8888, null, null, true, null, null, true, null, true, true, null, null, null, null),
new ObjectMapper(),
client
);
Expand Down Expand Up @@ -239,7 +239,12 @@ public void testInitialization()
true,
ImmutableList.of("tag1", "value1"),
true,
true
true,
5100,
512,
1,
1

);
try (StatsDEmitter emitter = StatsDEmitter.of(config, new ObjectMapper())) {

Expand Down

0 comments on commit 0bf5e77

Please sign in to comment.