diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java index 761e5bf389..ae6bce0d87 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.uniffle.common.ClientType; +import org.apache.uniffle.common.util.RssUtils; public class RssBaseConf extends RssConf { @@ -212,21 +213,17 @@ public class RssBaseConf extends RssConf { .defaultValue(5L) .withDescription("Reconfigure check interval."); - public boolean loadCommonConf(Map properties) { + public boolean loadConfFromFile(String fileName, List> configOptions) { + Map properties = RssUtils.getPropertiesFromFile(fileName); if (properties == null) { return false; } + return loadCommonConf(properties) && loadConf(properties, configOptions, true); + } + public boolean loadCommonConf(Map properties) { List> configOptions = ConfigUtils.getAllConfigOptions(RssBaseConf.class); - properties.forEach((k, v) -> { - configOptions.forEach(config -> { - if (config.key().equalsIgnoreCase(k)) { - set(config, ConfigUtils.convertValue(v, config.getClazz())); - } - }); - }); - - return true; + return loadConf(properties, configOptions, false); } } diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java index f7f832a12c..1f9da66669 100644 --- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java +++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java @@ -18,10 +18,12 @@ package org.apache.uniffle.common.config; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import com.google.common.collect.Sets; @@ -600,6 +602,36 @@ private Optional getRawValueFromOption(ConfigOption configOption) { return getRawValue(configOption.key()); } + /** + * loadConf + * @param properties all config items in configration file + * @param configOptions the config items defined in base config class + * @param includeMissingKey if include the keys which not defined in base config class + * @return true if load successfully, otherwise false + */ + public boolean loadConf( + Map properties, + List> configOptions, + boolean includeMissingKey) { + if (properties == null || configOptions == null) { + return false; + } + Map> configOptionMap = + configOptions.stream().collect(Collectors.toMap(c -> c.key().toLowerCase(), c -> c)); + properties.forEach((k, v) -> { + ConfigOption config = configOptionMap.get(k.toLowerCase()); + if (config == null) { + // if the key is not defined in configOptions, set it as a string value + if (includeMissingKey) { + setString(k, v); + } + } else { + set(config, ConfigUtils.convertValue(v, config.getClazz())); + } + }); + return true; + } + @Override public int hashCode() { int hash = 0; diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java index d66576dfca..d25a9527e8 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricReporterFactory.java @@ -33,7 +33,7 @@ public static MetricReporter getMetricReporter(RssConf conf, String instanceId) } Class klass = Class.forName(name); Constructor constructor; - constructor = klass.getConstructor(conf.getClass(), instanceId.getClass()); + constructor = klass.getConstructor(RssConf.class, String.class); return (AbstractMetricReporter) constructor.newInstance(conf, instanceId); } } diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java index 8795d5b654..1b55c88640 100644 --- a/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java +++ b/common/src/main/java/org/apache/uniffle/common/metrics/prometheus/PrometheusPushGatewayMetricReporter.java @@ -68,7 +68,7 @@ public void start() { scheduledExecutorService.scheduleWithFixedDelay(() -> { for (CollectorRegistry registry : registryList) { try { - pushGateway.push(registry, jobName, groupingKey); + pushGateway.pushAdd(registry, jobName, groupingKey); } catch (Throwable e) { LOG.error("Failed to send metrics to push gateway.", e); } diff --git a/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java new file mode 100644 index 0000000000..e3071a3786 --- /dev/null +++ b/common/src/test/java/org/apache/uniffle/common/metrics/MetricReporterFactoryTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.uniffle.common.metrics; + +import org.junit.jupiter.api.Test; + +import org.apache.uniffle.common.config.RssBaseConf; +import org.apache.uniffle.common.config.RssConf; +import org.apache.uniffle.common.metrics.prometheus.PrometheusPushGatewayMetricReporter; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class MetricReporterFactoryTest { + + @Test + public void testGetMetricReporter() throws Exception { + CustomRssConf conf = new CustomRssConf(); + conf.set(RssBaseConf.RSS_METRICS_REPORTER_CLASS, + PrometheusPushGatewayMetricReporter.class.getCanonicalName()); + MetricReporter metricReporter = MetricReporterFactory.getMetricReporter(conf, "1"); + assertTrue(metricReporter instanceof PrometheusPushGatewayMetricReporter); + } + + class CustomRssConf extends RssConf { + + } +} diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 6f2b94137e..424d7d1ee5 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -18,13 +18,11 @@ package org.apache.uniffle.coordinator; import java.util.List; -import java.util.Map; import org.apache.uniffle.common.config.ConfigOption; import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; -import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy; import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory; @@ -209,22 +207,6 @@ public CoordinatorConf(String fileName) { } public boolean loadConfFromFile(String fileName) { - Map properties = RssUtils.getPropertiesFromFile(fileName); - - if (properties == null) { - return false; - } - - loadCommonConf(properties); - - List> configOptions = ConfigUtils.getAllConfigOptions(CoordinatorConf.class); - properties.forEach((k, v) -> { - configOptions.forEach(config -> { - if (config.key().equalsIgnoreCase(k)) { - set(config, ConfigUtils.convertValue(v, config.getClazz())); - } - }); - }); - return true; + return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(CoordinatorConf.class)); } } diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index a016df6a8d..fa648983e4 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -101,6 +101,9 @@ public void start() throws Exception { startReconfigureThread(); jettyServer.start(); server.start(); + if (metricReporter != null) { + metricReporter.start(); + } Runtime.getRuntime().addShutdownHook(new Thread() { @Override diff --git a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java index 3d06f79f94..0ece6b827c 100644 --- a/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java +++ b/coordinator/src/test/java/org/apache/uniffle/coordinator/CoordinatorConfTest.java @@ -46,6 +46,8 @@ public void test() { assertEquals(256, conf.getInteger(CoordinatorConf.JETTY_CORE_POOL_SIZE)); assertEquals(60 * 1000, conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL)); + // test custom keys defined in plugins + assertEquals("v1", conf.getString("plugin.custom.key", null)); } } diff --git a/coordinator/src/test/resources/coordinator.conf b/coordinator/src/test/resources/coordinator.conf index a29bfdfbc4..82e6889c39 100644 --- a/coordinator/src/test/resources/coordinator.conf +++ b/coordinator/src/test/resources/coordinator.conf @@ -28,3 +28,4 @@ rss.coordinator.access.candidates.updateIntervalSec 1 rss.coordinator.access.loadChecker.serverNum.threshold 2 rss.coordinator.access.loadChecker.memory.percentage 20.0 rss.coordinator.dynamicClientConf.updateIntervalSec 1 +plugin.custom.key v1 diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md index d53a5b37c4..dc7f19751d 100644 --- a/docs/coordinator_guide.md +++ b/docs/coordinator_guide.md @@ -126,6 +126,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi |Property Name|Default| Description | |---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.
prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.| |rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. | |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). | |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. | diff --git a/docs/server_guide.md b/docs/server_guide.md index e3f9a99c24..11f6782de8 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -98,6 +98,7 @@ PrometheusPushGatewayMetricReporter is one of the built-in metrics reporter, whi |Property Name|Default| Description | |---|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +|rss.metrics.reporter.class|org.apache.uniffle.common.metrics.
prometheus.PrometheusPushGatewayMetricReporter|The class of metrics reporter.| |rss.metrics.prometheus.pushgateway.addr|-| The PushGateway server host URL including scheme, host name, and port. | |rss.metrics.prometheus.pushgateway.groupingkey|-| Specifies the grouping key which is the group and global labels of all metrics. The label name and value are separated by '=', and labels are separated by ';', e.g., k1=v1;k2=v2. Please ensure that your grouping key meets the [Prometheus requirements](https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels). | |rss.metrics.prometheus.pushgateway.jobname|-| The job name under which metrics will be pushed. | diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java index 796ba3058f..e1058dd8bf 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java @@ -110,6 +110,9 @@ public void start() throws Exception { registerHeartBeat.startHeartBeat(); jettyServer.start(); server.start(); + if (metricReporter != null) { + metricReporter.start(); + } Runtime.getRuntime().addShutdownHook(new Thread() { @Override diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index 5b18107551..fc2f80244b 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -18,7 +18,6 @@ package org.apache.uniffle.server; import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -26,7 +25,6 @@ import org.apache.uniffle.common.config.ConfigOptions; import org.apache.uniffle.common.config.ConfigUtils; import org.apache.uniffle.common.config.RssBaseConf; -import org.apache.uniffle.common.util.RssUtils; public class ShuffleServerConf extends RssBaseConf { @@ -380,28 +378,6 @@ public Configuration getHadoopConf() { } public boolean loadConfFromFile(String fileName) { - Map properties = RssUtils.getPropertiesFromFile(fileName); - - if (properties == null) { - return false; - } - - loadCommonConf(properties); - - List> configOptions = ConfigUtils.getAllConfigOptions(ShuffleServerConf.class); - - properties.forEach((k, v) -> { - configOptions.forEach(config -> { - if (config.key().equalsIgnoreCase(k)) { - set(config, ConfigUtils.convertValue(v, config.getClazz())); - } - }); - - if (k.startsWith(PREFIX_HADOOP_CONF)) { - setString(k, v); - } - }); - - return true; + return loadConfFromFile(fileName, ConfigUtils.getAllConfigOptions(ShuffleServerConf.class)); } } diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java index f70ca342b1..07f180a5cb 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerConfTest.java @@ -61,8 +61,9 @@ public void confTest() { assertFalse(shuffleServerConf.loadConfFromFile("/var/tmp/null")); assertEquals(2, shuffleServerConf.getLong(ShuffleServerConf.SERVER_BUFFER_CAPACITY)); assertEquals("value1", shuffleServerConf.getString("rss.server.hadoop.a.b", "")); - assertEquals("", shuffleServerConf.getString("rss.server.had.a.b", "")); - assertEquals("GRPC", shuffleServerConf.getString(ShuffleServerConf.RPC_SERVER_TYPE)); + assertEquals("value2", shuffleServerConf.getString("rss.server.had.a.b", "")); + assertEquals("GRPC", shuffleServerConf.get(ShuffleServerConf.RPC_SERVER_TYPE)); + assertEquals("v1", shuffleServerConf.getString("plugin.custom.key", null)); } @Test diff --git a/server/src/test/resources/confTest.conf b/server/src/test/resources/confTest.conf index 69170b4fbc..6007fb9041 100644 --- a/server/src/test/resources/confTest.conf +++ b/server/src/test/resources/confTest.conf @@ -25,3 +25,4 @@ rss.server.hadoop.a.b value1 rss.server.had.a.b value2 rss.server.multistorage.enable true rss.server.cluster.hadoop.clustere1. +plugin.custom.key v1