Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #1341: Support Micrometer metrics facade to integrate with more… #1342

Merged
merged 1 commit into from
May 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ Secor is a service persisting [Kafka] logs to [Amazon S3], [Google Cloud Storage
- **horizontal scalability**: scaling the system out to handle more load is as easy as starting extra Secor processes. Reducing the resource footprint can be achieved by killing any of the running Secor processes. Neither ramping up nor down has any impact on data consistency,
- **output partitioning**: Secor parses incoming messages and puts them under partitioned s3 paths to enable direct import into systems like [Hive]. day,hour,minute level partitions are supported by secor
- **configurable upload policies**: commit points controlling when data is persisted in S3 are configured through size-based and time-based policies (e.g., upload data when local buffer reaches size of 100MB and at least once per hour),
- **monitoring**: metrics tracking various performance properties are exposed through [Ostrich] and optionally exported to [OpenTSDB] / [statsD],
- **monitoring**: metrics tracking various performance properties are exposed through [Ostrich], [Micrometer] and optionally exported to [OpenTSDB] / [statsD],
- **customizability**: external log message parser may be loaded by updating the configuration,
- **event transformation**: external message level transformation can be done by using customized class.
- **Qubole interface**: Secor connects to [Qubole] to add finalized output partitions to Hive tables.
Expand Down Expand Up @@ -89,6 +89,7 @@ If you have any questions or comments, you can reach us at [secor-users@googlegr
[eventual consistency]:http://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyMode
[Hive]:http://hive.apache.org/
[Ostrich]: https://github.com/twitter/ostrich
[Micrometer]: https://micrometer.io
[OpenTSDB]: http://opentsdb.net/
[Qubole]: http://www.qubole.com/
[statsD]: https://github.com/etsy/statsd/
Expand Down
10 changes: 10 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,16 @@
<artifactId>orc-core</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-jmx</artifactId>
<version>1.5.1</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-statsd</artifactId>
<version>1.5.1</version>
</dependency>
</dependencies>

<build>
Expand Down
4 changes: 4 additions & 0 deletions src/main/config/secor.common.properties
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,10 @@ secor.file.age.youngest=true
# Class that manages metric collection.
# Sending metrics to Ostrich is the default implementation.
secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.OstrichMetricCollector
# You can also hook up with Micrometer metrics facade to connect to with multiple metrics backend (JMX/StatsD/Graphite etc)
# secor.monitoring.metrics.collector.class=com.pinterest.secor.monitoring.MicroMeterMetricCollector
# secor.monitoring.metrics.collector.micrometer.jmx.enabled=true
# secor.monitoring.metrics.collector.micrometer.statsd.enabled=true

# Row group size in bytes for Parquet writers. Specifies how much data will be buffered in memory before flushing a
# block to disk. Larger values allow for larger column chinks which makes it possible to do larger sequential IO.
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/com/pinterest/secor/common/SecorConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,15 @@ public String getThriftProtocolClass() {
public String getMetricsCollectorClass() {
return getString("secor.monitoring.metrics.collector.class");
}


public boolean getMicroMeterCollectorJmxEnabled() {
return getBoolean("secor.monitoring.metrics.collector.micrometer.jmx.enabled");
}

public boolean getMicroMeterCollectorStatsdEnabled() {
return getBoolean("secor.monitoring.metrics.collector.micrometer.statsd.enabled");
}

/**
* This method is used for fetching all the properties which start with the given prefix.
* It returns a Map of all those key-val.
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/pinterest/secor/consumer/Consumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ private void init() throws Exception {
mKafkaMessageIterator = KafkaMessageIteratorFactory.getIterator(mConfig.getKafkaMessageIteratorClass(), mConfig);
mMessageReader = new MessageReader(mConfig, mOffsetTracker, mKafkaMessageIterator);
mMetricCollector = ReflectionUtil.createMetricCollector(mConfig.getMetricsCollectorClass());
mMetricCollector.initialize(mConfig);

FileRegistry fileRegistry = new FileRegistry(mConfig);
UploadManager uploadManager = ReflectionUtil.createUploadManager(mConfig.getUploadManagerClass(), mConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@
*/
package com.pinterest.secor.monitoring;

import com.pinterest.secor.common.SecorConfig;

/**
* Component which may be used to post metrics.
*
* All methods should be non-blocking and do not throw exceptions.
*/
public interface MetricCollector {
/**
* Initialize the collector with SecorConfig
* @param config
*/
void initialize(SecorConfig config);

/**
* Increments the specified counter by one.
* Convenience method equivalent to {@link #increment(String, int, String)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@
*/
package com.pinterest.secor.monitoring;

import com.pinterest.secor.common.SecorConfig;

import com.twitter.ostrich.stats.Stats;

public class OstrichMetricCollector implements MetricCollector {
@Override
public void initialize(SecorConfig config) {
}

@Override
public void increment(String label, String topic) {
Stats.incr(label);
Expand Down