Skip to content

Commit

Permalink
Add native support for prometheus
Browse files Browse the repository at this point in the history
  • Loading branch information
pdambrauskas committed Jul 14, 2020
1 parent ebc034c commit 1113b4c
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 20 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@
<artifactId>micrometer-registry-statsd</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.5.2</version>
</dependency>
</dependencies>

<build>
Expand Down
1 change: 1 addition & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,7 @@ secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.OstrichM
# secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector
# secor.monitoring.metrics.collector.micrometer.jmx.enabled=true
# secor.monitoring.metrics.collector.micrometer.statsd.enabled=true
# secor.monitoring.metrics.collector.micrometer.prometheus.enabled=true

# Row group size in bytes for Parquet writers. Specifies how much data will be buffered in memory before flushing a
# block to disk. Larger values allow for larger column chinks which makes it possible to do larger sequential IO.
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -766,11 +766,19 @@ public String getMetricsCollectorClass() {
}

public boolean getMicroMeterCollectorJmxEnabled() {
return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled");
return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled", false);
}

public boolean getMicroMeterCollectorStatsdEnabled() {
return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled");
return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled", false);
}

public boolean getMicroMeterCollectorPrometheusEnabled() {
return getBoolean("secor.monitoring.metrics.collector.micrometer.prometheus.enabled", false);
}

