Skip to content

Commit

Permalink
[improvement] DataQuality module improve (#14463)
Browse files Browse the repository at this point in the history
(cherry picked from commit 75d29f6)
  • Loading branch information
boy-xiaozhang authored and zhongjiajie committed Jul 20, 2023
1 parent 20c30db commit 3add3f1
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.data.quality;

import static org.apache.dolphinscheduler.data.quality.Constants.SPARK_APP_NAME;
import static org.apache.dolphinscheduler.data.quality.enums.ReaderType.HIVE;

import org.apache.dolphinscheduler.data.quality.config.Config;
import org.apache.dolphinscheduler.data.quality.config.DataQualityConfiguration;
Expand Down Expand Up @@ -64,9 +65,16 @@ public static void main(String[] args) throws Exception {
config.put(SPARK_APP_NAME, dataQualityConfiguration.getName());
}

SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config);
boolean hiveClientSupport = dataQualityConfiguration
.getReaderConfigs()
.stream()
.anyMatch(line -> line.getType().equalsIgnoreCase(HIVE.name()));

SparkRuntimeEnvironment sparkRuntimeEnvironment = new SparkRuntimeEnvironment(config, hiveClientSupport);

DataQualityContext dataQualityContext =
new DataQualityContext(sparkRuntimeEnvironment, dataQualityConfiguration);

dataQualityContext.execute();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,23 @@ public class SparkRuntimeEnvironment {

private Config config = new Config();

public SparkRuntimeEnvironment(Config config) {
/**
 * Creates the Spark runtime environment for a data-quality task.
 *
 * @param config            task configuration; when null, the pre-initialized empty default Config is kept
 * @param hiveClientSupport whether the underlying SparkSession should be built with Hive support enabled
 */
public SparkRuntimeEnvironment(Config config, boolean hiveClientSupport) {
if (config != null) {
this.config = config;
}

// Eagerly build the SparkSession so the environment is ready immediately after construction.
this.prepare(hiveClientSupport);
}

/** Returns the configuration backing this runtime environment (never null). */
public Config getConfig() {
    return config;
}

public void prepare() {
sparkSession = SparkSession.builder().config(createSparkConf()).enableHiveSupport().getOrCreate();
/**
 * Builds and stores the SparkSession for this environment.
 *
 * @param hiveClientSupport when true the session is created with Hive support enabled,
 *                          avoiding the Hive client dependency for tasks that do not read from Hive
 */
public void prepare(boolean hiveClientSupport) {
    SparkSession.Builder builder = SparkSession.builder().config(createSparkConf());
    if (hiveClientSupport) {
        builder = builder.enableHiveSupport();
    }
    this.sparkSession = builder.getOrCreate();
}

private SparkConf createSparkConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public void init() {
config.put("spark.ui.port", 13000);
config.put("spark.master", "local[4]");

sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config));
// The Hive client is disabled by default so that this unit test runs locally without requiring a Hive installation.
sparkRuntimeEnvironment = new SparkRuntimeEnvironment(new Config(config), false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class SparkArgsUtils {

private static final String SPARK_ON_YARN = "yarn";

// Fallback main class applied when a non-Python Spark data-quality task does not specify one.
private static final String DEFAULT_QUALITY_CLASS =
"org.apache.dolphinscheduler.data.quality.DataQualityApplication";

// Utility class: prevent instantiation.
private SparkArgsUtils() {
throw new IllegalStateException("Utility class");
}
Expand All @@ -62,9 +65,9 @@ public static List<String> buildArgs(SparkParameters param) {

ProgramType programType = param.getProgramType();
String mainClass = param.getMainClass();
if (programType != null && programType != ProgramType.PYTHON && StringUtils.isNotEmpty(mainClass)) {
if (programType != null && programType != ProgramType.PYTHON) {
args.add(SparkConstants.MAIN_CLASS);
args.add(mainClass);
args.add(StringUtils.isNotEmpty(mainClass) ? mainClass : DEFAULT_QUALITY_CLASS);
}

int driverCores = param.getDriverCores();
Expand Down

0 comments on commit 3add3f1

Please sign in to comment.