Skip to content

Commit

Permalink
[#796][0.7] bug: Fix the issues of MetricReporter (#821)
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
1.Support custom config keys defined in plugins
2.Refactor the logic for load config file
3.Fix some issues of metricReporter.
### Why are the changes needed?
Metric reporter is unusable.
Fix: #796

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT and Manual testing
  • Loading branch information
xianjingfeng authored Apr 13, 2023
1 parent 8cc4a67 commit bcb591e
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.util.RssUtils;

public class RssBaseConf extends RssConf {

Expand Down Expand Up @@ -212,21 +213,17 @@ public class RssBaseConf extends RssConf {
.defaultValue(5L)
.withDescription("Reconfigure check interval.");

public boolean loadCommonConf(Map<String, String> properties) {
public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
return false;
}
return loadCommonConf(properties) && loadConf(properties, configOptions, true);
}

public boolean loadCommonConf(Map<String, String> properties) {
List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(RssBaseConf.class);
properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
});

return true;
return loadConf(properties, configOptions, false);
}

}
32 changes: 32 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/config/RssConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.uniffle.common.config;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.collect.Sets;

Expand Down Expand Up @@ -600,6 +602,36 @@ private Optional<Object> getRawValueFromOption(ConfigOption<?> configOption) {
return getRawValue(configOption.key());
}

/**
* loadConf
* @param properties all config items in configration file
* @param configOptions the config items defined in base config class
* @param includeMissingKey if include the keys which not defined in base config class
* @return true if load successfully, otherwise false
*/
public boolean loadConf(
Map<String, String> properties,
List<ConfigOption<Object>> configOptions,
boolean includeMissingKey) {
if (properties == null || configOptions == null) {
return false;
}
Map<String, ConfigOption<Object>> configOptionMap =
configOptions.stream().collect(Collectors.toMap(c -> c.key().toLowerCase(), c -> c));
properties.forEach((k, v) -> {
ConfigOption<Object> config = configOptionMap.get(k.toLowerCase());
if (config == null) {
// if the key is not defined in configOptions, set it as a string value
if (includeMissingKey) {
setString(k, v);
}
} else {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
return true;
}

@Override
public int hashCode() {
int hash = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static MetricReporter getMetricReporter(RssConf conf, String instanceId)
}
Class<?> klass = Class.forName(name);
Constructor<?> constructor;
constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
constructor = klass.getConstructor(RssConf.class, String.class);
return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void start() {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
for (CollectorRegistry registry : registryList) {
try {
pushGateway.push(registry, jobName, groupingKey);
pushGateway.pushAdd(registry, jobName, groupingKey);
} catch (Throwable e) {
LOG.error("Failed to send metrics to push gateway.", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import org.junit.jupiter.api.Test;

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class MetricReporterFactoryTest {

@Test
public void testGetMetricReporter() throws Exception {
CustomRssConf conf = new CustomRssConf();
conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
PrometheusPushGatewayMetricReporter.class.getCanonicalName());
MetricReporter metricReporter = MetricReporterFactory.getMetricReporter(conf, "1");
assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
}

class CustomRssConf extends RssConf {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package org.apache.uniffle.coordinator;

import java.util.List;
import java.util.Map;

import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;

Expand Down Expand Up @@ -209,22 +207,6 @@ public CoordinatorConf(String fileName) {
}

public boolean loadConfFromFile(String fileName) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);

if (properties == null) {
return false;
}

loadCommonConf(properties);

List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(CoordinatorConf.class);
properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
});
return true;
return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(CoordinatorConf.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public void start() throws Exception {
startReconfigureThread();
jettyServer.start();
server.start();
if (metricReporter != null) {
metricReporter.start();
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public void test() {
assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE));
assertEquals(60 * 1000, conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL));

// test custom keys defined in plugins
assertEquals("v1", conf.getString("plugin.custom.key", null));
}

}
1 change: 1 addition & 0 deletions coordinator/src/test/resources/coordinator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1
rss.coordinator.access.loadChecker.serverNum.threshold 2
rss.coordinator.access.loadChecker.memory.percentage 20.0
rss.coordinator.dynamicClientConf.updateIntervalSec 1
plugin.custom.key v1
1 change: 1 addition & 0 deletions docs/coordinator_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi

|Property Name|Default| Description |
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. |
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. |
Expand Down
1 change: 1 addition & 0 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi

|Property Name|Default| Description |
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. |
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public void start() throws Exception {
registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
if (metricReporter != null) {
metricReporter.start();
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;

public class ShuffleServerConf extends RssBaseConf {

Expand Down Expand Up @@ -380,28 +378,6 @@ public Configuration getHadoopConf() {
}

public boolean loadConfFromFile(String fileName) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);

if (properties == null) {
return false;
}

loadCommonConf(properties);

List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(ShuffleServerConf.class);

properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});

if (k.startsWith(PREFIX_HADOOP_CONF)) {
setString(k, v);
}
});

return true;
return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(ShuffleServerConf.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public void confTest() {
assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE));
assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions server/src/test/resources/confTest.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1
rss.server.had.a.b value2
rss.server.multistorage.enable true
rss.server.cluster.hadoop.clustere1.
plugin.custom.key v1

0 comments on commit bcb591e

Please sign in to comment.