From 715746c0762e7d70e5a1a4de829469160f484682 Mon Sep 17 00:00:00 2001 From: Henry Cai Date: Thu, 21 May 2020 23:58:45 -0700 Subject: [PATCH] Issue #1341: Support Micrometer metrics facade to integrate with more metrics backend systems Currently secor's metrics monitoring relies on Twitter's Ostrich library which is already deprecated. There are many stats/metrics collection libraries and backend systems on the market (e.g. StatsD, Prometheus, Graphite, Datadog, Influx etc). A good way to support integration with those metrics collection systems is to use the Micrometer facade (micrometer.io), which provides a common interface and prebuilt integrations with the most popular metrics systems. Add the new MicroMeterMetricCollector implementation, which is plugged in through: secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector Micrometer metrics can use a CompositeMeterRegistry, which supports publishing to multiple metrics systems simultaneously. We added the following two integration points for JMX and StatsD; more (e.g. Graphite, Datadog) can be added in a similar fashion: secor.monitoring.metrics.collector.micrometer.jmx.enabled=true secor.monitoring.metrics.collector.micrometer.statsd.enabled=true --- README.md | 3 ++- pom.xml | 10 ++++++++++ src/main/config/secor.common.properties | 4 ++++ .../java/com/pinterest/secor/common/SecorConfig.java | 10 +++++++++- .../java/com/pinterest/secor/consumer/Consumer.java | 1 + .../pinterest/secor/monitoring/MetricCollector.java | 8 ++++++++ .../secor/monitoring/OstrichMetricCollector.java | 6 ++++++ 7 files changed, 40 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 60354b5c9..ae4a1cd92 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ Secor is a service persisting [Kafka] logs to [Amazon S3], [Google Cloud Storage - **horizontal scalability**: scaling the system out to handle more load is as easy as starting extra Secor processes. 
Reducing the resource footprint can be achieved by killing any of the running Secor processes. Neither ramping up nor down has any impact on data consistency, - **output partitioning**: Secor parses incoming messages and puts them under partitioned s3 paths to enable direct import into systems like [Hive]. day,hour,minute level partitions are supported by secor - **configurable upload policies**: commit points controlling when data is persisted in S3 are configured through size-based and time-based policies (e.g., upload data when local buffer reaches size of 100MB and at least once per hour), - - **monitoring**: metrics tracking various performance properties are exposed through [Ostrich] and optionally exported to [OpenTSDB] / [statsD], + - **monitoring**: metrics tracking various performance properties are exposed through [Ostrich], [Micrometer] and optionally exported to [OpenTSDB] / [statsD], - **customizability**: external log message parser may be loaded by updating the configuration, - **event transformation**: external message level transformation can be done by using customized class. - **Qubole interface**: Secor connects to [Qubole] to add finalized output partitions to Hive tables. 
@@ -89,6 +89,7 @@ If you have any questions or comments, you can reach us at [secor-users@googlegr [eventual consistency]:http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyMode [Hive]:http://hive.apache.org/ [Ostrich]: https://github.com/twitter/ostrich +[Micrometer]: https://micrometer.io [OpenTSDB]: http://opentsdb.net/ [Qubole]: http://www.qubole.com/ [statsD]: https://github.com/etsy/statsd/ diff --git a/pom.xml b/pom.xml index 611077e84..8c3e4be9e 100644 --- a/pom.xml +++ b/pom.xml @@ -372,6 +372,16 @@ orc-core 1.6.3 + + io.micrometer + micrometer-registry-jmx + 1.5.1 + + + io.micrometer + micrometer-registry-statsd + 1.5.1 + + diff --git a/src/main/config/secor.common.properties b/src/main/config/secor.common.properties index e2eef8609..f392e4a72 100644 --- a/src/main/config/secor.common.properties +++ b/src/main/config/secor.common.properties @@ -492,6 +492,10 @@ secor.file.age.youngest=true # Class that manages metric collection. # Sending metrics to Ostrich is the default implementation. secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.OstrichMetricCollector +# You can also hook up the Micrometer metrics facade to connect to multiple metrics backends (JMX/StatsD/Graphite etc) +# secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector +# secor.monitoring.metrics.collector.micrometer.jmx.enabled=true +# secor.monitoring.metrics.collector.micrometer.statsd.enabled=true # Row group size in bytes for Parquet writers. Specifies how much data will be buffered in memory before flushing a # block to disk. Larger values allow for larger column chinks which makes it possible to do larger sequential IO. 
diff --git a/src/main/java/com/pinterest/secor/common/SecorConfig.java b/src/main/java/com/pinterest/secor/common/SecorConfig.java index 531d72286..04d203171 100644 --- a/src/main/java/com/pinterest/secor/common/SecorConfig.java +++ b/src/main/java/com/pinterest/secor/common/SecorConfig.java @@ -756,7 +756,15 @@ public String getThriftProtocolClass() { public String getMetricsCollectorClass() { return getString("secor.monitoring.metrics.collector.class"); } - + + public boolean getMicroMeterCollectorJmxEnabled() { + return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled"); + } + + public boolean getMicroMeterCollectorStatsdEnabled() { + return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled"); + } + /** * This method is used for fetching all the properties which start with the given prefix. * It returns a Map of all those key-val. diff --git a/src/main/java/com/pinterest/secor/consumer/Consumer.java b/src/main/java/com/pinterest/secor/consumer/Consumer.java index 74a925227..d6731019c 100644 --- a/src/main/java/com/pinterest/secor/consumer/Consumer.java +++ b/src/main/java/com/pinterest/secor/consumer/Consumer.java @@ -93,6 +93,7 @@ private void init() throws Exception { mKafkaMessageIterator = KafkaMessageIteratorFactory.getIterator(mConfig.getKafkaMessageIteratorClass(), mConfig); mMessageReader = new MessageReader(mConfig, mOffsetTracker, mKafkaMessageIterator); mMetricCollector = ReflectionUtil.createMetricCollector(mConfig.getMetricsCollectorClass()); + mMetricCollector.initialize(mConfig); FileRegistry fileRegistry = new FileRegistry(mConfig); UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig); diff --git a/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java index cdcadd0ac..dd3e361f9 100644 --- a/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java +++ 
b/src/main/java/com/pinterest/secor/monitoring/MetricCollector.java @@ -18,12 +18,20 @@ */ package com.pinterest.secor.monitoring; +import com.pinterest.secor.common.SecorConfig; + /** * Component which may be used to post metrics. * * All methods should be non-blocking and do not throw exceptions. */ public interface MetricCollector { + /** + * Initialize the collector with SecorConfig + * @param config + */ + void initialize(SecorConfig config); + /** * Increments the specified counter by one. * Convenience method equivalent to {@link #increment(String, int, String)}. diff --git a/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java b/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java index 059c0611f..b34fa71ce 100644 --- a/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java +++ b/src/main/java/com/pinterest/secor/monitoring/OstrichMetricCollector.java @@ -18,9 +18,15 @@ */ package com.pinterest.secor.monitoring; +import com.pinterest.secor.common.SecorConfig; + import com.twitter.ostrich.stats.Stats; public class OstrichMetricCollector implements MetricCollector { + @Override + public void initialize(SecorConfig config) { + } + @Override public void increment(String label, String topic) { Stats.incr(label);