public int getMicroMeterCacheSize() {
return getInt("secor.monitoring.metrics.collector.micrometer.cache.size", 500);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.common;
package com.pinterest.secor.common.monitoring;

import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.util.StatsUtil;
import com.twitter.ostrich.admin.AdminServiceFactory;
import com.twitter.ostrich.admin.CustomHttpHandler;
Expand All @@ -30,6 +31,7 @@
import scala.collection.JavaConversions;
import scala.collection.Map$;
import scala.collection.immutable.List$;
import scala.collection.immutable.Map;
import scala.util.matching.Regex;

import java.util.Arrays;
Expand All @@ -44,21 +46,25 @@
public class OstrichAdminService {
private static final Logger LOG = LoggerFactory.getLogger(OstrichAdminService.class);
private final int mPort;
private final boolean mPrometheusEnabled;

public OstrichAdminService(int port) {
this.mPort = port;
public OstrichAdminService(SecorConfig config) {
mPort = config.getOstrichPort();
mPrometheusEnabled = config.getMicroMeterCollectorPrometheusEnabled();
}

public void start() {
Duration[] defaultLatchIntervals = {Duration.apply(1, TimeUnit.MINUTES)};
Map<String, CustomHttpHandler> handlers = mPrometheusEnabled ?
new Map.Map1<>("/prometheus", new PrometheusHandler()) : Map$.MODULE$.empty();
@SuppressWarnings("deprecation")
AdminServiceFactory adminServiceFactory = new AdminServiceFactory(
this.mPort,
20,
List$.MODULE$.<StatsFactory>empty(),
Option.<String>empty(),
List$.MODULE$.<Regex>empty(),
Map$.MODULE$.<String, CustomHttpHandler>empty(),
handlers,
JavaConversions
.asScalaBuffer(Arrays.asList(defaultLatchIntervals)).toList()
);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.pinterest.secor.common.monitoring;

import com.sun.net.httpserver.HttpExchange;
import com.twitter.ostrich.admin.CustomHttpHandler;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.apache.http.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
* Initializes Http Endpoint for Prometheus
*
* @author Paulius Dambrauskas ([email protected])
*/
public class PrometheusHandler extends CustomHttpHandler {
private static final Logger LOG = LoggerFactory.getLogger(PrometheusHandler.class);

@Override
public void handle(HttpExchange exchange) {
Optional<PrometheusMeterRegistry> registry = Metrics.globalRegistry.getRegistries().stream()
.filter(meterRegistry -> meterRegistry instanceof PrometheusMeterRegistry)
.map(meterRegistry -> (PrometheusMeterRegistry) meterRegistry)
.findFirst();
if (registry.isPresent()) {
this.render(registry.get().scrape(), exchange, HttpStatus.SC_OK);
} else {
LOG.warn("Trying to scrape prometheus, while it is disabled, " +
"set \"secor.monitoring.metrics.collector.micrometer.prometheus.enabled\" to \"true\"");
this.render("Not Found", exchange, HttpStatus.SC_NOT_FOUND);
}
}
}
5 changes: 2 additions & 3 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,9 @@ public class Consumer extends Thread {
private volatile boolean mShuttingDown = false;
private static volatile boolean mCallingSystemExit = false;

public Consumer(SecorConfig config) {
public Consumer(SecorConfig config, MetricCollector metricCollector) {
mConfig = config;
mMetricCollector = metricCollector;
isLegacyConsumer = true;
}

Expand All @@ -89,8 +90,6 @@ private void init() throws Exception {
}
mKafkaMessageIterator = KafkaMessageIteratorFactory.getIterator(mConfig.getKafkaMessageIteratorClass(), mConfig);
mMessageReader = new MessageReader(mConfig, mOffsetTracker, mKafkaMessageIterator);
mMetricCollector = ReflectionUtil.createMetricCollector(mConfig.getMetricsCollectorClass());
mMetricCollector.initialize(mConfig);

FileRegistry fileRegistry = new FileRegistry(mConfig);
UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig);
Expand Down
12 changes: 9 additions & 3 deletions src/main/java/com/pinterest/secor/main/ConsumerMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@
*/
package com.pinterest.secor.main;

import com.pinterest.secor.common.OstrichAdminService;
import com.pinterest.secor.common.monitoring.OstrichAdminService;
import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.ShutdownHookRegistry;
import com.pinterest.secor.consumer.Consumer;
import com.pinterest.secor.io.StagingDirectoryCleaner;
import com.pinterest.secor.monitoring.MetricCollector;
import com.pinterest.secor.tools.LogFileDeleter;
import com.pinterest.secor.util.FileUtil;
import com.pinterest.secor.util.IdUtil;
import com.pinterest.secor.util.RateLimitUtil;
import com.pinterest.secor.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -59,7 +61,11 @@ public static void main(String[] args) {
SecorConfig config = SecorConfig.load();
String stagingDirectoryPath = config.getLocalPath() + '/' + IdUtil.getLocalMessageDir();
ShutdownHookRegistry.registerHook(10, new StagingDirectoryCleaner(stagingDirectoryPath));
OstrichAdminService ostrichService = new OstrichAdminService(config.getOstrichPort());

MetricCollector metricCollector = ReflectionUtil.createMetricCollector(config.getMetricsCollectorClass());
metricCollector.initialize(config);

OstrichAdminService ostrichService = new OstrichAdminService(config);
ostrichService.start();
FileUtil.configure(config);

Expand All @@ -70,7 +76,7 @@ public static void main(String[] args) {
LOG.info("starting {} consumer threads", config.getConsumerThreads());
LinkedList<Consumer> consumers = new LinkedList<Consumer>();
for (int i = 0; i < config.getConsumerThreads(); ++i) {
Consumer consumer = new Consumer(config);
Consumer consumer = new Consumer(config, metricCollector);
consumers.add(consumer);
consumer.start();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,36 @@
package com.pinterest.secor.monitoring;

import com.pinterest.secor.common.SecorConfig;

import com.google.common.util.concurrent.AtomicDouble;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.jmx.JmxConfig;
import io.micrometer.jmx.JmxMeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.micrometer.statsd.StatsdConfig;
import io.micrometer.statsd.StatsdMeterRegistry;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* MicorMeter meters can integrate with many different metrics backend
* (StatsD/Promethus/Graphite/JMX etc, see https://micrometer.io/docs)
*/
public class MicroMeterMetricCollector implements MetricCollector {
private static final Logger LOG = LoggerFactory.getLogger(MicroMeterMetricCollector.class);

private final Map<String, Double> mGaugeCache = new HashMap<>();
private SecorConfig mConfig;

@Override
public void initialize(SecorConfig config) {
mConfig = config;

if (config.getMicroMeterCollectorStatsdEnabled()) {
MeterRegistry statsdRegistry =
new StatsdMeterRegistry(StatsdConfig.DEFAULT, Clock.SYSTEM);
Expand All @@ -49,6 +59,11 @@ public void initialize(SecorConfig config) {
MeterRegistry jmxRegistry = new JmxMeterRegistry(JmxConfig.DEFAULT, Clock.SYSTEM);
Metrics.addRegistry(jmxRegistry);
}

if (config.getMicroMeterCollectorPrometheusEnabled()) {
MeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
Metrics.addRegistry(prometheusRegistry);
}
}

@Override
Expand All @@ -63,13 +78,21 @@ public void increment(String label, int delta, String topic) {

@Override
public void metric(String label, double value, String topic) {
Metrics.gauge(label, Collections.singletonList(
Tag.of("topic", topic)), new AtomicDouble(0)).set(value);
gauge(label, value, topic);
}

@Override
public void gauge(String label, double value, String topic) {
String key = label + "_" + topic;
if (mGaugeCache.size() <= mConfig.getMicroMeterCacheSize()) {
mGaugeCache.put(key, value);
} else {
LOG.error("Gauge cache size reached maximum, this may result in inaccurate metrics, "
+ "you can increase cache size by changing "
+ "\"secor.monitoring.metrics.collector.micrometer.cache.size\" property.");
}
Metrics.gauge(label, Collections.singletonList(
Tag.of("topic", topic)), new AtomicDouble(0)).set(value);
Tag.of("topic", topic)), mGaugeCache, g -> g.get(key));
}

}
41 changes: 41 additions & 0 deletions src/test/java/com/pinterest/secor/monitoring/PrometheusTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.pinterest.secor.monitoring;

import com.pinterest.secor.common.SecorConfig;
import com.pinterest.secor.common.monitoring.PrometheusHandler;
import com.sun.net.httpserver.HttpExchange;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;

public class PrometheusTest {

@Test
public void testPrometheusIntegration() throws IOException {
PropertiesConfiguration properties = new PropertiesConfiguration();
properties.addProperty("secor.monitoring.metrics.collector.micrometer.prometheus.enabled", true);
SecorConfig config = new SecorConfig(properties);
MetricCollector collector = new MicroMeterMetricCollector();
collector.initialize(config);

final List<String> responses = new ArrayList<>();
PrometheusHandler handler = new PrometheusHandler() {
@Override
public void render(String body, HttpExchange exchange, int code) {
responses.add(body);
}
};
HttpExchange exchange = mock(HttpExchange.class);

collector.gauge("test", 1, "topic");

handler.handle(exchange);
assertTrue(responses.get(0).contains("test{topic=\"topic\",} 1.0"));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
//import com.google.common.collect.Maps;
//import com.pinterest.secor.common.LegacyKafkaClient;
//import com.pinterest.secor.common.OffsetTracker;
//import com.pinterest.secor.common.OstrichAdminService;
//import com.pinterest.secor.common.monitoring.OstrichAdminService;
//import com.pinterest.secor.common.SecorConfig;
//import com.pinterest.secor.common.TopicPartition;
//import com.pinterest.secor.consumer.Consumer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
//import com.google.common.collect.Maps;
//import com.pinterest.secor.common.LegacyKafkaClient;
//import com.pinterest.secor.common.OffsetTracker;
//import com.pinterest.secor.common.OstrichAdminService;
//import com.pinterest.secor.common.monitoring.OstrichAdminService;
//import com.pinterest.secor.common.SecorConfig;
//import com.pinterest.secor.common.TopicPartition;
//import com.pinterest.secor.consumer.Consumer;
Expand Down

0 comments on commit 1113b4c

Please sign in to comment.