Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#796][0.7] bug: Fix the issues of MetricReporter #821

Merged
merged 4 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Map;

import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.util.RssUtils;

public class RssBaseConf extends RssConf {

Expand Down Expand Up @@ -212,21 +213,17 @@ public class RssBaseConf extends RssConf {
.defaultValue(5L)
.withDescription("Reconfigure check interval.");

public boolean loadCommonConf(Map<String, String> properties) {
public boolean loadConfFromFile(String fileName, List<ConfigOption<Object>> configOptions) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);
if (properties == null) {
return false;
}
return loadCommonConf(properties) && loadConf(properties, configOptions, true);
}

public boolean loadCommonConf(Map<String, String> properties) {
List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(RssBaseConf.class);
properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
});

return true;
return loadConf(properties, configOptions, false);
}

}
32 changes: 32 additions & 0 deletions common/src/main/java/org/apache/uniffle/common/config/RssConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.uniffle.common.config;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.collect.Sets;

Expand Down Expand Up @@ -600,6 +602,36 @@ private Optional<Object> getRawValueFromOption(ConfigOption<?> configOption) {
return getRawValue(configOption.key());
}

/**
* loadConf
* @param properties all config items in configration file
* @param configOptions the config items defined in base config class
* @param includeMissingKey if include the keys which not defined in base config class
* @return true if load successfully, otherwise false
*/
public boolean loadConf(
Map<String, String> properties,
List<ConfigOption<Object>> configOptions,
boolean includeMissingKey) {
if (properties == null || configOptions == null) {
return false;
}
Map<String, ConfigOption<Object>> configOptionMap =
configOptions.stream().collect(Collectors.toMap(c -> c.key().toLowerCase(), c -> c));
properties.forEach((k, v) -> {
ConfigOption<Object> config = configOptionMap.get(k.toLowerCase());
if (config == null) {
// if the key is not defined in configOptions, set it as a string value
if (includeMissingKey) {
setString(k, v);
}
} else {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
return true;
}

@Override
public int hashCode() {
int hash = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public static MetricReporter getMetricReporter(RssConf conf, String instanceId)
}
Class<?> klass = Class.forName(name);
Constructor<?> constructor;
constructor = klass.getConstructor(conf.getClass(), instanceId.getClass());
constructor = klass.getConstructor(RssConf.class, String.class);
return (AbstractMetricReporter) constructor.newInstance(conf, instanceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void start() {
scheduledExecutorService.scheduleWithFixedDelay(() -> {
for (CollectorRegistry registry : registryList) {
try {
pushGateway.push(registry, jobName, groupingKey);
pushGateway.pushAdd(registry, jobName, groupingKey);
} catch (Throwable e) {
LOG.error("Failed to send metrics to push gateway.", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.common.metrics;

import org.junit.jupiter.api.Test;

import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter;

import static org.junit.jupiter.api.Assertions.assertTrue;

public class MetricReporterFactoryTest {

@Test
public void testGetMetricReporter() throws Exception {
CustomRssConf conf = new CustomRssConf();
conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS,
PrometheusPushGatewayMetricReporter.class.getCanonicalName());
MetricReporter metricReporter = MetricReporterFactory.getMetricReporter(conf, "1");
assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter);
}

class CustomRssConf extends RssConf {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
package org.apache.uniffle.coordinator;

import java.util.List;
import java.util.Map;

import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;

Expand Down Expand Up @@ -209,22 +207,6 @@ public CoordinatorConf(String fileName) {
}

public boolean loadConfFromFile(String fileName) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);

if (properties == null) {
return false;
}

loadCommonConf(properties);

List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(CoordinatorConf.class);
properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});
});
return true;
return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(CoordinatorConf.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public void start() throws Exception {
startReconfigureThread();
jettyServer.start();
server.start();
if (metricReporter != null) {
metricReporter.start();
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public void test() {
assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE));
assertEquals(60 * 1000, conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL));

// test custom keys defined in plugins
assertEquals("v1", conf.getString("plugin.custom.key", null));
}

}
1 change: 1 addition & 0 deletions coordinator/src/test/resources/coordinator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1
rss.coordinator.access.loadChecker.serverNum.threshold 2
rss.coordinator.access.loadChecker.memory.percentage 20.0
rss.coordinator.dynamicClientConf.updateIntervalSec 1
plugin.custom.key v1
1 change: 1 addition & 0 deletions docs/coordinator_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi

|Property Name|Default| Description |
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. |
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. |
Expand Down
1 change: 1 addition & 0 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi

|Property Name|Default| Description |
|---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.<br/>prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.|
|rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. |
|rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). |
|rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public void start() throws Exception {
registerHeartBeat.startHeartBeat();
jettyServer.start();
server.start();
if (metricReporter != null) {
metricReporter.start();
}

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
package org.apache.uniffle.server;

import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;

import org.apache.uniffle.common.config.ConfigOption;
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.util.RssUtils;

public class ShuffleServerConf extends RssBaseConf {

Expand Down Expand Up @@ -380,28 +378,6 @@ public Configuration getHadoopConf() {
}

public boolean loadConfFromFile(String fileName) {
Map<String, String> properties = RssUtils.getPropertiesFromFile(fileName);

if (properties == null) {
return false;
}

loadCommonConf(properties);

List<ConfigOption<Object>> configOptions = ConfigUtils.getAllConfigOptions(ShuffleServerConf.class);

properties.forEach((k, v) -> {
configOptions.forEach(config -> {
if (config.key().equalsIgnoreCase(k)) {
set(config, ConfigUtils.convertValue(v, config.getClazz()));
}
});

if (k.startsWith(PREFIX_HADOOP_CONF)) {
setString(k, v);
}
});

return true;
return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(ShuffleServerConf.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public void confTest() {
assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null"));
assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY));
assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", ""));
assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE));
assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", ""));
assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE));
assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions server/src/test/resources/confTest.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1
rss.server.had.a.b value2
rss.server.multistorage.enable true
rss.server.cluster.hadoop.clustere1.
plugin.custom.key v1