[Improvement-13476][*] Improve DS to load HDFS configuration automatically by using the environment variable HADOOP_CONF_DIR #13478

Closed
wants to merge 8 commits
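
For context, a minimal sketch of the environment this change reads at startup; the paths are the illustrative defaults used by the new `dolphinscheduler_env.sh` entries in this PR:

```bash
# Either variable is enough: DS resolves the Hadoop conf directory from them
# and adds core-site.xml and hdfs-site.xml to its Hadoop configuration.
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}

# Fallback: if HADOOP_CONF_DIR is unset or the directory does not exist,
# DS looks for $HADOOP_HOME/conf, then $HADOOP_HOME/etc/hadoop.
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
```

If neither variable points to an existing directory, the loader leaves the configuration untouched and `fs.defaultFS` keeps being resolved from `common.properties`.
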
12 changes: 2 additions & 10 deletions docs/docs/en/architecture/configuration.md
@@ -339,23 +339,15 @@ The default configuration is as follows:

### dolphinscheduler_env.sh [load environment variables configs]

When using shell to submit tasks, DolphinScheduler will export environment variables from `bin/env/dolphinscheduler_env.sh`. The
main configuration includes `JAVA_HOME` and other environment paths.
Configures the running environment for DS. The main configuration includes `JAVA_HOME`, `HADOOP_HOME` and `HADOOP_CONF_DIR`.

```bash
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}

# Task-related configurations; change these if you use the related tasks.
# DS will auto-load HDFS configuration from the environment
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH

# applicationId auto collection related configuration, the following configurations are unnecessary if setting appId.collect=log
export HADOOP_CLASSPATH=`hadoop classpath`:${DOLPHINSCHEDULER_HOME}/tools/libs/*
11 changes: 2 additions & 9 deletions docs/docs/zh/architecture/configuration.md
@@ -332,22 +332,15 @@ The common.properties configuration file currently mainly configures hadoop/s3/yarn/applicationId

## dolphinscheduler_env.sh [environment variables configuration]

When tasks are submitted in a shell-like way, the environment variables in this configuration file are loaded onto the host. The configuration involves `JAVA_HOME` and the environments of the task types, which mainly include Shell, Python, Spark, Flink, DataX tasks, and so on.
Configures the running environment for DS. The main environment variables include `JAVA_HOME`, `HADOOP_HOME` and `HADOOP_CONF_DIR`.

```bash
# JAVA_HOME, will use it to start DolphinScheduler server
export JAVA_HOME=${JAVA_HOME:-/opt/soft/java}

# Task-related configurations; change these if you use the related tasks.
# DS will auto-load HDFS configuration from the environment
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}
export SPARK_HOME=${SPARK_HOME:-/opt/soft/spark}
export PYTHON_HOME=${PYTHON_HOME:-/opt/soft/python}
export HIVE_HOME=${HIVE_HOME:-/opt/soft/hive}
export FLINK_HOME=${FLINK_HOME:-/opt/soft/flink}
export DATAX_HOME=${DATAX_HOME:-/opt/soft/datax}

export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$PYTHON_HOME/bin:$JAVA_HOME/bin:$HIVE_HOME/bin:$FLINK_HOME/bin:$DATAX_HOME/bin:$PATH

# applicationId auto collection related configuration, the following configurations are unnecessary if setting appId.collect=log
export HADOOP_CLASSPATH=`hadoop classpath`:${DOLPHINSCHEDULER_HOME}/tools/libs/*
@@ -53,6 +53,7 @@
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
@@ -79,6 +80,10 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {
private static final Logger logger = LoggerFactory.getLogger(HdfsStorageOperator.class);
private static HdfsStorageProperties hdfsProperties;
private static final String HADOOP_UTILS_KEY = "HADOOP_UTILS_KEY";
public static final String HADOOP_HOME = "HADOOP_HOME";
public static final String HADOOP_CONF_DIR = "HADOOP_CONF_DIR";
public static final String HDFS_SITE_XML = "hdfs-site.xml";
public static final String CORE_SITE_XML = "core-site.xml";

private static final LoadingCache<String, HdfsStorageOperator> cache = CacheBuilder
.newBuilder()
@@ -87,7 +92,7 @@ public class HdfsStorageOperator implements Closeable, StorageOperate {

@Override
public HdfsStorageOperator load(String key) throws Exception {
return new HdfsStorageOperator(hdfsProperties);
return new HdfsStorageOperator();
}
});

@@ -132,46 +137,81 @@ private void initHdfsPath() {
private void init() throws NullPointerException {
try {
configuration = new HdfsConfiguration();

String hdfsUser = hdfsProperties.getUser();
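// merge core-site.xml / hdfs-site.xml found via HADOOP_CONF_DIR or HADOOP_HOME into the configuration before Kerberos and fs.defaultFS are evaluated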
loadHdfsConfigurationFromEnv(configuration);
boolean isKerberosAuthen = false;
if (CommonUtils.loadKerberosConf(configuration)) {
hdfsUser = "";
isKerberosAuthen = true;
}

String defaultFS = getDefaultFS();
// first get the key from core-site.xml / hdfs-site.xml; if null, then try to get it from the properties file
// the default is the local file system
if (StringUtils.isNotBlank(defaultFS)) {
Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("fs.");
configuration.set(Constants.HDFS_DEFAULT_FS, defaultFS);
fsRelatedProps.forEach((key, value) -> configuration.set(key, value));
Map<String, String> fsRelatedProps = PropertyUtils.getPrefixedProperties("resource.hdfs.fs.");
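// strip the leading "resource.hdfs." (14 characters) so the remaining key matches the native Hadoop property name, e.g. resource.hdfs.fs.defaultFS -> fs.defaultFS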
fsRelatedProps.forEach((key, value) -> configuration.set(key.substring(14), value));
} else {
logger.error("property:{} can not to be empty, please set!", Constants.FS_DEFAULT_FS);
throw new NullPointerException(
String.format("property: %s can not to be empty, please set!", Constants.FS_DEFAULT_FS));
}

if (!defaultFS.startsWith("file")) {
logger.info("get property:{} -> {}, from core-site.xml hdfs-site.xml ", Constants.FS_DEFAULT_FS,
logger.info("get property:{} -> {}, will use HDFS storage", Constants.FS_DEFAULT_FS,
defaultFS);
}

if (StringUtils.isNotEmpty(hdfsUser)) {
String hdfsUser = hdfsProperties.getUser();
if (!isKerberosAuthen && StringUtils.isNotEmpty(hdfsUser)) {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser(hdfsUser);
ugi.doAs((PrivilegedExceptionAction<Boolean>) () -> {
fs = FileSystem.get(configuration);
return true;
});
} else {
logger.warn("resource.hdfs.root.user is not set value!");
fs = FileSystem.get(configuration);
if (!isKerberosAuthen) {
logger.warn("resource.hdfs.root.user is not set value!");
}
}

} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

/**
* Auto-load the HDFS configuration (core-site.xml and hdfs-site.xml) from the OS environment.
*
* @param configuration the Hadoop configuration to which the resources are added
*/
private void loadHdfsConfigurationFromEnv(Configuration configuration) {
String hadoopConfDirEnv = System.getenv(HADOOP_CONF_DIR);
String hadoopHomeEnv = System.getenv(HADOOP_HOME);
String hadoopConfPath = getHadoopConfPath(hadoopConfDirEnv, hadoopHomeEnv);
if (StringUtils.isNotBlank(hadoopConfPath)) {
configuration.addResource(new Path(hadoopConfPath, HDFS_SITE_XML));
configuration.addResource(new Path(hadoopConfPath, CORE_SITE_XML));
}
}

private String getHadoopConfPath(String hadoopConfDirEnv, String hadoopHomeEnv) {
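// resolution order: use HADOOP_CONF_DIR when it is set and the directory exists; otherwise fall back to $HADOOP_HOME/conf or $HADOOP_HOME/etc/hadoop (the Hadoop 2.x layout)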
String hadoopConfPath = null;
if (StringUtils.isBlank(hadoopConfDirEnv) || !Files.exists(Paths.get(hadoopConfDirEnv))) {
if (StringUtils.isNotBlank(hadoopHomeEnv)) {
java.nio.file.Path confPath = Paths.get(hadoopHomeEnv, "conf");
java.nio.file.Path confPath2 = Paths.get(hadoopHomeEnv, "/etc/hadoop"); // hadoop 2.2
if (Files.exists(confPath)) {
hadoopConfPath = confPath.toString();
} else if (Files.exists(confPath2)) {
hadoopConfPath = confPath2.toString();
}
}
} else {
hadoopConfPath = hadoopConfDirEnv;
}
return hadoopConfPath;
}

/**
* @return Configuration
*/
@@ -184,7 +224,10 @@ public Configuration getConfiguration() {
*/
public String getDefaultFS() {
String defaultFS = getConfiguration().get(Constants.FS_DEFAULT_FS);
if (StringUtils.isBlank(defaultFS)) {
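// "file:///" is Hadoop's built-in default for fs.defaultFS when no core-site.xml has been loaded, so treat it the same as an unset value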
if (StringUtils.isBlank(defaultFS) || defaultFS.equals("file:///")) {
logger.info(
"The value of property [{}] from core-site.xml is [{}], which is not expected; the value from common.properties will be used instead.",
Constants.HDFS_DEFAULT_FS, defaultFS);
defaultFS = hdfsProperties.getDefaultFS();
}
return defaultFS;
@@ -20,7 +20,15 @@
import org.apache.dolphinscheduler.common.utils.HttpUtils;
import org.apache.dolphinscheduler.spi.enums.ResourceType;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.MockedStatic;
@@ -38,35 +46,35 @@ public class HdfsStorageOperatorTest {
private static final Logger logger = LoggerFactory.getLogger(HdfsStorageOperatorTest.class);

@Test
public void getHdfsTenantDir() {
void getHdfsTenantDir() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsTenantDir("1234"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsUdfFileName() {
void getHdfsUdfFileName() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsUdfFileName("admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsResourceFileName() {
void getHdfsResourceFileName() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsResourceFileName("admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getHdfsFileName() {
void getHdfsFileName() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
logger.info(hdfsStorageOperator.getHdfsFileName(ResourceType.FILE, "admin", "file_name"));
Assertions.assertTrue(true);
}

@Test
public void getAppAddress() {
void getAppAddress() {
HdfsStorageOperator hdfsStorageOperator = new HdfsStorageOperator();
try (MockedStatic<HttpUtils> mockedHttpUtils = Mockito.mockStatic(HttpUtils.class)) {
mockedHttpUtils.when(() -> HttpUtils.get("http://ds1:8088/ws/v1/cluster/info"))
@@ -76,4 +84,49 @@ public void getAppAddress() {
}
}

@DisplayName("test load Hdfs Configuration by env avaliable HADOOP_CONF_DIR, and directory exist")
@Test
void testGetHadoopConfPathFromEnvByHADOOP_CONF_DIR1() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
String hadoopConfDirEnv = System.getProperty("user.dir");
String hadoopHomeEnv = "/not_expected";
Assertions.assertEquals(hadoopConfDirEnv, invokeGetHadoopConfPath(hadoopConfDirEnv, hadoopHomeEnv));
}

@DisplayName("test load Hdfs Configuration by env avaliable HADOOP_CONF_DIR, but directory not exist")
@Test
void testGetHadoopConfPathFromEnvByHADOOP_CONF_DIR2() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
String hadoopConfDirEnv = "/not_exist";
String hadoopHomeEnv = null;
Assertions.assertNull(invokeGetHadoopConfPath(hadoopConfDirEnv, hadoopHomeEnv));
}

@DisplayName("test load Hdfs Configuration by env avaliable HADOOP_HOME, and directory exist")
@Test
void testGetHadoopConfPathFromEnvByHADOOP_HOME1() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, IOException {
String hadoopConfDirEnv = null;
String hadoopHomeEnv = System.getProperty("user.dir");
Path homeConfPath = Paths.get(hadoopHomeEnv, "conf");
Files.createDirectory(homeConfPath);
Assertions.assertEquals(homeConfPath.toString(),
invokeGetHadoopConfPath(hadoopConfDirEnv, hadoopHomeEnv));
Files.delete(homeConfPath);
}

@DisplayName("test load Hdfs Configuration by env avaliable HADOOP_HOME, and directory not exist")
@Test
void testGetHadoopConfPathFromEnvByHADOOP_HOME2() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException {
String hadoopConfDirEnv = null;
String hadoopHomeEnv = "/not_exist";
Assertions.assertNull(invokeGetHadoopConfPath(hadoopConfDirEnv, hadoopHomeEnv));
}

private String invokeGetHadoopConfPath(String hadoopConfDirEnv,
String hadoopHomeEnv) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
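// getHadoopConfPath is private, so invoke it via reflection; the mocked operator only serves as the receiver because the method does not touch instance state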
Method method =
HdfsStorageOperator.class.getDeclaredMethod("getHadoopConfPath", String.class, String.class);
method.setAccessible(true);
HdfsStorageOperator hdfsStorageOperator = Mockito.mock(HdfsStorageOperator.class);
return (String) method.invoke(hdfsStorageOperator, hadoopConfDirEnv, hadoopHomeEnv);
}

}
5 changes: 4 additions & 1 deletion script/env/dolphinscheduler_env.sh
@@ -17,7 +17,10 @@


# Never put sensitive config such as a database password here in your production environment;
# this file will be sourced every time a new task is executed.

# DS will auto-load HDFS configuration from the environment
export HADOOP_HOME=${HADOOP_HOME:-/opt/soft/hadoop}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-/opt/soft/hadoop/etc/hadoop}

# applicationId auto collection related configuration, the following configurations are unnecessary if setting appId.collect=log
#export HADOOP_CLASSPATH=`hadoop classpath`:${DOLPHINSCHEDULER_HOME}/tools/libs/*